乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      源碼分析 | Spring定時(shí)任務(wù)Quartz執(zhí)行全過程源碼解讀

       小傅哥 2021-12-13

      微信公眾號(hào):bugstack蟲洞棧

      微信公眾號(hào):bugstack蟲洞棧 | 博客:https://

      沉淀、分享、成長,專注于原創(chuàng)專題案例,以最易學(xué)習(xí)編程的方式分享知識(shí),讓自己和他人都能有所收獲。目前已完成的專題有;Netty4.x實(shí)戰(zhàn)專題案例、用Java實(shí)現(xiàn)JVM、基于JavaAgent的全鏈路監(jiān)控、手寫RPC框架、架構(gòu)設(shè)計(jì)專題案例[Ing]等。

      你用劍🗡、我用刀🔪,好的代碼都很燒😏,望你不吝出招💨!

      一、前言介紹

      在日常開發(fā)中經(jīng)常會(huì)用到定時(shí)任務(wù),用來;庫表掃描發(fā)送MQ、T+n賬單結(jié)算、緩存數(shù)據(jù)更新、秒殺活動(dòng)狀態(tài)變更,等等。因?yàn)橛辛薙pring的Schedule極大的方便了我們對(duì)這類場景的使用。那么,除了應(yīng)用你還了解它多少呢;

      1. 默認(rèn)初始化多少個(gè)任務(wù)線程
      2. JobStore有幾種實(shí)現(xiàn),你平時(shí)用的都是哪個(gè)
      3. 一個(gè)定時(shí)任務(wù)的執(zhí)行流程簡述下

      蒙圈了吧,是不感覺平時(shí)只是使用了,根本沒關(guān)注過這些。有種沖動(dòng)趕緊搜索答案吧!但只是知道答案是沒有多少意義的,扛不住問不說,也不了解原理。所以,如果你想真的提升自己技能,還是要從根本搞定。

      二、案例工程

      為了更好的做源碼分析,我們將平時(shí)用的定時(shí)任務(wù)服務(wù)單獨(dú)抽離出來。工程下載,關(guān)注公眾號(hào):bugstack蟲洞棧,回復(fù):源碼分析

      itstack-demo-code-schedule
      └── src
          ├── main
          │   ├── java
          │   │   └── org.itstack.demo
          │   │       ├── DemoTask.java
          │   │       └── JobImpl.java   
          │   └── resources
          │       ├── props
          │       │   └── config.properties
          │       ├── spring
          │       │   └── spring-config-schedule-task.xml
          │       ├── logback.xml
          │       └── spring-config.xml
          └── test
               └── java
                   └── org.itstack.demo.test
                       ├── ApiTest.java
                       ├── MyQuartz.java
                       └── MyTask.java
      

      三、環(huán)境配置

      1. JDK 1.8
      2. IDEA 2019.3.1
      3. Spring 4.3.24.RELEASE
      4. quartz 2.3.2 {不同版本略有代碼差異}

      四、源碼分析

      <dependency>
          <groupId>org.quartz-scheduler</groupId>
          <artifactId>quartz</artifactId>
          <version>2.3.2</version>
      </dependency>
      

      依賴于Spring版本升級(jí)quartz選擇2.3.2,同時(shí)如果你如本文案例中所示使用xml配置任務(wù)。那么會(huì)有如下更改;

      Spring 3.x/org.springframework.scheduling.quart.CronTriggerBean

       <bean id="taskTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
           <property name="jobDetail" ref="taskHandler"/>
           <property name="cronExpression" value="0/5 * * * * ?"/>
       </bean>
      

      Spring 4.x/org.springframework.scheduling.quartz.CronTriggerFactoryBean

       <bean id="taskTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
           <property name="jobDetail" ref="taskHandler"/>
           <property name="cronExpression" value="0/5 * * * * ?"/>
       </bean>
      

      在正式分析前,可以看下quartz的默認(rèn)配置,很多初始化動(dòng)作都要從這里取得參數(shù),同樣你可以配置自己的配置文件。例如,當(dāng)你的任務(wù)很多時(shí),默認(rèn)初始化的10個(gè)線程組不滿足你的業(yè)務(wù)需求,就可以按需調(diào)整。

      quart.properties

      # Default Properties file for use by StdSchedulerFactory
      # to create a Quartz Scheduler Instance, if a different
      # properties file is not explicitly specified.
      #
      
      org.quartz.scheduler.instanceName: DefaultQuartzScheduler
      org.quartz.scheduler.rmi.export: false
      org.quartz.scheduler.rmi.proxy: false
      org.quartz.scheduler.wrapJobExecutionInUserTransaction: false
      
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
      org.quartz.threadPool.threadCount: 10
      org.quartz.threadPool.threadPriority: 5
      org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true
      
      org.quartz.jobStore.misfireThreshold: 60000
      
      org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
      

      1. 從一個(gè)簡單案例開始

      平時(shí)我們使用Schedule基本都是注解或者xml配置文件,但是為了可以更簡單的分析代碼,我們從一個(gè)簡單的Demo入手,放到main函數(shù)中。

      DemoTask.java & 定義一個(gè)等待被執(zhí)行的任務(wù)

      public class DemoTask {
      
          private Logger logger = LoggerFactory.getLogger(DemoTask.class);
      
          public void execute() throws Exception{
              logger.info("定時(shí)處理用戶信息任務(wù):0/5 * * * * ?");
          }
      
      }
      

      MyTask.java & 測試類,將配置在xml中的代碼抽離出來

      public class MyTask {
      
          public static void main(String[] args) throws Exception {
      
              DemoTask demoTask = new DemoTask();
      
              // 定義了;執(zhí)行的內(nèi)容
              MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();
              methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);
              methodInvokingJobDetailFactoryBean.setTargetMethod("execute");
              methodInvokingJobDetailFactoryBean.setConcurrent(true);
              methodInvokingJobDetailFactoryBean.setName("demoTask");
              methodInvokingJobDetailFactoryBean.afterPropertiesSet();
      
              // 定義了;執(zhí)行的計(jì)劃
              CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
              cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());
              cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");
              cronTriggerFactoryBean.setName("demoTask");
              cronTriggerFactoryBean.afterPropertiesSet();
      
              // 實(shí)現(xiàn)了;執(zhí)行的功能
              SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
              schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());
              schedulerFactoryBean.setAutoStartup(true);
              schedulerFactoryBean.afterPropertiesSet();
      
              schedulerFactoryBean.start();
      
              // 暫停住
              System.in.read();
      
          }
      
      }
      

      如果一切順利,那么會(huì)有如下結(jié)果:

      2020-01-04 10:47:16.369 [main] INFO  org.quartz.impl.StdSchedulerFactory[1220] - Using default implementation for ThreadExecutor
      2020-01-04 10:47:16.421 [main] INFO  org.quartz.core.SchedulerSignalerImpl[61] - Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
      2020-01-04 10:47:16.422 [main] INFO  org.quartz.core.QuartzScheduler[229] - Quartz Scheduler v.2.3.2 created.
      2020-01-04 10:47:16.423 [main] INFO  org.quartz.simpl.RAMJobStore[155] - RAMJobStore initialized.
      2020-01-04 10:47:16.424 [main] INFO  org.quartz.core.QuartzScheduler[294] - Scheduler meta-data: Quartz Scheduler (v2.3.2) 'QuartzScheduler' with instanceId 'NON_CLUSTERED'
        Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
        NOT STARTED.
        Currently in standby mode.
        Number of jobs executed: 0
        Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
        Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
      
      2020-01-04 10:47:16.424 [main] INFO  org.quartz.impl.StdSchedulerFactory[1374] - Quartz scheduler 'QuartzScheduler' initialized from an externally provided properties instance.
      2020-01-04 10:47:16.424 [main] INFO  org.quartz.impl.StdSchedulerFactory[1378] - Quartz scheduler version: 2.3.2
      2020-01-04 10:47:16.426 [main] INFO  org.quartz.core.QuartzScheduler[2293] - JobFactory set to: org.springframework.scheduling.quartz.AdaptableJobFactory@3e9b1010
      2020-01-04 10:47:16.651 [main] INFO  org.quartz.core.QuartzScheduler[547] - Scheduler QuartzScheduler_$_NON_CLUSTERED started.
      一月 04, 2020 10:47:16 上午 org.springframework.scheduling.quartz.SchedulerFactoryBean startScheduler
      信息: Starting Quartz Scheduler now
      2020-01-04 10:47:20.321 [QuartzScheduler_Worker-1] INFO  org.itstack.demo.DemoTask[11] - 定時(shí)處理用戶信息任務(wù):0/5 * * * * ?
      2020-01-04 10:47:25.001 [QuartzScheduler_Worker-2] INFO  org.itstack.demo.DemoTask[11] - 定時(shí)處理用戶信息任務(wù):0/5 * * * * ?
      2020-01-04 10:47:30.000 [QuartzScheduler_Worker-3] INFO  org.itstack.demo.DemoTask[11] - 定時(shí)處理用戶信息任務(wù):0/5 * * * * ?
      2020-01-04 10:47:35.001 [QuartzScheduler_Worker-4] INFO  org.itstack.demo.DemoTask[11] - 定時(shí)處理用戶信息任務(wù):0/5 * * * * ?
      2020-01-04 10:47:40.000 [QuartzScheduler_Worker-5] INFO  org.itstack.demo.DemoTask[11] - 定時(shí)處理用戶信息任務(wù):0/5 * * * * ?
      
      Process finished with exit code -1
      

      2. 定義執(zhí)行內(nèi)容(MethodInvokingJobDetailFactoryBean)

      // 定義了;執(zhí)行的內(nèi)容
      MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();
      methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);
      methodInvokingJobDetailFactoryBean.setTargetMethod("execute");
      methodInvokingJobDetailFactoryBean.setConcurrent(true);
      methodInvokingJobDetailFactoryBean.setName("demoTask");
      methodInvokingJobDetailFactoryBean.afterPropertiesSet();
      

      這塊內(nèi)容主要將我們的任務(wù)體(即待執(zhí)行任務(wù)DemoTask)交給MethodInvokingJobDetailFactoryBean管理,首先設(shè)置必要信息;

      • targetObject:目標(biāo)對(duì)象bean,也就是demoTask
      • targetMethod:目標(biāo)方法name,也就是execute
      • concurrent:是否并行執(zhí)行,非并行執(zhí)行任務(wù),如果上一個(gè)任務(wù)沒有執(zhí)行完,下一刻不會(huì)執(zhí)行
      • name:xml配置非必傳,源碼中可以獲取beanName

      最后我們通過手動(dòng)調(diào)用 afterPropertiesSet() 來模擬初始化。如果我們的類是交給 Spring 管理的,那么在實(shí)現(xiàn)了 InitializingBean 接口的類,在類配置信息加載后會(huì)自動(dòng)執(zhí)行 afterPropertiesSet() 。一般實(shí)現(xiàn)了 InitializingBean 接口的類,同時(shí)也會(huì)去實(shí)現(xiàn) FactoryBean 接口,因?yàn)檫@個(gè)接口實(shí)現(xiàn)后就可以通過 T getObject() 獲取自己自定義初始化的類。這也常常用在一些框架開發(fā)中。

      MethodInvokingJobDetailFactoryBean.afterPropertiesSet()

      public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException {
      prepare();
      // Use specific name if given, else fall back to bean name.
      String name = (this.name != null ? this.name : this.beanName);
      // Consider the concurrent flag to choose between stateful and stateless job.
      Class<?> jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class);
      // Build JobDetail instance.
      JobDetailImpl jdi = new JobDetailImpl();
      jdi.setName(name);
      jdi.setGroup(this.group);
      jdi.setJobClass((Class) jobClass);
      jdi.setDurability(true);
      jdi.getJobDataMap().put("methodInvoker", this);
      this.jobDetail = jdi;
      
      postProcessJobDetail(this.jobDetail);
      }
      
      • 源碼168行: 根據(jù)是否并行執(zhí)行選擇任務(wù)類,這兩個(gè)類都是MethodInvokingJobDetailFactoryBean的內(nèi)部類,非并行執(zhí)行的StatefulMethodInvokingJob只是繼承MethodInvokingJob添加了標(biāo)記注解。

      • 源碼171行: 創(chuàng)建JobDetailImpl,添加任務(wù)明細(xì)信息,注意這類的jdi.setJobClass((Class) jobClass)實(shí)際就是MethodInvokingJob。MethodInvokingJob也是我們最終要反射調(diào)用執(zhí)行的內(nèi)容。

      • 源碼177行: 初始化任務(wù)后賦值給this.jobDetail = jdi,也就是最終的類對(duì)象

        MethodInvokingJobDetailFactoryBean.getObject()

        @Override
        public JobDetail getObject() {
        return this.jobDetail;
        }
        
      • 源碼:220行: 獲取對(duì)象時(shí)返回 this.jobDetail,這也就解釋了為什么 MethodInvokingJobDetailFactoryBean 初始化后直接賦值給了一個(gè) JobDetail ;

        微信公眾號(hào):bugstack蟲洞棧 & Schedule.xml

      3. 定義執(zhí)行計(jì)劃(CronTriggerFactoryBeann)

      // 定義了;執(zhí)行的計(jì)劃
      CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
      cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());
      cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");
      cronTriggerFactoryBean.setName("demoTask");
      cronTriggerFactoryBean.afterPropertiesSet();
      

      這一塊主要定義任務(wù)的執(zhí)行計(jì)劃,并將任務(wù)執(zhí)行內(nèi)容交給 CronTriggerFactoryBean 管理,同時(shí)設(shè)置必要信息;

      • jobDetail:設(shè)置任務(wù)體,xml 中可以直接將對(duì)象賦值,硬編碼中設(shè)置執(zhí)行的 JobDetail 對(duì)象信息。也就是我們上面設(shè)置的 JobDetailImpl ,通過 getObject() 獲取出來。
      • cronExpression:計(jì)劃表達(dá)式;秒、分、時(shí)、日、月、周、年

      CronTriggerFactoryBean.afterPropertiesSet()

      @Override
      public void afterPropertiesSet() throws ParseException {
          
      // ... 校驗(yàn)屬性信息
      
      CronTriggerImpl cti = new CronTriggerImpl();
      cti.setName(this.name);
      cti.setGroup(this.group);
      if (this.jobDetail != null) {
      cti.setJobKey(this.jobDetail.getKey());
      }
      cti.setJobDataMap(this.jobDataMap);
      cti.setStartTime(this.startTime);
      cti.setCronExpression(this.cronExpression);
      cti.setTimeZone(this.timeZone);
      cti.setCalendarName(this.calendarName);
      cti.setPriority(this.priority);
      cti.setMisfireInstruction(this.misfireInstruction);
      cti.setDescription(this.description);
      this.cronTrigger = cti;
      }
      
      • 源碼237行: 創(chuàng)建觸發(fā)器 CronTriggerImpl 并設(shè)置相關(guān)屬性信息

      • 源碼245行: 生成執(zhí)行計(jì)劃類 cti.setCronExpression(this.cronExpression);

        public void setCronExpression(String cronExpression) throws ParseException {
        TimeZone origTz = getTimeZone();
        this.cronEx = new CronExpression(cronExpression);
        this.cronEx.setTimeZone(origTz);
        }
        

        CronExpression.java & 解析Cron表達(dá)式

        protected void buildExpression(String expression) throws ParseException {
        expressionParsed = true;
        try {
        
        // ... 初始化 TreeSet xxx = new TreeSet<Integer>();
        
        int exprOn = SECOND;
        StringTokenizer exprsTok = new StringTokenizer(expression, " \t",
        false);
        
        while (exprsTok.hasMoreTokens() && exprOn <= YEAR) {
        String expr = exprsTok.nextToken().trim();
        
        // ... 校驗(yàn)DAY_OF_MONTH和DAY_OF_WEEK字段的特殊字符
        
        StringTokenizer vTok = new StringTokenizer(expr, ",");
        while (vTok.hasMoreTokens()) {
        String v = vTok.nextToken();
        storeExpressionVals(0, v, exprOn);
        }
        exprOn++;
        }
        
        // ... 校驗(yàn)DAY_OF_MONTH和DAY_OF_WEEK字段的特殊字符
        
        } catch (ParseException pe) {
        throw pe;
        } catch (Exception e) {
        throw new ParseException("Illegal cron expression format ("
        + e.toString() + ")", 0);
        }
        }
        
        • Cron表達(dá)式有7個(gè)字段,CronExpression 把7個(gè)字段解析為7個(gè) TreeSet 對(duì)象。
        • 填充TreeSet對(duì)象值的時(shí)候,表達(dá)式都會(huì)轉(zhuǎn)換為起始值、結(jié)束值和增量的計(jì)算模式,然后計(jì)算出匹配的值放進(jìn)TreeSet對(duì)象

      CronTriggerFactoryBean.getObject()

      @Override
      public CronTrigger getObject() {
      return this.cronTrigger;
      }
      
      • 源碼257行: 獲取對(duì)象時(shí)返回 this.cronTrigger ,也就是 CronTriggerImpl 對(duì)象

      4. 調(diào)度執(zhí)行計(jì)劃(SchedulerFactoryBean)

      // 調(diào)度了;執(zhí)行的計(jì)劃(scheduler)
      SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
      schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());
      schedulerFactoryBean.setAutoStartup(true);
      schedulerFactoryBean.afterPropertiesSet();
      
      schedulerFactoryBean.start();
      

      這一部分如名字一樣調(diào)度工廠,相當(dāng)于一個(gè)指揮官,可以從全局做調(diào)度,比如監(jiān)聽哪些trigger已經(jīng)ready、分配線程等等,同樣也需要設(shè)置必要的屬性信息;

      • triggers:按需可以設(shè)置多個(gè)觸發(fā)器,本文設(shè)置了一個(gè) cronTriggerFactoryBean.getObject() 也就是 CronTriggerImpl 對(duì)象
      • autoStartup:默認(rèn)是否自動(dòng)啟動(dòng)任務(wù),默認(rèn)值為true

      這個(gè)過程較長包括:調(diào)度工廠、線程池、注冊任務(wù)等等,整體核心加載流程如下;

      微信公眾號(hào):bugstack蟲洞棧 & 調(diào)度工程初始化流程

      • 整個(gè)加載過程較長,抽取部分核心代碼塊進(jìn)行分析,其中包括的類;
        • StdScheduler
        • StdSchedulerFactory
        • SimpleThreadPool
        • QuartzScheduler
        • QuartzSchedulerThread
        • RAMJobStore
        • CronTriggerImpl
        • CronExpression

      SchedulerFactoryBean.afterPropertiesSet()

      public void afterPropertiesSet() throws Exception {
      if (this.dataSource == null && this.nonTransactionalDataSource != null) {
      this.dataSource = this.nonTransactionalDataSource;
      }
      if (this.applicationContext != null && this.resourceLoader == null) {
      this.resourceLoader = this.applicationContext;
      }
      // Initialize the Scheduler instance...
      this.scheduler = prepareScheduler(prepareSchedulerFactory());
      try {
      registerListeners();
      registerJobsAndTriggers();
      }
      catch (Exception ex) {
      try {
      this.scheduler.shutdown(true);
      }
      catch (Exception ex2) {
      logger.debug("Scheduler shutdown exception after registration failure", ex2);
      }
      throw ex;
      }
      }
      
      • 源碼474行: 為調(diào)度器做準(zhǔn)備工作 prepareScheduler(prepareSchedulerFactory()) ,依次執(zhí)行如下;

        1. SchedulerFactoryBean.prepareScheduler(SchedulerFactory schedulerFactory)
        2. SchedulerFactoryBean.createScheduler(schedulerFactory, this.schedulerName);
        3. SchedulerFactoryBean.createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
        4. Scheduler newScheduler = schedulerFactory.getScheduler();
        5. StdSchedulerFactory.getScheduler();
        6. sched = instantiate(); 包括一系列核心操作;
        1)初始化threadPool(線程池):開發(fā)者可以通過org.quartz.threadPool.class配置指定使用哪個(gè)線程池類,比如SimpleThreadPool。
        2)初始化jobStore(任務(wù)存儲(chǔ)方式):開發(fā)者可以通過org.quartz.jobStore.class配置指定使用哪個(gè)任務(wù)存儲(chǔ)類,比如RAMJobStore。
        3)初始化dataSource(數(shù)據(jù)源):開發(fā)者可以通過org.quartz.dataSource配置指定數(shù)據(jù)源詳情,比如哪個(gè)數(shù)據(jù)庫、賬號(hào)、密碼等。
        4)初始化其他配置:包括SchedulerPlugins、JobListeners、TriggerListeners等;
        5)初始化threadExecutor(線程執(zhí)行器):默認(rèn)為DefaultThreadExecutor;
        6)創(chuàng)建工作線程:根據(jù)配置創(chuàng)建N個(gè)工作thread,執(zhí)行start()啟動(dòng)thread,并將N個(gè)thread順序add進(jìn)threadPool實(shí)例的空閑線程列表availWorkers中;
        7)創(chuàng)建調(diào)度器線程:創(chuàng)建QuartzSchedulerThread實(shí)例,并通過threadExecutor.execute(實(shí)例)啟動(dòng)調(diào)度器線程;
        8)創(chuàng)建調(diào)度器:創(chuàng)建StdScheduler實(shí)例,將上面所有配置和引用組合進(jìn)實(shí)例中,并將實(shí)例存入調(diào)度器池中
        
      • 源碼477行: 調(diào)用父類 SchedulerAccessor.registerJobsAndTriggers() 注冊任務(wù)和觸發(fā)器

        for (Trigger trigger : this.triggers) {
        addTriggerToScheduler(trigger);
        }
        

      SchedulerAccessor.addTriggerToScheduler() & SchedulerAccessor 是SchedulerFactoryBean的父類

      private boolean addTriggerToScheduler(Trigger trigger) throws SchedulerException {
      boolean triggerExists = (getScheduler().getTrigger(trigger.getKey()) != null);
      if (triggerExists && !this.overwriteExistingJobs) {
      return false;
      }
      // Check if the Trigger is aware of an associated JobDetail.
      JobDetail jobDetail = (JobDetail) trigger.getJobDataMap().remove("jobDetail");
      if (triggerExists) {
      if (jobDetail != null && !this.jobDetails.contains(jobDetail) && addJobToScheduler(jobDetail)) {
      this.jobDetails.add(jobDetail);
      }
      try {
      getScheduler().rescheduleJob(trigger.getKey(), trigger);
      }
      catch (ObjectAlreadyExistsException ex) {
      if (logger.isDebugEnabled()) {
      logger.debug("Unexpectedly encountered existing trigger on rescheduling, assumably due to " +
      "cluster race condition: " + ex.getMessage() + " - can safely be ignored");
      }
      }
      }
      else {
      try {
      if (jobDetail != null && !this.jobDetails.contains(jobDetail) &&
      (this.overwriteExistingJobs || getScheduler().getJobDetail(jobDetail.getKey()) == null)) {
      getScheduler().scheduleJob(jobDetail, trigger);
      this.jobDetails.add(jobDetail);
      }
      else {
      getScheduler().scheduleJob(trigger);
      }
      }
      catch (ObjectAlreadyExistsException ex) {
      if (logger.isDebugEnabled()) {
      logger.debug("Unexpectedly encountered existing trigger on job scheduling, assumably due to " +
      "cluster race condition: " + ex.getMessage() + " - can safely be ignored");
      }
      if (this.overwriteExistingJobs) {
      getScheduler().rescheduleJob(trigger.getKey(), trigger);
      }
      }
      }
      return true;
      }
      
      • 源碼299行: addJobToScheduler(jobDetail) 一直會(huì)調(diào)用到 RAMJobStore 進(jìn)行存放任務(wù)信息到 HashMap<JobKey, JobWrapper>(100)

        public void storeJob(JobDetail newJob,
            boolean replaceExisting) throws ObjectAlreadyExistsException {
        JobWrapper jw = new JobWrapper((JobDetail)newJob.clone());
        boolean repl = false;
        synchronized (lock) {
        if (jobsByKey.get(jw.key) != null) {
        if (!replaceExisting) {
        throw new ObjectAlreadyExistsException(newJob);
        }
        repl = true;
        }
        if (!repl) {
        // get job group
        HashMap<JobKey, JobWrapper> grpMap = jobsByGroup.get(newJob.getKey().getGroup());
        if (grpMap == null) {
        grpMap = new HashMap<JobKey, JobWrapper>(100);
        jobsByGroup.put(newJob.getKey().getGroup(), grpMap);
        }
        // add to jobs by group
        grpMap.put(newJob.getKey(), jw);
        // add to jobs by FQN map
        jobsByKey.put(jw.key, jw);
        } else {
        // update job detail
        JobWrapper orig = jobsByKey.get(jw.key);
        orig.jobDetail = jw.jobDetail; // already cloned
        }
        }
        }
        
      • 初始化線程組;

        • prepareScheduler
        • createScheduler
        • schedulerFactory
        • StdSchedulerFactory.getScheduler()
        • getScheduler()->instantiate()
        • 源碼1323行: tp.initialize();

        SimpleThreadPool.initialize() & 這里的count是默認(rèn)配置中的數(shù)量,可以更改

         // create the worker threads and start them
         Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
         while(workerThreads.hasNext()) {
         WorkerThread wt = workerThreads.next();
         wt.start();
         availWorkers.add(wt);
         }
        

      5. 啟動(dòng)定時(shí)任務(wù)

      案例中使用硬編碼方式調(diào)用 schedulerFactoryBean.start() 啟動(dòng)線程服務(wù)。線程的協(xié)作通過Object sigLock來實(shí)現(xiàn),關(guān)于sigLock.wait()方法都在QuartzSchedulerThread的run方法里面,所以sigLock喚醒的是只有線程QuartzSchedulerThread。核心流程如下;

      微信公眾號(hào):bugstack蟲洞棧 & 調(diào)度啟動(dòng)流程

      這個(gè)啟動(dòng)過程中,核心的代碼類,如下;

      • StdScheduler
      • QuartzScheduler
      • QuartzSchedulerThread
      • ThreadPool
      • RAMJobStore
      • CronTriggerImpl
      • JobRunShellFactory

      QuartzScheduler.start() & 啟動(dòng)

      public void start() throws SchedulerException {
      
          if (shuttingDown|| closed) {
              throw new SchedulerException(
                      "The Scheduler cannot be restarted after shutdown() has been called.");
          }
      
          // QTZ-212 : calling new schedulerStarting() method on the listeners
          // right after entering start()
          notifySchedulerListenersStarting();
          
      if (initialStart == null) {
              initialStart = new Date();
              this.resources.getJobStore().schedulerStarted();            
              startPlugins();
          } else {
              resources.getJobStore().schedulerResumed();
          }
      
          // 喚醒線程
      schedThread.togglePause(false);
      
          getLog().info(
                  "Scheduler " + resources.getUniqueIdentifier() + " started.");
          
          notifySchedulerListenersStarted();
      }
      

      QuartzSchedulerThread.run() & 執(zhí)行過程

      @Override
      public void run() {
          int acquiresFailed = 0;
      
      // 只有調(diào)用了halt()方法,才會(huì)退出這個(gè)死循環(huán)
          while (!halted.get()) {
              try {
      
      // 一、如果是暫停狀態(tài),則循環(huán)超時(shí)等待1000毫秒
      
                  // wait a bit, if reading from job store is consistently failing (e.g. DB is down or restarting)..
                 
          // 阻塞直到有空閑的線程可用并返回可用的數(shù)量
                  int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                  if(availThreadCount > 0) {
      
                      List<OperableTrigger> triggers;
                      long now = System.currentTimeMillis();
                      clearSignaledSchedulingChange();
                      
      try {
      // 二、獲取acquire狀態(tài)的Trigger列表,也就是即將執(zhí)行的任務(wù)
                          triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                  now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBat
                          acquiresFailed = 0;
                          if (log.isDebugEnabled())
                              log.debug("batch acquisition of " + (triggers == null ? 0 : triggers
                      } catch(){//...}
      
                      if (triggers != null && !triggers.isEmpty()) {
                          
      // 三:獲取List第一個(gè)Trigger的下次觸發(fā)時(shí)刻
      long triggerTime = triggers.get(0).getNextFireTime().getTime();
                          
      // 四:獲取任務(wù)觸發(fā)集合
      List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
      
      // 五:設(shè)置Triggers為'executing'狀態(tài)
      qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                          
      // 六:創(chuàng)建JobRunShell
      qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
      
      // 七:執(zhí)行Job
      qsRsrcs.getThreadPool().runInThread(shell)
      
                          continue; // while (!halted)
                      }
                  } else { // if(availThreadCount > 0)
                      // should never happen, if threadPool.blockForAvailableThreads() follows con
                      continue; // while (!halted)
                  }
      
                  
              } catch(RuntimeException re) {
                  getLog().error("Runtime error occurred in main trigger firing loop.", re);
              }
          }
          
          qs = null;
          qsRsrcs = null;
      }
      
      • 源碼391行: 創(chuàng)建JobRunShell,JobRunShell實(shí)例在initialize()方法就會(huì)把包含業(yè)務(wù)邏輯類的JobDetailImpl設(shè)置為它的成員屬性,為后面執(zhí)行業(yè)務(wù)邏輯代碼做準(zhǔn)備。執(zhí)行業(yè)務(wù)邏輯代碼在runInThread(shell)方法里面。

        QuartzSchedulerThread.run() & 部分代碼

        JobRunShell shell = null;
        try {
        shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
        shell.initialize(qs);
        } catch (SchedulerException se) {
        qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
        continue;
        }
        
      • 源碼398行: qsRsrcs.getThreadPool().runInThread(shell)

        SimpleThreadPool.runInThread

        // 保存所有WorkerThread的集合
        private List<WorkerThread> workers;
        // 空閑的WorkerThread集合
        private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
        // 任務(wù)的WorkerThread集合
        private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();
        
        /**
         * 維護(hù)workers、availWorkers和busyWorkers三個(gè)列表數(shù)據(jù)
         * 有任務(wù)需要一個(gè)線程出來執(zhí)行:availWorkers.removeFirst();busyWorkers.add()
         * 然后調(diào)用WorkThread.run(runnable)方法
         */
        public boolean runInThread(Runnable runnable) {
        if (runnable == null) {
        return false;
        }
        
        synchronized (nextRunnableLock) {
        
        handoffPending = true;
        
        // Wait until a worker thread is available
        while ((availWorkers.size() < 1) && !isShutdown) {
        try {
        nextRunnableLock.wait(500);
        } catch (InterruptedException ignore) {
        }
        }
        
        if (!isShutdown) {
        WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
        busyWorkers.add(wt);
        wt.run(runnable);
        } else {
        // If the thread pool is going down, execute the Runnable
        // within a new additional worker thread (no thread from the pool).
        
        WorkerThread wt = new WorkerThread(this, threadGroup,
        "WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
        busyWorkers.add(wt);
        workers.add(wt);
        wt.start();
        }
        nextRunnableLock.notifyAll();
        handoffPending = false;
        }
        
        return true;
        }
        
      • 源碼428行: WorkerThread ,是一個(gè)內(nèi)部類,主要是賦值并喚醒lock對(duì)象的等待線程隊(duì)列

        WorkerThread.run(Runnable newRunnable)

        public void run(Runnable newRunnable) {
        synchronized(lock) {
        if(runnable != null) {
        throw new IllegalStateException("Already running a Runnable!");
        }
        runnable = newRunnable;
        lock.notifyAll();
        }
        }
        
      • 源碼561行: WorkerThread 的run方法,方法執(zhí)行l(wèi)ock.notifyAll()后,對(duì)應(yīng)的WorkerThread就會(huì)來到run()方法。到這!接近曙光了!終于來到了執(zhí)行業(yè)務(wù)的execute()方法的倒數(shù)第二步,runnable對(duì)象是一個(gè)JobRunShell對(duì)象,下面在看JobRunShell.run()方法。

        WorkerThread.run()

        @Override
        public void run() {
        boolean ran = false;
        
        while (run.get()) {
        try {
        synchronized(lock) {
        while (runnable == null && run.get()) {
        lock.wait(500);
        }
        if (runnable != null) {
        ran = true;
        // 啟動(dòng)真正執(zhí)行的內(nèi)容,runnable就是JobRunShell
        runnable.run();
        }
        }
        } cache(){//...}
        }
        //if (log.isDebugEnabled())
        try {
        getLog().debug("WorkerThread is shut down.");
        } catch(Exception e) {
        // ignore to help with a tomcat glitch
        }
        }
        

      JobRunShell.run() & 從上面WorkerThread.run(),調(diào)用到這里執(zhí)行

      public void run() {
          qs.addInternalSchedulerListener(this);
      
          try {
              OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
              JobDetail jobDetail = jec.getJobDetail();
      
              do {
                  // ...
      
                  long startTime = System.currentTimeMillis();
                  long endTime = startTime;
      
                  // execute the job
                  try {
                      log.debug("Calling execute on job " + jobDetail.getKey());
                      
      // 執(zhí)行業(yè)務(wù)代碼,也就是我們的task
      job.execute(jec);
                      
      endTime = System.currentTimeMillis();
                  } catch (JobExecutionException jee) {
                      endTime = System.currentTimeMillis();
                      jobExEx = jee;
                      getLog().info("Job " + jobDetail.getKey() +
                              " threw a JobExecutionException: ", jobExEx);
                  } catch (Throwable e) {
                      endTime = System.currentTimeMillis();
                      getLog().error("Job " + jobDetail.getKey() +
                              " threw an unhandled Exception: ", e);
                      SchedulerException se = new SchedulerException(
                              "Job threw an unhandled exception.", e);
                      qs.notifySchedulerListenersError("Job ("
                              + jec.getJobDetail().getKey()
                              + " threw an exception.", se);
                      jobExEx = new JobExecutionException(se, false);
                  }
      
                  jec.setJobRunTime(endTime - startTime);
      
                  // 其他代碼
              } while (true);
      
          } finally {
              qs.removeInternalSchedulerListener(this);
          }
      }
      

      QuartzJobBean.execte() & 繼續(xù)往下走

      public final void execute(JobExecutionContext context) throws JobExecutionException {
      try {
      BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this);
      MutablePropertyValues pvs = new MutablePropertyValues();
      pvs.addPropertyValues(context.getScheduler().getContext());
      pvs.addPropertyValues(context.getMergedJobDataMap());
      bw.setPropertyValues(pvs, true);
      }
      catch (SchedulerException ex) {
      throw new JobExecutionException(ex);
      }
      executeInternal(context);
      }
      

      MethodInvokingJobDetailFactoryBean->MethodInvokingJob.executeInternal(JobExecutionContext context)

      protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
      try {
      // 反射執(zhí)行業(yè)務(wù)代碼
      context.setResult(this.methodInvoker.invoke());
      }
      catch (InvocationTargetException ex) {
      if (ex.getTargetException() instanceof JobExecutionException) {
      // -> JobExecutionException, to be logged at info level by Quartz
      throw (JobExecutionException) ex.getTargetException();
      }
      else {
      // -> "unhandled exception", to be logged at error level by Quartz
      throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException());
      }
      }
      catch (Exception ex) {
      // -> "unhandled exception", to be logged at error level by Quartz
      throw new JobMethodInvocationFailedException(this.methodInvoker, ex);
      }
      }
      

      五、綜上總結(jié)

      • quartz,即石英的意思,隱喻如石英鐘般對(duì)時(shí)間的準(zhǔn)確把握。
      • 源碼分析是一個(gè)很快樂的過程,這個(gè)快樂是分析完才能獲得的快樂。縱橫交互的背后是面向?qū)ο蟮母叨冉怦?#xff0c;對(duì)線程精彩的使用,將任務(wù)執(zhí)行做成計(jì)劃單,簡直是一個(gè)超級(jí)棒的作品。
      • 對(duì)于quartz.properties,簡單場景下,開發(fā)者不用自定義配置,使用quartz默認(rèn)配置即可,但在要求較高的使用場景中還是要自定義配置,比如通過org.quartz.threadPool.threadCount設(shè)置足夠的線程數(shù)可提高多job場景下的運(yùn)行性能。
      • quartz 對(duì)任務(wù)處理高度解耦,job與trigger解藕,將任務(wù)本身和任務(wù)執(zhí)行策略解藕,這樣可以方便實(shí)現(xiàn)N個(gè)任務(wù)和M個(gè)執(zhí)行策略自由組合。
      • scheduler單獨(dú)分離出來,相當(dāng)于一個(gè)指揮官,可以從全局做調(diào)度,比如監(jiān)聽哪些trigger已經(jīng)ready、分配線程等等。
      • 外部鏈接:
        • http://www.
        • quartz-2.1.x/configuration

        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評(píng)論

        發(fā)表

        請遵守用戶 評(píng)論公約

        類似文章 更多