乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      Python學習教程:定時庫APScheduler的原理及用法

       千鋒Python學堂 2019-12-12

      Python學習教程之定時庫APScheduler的原理及用法:

      APScheduler簡介

      APscheduler全稱Advanced Python Scheduler

      作用為在指定的時間規(guī)則執(zhí)行指定的作業(yè)。

      • 指定時間規(guī)則的方式可以是間隔多久執(zhí)行,可以是指定日期時間的執(zhí)行,也可以類似Linux系統(tǒng)中Crontab中的方式執(zhí)行任務。

      • 指定的任務就是一個Python函數(shù)。

      APScheduler組件

      APScheduler版本 3.6.3

      APScheduler中幾個重要的概念

      Job 作業(yè)

      作用

      Job作為APScheduler最小執(zhí)行單位。
      創(chuàng)建Job時指定執(zhí)行的函數(shù),函數(shù)中所需參數(shù),Job執(zhí)行時的一些設置信息。

      構建說明

      id:指定作業(yè)的唯一ID

      name:指定作業(yè)的名字

      trigger:apscheduler定義的觸發(fā)器,用于確定Job的執(zhí)行時間,根據(jù)設置的trigger規(guī)則,計算得到下次執(zhí)行此job的
      時間, 滿足時將會執(zhí)行

      executor:apscheduler定義的執(zhí)行器,job創(chuàng)建時設置執(zhí)行器的名字,根據(jù)字符串你名字到scheduler獲取到執(zhí)行此
      job的 執(zhí)行器,執(zhí)行job指定的函數(shù)

      max_instances:執(zhí)行此job的最大實例數(shù),executor執(zhí)行job時,根據(jù)job的id來計算執(zhí)行次數(shù),根據(jù)設置的最大實例數(shù)
      來確定是否可執(zhí)行

      next_run_time:Job下次的執(zhí)行時間,創(chuàng)建Job時可以指定一個時間[datetime],不指定的話則默認根據(jù)trigger獲取觸
      發(fā)時間

      misfire_grace_time:Job的延遲執(zhí)行時間,例如Job的計劃執(zhí)行時間是21:00:00,但因服務重啟或其他原因?qū)е?br>21:00:31才執(zhí)行,如果設置此key為40,則該job會繼續(xù)執(zhí)行,否則將會丟棄此job

      coalesce:Job是否合并執(zhí)行,是一個bool值。例如scheduler停止20s后重啟啟動,而job的觸發(fā)器設置為5s執(zhí)行
      一次,因此此job錯過了4個執(zhí)行時間,如果設置為是,則會合并到一次執(zhí)行,否則會逐個執(zhí)行

      func:Job執(zhí)行的函數(shù)

      args:Job執(zhí)行函數(shù)需要的位置參數(shù)

      kwargs:Job執(zhí)行函數(shù)需要的關鍵字參數(shù)

      Trigger 觸發(fā)器

      Trigger綁定到Job,在scheduler調(diào)度篩選Job時,根據(jù)觸發(fā)器的規(guī)則計算出Job的觸發(fā)時間,然后與當前時間比較
      確定此Job是否會被執(zhí)行,總之就是根據(jù)trigger規(guī)則計算出下一個執(zhí)行時間。

      Trigger有多種種類,指定時間的DateTrigger,指定間隔時間的IntervalTrigger,像Linux的crontab
      一樣的CronTrigger

      目前APScheduler支持觸發(fā)器:

      DateTrigger
      IntervalTrigger
      CronTrigger

      Executor 執(zhí)行器

      Executor在scheduler中初始化,另外也可通過scheduler的add_executor動態(tài)添加Executor。 
      每個executor都會綁定一個alias,這個作為唯一標識綁定到Job,在實際執(zhí)行時會根據(jù)Job綁定的executor
      找到實際的執(zhí)行器對象,然后根據(jù)執(zhí)行器對象執(zhí)行Job

      Executor的種類會根據(jù)不同的調(diào)度來選擇,如果選擇AsyncIO作為調(diào)度的庫,那么選擇AsyncIOExecutor,如果
      選擇tornado作為調(diào)度的庫,選擇TornadoExecutor,如果選擇啟動進程作為調(diào)度,
      選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以

      Executor的選擇需要根據(jù)實際的scheduler來選擇不同的執(zhí)行器

      目前APScheduler支持的Executor:

      AsyncIOExecutor
      GeventExecutor
      ThreadPoolExecutor
      ProcessPoolExecutor
      TornadoExecutor
      TwistedExecutor

      Jobstore 作業(yè)存儲

      Jobstore在scheduler中初始化,另外也可通過scheduler的add_jobstore動態(tài)添加Jobstore。每個jobstore都會
      綁定一個alias,scheduler在Add Job時,根據(jù)指定的jobstore在scheduler中找到相應的jobstore,
      并將job添加到jobstore中。

      Jobstore主要是通過pickle庫的loads和dumps【實現(xiàn)核心是通過python的__getstate__和__setstate__重寫實現(xiàn)】,
      每次變更時將Job動態(tài)保存到存儲中,使用時再動態(tài)的加載出來,作為存儲的可以是redis,也可以是數(shù)據(jù)庫【通過
      sqlarchemy這個庫集成多種數(shù)據(jù)庫】,也可以是mongodb等

      目前APScheduler支持的Jobstore:

      MemoryJobStore
      MongoDBJobStore
      RedisJobStore
      RethinkDBJobStore
      SQLAlchemyJobStore
      ZooKeeperJobStore

      Event 事件

      Event是APScheduler在進行某些操作時觸發(fā)相應的事件,用戶可以自定義一些函數(shù)來監(jiān)聽這些事件,
      當觸發(fā)某些Event時,做一些具體的操作

      常見的比如。Job執(zhí)行異常事件 EVENT_JOB_ERROR。Job執(zhí)行時間錯過事件 EVENT_JOB_MISSED。

      目前APScheduler定義的Event

      EVENT_SCHEDULER_STARTED
      EVENT_SCHEDULER_START
      EVENT_SCHEDULER_SHUTDOWN
      EVENT_SCHEDULER_PAUSED
      EVENT_SCHEDULER_RESUMED
      EVENT_EXECUTOR_ADDED
      EVENT_EXECUTOR_REMOVED
      EVENT_JOBSTORE_ADDED
      EVENT_JOBSTORE_REMOVED
      EVENT_ALL_JOBS_REMOVED
      EVENT_JOB_ADDED
      EVENT_JOB_REMOVED
      EVENT_JOB_MODIFIED
      EVENT_JOB_EXECUTED
      EVENT_JOB_ERROR
      EVENT_JOB_MISSED
      EVENT_JOB_SUBMITTED
      EVENT_JOB_MAX_INSTANCES

      Listener 監(jiān)聽事件

      Listener表示用戶自定義監(jiān)聽的一些Event,當Job觸發(fā)了EVENT_JOB_MISSED事件時可以根據(jù)需求做一些其他處理。

      Scheduler 調(diào)度器

      Scheduler是APScheduler的核心,所有相關組件通過其定義。scheduler啟動之后,將開始按照配置的任務進行調(diào)度。
      除了依據(jù)所有定義Job的trigger生成的將要調(diào)度時間喚醒調(diào)度之外。當發(fā)生Job信息變更時也會觸發(fā)調(diào)度。

      scheduler可根據(jù)自身的需求選擇不同的組件,如果是使用AsyncIO則選擇AsyncIOScheduler,使用tornado則選擇
      TornadoScheduler。

      目前APScheduler支持的Scheduler:

      AsyncIOScheduler
      BackgroundScheduler
      BlockingScheduler
      GeventScheduler
      QtScheduler
      TornadoScheduler
      TwistedScheduler

      這里前面一期的Python學習教程里有提到過!

      Scheduler工作流程圖

      這里重點挑選兩個重要的流程畫一個簡陋的流程圖,來看一下scheduler的工作原理。其一個是添加add job,另一是scheduler每次喚醒調(diào)度時的執(zhí)行過程

      Scheduler添加job流程

      Python學習教程:定時庫APScheduler的原理及用法

      Scheduler調(diào)度流程

      Python學習教程:定時庫APScheduler的原理及用法

      APScheduler使用示例

      AsyncIO調(diào)度示例

      import asyncio
      import datetime

      from apscheduler.events import EVENT_JOB_EXECUTED
      from apscheduler.executors.asyncio import AsyncIOExecutor
      from apscheduler.jobstores.redis import RedisJobStore # 需要安裝redis
      from apscheduler.schedulers.asyncio import AsyncIOScheduler
      from apscheduler.triggers.interval import IntervalTrigger
      from apscheduler.triggers.cron import CronTrigger

      # 定義jobstore 使用redis 存儲job信息
      default_redis_jobstore = RedisJobStore(
      db=2,
      jobs_key="apschedulers.default_jobs",
      run_times_key="apschedulers.default_run_times",
      host="127.0.0.1",
      port=6379,
      password="test"
      )

      # 定義executor 使用asyncio是的調(diào)度執(zhí)行規(guī)則
      first_executor = AsyncIOExecutor()

      # 初始化scheduler時,可以直接指定jobstore和executor
      init_scheduler_options = {
      "jobstores": {
      # first 為 jobstore的名字,在創(chuàng)建Job時直接直接此名字即可
      "default": default_redis_jobstore
      },
      "executors": {
      # first 為 executor 的名字,在創(chuàng)建Job時直接直接此名字,執(zhí)行時則會使用此executor執(zhí)行
      "first": first_executor
      },
      # 創(chuàng)建job時的默認參數(shù)
      "job_defaults": {
      'coalesce': False, # 是否合并執(zhí)行
      'max_instances': 1 # 最大實例數(shù)
      }
      }
      # 創(chuàng)建scheduler
      scheduler = AsyncIOScheduler(**init_scheduler_options)

      # 啟動調(diào)度
      scheduler.start()

      second_redis_jobstore = RedisJobStore(
      db=2,
      jobs_key="apschedulers.second_jobs",
      run_times_key="apschedulers.second_run_times",
      host="127.0.0.1",
      port=6379,
      password="test"
      )

      scheduler.add_jobstore(second_redis_jobstore, 'second')
      # 定義executor 使用asyncio是的調(diào)度執(zhí)行規(guī)則
      second_executor = AsyncIOExecutor()
      scheduler.add_executor(second_executor, "second")


      # *********** 關于 APScheduler中有關Event相關使用示例 *************
      # 定義函數(shù)監(jiān)聽事件
      def job_execute(event):
      """
      監(jiān)聽事件處理
      :param event:
      :return:
      """
      print(
      "job執(zhí)行job:\ncode => {}\njob.id => {}\njobstore=>{}".format(
      event.code,
      event.job_id,
      event.jobstore
      ))


      # 給EVENT_JOB_EXECUTED[執(zhí)行完成job事件]添加回調(diào),這里就是每次Job執(zhí)行完成了我們就輸出一些信息
      scheduler.add_listener(job_execute, EVENT_JOB_EXECUTED)


      # *********** 關于 APScheduler中有關Job使用示例 *************
      # 使用的是asyncio,所以job執(zhí)行的函數(shù)可以是一個協(xié)程,也可以是一個普通函數(shù),AsyncIOExecutor會根據(jù)配置的函數(shù)來進行調(diào)度,
      # 如果是協(xié)程則會直接丟入到loop中,如果是普通函數(shù)則會啟用線程處理
      # 我們定義兩個函數(shù)來看看執(zhí)行的結果

      def interval_func(message):
      print("現(xiàn)在時間: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
      print("我是普通函數(shù)")
      print(message)


      async def async_func(message):
      print("現(xiàn)在時間: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
      print("我是協(xié)程")
      print(message)


      # 將上述的兩個函數(shù)按照不同的方式創(chuàng)造觸發(fā)器來執(zhí)行
      # *********** 關于 APScheduler中有關Trigger使用示例 *************
      # 使用Trigger有兩種方式,一種是用類創(chuàng)建使用,另一個是使用字符串的方式
      # 使用字符串指定別名, scheduler初始化時已將其定義的trigger加載,所以指定字符串可以直接使用


      if scheduler.get_job("interval_func_test", "default"):
      # 存在的話,先刪除
      scheduler.remove_job("interval_func_test", "default")

      # 立馬開始 2分鐘后結束, 每10s執(zhí)行一次 存儲到first jobstore second執(zhí)行
      scheduler.add_job(interval_func, "interval",
      args=["我是10s執(zhí)行一次,存放在jobstore default, executor default"],
      seconds=10,
      id="interval_func_test",
      jobstore="default",
      executor="default",
      start_date=datetime.datetime.now(),
      end_date=datetime.datetime.now() + datetime.timedelta(seconds=240))

      # 先創(chuàng)建tigger
      trigger = IntervalTrigger(seconds=5)

      if scheduler.get_job("interval_func_test_2", "second"):
      # 存在的話,先刪除
      scheduler.remove_job("interval_func_test_2", "second")
      # 每隔5s執(zhí)行一次
      scheduler.add_job(async_func, trigger, args=["我是每隔5s執(zhí)行一次,存放在jobstore second, executor = second"],
      id="interval_func_test_2",
      jobstore="second",
      executor="second")

      # 使用協(xié)程的函數(shù)執(zhí)行,且使用cron的方式配置觸發(fā)器

      if scheduler.get_job("cron_func_test", "default"):
      # 存在的話,先刪除
      scheduler.remove_job("cron_func_test", "default")

      # 立馬開始 每10s執(zhí)行一次
      scheduler.add_job(async_func, "cron",
      args=["我是 每分鐘 30s 時執(zhí)行一次,存放在jobstore default, executor default"],
      second='30',
      id="cron_func_test",
      jobstore="default",
      executor="default")

      # 先創(chuàng)建tigger
      trigger = CronTrigger(second='20,40')

      if scheduler.get_job("cron_func_test_2", "second"):
      # 存在的話,先刪除
      scheduler.remove_job("cron_func_test_2", "second")
      # 每隔5s執(zhí)行一次
      scheduler.add_job(async_func, trigger, args=["我是每分鐘 20s 40s時各執(zhí)行一次,存放在jobstore second, executor = second"],
      id="cron_func_test_2",
      jobstore="second",
      executor="second")

      # 使用創(chuàng)建trigger對象直接創(chuàng)建
      print("啟動: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
      asyncio.get_event_loop().run_forever()

      輸出結果部分截取

      啟動之后,每隔5s運行一次的JOB

      啟動: 2019-12-05 14:13:11
      【這部分是定義的協(xié)程函數(shù)輸出的內(nèi)容】
      現(xiàn)在時間: 2019-12-05 14:13:16
      我是協(xié)程
      我是每隔5s執(zhí)行一次,存放在jobstore second, executor = second
      【這部分是監(jiān)聽job執(zhí)行完成之后的回調(diào)輸出】
      job執(zhí)行job:
      code => 4096
      job.id => interval_func_test_2
      jobstore=>second

      在20s和40s時各執(zhí)行一次的Job

      現(xiàn)在時間: 2019-12-05 14:13:20
      我是協(xié)程
      我是每分鐘 20s 40s時各執(zhí)行一次,存放在jobstore second, executor = second
      job執(zhí)行job:
      code => 4096
      job.id => cron_func_test_2
      jobstore=>second

      每隔10s執(zhí)行一次的job

      現(xiàn)在時間: 2019-12-05 14:13:21
      我是普通函數(shù)
      我是10s執(zhí)行一次,存放在jobstore default, executor default
      現(xiàn)在時間: 2019-12-05 14:13:21
      我是協(xié)程
      我是每隔5s執(zhí)行一次,存放在jobstore second, executor = second
      job執(zhí)行job:
      code => 4096
      job.id => interval_func_test
      jobstore=>default
      job執(zhí)行job:
      code => 4096
      job.id => interval_func_test_2
      jobstore=>second

      每隔5s執(zhí)行一次的Job

      現(xiàn)在時間: 2019-12-05 14:13:26
      我是協(xié)程
      我是每隔5s執(zhí)行一次,存放在jobstore second, executor = second
      job執(zhí)行job:
      code => 4096
      job.id => interval_func_test_2
      jobstore=>second

      每分鐘30s時執(zhí)行一次

      現(xiàn)在時間: 2019-12-05 14:13:30
      我是協(xié)程
      我是 每分鐘 30s 時執(zhí)行一次,存放在jobstore default, executor default
      job執(zhí)行job:
      code => 4096
      job.id => cron_func_test
      jobstore=>default

      總結

      apscheduler的工作原理及用法基本這樣。

      apscheduler強大的地方是可以集成到tornado,django,flask等框架,也可以單獨運行。比如CronTrigger還有更強大的用法,可以參照官網(wǎng)的cron用法

      上面例子只列舉了一些常規(guī)用法,其實還有一些更切合實際的用法,利用APSchedulder的特性,動態(tài)的添加Job,暫停Job,刪除Job,重啟Job等。先按照功能性質(zhì)定義好不同的函數(shù),然后開發(fā)一個web服務。在web服務中動態(tài)操作各種Job,可以想象在監(jiān)控系統(tǒng)中根據(jù)需求添加一些任務,豈不美哉。

      有時間將這部分做一個例子再來分享,大家可以期待一下下次的Python學習教程,本期Python教程有不清楚的地方可以留言哈!

        本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權內(nèi)容,請點擊一鍵舉報。
        轉(zhuǎn)藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多