Python分布式進程

jopen 10年前發布 | 18K 次閱讀 Python Python開發

多任務可以用一個進程作為Master分配任務,其它進程作為Worker執行任務來實現。

這樣可以把Master放在一臺電腦上,Workers放在其他電腦上實現分布式進程。

#taskmanager.py

!/usr/bin/env python

import random, time, Queue from multiprocessing.managers import BaseManager

task_queue = Queue.Queue() result_queue = Queue.Queue()

class QueueManager(BaseManager):     pass

QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue)

manager = QueueManager(address=('', 5000), authkey='abc') manager.start() task = manager.get_task_queue() result = manager.get_result_queue()

for i in range(10):     n = random.randint(0, 10000)     print('Put task %d...' % n)     task.put(n) print('Try get results...')   for i in range(10):     r = result.get(timeout=10)     print('Result: %s' %r) manager.shutdown()</pre>

task_queue和result_queue是兩個隊列,分別存放任務和結果。它們用來進行進程間通信,交換對象。

官網上有如下例子。

from multiprocessing import Process, Queuedef f(queue):
    queue.put([42, None, 'hello'])
    
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"
    p.join()

其中p是一個進程,還有一個主進程的隊列q。列表[42, None, 'hello']從p進程傳到了主進程中。

因為是分布式的環境,放入queue中的數據需要等待Workers機器運算處理后再進行讀取,這樣就需要對queue用QueueManager進行封裝放到網絡中。這是通過

QueueManager.register('get_task_queue', callable=lambda: task_queue)

實現的。我們給task_queue的網絡調用接口取了一個名字get_task_queue,而result_queue的名字是get_result_queue,方便區分對哪個queue進行操作。

task.put(n)即是對task_queue進行寫入數據,相當于分配任務。而result.get()即是等待workers處理后返回的結果。

下面是Workers的代碼。

#task_worker.py

!/usr/bin/env python

import time, sys, Queue from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):     pass

QueueManager.register('get_task_queue') QueueManager.register('get_result_queue')

server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) m = QueueManager(address=(server_addr, 5000), authkey='abc') m.connect() task = m.get_task_queue() result = m.get_result_queue()

for i in range(10):     try:         n = task.get(timeout=1)         print('run task %d  %d...' % (n, n))         r = '%d  %d = %d' % (n, n, n*n)         time.sleep(1)         result.put(r)     except Queue.Empty:         print('task queue is empty.') print('worker exit.')</pre>

task_worker這里的QueueManager注冊的名字必須和task_manager中的一樣。注意到taskworker.py中根本沒有創建Queue的代碼,所以,Queue對象存儲在taskmanager.py進程中。對比上面的例子,可以看出Queue對象從另一個進程通過網絡傳遞了過來。只不過這里的傳遞和網絡通信由QueueManager完成。

task_worker的主要功能是將task_queue中分配的數據取出來進行平方運算然后放入到result_queue中。這樣task_manager就能得到計算結果了。

結果如下:

Python分布式進程

Python分布式進程


References

[1].http://blog.csdn.net/fireroll/article/details/38895485

[2].http://www.jb51.net/article/58004.htm

[3].https://docs.python.org/3/library/multiprocessing.html?highlight=queuemanager

</div> 來自:http://my.oschina.net/lvyi/blog/383074

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!