Python 并行任務技巧
Python的并發處理能力臭名昭著。先撇開線程以及GIL方面的問題不說,我覺得多線程問題的根源不在技術上而在于理念。大部分關于Pyhon線程和多進程的資料雖然都很不錯,但卻過于細節。這些資料講的都是虎頭蛇尾,到了真正實際使用的部分卻草草結束了。
1、傳統例子
在DDG https://duckduckgo.com/ 搜索“Python threading tutorial”關鍵字,結果基本上卻都是相同的類+隊列的示例。
標準線程多進程,生產者/消費者示例:
#Example.py ''' Standard Producer/Consumer Threading Pattern ''' import time import threading import Queue class Consumer(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self._queue = queue def run(self): while True: # queue.get() blocks the current thread until # an item is retrieved. msg = self._queue.get() # Checks if the current message is # the "Poison Pill" if isinstance(msg, str) and msg == 'quit': # if so, exists the loop break # "Processes" (or in our case, prints) the queue item print "I'm a thread, and I received %s!!" % msg # Always be friendly! print 'Bye byes!' def Producer(): # Queue is used to share items between # the threads. queue = Queue.Queue() # Create an instance of the worker worker = Consumer(queue) # start calls the internal run() method to # kick off the thread worker.start() # variable to keep track of when we started start_time = time.time() # While under 5 seconds.. while time.time() - start_time < 5: # "Produce" a piece of work and stick it in # the queue for the Consumer to process queue.put('something at %s' % time.time()) # Sleep a bit just to avoid an absurd number of messages time.sleep(1) # This the "poison pill" method of killing a thread. queue.put('quit') # wait for the thread to close down worker.join() if __name__ == '__main__': Producer()
Mmm.. 感覺像是java代碼
在此我不想印證采用生產者/消費者模式來處理線程/多進程是錯誤的— 確實沒問題。實際上這也是解決很多問題的最佳選擇。但是,我卻不認為這是日常工作中常用的方式。
2、問題所在
一開始,你需要一個執行下面操作的鋪墊類。接著,你需要創建一個傳遞對象的隊列,并在隊列兩端實時監聽以完成任務。(很有可能需要兩個隊列互相通信或者存儲數據)
Worker越多,問題越大.
下一步,你可能會考慮把這些worker放入一個線程池一邊提高Python的處理速度。下面是
IBM tutorial 上關于線程較好的示例代碼。這是大家常用到的利用多線程處理web頁面的場景
#Example2.py ''' A more realistic thread pool example ''' import time import threading import Queue import urllib2 class Consumer(threading.Thread): def __init__(self, queue): threading.Thread.__init__(self) self._queue = queue def run(self): while True: content = self._queue.get() if isinstance(content, str) and content == 'quit': break response = urllib2.urlopen(content) print 'Bye byes!' def Producer(): urls = [ 'http://www.python.org', 'http://www.yahoo.com' 'http://www.scala.org', 'http://www.google.com' # etc.. ] queue = Queue.Queue() worker_threads = build_worker_pool(queue, 4) start_time = time.time() # Add the urls to process for url in urls: queue.put(url) # Add the poison pillv for worker in worker_threads: queue.put('quit') for worker in worker_threads: worker.join() print 'Done! Time taken: {}'.format(time.time() - start_time) def build_worker_pool(queue, size): workers = [] for _ in range(size): worker = Consumer(queue) worker.start() workers.append(worker) return workers if __name__ == '__main__': Producer()
感覺效果應該很好,但是看看這些代碼!初始化方法、線程跟蹤,最糟的是,如果你也和我一樣是個容易犯死鎖問題的人,這里的join語句就要出錯了。這樣就開始變得更加復雜了!
到現在為止都做了些什么?基本上沒什么。上面的代碼都是些基礎功能,而且很容易出錯。(天啊,我忘了寫上在隊列對象上調用task_done()方法(我懶得修復這個問題在重新截圖)),這真是性價比太低。所幸的是,我們有更好的辦法.
3、引入:Map
Map 是個很酷的小功能,也是簡化Python并發代碼的關鍵。對那些不太熟悉Map的來說,它有點類似Lisp.它就是序列化的功能映射功能. e.g.
urls = [', '] results = map(urllib2.urlopen, urls)
這里調用urlopen方法,并把之前的調用結果全都返回并按順序存儲到一個集合中。這有點類似
results = [] for url in urls: results.append(urllib2.urlopen(url))
Map能夠處理集合按順序遍歷,最終將調用產生的結果保存在一個簡單的集合當中。
為什么要提到它?因為在引入需要的包文件后,Map能大大簡化并發的復雜度!
支持Map并發的包文件有兩個:
Multiprocessing,還有少為人知的但卻功能強大的子文件 multiprocessing.dummy. .
Digression這是啥東西?沒聽說過線程引用叫dummy的多進程包文件。我也是直到最近才知道。它在多進程的說明文檔中也只被提到了一句。它的效果也只是讓大家直到有這么個東西而已。這可真是營銷的失誤!
Dummy是一個多進程包的完整拷貝。唯一不同的是,多進程包使用進程,而dummy使用線程(自然也有Python本身的一些限制)。所以一個有的另一個也有。這樣在兩種模式間切換就十分簡單,并且在判斷框架調用時使用的是IO還是CPU模式非常有幫助。
4、開始使用 multiprocessing & Map
準備使用帶有并發的map功能首先要導入相關包文件:
from multiprocessing import Pool from multiprocessing.dummy import Pool as ThreadPool
然后初始化:
pool = ThreadPool()
就這么簡單一句解決了example2.py中build_worker_pool的功能. 具體來講,它首先創建一些有效的worker啟動它并將其保存在一些變量中以便隨時訪問。
pool對象需要一些參數,但現在最緊要的就是:進程。它可以限定線程池中worker的數量。如果不填,它將采用系統的內核數作為初值。
一般情況下,如果你進行的是計算密集型多進程任務,內核越多意味著速度越快(當然這是有前提的)。但如果是涉及到網絡計算方面,影響的因素就千差萬別。所以最好還是能給出合適的線程池大小數。
pool = ThreadPool(4) # Sets the pool size to 4
如果運行的線程很多,頻繁的切換線程會十分影響工作效率。所以最好還是能通過調試找出任務調度的時間平衡點。
好的,既然已經建好了線程池對象還有那些簡單的并發內容。咱們就來重寫一些example2.py中的url opener吧!
import urllib2 from multiprocessing.dummy import Pool as ThreadPool urls = [ 'http://www.python.org', 'http://www.python.org/about/', 'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html', 'http://www.python.org/doc/', 'http://www.python.org/download/', 'http://www.python.org/getit/', 'http://www.python.org/community/', 'https://wiki.python.org/moin/', 'http://planet.python.org/', 'https://wiki.python.org/moin/LocalUserGroups', 'http://www.python.org/psf/', 'http://docs.python.org/devguide/', 'http://www.python.org/community/awards/' # etc.. ] # Make the Pool of workers pool = ThreadPool(4) # Open the urls in their own threads # and return the results results = pool.map(urllib2.urlopen, urls) #close the pool and wait for the work to finish pool.close() pool.join() # results = [] # for url in urls: # result = urllib2.urlopen(url) # results.append(result) # # ------- VERSUS ------- # # # ------- 4 Pool ------- # # pool = ThreadPool(4) # results = pool.map(urllib2.urlopen, urls) # # ------- 8 Pool ------- # # pool = ThreadPool(8) # results = pool.map(urllib2.urlopen, urls) # # ------- 13 Pool ------- # # pool = ThreadPool(13) # results = pool.map(urllib2.urlopen, urls) # Single thread: 14.4 Seconds # 4 Pool: 3.1 Seconds # 8 Pool: 1.4 Seconds # 13 Pool: 1.3 Seconds看吧!只用4行代碼就搞定了!其中三行還是固定寫法。使用map方法簡單的搞定了之前需要40行代碼做的事!為了增加趣味性,我分別統計了不同線程池大小的運行時間。
效果驚人!看來調試一下確實很有用。當線程池大小超過9以后,在我本機上的運行效果已相差無幾。
5、示例 2:生成縮略圖
生成上千張圖像的縮略圖:
現在咱們看一年計算密集型的任務!我最常遇到的這類問題之一就是大量圖像文件夾的處理。
其中一項任務就是創建縮略圖。這也是并發中比較成熟的一項功能了。
基礎單線程創建過程

作為示例來說稍微有點復雜。但其實就是傳一個文件夾目錄進來,獲取到里面所有的圖片,分別創建好縮略圖然后保存到各自的目錄當中。
在我的電腦上,處理大約6000張圖片大約耗時27.9秒.
如果使用并發map處理替代其中的for循環:
只用了5.6 秒!
就改了幾行代碼速度卻能得到如此巨大的提升。最終版本的處理速度還要更快。因為我們將計算密集型與IO密集型任務分派到各自獨立的線程和進程當中,這也許會容易造成死鎖,但相對于map強勁的功能,通過簡單的調試我們最終總能設計出優美、高可靠性的程序。就現在而言,也別無它法。
好了。來感受一下一行代碼的并發程序吧。
6、關于Python并行任務技巧的幾點補完
早上逛微博發現了SegmentFault上的這篇文章:關于Python并行任務技巧 。看過之后大有裨益。順手試了試后遇到幾個小坑,記錄下來作為補完(作者也有點語焉不詳哦^_^)。
6.1 傳入的function,只能接收一個傳入參數
一開始以為在傳入的序列里用tuple可以自動解包成多個參數傳進去,可惜實踐后是不行的:
#coding=utf8 from multiprocessing import Pool def do_add(n1, n2): return n1+n2 pool = Pool(5) print pool.map(do_add, [(1,2),(3,4),(5,6)]) pool.close() pool.join()
執行后結果就報錯了:
Traceback (most recent call last): File "mt.py", line 8, in <module> print pool.map(do_add, [(1,2),(3,4),(5,6)]) File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 250, in map return self.map_async(func, iterable, chunksize).get() File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 554, in get raise self._value TypeError: do_add() takes exactly 2 arguments (1 given)
6.2 傳入的function必須處理必要的異常
傳入的function如果要做長期執行,比如放一個死循環在里面長期執行的話,必須處理必要的異常,不然ctrl+c殺不掉進程,比如:
#coding=utf8 from multiprocessing import Pool import time def do_add(n1): while True: time.sleep(1) print n1 pool = Pool(5) print pool.map(do_add, [1,2,3,4,5,6]) pool.close() pool.join()
這段代碼一跑起來是ctrl+c殺不掉的,最后只能把console整個關掉才行。 不過這么寫就ok了:
#coding=utf8 from multiprocessing import Pool import time def do_add(n1): try: while True: time.sleep(1) print n1 except: return n1 pool = Pool(5) print pool.map(do_add, [1,2,3,4,5,6]) pool.close() pool.join()
補完的補完,有網友提供了解決辦法,使用functools的partial可以解決,詳見 爆棧
6.3 如何更加高效
第三點是為什么要在子進程里用死循環讓其長期執行。竊以為作者的直接把上千個任務暴力丟給進程池的做法并不是最高效的方式,即便是正在執行的進程數和CPU數能匹配得切到好處,但是一大堆的進程切換的開銷也會有相當的負擔。但是創建幾個長期運行的工作進程,每個工作進程處理多個任務,省略掉了大量開啟關閉進程的開銷,原理上來說會效率高一些。不過這個問題我沒有實測過。再不過其實從原理上來說這個開銷雖然有但是并不是有多么大,很多時候完全可以忽略,比如作者用的例子。 所以其實更確切一點的需求反而是用于實現生產者消費者模式。因為在作者的例子里,任務數是固定的,不可控的,更多的時候我們反而是需要用生產者創建任務,由worker進程去執行任務。舉個例子,生產者監聽一個redis的隊列,有新url放進去的時候就通知worker進程去取。
代碼如下:
#coding=utf8 from multiprocessing import Pool, Queue import redis import requests queue = Queue(20) def consumer(): r = redis.Redis(host='127.0.0.1',port=6379,db=1) while True: k, url = r.blpop(['pool',]) queue.put(url) def worker(): while True: url = queue.get() print requests.get(url).text def process(ptype): try: if ptype: consumer() else: worker() except: pass pool = Pool(5) print pool.map(process, [1,0,0,0,0]) pool.close() pool.join()
比起經典的方式來說簡單很多,效率高,易懂,而且沒什么死鎖的陷阱。
7、Refer:
(1)英文原文:https://medium.com/p/40e9b2b36148
(2)原文代碼:https://github.com/chriskiehl/Blog/tree/master/40e9b2b36148
(3)關于Python并行任務技巧的幾點補充 http://liming.me/2014/01/12/python-multitask-fixed/
(4)在單核 CPU、Python GIL 限制下,多線程需要加鎖嗎?
https://github.com/onlytiancai/codesnip/blob/master/python/sprace.py
(5)gevent程序員指南 http://xlambda.com/gevent-tutorial/#_8
(6)進程、線程和協程的理解