聲明:本文針對(duì)的是python3.4以后的版本的,因?yàn)閺?.4開始才引入asyncio,后面的3.5 3.6 3.7版本是向前兼容的,只不過語法上面有稍微的改變。比如在3.4版本中使用@asyncio.coroutine裝飾器和yield from語句,但是在3.5以后的版本中使用async、await兩個(gè)關(guān)鍵字代替,雖然語法上稍微有所差異,但是原理是一樣的。本文用最通俗的語言解釋了pythonasyncio背后的一些核心概念,簡(jiǎn)要解析了asyncio的設(shè)計(jì)架構(gòu),并給出了使用python進(jìn)行asyncio異步編程的一般模板。本文較長(zhǎng):閱讀全文約30min。
一 一些最重要的概念 1.1 協(xié)程(coroutine)——本質(zhì)就是一個(gè) 函數(shù) 1.2 事件循環(huán)——event_loop 1.3 什么是awaitable對(duì)象——即可暫停 等待的對(duì)象 1.4 什么是task任務(wù) 1.5 什么是future? 二 asyncio的基本架構(gòu) 2.1 常見的一些高層API方法 2.2 Task 類詳解 2.3 異步函數(shù)的結(jié)果獲取 三 asyncio異步編程的基本模板
3.1 python3.7之前的版本 3.1.1 例子一:無參數(shù)、無返回值 3.1.2 例子二:有參數(shù)、有返回值
3.1.3 總結(jié):四步走(針對(duì)python3.7 之前的版本) 3.2 python3.7版本 3.2.1 例子一:無參數(shù)、無返回值 3.2.2 例子二:有參數(shù)、有返回值 3.2.3 總結(jié):兩步走(針對(duì)python3.7)
四 協(xié)程編程的優(yōu)點(diǎn)
協(xié)程(coroutine)——本質(zhì)就是一個(gè)函數(shù)
所謂的“協(xié)程”就是一個(gè)函數(shù),這個(gè)函數(shù)需要有兩個(gè)基本的組成要素,第一,需要使用@asyncio.coroutine進(jìn)行裝飾;第二,函數(shù)體內(nèi)一定要有yield from 返回的的generator,或者是說使用yield from 返回另一個(gè)協(xié)程對(duì)象。 當(dāng)然,這兩個(gè)條件并不是硬性規(guī)定的,如果沒有這兩個(gè)條件,依然是函數(shù),只不過是普通函數(shù)而已。 怎么判斷一個(gè)函數(shù)是不是協(xié)程?通過asyncio.iscoroutine(obj)和asyncio.iscoroutinefunction(func)加以判斷,返回true,則是。
那么協(xié)程函數(shù)有什么作用呢?
(1)result = yield from future
作用一:返回future的結(jié)果。什么是future?后面會(huì)講到。當(dāng)協(xié)程函數(shù)執(zhí)行到這一句,協(xié)程會(huì)被懸掛起來,知道future的結(jié)果被返回。如果是future被中途取消,則會(huì)觸發(fā)CancelledError異常。由于task是future的子類,后面也會(huì)介紹,關(guān)于future的所有應(yīng)用,都同樣適用于task
(2)result = yield from coroutine 等候另一個(gè)協(xié)程函數(shù)返回結(jié)果或者是觸發(fā)異常
(3)result= yield from task 返回一個(gè)task的結(jié)果
(4)return expression 作為一個(gè)函數(shù),他本身也是可以返回某一個(gè)結(jié)果的
(5)raise exception 協(xié)程函數(shù),不是像普通函數(shù)那樣直接調(diào)用運(yùn)行的,必須添加到事件循環(huán)中,然后由事件循環(huán)去運(yùn)行,單獨(dú)運(yùn)行協(xié)程函數(shù)是不會(huì)有結(jié)果的,看一個(gè)簡(jiǎn)單的例子:
import time import asyncio async def say_after_time(delay,what): await asyncio.sleep(delay) print(what)
async def main(): print(f'開始時(shí)間為: {time.time()}') await say_after_time(1,'hello') await say_after_time(2,'world') print(f'結(jié)束時(shí)間為: {time.time()}')
loop=asyncio.get_event_loop() #創(chuàng)建事件循環(huán)對(duì)象 #loop=asyncio.new_event_loop() #與上面等價(jià),創(chuàng)建新的事件循環(huán) loop.run_until_complete(main()) #通過事件循環(huán)對(duì)象運(yùn)行協(xié)程函數(shù) loop.close()
在python3.6版本中,如果我們單獨(dú)像執(zhí)行普通函數(shù)那樣執(zhí)行一個(gè)協(xié)程函數(shù),只會(huì)返回一個(gè)coroutine對(duì)象(python3.7)如下所示: >>> main() <coroutine object main at 0x1053bb7c8>
(1)獲取事件循環(huán)對(duì)象的幾種方式: 下面幾種方式可以用來獲取、設(shè)置、創(chuàng)建事件循環(huán)對(duì)象loop loop=asyncio.get_running_loop() 返回(獲?。┰诋?dāng)前線程中正在運(yùn)行的事件循環(huán),如果沒有正在運(yùn)行的事件循環(huán),則會(huì)顯示錯(cuò)誤;它是python3.7中新添加的loop=asyncio.get_event_loop() 獲得一個(gè)事件循環(huán),如果當(dāng)前線程還沒有事件循環(huán),則創(chuàng)建一個(gè)新的事件循環(huán)loop; loop=asyncio.set_event_loop(loop) 設(shè)置一個(gè)事件循環(huán)為當(dāng)前線程的事件循環(huán); loop=asyncio.new_event_loop() 創(chuàng)建一個(gè)新的事件循環(huán)
(2)通過事件循環(huán)運(yùn)行協(xié)程函數(shù)的兩種方式:
①方式一:創(chuàng)建事件循環(huán)對(duì)象loop,即asyncio.get_event_loop(),通過事件循環(huán)運(yùn)行協(xié)程函數(shù) ②方式二:直接通過asyncio.run(function_name)運(yùn)行協(xié)程函數(shù)。但是需要注意的是,首先run函數(shù)是python3.7版本新添加的,前面的版本是沒有的;其次,這個(gè)run函數(shù)總是會(huì)創(chuàng)建一個(gè)新的事件循環(huán)并在run結(jié)束之后關(guān)閉事件循環(huán),所以,如果在同一個(gè)線程中已經(jīng)有了一個(gè)事件循環(huán),則不能再使用這個(gè)函數(shù)了,因?yàn)橥粋€(gè)線程不能有兩個(gè)事件循環(huán),而且這個(gè)run函數(shù)不能同時(shí)運(yùn)行兩次,因?yàn)樗呀?jīng)創(chuàng)建一個(gè)了。即同一個(gè)線程中是不允許有多個(gè)事件循環(huán)loop的。 asyncio.run()是python3.7 新添加的內(nèi)容,也是后面推薦的運(yùn)行任務(wù)的方式,因?yàn)樗歉邔覣PI,后面會(huì)講到它與asyncio.run_until_complete()的差異性,run_until_complete()是相對(duì)較低層的API。
注意:到底什么是事件循環(huán)?如何理解?
可以這樣理解:線程一直在各個(gè)協(xié)程方法之間永不停歇的游走,遇到一個(gè)yield from 或者await就懸掛起來,然后又走到另外一個(gè)方法,依次進(jìn)行下去,知道事件循環(huán)所有的方法執(zhí)行完畢。實(shí)際上loop是BaseEventLoop的一個(gè)實(shí)例,我們可以查看定義,它到底有哪些方法可調(diào)用。 什么是awaitable對(duì)象——即可暫停等待的對(duì)象
有三類對(duì)象是可等待的,即 coroutines, Tasks, and Futures. coroutine:本質(zhì)上就是一個(gè)函數(shù),一前面的生成器yield和yield from為基礎(chǔ),不再贅述; Tasks: 任務(wù),顧名思義,就是要完成某件事情,其實(shí)就是對(duì)協(xié)程函數(shù)進(jìn)一步的封裝; Future:它是一個(gè)“更底層”的概念,他代表一個(gè)一步操作的最終結(jié)果,因?yàn)橐徊讲僮饕话阌糜诤臅r(shí)操作,結(jié)果不會(huì)立即得到,會(huì)在“將來”得到異步運(yùn)行的結(jié)果,故而命名為Future。 三者的關(guān)系,coroutine可以自動(dòng)封裝成task,而Task是Future的子類。 如前所述,Task用來 并發(fā)調(diào)度的協(xié)程,即對(duì)協(xié)程函數(shù)的進(jìn)一步包裝?那為什么還需要包裝呢?因?yàn)閱渭兊膮f(xié)程函數(shù)僅僅是一個(gè)函數(shù)而已,將其包裝成任務(wù),任務(wù)是可以包含各種狀態(tài)的,異步編程最重要的就是對(duì)異步操作狀態(tài)的把控了。
(1)創(chuàng)建任務(wù)(兩種方法):
方法一:task = asyncio.create_task(coro()) # 這是3.7版本新添加的
方法二:task = asyncio.ensure_future(coro())
也可以使用 loop.create_future() loop.create_task(coro) 也是可以的。
備注:關(guān)于任務(wù)的詳解,會(huì)在后面的系列文章繼續(xù)講解,本文只是概括性的說明。
(2)獲取某一個(gè)任務(wù)的方法:
方法一:task=asyncio.current_task(loop=None) 返回在某一個(gè)指定的loop中,當(dāng)前正在運(yùn)行的任務(wù),如果沒有任務(wù)正在運(yùn)行,則返回None; 如果loop為None,則默認(rèn)為在當(dāng)前的事件循環(huán)中獲取,
方法二:asyncio.all_tasks(loop=None) 返回某一個(gè)loop中還沒有結(jié)束的任務(wù) Future是一個(gè)較低層的可等待(awaitable)對(duì)象,他表示的是異步操作的最終結(jié)果,當(dāng)一個(gè)Future對(duì)象被等待的時(shí)候,協(xié)程會(huì)一直等待,直到Future已經(jīng)運(yùn)算完畢。 Future是Task的父類,一般情況下,已不用去管它們兩者的詳細(xì)區(qū)別,也沒有必要去用Future,用Task就可以了,返回 future 對(duì)象的低級(jí)函數(shù)的一個(gè)很好的例子是 loop.run_in_executor().
前面介紹了asyncio里面最為核心的幾個(gè)概念,如果能夠很好地理解這些概念,對(duì)于學(xué)習(xí)協(xié)程是非常有幫助的,但是按照我個(gè)人的風(fēng)格,我會(huì)先說asyncio的架構(gòu),理解asyncio的設(shè)計(jì)架構(gòu)有助于更好地應(yīng)用和理解。
asyncio分為高層API和低層API,我們都可以使用,就像我前面在講matplotlib的架構(gòu)的時(shí)候所講的一樣,我們前面所講的Coroutine和Tasks屬于高層API,而Event Loop 和Future屬于低層API。當(dāng)然asyncio所涉及到的功能遠(yuǎn)不止于此,我們只看這么多。下面是是高層API和低層API的概覽:
High-level APIs
●Coroutines and Tasks(本文要寫的) ●Streams ●Synchronization Primitives ●Subprocesses ●Queues ●Exceptions
Low-level APIs
●Event Loop(下一篇要寫的) ●Futures ●Transports and Protocols ●Policies ●Platform Support
所謂的高層API主要是指那些asyncio.xxx()的方法
(1)運(yùn)行異步協(xié)程 asyncio.run(coro, *, debug=False) #運(yùn)行一個(gè)一步程序,參見上面
(2)創(chuàng)建任務(wù) task=asyncio.create_task(coro) #python3.7 ,參見上面 task = asyncio.ensure_future(coro())
(3)睡眠 await asyncio.sleep(delay, result=None, *, loop=None) 這個(gè)函數(shù)表示的是:當(dāng)前的那個(gè)任務(wù)(協(xié)程函數(shù))睡眠多長(zhǎng)時(shí)間,而允許其他任務(wù)執(zhí)行。這是它與time.sleep()的區(qū)別,time.sleep()是當(dāng)前線程休息,注意他們的區(qū)別哦。 另外如果提供了參數(shù)result,當(dāng)當(dāng)前任務(wù)(協(xié)程)結(jié)束的時(shí)候,它會(huì)返回; loop參數(shù)將會(huì)在3.10中移除,這里就不再說了。
(4)并發(fā)運(yùn)行多個(gè)任務(wù) await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False) 它本身也是awaitable的。 *coros_or_futures是一個(gè)序列拆分操作,如果是以個(gè)協(xié)程函數(shù),則會(huì)自動(dòng)轉(zhuǎn)換成Task。 當(dāng)所有的任務(wù)都完成之后,返回的結(jié)果是一個(gè)列表的形式,列表中值的順序和*coros_or_futures完成的順序是一樣的。 return_exceptions: False,這是他的默認(rèn)值,第一個(gè)出發(fā)異常的任務(wù)會(huì)立即返回,然后其他的任務(wù)繼續(xù)執(zhí)行; True,對(duì)于已經(jīng)發(fā)生了異常的任務(wù),也會(huì)像成功執(zhí)行了任務(wù)那樣,等到所有的任務(wù)執(zhí)行結(jié)束一起將錯(cuò)誤的結(jié)果返回到最終的結(jié)果列表里面。
如果gather()本身被取消了,那么綁定在它里面的任務(wù)也就取消了。
(5)防止任務(wù)取消 await asyncio.shield(*arg, *, loop=None) 它本身也是awaitable的。顧名思義,shield為屏蔽、保護(hù)的意思,即保護(hù)一個(gè)awaitable 對(duì)象防止取消,一般情況下不推薦使用,而且在使用的過程中,最好使用try語句塊更好。
try: res = await shield(something()) except CancelledError: res = None
(6)設(shè)置timeout——一定要好好理解 await asyncio. wait_for (aw, timeout, *, loop=None) 如果aw是一個(gè)協(xié)程函數(shù),會(huì)自動(dòng)包裝成一個(gè)任務(wù)task。參見下面的例子: import asyncio
async def eternity(): print('我馬上開始執(zhí)行') await asyncio.sleep(3600) #當(dāng)前任務(wù)休眠1小時(shí),即3600秒 print('終于輪到我了')
async def main(): # Wait for at most 1 second try: print('等你3秒鐘哦') await asyncio.wait_for(eternity(), timeout=3) #休息3秒鐘了執(zhí)行任務(wù) except asyncio.TimeoutError: print('超時(shí)了!')
asyncio.run(main())
'''運(yùn)行結(jié)果為: 等你3秒鐘哦 我馬上開始執(zhí)行 超時(shí)了! '''
為什么?首先調(diào)用main()函數(shù),作為入口函數(shù),當(dāng)輸出‘等你3秒鐘哦’之后,main掛起,執(zhí)行eternity,然后打印‘我馬上開始執(zhí)行’,然后eternity掛起,而且要掛起3600秒,大于3,這時(shí)候出發(fā)TimeoutError。修改一下:‘’ import asyncio
async def eternity(): print('我馬上開始執(zhí)行') await asyncio.sleep(2) #當(dāng)前任務(wù)休眠2秒鐘,2<3 print('終于輪到我了')
async def main(): # Wait for at most 1 second try: print('等你3秒鐘哦') await asyncio.wait_for(eternity(), timeout=3) #給你3秒鐘執(zhí)行你的任務(wù) except asyncio.TimeoutError: print('超時(shí)了!')
asyncio.run(main())
'''運(yùn)行結(jié)果為: 等你3秒鐘哦 我馬上開始執(zhí)行 終于輪到我了 '''
總結(jié):當(dāng)異步操作需要執(zhí)行的時(shí)間超過waitfor設(shè)置的timeout,就會(huì)觸發(fā)異常,所以在編寫程序的時(shí)候,如果要給異步操作設(shè)置timeout,一定要選擇合適,如果異步操作本身的耗時(shí)較長(zhǎng),而你設(shè)置的timeout太短,會(huì)涉及到她還沒做完,就拋出異常了。
(7)多個(gè)協(xié)程函數(shù)時(shí)候的等候
await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED) 與上面的區(qū)別是,第一個(gè)參數(shù)aws是一個(gè)集合,要寫成集合set的形式,比如: {func(),func(),func3()} 表示的是一系列的協(xié)程函數(shù)或者是任務(wù),其中協(xié)程會(huì)自動(dòng)包裝成任務(wù)。事實(shí)上,寫成列表的形式也是可以的。
注意:該函數(shù)的返回值是兩個(gè)Tasks/Futures的集合: (done, pending) 其中done是一個(gè)集合,表示已經(jīng)完成的任務(wù)tasks;pending也是一個(gè)集合,表示還沒有完成的任務(wù)。 常見的使用方法為:done, pending = await asyncio.wait(aws)
參數(shù)解釋: timeout (a float or int), 同上面的含義一樣,需要注意的是,這個(gè)不會(huì)觸發(fā)asyncio.TimeoutError異常,如果到了timeout還有任務(wù)沒有執(zhí)行完,那些沒有執(zhí)行完的tasks和futures會(huì)被返回到第二個(gè)集合pending里面。 return_when參數(shù),顧名思義,他表示的是,什么時(shí)候wait函數(shù)該返回值。只能夠去下面的幾個(gè)值。
Constant | Description | FIRST_COMPLETED | first_completes.當(dāng)任何一個(gè)task或者是future完成或者是取消,wait函數(shù)就返回 | FIRST_EXCEPTION | 當(dāng)任何一個(gè)task或者是future觸發(fā)了某一個(gè)異常,就返回,.如果是所有的task和future都沒有觸發(fā)異常,則等價(jià)與下面的 ALL_COMPLETED . | ALL_COMPLETED | 當(dāng)所有的task或者是future都完成或者是都取消的時(shí)候,再返回。 |
如下面例子所示: import asyncio import time
a=time.time()
async def hello1(): #大約2秒 print('Hello world 01 begin') yield from asyncio.sleep(2) print('Hello again 01 end')
async def hello2(): #大約3秒 print('Hello world 02 begin') yield from asyncio.sleep(3) print('Hello again 02 end')
async def hello3(): #大約4秒 print('Hello world 03 begin') yield from asyncio.sleep(4) print('Hello again 03 end')
async def main(): #入口函數(shù) done,pending=await asyncio.wait({hello1(),hello2(),hello3()},return_when=asyncio.FIRST_COMPLETED) for i in done: print(i) for j in pending: print(j)
asyncio.run(main()) #運(yùn)行入口函數(shù)
b=time.time() print('---------------------------------------') print(b-a)
'''運(yùn)行結(jié)果為: Hello world 02 begin Hello world 01 begin Hello world 03 begin Hello again 01 end <Task finished coro=<hello1() done, defined at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:46> result=None> <Task pending coro=<hello3() running at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:61> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001FA8D394438>()]>> <Task pending coro=<hello2() running at e:\Python學(xué)習(xí)\基礎(chǔ)入門\asyncio3.4詳解\test11.py:55> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001FA8D394378>()]>> --------------------------------------- 2.033155679702759 '''
從上面可以看出,hello1()試運(yùn)行結(jié)束了的,hello2()和hello3()還沒結(jié)束。 (8)asyncio.as_completed()函數(shù)
這個(gè)函數(shù)我沒有找到合適的中文名稱去描述,所以哪位大神如果知道,望告知,不勝感激!它的函數(shù)原型如下: asyncio.as_completed(aws, *, loop=None, timeout=None) 第一個(gè)參數(shù)aws:同上面一樣,是一個(gè)集合{}集合里面的元素是coroutine、task或者future 第三個(gè)參數(shù)timeout:意義和上面講的的一樣 那到底什么作用呢?其實(shí)很簡(jiǎn)單,個(gè)人感覺有點(diǎn)雞肋,從一個(gè)例子看起:
import asyncio import time import threading
a=time.time()
@asyncio.coroutine def hello1(): print('Hello world 01 begin') yield from asyncio.sleep(5) #大約5秒 print('Hello again 01 end') return '哈哈1'
@asyncio.coroutine def hello2(): print('Hello world 02 begin') yield from asyncio.sleep(3) #大約3秒 print('Hello again 02 end') return '哈哈2'
@asyncio.coroutine def hello3(): print('Hello world 03 begin') yield from asyncio.sleep(4) #大約4秒 print('Hello again 03 end') return '哈哈3'
async def main(): s=asyncio.as_completed({hello1(),hello2(),hello3()}) for f in s: result=await f print(result)
asyncio.run(main())
b=time.time() print('---------------------------------------') print(b-a)
'''運(yùn)行結(jié)果為: Hello world 03 begin Hello world 01 begin Hello world 02 begin Hello again 01 end 哈哈1 Hello again 02 end 哈哈2 Hello again 03 end 哈哈3 --------------------------------------- 4.0225794315338135 '''
結(jié)論:asyncio.as_completed()函數(shù)返回的是一個(gè)可迭代(iterator)的對(duì)象,對(duì)象的每個(gè)元素就是一個(gè)future對(duì)象,很多小伙伴說,這不是相當(dāng)于沒變嗎?其實(shí)返回的future集合是對(duì)參數(shù)的future集合重新組合,組合的順序就是,最先執(zhí)行完的協(xié)程函數(shù)(coroutine、task、future)最先返回,從上面的代碼可知,參數(shù)為 aws={hello1(),hello2(),hello3()},因?yàn)閔ello1大約花費(fèi)5秒、hello2大約花費(fèi)3秒、hello3大約花費(fèi)4秒。返回的結(jié)果為 s={hello2()、hello3()、hello(1)},因?yàn)閔ello2時(shí)間最短,故而放在前面,hello1時(shí)間最長(zhǎng),故而放在最后面。然后對(duì)返回的集合s開始迭代。
先來看一下Task類的簡(jiǎn)單介紹(英文原文文檔)。

上面的文字描述中推出了幾個(gè)非常重要的信息,特在此總結(jié)如下:
(1)他是作為一個(gè)python協(xié)程對(duì)象,和Future對(duì)象很像的這么一個(gè)對(duì)象,但不是線程安全的;他繼承了Future所有的API,,除了Future.set_result()和Future.set_Exception();
(2)使用高層API asyncio.ccreate_task()創(chuàng)建任務(wù),或者是使用低層API loop.create_task()或者是loop.ensure_future()創(chuàng)建任務(wù)對(duì)象;
(3)相比于協(xié)程函數(shù),任務(wù)時(shí)有狀態(tài)的,可以使用Task.cancel()進(jìn)行取消,這會(huì)觸發(fā)CancelledError異常,使用cancelled()檢查是否取消。
下面介紹Task類常見的一些使用函數(shù)
(1)cancel() Request the Task to be cancelled.
其實(shí)前面已經(jīng)有所介紹,最好是使用他會(huì)出發(fā)CancelledError異常,所以需要取消的協(xié)程函數(shù)里面的代碼最好在try-except語句塊中進(jìn)行,這樣方便觸發(fā)異常,打印相關(guān)信息,但是Task.cancel()沒有辦法保證任務(wù)一定會(huì)取消,而Future.cancel()是可以保證任務(wù)一定取消的。可以參見下面的一個(gè)例子:
import asyncio
async def cancel_me(): print('cancel_me(): before sleep') try: await asyncio.sleep(3600) #模擬一個(gè)耗時(shí)任務(wù) except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep')
async def main(): #通過協(xié)程創(chuàng)建一個(gè)任務(wù),需要注意的是,在創(chuàng)建任務(wù)的時(shí)候,就會(huì)跳入到異步開始執(zhí)行 #因?yàn)槭?.7版本,創(chuàng)建一個(gè)任務(wù)就相當(dāng)于是運(yùn)行了異步函數(shù)cancel_me task = asyncio.create_task(cancel_me()) #等待一秒鐘 await asyncio.sleep(1) print('main函數(shù)休息完了') #發(fā)出取消任務(wù)的請(qǐng)求 task.cancel() try: await task #因?yàn)槿蝿?wù)被取消,觸發(fā)了異常 except asyncio.CancelledError: print('main(): cancel_me is cancelled now')
asyncio.run(main())
'''運(yùn)行結(jié)果為: cancel_me(): before sleep main函數(shù)休息完了 cancel_me(): cancel sleep cancel_me(): after sleep main(): cancel_me is cancelled now '''
運(yùn)行過程分析:
首先run函數(shù)啟動(dòng)主函數(shù)入口main,在main中,因?yàn)榈谝痪湓捑褪钦{(diào)用異步函數(shù)cancel_me()函數(shù),所以先打印出了第一句話; 然后進(jìn)入cancel_me中的try語句,遇到await,暫停,這時(shí)候返回main中執(zhí)行,但是有在main中遇到了await,也會(huì)暫停,但是由于main中只需要暫停1秒,而cancel_me中要暫停3600秒,所以等到main的暫停結(jié)束后,接著運(yùn)行main,所以打印出第二句話; 接下來遇到取消任務(wù)的請(qǐng)求task.cancel(),然后繼續(xù)執(zhí)行main里面的try,又遇到了await,接著main進(jìn)入暫停,接下來進(jìn)入到cancel_me函數(shù)中,但是由于main中請(qǐng)求了取消任務(wù),所以那個(gè)耗時(shí)3600秒的任務(wù)就不再執(zhí)行了,直接觸發(fā)了Cancelled_Error異常,打印出第三句話,接下來又raise一個(gè)異常信息; 接下來執(zhí)行cancel_me的finally,打印出第四句話,此時(shí)cancel_me執(zhí)行完畢,由于他拋出了一個(gè)異常,返回到主程序main中,觸發(fā)異常,打印出第五句話。
(2)done() 當(dāng)一個(gè)被包裝得協(xié)程既沒有觸發(fā)異常、也沒有被取消的時(shí)候,意味著它是done的,返回true。
(3)result() 返回任務(wù)的執(zhí)行結(jié)果, 當(dāng)任務(wù)被正常執(zhí)行完畢,則返回結(jié)果; 當(dāng)任務(wù)被取消了,調(diào)用這個(gè)方法,會(huì)觸發(fā)CancelledError異常; 當(dāng)任務(wù)返回的結(jié)果是無用的時(shí)候,則調(diào)用這個(gè)方法會(huì)觸發(fā)InvalidStateError; 當(dāng)任務(wù)出發(fā)了一個(gè)異常而中斷,調(diào)用這個(gè)方法還會(huì)再次觸發(fā)這個(gè)使程序中斷的異常。
(4)exception()
返回任務(wù)的異常信息,觸發(fā)了什么異常,就返回什么異常,如果任務(wù)是正常執(zhí)行的無異常,則返回None; 當(dāng)任務(wù)被取消了,調(diào)用這個(gè)方法會(huì)觸發(fā)CancelledError異常; 當(dāng)任務(wù)沒有做完,調(diào)用這個(gè)方法會(huì)觸發(fā)InvalidStateError異常。 下面還有一些不常用的方法,如下:
(5)add_done_callback(callback, *, context=None) (6)remove_done_callback(callback) (7)get_stack(*, limit=None) (8)print_stack(*, limit=None, file=None) (9)all_tasks(loop=None),這是一個(gè)類方法 (10)current_task(loop=None),這是一個(gè)類方法
對(duì)于異步編程、異步函數(shù)而言,最重要的就是異步函數(shù)調(diào)用結(jié)束之后,獲取異步函數(shù)的返回值,我們可以有以下幾種方式來獲取函數(shù)的返回值,第一是直接通過Task.result()來獲??;第二種是綁定一個(gè)回調(diào)函數(shù)來獲取,即函數(shù)執(zhí)行完畢后調(diào)用一個(gè)函數(shù)來獲取異步函數(shù)的返回值。
(1)直接通過result來獲取
import asyncio import time
async def hello1(a,b): print('Hello world 01 begin') await asyncio.sleep(3) #模擬耗時(shí)任務(wù)3秒 print('Hello again 01 end') return a+b
coroutine=hello1(10,5) loop = asyncio.get_event_loop() #第一步:創(chuàng)建事件循環(huán) task=asyncio.ensure_future(coroutine) #第二步:將多個(gè)協(xié)程函數(shù)包裝成任務(wù)列表 loop.run_until_complete(task) #第三步:通過事件循環(huán)運(yùn)行 print('-------------------------------------') print(task.result()) loop.close()
'''運(yùn)行結(jié)果為 Hello world 01 begin Hello again 01 end ------------------------------------- 15 '''
(2)通過定義回調(diào)函數(shù)來獲取 import asyncio import time
async def hello1(a,b): print('Hello world 01 begin') await asyncio.sleep(3) #模擬耗時(shí)任務(wù)3秒 print('Hello again 01 end') return a+b
def callback(future): #定義的回調(diào)函數(shù) print(future.result())
loop = asyncio.get_event_loop() #第一步:創(chuàng)建事件循環(huán) task=asyncio.ensure_future(hello1(10,5)) #第二步:將多個(gè)協(xié)程函數(shù)包裝成任務(wù) task.add_done_callback(callback) #并被任務(wù)綁定一個(gè)回調(diào)函數(shù)
loop.run_until_complete(task) #第三步:通過事件循環(huán)運(yùn)行 loop.close() #第四步:關(guān)閉事件循環(huán)
'''運(yùn)行結(jié)果為: Hello world 01 begin Hello again 01 end 15 '''
注意:所謂的回調(diào)函數(shù),就是指協(xié)程函數(shù)coroutine執(zhí)行結(jié)束時(shí)候會(huì)調(diào)用回調(diào)函數(shù)。并通過參數(shù)future獲取協(xié)程執(zhí)行的結(jié)果。我們創(chuàng)建的task和回調(diào)里的future對(duì)象,實(shí)際上是同一個(gè)對(duì)象,因?yàn)閠ask是future的子類。
事實(shí)上,在使用asyncio進(jìn)行異步編程的時(shí)候,語法形式往往是多樣性的,雖然理解異步編程的核心思想很重要,但是實(shí)現(xiàn)的時(shí)候終究還是要編寫語句的,本次給出的模板,是兩個(gè)不同的例子,例子一是三個(gè)異步方法,它們都沒有參數(shù),沒有返回值,都模擬一個(gè)耗時(shí)任務(wù);例子二是三個(gè)異步方法,都有參數(shù),都有返回值。
import asyncio import time
a=time.time()
async def hello1(): print('Hello world 01 begin') await asyncio.sleep(3) #模擬耗時(shí)任務(wù)3秒 print('Hello again 01 end')
async def hello2(): print('Hello world 02 begin') await asyncio.sleep(2) #模擬耗時(shí)任務(wù)2秒 print('Hello again 02 end')
async def hello3(): print('Hello world 03 begin') await asyncio.sleep(4) #模擬耗時(shí)任務(wù)4秒 print('Hello again 03 end')
loop = asyncio.get_event_loop() #第一步:創(chuàng)建事件循環(huán) tasks = [hello1(), hello2(),hello3()] #第二步:將多個(gè)協(xié)程函數(shù)包裝成任務(wù)列表 loop.run_until_complete(asyncio.wait(tasks)) #第三步:通過事件循環(huán)運(yùn)行 loop.close() #第四步:取消事件循環(huán)
'''運(yùn)行結(jié)果為: Hello world 02 begin Hello world 03 begin Hello world 01 begin Hello again 02 end Hello again 01 end Hello again 03 end '''
import asyncio import time
async def hello1(a,b): print('Hello world 01 begin') await asyncio.sleep(3) #模擬耗時(shí)任務(wù)3秒 print('Hello again 01 end') return a+b
async def hello2(a,b): print('Hello world 02 begin') await asyncio.sleep(2) #模擬耗時(shí)任務(wù)2秒 print('Hello again 02 end') return a-b
async def hello3(a,b): print('Hello world 03 begin') await asyncio.sleep(4) #模擬耗時(shí)任務(wù)4秒 print('Hello again 03 end') return a*b
loop = asyncio.get_event_loop() #第一步:創(chuàng)建事件循環(huán) task1=asyncio.ensure_future(hello1(10,5)) task2=asyncio.ensure_future(hello2(10,5)) task3=asyncio.ensure_future(hello3(10,5)) tasks = [task1,task2,task3] #第二步:將多個(gè)協(xié)程函數(shù)包裝成任務(wù)列表 loop.run_until_complete(asyncio.wait(tasks)) #第三步:通過事件循環(huán)運(yùn)行 print(task1.result()) #并且在所有的任務(wù)完成之后,獲取異步函數(shù)的返回值 print(task2.result()) print(task3.result()) loop.close() #第四步:關(guān)閉事件循環(huán)
'''運(yùn)行結(jié)果為: Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end 15 5 50 '''
總結(jié):四步走(針對(duì)python3.7之前的版本) 第一步·:構(gòu)造事件循環(huán)
loop=asyncio.get_running_loop() #返回(獲取)在當(dāng)前線程中正在運(yùn)行的事件循環(huán),如果沒有正在運(yùn)行的事件循環(huán),則會(huì)顯示錯(cuò)誤;它是python3.7中新添加的
loop=asyncio.get_event_loop() #獲得一個(gè)事件循環(huán),如果當(dāng)前線程還沒有事件循環(huán),則創(chuàng)建一個(gè)新的事件循環(huán)loop;
loop=asyncio.set_event_loop(loop) #設(shè)置一個(gè)事件循環(huán)為當(dāng)前線程的事件循環(huán);
loop=asyncio.new_event_loop() #創(chuàng)建一個(gè)新的事件循環(huán)
第二步:將一個(gè)或者是多個(gè)協(xié)程函數(shù)包裝成任務(wù)Task #高層API task = asyncio.create_task(coro(參數(shù)列表)) # 這是3.7版本新添加的 task = asyncio.ensure_future(coro(參數(shù)列表))
#低層API loop.create_future(coro) loop.create_task(coro)
'''需要注意的是,在使用Task.result()獲取協(xié)程函數(shù)結(jié)果的時(shí)候,使用asyncio.create_task()卻會(huì)顯示錯(cuò) 但是使用asyncio.ensure_future卻正確,本人暫時(shí)不知道原因,哪位大神知道,望告知,不勝感激!'''
第三步:通過事件循環(huán)運(yùn)行 loop.run_until_complete(asyncio.wait(tasks)) #通過asyncio.wait()整合多個(gè)task
loop.run_until_complete(asyncio.gather(tasks)) #通過asyncio.gather()整合多個(gè)task
loop.run_until_complete(task_1) #單個(gè)任務(wù)則不需要整合
loop.run_forever() #但是這個(gè)方法在新版本已經(jīng)取消,不再推薦使用,因?yàn)槭褂闷饋聿缓?jiǎn)潔
''' 使用gather或者wait可以同時(shí)注冊(cè)多個(gè)任務(wù),實(shí)現(xiàn)并發(fā),但他們的設(shè)計(jì)是完全不一樣的,在前面的2.1.(4)中已經(jīng)討論過了,主要區(qū)別如下: (1)參數(shù)形式不一樣 gather的參數(shù)為 *coroutines_or_futures,即如這種形式 tasks = asyncio.gather(*[task1,task2,task3])或者 tasks = asyncio.gather(task1,task2,task3) loop.run_until_complete(tasks) wait的參數(shù)為列表或者集合的形式,如下 tasks = asyncio.wait([task1,task2,task3]) loop.run_until_complete(tasks) (2)返回的值不一樣 gather的定義如下,gather返回的是每一個(gè)任務(wù)運(yùn)行的結(jié)果, results = await asyncio.gather(*tasks) wait的定義如下,返回dones是已經(jīng)完成的任務(wù),pending是未完成的任務(wù),都是集合類型 done, pending = yield from asyncio.wait(fs) (3)后面還會(huì)講到他們的進(jìn)一步使用 '''
簡(jiǎn)單來說:async.wait會(huì)返回兩個(gè)值:done和pending,done為已完成的協(xié)程Task,pending為超時(shí)未完成的協(xié)程Task,需通過future.result調(diào)用Task的result。而async.gather返回的是已完成Task的result。 第四步:關(guān)閉事件循環(huán) loop.close()
''' 以上示例都沒有調(diào)用 loop.close,好像也沒有什么問題。所以到底要不要調(diào) loop.close 呢? 簡(jiǎn)單來說,loop 只要不關(guān)閉,就還可以再運(yùn)行: loop.run_until_complete(do_some_work(loop, 1)) loop.run_until_complete(do_some_work(loop, 3)) loop.close() 但是如果關(guān)閉了,就不能再運(yùn)行了: loop.run_until_complete(do_some_work(loop, 1)) loop.close() loop.run_until_complete(do_some_work(loop, 3)) # 此處異常 建議調(diào)用 loop.close,以徹底清理 loop 對(duì)象防止誤用 '''
在最新的python3.7版本中,asyncio又引進(jìn)了一些新的特性和API import asyncio import time
async def hello1(): print('Hello world 01 begin') await asyncio.sleep(3) #模擬耗時(shí)任務(wù)3秒 print('Hello again 01 end')
async def hello2(): print('Hello world 02 begin') await asyncio.sleep(2) #模擬耗時(shí)任務(wù)2秒 print('Hello again 02 end')
async def hello3(): print('Hello world 03 begin') await asyncio.sleep(4) #模擬耗時(shí)任務(wù)4秒 print('Hello again 03 end')
async def main(): results=await asyncio.gather(hello1(),hello2(),hello3()) for result in results: print(result) #因?yàn)闆]返回值,故而返回None
asyncio.run(main())
'''運(yùn)行結(jié)果為: Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end None None None '''
import asyncio import time
async def hello1(a,b): print('Hello world 01 begin') await asyncio.sleep(3) #模擬耗時(shí)任務(wù)3秒 print('Hello again 01 end') return a+b
async def hello2(a,b): print('Hello world 02 begin') await asyncio.sleep(2) #模擬耗時(shí)任務(wù)2秒 print('Hello again 02 end') return a-b
async def hello3(a,b): print('Hello world 03 begin') await asyncio.sleep(4) #模擬耗時(shí)任務(wù)4秒 print('Hello again 03 end') return a*b
async def main(): results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5)) for result in results: print(result)
asyncio.run(main())
'''運(yùn)行結(jié)果為: Hello world 01 begin Hello world 02 begin Hello world 03 begin Hello again 02 end Hello again 01 end Hello again 03 end 15 5 50 '''
總結(jié):兩步走(針對(duì)python3.7) 第一步:構(gòu)建一個(gè)入口函數(shù)main 他也是一個(gè)異步協(xié)程函數(shù),即通過async定義,并且要在main函數(shù)里面await一個(gè)或者是多個(gè)協(xié)程,同前面一樣,我可以通過gather或者是wait進(jìn)行組合,對(duì)于有返回值的協(xié)程函數(shù),一般就在main里面進(jìn)行結(jié)果的獲取。
第二步:?jiǎn)?dòng)主函數(shù)main 這是python3.7新添加的函數(shù),就一句話,即 asyncio.run(main())
注意: 不再需要顯式的創(chuàng)建事件循環(huán),因?yàn)樵趩?dòng)run函數(shù)的時(shí)候,就會(huì)自動(dòng)創(chuàng)建一個(gè)新的事件循環(huán)。而且在main中也不需要通過事件循環(huán)去掉用被包裝的協(xié)程函數(shù),只需要向普通函數(shù)那樣調(diào)用即可 ,只不過使用了await關(guān)鍵字而已。
1、無cpu分時(shí)切換線程保存上下文問題(協(xié)程上下文怎么保存) 2、遇到io阻塞切換(怎么實(shí)現(xiàn)的) 3、無需共享數(shù)據(jù)的保護(hù)鎖(為什么) 4、系列文章下篇預(yù)告——介紹低層的API,事件循環(huán)到底是怎么實(shí)現(xiàn)的以及future類的實(shí)現(xiàn)。
|