Python 并發編程之使用多線程和多處理器

jopen 10年前發布 | 35K 次閱讀 Python Python開發

在Python編碼中我們經常討論的一個方面就是如何優化模擬執行的性能。盡管在考慮量化代碼時NumPy、SciPy和pandas在這方面已然非常有用,但在構建事件驅動系統時我們無法有效地使用這些工具。有沒有可以加速我們代碼的其他辦法?答案是肯定的,但需要留意!

在這篇文章中,我們看一種不同的模型-并發,我們可以將它引入我們Python程序中。這種模型在模擬中工作地特別好,它不需要共享狀態。Monte Carlo模擬器可以用來做期權定價以及檢驗算法交易等類型的各種參數的模擬。

我們將特別考慮Threading庫和Multiprocessing庫。

Python并發

當Python初學者探索多線程的代碼為了計算密集型優化時,問得最多的問題之一是:”當我用多線程的時候,為什么我的程序變慢了?“

在多核機器上,我們期望多線程的代碼使用額外的核,從而提高整體性能。不幸的是,主Python解釋器(CPython)的內部并不是真正的多線程,是通過一個全局解釋鎖(GIL)來進行處理的。

GIL是必須的,因為Python解釋器是非線程安全的。這意味著當從線程內嘗試安全的訪問Python對象的時候將有一個全局的強制鎖。在任何時候,僅僅一個單一的線程能夠獲取Python對象或者C API。每100個字節的Python指令解釋器將重新獲取鎖,這(潛在的)阻塞了I/0操作。因為鎖,CPU密集型的代碼使用線程庫時,不會獲得性能的提高,但是當它使用多處理庫時,性能可以獲得提高。

并行庫的實現

現在,我們將使用上面所提到的兩個庫來實現對一個“小”問題進行并發優化。

線程庫

上面我們提到: 運行CPython解釋器的Python不會支持通過多線程來實現多核處理。不過,Python確實有一個線程庫。那么如果我們(可能)不能使用多個核心進行處理,那么使用這個庫能取得什么好處呢?

許多程序,尤其是那些與網絡通信或者數據輸入/輸出(I/O)相關的程序,都經常受到網絡性能或者輸入/輸出(I/O)性能的限制。這樣Python解釋器就會等待哪些從諸如網絡地址或者硬盤等“遠端”數據源讀寫數據的函數調用返回。因此這樣的數據訪問比從本地內存或者CPU緩沖區讀取數據要慢的多。

因此,如果許多數據源都是通過這種方式訪問的,那么就有一種方式對這種數據訪問進行性能提高,那就是對每個需要訪問的數據項都產生一個線程 。

舉個例子,假設有一段Python代碼,它用來對許多站點的URL進行扒取。再假定下載每個URL所需時間遠遠超過計算機CPU對它的處理時間,那么僅使用一個線程來實現就會大大地受到輸入/輸出(I/O)性能限制。

通過給每個下載資源生成一個新的線程,這段代碼就會并行地對多個數據源進行下載,在所有下載都結束的時候再對結果進行組合。這就意味著每個后續下載都不會等待前一個網頁下載完成。此時,這段代碼就受收到客戶/服務端帶寬的限制。

不過,許多與財務相關的應用都受到CPU性能的限制,這是因為這樣的應用都是高度集中式的對數字進行處理。這樣的應用都會進行大型線性代數計算或者數值的隨機統計,比如進行蒙地卡羅模擬統計。所以只要對這樣的應用使用Python和全局解釋鎖(GIL),此時使用Python線程庫就不會有任何性能的提高。

Python實現

下面這段依次添加數字到列表的“玩具”代碼,舉例說明了多線程的實現。每個線程創建一個新的列表并隨機添加一些數字到列表中。這個已選的“玩具”例子對CPU的消耗非常高。

下面的代碼概述了線程庫的接口,但是他不會比我們用單線程實現的速度更快。當我們對下面的代碼用多處理庫時,我們將看到它會顯著的降低總的運行時間。

讓我們檢查一下代碼是怎樣工作的。首先我們導入threading庫。然后我們創建一個帶有三個參數的函數list_append。第一個參數count定義了創建列表的大小。第二個參數id是“工作”(用于我們輸出debug信息到控制臺)的ID。第三個參數out_list是追加隨機數的列表。

__main__函數創建了一個107的size,并用兩個threads執行工作。然后創建了一個jobs列表,用于存儲分離的線程。threading.Thread對象將list_append函數作為參數,并將它附加到jobs列表。

最后,jobs分別開始并分別“joined”。join()方法阻塞了調用的線程(例如主Python解釋器線程)直到線程終止。在打印完整的信息到控制臺之前,確認所有的線程執行完成。

# thread_test.pyimport randomimport threadingdef list_append(count, id, out_list):
    """
    Creates an empty list and then appends a 
    random number to the list 'count' number
    of times. A CPU-heavy operation!
    """
    for i in range(count):
        out_list.append(random.random())if name == "main":
    size = 10000000   # Number of random numbers to add
    threads = 2   # Number of threads to create

# Create a list of jobs and then iterate through
# the number of threads appending each thread to
# the job list 
jobs = []
for i in range(0, threads):
    out_list = list()
    thread = threading.Thread(target=list_append(size, i, out_list))
    jobs.append(thread)

# Start the threads (i.e. calculate the random number lists)
for j in jobs:
    j.start()

# Ensure all of the threads have finished
for j in jobs:
    j.join()

print "List processing complete."</pre> <p>我們能在控制臺中調用如下的命令time這段代碼</p>

time python thread_test.py

將產生如下的輸出

List processing complete.
real    0m2.003s
user    0m1.838s
sys     0m0.161s

注意user時間和sys時間相加大致等于real時間。這表明我們使用線程庫沒有獲得性能的提升。我們期待real時間顯著的降低。在并發編程的這些概念中分別被稱為CPU時間和掛鐘時間(wall-clock time)

多進程處理庫

為了充分地使用所有現代處理器所能提供的多個核心 ,我們就要使用多進程處理庫 。它的工作方式與線程庫完全不同 ,不過兩種庫的語法卻非常相似 。

多進程處理庫事實上對每個并行任務都會生成多個操作系統進程。通過給每個進程賦予單獨的Python解釋器和單獨的全局解釋鎖(GIL)十分巧妙地規避了一個全局解釋鎖所帶來的問題。而且每個進程還可獨自占有一個處理器核心,在所有進程處理都結束的時候再對結果進行重組。

不過也存在一些缺陷。生成許多進程就會帶來很多I/O管理問題,這是因為多個處理器對數據的處理會引起數據混亂 。這就會導致整個運行時間增多 。不過,假設把數據限制在每個進程內部 ,那么就可能大大的提高性能 。當然,再怎么提高也不會超過阿姆達爾法則所規定的極限值。

Python實現

使用Multiprocessing實現僅僅需要修改導入行和multiprocessing.Process行。這里單獨的向目標函數傳參數。除了這些,代碼幾乎和使用Threading實現的一樣:

# multiproc_test.pyimport randomimport multiprocessingdef list_append(count, id, out_list):
    """
    Creates an empty list and then appends a 
    random number to the list 'count' number
    of times. A CPU-heavy operation!
    """
    for i in range(count):
        out_list.append(random.random())if name == "main":
    size = 10000000   # Number of random numbers to add
    procs = 2   # Number of processes to create

# Create a list of jobs and then iterate through
# the number of processes appending each process to
# the job list 
jobs = []
for i in range(0, procs):
    out_list = list()
    process = multiprocessing.Process(target=list_append, 
                                      args=(size, i, out_list))
    jobs.append(process)

# Start the processes (i.e. calculate the random number lists)     
for j in jobs:
    j.start()

# Ensure all of the processes have finished
for j in jobs:
    j.join()

print "List processing complete."</pre> <p>控制臺測試運行時間:</p>

time python multiproc_test.py

得到如下輸出:

List processing complete.
real    0m1.045s
user    0m1.824s
sys     0m0.231s

在這個例子中可以看到user和sys時間基本相同,而real下降了近兩倍。之所以會這樣是因為我們使用了兩個進程。擴展到四個進程或者將列表的長度減半結果如下(假設你的電腦至少是四核的):

List processing complete.
real    0m0.540s
user    0m1.792s
sys     0m0.269s

使用四個進程差不多提高了3.8倍速度。但是,在將這個規律推廣到更大范圍,更復雜的程序上時要小心。數據轉換,硬件cacha層次以及其他一些問題會減弱加快的速度。

在下一篇文章中我們會將Event-Driben Basketer并行化,從而提高其運行多維參數尋優的能力。

相關閱讀:

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!