乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      python入門系列:多線程

       xiaoyimin 2019-02-14

      python中的GIL

      • GIL(Global Interpreter Lock),就是一個(gè)鎖。
      • Python中的一個(gè)線程對(duì)應(yīng)于 C語(yǔ)言 中的一個(gè)線程。
      • GIL使得同一時(shí)刻只有一個(gè)線程在一個(gè)cpu上執(zhí)行字節(jié)碼,無(wú)法將多個(gè)線程分配到多個(gè)cpu上進(jìn)行同步運(yùn)行。如果在單核cpu上,線程是并發(fā)運(yùn)行,而不是并行。

      首先,這樣效率不高,但是看似也不會(huì)產(chǎn)生數(shù)據(jù)訪問(wèn)沖突的問(wèn)題,畢竟同一時(shí)刻只有一個(gè)線程在一個(gè)核上運(yùn)行嘛,然而:

      sum = 0

      def add:

      global sum

      for i in range(1000000):

      sum += 1

      def subtract:

      global sum

      for i in range(1000000):

      sum -= 1

      import threading

      add_thread = threading.Thread(target=add)

      sub_thread = threading.Thread(target=subtract)

      add_thread.start

      sub_thread.start

      add_thread.join

      sub_thread.join

      print(sum)

      如果按照上面的理解,線程間很安全,最后結(jié)果應(yīng)該會(huì)是 0,運(yùn)行三次代碼的結(jié)果如下:

      # result:

      # 358918

      # 718494

      # -162684

      這說(shuō)明兩個(gè)線程并沒(méi)有順序異步執(zhí)行。在一些特定的情況,GIL這把鎖會(huì)被打開,一定程度上達(dá)到并行的效果。

      GIL會(huì)根據(jù)線程執(zhí)行的字節(jié)碼行數(shù)以及時(shí)間片以及遇到 I/O 操作打開,所以Python的多線程對(duì) I/O 密集型代碼比較友好,比如,文件處理和網(wǎng)絡(luò)爬蟲。

      多線程編程

      線程模塊

      在Python3中提供了兩個(gè)模塊來(lái)使用線程_thread和threading,前者提供了低級(jí)別、原始的線程以及一個(gè)簡(jiǎn)單的鎖,相比后者功能還是比較有限的,所以我們使用threading模塊。

      使用案例

      直接使用Thread來(lái)實(shí)例化

      import time

      import threading

      def learn(obj):

      print('learning {sth} started'.format(sth=obj))

      time.sleep(2)

      print('learning {sth} end'.format(sth=obj))

      def play(obj):

      print('playing {sth} started'.format(sth=obj))

      time.sleep(4)

      print('playing {sth} end'.format(sth=obj))

      # 創(chuàng)建出兩個(gè)線程對(duì)象

      learn_thread = threading.Thread(target=learn, args=('Python',))

      play_thread = threading.Thread(target=play, args=('Game',))

      start_time = time.time

      # 啟動(dòng)線程,開始執(zhí)行

      learn_thread.start

      play_thread.start

      end_time = time.time

      span = end_time - start_time

      print('[lasting for {time_span}s]'.format(time_span=span))

      # result:

      # learning Python started

      # playing Game started

      # [lasting for 0.0005965232849121094s]

      # learning Python end

      # playing Game end

      可以看到,整個(gè)程序的運(yùn)行時(shí)間基本上是 0s,這是因?yàn)檎麄€(gè)程序中實(shí)際有三個(gè)線程:創(chuàng)建出來(lái)的兩個(gè)線程和主線程(MainThread),那兩個(gè)線程創(chuàng)建出來(lái)后就不受主線程控制了,他們的工作不占用主線程的時(shí)間,主線程除了計(jì)時(shí)就沒(méi)有其他邏輯了,因此主線程持續(xù)時(shí)間是幾乎是 0s。

      但是主線程邏輯完成后并沒(méi)有退出,它等待了另外兩個(gè)線程運(yùn)行的結(jié)束,如果主線程在其他兩個(gè)線程結(jié)束之前就退出了,意味著整個(gè)程序進(jìn)程終止了,另外兩個(gè)線程會(huì)迅速終止。如果我們就是有這種需求,那就可以將另外兩個(gè)線程配置成守護(hù)線程,主線程結(jié)束,他們也立刻結(jié)束。

      learn_thread.setDeamon(True)

      play_thread.setDeamon(True)

      # result:

      # learning Python started

      # playing Game started

      # [lasting for 0.015601158142089844s]

      主線程邏輯執(zhí)行完后就退出了,其他兩個(gè)線程還沒(méi)來(lái)得及打印消息也被一并終止了。

      如果主線程需要等到兩個(gè)線程執(zhí)行完后再打印整個(gè)運(yùn)行時(shí)間,就可以這么設(shè)置:

      learn_thread.start

      play_thread.start

      # 主線程會(huì)在這里阻塞,創(chuàng)建出來(lái)的線程執(zhí)行完后才繼續(xù)往下執(zhí)行

      learn_thread.join

      play_thread.join

      end_time = time.time

      span = end_time - start_time

      print('[lasting for {time_span}s]'.format(time_span=span))

      # result:

      # learning Python started

      # playing Game started

      # learning Python end

      # playing Game end

      # [lasting for 4.003938436508179s]

      現(xiàn)在總的運(yùn)行時(shí)間是 4s,意味著主線程等待了兩個(gè)子線程的運(yùn)行,而且兩個(gè)子線程是同步執(zhí)行的,2s + 4s = max(2s, 4s)。

      通過(guò)繼承Thread實(shí)現(xiàn)多線程

      import threading

      class LearnThread(threading.Thread):

      def __init__(self, obj):

      self.sth = obj # 處理一下參數(shù)問(wèn)題

      super.__init__ # 委托給父類完成創(chuàng)建

      # 這個(gè)函數(shù)在 start 之后會(huì)自動(dòng)調(diào)用,里面寫主要的業(yè)務(wù)邏輯

      def run(self):

      print('learning {sth} started'.format(sth=self.sth))

      time.sleep(2)

      print('learning {sth} end'.format(sth=self.sth))

      用同樣的邏輯實(shí)現(xiàn)PlayThread子線程類,最后結(jié)果如下:

      ...

      # 創(chuàng)建兩個(gè)線程實(shí)例

      learn_thread = LearnThread('Python')

      play_thread = PlayThread('BasketBall')

      start_time = time.time

      # 啟動(dòng)線程,開始執(zhí)行

      learn_thread.start

      play_thread.start

      learn_thread.join

      play_thread.join

      end_time = time.time

      span = end_time - start_time

      print('[lasting for {time_span}s]'.format(time_span=span))

      # result:

      # learning Python started

      # playing BasketBall started

      # learning Python end

      # playing BasketBall end

      # [lasting for 4.0020973682403564s]

      這種方式更加靈活,在線程內(nèi)可以自定義我們的邏輯,如果線程非常復(fù)雜,這樣寫可以使程序更加模塊化,也更容易后續(xù)維護(hù)。

      線程間的通信

      引言

      如果程序中有多個(gè)線程,他們的推進(jìn)順序可能相互依賴,a線程執(zhí)行到某一階段后,b線程才能開始執(zhí)行,b線程執(zhí)行完畢后,a線程才能繼續(xù)進(jìn)行。

      這樣一種情況之下,線程之間就要進(jìn)行通信,才能保證程序的正常運(yùn)行。

      通過(guò)共享變量來(lái)實(shí)現(xiàn)

      線程安全不能保證,不推薦,就不詳細(xì)講解了。

      通過(guò)Queue來(lái)進(jìn)行線程通信

      思路和共享變量差不多,只不過(guò)這里使用的數(shù)據(jù)結(jié)構(gòu)是經(jīng)過(guò)封裝的,是線程安全的,使用起來(lái)也更加方便。

      import time

      import threading

      from queue import Queue

      # 用來(lái)向 queue 中加入數(shù)據(jù)

      def append(q):

      for i in range(4):

      print('[append_thread] putting data {data} to q...'.format(data=i))

      q.put(i)

      time.sleep(1)

      q.put(None) # end flag

      # 用來(lái)向 queue 中取出數(shù)據(jù)

      def pop(q):

      while True:

      data = q.get

      if data is None:

      print('[pop_thread] all clear in the queue')

      q.task_done

      break

      else:

      print('[pop_thread] get data j99bm99'.format(d=data))

      time.sleep(3)

      q.task_done

      q = Queue(2)

      append_thread = threading.Thread(target=append, args=(q,))

      pop_thread = threading.Thread(target=pop, args=(q,))

      append_thread.start

      pop_thread.start

      q.join # 利用 queue 來(lái)阻塞主線程

      print(' ===Done===')

      # result:

      # [append_thread] putting data 0 to q...

      # [pop_thread] get data 0

      # [append_thread] putting data 1 to q...

      # [append_thread] putting data 2 to q...

      # [pop_thread] get data 1

      # [append_thread] putting data 3 to q...

      # [pop_thread] get data 2

      # [pop_thread] get data 3

      # [pop_thread] all clear in the queue

      # ===Done===

      q = Queue(2)

      這里創(chuàng)建了一個(gè)Queue類型對(duì)象,接收一個(gè)整數(shù)參數(shù),告知隊(duì)列的容量,如果傳入一個(gè)非正數(shù),容量默認(rèn)是正無(wú)窮(當(dāng)然,這取決于你電腦的配置情況)。

      q.get(block=True, timeout=None)

      q.put(item, block=True, timeout=None)

      這兩個(gè)方法向隊(duì)列中添加元素,或取出元素。默認(rèn)情況下,如果隊(duì)列滿了,調(diào)用q.put會(huì)進(jìn)行阻塞,直到隊(duì)列中有空位才放入元素,完成整個(gè)函數(shù)調(diào)用??梢栽O(shè)置block=False將它轉(zhuǎn)為非阻塞調(diào)用,如果隊(duì)列滿了,則直接引發(fā)一個(gè)Full exception,通過(guò)timeout來(lái)設(shè)置一定的等待時(shí)間,如果在阻塞等待時(shí)間內(nèi)任然沒(méi)空位放入元素,再拋出異常。q.get方法邏輯類似,拋出異常Empty exception

      q.task_done

      q.join

      q.join通過(guò)隊(duì)列來(lái)阻塞主線程,隊(duì)列內(nèi)部有一個(gè)計(jì)數(shù)器,每放入一個(gè)元素,計(jì)數(shù)器加一,當(dāng)計(jì)數(shù)器重新歸零后,解除阻塞。q.task_done就是將計(jì)數(shù)器減一的,一般和q.get配合使用,如果使用過(guò)量,導(dǎo)致計(jì)數(shù)器 小于0 ,則引發(fā)ValueError Exception

      q.qsize # 獲得隊(duì)列中的元素個(gè)數(shù)

      q.empty # 隊(duì)列是否為空

      q.full # 隊(duì)列是否滿

      線程同步

      引言

      再回到最開始的GIL案例,兩個(gè)線程,其中一個(gè)對(duì)全局變量進(jìn)行一百萬(wàn)次加1運(yùn)算,另外一個(gè)進(jìn)行一百萬(wàn)次減1運(yùn)算。最后全局變量的值是幾乎隨機(jī)的,與我們預(yù)想的 0

      并不相同。因?yàn)閮蓚€(gè)線程是異步修改這個(gè)變量,不能保證某一時(shí)刻的取值就是正確的。

      因此,要對(duì)線程進(jìn)行同步控制,當(dāng)一個(gè)線程操作時(shí),另一個(gè)等待,然后交換執(zhí)行。

      使用Lock

      import threading

      sum = 0

      lock = threading.Lock

      def add:

      global sum

      for i in range(1000000):

      lock.acquire # lock here

      sum += 1

      lock.release # unlock here

      def subtract:

      global sum

      for i in range(1000000):

      lock.acquire # lock here

      sum -= 1

      lock.release # unlock here

      add_thread = threading.Thread(target=add)

      sub_thread = threading.Thread(target=subtract)

      add_thread.start

      sub_thread.start

      add_thread.join

      sub_thread.join

      print(sum)

      # result:

      # 0

      通過(guò)Lock,我們可以在執(zhí)行相關(guān)代碼之前申請(qǐng),將一段代碼邏輯鎖起來(lái),鎖資源全局只有一個(gè),一個(gè)線程申請(qǐng)了另外一個(gè)就不能夠申請(qǐng),它要等到資源釋放后才能申請(qǐng)。因此,就保證了同一時(shí)刻只有一個(gè)線程拿到鎖,只有一個(gè)線程能夠進(jìn)行變量的修改。

      這種方式比較影響性能,獲取鎖釋放鎖都需要時(shí)間,也可能引起死鎖問(wèn)題,連續(xù)兩次執(zhí)行l(wèi)ock.acquire就可以引發(fā)死鎖,死鎖可以通過(guò)另外一個(gè)線程來(lái)解開。這也是后面使用Condition的一個(gè)核心理念。

      import threading

      lock = threading.Lock

      def dead_lock(lock):

      lock.acquire

      lock.acquire # 直接調(diào)用兩次會(huì)死鎖這個(gè)線程

      print('unlock')

      def un_lock(lock):

      lock.release # 用這個(gè)線程來(lái)開鎖

      dead_lock_thread = threading.Thread(target=dead_lock, args=(lock,))

      un_lock_thread = threading.Thread(target=un_lock, args=(lock,))

      dead_lock_thread.start # 注意調(diào)用順序

      un_lock_thread.start # 通過(guò)它,死鎖的線程會(huì)被打開,繼續(xù)執(zhí)行打印結(jié)果

      # result:

      # unlock

      使用RLock

      在同一個(gè)線程中,可以連續(xù)多次調(diào)用lock.acquire,注意最后獲取鎖和釋放鎖的次數(shù)要相同。

      lock = threading.RLock

      def add:

      global sum

      for i in range(1000000):

      lock.acquire # lock here

      do_sth(lock) # 在對(duì)變量 +1 之前,要對(duì)變量做其他操作,在函數(shù)中可以再次加鎖

      sum += 1

      lock.release # unlock here

      使用Condition

      底層使用 RLock 實(shí)現(xiàn)的,實(shí)現(xiàn)了上下文管理器協(xié)議,可以用with語(yǔ)句進(jìn)行操作,不用擔(dān)心acquire和release的問(wèn)題。

      import threading

      msg =

      conn = threading.Condition

      def repeater_one(conn):

      with conn:

      global msg

      for i in range(3):

      data = '小伙子,沒(méi)想到你也是復(fù)讀機(jī) ({idx})'.format(idx=i)

      msg.append(data)

      print('[one]:', data)

      conn.notify

      conn.wait

      def repeater_two(conn):

      with conn:

      global msg

      for i in range(3):

      conn.wait

      data = msg.pop

      print('[two]:', data)

      conn.notify

      repeater_one_thread = threading.Thread(target=repeater_one, args=(conn,))

      repeater_two_thread = threading.Thread(target=repeater_two, args=(conn,))

      # 注意啟動(dòng)的順序非常重要

      repeater_two_thread.start

      repeater_one_thread.start

      # result:

      # [one]: 小伙子,沒(méi)想到你也是復(fù)讀機(jī) (0)

      # [two]: 小伙子,沒(méi)想到你也是復(fù)讀機(jī) (0)

      # [one]: 小伙子,沒(méi)想到你也是復(fù)讀機(jī) (1)

      # [two]: 小伙子,沒(méi)想到你也是復(fù)讀機(jī) (1)

      # [one]: 小伙子,沒(méi)想到你也是復(fù)讀機(jī) (2)

      # [two]: 小伙子,沒(méi)想到你也是復(fù)讀機(jī) (2)

      這里實(shí)現(xiàn)了兩個(gè)復(fù)讀機(jī)線程,一個(gè)線程打印完數(shù)據(jù)后,另外一個(gè)線程進(jìn)行復(fù)述,數(shù)據(jù)保留在全局變量msg中,通過(guò)Condition來(lái)協(xié)調(diào)兩個(gè)線程的訪問(wèn)順序,實(shí)現(xiàn)復(fù)讀效果。

      這種控制方式的思想就是當(dāng)滿足了某些條件,線程才能繼續(xù)運(yùn)行下去,否則,線程會(huì)一直阻塞,直到條件被滿足。

      conn.notify

      conn.wait

      當(dāng)條件滿足是,使用notify用來(lái)喚醒等待的線程,要等待條件時(shí),再使用wait進(jìn)行阻塞。repeater one首先說(shuō)一句話,然后喚醒repeater two,自身進(jìn)入等待狀態(tài);repeater two等到有人說(shuō)話后,進(jìn)行復(fù)讀,然后喚醒repeater one再次說(shuō)話,自身進(jìn)入等待狀態(tài)。要保證的是,每一時(shí)刻,只能有一個(gè)線程處于等待狀態(tài),否則兩個(gè)線程都會(huì)被阻塞。因此,線程啟動(dòng)的順序和阻塞喚醒條件非常重要。

      with conn:

      # todo

      # 或者

      conn.require

      # todo

      conn.release

      這里直接使用with語(yǔ)句,省略了一些邏輯,也可以使用完整的寫法,但是要注意操作的匹配。

      Condition的底層其實(shí)使用了兩層鎖,當(dāng)我們?cè)谝粋€(gè)線程中調(diào)用require的時(shí)候,內(nèi)部維護(hù)的一個(gè)鎖(Rlock)會(huì)自動(dòng)鎖上,另外一個(gè)線程在調(diào)用require時(shí),就會(huì)被阻塞。

      下面是wait的主要邏輯,調(diào)用wait會(huì)將鎖打開self._release_save,這就允許了另外一個(gè)線程調(diào)用require,同時(shí)建立第二層鎖,waiter,將它加入到隊(duì)列(底層是 deque)中,每調(diào)用一次就會(huì)產(chǎn)生一把鎖,同時(shí)調(diào)用waiter.acquire,接著在后面,會(huì)用各種邏輯判斷再次調(diào)用waiter.acquire,前面講過(guò),連續(xù)兩次調(diào)用會(huì)造成這個(gè)線程的阻塞。 那這個(gè)鎖在哪里打開呢?在另外一個(gè)線程的notify方法中,這個(gè)鎖打開了,它就可以繼續(xù)往下運(yùn)行了。

      使用Semaphore

      用來(lái)控制資源使用數(shù)量的鎖,對(duì)于文件來(lái)說(shuō),讀操作可以有多個(gè)線程同時(shí)進(jìn)行,共享文件資源,而寫操作,就只能有一個(gè)線程來(lái)獨(dú)占資源,一般用來(lái)控制線程的并發(fā)數(shù)量。

      現(xiàn)在我們來(lái)控制讀線程的并發(fā)數(shù)量,每一時(shí)刻只有 3 個(gè)線程在工作,而且工作時(shí)間是 2s,總共有10個(gè)讀線程要完成操作。

      import threading

      import time

      sem = threading.Semaphore(3) # 指明信號(hào)的數(shù)量

      def read(sem):

      sem.acquire # 拿到信號(hào)

      print('doing reading staff...')

      time.sleep(2)

      sem.release # 釋放信號(hào)

      for i in range(7):

      read_thread = threading.Thread(target=read, args=(sem,))

      read_thread.start

      '''

      最后的結(jié)果是每?jī)擅刖陀腥齻€(gè)線程開始讀操作

      '''

      # result:

      # doing reading staff...

      # doing reading staff...

      # doing reading staff...

      # doing reading staff...

      # doing reading staff...

      # doing reading staff...

      # doing reading staff...

      Semaphore的底層是使用Condition進(jìn)行實(shí)現(xiàn)的,內(nèi)部維護(hù)了一個(gè)_value變量,用來(lái)計(jì)數(shù)。

      acquire和release內(nèi)部的邏輯有些改變,在申請(qǐng)資源時(shí),首先要看_value的值有沒(méi)有減到 0 ,如果有,再有線程執(zhí)行acquire就會(huì)執(zhí)行wait進(jìn)行阻塞,資源釋放時(shí)要增加_value的值,同時(shí)使用notify喚醒隊(duì)列中等待的一個(gè)線程。

      concurrent線程池

      引言

      為什么要線程池呢?回看上面的Semaphore例子,我們定義了一個(gè)數(shù)量為 3 的信號(hào)量,保證了同一時(shí)刻只有 3 個(gè)線程存在于內(nèi)存中。但是從程序開始運(yùn)行到結(jié)束,我們一共使用過(guò) 7 個(gè)線程range(7),完成了 7 次同樣的讀操作,也就是說(shuō)創(chuàng)建了 7 次線程,又銷毀了 7 次線程。如果每個(gè)線程的執(zhí)行時(shí)間非常短,又需要?jiǎng)?chuàng)建大量的線程,那么資源都在創(chuàng)建/銷毀線程的過(guò)程中被消耗了。

      能不能總共就使用 3 個(gè)線程達(dá)到同樣的效果呢?每個(gè)線程多做幾次同樣的操作邏輯就可以了,concurrent.futures就提供了這樣的管理方案,同時(shí),還有下面這些優(yōu)點(diǎn):

      • 主線程中可以獲取某一個(gè)線程狀態(tài)或者一個(gè)任務(wù)的狀態(tài),還有返回值。
      • 當(dāng)一個(gè)線程完成工作后,主線程能立刻知道。
      • futures使得多線程多進(jìn)程編碼接口一致。

      使用案例

      from concurrent.futures import ThreadPoolExecutor

      import time

      def read(sth):

      print('Reading {sth}...'.format(sth=sth))

      time.sleep(1)

      return '{sth} done'.format(sth=sth)

      executor = ThreadPoolExecutor(2) # 創(chuàng)建一個(gè)可以容納 兩個(gè) 線程的線程池

      if __name__ == '__main__':

      task_one = executor.submit(read, 'books')

      task_two = executor.submit(read, 'newspaper')

      task_three = executor.submit(read, 'comics')

      print(task_one.done)

      time.sleep(4)

      print(task_two.done)

      print(task_three.done)

      # result:

      # Reading books...

      # Reading newspaper...

      # False

      # Reading comics...

      # True

      # True

      executor = ThreadPoolExecutor(2)

      這里創(chuàng)建了一個(gè)容納兩個(gè)線程的線程池,如果不指定線程數(shù)量參數(shù),它會(huì)以 5倍 cpu內(nèi)核數(shù)量作為默認(rèn)值。Python3在3.5, 3.6, 3.7版本的更新中都加入了可選參數(shù),可以查看官方文檔熟悉新的使用方式。

      submit(fn, *args, **kwargs)

      task_one = executor.submit(read, 'books') # reture Future object

      task_two = executor.submit(read, 'newspaper')

      task_three = executor.submit(read, 'comics')

      這里使用submit方法向線程池中提交任務(wù),提交的任務(wù)數(shù)量可以大于線程池中申請(qǐng)的線程數(shù)量。第一個(gè)參數(shù)是任務(wù)函數(shù),后面依次列出參數(shù)。一旦任務(wù)被提交,線程池中的線程自動(dòng)進(jìn)行調(diào)度,直到所有提交任務(wù)的完成。提交任務(wù)后,會(huì)返回一個(gè)Future對(duì)象。

      '''

      Future 類型對(duì)象會(huì)在 submit 函數(shù)調(diào)用之后返回

      '''

      future.done # 如果提交的任務(wù)完成,這個(gè)方法會(huì)返回 True

      future.cancel # 取消任務(wù)執(zhí)行,如果任務(wù)已經(jīng)調(diào)度執(zhí)行,就不能取消

      future.result # 返回任務(wù)函數(shù)執(zhí)行后的返回結(jié)果

      在main中加入新的邏輯:

      ...

      print(task_one.result) # 這些都是阻塞式調(diào)用,獲得結(jié)果后才會(huì)繼續(xù)向下執(zhí)行

      print(task_two.result)

      print(task_three.result)

      # result:

      # books done

      # newspaper done

      # comics done

      上面的寫法其實(shí)比較麻煩,如果向線程池中提交的任務(wù)過(guò)多,這樣操作每個(gè)Future對(duì)象會(huì)相當(dāng)繁瑣??梢耘窟M(jìn)行任務(wù)的提交,將Future對(duì)象加入一個(gè)列表進(jìn)行管理,配合使用模塊中的as_complete函數(shù),可以一次性獲得所有執(zhí)行完成任務(wù)函數(shù)的Future對(duì)象。

      from concurrent.futures import as_completed

      ...

      # ---修改main中的邏輯

      items = ['books', 'newspaper', 'comics']

      task_list = [executor.submit(read, item) for item in items]

      for future in as_completed(task_list):

      print(future.result)

      # result:

      # newspaper done

      # books done

      # comics done

      核心邏輯中,as_complete將已經(jīng)執(zhí)行完的任務(wù)函數(shù)對(duì)應(yīng)的Future對(duì)象通過(guò)yield進(jìn)行返回,這里完成的順序和任務(wù)提交的順序并不一樣,和內(nèi)部的調(diào)度邏輯有關(guān),我多次執(zhí)行結(jié)果沒(méi)有完全一樣。這里的yield邏輯是在一定的條件下才會(huì)發(fā)生的,因此,只要有線程沒(méi)有運(yùn)行完,就無(wú)法yield結(jié)果,會(huì)在for這里進(jìn)行阻塞,等到所有任務(wù)執(zhí)行完畢之后,for結(jié)束。

      還有一種更加簡(jiǎn)潔的辦法,在executor中,有一個(gè)和Python內(nèi)置函數(shù)map邏輯相似的函數(shù)。它將任務(wù)函數(shù)和參數(shù)進(jìn)行一一匹配調(diào)用,直接返回future.result對(duì)象。這種方式是順序進(jìn)行調(diào)度的,完成順序總是:books,newspaper, comics

      ...

      items = ['books', 'newspaper', 'comics']

      for res in executor.map(read, items):

      print(res)

      補(bǔ)充

      wait函數(shù)

      用來(lái)讓主線程在不同條件下等待線程池中線程的運(yùn)行。

      def wait(fs, timeout=None, return_when=ALL_COMPLETED):

      '''

      1. fs: Futures 對(duì)象的序列,當(dāng)他們對(duì)應(yīng)的任務(wù)函數(shù)都完成后,解除阻塞

      2. timeout: 等待的時(shí)間,超過(guò)時(shí)間就不等了

      3. return_when

      FIRST_COMPLETED: 任何一個(gè)任務(wù)函數(shù)完成

      FIRST_EXCEPTION: 任何一個(gè)任務(wù)函數(shù)執(zhí)行時(shí)拋異常

      ALL_COMPLETED: 所有都完成

      3個(gè)參數(shù)條件,哪個(gè)先滿足就直接解除阻塞

      from concurrent.futures import ThreadPoolExecutor, wait

      import time

      def read(times):

      time.sleep(times)

      print('read for {span}s'.format(span=times))

      executor = ThreadPoolExecutor(2)

      time_list = [1, 2, 3, 4]

      task_list = [executor.submit(read, times) for times in time_list]

      print('done')

      # result:

      # done

      # read for 1s

      # read for 2s

      # read for 3s

      # read for 4s

      這里修改了read函數(shù)的邏輯,由讀不同的內(nèi)容改為讀不同的時(shí)間長(zhǎng)度。這里主線程沒(méi)有等待線程池中的任務(wù),提交任務(wù)后直接執(zhí)行了print。

      ...

      wait(task_list)

      print('done')

      # result:

      # read for 1s

      # read for 2s

      # read for 3s

      # read for 4s

      # done

      ...

      wait(task_list, 2) # 就等兩秒鐘

      print('done')

      # read for 1s

      # read for 2s

      # done

      # read for 3s

      # read for 4s

      from concurrent.futures import FIRST_COMPLETED

      # 注意常量值的使用,選一種合適的方法進(jìn)行導(dǎo)入使用

      # from concurrent import futures

      # futures.FIRST_COMPLETED

      ...

      wait(task_list, return_when=FIRST_COMPLETED)

      print('done')

      # result:

      # read for 1s

      # done

      # read for 2s

      # read for 3s

      # read for 4s

      Future對(duì)象

      未來(lái)對(duì)象?隨便叫啥吧。前面也看到過(guò),通過(guò)submit提交任務(wù)函數(shù)后,就會(huì)返回這么一個(gè)對(duì)象,可以通過(guò)它來(lái)監(jiān)控運(yùn)行我們?nèi)蝿?wù)函數(shù)的那個(gè)線程:判斷是否運(yùn)行完成、得到返回值等。

      從源碼中,可以看出,任務(wù)提交之后,先創(chuàng)建了一個(gè)Future類型對(duì)象f,然后將這個(gè)對(duì)象連同我們提交的任務(wù)參數(shù)一起委托給了_WorkItem,之后也返回一個(gè)對(duì)象w,將它加入隊(duì)列self._work_queue,這應(yīng)該就是內(nèi)部調(diào)度的邏輯了,最后返回f。

      _adjust_thread_count用來(lái)創(chuàng)建出我們需要的線程數(shù)量,并且target=worker,參數(shù)列表中有self._work_queue作為參數(shù)。_threads是用來(lái)存放線程的一個(gè)集合。我們提交的不同任務(wù)函數(shù)怎么變成了一個(gè)_worker函數(shù)呢?

      _worker的主要邏輯中,有一個(gè)循環(huán),從self._work_queue中不斷取出一個(gè)任務(wù),它是_WorkItem類型的,在submit函數(shù)中加入,然后調(diào)用它的run方法,隨后del work_item將它刪除。多個(gè)任務(wù)線程讀取的是同一個(gè)任務(wù)隊(duì)列,直到任務(wù)全部完成。

      在_WorkItem中,它的run方法調(diào)用了我們提交的任務(wù)函數(shù)fn,并且記錄了它的返回值,進(jìn)行了一些異常處理。

      shutdown

      前面都在講線程池的使用方法和工作原理,還有一個(gè)細(xì)節(jié)需要補(bǔ)充的就是executor的shutdown方法。

      它是用來(lái)關(guān)閉線程池,如果在關(guān)閉之后繼續(xù)向里面提交任務(wù),會(huì)拋出一個(gè)異常。調(diào)用之后,self._work_queue.put(None)往任務(wù)隊(duì)列中加入了一個(gè)標(biāo)記,當(dāng)線程調(diào)度時(shí)拿到這個(gè)標(biāo)志就知道任務(wù)結(jié)束了,這與在前面使用Queue進(jìn)行線程間通信的案例用了同樣的方式。

      有一個(gè)可選參數(shù)wait,如果它是True,則主線程在這里阻塞,等待所有線程完成任務(wù)。如果是False,主線程繼續(xù)向下執(zhí)行。

      可以使用with語(yǔ)句,當(dāng)所有的任務(wù)完成之后自動(dòng)調(diào)用shutdown,這里就不再多舉例子,官方文檔 給出了一個(gè)經(jīng)典的例子,以作參考。

      下篇:python入門系列 多進(jìn)程

        本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
        轉(zhuǎn)藏 分享 獻(xiàn)花(0

        0條評(píng)論

        發(fā)表

        請(qǐng)遵守用戶 評(píng)論公約

        類似文章 更多