日期:2014-05-16  浏览次数:20465 次

利用ORACLE JOB 模拟多线程应用
最近在做一个信息集成发布的项目,主要功能是根据用户输入的集成条件实现数据的过滤和目标数据的生成。由于数据量比较大,而且数据来源都是生产环境下的,完全实时的方式会对生成数据库造成压力,项目组考虑使用ORACLE存储过程的方式来实现,采用多个ORACLE JOB来模拟实现多线程的方式完成。

为了简单起见,前台把启动任务的相关参数信息写入到一张表,如下为表的结构:
--任务基础信息表
create table log_backstage_task (
id number,  --任务日志主键
begin_time date,--任务开始时间
end_time date,  --任务结束时间
start_type number(2),--启动类型 0 定时启动  1 人工启动
status number(1),    --任务状态 0 未启动   1 执行中 2 执行完成  3 执行失败
outline varchar2(400),--任务过程描述
task_parameter clob   --任务参数列表
);
--JOB模拟线程状态表
create table t_job_info (
job_number number , --主键  直接对应为ORACLE 的JOB编号
create_time date  , --JOB创建时间
isbusy  number(1) default 0  --是来标识JOB是否正在运行
);
--测试用数据表
create table DEMO1
(
  STR VARCHAR2(100)
);


于是根据JAVA里面的多线程编程思想,用主线程接收请求,然后看有无空闲子线程,有的话则启动子线程,让子线程接管待处理的请求。

--package 代码
create or replace package pk_full_integrate is
--定义并发处理的JOB数量   默认配置为5
  MAX_JOB_INSTANCE constant integer := 5;
--放空过程  用于子线程JOB创建后的默认执行过程
   procedure pro_null ;
--主JOB调用过程
procedure pro_job_schedule_main;
--子线程 执行过程..
procedure pro_job_execute_main(pJobNum number,pLog_id number  );
end pk_full_integrate;
/


--package body 代码
create or replace package body pk_full_integrate is
--空过程  用于子线程JOB创建后的默认执行过程   
procedure pro_null as
   begin
     null;
   end;
--判断是否还有空闲的JOB可以使用  有则返回JOB编号,没有则返回空
function fun_isHavaNullJob return number is
iResult number := null ;
begin
     select m.job_number into iResult
      from t_job_info m where m.isbusy = 0  and rownum <2; 
      return iResult;
exception when others then 
      return null ;
end;
--寻找最先提交的还没有启动的任务 有则返回ID,没有则返回空...
function fun_isHaveNotStartTask return number is
iResult number := null  ;
begin
     select t.id   into iResult  from (
     select id from log_backstage_task m where m.status = 0 order by id asc ) t
      where rownum < 2  ;
      return iResult;
exception when others then 
      return  null ;        
end;
--主JOB调用过程
procedure pro_job_schedule_main  is 
ptempJob_num number := null ;
ptempTask_num number := null; 
pExecSql varchar2(500) := ''; 
begin
     ptempJob_num := fun_isHavaNullJob;
     ptempTask_num := fun_isHaveNotStartTask ;
     while (ptempJob_num is not null ) and ( ptempTask_num is not null   )  loop
           begin
               pExecSql := 'pk_full_integrate.pro_job_execute_main('||ptempJob_num||','||ptempTask_num||' );' ;
                
               update t_job_info  m set m.isbusy = 1 where m.job_number = ptempJob_num;
               update log_backstage_task s set  s.status = 1 where s.id = ptempTask_num ;                
               --dbms_job.run(ptempJob_num);
               --改用dbms_job.broken函数,防止JOB出错之后无法重新启动,采用延迟10秒之后启动
dbms_job.what(ptempJob_num,pExecSql);
dbms_job.broken(ptempJob_num,false,sysdate+1/8640);
               commit;
               ptempJob_num := fun_isHavaNullJob;
               ptempTask_num := fun_isHaveNotStartTask ;
           exception when others then 
               dbms_output.put_line('error'||substr(sqlerrm,1,255)); 
               rollback; 
           end ;
     end loop;
 end;

--子线程 执行过程..
procedure pro_job_execute_main(pJobNum number,pLog_id number  ) is 
begin
     --dbms_output.put_line('executed successful ........ '||pJobNum);
     insert into demo1 values ('executed successful ........ '||pJobNum);
     update log_backstage_task set status = 1 where id = pLog_id ;
     update t_job_info set isBusy = 0 where job_number = pJobNum ;
     commit;
exception when others then 
     rollback;
     update log_backstage_task set status = 4  where id = pLog_id ;
     update t_job_info set isBusy = 0 where job_number = pJobNum ;
     commit;
end;
   
begin
     --包初始化脚本 
      --初始化JOB信息
     select count(1)  into pJob_cnt  from t_job_info ;
    if pJob_cnt < MAX_JOB_INSTANCE then 
        for i in 1..MAX_JOB_INSTANCE-pJob_cnt    loop
--子线程JOB的时间间隔调整为356000(1000年),理论上不会自动执行