最近會開始給大家出些Python進(jìn)階系列的文章,這是該系列的第一篇文章,介紹進(jìn)程和線程的知識,本次的Python基礎(chǔ)教程則跟大家介紹下進(jìn)程和線程的概念,多進(jìn)程和多線程各自的實現(xiàn)方法和優(yōu)缺點,以及分別在哪些情況采用多進(jìn)程,或者是多線程。 概念 并發(fā)編程就是實現(xiàn)讓程序同時執(zhí)行多個任務(wù),而如何實現(xiàn)并發(fā)編程呢,這里就涉及到進(jìn)程和線程這兩個概念。 對于操作系統(tǒng)來說,一個任務(wù)(或者程序)就是一個進(jìn)程(Process),比如打開一個瀏覽器是開啟一個瀏覽器進(jìn)程,打開微信就啟動了一個微信的進(jìn)程,打開兩個記事本,就啟動兩個記事本進(jìn)程。 進(jìn)程的特點有: 操作系統(tǒng)以進(jìn)程為單位分配存儲空間, 每個進(jìn)程有自己的地址空間、數(shù)據(jù)棧以及其他用于跟蹤進(jìn)程執(zhí)行的輔助數(shù)據(jù); 進(jìn)程可以通過 fork 或者 spawn 方式創(chuàng)建新的進(jìn)程來執(zhí)行其他任務(wù) 進(jìn)程都有自己獨立的內(nèi)存空間,所以進(jìn)程需要通過進(jìn)程間通信機(jī)制(IPC,Inter-Process Communication)來實現(xiàn)數(shù)據(jù)共享,具體的方式包括管道、信號、套接字、共享內(nèi)存區(qū)等
一個進(jìn)程還可以同時做多件事情,比如在 Word 里面同時進(jìn)行打字、拼音檢查、打印等事情,也就是一個任務(wù)分為多個子任務(wù)同時進(jìn)行,這些進(jìn)程內(nèi)的子任務(wù)被稱為線程(Thread)。 
因為每個進(jìn)程至少需要完成一件事情,也就是一個進(jìn)程至少有一個線程。當(dāng)要實現(xiàn)并發(fā)編程,也就是同時執(zhí)行多任務(wù)時,有以下三種解決方案: 多進(jìn)程,每個進(jìn)程只有一個線程,但多個進(jìn)程一起執(zhí)行多個任務(wù); 多線程,只啟動一個進(jìn)程,但一個進(jìn)程內(nèi)開啟多個線程; 多進(jìn)程+多線程,即啟動多個進(jìn)程,每個進(jìn)程又啟動多個線程,但這種方法非常復(fù)雜,實際很少使用
注意:真正的并行執(zhí)行多任務(wù)只有在多核 CPU 上才可以實現(xiàn),單核 CPU 系統(tǒng)中,真正的并發(fā)是不可能的,因為在某個時刻能夠獲得CPU的只有唯一的一個線程,多個線程共享了CPU的執(zhí)行時間。 Python 是同時支持多進(jìn)程和多線程的,下面就分別介紹多進(jìn)程和多線程。 多進(jìn)程 在 Unix/Linux 系統(tǒng)中,提供了一個 fork() 系統(tǒng)調(diào)用,它是一個特殊的函數(shù),普通函數(shù)調(diào)用是調(diào)用一次,返回一次,但 fork 函數(shù)調(diào)用一次,返回兩次,因為調(diào)用該函數(shù)的是父進(jìn)程,然后復(fù)制出一份子進(jìn)程了,最后同時在父進(jìn)程和子進(jìn)程內(nèi)返回,所以會返回兩次。 子進(jìn)程返回的永遠(yuǎn)是 0 ,而父進(jìn)程會返回子進(jìn)程的 ID,因為父進(jìn)程可以復(fù)制多個子進(jìn)程,所以需要記錄每個子進(jìn)程的 ID,而子進(jìn)程可以通過調(diào)用 getpid() 獲取父進(jìn)程的 ID。 Python 中 os 模塊封裝了常見的系統(tǒng)調(diào)用,這就包括了 fork ,代碼示例如下: import os print('Process (%s) start...' % os.getpid()) # Only works on Unix/Linux/Mac: pid = os.fork() if pid == 0: print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())) else: print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
運行結(jié)果: Process (876) start... I (876) just created a child process (877). I am child process (877) and my parent is 876.
由于 windows 系統(tǒng)中是不存在 fork ,所以上述函數(shù)無法調(diào)用,但 Python 是跨平臺的,所以也還是有其他模塊可以實現(xiàn)多進(jìn)程的功能,比如 multiprocessing模塊。 multiprocess multiprocessing 模塊中提供了 Process 類來代表一個進(jìn)程對象,接下來用一個下載文件的例子來說明采用多進(jìn)程和不用多進(jìn)程的差別。 首先是不采用多進(jìn)程的例子: def download_task(filename): '''模擬下載文件''' print('開始下載%s...' % filename) time_to_download = randint(5, 10) sleep(time_to_download) print('%s下載完成! 耗費了%d秒' % (filename, time_to_download)) def download_without_multiprocess(): '''不采用多進(jìn)程''' start = time() download_task('Python.pdf') download_task('nazha.mkv') end = time() print('總共耗費了%.2f秒.' % (end - start)) if __name__ == '__main__': download_without_multiprocess()
運行結(jié)果如下,這里用 randint 函數(shù)來隨機(jī)輸出當(dāng)前下載文件的耗時,從結(jié)果看,程序運行時間等于兩個下載文件的任務(wù)時間總和。 開始下載Python.pdf... Python.pdf下載完成! 耗費了9秒 開始下載nazha.mkv... nazha.mkv下載完成! 耗費了9秒 總共耗費了18.00秒.
如果是采用多進(jìn)程,例子如下所示: def download_task(filename): '''模擬下載文件''' print('開始下載%s...' % filename) time_to_download = randint(5, 10) sleep(time_to_download) print('%s下載完成! 耗費了%d秒' % (filename, time_to_download)) def download_multiprocess(): '''采用多進(jìn)程''' start = time() p1 = Process(target=download_task, args=('Python.pdf',)) p1.start() p2 = Process(target=download_task, args=('nazha.mkv',)) p2.start() p1.join() p2.join() end = time() print('總共耗費了%.2f秒.' % (end - start)) if __name__ == '__main__': download_multiprocess()
這里多進(jìn)程例子中,我們通過 Process 類創(chuàng)建了進(jìn)程對象,通過 target 參數(shù)傳入一個函數(shù)表示進(jìn)程需要執(zhí)行的任務(wù),args 是一個元組,表示傳遞給函數(shù)的參數(shù),然后采用 start 來啟動進(jìn)程,而 join 方法表示等待進(jìn)程執(zhí)行結(jié)束。 運行結(jié)果如下所示,耗時就不是兩個任務(wù)執(zhí)行時間總和,速度上也是大大的提升了。 開始下載Python.pdf... 開始下載nazha.mkv... Python.pdf下載完成! 耗費了5秒 nazha.mkv下載完成! 耗費了9秒 總共耗費了9.36秒.
Pool 上述例子是開啟了兩個進(jìn)程,但如果需要開啟大量的子進(jìn)程,上述代碼的寫法就不合適了,應(yīng)該采用進(jìn)程池的方式批量創(chuàng)建子進(jìn)程,還是用下載文件的例子,但執(zhí)行下部分的代碼如下所示: import os from multiprocessing import Process, Pool from random import randint from time import time, sleep def download_multiprocess_pool(): '''采用多進(jìn)程,并用 pool 管理進(jìn)程池''' start = time() filenames = ['Python.pdf', 'nazha.mkv', 'something.mp4', 'lena.png', 'lol.avi'] # 進(jìn)程池 p = Pool(5) for i in range(5): p.apply_async(download_task, args=(filenames[i], )) print('Waiting for all subprocesses done...') # 關(guān)閉進(jìn)程池 p.close() # 等待所有進(jìn)程完成任務(wù) p.join() end = time() print('總共耗費了%.2f秒.' % (end - start)) if __name__ == '__main__': download_multiprocess_pool()
代碼中 Pool 對象先創(chuàng)建了 5 個進(jìn)程,然后 apply_async 方法就是并行啟動進(jìn)程執(zhí)行任務(wù)了,調(diào)用 join() 方法之前必須先調(diào)用 close() ,close() 主要是關(guān)閉進(jìn)程池,所以執(zhí)行該方法后就不能再添加新的進(jìn)程對象了。然后 join() 就是等待所有進(jìn)程執(zhí)行完任務(wù)。 運行結(jié)果如下所示: Waiting for all subprocesses done... 開始下載Python.pdf... 開始下載nazha.mkv... 開始下載something.mp4... 開始下載lena.png... 開始下載lol.avi... nazha.mkv下載完成! 耗費了5秒 lena.png下載完成! 耗費了6秒 something.mp4下載完成! 耗費了7秒 Python.pdf下載完成! 耗費了8秒 lol.avi下載完成! 耗費了9秒 總共耗費了9.80秒.
子進(jìn)程 大多數(shù)情況,子進(jìn)程是一個外部進(jìn)程,而非自身。在創(chuàng)建子進(jìn)程后,我們還需要控制子進(jìn)程的輸入和輸出。 subprocess 模塊可以讓我們很好地開啟子進(jìn)程以及管理子進(jìn)程的輸入和輸出。 下面是演示如何用 Python 演示命令 nslookup www.python.org,代碼如下所示: import subprocess print('$ nslookup www.python.org') r = subprocess.call(['nslookup', 'www.python.org']) print('Exit code:', r)
運行結(jié)果: $ nslookup www.python.org Server: 192.168.19.4 Address: 192.168.19.4#53 Non-authoritative answer: www.python.org canonical name = python.map.fastly.net. Name: python.map.fastly.net Address: 199.27.79.223 Exit code: 0
如果子進(jìn)程需要輸入,可以通過 communicate() 進(jìn)行輸入,代碼如下所示: import subprocess print('$ nslookup') p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b'set q=mx\npython.org\nexit\n') print(output.decode('utf-8')) print('Exit code:', p.returncode)
這段代碼就是執(zhí)行命令 nslookup 時,輸入: set q=mx python.org exit
運行結(jié)果: $ nslookup Server: 192.168.19.4 Address: 192.168.19.4#53 Non-authoritative answer: python.org mail exchanger = 50 mail.python.org. Authoritative answers can be found from: mail.python.org internet address = 82.94.164.166 mail.python.org has AAAA address 2001:888:2000:d::a6 Exit code: 0
進(jìn)程間通信 進(jìn)程之間是需要通信的,multiprocess 模塊中也提供了 Queue、Pipes 等多種方式來交換數(shù)據(jù)。 這里以 Queue 為例,在父進(jìn)程創(chuàng)建兩個子進(jìn)程,一個往 Queue 寫入數(shù)據(jù),另一個從 Queue 讀取數(shù)據(jù)。代碼如下: import os from multiprocessing import Process, Queue import random from time import time, sleep # 寫數(shù)據(jù)進(jìn)程執(zhí)行的代碼: def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) sleep(random.random()) # 讀數(shù)據(jù)進(jìn)程執(zhí)行的代碼: def read(q): print('Process to read: %s' % os.getpid()) while True: value = q.get(True) print('Get %s from queue.' % value) def ipc_queue(): ''' 采用 Queue 實現(xiàn)進(jìn)程間通信 :return: ''' # 父進(jìn)程創(chuàng)建Queue,并傳給各個子進(jìn)程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 啟動子進(jìn)程pw,寫入: pw.start() # 啟動子進(jìn)程pr,讀取: pr.start() # 等待pw結(jié)束: pw.join() # pr進(jìn)程里是死循環(huán),無法等待其結(jié)束,只能強(qiáng)行終止: pr.terminate() if __name__ == '__main__': ipc_queue()
運行結(jié)果如下所示: Process to write: 24992 Put A to queue... Process to read: 22836 Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue.
本Python基礎(chǔ)教程主要是介紹進(jìn)程和線程的概念,然后就是介紹多進(jìn)程及其實現(xiàn)方式,在下一篇文章會介紹多線程的實現(xiàn),以及兩種方式應(yīng)該如何選擇。
|