Python多進程并行編程實踐-multiprocessing模塊

ibyd4907 7年前發布 | 15K 次閱讀 Python 多進程 并行編程 Python開發

前言

并行計算是使用并行計算機來減少單個計算問題所需要的時間,我們可以通過利用編程語言顯式的說明計算中的不同部分如何再不同的處理器上同時執行來設計我們的并行程序,最終達到大幅度提升程序效率的目的。

眾所周知,Python中的GIL限制了Python多線程并行對多核CPU的利用,但是我們仍然可以通過各種其他的方式來讓Python真正利用多核資源, 例如通過C/C++擴展來實現多線程/多進程, 以及直接利用Python的多進程模塊multiprocessing來進行多進程編程。

本文主要嘗試僅僅通過python內置的multiprocessing模塊對自己的動力學計算程序來進行優化和效率提升,其中:

  • 實現了單機利用多核資源來實現并行并進行加速對比
  • 使用manager模塊實現了簡單的多機的分布式計算

正文

最近想用自己的微觀動力學程序進行一系列的求解并將結果繪制成二維Map圖進行可視化,這樣就需要對二維圖上的多個點進行計算并將結果收集起來并進行繪制,由于每個點都需要進行一次ODE積分以及牛頓法求解方程組,因此要串行地繪制整張圖可能會遇到極低的效率問題尤其是對參數進行測試的時候,每畫一張圖都需要等很久的時間。其中繪制的二維圖中每個點都是獨立計算的,于是很自然而然的想到了進行并行化處理。

串行的原始版本

由于腳本比較長,而且實現均為自己的程序,腳本的大致結構如下, 本質是一個二重循環,循環的變量分別為反應物氣體(O2 和 CO)的分壓的值:

import time
import numpyas np

省略若干...

pCOs = np.linspace(1e-5, 0.5, 10) pO2s = np.linspace(1e-5, 0.5, 10) if "main" == name:     try:         start = time.time()         for i, pO2in enumerate(pO2s):             # ...             for j, pCOin enumerate(pCOs):                 # 針對當前的分壓值 pCO, pO2進行動力學求解                 # 具體代碼略...         end = time.time()         t = end - start     finally:         # 收集計算的結果并進行處理繪圖 </code></pre>

整體過程就這么簡單,我需要做的就是使用multiprocessing的接口來對這個二重循環進行并行化。

使用單核串行繪制100個點所需要的時間如下, 總共花了240.76秒:

二維map圖繪制的效果如下:

進行多進程并行處理

multiprocessing模塊

multiprocessing模塊提供了類似threading模塊的接口,并對進程的各種操作進行了良好的封裝,提供了各種進程間通信的接口例如 Pipe , Queue 等等,可以幫助我們實現進程間的通信,同步等操作。

使用 Process 類來動態創建進程實現并行

multiprocessing模塊提供了 Process 能讓我們通過創建進程對象并執行該進程對象的 start 方法來創建一個真正的進程來執行任務,該接口類似 threading 模塊中的線程類 Thread .

但是當被操作對象數目不大的時候可以使用 Process 動態生成多個進程,但是如果需要的進程數一旦很多的時候,手動限制進程的數量以及處理不同進程返回值會變得異常的繁瑣,因此這個時候我們需要使用 進程池 來簡化操作。

使用進程池來管理進程

multiprocessing模塊提供了一個進程池 Pool 類,負責創建進程池對象,并提供了一些方法來講運算任務offload到不同的子進程中執行,并很方便的獲取返回值。例如我們現在要進行的循環并行便很容易的將其實現。

對于這里的單指令多數據流的并行,我們可以直接使用 Pool.map() 來將函數映射到參數列表中。 Pool.map 其實是map函數的并行版本,此函數將會阻塞直到所有進程全部結束,而且此函數返回的結果順序仍然不變。

首先,我先把針對每對分壓數據的處理過程封裝成一個函數,這樣可以將函數對象傳遞給子進程執行。

import time
from multiprocessing import Pool
import numpyas np

省略若干...

pCOs = np.linspace(1e-5, 0.5, 10) pO2s = np.linspace(1e-5, 0.5, 10) def task(pO2):     '''接受一個O2分壓,根據當前的CO分壓進行動力學求解'''     # 代碼細節省略... if "main" == name:     try:         start = time.time()         pool = Pool()                # 創建進程池對象,進程數與multiprocessing.cpu_count()相同         tofs = pool.map(task, pCOs)  # 并行執行函數         end = time.time()         t = end - start     finally:         # 收集計算的結果并進行處理繪圖 </code></pre>

使用兩個核心進行計算,計算時間從240.76s降到了148.61秒, 加速比為1.62

對不同核心的加速效果進行測試

為了查看使用不同核心數對程序效率的改善,我對不同的核心數和加速比進行了測試繪圖,效果如下:

運行核心數與程序運行時間:

運行核心數與加速比:

可見,由于我外層循環只循環了10次因此使用的核心數超過10以后核心數的增加并不能對程序進行加速,也就是多余的核心都浪費掉了。

使用manager實現簡單的分布式計算

前面使用了multiprocessing包提供的接口我們使用了再一臺機器上進行多核心計算的并行處理,但是multiprocessing的用處還有更多,通過multiprocessing.managers模塊,我們可以實現簡單的多機分布式并行計算,將計算任務分布到不同的計算機中運行。

Managers提供了另外的多進程通信工具,他提供了在多臺計算機之間共享數據的接口和數據對象,這些數據對象全部都是通過代理類實現的,比如 ListProxy 和 DictProxy 等等,他們都實現了與原生 list 和 dict 相同的接口,但是他們可以通過網絡在不同計算機中的進程中進行共享。

好了現在我們開始嘗試將繪圖程序改造成可以在多臺計算機中分布式并行的程序。改造的主要思想是:

  1. 使用一臺計算機作為服務端(server),此臺計算機通過一個Manager對象來管理共享對象,任務分配以及結果的接收,并再收集結果以后進行后處理(繪制二維map圖)。
  2. 其他多臺計算機可以作為客戶端來接收server的數據進行計算,并將結果傳到共享數據中,讓server可以收集。同時再client端可以同時進行上文所實現的多進程并行來充分利用計算機的多核優勢。

大致可總結為下圖:

服務進程

首先服務端需要一個manager對象來管理共享對象

def get_manager():
    '''創建服務端manager對象.
    '''
 
    # 自定義manager類
    class JobManager(BaseManager):
        pass
 
    # 創建任務隊列,并將此數據對象共享在網絡中
    jobid_queue = Queue()
    JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)
 
    # 創建列表代理類,并將其共享再網絡中
    tofs = [None]*N
    JobManager.register('get_tofs_list', callable=lambda: tofs, proxytype=ListProxy)
 
    # 將分壓參數共享到網絡中
    JobManager.register('get_pCOs', callable=lambda: pCOs, proxytype=ListProxy)
    JobManager.register('get_pO2s', callable=lambda: pCOs, proxytype=ListProxy)
 
    # 創建manager對象并返回
    manager = JobManager(address=(ADDR, PORT), authkey=AUTHKEY)
 
    return manager
  1. BaseManager.register

    是一個類方法,它可以將某種類型或者可調用的對象綁定到manager對象并共享到網絡中,使得其他在網絡中的計算機能夠獲取相應的對象。

    例如,

JobManager.register('get_jobid_queue', callable=lambda: jobid_queue)

我就將一個返回任務隊列的函數對象同manager對象綁定并共享到網絡中,這樣在網絡中的進程就可以通過自己的manager對象的 get_jobid_queue 方法得到相同的隊列,這樣便實現了數據的共享.

2. 創建manager對象的時候需要兩個參數,

  • address, 便是manager所在的ip以及用于監聽與服務端連接的端口號,例如我如果是在內網中的 192.168.0.1 地址的 5000 端口進行監聽,那么此參數可以是 ('192.169.0.1 , 5000)`
  • authkey, 顧名思義,就是一個認證碼,用于驗證客戶端時候可以連接到服務端,此參數必須是一個字符串對象.

進行任務分配

上面我們將一個任務隊列綁定到了manager對象中,現在我需要將隊列進行填充,這樣才能將任務發放到不同的客戶端來進行并行執行。

def fill_jobid_queue(manager, nclient):
    indices = range(N)
    interval = N/nclient
    jobid_queue = manager.get_jobid_queue()
    start = 0
    for i in range(nclient):
        jobid_queue.put(indices[start: start+interval])
        start += interval
    if N % nclient > 0:
        jobid_queue.put(indices[start:])

這里所謂的任務其實就是相應參數在list中的index值,這樣不同計算機中得到的結果可以按照相應的index將結果填入到結果列表中,這樣服務端就能在共享的網絡中收集各個計算機計算的結果。

啟動服務端進行監聽

def run_server():
    # 獲取manager
    manager = get_manager()
    print "Start manager at {}:{}...".format(ADDR, PORT)
    # 創建一個子進程來啟動manager
    manager.start()
    # 填充任務隊列
    fill_jobid_queue(manager, NNODE)
    shared_job_queue = manager.get_jobid_queue()
    shared_tofs_list = manager.get_tofs_list()
 
    queue_size = shared_job_queue.qsize()
 
    # 循環進行監聽,直到結果列表被填滿
    while None in shared_tofs_list:
        if shared_job_queue.qsize() < queue_size:
            queue_size = shared_job_queue.qsize()
            print "Job picked..."
 
    return manager

任務進程

服務進程負責進行簡單的任務分配和調度,任務進程則只負責獲取任務并進行計算處理。

在任務進程(客戶端)中基本代碼與我們上面單機中的多核運行的腳本基本相同(因為都是同一個函數處理不同的數據),但是我們也需要為客戶端創建一個manager來進行任務的獲取和返回。

def get_manager():
    class WorkManager(BaseManager):
        pass
 
    # 由于只是從共享網絡中獲取,因此只需要注冊名字即可
    WorkManager.register('get_jobid_queue')
    WorkManager.register('get_tofs_list')
    WorkManager.register('get_pCOs')
    WorkManager.register('get_pO2s')
 
    # 這里的地址和驗證碼要與服務端相同才可以進行數據共享
    manager = WorkManager(address=(ADDR, PORT), authkey=AUTHKEY)
 
    return manager

在客戶端我們仍然可以多進程利用多核資源來加速計算。

if "__main__" == __name__:
 
    manager = get_manager()
    print "work manager connect to {}:{}...".format(ADDR, PORT)
 
    # 將客戶端本地的manager連接到相應的服務端manager
    manager.connect()
 
    # 獲取共享的結果收集列表
    shared_tofs_list = manager.get_tofs_list()
 
    # 獲取共享的任務隊列
    shared_jobid_queue = manager.get_jobid_queue()
 
    # 從服務端獲取計算參數
    pCOs = manager.get_pCOs()
    shared_pO2s = manager.get_pO2s()
 
    # 創建進程池在本地計算機進行多核并行
    pool = Pool()
 
    while 1:
        try:
            indices = shared_jobid_queue.get_nowait()
            pO2s = [shared_pO2s[i] for i in indices]
            print "Run {}".format(str(pO2s))
            tofs_2d = pool.map(task, pO2s)
 
            # Update shared tofs list.
            for idx, tofs_1din zip(indices, tofs_2d):
                shared_tofs_list[idx] = tofs_1d
        # 直到將任務隊列中的任務全部取完,結束任務進程
        except Queue.Empty:
            break

下面我將在3臺在同一局域網中的電腦來進行簡單的分布式計算測試,

  • 其中一臺是實驗室器群中的管理節點, 內網ip為 10.10.10.245
  • 另一臺為集群中的一個節點, 共有12個核心
  • 最后一臺為自己的本本,4個核心
  1.  先在服務端運行服務腳本進行任務分配和監聽:
pythonserver.py

2. 在兩個客戶端運行任務腳本來獲取任務隊列中的任務并執行

pythonworker.py

當任務隊列為空且任務完成時,任務進程終止; 當結果列表中的結果收集完畢時,服務進程也會終止。

執行過程如圖:

執行結果如下圖:

上面的panel為服務端監聽,左下為自己的筆記本運行結果,右下panel為集群中的其中一個節點。

可見運行時間為56.86s,無奈,是我的本子脫了后腿(-_-!)

總結

本文通過python內置模塊multiprocessing實現了單機內多核并行以及簡單的多臺計算機的分布式并行計算,multiprocessing為我們提供了封裝良好并且友好的接口來使我們的Python程序更方面利用多核資源加速自己的計算程序,希望能對使用python實現并行話的童鞋有所幫助。

參考

 

 

來自:http://python.jobbole.com/87645/

 

 本文由用戶 ibyd4907 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!