Python分布式進程
多任務可以用一個進程作為Master分配任務,其它進程作為Worker執行任務來實現。
這樣可以把Master放在一臺電腦上,Workers放在其他電腦上實現分布式進程。
#taskmanager.py!/usr/bin/env python
import random, time, Queue from multiprocessing.managers import BaseManager
task_queue = Queue.Queue() result_queue = Queue.Queue()
class QueueManager(BaseManager): pass
QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue)
manager = QueueManager(address=('', 5000), authkey='abc') manager.start() task = manager.get_task_queue() result = manager.get_result_queue()
for i in range(10): n = random.randint(0, 10000) print('Put task %d...' % n) task.put(n) print('Try get results...') for i in range(10): r = result.get(timeout=10) print('Result: %s' %r) manager.shutdown()</pre>
task_queue和result_queue是兩個隊列,分別存放任務和結果。它們用來進行進程間通信,交換對象。
官網上有如下例子。
from multiprocessing import Process, Queuedef f(queue): queue.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print q.get() # prints "[42, None, 'hello']" p.join()其中p是一個進程,還有一個主進程的隊列q。列表[42, None, 'hello']從p進程傳到了主進程中。
因為是分布式的環境,放入queue中的數據需要等待Workers機器運算處理后再進行讀取,這樣就需要對queue用QueueManager進行封裝放到網絡中。這是通過
QueueManager.register('get_task_queue', callable=lambda: task_queue)實現的。我們給task_queue的網絡調用接口取了一個名字get_task_queue,而result_queue的名字是get_result_queue,方便區分對哪個queue進行操作。
task.put(n)即是對task_queue進行寫入數據,相當于分配任務。而result.get()即是等待workers處理后返回的結果。
下面是Workers的代碼。
#task_worker.py!/usr/bin/env python
import time, sys, Queue from multiprocessing.managers import BaseManager
class QueueManager(BaseManager): pass
QueueManager.register('get_task_queue') QueueManager.register('get_result_queue')
server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) m = QueueManager(address=(server_addr, 5000), authkey='abc') m.connect() task = m.get_task_queue() result = m.get_result_queue()
for i in range(10): try: n = task.get(timeout=1) print('run task %d %d...' % (n, n)) r = '%d %d = %d' % (n, n, n*n) time.sleep(1) result.put(r) except Queue.Empty: print('task queue is empty.') print('worker exit.')</pre>
task_worker這里的QueueManager注冊的名字必須和task_manager中的一樣。注意到taskworker.py中根本沒有創建Queue的代碼,所以,Queue對象存儲在taskmanager.py進程中。對比上面的例子,可以看出Queue對象從另一個進程通過網絡傳遞了過來。只不過這里的傳遞和網絡通信由QueueManager完成。
task_worker的主要功能是將task_queue中分配的數據取出來進行平方運算然后放入到result_queue中。這樣task_manager就能得到計算結果了。
結果如下:
![]()
![]()
References
[1].http://blog.csdn.net/fireroll/article/details/38895485
[2].http://www.jb51.net/article/58004.htm
[3].https://docs.python.org/3/library/multiprocessing.html?highlight=queuemanager
</div> 來自:http://my.oschina.net/lvyi/blog/383074