乡下人产国偷v产偷v自拍,国产午夜片在线观看,婷婷成人亚洲综合国产麻豆,久久综合给合久久狠狠狠9

  • <output id="e9wm2"></output>
    <s id="e9wm2"><nobr id="e9wm2"><ins id="e9wm2"></ins></nobr></s>

    • 分享

      你真的懂線程嗎?史上最全Python線程解析

       立志德美 2019-04-29

      使用Python中的線程模塊,能夠同時運行程序的不同部分,并簡化設(shè)計。如果你已經(jīng)入門Python,并且想用線程來提升程序運行速度的話,那本文就是為你準備的!

      通過閱讀本文,你將了解到:

      · 什么是線程?

      · 如何創(chuàng)建、執(zhí)行線程?

      · 如何使用線程池ThreadPoolExecutor?

      · 如何避免資源競爭問題?

      · 如何使用Python中線程模塊threading提供的常用工具?

      目錄

      1. 什么是線程

      2. 創(chuàng)建線程

          2.1. 守護線程

          2.2. 加入線程

      3. 多線程

      4. 線程池

      5. 競態(tài)條件

          5.1. 單線程

          5.2. 兩個線程

          5.3. 示例的意義

      6. 同步鎖

      7. 死鎖

      8. 生產(chǎn)者-消費者模型中的線程

          8.1 在生產(chǎn)者-消費者模型中使用鎖

          8.2 在生產(chǎn)者-消費者模型中使用隊列

      9. 線程對象

          9.1 信號量

          9.2 定時器

          9.3 柵欄

      閱讀提醒:

      已掌握Python基本知識;

      使用Python 3.6以上版本運行。

      1. 什么是線程

      線程是操作系統(tǒng)能夠進行運算調(diào)度的最小單位,它被包含在進程之中,是進程中的實際運作單位。一條線程指的是進程中一個單一順序的控制流,一個進程中可以并發(fā)多個線程,每條線程并行執(zhí)行不同的任務(wù)。

      在Python3中實現(xiàn)的大部分運行任務(wù)里,不同的線程實際上并沒有同時運行:它們只是看起來像是同時運行的。

      大家很容易認為線程化是在程序上運行兩個(或多個)不同的處理器,每個處理器同時執(zhí)行一個獨立的任務(wù)。這種理解并不完全正確,線程可能會在不同的處理器上運行,但一次只能運行一個線程。

      同時執(zhí)行多個任務(wù)需要使用非標準的Python運行方式:用不同的語言編寫一部分代碼,或者使用多進程模塊multiprocessing,但這么做會帶來一些額外的開銷。

      由于Python默認的運行環(huán)境是CPython(C語言開發(fā)的Python),所以線程化可能不會提升所有任務(wù)的運行速度。這是因為和GIL(Global Interpreter Lock)的交互形成了限制:一次只能運行一個Python線程。

      線程化的一般替代方法是:讓各項任務(wù)花費大量時間等待外部事件。但問題是,如果想縮短等待時間,會需要大量的CPU計算,結(jié)果是程序的運行速度可能并不會提升。

      當代碼是用Python語言編寫并在默認執(zhí)行環(huán)境CPython上運行時,會出現(xiàn)這種情況。如果線程代碼是用C語言寫的,那它們就能夠釋放GIL并同時運行。如果是在別的Python執(zhí)行環(huán)境(如IPython, PyPy,Jython, IronPython)上運行,請參考相關(guān)文檔了解它們是如何處理線程的。

      如果只用Python語言在默認的Python執(zhí)行環(huán)境下運行,并且遇到CPU受限的問題,那就應(yīng)該用多進程模塊multiprocessing來解決。

      在程序中使用線程也可以簡化設(shè)計。本文中的大部分示例并不保證可以提升程序運行速度,其目的是使設(shè)計結(jié)構(gòu)更加清晰、便于邏輯推理。

      下面就來看看如何使用線程吧!

      2. 創(chuàng)建線程

      既然已經(jīng)對什么是線程有了初步了解,下面讓我們來學習如何創(chuàng)建一個線程。

      Python標準庫提供了threading模塊,里面包含將在本文中介紹的大部分基本模塊。在這個模塊中,Thread類很好地封裝了有關(guān)線程的子類,為我們提供了干凈的接口來使用它們。

      要啟動一個線程,需要創(chuàng)建一個Thread實例,然后調(diào)用.start()方法:

      import logging
      import threading
      import time


      def thread_function(name):
          logging.info('Thread %s: starting', name)
          time.sleep(2)
          logging.info('Thread %s: finishing', name)


      if __name__ == '__main__':
           format = '%(asctime)s: %(message)s'
           logging.basicConfig(format=format, level=logging.INFO,
                              datefmt='%H:%M:%S')

           logging.info('Main    : before creating thread')
           x = threading.Thread(target=thread_function, args=(1,))
           logging.info('Main    : before running thread')
           x.start()
           logging.info('Main    : wait for the thread to finish')
           # x.join()
           logging.info('Main    : all done')

      查看日志語句,可以看到__main__部分正在創(chuàng)建并啟動線程:

      x = threading.Thread(target=thread_function, args=(1,))
      x.start()

      創(chuàng)建線程時,我們需要傳遞兩個參數(shù),第一個參數(shù)target是函數(shù)名,指定這個線程去哪個函數(shù)里面去執(zhí)行代碼,第二個參數(shù)args是一個元組類型,指定為這個函數(shù)傳遞的參數(shù)。在本例中,Thread運行函數(shù)thread_function(),并將1作為參數(shù)傳遞給該函數(shù)。

      在本文中,我們用連續(xù)整數(shù)為線程命名。雖然threading.get_ident()方法能夠為每一個線程生成唯一的名稱,但這些名稱通常會比較長,而且可讀性差。

      這里的thread_function()函數(shù)本身沒做什么,它只是簡單地記錄了一些信息,并用time.sleep()隔開。

      運行程序(注釋掉倒數(shù)第二行代碼),結(jié)果如下:

      $ ./single_thread.py
      Main    : before creating thread
      Main    : before running thread
      Thread 1: starting
      Main    : wait for the thread to finish
      Main    : all done
      Thread 1: finishing

      可以看到,線程Thread__main__部分代碼運行完后才結(jié)束。下一節(jié)會對這一現(xiàn)象做出解釋,并討論被注釋掉那行代碼。

      2.1. 守護線程

      在計算機科學中,守護進程daemon是一類在后臺運行的特殊進程,用于執(zhí)行特定的系統(tǒng)任務(wù)。

      守護進程daemon在Python線程模塊threading中有著特殊的含義。當程序退出時,守護線程將立即關(guān)閉??梢赃@么理解,守護線程是一個在后臺運行,且不用費心去關(guān)閉它的線程,因為它會隨程序自動關(guān)閉。

      如果程序運行的線程是非守護線程,那么程序?qū)⒌却芯€程結(jié)束后再終止。但如果運行的是守護線程,當程序退出時,守護線程會被自動殺死。

      我們仔細研究一下上面程序運行的結(jié)果,注意看最后兩行。當運行程序時,在__main__部分打印完all done信息后、線程結(jié)束前,有一個大約2秒的停頓。

      這時,Python在等待非守護線程完成運行。當Python程序結(jié)束時,關(guān)閉過程的一部分是清理線程。

      查看Python線程模塊的源代碼,可以看到thread ._shutdown()方法遍歷所有正在運行的線程,并在每個非守護線程上調(diào)用.join()函數(shù),檢查它們是否已經(jīng)結(jié)束運行。

      因此,程序退出時需要等待,因為守護線程本身會在休眠中等待其他非守護線程運行結(jié)束。一旦thread ._shutdown()運行完畢并打印出信息,程序就可以退出。

      守護線程這種自動退出的特性很實用,但其實還有其他的方法能實現(xiàn)相同的功能。我們先用守護線程重復運行一下上面的程序,看看結(jié)果。只需在創(chuàng)建線程時,添加參數(shù)daemon=True。

      x = threading.Thread(target=thread_function, args=(1,), daemon=True)

      現(xiàn)在運行程序,結(jié)果如下:

      $ ./single_thread.py
      Main    : before creating thread
      Main    : before running thread
      Thread 1: starting
      Main    : wait for the thread to finish
      Main    : all done
      Thread 1: finishing

      添加參數(shù)daemon=True前

      $ ./daemon_thread.py
      Main    : before creating thread
      Main    : before running thread
      Thread 1: starting
      Main    : wait for the thread to finish
      Main    : all done

      添加參數(shù)daemon=True后

      不同的地方是,之前輸出的最后一行不見了,說明thread_function()函數(shù)沒有機會完成運行。這是一個守護線程,所以當__main__部分運行完最后一行代碼,程序終止,守護線程被殺死。

      2.2. 加入一個線程

      守護線程用起來很方便,但如果想讓守護線程運行完畢后再結(jié)束程序該怎么辦?或者想讓守護線程運行完后不退出程序呢?

      讓我們來看一下剛剛注釋掉的那行代碼:

      # x.join()

      要讓一個線程等待另一個線程完成,可以調(diào)用.join()函數(shù)。如果取消對這行代碼的注釋,主線程將會暫停,等待線程x完成運行。

      這個功能在守護線程和非守護線程上同樣適用。如果用.join()函數(shù)加入了一個線程,則主線程將一直等待,直到被加入的線程運行完成。

      3. 多線程

      到目前為止,示例代碼中只用到了兩個線程:主線程和一個threading.Thread線程對象。

      通常,我們希望同時啟動多個線程,讓它們執(zhí)行不同的任務(wù)。先來看看比較復雜的創(chuàng)建多線程的方法,然后再看簡單的。

      這個復雜的創(chuàng)建方法其實前面已經(jīng)展示過了:

      import logging
      import threading
      import time

      def thread_function(name):
          logging.info('Thread %s: starting', name)
          time.sleep(2)
          logging.info('Thread %s: finishing', name)

      if __name__ == '__main__':
          format = '%(asctime)s: %(message)s'
          logging.basicConfig(format=format, level=logging.INFO,
                              datefmt='%H:%M:%S')

          threads = list()
          for index in range(3):
              logging.info('Main    : create and start thread %d.', index)
              x = threading.Thread(target=thread_function, args=(index,))
              threads.append(x)
              x.start()

          for index, thread in enumerate(threads):
              logging.info('Main    : before joining thread %d.', index)
              thread.join()
              logging.info('Main    : thread %d done', index)

      這段代碼和前面提到的創(chuàng)建單線程時的結(jié)構(gòu)是一樣的,創(chuàng)建線程對象,然后調(diào)用.start()方法。程序中會保存一個包含多個線程對象的列表,為稍后使用.join()函數(shù)做準備。

      多次運行這段代碼可能會產(chǎn)生一些有趣的結(jié)果:

      Main    : create and start thread 0.
      Thread 0: starting
      Main    : create and start thread 1.
      Thread 1: starting
      Main    : create and start thread 2.
      Thread 2: starting
      Main    : before joining thread 0.
      Thread 2: finishing
      Thread 1: finishing
      Thread 0: finishing
      Main    : thread 0 done
      Main    : before joining thread 1.
      Main    : thread 1 done
      Main    : before joining thread 2.
      Main    : thread 2 done

      仔細看一下輸出結(jié)果,三個線程都按照預想的順序創(chuàng)建0,1,2,但它們的結(jié)束順序卻是相反的!多次運行將會生成不同的順序。查看線程Thread x: finish中的信息,可以知道每個線程都在何時完成。

      線程的運行順序是由操作系統(tǒng)決定的,并且很難預測。很有可能每次運行所得到的順序都不一樣,所以在用線程設(shè)計算法時需要注意這一點。

      幸運的是,Python中提供了幾個基礎(chǔ)模塊,可以用來協(xié)調(diào)線程并讓它們一起運行。在介紹這部分內(nèi)容之前,讓我們先看看如何更簡單地創(chuàng)建一組線程。

      4. 線程池

      我們可以用一種更簡單的方法來創(chuàng)建一組線程:線程池ThreadPoolExecutor,它是Python中concurrent.futures標準庫的一部分。(Python 3.2 以上版本適用)。

      最簡單的方式是把它創(chuàng)建成上下文管理器,并使用with語句管理線程池的創(chuàng)建和銷毀。

      ThreadPoolExecutor重寫上例中的__main__部分,代碼如下:

      import concurrent.futures

      # [rest of code]

      if __name__ == '__main__':
          format = '%(asctime)s: %(message)s'
          logging.basicConfig(format=format, level=logging.INFO,
                              datefmt='%H:%M:%S')

          with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
              executor.map(thread_function, range(3))

      這段代碼創(chuàng)建一個線程池ThreadPoolExecutor作為上下文管理器,并傳入需要的工作線程數(shù)量。然后使用.map()遍歷可迭代對象,本例中是range(3),每個對象生成池中的一個線程。

      with模塊的結(jié)尾,會讓線程池ThreadPoolExecutor對池中的每個線程調(diào)用.join()。強烈建議使用線程池ThreadPoolExecutor作為上下文管理器,因為這樣就不會忘記寫.join()。

      注:

      使用線程池ThreadPoolExecutor可能會報一些奇怪的錯誤。例如,調(diào)用一個沒有參數(shù)的函數(shù),但將參數(shù)傳入.map()時,線程將拋出異常。

      不幸的是,線程池ThreadPoolExecutor會隱藏該異常,程序會在沒有任何輸出的情況下終止。剛開始調(diào)試時,這會讓人很頭疼。

      運行修改后的示例代碼,結(jié)果如下:

      $ ./executor.py
      Thread 0: starting
      Thread 1: starting
      Thread 2: starting
      Thread 1: finishing
      Thread 0: finishing
      Thread 2: finishing

      再提醒一下,這里的線程1在線程0之前完成,這是因為線程的調(diào)度是由操作系統(tǒng)決定的,并不遵循一個特定的順序。

      5. 競態(tài)條件

      在繼續(xù)介紹Python線程模塊的一些其他特性之前,讓我們先討論一下在編寫線程化程序時會遇到的一個更頭疼的問題: 競態(tài)條件。

      我們先了解一下競態(tài)條件的含義,然后看一個實例,再繼續(xù)學習標準庫提供的其他模塊,來防止競態(tài)條件的發(fā)生。

      當兩個或多個線程訪問共享的數(shù)據(jù)或資源時,可能會出現(xiàn)競態(tài)條件。在本例中,我們創(chuàng)建了一個每次都會發(fā)生的大型競態(tài)條件,但請注意,大多數(shù)競態(tài)條件不會如此頻繁發(fā)生。通常情況下,它們很少發(fā)生,但一旦發(fā)生,會很難進行調(diào)試。

      在本例中,我們會寫一個更新數(shù)據(jù)庫的類,但這里并不需要一個真正的數(shù)據(jù)庫,只是一個虛擬的,因為這不是本文討論的重點。

      這個FakeDatabase類包括.__init__().update()方法。

      class FakeDatabase:
          def __init__(self):
              self.value = 0

          def update(self, name):
              logging.info('Thread %s: starting update', name)
              local_copy = self.value
              local_copy += 1
              time.sleep(0.1)
              self.value = local_copy
              logging.info('Thread %s: finishing update', name)

      FakeDatabase類會一直跟蹤一個值: .value,它是共享數(shù)據(jù),這里會出現(xiàn)競態(tài)條件。

      .__init__()方法將.value的值初始化為0。.update()方法從數(shù)據(jù)庫中讀取一個值,對其進行一些計算,然后將新值寫回數(shù)據(jù)庫。

      FakeDatabase類的使用實例如下:

      if __name__ == '__main__':
          format = '%(asctime)s: %(message)s'
          logging.basicConfig(format=format, level=logging.INFO,
                              datefmt='%H:%M:%S')

          database = FakeDatabase()
          logging.info('Testing update. Starting value is %d.', database.value)
          with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
              for index in range(2):
                  executor.submit(database.update, index)
          logging.info('Testing update. Ending value is %d.', database.value)

      該程序創(chuàng)建一個線程池ThreadPoolExecutor,里面包含兩個線程,然后在每個線程上調(diào)用.submit()方法,告訴它們運行database.update()函數(shù)。

      .submit()允許將位置參數(shù)和關(guān)鍵字參數(shù)傳遞給正在線程中運行的函數(shù):

      .submit(function, *args, **kwargs)

      示例代碼中,index作為唯一一個位置參數(shù)傳遞給database.update()函數(shù),后面會介紹,也可以用類似的方式傳遞多個參數(shù)。

      由于每個線程都會運行.update(), 讓.value的變量值加上1,所以最后打印出的database.value值應(yīng)該是2。但如果是這樣的話,舉這個例子就沒有什么意義了。

      實際上,運行上面這段代碼的輸出如下:

      $ ./racecond.py
      Testing unlocked update. Starting value is 0.
      Thread 0: starting update
      Thread 1: starting update
      Thread 0: finishing update
      Thread 1: finishing update
      Testing unlocked update. Ending value is 1.

      我們來仔細研究一下這里究竟發(fā)生了什么,有助于更好地理解有關(guān)這個問題的解決方案。

      5.1. 單線程

      在深入研究上面有關(guān)兩個線程的問題之前,我們先回過頭看一下線程到底是如何工作的。

      這里不會討論所有的細節(jié),因為在目前這個學習階段還沒必要掌握這么多內(nèi)容。我們還將簡化一些東西,雖然可能在技術(shù)上不夠精確,但可以方便大家理解其中的原理。

      當線程池ThreadPoolExecutor運行每個線程時,我們會指定運行哪個函數(shù),以及傳遞給該函數(shù)的參數(shù):executor.submit(database.update, index),這里是指運行database.update函數(shù),并傳入index參數(shù)。

      這么做的結(jié)果是,線程池中的每個線程都將調(diào)用database.update(index)。注意,主線程__main__中創(chuàng)建的database是對FakeDatabase對象的引用。在這個對象上調(diào)用.update(),會調(diào)用該對象的實例方法。

      每個線程都將引用同一個FakeDatabase對象:database。每個線程還有一個獨特的index值,使得日志語句更易閱讀:

      當線程開始運行.update()函數(shù)時,它擁有局部變量local_copy。這絕對是一件好事,否則,運行相同函數(shù)的兩個線程總是會相互混淆。也就是說,函數(shù)內(nèi)定義的局部變量是線程安全的。

      現(xiàn)在我們可以看一下,如果使用單線程、調(diào)用一次.update()函數(shù)運行上面的程序會發(fā)生什么。

      下圖展示了在只運行一個線程的情況下,.update()函數(shù)是如何逐步執(zhí)行的。代碼顯示在左上角,后面跟著一張圖,顯示線程中局部變量local_value和共享數(shù)據(jù)database.value的值:

      這張圖是這樣布局的,從上至下時間增加,它以創(chuàng)建線程1開始,并在線程1終止時結(jié)束。

      線程1啟動時,FakeDatabase.value的值為0。第一行代碼將值0復制給局部變量local_copy。接下來,local_copy += 1語句讓local_copy的值增加1,可以看到線程1中的.value值變成了1。

      然后調(diào)用time.sleep()方法,暫停當前線程,并允許其他線程運行。因為本例中只有一個線程,這里沒什么影響。

      當線程1被喚醒繼續(xù)運行時,它將新值從局部變量local_copy復制到FakeDatabase.value,線程完成運行??梢钥吹?strong>database.value的值被設(shè)為1。

      到目前為止,一切順利。我們運行了一次.update()函數(shù),FakeDatabase.value值增加到1。

      5.2. 兩個線程

      回到競態(tài)條件,這兩個線程會并發(fā)運行,但不會同時運行。它們都有各自的局部變量local_copy,并指向相同的database對象。正是database這個共享數(shù)據(jù)導致了這些問題。

      程序創(chuàng)建線程1,運行update()函數(shù):

      當線程1調(diào)用time.sleep()方法時,它允許另一個線程開始運行。這時,線程2啟動并執(zhí)行相同的操作。它也將database.value的值復制給私有變量local_copy,但共享數(shù)據(jù)database.value的值還未更新,仍為0:

      當線程2進入休眠狀態(tài)時,共享數(shù)據(jù)database.value的值還是未被修改的0,而且兩個線程中的私有變量local_copy的值都是1。

      現(xiàn)在線程1被喚醒并保存其私有變量local_copy的值,然后終止,線程2繼續(xù)運行。線程2在休眠的時候并不知道線程1已經(jīng)運行完畢并更新了database.value中的值,當繼續(xù)運行時, 它將自己私有變量local_copy的值存儲到database.value中,也是1。

      這兩個線程交錯訪問同一個共享對象,覆蓋了彼此的結(jié)果。當一個線程釋放內(nèi)存或在另一個線程完成訪問之前關(guān)閉文件句柄時,可能會出現(xiàn)類似的競爭條件。

      5.3. 示例的意義

      上面的例子是為了確保每次運行程序時都發(fā)生競態(tài)條件。因為操作系統(tǒng)可以在任何時候交換出一個線程,所以有可能在讀取了x的值之后,像x = x + 1這樣的語句會中斷,導致寫回數(shù)據(jù)庫的值不是我們想要的。

      這一過程中的細節(jié)非常有趣,但本文剩下部分的學習不需要了解具體細節(jié),所以可以先跳過。

      看完有關(guān)競態(tài)條件的實例,讓我們接下來看看如何解決它們!

      6. 同步鎖

      有很多方法可以避免或解決競態(tài)條件,這里不會介紹所有的解決方法,但會提到一些會經(jīng)常用到的。讓我們先從鎖Lock開始學習。

      要解決上述競態(tài)條件問題,需要找到一種方法,每次只允許一個線程進入代碼的read-modify-write部分。最常用就是Python中的鎖。在一些其他語言中,同樣的思想被稱為互斥鎖mutex?;コ怄imutex屬于進程互斥MUTual EXclusion的一部分,它和鎖所做的工作是一樣的。

      鎖是一種類似于通行證的東西,每次只有一個線程可以擁有鎖,任何其他想要獲得鎖的線程必須等待,直到該鎖的所有者將它釋放出來。

      完成此任務(wù)的基本函數(shù)是.acquire().release()。線程將調(diào)用my_lock.acquire()來獲取鎖。如果鎖已經(jīng)存在,則調(diào)用線程將會等待,直到鎖被釋放。這里有一點很重要,如果一個線程獲得了鎖,但從未釋放,程序會被卡住。稍后會介紹更多關(guān)于這方面的內(nèi)容。

      幸運的是,Python的鎖也將作為上下文管理器運行,所以可以在with語句中使用它,并且當with模塊出于任何原因退出時,鎖會自動釋放。

      讓我們看看添加了鎖的FakeDatabase,調(diào)用函數(shù)保持不變:

      class FakeDatabase:
          def __init__(self):
              self.value = 0
              self._lock = threading.Lock()

          def locked_update(self, name):
              logging.info('Thread %s: starting update', name)
              logging.debug('Thread %s about to lock', name)
              with self._lock:
                  logging.debug('Thread %s has lock', name)
                  local_copy = self.value
                  local_copy += 1
                  time.sleep(0.1)
                  self.value = local_copy
                  logging.debug('Thread %s about to release lock', name)
              logging.debug('Thread %s after release', name)
              logging.info('Thread %s: finishing update', name)

      除了添加一些調(diào)試日志以便更清楚地查看鎖的運行之外,這里最大的變化是添加了一個叫._lock的成員,它是一個thread . lock()對象。這個._lock在未鎖定狀態(tài)下被初始化,并由with語句鎖定和釋放。

      值得注意的是,運行該函數(shù)的線程將一直持有這個鎖,直到它完全更新完數(shù)據(jù)庫。在本例中,這意味著它將在復制、更新、休眠并將值寫回數(shù)據(jù)庫的整個過程中持有鎖。

      日志設(shè)置為警告級別,運行程序,結(jié)果如下:

      $ ./fixrace.py
      Testing locked update. Starting value is 0.
      Thread 0: starting update
      Thread 1: starting update
      Thread 0: finishing update
      Thread 1: finishing update
      Testing locked update. Ending value is 2.

      在主線程__main__中配置完日志輸出后,將日志級別設(shè)置為DEBUG可以打開完整的日志:

      logging.getLogger().setLevel(logging.DEBUG)

      用調(diào)試日志運行程序的結(jié)果如下:

      $ ./fixrace.py
      Testing locked update. Starting value is 0.
      Thread 0: starting update
      Thread 0 about to lock
      Thread 0 has lock
      Thread 1: starting update
      Thread 1 about to lock
      Thread 0 about to release lock
      Thread 0 after release
      Thread 0: finishing update
      Thread 1 has lock
      Thread 1 about to release lock
      Thread 1 after release
      Thread 1: finishing update
      Testing locked update. Ending value is 2.

      線程0獲得鎖,并且在它進入睡眠狀態(tài)時仍然持有鎖。然后線程1啟動并嘗試獲取同一個鎖,因為線程0仍然持有它,線程1就必須等待。這就是互斥鎖。

      本文其余部分的許多示例都有警告和調(diào)試級別的日志記錄。我們通常只顯示警告級別的輸出,因為調(diào)試日志可能非常長。

      7. 死鎖

      在繼續(xù)學習之前,我們先看一下使用鎖時會出現(xiàn)的常見問題。在上例中,如果鎖已經(jīng)被某個線程獲取,那么第二次調(diào)用.acquire()時將一直等待,直到持有鎖的線程調(diào)用.release()將鎖釋放。

      思考一下,運行下面這段代碼會得到什么結(jié)果:

      import threading

      l = threading.Lock()
      print('before first acquire')
      l.acquire()
      print('before second acquire')
      l.acquire()
      print('acquired lock twice')

      當程序第二次調(diào)用l.acquire()時,它需要等待鎖被釋放。在本例中,可以刪除第二次調(diào)用修復死鎖,但是死鎖通常在以下兩種情況下會發(fā)生:

      ① 鎖沒有被正確釋放時會產(chǎn)生運行錯誤;

      ② 在一個實用程序函數(shù)需要被其他函數(shù)調(diào)用的地方會出現(xiàn)設(shè)計問題,這些函數(shù)可能已經(jīng)擁有或者沒有鎖。

      第一種情況有時會發(fā)生,但是使用鎖作為上下文管理器可以大大減少這種情況發(fā)生的頻率。建議充分利用上下文管理器來編寫代碼,因為它們有助于避免出現(xiàn)異常跳過.release()調(diào)用的情況。

      在某些語言中,設(shè)計問題可能有點棘手。慶幸的是,Python的線程模塊還提供了另一個鎖對象RLock。它允許線程在調(diào)用.release()之前多次獲取.acquire()鎖,且程序不會阻塞。該線程仍需要保證.release().acquire()的調(diào)用次數(shù)相同,但它是用了另一種方式而已。

      LockRLock是線程化編程中用來防止競爭條件的兩個基本工具,還有一些其他的工具。在研究它們之前,我們先轉(zhuǎn)移到一個稍微不同的領(lǐng)域。

      8. 生產(chǎn)者-消費者模型中的線程

      生產(chǎn)者-消費者模型是一個標準的計算機科學領(lǐng)域的問題,用于解決線程同步或進程同步。我們先介紹一個它的變形,大致了解一下Python中的線程模塊提供了哪些基礎(chǔ)模塊。

      本例中,假設(shè)需要寫一個從網(wǎng)絡(luò)讀取消息并將其寫入磁盤的程序。該程序不會主動請求消息,它必須在消息傳入時偵聽并接受它們。而且這些消息不會以固定的速度傳入,而是以突發(fā)的方式傳入。這一部分程序叫做生產(chǎn)者。

      另一方面,一旦傳入了消息,就需要將其寫入數(shù)據(jù)庫。數(shù)據(jù)庫訪問很慢,但訪問速度足以跟上消息傳入的平均速度。但當大量消息同時傳入時,速度會跟不上。這部分程序叫消費者。

      在生產(chǎn)者和消費者之間,需要創(chuàng)建一個管道Pipeline,隨著對不同同步對象的深入了解,我們需要對管道里面的內(nèi)容進行修改。

      這就是基本的框架。讓我們看看使用Lock的解決方案。雖然它并不是最佳的解決方法,但它運用的是前面已經(jīng)介紹過的工具,所以比較容易理解。

      8.1. 在生產(chǎn)者-消費者模型中使用鎖

      既然這是一篇關(guān)于Python線程的文章,而且剛剛已經(jīng)閱讀了有關(guān)鎖的內(nèi)容,所以讓我們嘗試用鎖解決競態(tài)條件問題。

      先寫一個生產(chǎn)者線程,從虛擬網(wǎng)絡(luò)中讀取消息并放入管道中:

      SENTINEL = object()

      def producer(pipeline):
          '''Pretend we're getting a message from the network.'''
          for index in range(10):
              message = random.randint(1, 101)
              logging.info('Producer got message: %s', message)
              pipeline.set_message(message, 'Producer')

          # Send a sentinel message to tell consumer we're done
          pipeline.set_message(SENTINEL, 'Producer')

      生產(chǎn)者獲得一個介于1到100之間的隨機數(shù),作為生成的虛擬消息。它調(diào)用管道上的.set_message()方法將其發(fā)送給消費者。

      生產(chǎn)者還用一個SENTINEL值來警告消費者,在它發(fā)送10個值之后停止。這有點奇怪,但不必擔心,在完成本示例后,會介紹如何去掉這個SENTINEL值。

      管道pipeline的另一端是消費者:

      def consumer(pipeline):
          '''Pretend we're saving a number in the database.'''
          message = 0
          while message is not SENTINEL:
              message = pipeline.get_message('Consumer')
              if message is not SENTINEL:
                  logging.info('Consumer storing message: %s', message)

      消費者從管道中讀取一條消息并將其寫入虛擬數(shù)據(jù)庫,在本例中,只是將其儲存到磁盤中。如果消費者獲取了SENTINEL值,線程會終止。

      在研究管道Pipeline之前,先看一下生成這些線程的主線程__main__部分:

      if __name__ == '__main__':
          format = '%(asctime)s: %(message)s'
          logging.basicConfig(format=format, level=logging.INFO,
                              datefmt='%H:%M:%S')
          # logging.getLogger().setLevel(logging.DEBUG)

          pipeline = Pipeline()
          with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
              executor.submit(producer, pipeline)
              executor.submit(consumer, pipeline)

      看起來應(yīng)該很熟悉,因為它和前面示例中介紹過的__main__部分類似。

      注意,打開調(diào)試日志可以查看所有的日志消息,方法是取消對這一行的注釋:

      # logging.getLogger().setLevel(logging.DEBUG)

      我們有必要遍歷調(diào)試日志消息,來查看每個線程是在何處獲得和釋放鎖的。

      現(xiàn)在讓我們看一下將消息從生產(chǎn)者傳遞給消費者的管道Pipeline:

      class Pipeline:
          '''
          Class to allow a single element pipeline between producer and consumer.
          '''
          def __init__(self):
              self.message = 0
              self.producer_lock = threading.Lock()
              self.consumer_lock = threading.Lock()
              self.consumer_lock.acquire()

          def get_message(self, name):
              logging.debug('%s:about to acquire getlock', name)
              self.consumer_lock.acquire()
              logging.debug('%s:have getlock', name)
              message = self.message
              logging.debug('%s:about to release setlock', name)
              self.producer_lock.release()
              logging.debug('%s:setlock released', name)
              return message

          def set_message(self, message, name):
              logging.debug('%s:about to acquire setlock', name)
              self.producer_lock.acquire()
              logging.debug('%s:have setlock', name)
              self.message = message
              logging.debug('%s:about to release getlock', name)
              self.consumer_lock.release()
              logging.debug('%s:getlock released', name)

      好長一段代碼!別害怕,大部分是日志語句,刪除所有日志語句后的代碼如下:

      class Pipeline:
          '''
          Class to allow a single element pipeline between producer and consumer.
          '''
          def __init__(self):
              self.message = 0
              self.producer_lock = threading.Lock()
              self.consumer_lock = threading.Lock()
              self.consumer_lock.acquire()

          def get_message(self, name):
              self.consumer_lock.acquire()
              message = self.message
              self.producer_lock.release()
              return message

          def set_message(self, message, name):
              self.producer_lock.acquire()
              self.message = message
              self.consumer_lock.release()

      這樣看起來更清晰,管道類中有三個成員:

      .message存儲要傳遞的消息;

      .producer_lock是一個線程鎖對象,限制生產(chǎn)者線程對消息的訪問;

      .consumer_lock也是一個線程鎖,限制消費者線程對消息的訪問。

      __init__() 初始化這三個成員,然后在.consumer_lock上調(diào)用.acquire(),消費者獲得鎖。生產(chǎn)者可以添加新消息,但消費者需要等待消息出現(xiàn)。

      get_message().set_messages()幾乎是相反的操作。.get_message()consumer_lock上調(diào)用.acquire(),這么做的目的是讓消費者等待,直到有消息傳入。

      一旦消費者獲得了鎖.consumer_lock,它會將self.message的值復制給.message,然后在.producer_lock上調(diào)用.release()。釋放此鎖允許生產(chǎn)者在管道中插入下一條消息。

      .get_message()函數(shù)中有一些細節(jié)很容易被忽略。大家思考一下,為什么不把message變量刪掉,直接返回self.message的值呢?

      答案如下。

      只要消費者調(diào)用.producer_lock.release(),它就被交換出去,生產(chǎn)者開始運行,這可能發(fā)生在鎖被完全釋放之前!也就是說,存在一種微小的可能性,當函數(shù)返回self.message時,這個值是生產(chǎn)者生成的下一條消息,導致第一條消息丟失。這是競態(tài)條件的另一個例子。

      我們繼續(xù)看事務(wù)的另一端:.set_message()。生產(chǎn)者通過傳入一條消息來調(diào)用該函數(shù),獲得鎖.producer_lock,傳入.message值,然后調(diào)用consumer_lock.release()釋放鎖,這將允許消費者讀取該值。

      運行代碼,日志設(shè)置為警告級別,結(jié)果如下:

      $ ./prodcom_lock.py
      Producer got data 43
      Producer got data 45
      Consumer storing data: 43
      Producer got data 86
      Consumer storing data: 45
      Producer got data 40
      Consumer storing data: 86
      Producer got data 62
      Consumer storing data: 40
      Producer got data 15
      Consumer storing data: 62
      Producer got data 16
      Consumer storing data: 15
      Producer got data 61
      Consumer storing data: 16
      Producer got data 73
      Consumer storing data: 61
      Producer got data 22
      Consumer storing data: 73
      Consumer storing data: 22

      大家可能會覺得奇怪,生產(chǎn)者在消費者還沒運行之前就獲得了兩條消息。回過頭仔細看一下生產(chǎn)者和.set_message()函數(shù),生產(chǎn)者先獲取消息,打印出日志語句,然后試圖將消息放入管道中,這時才需要等待鎖。

      當生產(chǎn)者試圖傳入第二條消息時,它會第二次調(diào)用.set_message(),發(fā)生阻塞。

      操作系統(tǒng)可以在任何時候交換線程,但它通常會允許每個線程在交換之前有一段合理的運行時間。這就是為什么生產(chǎn)者會一直運行,直到第二次調(diào)用.set_message()時被阻塞。

      一旦線程被阻塞,操作系統(tǒng)總是會把它交換出去,并找到另一個線程去運行。在本例中,就是消費者線程。

      消費者調(diào)用.get_message()函數(shù),它讀取消息并在.producer_lock上調(diào)用.release()方法,釋放鎖,允許生產(chǎn)者再次運行。

      注意,第一個值是43,正是消費者所讀取的值,雖然生產(chǎn)者已經(jīng)生成了新值45。

      盡管使用鎖的這種方法適用于本例,但對于常見的生產(chǎn)者-消費者模式問題,這不是一個很好的解決方法,因為它一次只允許管道中有一個值。當生產(chǎn)者收到大量值時,將無處安放。

      讓我們繼續(xù)看一個更好的解決方法:使用隊列Queue.

      8.2. 在生產(chǎn)者-消費者模型中使用隊列

      如果想在管道中一次處理多個值,我們需要為管道提供一個數(shù)據(jù)結(jié)構(gòu),當從生產(chǎn)者線程備份數(shù)據(jù)時,該結(jié)構(gòu)允許管道中的數(shù)據(jù)量靈活變動,不再是單一值。

      Python標準庫中有一個模塊叫隊列queue,里面有一個類叫Queue。讓我們用隊列Queue改寫一下上面受鎖保護的管道。

      此外,我們還會介紹另一種停止工作線程的方法,使用Python線程模塊中的事件Event對象。

      事件的觸發(fā)機制可以是多種多樣的。在本例中,主線程只是休眠一段時間,然后調(diào)用event.set()方法,通知所有處于等待阻塞狀態(tài)的線程恢復運行狀態(tài):

      1 if __name__ == '__main__':
      2     format = '%(asctime)s: %(message)s'
      3     logging.basicConfig(format=format, level=logging.INFO,
      4                          datefmt='%H:%M:%S')
      5     # logging.getLogger().setLevel(logging.DEBUG)
      6
      7     pipeline = Pipeline()
      8     event = threading.Event()
      9     with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
      10        executor.submit(producer, pipeline, event)
      11        executor.submit(consumer, pipeline, event)
      12
      13        time.sleep(0.1)
      14        logging.info('Main: about to set event')
      15        event.set()

      這里惟一的變化是在第8行創(chuàng)建了事件對象event,在第10行和第11行傳遞了event參數(shù),代碼的最后一個部分13-15行,先休眠0.1秒,記錄一條消息,然后在事件上調(diào)用.set()方法。

      生產(chǎn)者也不用變太多:

      def producer(pipeline, event):
          '''Pretend we're getting a number from the network.'''
          while not event.is_set():
          message = random.randint(1, 101)
          logging.info('Producer got message: %s', message)
          pipeline.set_message(message, 'Producer')

          logging.info('Producer received EXIT event. Exiting')

      在第3行循環(huán)部分設(shè)置了事件,而且也不用再把SENTINEL值放入管道中。

      消費者的變化稍多:

      def consumer(pipeline, event):
          '''Pretend we're saving a number in the database.'''
          while not event.is_set() or not pipeline.empty():
           message = pipeline.get_message('Consumer')
           logging.info(
           'Consumer storing message: %s  (queue size=%s)',
           message,
           pipeline.qsize(),
              )

          logging.info('Consumer received EXIT event. Exiting')

      除了需要刪掉和SENTINEL值相關(guān)的代碼,還要執(zhí)行稍微復雜一點的循環(huán)條件。它會一直循環(huán),直到事件結(jié)束,管道中的數(shù)據(jù)被清空。

      一定要確保當消費者退出時,隊列是空的。如果消費者在管道包含消息時退出,可能會出現(xiàn)兩個問題。一是會丟失那部分數(shù)據(jù),但更嚴重的是生產(chǎn)者會被鎖住。

      在生產(chǎn)者檢查.is_set()條件后、但在調(diào)用pipeline.set_message()前觸發(fā)事件,則會發(fā)生這種情況。

      一旦發(fā)生這種情況,生產(chǎn)者可能被喚醒并退出,但此時鎖仍被消費者持有。然后,生產(chǎn)者將嘗試用.acquire()方法獲取鎖,但是消費者已經(jīng)退出,而且永遠不會釋放鎖,所以生產(chǎn)者就會一直等下去。

      消費者的其余部分看起來應(yīng)該很熟悉。

      管道類的寫法變化最大:

      class Pipeline(queue.Queue):
          def __init__(self):
              super().__init__(maxsize=10)

          def get_message(self, name):
              logging.debug('%s:about to get from queue', name)
              value = self.get()
              logging.debug('%s:got %d from queue', name, value)
              return value

          def set_message(self, value, name):
              logging.debug('%s:about to add %d to queue', name, value)
              self.put(value)
              logging.debug('%s:added %d to queue', name, value)

      Pipelinequeue.Queue的一個子類。Queue隊列里面有一個可選參數(shù),在初始化時指定隊列所能容納的最大數(shù)據(jù)量。

      .get_message().set_message()變得更簡短,被隊列中的.get().put()方法替代。

      大家可能想知道,防止競爭條件的代碼都跑哪里去了?

      編寫標準庫的核心開發(fā)人員知道,在多線程環(huán)境中經(jīng)常使用隊列Queue,因此將所有鎖定代碼合并到了隊列Queue模塊內(nèi)部。隊列Queue本身就是線程安全的。

      程序運行結(jié)果如下:

      $ ./prodcom_queue.py
      Producer got message: 32
      Producer got message: 51
      Producer got message: 25
      Producer got message: 94
      Producer got message: 29
      Consumer storing message: 32 (queue size=3)
      Producer got message: 96
      Consumer storing message: 51 (queue size=3)
      Producer got message: 6
      Consumer storing message: 25 (queue size=3)
      Producer got message: 31

      [many lines deleted]

      Producer got message: 80
      Consumer storing message: 94 (queue size=6)
      Producer got message: 33
      Consumer storing message: 20 (queue size=6)
      Producer got message: 48
      Consumer storing message: 31 (queue size=6)
      Producer got message: 52
      Consumer storing message: 98 (queue size=6)
      Main: about to set event
      Producer got message: 13
      Consumer storing message: 59 (queue size=6)
      Producer received EXIT event. Exiting
      Consumer storing message: 75 (queue size=6)
      Consumer storing message: 97 (queue size=5)
      Consumer storing message: 80 (queue size=4)
      Consumer storing message: 33 (queue size=3)
      Consumer storing message: 48 (queue size=2)
      Consumer storing message: 52 (queue size=1)
      Consumer storing message: 13 (queue size=0)
      Consumer received EXIT event. Exiting

      生產(chǎn)者創(chuàng)建了5條消息,并將其中4條放到隊列中。但在放置第5條消息之前,它被操作系統(tǒng)交換出去了。

      然后消費者開始運行并儲存第1條消息,打印出該消息和隊列大?。?/p>

      Consumer storing message: 32 (queue size=3)

      這就是為什么第5條消息沒有成功進入管道。刪除一條消息后,隊列的大小縮減到3個。因為隊列最多可以容納10條消息,所以生產(chǎn)者線程沒有被隊列阻塞,而是被操作系統(tǒng)交換出去了。

      注意:每次運行所得到的結(jié)果會不同。這就是使用線程的樂趣所在!

      當程序開始結(jié)束時,主線程觸發(fā)事件,生產(chǎn)者立即退出。但消費者仍有很多工作要做,所以它會繼續(xù)運行,直到清理完管道中的數(shù)據(jù)為止。

      嘗試修改生產(chǎn)者或消費者中的隊列大小和time.sleep()中的休眠時間,來分別模擬更長的網(wǎng)絡(luò)或磁盤訪問時間。即使是輕微的更改,也會對結(jié)果產(chǎn)生很大的影響。

      對于生產(chǎn)者-消費者模型,這是一個更好的解決方法,但其實可以進一步簡化。去掉管道Pipeline和日志語句,就只剩下和queue.Queue相關(guān)的語句了。

      直接使用queue.Queue的最終代碼如下:

      import concurrent.futures
      import logging
      import queue
      import random
      import threading
      import time

      def producer(queue, event):
          '''Pretend we're getting a number from the network.'''
          while not event.is_set():
              message = random.randint(1, 101)
              logging.info('Producer got message: %s', message)
              queue.put(message)

          logging.info('Producer received event. Exiting')

      def consumer(queue, event):
          '''Pretend we're saving a number in the database.'''
          while not event.is_set() or not queue.empty():
              message = queue.get()
              logging.info(
                  'Consumer storing message: %s (size=%d)', message, queue.qsize()
              )

          logging.info('Consumer received event. Exiting')

      if __name__ == '__main__':
          format = '%(asctime)s: %(message)s'
          logging.basicConfig(format=format, level=logging.INFO,
                              datefmt='%H:%M:%S')

          pipeline = queue.Queue(maxsize=10)
          event = threading.Event()
          with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
              executor.submit(producer, pipeline, event)
              executor.submit(consumer, pipeline, event)

              time.sleep(0.1)
              logging.info('Main: about to set event')
              event.set()

      可以看到,使用Python的內(nèi)置基礎(chǔ)模塊能夠簡化復雜的問題,讓代碼閱讀起來更清晰。

      Lock和隊列Queue是解決并發(fā)問題非常方便的兩個類,但其實標準庫還提供了其他類。在結(jié)束本教程之前,讓我們快速瀏覽一下還有哪些類。

      9. 線程對象

      Python的線程threading模塊還有其他一些基本類型。雖然在上面的例子中沒有用到,但它們會在不同的情況下派上用場,所以熟悉一下還是很好處的。

      9.1 信號量

      首先要介紹的是信號量thread.semaphore,信號量是具有一些特殊屬性的計數(shù)器。

      第一個屬性是計數(shù)的原子性,可以保證操作系統(tǒng)不會在計數(shù)器遞增或遞減的過程中交換線程。

      內(nèi)部計數(shù)器在調(diào)用.release()時遞增,在調(diào)用.acquire()時遞減。

      另一個特殊屬性是,如果線程在計數(shù)器為0時調(diào)用.acquire(),那么該線程將阻塞,直到另一個線程調(diào)用.release()并將計數(shù)器的值增加到1。

      信號量通常用于保護容量有限的資源。例如,我們有一個連接池,并且希望限制該連接池中的元素數(shù)量,就可以用信號量來進行管理。

      9.2 定時器

      threading.Timer是一個定時器功能的類,指定函數(shù)在間隔特定時間后執(zhí)行任務(wù)。我們可以通過傳入需要等待的時間和函數(shù)來創(chuàng)建一個定時器:

      t = threading.Timer(30.0, my_function)

      調(diào)用.start()啟動定時器,函數(shù)將在指定時間過后的某個時間點上被新線程調(diào)用。但請注意,這里并不能保證函數(shù)會在我們所期望的確切時間被調(diào)用,可能會存在誤差。  

      如果想要停止已經(jīng)啟動的定時器,可以調(diào)用.cancel()。在定時器觸發(fā)后調(diào)用.cancel()不會執(zhí)行任何操作,也不會產(chǎn)生異常。

      定時器可用于在特定時間之后提示用戶執(zhí)行操作。如果用戶在定時器過時之前執(zhí)行了操作,可以調(diào)用.cancel()取消定時。

      9.3 柵欄

      threading模塊中的柵欄Barrier可以用來指定需要同步運行的線程數(shù)量。創(chuàng)建柵欄Barrier時,我們必須指定所需同步的線程數(shù)。每個線程都會在Barrier上調(diào)用.wait()方法,它們會先保持阻塞狀態(tài),直到等待的線程數(shù)量達到指定值時,會被同時釋放。

      注意,線程是由操作系統(tǒng)調(diào)度的,因此,即使所有線程同時被釋放,一次也只能運行一個線程。

      柵欄可以用來初始化一個線程池。讓線程初始化后在柵欄里等待,可以確保程序在所有線程都完成初始化后再開始運行。

      米哥點評

      感謝Little monster同學的翻譯和整理。本篇對線程及多線程開發(fā)進行了很好的詮釋,從基礎(chǔ)介紹到線程實例,從入門到進階,全方位多角度講解有關(guān)線程開發(fā)方方面面的內(nèi)容,即便是工作了多年的老程序員,看完之后也是收獲頗多。多線程在數(shù)據(jù)采集和處理方面都有不少的應(yīng)用場景,相信對很多Tushare用戶會有所助益,寄以此篇促大家學好用好,提升數(shù)據(jù)效率。

        本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
        轉(zhuǎn)藏 分享 獻花(0

        0條評論

        發(fā)表

        請遵守用戶 評論公約

        類似文章 更多