python多進程并發編程
Python提供了非常好用的多進程包multiprocessing,你只需要定義一個函數,Python會替你完成其他所有事情。
借助這個包,可以輕松完成從單進程到并發執行的轉換。
一、單進程編程
如果我們新建少量進程,可以如下:import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=("hello", ))
p.start()
p.join()
print "Sub-process done."
二、進程池操作函數詳細
2.1 類multiprocessing.Pool
class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])進程池對象控制了一個工作進程池,從而可以將任務提交給工作進程池并行處理。
它支持異步結果生成,支持超時回調。
processes: 工作進程的個數。
如果它的值為空,則工作進程個數由 cpu_count()得到;
如果initializer不為空,則每個工作進程在啟動時都會調用initializer(*initargs)來初始化。
NOTE:
A. 進程池對象的方法只能被創建進程的池所調用。
B. 在Python Version 2.7中,
參數maxtasksperchild 表示一個工作進程在它被退出并被新的工作進程代替之前可以完成的任務數。
它用來支持向未被使用的資源提交任務。
它的默認值為空,表示工作進程會和池存在一樣長。
方法定義:
1). apply_async(func[, args[, kwds[, callback]]])
apply()方法的一個變種,它會返回一個結果對象;
如果指定了callback, 它應該是可被調用的,且能接收單個參數。
當結果已經生成時,就會調用callback。
callback應該要能立即完成,否則處理結果的線程將會被阻塞。
2). map(func, iterable[, chunksize])
A parallel equivalent of the map() built-in function (it supports only one iterable argument though).
It blocks until the result is ready.
This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks.
The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.
3). map_async(func, iterable[, chunksize[, callback]])
A variant of the map() method which returns a result object.
If callback is specified then it should be a callable which accepts a single argument.
When the result becomes ready callback is applied to it (unless the call failed).
callback should complete immediately since otherwise the thread which handles the results will get blocked.
4). imap(func, iterable[, chunksize])
An equivalent of itertools.imap().
The chunksize argument is the same as the one used by the map() method.
For very long iterables using a large value for chunksize can make the job complete much faster than
using the default value of 1.
Also if chunksize is 1 then the next() method of the iterator returned by the imap() method has an optional
timeout parameter: next(timeout) will raise multiprocessing.TimeoutError
if the result cannot be returned within timeout seconds.
5). imap_unordered(func, iterable[, chunksize])
The same as imap() except that the ordering of the results from the returned iterator should be considered arbitrary.
(Only when there is only one worker process is the order guaranteed to be “correct”.)
6). close()
關于進程池的任務提交,當所有任務完成后,工作進程將退出。
7). terminate()
立即停止工作進程而不管工作是否完成。
當進程池對象被回收時,必須立即調用 terminate()
8). join()
等待所有工作進程退出。
在調用它之前,必須先調用 close() 或 terminate()。
2.2 類 multiprocessing.pool.AsyncResult
class multiprocessing.pool.AsyncResult由Pool.apply_async() and Pool.map_async()返回的結果的類
1). get([timeout])
返回生成的結果。
如果 timeout 不為空時,如果結果在 timeout秒內未生成,將會報 multiprocessing.TimeoutError 異常。
如果遠程調用報了異常,那這個異常也會被get()提交。
2). wait([timeout])
等待直到結果生成或超時。
3). ready()
返回調用是否完成。
4). successful()
返回調用是否完成并且沒有提交異常。
如果結果未準備好,則報AssertionError異常。
三、應用示例
The following example demonstrates the use of a pool:1. 示例一
from multiprocessing import Pooldef f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print it.next() # prints "0"
print it.next() # prints "1"
print it.next(timeout=1) # prints "4" unless your computer is *very* slow
import time
result = pool.apply_async(time.sleep, (10,))
print result.get(timeout=1) # raises TimeoutError
2、使用進程池
是的,你沒有看錯,不是線程池。它可以讓你跑滿多核CPU,而且使用方法非常簡單。注意要用apply_async,如果落下async,就變成阻塞版本了。
processes=4是最多并發進程數量。代碼如下
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."
3、使用Pool,并需要關注結果
更多的時候,我們不僅需要多進程執行,還需要關注每個進程的執行結果,代碼如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
return "done " + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print res.get()
print "Sub-process(es) done."
4. 示例四
在利用Python進行系統管理的時候,特別是同時操作多個文件目錄,或者遠程控制多臺主機,并行操作可以節約大量的時間。當被操作對象數目不大時,可以直接利用multiprocessing中的Process動態成生多個進程,10幾個還好,
但如果是上百個,上千個目標,手動的去限制進程數量卻又太過繁瑣,這時候進程池Pool發揮作用的時候就到了。
Pool可以提供指定數量的進程,供用戶調用,當有新的請求提交到pool中時,如果池還沒有滿,
那么就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到規定最大值,那么該請求就會等待,
直到池中有進程結束,才會創建新的進程來它。這里有一個簡單的例子:
#!/usr/bin/env python
#coding=utf-8
"""
Author: Squall
Last modified: 2011-10-18 16:50
Filename: pool.py
Description: a simple sample for pool class
"""
from multiprocessing import Pool
from time import sleep
def f(x):
for i in range(10):
print '%s --- %s ' % (i, x)
sleep(1)
def main():
pool = Pool(processes=3) # set the processes max number 3
for i in range(11,20):
result = pool.apply_async(f, (i,))
pool.close()
pool.join()
if result.successful():
print 'successful'
if __name__ == "__main__":
main()
先創建容量為3的進程池,然后將f(i)依次傳遞給它,運行腳本后利用ps aux | grep pool.py查看進程情況,
會發現最多只會有三個進程執行。pool.apply_async()用來向進程池提交目標請求,
pool.join()是用來等待進程池中的worker進程執行完畢,防止主進程在worker進程結束前結束。
但必pool.join()必須使用在pool.close()或者pool.terminate()之后。
其中close()跟terminate()的區別在于close()會等待池中的worker進程執行結束再關閉pool,而terminate()則是直接關閉。
result.successful()表示整個調用執行的狀態,如果還有worker沒有執行完,則會拋出AssertionError異常。
利用multiprocessing下的Pool可以很方便的同時自動處理幾百或者上千個并行操作,腳本的復雜性也大大降低。
本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!