乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      python協(xié)程系列(五)——asyncio的核心概念與基本架構(gòu)

       鷹兔牛熊眼 2019-01-23

      聲明:本文針對(duì)的是python3.4以后的版本的,因?yàn)閺?.4開始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不過語法上面有稍微的改變。比如在3.4版本中使用@asyncio.coroutine裝飾器和yield from語句,但是在3.5以后的版本中使用async、await兩個(gè)關(guān)鍵字代替,雖然語法上稍微有所差異,但是原理是一樣的。本文用最通俗的語言解釋了pythonasyncio背后的一些核心概念,簡(jiǎn)要解析了asyncio的設(shè)計(jì)架構(gòu),并給出了使用python進(jìn)行asyncio異步編程的一般模板。本文較長(zhǎng):閱讀全文約30min。


      目錄

      一 一些最重要的概念

           1.1 協(xié)程(coroutine)——本質(zhì)就是一個(gè)

                函數(shù)

           1.2 事件循環(huán)——event_loop

           1.3 什么是awaitable對(duì)象——即可暫停

                等待的對(duì)象

           1.4 什么是task任務(wù)

           1.5 什么是future?

      二 asyncio的基本架構(gòu)

           2.1 常見的一些高層API方法

           2.2 Task 類詳解

           2.3 異步函數(shù)的結(jié)果獲取

      asyncio異步編程的基本模板

          3.1 python3.7之前的版本

            3.1.1 例子一:無參數(shù)、無返回值

            3.1.2 例子二:有參數(shù)、有返回值

            3.1.3 總結(jié):四步走(針對(duì)python3.7

                    之前的版本)

          3.2 python3.7版本

            3.2.1 例子一:無參數(shù)、無返回值

            3.2.2 例子二:有參數(shù)、有返回值

            3.2.3 總結(jié):兩步走(針對(duì)python3.7)

      協(xié)程編程的優(yōu)點(diǎn)


      01

      一些最重要的概念

      協(xié)程(coroutine)——本質(zhì)就是一個(gè)函數(shù)


      所謂的“協(xié)程”就是一個(gè)函數(shù),這個(gè)函數(shù)需要有兩個(gè)基本的組成要素,第一,需要使用@asyncio.coroutine進(jìn)行裝飾;第二,函數(shù)體內(nèi)一定要有yield from 返回的的generator,或者是說使用yield from 返回另一個(gè)協(xié)程對(duì)象。
      當(dāng)然,這兩個(gè)條件并不是硬性規(guī)定的,如果沒有這兩個(gè)條件,依然是函數(shù),只不過是普通函數(shù)而已。
      怎么判斷一個(gè)函數(shù)是不是協(xié)程?通過asyncio.iscoroutine(obj)和asyncio.iscoroutinefunction(func)加以判斷,返回true,則是。

      那么協(xié)程函數(shù)有什么作用呢?


      (1)result = yield from future

      作用一:返回future的結(jié)果。什么是future?后面會(huì)講到。當(dāng)協(xié)程函數(shù)執(zhí)行到這一句,協(xié)程會(huì)被懸掛起來,知道future的結(jié)果被返回。如果是future被中途取消,則會(huì)觸發(fā)CancelledError異常。由于task是future的子類,后面也會(huì)介紹,關(guān)于future的所有應(yīng)用,都同樣適用于task


      (2)result = yield from coroutine
      等候另一個(gè)協(xié)程函數(shù)返回結(jié)果或者是觸發(fā)異常

      (3)result= yield from task
      返回一個(gè)task的結(jié)果

      (4)return expression
      作為一個(gè)函數(shù),他本身也是可以返回某一個(gè)結(jié)果的

      (5)raise exception

      事件循環(huán)——event_loop


      協(xié)程函數(shù),不是像普通函數(shù)那樣直接調(diào)用運(yùn)行的,必須添加到事件循環(huán)中,然后由事件循環(huán)去運(yùn)行,單獨(dú)運(yùn)行協(xié)程函數(shù)是不會(huì)有結(jié)果的,看一個(gè)簡(jiǎn)單的例子:
      import time
      import asyncio
      async def say_after_time(delay,what):
              await asyncio.sleep(delay)
              print(what)

      async def main():
              print(f'開始時(shí)間為: {time.time()}')
              await say_after_time(1,'hello')
              await say_after_time(2,'world')
              print(f'結(jié)束時(shí)間為: {time.time()}')

      loop=asyncio.get_event_loop()    #創(chuàng)建事件循環(huán)對(duì)象
      #loop=asyncio.new_event_loop()   #與上面等價(jià),創(chuàng)建新的事件循環(huán)
      loop.run_until_complete(main())  #通過事件循環(huán)對(duì)象運(yùn)行協(xié)程函數(shù)
      loop.close()

      在python3.6版本中,如果我們單獨(dú)像執(zhí)行普通函數(shù)那樣執(zhí)行一個(gè)協(xié)程函數(shù),只會(huì)返回一個(gè)coroutine對(duì)象(python3.7)如下所示:

      >>> main()
      <coroutine object main at 0x1053bb7c8>


      (1)獲取事件循環(huán)對(duì)象的幾種方式:
      下面幾種方式可以用來獲取、設(shè)置、創(chuàng)建事件循環(huán)對(duì)象loop
      loop=asyncio.get_running_loop() 返回(獲?。┰诋?dāng)前線程中正在運(yùn)行的事件循環(huán),如果沒有正在運(yùn)行的事件循環(huán),則會(huì)顯示錯(cuò)誤;它是python3.7中新添加的loop=asyncio.get_event_loop() 獲得一個(gè)事件循環(huán),如果當(dāng)前線程還沒有事件循環(huán),則創(chuàng)建一個(gè)新的事件循環(huán)loop;
      loop=asyncio.set_event_loop(loop) 設(shè)置一個(gè)事件循環(huán)為當(dāng)前線程的事件循環(huán);
      loop=asyncio.new_event_loop() 創(chuàng)建一個(gè)新的事件循環(huán)

      (2)通過事件循環(huán)運(yùn)行協(xié)程函數(shù)的兩種方式:

      ①方式一:創(chuàng)建事件循環(huán)對(duì)象loop,即asyncio.get_event_loop(),通過事件循環(huán)運(yùn)行協(xié)程函數(shù)
      ②方式二:直接通過asyncio.run(function_name)運(yùn)行協(xié)程函數(shù)。但是需要注意的是,首先run函數(shù)是python3.7版本新添加的,前面的版本是沒有的;其次,這個(gè)run函數(shù)總是會(huì)創(chuàng)建一個(gè)新的事件循環(huán)并在run結(jié)束之后關(guān)閉事件循環(huán),所以,如果在同一個(gè)線程中已經(jīng)有了一個(gè)事件循環(huán),則不能再使用這個(gè)函數(shù)了,因?yàn)橥粋€(gè)線程不能有兩個(gè)事件循環(huán),而且這個(gè)run函數(shù)不能同時(shí)運(yùn)行兩次,因?yàn)樗呀?jīng)創(chuàng)建一個(gè)了。即同一個(gè)線程中是不允許有多個(gè)事件循環(huán)loop的
      asyncio.run()是python3.7 新添加的內(nèi)容,也是后面推薦的運(yùn)行任務(wù)的方式,因?yàn)樗歉邔覣PI,后面會(huì)講到它與asyncio.run_until_complete()的差異性,run_until_complete()是相對(duì)較低層的API。

      注意:到底什么是事件循環(huán)?如何理解?

      可以這樣理解:線程一直在各個(gè)協(xié)程方法之間永不停歇的游走,遇到一個(gè)yield from 或者await就懸掛起來,然后又走到另外一個(gè)方法,依次進(jìn)行下去,知道事件循環(huán)所有的方法執(zhí)行完畢。實(shí)際上loop是BaseEventLoop的一個(gè)實(shí)例,我們可以查看定義,它到底有哪些方法可調(diào)用。


      什么是awaitable對(duì)象——即可暫停等待的對(duì)象


      有三類對(duì)象是可等待的,即 coroutines, Tasks, and Futures.
      coroutine:本質(zhì)上就是一個(gè)函數(shù),一前面的生成器yield和yield from為基礎(chǔ),不再贅述;
      Tasks: 任務(wù),顧名思義,就是要完成某件事情,其實(shí)就是對(duì)協(xié)程函數(shù)進(jìn)一步的封裝;
      Future:它是一個(gè)“更底層”的概念,他代表一個(gè)一步操作的最終結(jié)果,因?yàn)橐徊讲僮饕话阌糜诤臅r(shí)操作,結(jié)果不會(huì)立即得到,會(huì)在“將來”得到異步運(yùn)行的結(jié)果,故而命名為Future。
      三者的關(guān)系,coroutine可以自動(dòng)封裝成task,而Task是Future的子類。

      什么是task任務(wù)


      如前所述,Task用來 并發(fā)調(diào)度的協(xié)程,即對(duì)協(xié)程函數(shù)的進(jìn)一步包裝?那為什么還需要包裝呢?因?yàn)閱渭兊膮f(xié)程函數(shù)僅僅是一個(gè)函數(shù)而已,將其包裝成任務(wù),任務(wù)是可以包含各種狀態(tài)的,異步編程最重要的就是對(duì)異步操作狀態(tài)的把控了。

      (1)創(chuàng)建任務(wù)(兩種方法):

      方法一:task = asyncio.create_task(coro())   # 這是3.7版本新添加的


      方法二:task = asyncio.ensure_future(coro())

      也可以使用
      loop.create_future()
      loop.create_task(coro)
      也是可以的。

      備注:關(guān)于任務(wù)的詳解,會(huì)在后面的系列文章繼續(xù)講解,本文只是概括性的說明。

      (2)獲取某一個(gè)任務(wù)的方法:

      方法一:task=asyncio.current_task(loop=None)
      返回在某一個(gè)指定的loop中,當(dāng)前正在運(yùn)行的任務(wù),如果沒有任務(wù)正在運(yùn)行,則返回None;
      如果loop為None,則默認(rèn)為在當(dāng)前的事件循環(huán)中獲取,

      方法二:asyncio.all_tasks(loop=None)
      返回某一個(gè)loop中還沒有結(jié)束的任務(wù)

      什么是future?


      Future是一個(gè)較低層的可等待(awaitable)對(duì)象,他表示的是異步操作的最終結(jié)果,當(dāng)一個(gè)Future對(duì)象被等待的時(shí)候,協(xié)程會(huì)一直等待,直到Future已經(jīng)運(yùn)算完畢。
      Future是Task的父類,一般情況下,已不用去管它們兩者的詳細(xì)區(qū)別,也沒有必要去用Future,用Task就可以了,返回 future 對(duì)象的低級(jí)函數(shù)的一個(gè)很好的例子是 loop.run_in_executor().




      02

      asyncio的基本架構(gòu)

      前面介紹了asyncio里面最為核心的幾個(gè)概念,如果能夠很好地理解這些概念,對(duì)于學(xué)習(xí)協(xié)程是非常有幫助的,但是按照我個(gè)人的風(fēng)格,我會(huì)先說asyncio的架構(gòu),理解asyncio的設(shè)計(jì)架構(gòu)有助于更好地應(yīng)用和理解。

      asyncio分為高層API和低層API,我們都可以使用,就像我前面在講matplotlib的架構(gòu)的時(shí)候所講的一樣,我們前面所講的Coroutine和Tasks屬于高層API,而Event Loop 和Future屬于低層API。當(dāng)然asyncio所涉及到的功能遠(yuǎn)不止于此,我們只看這么多。下面是是高層API和低層API的概覽:

      High-level APIs

          ●Coroutines and Tasks(本文要寫的)
          ●Streams
          ●Synchronization Primitives
          ●Subprocesses
          ●Queues
          ●Exceptions

      Low-level APIs

          ●Event Loop(下一篇要寫的)
          ●Futures
          ●Transports and Protocols
          ●Policies
          ●Platform Support

      所謂的高層API主要是指那些asyncio.xxx()的方法


      常見的一些高層API方法


      (1)運(yùn)行異步協(xié)程
      asyncio.run(coro, *, debug=False)  #運(yùn)行一個(gè)一步程序,參見上面

      (2)創(chuàng)建任務(wù)
      task=asyncio.create_task(coro)  #python3.7  ,參見上面
      task = asyncio.ensure_future(coro())

      (3)睡眠
      await asyncio.sleep(delay, result=None, *, loop=None)
      這個(gè)函數(shù)表示的是:當(dāng)前的那個(gè)任務(wù)(協(xié)程函數(shù))睡眠多長(zhǎng)時(shí)間,而允許其他任務(wù)執(zhí)行。這是它與time.sleep()的區(qū)別,time.sleep()是當(dāng)前線程休息,注意他們的區(qū)別哦。
      另外如果提供了參數(shù)result,當(dāng)當(dāng)前任務(wù)(協(xié)程)結(jié)束的時(shí)候,它會(huì)返回;
      loop參數(shù)將會(huì)在3.10中移除,這里就不再說了。

      (4)并發(fā)運(yùn)行多個(gè)任務(wù)
      await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
      它本身也是awaitable的。
      *coros_or_futures是一個(gè)序列拆分操作,如果是以個(gè)協(xié)程函數(shù),則會(huì)自動(dòng)轉(zhuǎn)換成Task。
      當(dāng)所有的任務(wù)都完成之后,返回的結(jié)果是一個(gè)列表的形式,列表中值的順序和*coros_or_futures完成的順序是一樣的。
      return_exceptions:

      False,這是他的默認(rèn)值,第一個(gè)出發(fā)異常的任務(wù)會(huì)立即返回,然后其他的任務(wù)繼續(xù)執(zhí)行;
      True,對(duì)于已經(jīng)發(fā)生了異常的任務(wù),也會(huì)像成功執(zhí)行了任務(wù)那樣,等到所有的任務(wù)執(zhí)行結(jié)束一起將錯(cuò)誤的結(jié)果返回到最終的結(jié)果列表里面。

      如果gather()本身被取消了,那么綁定在它里面的任務(wù)也就取消了。

      (5)防止任務(wù)取消
      await asyncio.shield(*arg, *, loop=None)
      它本身也是awaitable的。顧名思義,shield為屏蔽、保護(hù)的意思,即保護(hù)一個(gè)awaitable 對(duì)象防止取消,一般情況下不推薦使用,而且在使用的過程中,最好使用try語句塊更好。

      try:
          res = await shield(something())
      except CancelledError:
          res = None

      (6)設(shè)置timeout——一定要好好理解

      await asyncio.wait_for(aw, timeout, *, loop=None)

      如果aw是一個(gè)協(xié)程函數(shù),會(huì)自動(dòng)包裝成一個(gè)任務(wù)task。參見下面的例子:

      import asyncio

      async def eternity():
          print('我馬上開始執(zhí)行')
          await asyncio.sleep(3600)  #當(dāng)前任務(wù)休眠1小時(shí),即3600秒
          print('終于輪到我了')

      async def main():
          # Wait for at most 1 second
          try:
              print('等你3秒鐘哦')
              await asyncio.wait_for(eternity(), timeout=3)  #休息3秒鐘了執(zhí)行任務(wù)
          except asyncio.TimeoutError:
              print('超時(shí)了!')

      asyncio.run(main())

      '''運(yùn)行結(jié)果為:
      等你3秒鐘哦
      我馬上開始執(zhí)行
      超時(shí)了!
      '''

      為什么?首先調(diào)用main()函數(shù),作為入口函數(shù),當(dāng)輸出‘等你3秒鐘哦’之后,main掛起,執(zhí)行eternity,然后打印‘我馬上開始執(zhí)行’,然后eternity掛起,而且要掛起3600秒,大于3,這時(shí)候出發(fā)TimeoutError。修改一下:‘’

      import asyncio

      async def eternity():
          print('我馬上開始執(zhí)行')
          await asyncio.sleep(2)  #當(dāng)前任務(wù)休眠2秒鐘,2<3
          print('終于輪到我了')

      async def main():
          # Wait for at most 1 second
          try:
              print('等你3秒鐘哦')
              await asyncio.wait_for(eternity(), timeout=3)  #給你3秒鐘執(zhí)行你的任務(wù)
          except asyncio.TimeoutError:
              print('超時(shí)了!')

      asyncio.run(main())

      '''運(yùn)行結(jié)果為:
      等你3秒鐘哦
      我馬上開始執(zhí)行
      終于輪到我了
      '''


      總結(jié):當(dāng)異步操作需要執(zhí)行的時(shí)間超過waitfor設(shè)置的timeout,就會(huì)觸發(fā)異常,所以在編寫程序的時(shí)候,如果要給異步操作設(shè)置timeout,一定要選擇合適,如果異步操作本身的耗時(shí)較長(zhǎng),而你設(shè)置的timeout太短,會(huì)涉及到她還沒做完,就拋出異常了。

      (7)多個(gè)協(xié)程函數(shù)時(shí)候的等候

      await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
      與上面的區(qū)別是,第一個(gè)參數(shù)aws是一個(gè)集合,要寫成集合set的形式,比如:
      {func(),func(),func3()}
      表示的是一系列的協(xié)程函數(shù)或者是任務(wù),其中協(xié)程會(huì)自動(dòng)包裝成任務(wù)。事實(shí)上,寫成列表的形式也是可以的。

      注意:該函數(shù)的返回值是兩個(gè)Tasks/Futures的集合:
      (done, pending)
      其中done是一個(gè)集合,表示已經(jīng)完成的任務(wù)tasks;pending也是一個(gè)集合,表示還沒有完成的任務(wù)。
      常見的使用方法為:done, pending = await asyncio.wait(aws)

      參數(shù)解釋:
      timeout (a float or int), 同上面的含義一樣,需要注意的是,這個(gè)不會(huì)觸發(fā)asyncio.TimeoutError異常,如果到了timeout還有任務(wù)沒有執(zhí)行完,那些沒有執(zhí)行完的tasks和futures會(huì)被返回到第二個(gè)集合pending里面。
      return_when參數(shù),顧名思義,他表示的是,什么時(shí)候wait函數(shù)該返回值。只能夠去下面的幾個(gè)值。

      ConstantDescription
      FIRST_COMPLETEDfirst_completes.當(dāng)任何一個(gè)task或者是future完成或者是取消,wait函數(shù)就返回
      FIRST_EXCEPTION當(dāng)任何一個(gè)task或者是future觸發(fā)了某一個(gè)異常,就返回,.如果是所有的task和future都沒有觸發(fā)異常,則等價(jià)與下面的 ALL_COMPLETED.
      ALL_COMPLETED當(dāng)所有的task或者是future都完成或者是都取消的時(shí)候,再返回。

      如下面例子所示:

      import asyncio
      import time

      a=time.time()

      async def hello1():  #大約2秒
          print('Hello world 01 begin')
          yield from asyncio.sleep(2)
          print('Hello again 01 end')

      async def hello2():  #大約3秒
          print('Hello world 02 begin')
          yield from asyncio.sleep(3)
          print('Hello again 02 end')

      async def hello3():  #大約4秒
          print('Hello world 03 begin')
          yield from asyncio.sleep(4)
          print('Hello again 03 end')

      async def main():   #入口函數(shù)
          done,pending=await asyncio.wait({hello1(),hello2(),hello3()},return_when=asyncio.FIRST_COMPLETED)
          for i in done:
              print(i)
          for j in pending:
              print(j)

      asyncio.run(main()) #運(yùn)行入口函數(shù)

      b=time.time()
      print('---------------------------------------')
      print(b-a)

      '''運(yùn)行結(jié)果為:
      Hello world 02 begin
      Hello world 01 begin
      Hello world 03 begin
      Hello again 01 end
      <Task finished coro=<hello1() done, defined at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:46> result=None>
      <Task pending coro=<hello3() running at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:61> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object
      at 0x000001FA8D394438>()]>>
      <Task pending coro=<hello2() running at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:55> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object
      at 0x000001FA8D394378>()]>>
      ---------------------------------------
      2.033155679702759
      '''

      從上面可以看出,hello1()試運(yùn)行結(jié)束了的,hello2()和hello3()還沒結(jié)束。
      (8)asyncio.as_completed()函數(shù)

      這個(gè)函數(shù)我沒有找到合適的中文名稱去描述,所以哪位大神如果知道,望告知,不勝感激!它的函數(shù)原型如下:
      asyncio.as_completed(aws, *, loop=None, timeout=None)
      第一個(gè)參數(shù)aws:同上面一樣,是一個(gè)集合{}集合里面的元素是coroutine、task或者future
      第三個(gè)參數(shù)timeout:意義和上面講的的一樣
      那到底什么作用呢?其實(shí)很簡(jiǎn)單,個(gè)人感覺有點(diǎn)雞肋,從一個(gè)例子看起:

      import asyncio
      import time
      import threading

      a=time.time()

      @asyncio.coroutine
      def hello1():
          print('Hello world 01 begin')
          yield from asyncio.sleep(5)  #大約5秒
          print('Hello again 01 end')
          return '哈哈1'

      @asyncio.coroutine
      def hello2():
          print('Hello world 02 begin')
          yield from asyncio.sleep(3#大約3秒
          print('Hello again 02 end')
          return '哈哈2'

      @asyncio.coroutine
      def hello3():
          print('Hello world 03 begin')
          yield from asyncio.sleep(4#大約4秒
          print('Hello again 03 end')
          return '哈哈3'

      async def main():
          s=asyncio.as_completed({hello1(),hello2(),hello3()})
          for f in s:
              result=await f
              print(result)

      asyncio.run(main())

      b=time.time()
      print('---------------------------------------')
      print(b-a)

      '''運(yùn)行結(jié)果為:
      Hello world 03 begin
      Hello world 01 begin
      Hello world 02 begin
      Hello again 01 end
      哈哈1
      Hello again 02 end
      哈哈2
      Hello again 03 end
      哈哈3
      ---------------------------------------
      4.0225794315338135
      '''

      結(jié)論:asyncio.as_completed()函數(shù)返回的是一個(gè)可迭代(iterator)的對(duì)象,對(duì)象的每個(gè)元素就是一個(gè)future對(duì)象,很多小伙伴說,這不是相當(dāng)于沒變嗎?其實(shí)返回的future集合是對(duì)參數(shù)的future集合重新組合,組合的順序就是,最先執(zhí)行完的協(xié)程函數(shù)(coroutine、task、future)最先返回,從上面的代碼可知,參數(shù)為
      aws={hello1(),hello2(),hello3()},因?yàn)閔ello1大約花費(fèi)5秒、hello2大約花費(fèi)3秒、hello3大約花費(fèi)4秒。返回的結(jié)果為
      s={hello2()、hello3()、hello(1)},因?yàn)閔ello2時(shí)間最短,故而放在前面,hello1時(shí)間最長(zhǎng),故而放在最后面。然后對(duì)返回的集合s開始迭代。


      Task 類詳解


      先來看一下Task類的簡(jiǎn)單介紹(英文原文文檔)。

      上面的文字描述中推出了幾個(gè)非常重要的信息,特在此總結(jié)如下

      (1)他是作為一個(gè)python協(xié)程對(duì)象,和Future對(duì)象很像的這么一個(gè)對(duì)象,但不是線程安全的;他繼承了Future所有的API,,除了Future.set_result()和Future.set_Exception();

      (2)使用高層API  asyncio.ccreate_task()創(chuàng)建任務(wù),或者是使用低層API loop.create_task()或者是loop.ensure_future()創(chuàng)建任務(wù)對(duì)象;

      (3)相比于協(xié)程函數(shù),任務(wù)時(shí)有狀態(tài)的,可以使用Task.cancel()進(jìn)行取消,這會(huì)觸發(fā)CancelledError異常,使用cancelled()檢查是否取消。

      下面介紹Task類常見的一些使用函數(shù)

      (1)cancel()
      Request the Task to be cancelled.

      其實(shí)前面已經(jīng)有所介紹,最好是使用他會(huì)出發(fā)CancelledError異常,所以需要取消的協(xié)程函數(shù)里面的代碼最好在try-except語句塊中進(jìn)行,這樣方便觸發(fā)異常,打印相關(guān)信息,但是Task.cancel()沒有辦法保證任務(wù)一定會(huì)取消,而Future.cancel()是可以保證任務(wù)一定取消的。可以參見下面的一個(gè)例子:

      import asyncio

      async def cancel_me():
          print('cancel_me(): before sleep')
          try:
              await asyncio.sleep(3600#模擬一個(gè)耗時(shí)任務(wù)
          except asyncio.CancelledError:
              print('cancel_me(): cancel sleep')
              raise
          finally:
              print('cancel_me(): after sleep')

      async def main():
          #通過協(xié)程創(chuàng)建一個(gè)任務(wù),需要注意的是,在創(chuàng)建任務(wù)的時(shí)候,就會(huì)跳入到異步開始執(zhí)行
          #因?yàn)槭?.7版本,創(chuàng)建一個(gè)任務(wù)就相當(dāng)于是運(yùn)行了異步函數(shù)cancel_me
          task = asyncio.create_task(cancel_me()) 
          #等待一秒鐘
          await asyncio.sleep(1)
          print('main函數(shù)休息完了')
          #發(fā)出取消任務(wù)的請(qǐng)求
          task.cancel()  
          try:
              await task  #因?yàn)槿蝿?wù)被取消,觸發(fā)了異常
          except asyncio.CancelledError:
              print('main(): cancel_me is cancelled now')

      asyncio.run(main())

      '''運(yùn)行結(jié)果為:
      cancel_me(): before sleep
      main函數(shù)休息完了
      cancel_me(): cancel sleep
      cancel_me(): after sleep
      main(): cancel_me is cancelled now
      '''

      運(yùn)行過程分析:

      首先run函數(shù)啟動(dòng)主函數(shù)入口main,在main中,因?yàn)榈谝痪湓捑褪钦{(diào)用異步函數(shù)cancel_me()函數(shù),所以先打印出了第一句話;
      然后進(jìn)入cancel_me中的try語句,遇到await,暫停,這時(shí)候返回main中執(zhí)行,但是有在main中遇到了await,也會(huì)暫停,但是由于main中只需要暫停1秒,而cancel_me中要暫停3600秒,所以等到main的暫停結(jié)束后,接著運(yùn)行main,所以打印出第二句話;
      接下來遇到取消任務(wù)的請(qǐng)求task.cancel(),然后繼續(xù)執(zhí)行main里面的try,又遇到了await,接著main進(jìn)入暫停,接下來進(jìn)入到cancel_me函數(shù)中,但是由于main中請(qǐng)求了取消任務(wù),所以那個(gè)耗時(shí)3600秒的任務(wù)就不再執(zhí)行了,直接觸發(fā)了Cancelled_Error異常,打印出第三句話,接下來又raise一個(gè)異常信息;
      接下來執(zhí)行cancel_me的finally,打印出第四句話,此時(shí)cancel_me執(zhí)行完畢,由于他拋出了一個(gè)異常,返回到主程序main中,觸發(fā)異常,打印出第五句話。

      (2)done()
      當(dāng)一個(gè)被包裝得協(xié)程既沒有觸發(fā)異常、也沒有被取消的時(shí)候,意味著它是done的,返回true。

      (3)result()
      返回任務(wù)的執(zhí)行結(jié)果,
      當(dāng)任務(wù)被正常執(zhí)行完畢,則返回結(jié)果;
      當(dāng)任務(wù)被取消了,調(diào)用這個(gè)方法,會(huì)觸發(fā)CancelledError異常;
      當(dāng)任務(wù)返回的結(jié)果是無用的時(shí)候,則調(diào)用這個(gè)方法會(huì)觸發(fā)InvalidStateError;
      當(dāng)任務(wù)出發(fā)了一個(gè)異常而中斷,調(diào)用這個(gè)方法還會(huì)再次觸發(fā)這個(gè)使程序中斷的異常。

      (4)exception()

      返回任務(wù)的異常信息,觸發(fā)了什么異常,就返回什么異常,如果任務(wù)是正常執(zhí)行的無異常,則返回None;
      當(dāng)任務(wù)被取消了,調(diào)用這個(gè)方法會(huì)觸發(fā)CancelledError異常;
      當(dāng)任務(wù)沒有做完,調(diào)用這個(gè)方法會(huì)觸發(fā)InvalidStateError異常。
      下面還有一些不常用的方法,如下:

      (5)add_done_callback(callback, *, context=None)
      (6)remove_done_callback(callback)
      (7)get_stack(*, limit=None)
      (8)print_stack(*, limit=None, file=None)
      (9)all_tasks(loop=None),這是一個(gè)類方法
      (10)current_task(loop=None),這是一個(gè)類方法


      異步函數(shù)的結(jié)果獲取


      對(duì)于異步編程、異步函數(shù)而言,最重要的就是異步函數(shù)調(diào)用結(jié)束之后,獲取異步函數(shù)的返回值,我們可以有以下幾種方式來獲取函數(shù)的返回值,第一是直接通過Task.result()來獲??;第二種是綁定一個(gè)回調(diào)函數(shù)來獲取,即函數(shù)執(zhí)行完畢后調(diào)用一個(gè)函數(shù)來獲取異步函數(shù)的返回值。

      (1)直接通過result來獲取

      import asyncio
      import time


      async def hello1(a,b):
          print('Hello world 01 begin')
          await asyncio.sleep(3)  #模擬耗時(shí)任務(wù)3秒
          print('Hello again 01 end')
          return a+b

      coroutine=hello1(10,5)
      loop = asyncio.get_event_loop()                #第一步:創(chuàng)建事件循環(huán)
      task=asyncio.ensure_future(coroutine)         #第二步:將多個(gè)協(xié)程函數(shù)包裝成任務(wù)列表
      loop.run_until_complete(task)                  #第三步:通過事件循環(huán)運(yùn)行
      print('-------------------------------------')
      print(task.result())
      loop.close() 

      '''運(yùn)行結(jié)果為
      Hello world 01 begin
      Hello again 01 end
      -------------------------------------
      15
      '''

      (2)通過定義回調(diào)函數(shù)來獲取

      import asyncio
      import time


      async def hello1(a,b):
          print('Hello world 01 begin')
          await asyncio.sleep(3)  #模擬耗時(shí)任務(wù)3秒
          print('Hello again 01 end')
          return a+b

      def callback(future):   #定義的回調(diào)函數(shù)
          print(future.result())

      loop = asyncio.get_event_loop()                #第一步:創(chuàng)建事件循環(huán)
      task=asyncio.ensure_future(hello1(10,5))       #第二步:將多個(gè)協(xié)程函數(shù)包裝成任務(wù)
      task.add_done_callback(callback)                      #并被任務(wù)綁定一個(gè)回調(diào)函數(shù)

      loop.run_until_complete(task)                  #第三步:通過事件循環(huán)運(yùn)行
      loop.close()                                   #第四步:關(guān)閉事件循環(huán)


      '''運(yùn)行結(jié)果為:
      Hello world 01 begin
      Hello again 01 end
      15
      '''

      注意:所謂的回調(diào)函數(shù),就是指協(xié)程函數(shù)coroutine執(zhí)行結(jié)束時(shí)候會(huì)調(diào)用回調(diào)函數(shù)。并通過參數(shù)future獲取協(xié)程執(zhí)行的結(jié)果。我們創(chuàng)建的task和回調(diào)里的future對(duì)象,實(shí)際上是同一個(gè)對(duì)象,因?yàn)閠ask是future的子類。




      03

      asyncio異步編程的基本模板

      事實(shí)上,在使用asyncio進(jìn)行異步編程的時(shí)候,語法形式往往是多樣性的,雖然理解異步編程的核心思想很重要,但是實(shí)現(xiàn)的時(shí)候終究還是要編寫語句的,本次給出的模板,是兩個(gè)不同的例子,例子一是三個(gè)異步方法,它們都沒有參數(shù),沒有返回值,都模擬一個(gè)耗時(shí)任務(wù);例子二是三個(gè)異步方法,都有參數(shù),都有返回值。


      python3.7之前的版本


      例子一:無參數(shù)、無返回值
      import asyncio
      import time

      a=time.time()

      async def hello1():
          print('Hello world 01 begin')
          await asyncio.sleep(3)  #模擬耗時(shí)任務(wù)3秒
          print('Hello again 01 end')

      async def hello2():
          print('Hello world 02 begin')
          await asyncio.sleep(2)   #模擬耗時(shí)任務(wù)2秒
          print('Hello again 02 end')

      async def hello3():
          print('Hello world 03 begin')
          await asyncio.sleep(4)   #模擬耗時(shí)任務(wù)4秒
          print('Hello again 03 end')

      loop = asyncio.get_event_loop()                #第一步:創(chuàng)建事件循環(huán)
      tasks = [hello1(), hello2(),hello3()]          #第二步:將多個(gè)協(xié)程函數(shù)包裝成任務(wù)列表
      loop.run_until_complete(asyncio.wait(tasks))   #第三步:通過事件循環(huán)運(yùn)行
      loop.close()                                   #第四步:取消事件循環(huán)

      '''運(yùn)行結(jié)果為:
      Hello world 02 begin
      Hello world 03 begin
      Hello world 01 begin
      Hello again 02 end
      Hello again 01 end
      Hello again 03 end
      '''


      例子二:有參數(shù)、有返回值

      import asyncio
      import time


      async def hello1(a,b):
          print('Hello world 01 begin')
          await asyncio.sleep(3)  #模擬耗時(shí)任務(wù)3秒
          print('Hello again 01 end')
          return a+b

      async def hello2(a,b):
          print('Hello world 02 begin')
          await asyncio.sleep(2)   #模擬耗時(shí)任務(wù)2秒
          print('Hello again 02 end')
          return a-b

      async def hello3(a,b):
          print('Hello world 03 begin')
          await asyncio.sleep(4)   #模擬耗時(shí)任務(wù)4秒
          print('Hello again 03 end')
          return a*b

      loop = asyncio.get_event_loop()                #第一步:創(chuàng)建事件循環(huán)
      task1=asyncio.ensure_future(hello1(10,5))
      task2=asyncio.ensure_future(hello2(10,5))
      task3=asyncio.ensure_future(hello3(10,5))
      tasks = [task1,task2,task3]                    #第二步:將多個(gè)協(xié)程函數(shù)包裝成任務(wù)列表
      loop.run_until_complete(asyncio.wait(tasks))   #第三步:通過事件循環(huán)運(yùn)行
      print(task1.result())                               #并且在所有的任務(wù)完成之后,獲取異步函數(shù)的返回值   
      print(task2.result())
      print(task3.result())
      loop.close()                                   #第四步:關(guān)閉事件循環(huán)

      '''運(yùn)行結(jié)果為:
      Hello world 01 begin
      Hello world 02 begin
      Hello world 03 begin
      Hello again 02 end
      Hello again 01 end
      Hello again 03 end
      15
      5
      50
      '''



      總結(jié):四步走(針對(duì)python3.7之前的版本)

      第一步·:構(gòu)造事件循環(huán)

      loop=asyncio.get_running_loop() #返回(獲取)在當(dāng)前線程中正在運(yùn)行的事件循環(huán),如果沒有正在運(yùn)行的事件循環(huán),則會(huì)顯示錯(cuò)誤;它是python3.7中新添加的

      loop=asyncio.get_event_loop() #獲得一個(gè)事件循環(huán),如果當(dāng)前線程還沒有事件循環(huán),則創(chuàng)建一個(gè)新的事件循環(huán)loop;

      loop=asyncio.set_event_loop(loop) #設(shè)置一個(gè)事件循環(huán)為當(dāng)前線程的事件循環(huán);

      loop=asyncio.new_event_loop()  #創(chuàng)建一個(gè)新的事件循環(huán)

      第二步:將一個(gè)或者是多個(gè)協(xié)程函數(shù)包裝成任務(wù)Task

      #高層API
      task = asyncio.create_task(coro(參數(shù)列表))   # 這是3.7版本新添加的
      task = asyncio.ensure_future(coro(參數(shù)列表)) 

      #低層API
      loop.create_future(coro)
      loop.create_task(coro)

      '''需要注意的是,在使用Task.result()獲取協(xié)程函數(shù)結(jié)果的時(shí)候,使用asyncio.create_task()卻會(huì)顯示錯(cuò)
      但是使用asyncio.ensure_future卻正確,本人暫時(shí)不知道原因,哪位大神知道,望告知,不勝感激!'''

      第三步:通過事件循環(huán)運(yùn)行

      loop.run_until_complete(asyncio.wait(tasks))  #通過asyncio.wait()整合多個(gè)task

      loop.run_until_complete(asyncio.gather(tasks))  #通過asyncio.gather()整合多個(gè)task

      loop.run_until_complete(task_1)  #單個(gè)任務(wù)則不需要整合

      loop.run_forever()  #但是這個(gè)方法在新版本已經(jīng)取消,不再推薦使用,因?yàn)槭褂闷饋聿缓?jiǎn)潔

      '''
      使用gather或者wait可以同時(shí)注冊(cè)多個(gè)任務(wù),實(shí)現(xiàn)并發(fā),但他們的設(shè)計(jì)是完全不一樣的,在前面的2.1.(4)中已經(jīng)討論過了,主要區(qū)別如下:
      (1)參數(shù)形式不一樣
      gather的參數(shù)為 *coroutines_or_futures,即如這種形式
            tasks = asyncio.gather(*[task1,task2,task3])或者
            tasks = asyncio.gather(task1,task2,task3)
            loop.run_until_complete(tasks)
      wait的參數(shù)為列表或者集合的形式,如下
            tasks = asyncio.wait([task1,task2,task3])
            loop.run_until_complete(tasks)
      (2)返回的值不一樣
      gather的定義如下,gather返回的是每一個(gè)任務(wù)運(yùn)行的結(jié)果,
            results = await asyncio.gather(*tasks) 
      wait的定義如下,返回dones是已經(jīng)完成的任務(wù),pending是未完成的任務(wù),都是集合類型
       done, pending = yield from asyncio.wait(fs)
      (3)后面還會(huì)講到他們的進(jìn)一步使用
      '''

      簡(jiǎn)單來說:async.wait會(huì)返回兩個(gè)值:done和pending,done為已完成的協(xié)程Task,pending為超時(shí)未完成的協(xié)程Task,需通過future.result調(diào)用Task的result。而async.gather返回的是已完成Task的result。

      第四步:關(guān)閉事件循環(huán)

      loop.close()

      '''
      以上示例都沒有調(diào)用 loop.close,好像也沒有什么問題。所以到底要不要調(diào) loop.close 呢?
      簡(jiǎn)單來說,loop 只要不關(guān)閉,就還可以再運(yùn)行:
      loop.run_until_complete(do_some_work(loop, 1))
      loop.run_until_complete(do_some_work(loop, 3))
      loop.close()
      但是如果關(guān)閉了,就不能再運(yùn)行了:
      loop.run_until_complete(do_some_work(loop, 1))
      loop.close()
      loop.run_until_complete(do_some_work(loop, 3))  # 此處異常
      建議調(diào)用 loop.close,以徹底清理 loop 對(duì)象防止誤用
      '''



      python3.7版本


      在最新的python3.7版本中,asyncio又引進(jìn)了一些新的特性和API


      例子一:無參數(shù)、無返回值
      import asyncio
      import time


      async def hello1():
          print('Hello world 01 begin')
          await asyncio.sleep(3)  #模擬耗時(shí)任務(wù)3秒
          print('Hello again 01 end')

      async def hello2():
          print('Hello world 02 begin')
          await asyncio.sleep(2)   #模擬耗時(shí)任務(wù)2秒
          print('Hello again 02 end')

      async def hello3():
          print('Hello world 03 begin')
          await asyncio.sleep(4)   #模擬耗時(shí)任務(wù)4秒
          print('Hello again 03 end')

      async def main():
          results=await asyncio.gather(hello1(),hello2(),hello3())
          for result in results:
              print(result)     #因?yàn)闆]返回值,故而返回None

      asyncio.run(main())

      '''運(yùn)行結(jié)果為:
      Hello world 01 begin
      Hello world 02 begin
      Hello world 03 begin
      Hello again 02 end
      Hello again 01 end
      Hello again 03 end
      None
      None
      None
      '''


      例子二:有參數(shù)、有返回值

      import asyncio
      import time


      async def hello1(a,b):
          print('Hello world 01 begin')
          await asyncio.sleep(3)  #模擬耗時(shí)任務(wù)3秒
          print('Hello again 01 end')
          return a+b

      async def hello2(a,b):
          print('Hello world 02 begin')
          await asyncio.sleep(2)   #模擬耗時(shí)任務(wù)2秒
          print('Hello again 02 end')
          return a-b

      async def hello3(a,b):
          print('Hello world 03 begin')
          await asyncio.sleep(4)   #模擬耗時(shí)任務(wù)4秒
          print('Hello again 03 end')
          return a*b

      async def main():
          results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5))
          for result in results:
              print(result)

      asyncio.run(main())

      '''運(yùn)行結(jié)果為:
      Hello world 01 begin
      Hello world 02 begin
      Hello world 03 begin
      Hello again 02 end
      Hello again 01 end
      Hello again 03 end
      15
      5
      50
      '''


      總結(jié):兩步走(針對(duì)python3.7)

      第一步:構(gòu)建一個(gè)入口函數(shù)main
      他也是一個(gè)異步協(xié)程函數(shù),即通過async定義,并且要在main函數(shù)里面await一個(gè)或者是多個(gè)協(xié)程,同前面一樣,我可以通過gather或者是wait進(jìn)行組合,對(duì)于有返回值的協(xié)程函數(shù),一般就在main里面進(jìn)行結(jié)果的獲取。

      第二步:?jiǎn)?dòng)主函數(shù)main
      這是python3.7新添加的函數(shù),就一句話,即
      asyncio.run(main())

      注意:
      不再需要顯式的創(chuàng)建事件循環(huán),因?yàn)樵趩?dòng)run函數(shù)的時(shí)候,就會(huì)自動(dòng)創(chuàng)建一個(gè)新的事件循環(huán)。而且在main中也不需要通過事件循環(huán)去掉用被包裝的協(xié)程函數(shù),只需要向普通函數(shù)那樣調(diào)用即可 ,只不過使用了await關(guān)鍵字而已。



      04

      協(xié)程編程的優(yōu)點(diǎn)


      1、無cpu分時(shí)切換線程保存上下文問題(協(xié)程上下文怎么保存)

      2、遇到io阻塞切換(怎么實(shí)現(xiàn)的)

      3、無需共享數(shù)據(jù)的保護(hù)鎖(為什么)

      4、系列文章下篇預(yù)告——介紹低層的API,事件循環(huán)到底是怎么實(shí)現(xiàn)的以及future類的實(shí)現(xiàn)。


        本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評(píng)論

        發(fā)表

        請(qǐng)遵守用戶 評(píng)論公約

        類似文章 更多