python中的GIL
首先,這樣效率不高,但是看似也不會(huì)產(chǎn)生數(shù)據(jù)訪問(wèn)沖突的問(wèn)題,畢竟同一時(shí)刻只有一個(gè)線程在一個(gè)核上運(yùn)行嘛,然而: sum = 0 def add: global sum for i in range(1000000): sum += 1 def subtract: global sum for i in range(1000000): sum -= 1 import threading add_thread = threading.Thread(target=add) sub_thread = threading.Thread(target=subtract) add_thread.start sub_thread.start add_thread.join sub_thread.join print(sum) 如果按照上面的理解,線程間很安全,最后結(jié)果應(yīng)該會(huì)是 0,運(yùn)行三次代碼的結(jié)果如下: # result: # 358918 # 718494 # -162684 這說(shuō)明兩個(gè)線程并沒(méi)有順序異步執(zhí)行。在一些特定的情況,GIL這把鎖會(huì)被打開,一定程度上達(dá)到并行的效果。 GIL會(huì)根據(jù)線程執(zhí)行的字節(jié)碼行數(shù)以及時(shí)間片以及遇到 I/O 操作打開,所以Python的多線程對(duì) I/O 密集型代碼比較友好,比如,文件處理和網(wǎng)絡(luò)爬蟲。 多線程編程線程模塊 在Python3中提供了兩個(gè)模塊來(lái)使用線程_thread和threading,前者提供了低級(jí)別、原始的線程以及一個(gè)簡(jiǎn)單的鎖,相比后者功能還是比較有限的,所以我們使用threading模塊。 使用案例 直接使用Thread來(lái)實(shí)例化 import time import threading def learn(obj): print('learning {sth} started'.format(sth=obj)) time.sleep(2) print('learning {sth} end'.format(sth=obj)) def play(obj): print('playing {sth} started'.format(sth=obj)) time.sleep(4) print('playing {sth} end'.format(sth=obj))
# 創(chuàng)建出兩個(gè)線程對(duì)象 learn_thread = threading.Thread(target=learn, args=('Python',)) play_thread = threading.Thread(target=play, args=('Game',)) start_time = time.time # 啟動(dòng)線程,開始執(zhí)行 learn_thread.start play_thread.start end_time = time.time span = end_time - start_time print('[lasting for {time_span}s]'.format(time_span=span)) # result: # learning Python started # playing Game started # [lasting for 0.0005965232849121094s] # learning Python end # playing Game end 可以看到,整個(gè)程序的運(yùn)行時(shí)間基本上是 0s,這是因?yàn)檎麄€(gè)程序中實(shí)際有三個(gè)線程:創(chuàng)建出來(lái)的兩個(gè)線程和主線程(MainThread),那兩個(gè)線程創(chuàng)建出來(lái)后就不受主線程控制了,他們的工作不占用主線程的時(shí)間,主線程除了計(jì)時(shí)就沒(méi)有其他邏輯了,因此主線程持續(xù)時(shí)間是幾乎是 0s。 但是主線程邏輯完成后并沒(méi)有退出,它等待了另外兩個(gè)線程運(yùn)行的結(jié)束,如果主線程在其他兩個(gè)線程結(jié)束之前就退出了,意味著整個(gè)程序進(jìn)程終止了,另外兩個(gè)線程會(huì)迅速終止。如果我們就是有這種需求,那就可以將另外兩個(gè)線程配置成守護(hù)線程,主線程結(jié)束,他們也立刻結(jié)束。 learn_thread.setDeamon(True) play_thread.setDeamon(True) # result: # learning Python started # playing Game started # [lasting for 0.015601158142089844s] 主線程邏輯執(zhí)行完后就退出了,其他兩個(gè)線程還沒(méi)來(lái)得及打印消息也被一并終止了。 如果主線程需要等到兩個(gè)線程執(zhí)行完后再打印整個(gè)運(yùn)行時(shí)間,就可以這么設(shè)置: learn_thread.start play_thread.start # 主線程會(huì)在這里阻塞,創(chuàng)建出來(lái)的線程執(zhí)行完后才繼續(xù)往下執(zhí)行 learn_thread.join play_thread.join end_time = time.time span = end_time - start_time print('[lasting for {time_span}s]'.format(time_span=span)) # result: # learning Python started # playing Game started # learning Python end # playing Game end # [lasting for 4.003938436508179s] 現(xiàn)在總的運(yùn)行時(shí)間是 4s,意味著主線程等待了兩個(gè)子線程的運(yùn)行,而且兩個(gè)子線程是同步執(zhí)行的,2s + 4s = max(2s, 4s)。 通過(guò)繼承Thread實(shí)現(xiàn)多線程 import threading class LearnThread(threading.Thread):
def __init__(self, obj): self.sth = obj # 處理一下參數(shù)問(wèn)題 super.__init__ # 委托給父類完成創(chuàng)建 # 這個(gè)函數(shù)在 start 之后會(huì)自動(dòng)調(diào)用,里面寫主要的業(yè)務(wù)邏輯 def run(self): print('learning {sth} started'.format(sth=self.sth)) time.sleep(2) print('learning {sth} end'.format(sth=self.sth)) 用同樣的邏輯實(shí)現(xiàn)PlayThread子線程類,最后結(jié)果如下: ... # 創(chuàng)建兩個(gè)線程實(shí)例 learn_thread = LearnThread('Python') play_thread = PlayThread('BasketBall') start_time = time.time # 啟動(dòng)線程,開始執(zhí)行 learn_thread.start play_thread.start learn_thread.join play_thread.join end_time = time.time span = end_time - start_time print('[lasting for {time_span}s]'.format(time_span=span)) # result: # learning Python started # playing BasketBall started # learning Python end # playing BasketBall end # [lasting for 4.0020973682403564s] 這種方式更加靈活,在線程內(nèi)可以自定義我們的邏輯,如果線程非常復(fù)雜,這樣寫可以使程序更加模塊化,也更容易后續(xù)維護(hù)。 線程間的通信引言 如果程序中有多個(gè)線程,他們的推進(jìn)順序可能相互依賴,a線程執(zhí)行到某一階段后,b線程才能開始執(zhí)行,b線程執(zhí)行完畢后,a線程才能繼續(xù)進(jìn)行。 這樣一種情況之下,線程之間就要進(jìn)行通信,才能保證程序的正常運(yùn)行。 通過(guò)共享變量來(lái)實(shí)現(xiàn) 線程安全不能保證,不推薦,就不詳細(xì)講解了。 通過(guò)Queue來(lái)進(jìn)行線程通信 思路和共享變量差不多,只不過(guò)這里使用的數(shù)據(jù)結(jié)構(gòu)是經(jīng)過(guò)封裝的,是線程安全的,使用起來(lái)也更加方便。 import time import threading from queue import Queue # 用來(lái)向 queue 中加入數(shù)據(jù) def append(q): for i in range(4): print('[append_thread] putting data {data} to q...'.format(data=i)) q.put(i) time.sleep(1) q.put(None) # end flag # 用來(lái)向 queue 中取出數(shù)據(jù) def pop(q): while True: data = q.get if data is None: print('[pop_thread] all clear in the queue') q.task_done break else: print('[pop_thread] get data j99bm99'.format(d=data)) time.sleep(3) q.task_done
q = Queue(2) append_thread = threading.Thread(target=append, args=(q,)) pop_thread = threading.Thread(target=pop, args=(q,)) append_thread.start pop_thread.start q.join # 利用 queue 來(lái)阻塞主線程 print(' ===Done===') # result: # [append_thread] putting data 0 to q... # [pop_thread] get data 0 # [append_thread] putting data 1 to q... # [append_thread] putting data 2 to q... # [pop_thread] get data 1 # [append_thread] putting data 3 to q... # [pop_thread] get data 2 # [pop_thread] get data 3 # [pop_thread] all clear in the queue # ===Done=== q = Queue(2) 這里創(chuàng)建了一個(gè)Queue類型對(duì)象,接收一個(gè)整數(shù)參數(shù),告知隊(duì)列的容量,如果傳入一個(gè)非正數(shù),容量默認(rèn)是正無(wú)窮(當(dāng)然,這取決于你電腦的配置情況)。 q.get(block=True, timeout=None) q.put(item, block=True, timeout=None) 這兩個(gè)方法向隊(duì)列中添加元素,或取出元素。默認(rèn)情況下,如果隊(duì)列滿了,調(diào)用q.put會(huì)進(jìn)行阻塞,直到隊(duì)列中有空位才放入元素,完成整個(gè)函數(shù)調(diào)用??梢栽O(shè)置block=False將它轉(zhuǎn)為非阻塞調(diào)用,如果隊(duì)列滿了,則直接引發(fā)一個(gè)Full exception,通過(guò)timeout來(lái)設(shè)置一定的等待時(shí)間,如果在阻塞等待時(shí)間內(nèi)任然沒(méi)空位放入元素,再拋出異常。q.get方法邏輯類似,拋出異常Empty exception q.task_done q.join q.join通過(guò)隊(duì)列來(lái)阻塞主線程,隊(duì)列內(nèi)部有一個(gè)計(jì)數(shù)器,每放入一個(gè)元素,計(jì)數(shù)器加一,當(dāng)計(jì)數(shù)器重新歸零后,解除阻塞。q.task_done就是將計(jì)數(shù)器減一的,一般和q.get配合使用,如果使用過(guò)量,導(dǎo)致計(jì)數(shù)器 小于0 ,則引發(fā)ValueError Exception q.qsize # 獲得隊(duì)列中的元素個(gè)數(shù) q.empty # 隊(duì)列是否為空 q.full # 隊(duì)列是否滿 線程同步引言 再回到最開始的GIL案例,兩個(gè)線程,其中一個(gè)對(duì)全局變量進(jìn)行一百萬(wàn)次加1運(yùn)算,另外一個(gè)進(jìn)行一百萬(wàn)次減1運(yùn)算。最后全局變量的值是幾乎隨機(jī)的,與我們預(yù)想的 0 并不相同。因?yàn)閮蓚€(gè)線程是異步修改這個(gè)變量,不能保證某一時(shí)刻的取值就是正確的。 因此,要對(duì)線程進(jìn)行同步控制,當(dāng)一個(gè)線程操作時(shí),另一個(gè)等待,然后交換執(zhí)行。 使用Lock import threading sum = 0 lock = threading.Lock def add: global sum for i in range(1000000): lock.acquire # lock here sum += 1 lock.release # unlock here def subtract: global sum for i in range(1000000): lock.acquire # lock here sum -= 1 lock.release # unlock here add_thread = threading.Thread(target=add) sub_thread = threading.Thread(target=subtract) add_thread.start sub_thread.start add_thread.join sub_thread.join print(sum) # result: # 0 通過(guò)Lock,我們可以在執(zhí)行相關(guān)代碼之前申請(qǐng)鎖,將一段代碼邏輯鎖起來(lái),鎖資源全局只有一個(gè),一個(gè)線程申請(qǐng)了另外一個(gè)就不能夠申請(qǐng),它要等到資源釋放后才能申請(qǐng)。因此,就保證了同一時(shí)刻只有一個(gè)線程拿到鎖,只有一個(gè)線程能夠進(jìn)行變量的修改。 這種方式比較影響性能,獲取鎖和釋放鎖都需要時(shí)間,也可能引起死鎖問(wèn)題,連續(xù)兩次執(zhí)行l(wèi)ock.acquire就可以引發(fā)死鎖,死鎖可以通過(guò)另外一個(gè)線程來(lái)解開。這也是后面使用Condition的一個(gè)核心理念。 import threading lock = threading.Lock def dead_lock(lock): lock.acquire lock.acquire # 直接調(diào)用兩次會(huì)死鎖這個(gè)線程 print('unlock') def un_lock(lock): lock.release # 用這個(gè)線程來(lái)開鎖 dead_lock_thread = threading.Thread(target=dead_lock, args=(lock,)) un_lock_thread = threading.Thread(target=un_lock, args=(lock,)) dead_lock_thread.start # 注意調(diào)用順序 un_lock_thread.start # 通過(guò)它,死鎖的線程會(huì)被打開,繼續(xù)執(zhí)行打印結(jié)果 # result: # unlock 使用RLock 在同一個(gè)線程中,可以連續(xù)多次調(diào)用lock.acquire,注意最后獲取鎖和釋放鎖的次數(shù)要相同。 lock = threading.RLock def add: global sum for i in range(1000000): lock.acquire # lock here
do_sth(lock) # 在對(duì)變量 +1 之前,要對(duì)變量做其他操作,在函數(shù)中可以再次加鎖
sum += 1 lock.release # unlock here 使用Condition 底層使用 RLock 實(shí)現(xiàn)的,實(shí)現(xiàn)了上下文管理器協(xié)議,可以用with語(yǔ)句進(jìn)行操作,不用擔(dān)心acquire和release的問(wèn)題。 import threading msg = conn = threading.Condition def repeater_one(conn): with conn: global msg for i in range(3): data = '小伙子,沒(méi)想到你也是復(fù)讀機(jī) ({idx})'.format(idx=i) msg.append(data) print('[one]:', data) conn.notify conn.wait def repeater_two(conn): with conn: global msg for i in range(3): conn.wait data = msg.pop print('[two]:', data) conn.notify repeater_one_thread = threading.Thread(target=repeater_one, args=(conn,)) repeater_two_thread = threading.Thread(target=repeater_two, args=(conn,)) # 注意啟動(dòng)的順序非常重要 repeater_two_thread.start repeater_one_thread.start # result: # [one]: 小伙子,沒(méi)想到你也是復(fù)讀機(jī) (0) # [two]: 小伙子,沒(méi)想到你也是復(fù)讀機(jī) (0) # [one]: 小伙子,沒(méi)想到你也是復(fù)讀機(jī) (1) # [two]: 小伙子,沒(méi)想到你也是復(fù)讀機(jī) (1) # [one]: 小伙子,沒(méi)想到你也是復(fù)讀機(jī) (2) # [two]: 小伙子,沒(méi)想到你也是復(fù)讀機(jī) (2) 這里實(shí)現(xiàn)了兩個(gè)復(fù)讀機(jī)線程,一個(gè)線程打印完數(shù)據(jù)后,另外一個(gè)線程進(jìn)行復(fù)述,數(shù)據(jù)保留在全局變量msg中,通過(guò)Condition來(lái)協(xié)調(diào)兩個(gè)線程的訪問(wèn)順序,實(shí)現(xiàn)復(fù)讀效果。 這種控制方式的思想就是當(dāng)滿足了某些條件,線程才能繼續(xù)運(yùn)行下去,否則,線程會(huì)一直阻塞,直到條件被滿足。 conn.notify conn.wait 當(dāng)條件滿足是,使用notify用來(lái)喚醒等待的線程,要等待條件時(shí),再使用wait進(jìn)行阻塞。repeater one首先說(shuō)一句話,然后喚醒repeater two,自身進(jìn)入等待狀態(tài);repeater two等到有人說(shuō)話后,進(jìn)行復(fù)讀,然后喚醒repeater one再次說(shuō)話,自身進(jìn)入等待狀態(tài)。要保證的是,每一時(shí)刻,只能有一個(gè)線程處于等待狀態(tài),否則兩個(gè)線程都會(huì)被阻塞。因此,線程啟動(dòng)的順序和阻塞喚醒條件非常重要。 with conn: # todo
# 或者 conn.require # todo conn.release 這里直接使用with語(yǔ)句,省略了一些邏輯,也可以使用完整的寫法,但是要注意操作的匹配。 Condition的底層其實(shí)使用了兩層鎖,當(dāng)我們?cè)谝粋€(gè)線程中調(diào)用require的時(shí)候,內(nèi)部維護(hù)的一個(gè)鎖(Rlock)會(huì)自動(dòng)鎖上,另外一個(gè)線程在調(diào)用require時(shí),就會(huì)被阻塞。 下面是wait的主要邏輯,調(diào)用wait會(huì)將鎖打開self._release_save,這就允許了另外一個(gè)線程調(diào)用require,同時(shí)建立第二層鎖,waiter,將它加入到隊(duì)列(底層是 deque)中,每調(diào)用一次就會(huì)產(chǎn)生一把鎖,同時(shí)調(diào)用waiter.acquire,接著在后面,會(huì)用各種邏輯判斷再次調(diào)用waiter.acquire,前面講過(guò),連續(xù)兩次調(diào)用會(huì)造成這個(gè)線程的阻塞。 那這個(gè)鎖在哪里打開呢?在另外一個(gè)線程的notify方法中,這個(gè)鎖打開了,它就可以繼續(xù)往下運(yùn)行了。 使用Semaphore 用來(lái)控制資源使用數(shù)量的鎖,對(duì)于文件來(lái)說(shuō),讀操作可以有多個(gè)線程同時(shí)進(jìn)行,共享文件資源,而寫操作,就只能有一個(gè)線程來(lái)獨(dú)占資源,一般用來(lái)控制線程的并發(fā)數(shù)量。 現(xiàn)在我們來(lái)控制讀線程的并發(fā)數(shù)量,每一時(shí)刻只有 3 個(gè)線程在工作,而且工作時(shí)間是 2s,總共有10個(gè)讀線程要完成操作。 import threading import time sem = threading.Semaphore(3) # 指明信號(hào)的數(shù)量 def read(sem): sem.acquire # 拿到信號(hào) print('doing reading staff...') time.sleep(2) sem.release # 釋放信號(hào) for i in range(7): read_thread = threading.Thread(target=read, args=(sem,)) read_thread.start ''' 最后的結(jié)果是每?jī)擅刖陀腥齻€(gè)線程開始讀操作 ''' # result: # doing reading staff... # doing reading staff... # doing reading staff... # doing reading staff... # doing reading staff... # doing reading staff... # doing reading staff... Semaphore的底層是使用Condition進(jìn)行實(shí)現(xiàn)的,內(nèi)部維護(hù)了一個(gè)_value變量,用來(lái)計(jì)數(shù)。 acquire和release內(nèi)部的邏輯有些改變,在申請(qǐng)資源時(shí),首先要看_value的值有沒(méi)有減到 0 ,如果有,再有線程執(zhí)行acquire就會(huì)執(zhí)行wait進(jìn)行阻塞,資源釋放時(shí)要增加_value的值,同時(shí)使用notify喚醒隊(duì)列中等待的一個(gè)線程。 concurrent線程池引言 為什么要線程池呢?回看上面的Semaphore例子,我們定義了一個(gè)數(shù)量為 3 的信號(hào)量,保證了同一時(shí)刻只有 3 個(gè)線程存在于內(nèi)存中。但是從程序開始運(yùn)行到結(jié)束,我們一共使用過(guò) 7 個(gè)線程range(7),完成了 7 次同樣的讀操作,也就是說(shuō)創(chuàng)建了 7 次線程,又銷毀了 7 次線程。如果每個(gè)線程的執(zhí)行時(shí)間非常短,又需要?jiǎng)?chuàng)建大量的線程,那么資源都在創(chuàng)建/銷毀線程的過(guò)程中被消耗了。 能不能總共就使用 3 個(gè)線程達(dá)到同樣的效果呢?每個(gè)線程多做幾次同樣的操作邏輯就可以了,concurrent.futures就提供了這樣的管理方案,同時(shí),還有下面這些優(yōu)點(diǎn):
使用案例 from concurrent.futures import ThreadPoolExecutor import time def read(sth): print('Reading {sth}...'.format(sth=sth)) time.sleep(1) return '{sth} done'.format(sth=sth) executor = ThreadPoolExecutor(2) # 創(chuàng)建一個(gè)可以容納 兩個(gè) 線程的線程池 if __name__ == '__main__': task_one = executor.submit(read, 'books') task_two = executor.submit(read, 'newspaper') task_three = executor.submit(read, 'comics') print(task_one.done) time.sleep(4) print(task_two.done) print(task_three.done)
# result: # Reading books... # Reading newspaper... # False # Reading comics... # True # True executor = ThreadPoolExecutor(2) 這里創(chuàng)建了一個(gè)容納兩個(gè)線程的線程池,如果不指定線程數(shù)量參數(shù),它會(huì)以 5倍 cpu內(nèi)核數(shù)量作為默認(rèn)值。Python3在3.5, 3.6, 3.7版本的更新中都加入了可選參數(shù),可以查看官方文檔熟悉新的使用方式。 submit(fn, *args, **kwargs) task_one = executor.submit(read, 'books') # reture Future object task_two = executor.submit(read, 'newspaper') task_three = executor.submit(read, 'comics') 這里使用submit方法向線程池中提交任務(wù),提交的任務(wù)數(shù)量可以大于線程池中申請(qǐng)的線程數(shù)量。第一個(gè)參數(shù)是任務(wù)函數(shù),后面依次列出參數(shù)。一旦任務(wù)被提交,線程池中的線程自動(dòng)進(jìn)行調(diào)度,直到所有提交任務(wù)的完成。提交任務(wù)后,會(huì)返回一個(gè)Future對(duì)象。 ''' Future 類型對(duì)象會(huì)在 submit 函數(shù)調(diào)用之后返回 ''' future.done # 如果提交的任務(wù)完成,這個(gè)方法會(huì)返回 True future.cancel # 取消任務(wù)執(zhí)行,如果任務(wù)已經(jīng)調(diào)度執(zhí)行,就不能取消 future.result # 返回任務(wù)函數(shù)執(zhí)行后的返回結(jié)果 在main中加入新的邏輯: ... print(task_one.result) # 這些都是阻塞式調(diào)用,獲得結(jié)果后才會(huì)繼續(xù)向下執(zhí)行 print(task_two.result) print(task_three.result) # result: # books done # newspaper done # comics done 上面的寫法其實(shí)比較麻煩,如果向線程池中提交的任務(wù)過(guò)多,這樣操作每個(gè)Future對(duì)象會(huì)相當(dāng)繁瑣??梢耘窟M(jìn)行任務(wù)的提交,將Future對(duì)象加入一個(gè)列表進(jìn)行管理,配合使用模塊中的as_complete函數(shù),可以一次性獲得所有執(zhí)行完成任務(wù)函數(shù)的Future對(duì)象。 from concurrent.futures import as_completed ... # ---修改main中的邏輯 items = ['books', 'newspaper', 'comics'] task_list = [executor.submit(read, item) for item in items] for future in as_completed(task_list): print(future.result)
# result: # newspaper done # books done # comics done 核心邏輯中,as_complete將已經(jīng)執(zhí)行完的任務(wù)函數(shù)對(duì)應(yīng)的Future對(duì)象通過(guò)yield進(jìn)行返回,這里完成的順序和任務(wù)提交的順序并不一樣,和內(nèi)部的調(diào)度邏輯有關(guān),我多次執(zhí)行結(jié)果沒(méi)有完全一樣。這里的yield邏輯是在一定的條件下才會(huì)發(fā)生的,因此,只要有線程沒(méi)有運(yùn)行完,就無(wú)法yield結(jié)果,會(huì)在for這里進(jìn)行阻塞,等到所有任務(wù)執(zhí)行完畢之后,for結(jié)束。 還有一種更加簡(jiǎn)潔的辦法,在executor中,有一個(gè)和Python內(nèi)置函數(shù)map邏輯相似的函數(shù)。它將任務(wù)函數(shù)和參數(shù)進(jìn)行一一匹配調(diào)用,直接返回future.result對(duì)象。這種方式是順序進(jìn)行調(diào)度的,完成順序總是:books,newspaper, comics ... items = ['books', 'newspaper', 'comics'] for res in executor.map(read, items): print(res) 補(bǔ)充 wait函數(shù) 用來(lái)讓主線程在不同條件下等待線程池中線程的運(yùn)行。 def wait(fs, timeout=None, return_when=ALL_COMPLETED): ''' 1. fs: Futures 對(duì)象的序列,當(dāng)他們對(duì)應(yīng)的任務(wù)函數(shù)都完成后,解除阻塞 2. timeout: 等待的時(shí)間,超過(guò)時(shí)間就不等了 3. return_when FIRST_COMPLETED: 任何一個(gè)任務(wù)函數(shù)完成 FIRST_EXCEPTION: 任何一個(gè)任務(wù)函數(shù)執(zhí)行時(shí)拋異常 ALL_COMPLETED: 所有都完成
3個(gè)參數(shù)條件,哪個(gè)先滿足就直接解除阻塞 from concurrent.futures import ThreadPoolExecutor, wait import time def read(times): time.sleep(times) print('read for {span}s'.format(span=times)) executor = ThreadPoolExecutor(2) time_list = [1, 2, 3, 4] task_list = [executor.submit(read, times) for times in time_list] print('done') # result: # done # read for 1s # read for 2s # read for 3s # read for 4s 這里修改了read函數(shù)的邏輯,由讀不同的內(nèi)容改為讀不同的時(shí)間長(zhǎng)度。這里主線程沒(méi)有等待線程池中的任務(wù),提交任務(wù)后直接執(zhí)行了print。 ... wait(task_list) print('done') # result: # read for 1s # read for 2s # read for 3s # read for 4s # done ... wait(task_list, 2) # 就等兩秒鐘 print('done') # read for 1s # read for 2s # done # read for 3s # read for 4s from concurrent.futures import FIRST_COMPLETED # 注意常量值的使用,選一種合適的方法進(jìn)行導(dǎo)入使用 # from concurrent import futures # futures.FIRST_COMPLETED ... wait(task_list, return_when=FIRST_COMPLETED) print('done') # result: # read for 1s # done # read for 2s # read for 3s # read for 4s Future對(duì)象 未來(lái)對(duì)象?隨便叫啥吧。前面也看到過(guò),通過(guò)submit提交任務(wù)函數(shù)后,就會(huì)返回這么一個(gè)對(duì)象,可以通過(guò)它來(lái)監(jiān)控運(yùn)行我們?nèi)蝿?wù)函數(shù)的那個(gè)線程:判斷是否運(yùn)行完成、得到返回值等。 從源碼中,可以看出,任務(wù)提交之后,先創(chuàng)建了一個(gè)Future類型對(duì)象f,然后將這個(gè)對(duì)象連同我們提交的任務(wù)參數(shù)一起委托給了_WorkItem,之后也返回一個(gè)對(duì)象w,將它加入隊(duì)列self._work_queue,這應(yīng)該就是內(nèi)部調(diào)度的邏輯了,最后返回f。 ![]() _adjust_thread_count用來(lái)創(chuàng)建出我們需要的線程數(shù)量,并且target=worker,參數(shù)列表中有self._work_queue作為參數(shù)。_threads是用來(lái)存放線程的一個(gè)集合。我們提交的不同任務(wù)函數(shù)怎么變成了一個(gè)_worker函數(shù)呢? ![]() _worker的主要邏輯中,有一個(gè)循環(huán),從self._work_queue中不斷取出一個(gè)任務(wù),它是_WorkItem類型的,在submit函數(shù)中加入,然后調(diào)用它的run方法,隨后del work_item將它刪除。多個(gè)任務(wù)線程讀取的是同一個(gè)任務(wù)隊(duì)列,直到任務(wù)全部完成。 ![]() 在_WorkItem中,它的run方法調(diào)用了我們提交的任務(wù)函數(shù)fn,并且記錄了它的返回值,進(jìn)行了一些異常處理。 ![]() shutdown 前面都在講線程池的使用方法和工作原理,還有一個(gè)細(xì)節(jié)需要補(bǔ)充的就是executor的shutdown方法。 ![]() 它是用來(lái)關(guān)閉線程池,如果在關(guān)閉之后繼續(xù)向里面提交任務(wù),會(huì)拋出一個(gè)異常。調(diào)用之后,self._work_queue.put(None)往任務(wù)隊(duì)列中加入了一個(gè)標(biāo)記,當(dāng)線程調(diào)度時(shí)拿到這個(gè)標(biāo)志就知道任務(wù)結(jié)束了,這與在前面使用Queue進(jìn)行線程間通信的案例用了同樣的方式。 有一個(gè)可選參數(shù)wait,如果它是True,則主線程在這里阻塞,等待所有線程完成任務(wù)。如果是False,主線程繼續(xù)向下執(zhí)行。 可以使用with語(yǔ)句,當(dāng)所有的任務(wù)完成之后自動(dòng)調(diào)用shutdown,這里就不再多舉例子,官方文檔 給出了一個(gè)經(jīng)典的例子,以作參考。 下篇:python入門系列 多進(jìn)程 |
|