Python中的多線程理解

jopen 11年前發布 | 49K 次閱讀 Python Python開發

我們將會看到一些在 Python 中使用線程的實例和如何避免線程之間的競爭。
你應當將下邊的例子運行多次,以便可以注意到線程是不可預測的和線程每次運行出的不同結果。聲明:從這里開始忘掉你聽到過的關于 GIL 的東西,因為 GIL 不會影響到我想要展示的東西。

示例1,我們將要請求五個不同的url:

1、單線程

import time
import urllib2

def get_responses(): urls = [ '

get_responses()</pre> 輸出是:

http://www.google.com 200
http://www.amazon.com 200
http://www.ebay.com 200
http://www.alibaba.com 200
http://www.reddit.com 200

Elapsed time: 3.0814409256

解釋:

url順序的被請求
除非cpu從一個url獲得了回應,否則不會去請求下一個url
網絡請求會花費較長的時間,所以cpu在等待網絡請求的返回時間內一直處于閑置狀態。

2、多線程

import urllib2
import time
from threading import Thread

class GetUrlThread(Thread): def init(self, url): self.url = url super(GetUrlThread, self).init()

def run(self):
    resp = urllib2.urlopen(self.url)
    print self.url, resp.getcode()

def get_responses(): urls = [ '

get_responses()</pre> 輸出:
http://www.reddit.com 200
http://www.google.com 200
http://www.amazon.com 200
http://www.alibaba.com 200
http://www.ebay.com 200

Elapsed time: 0.689890861511

解釋:
意識到了程序在執行時間上的提升
我們寫了一個多線程程序來減少cpu的等待時間,當我們在等待一個線程內的網絡請求返回時,這時cpu可以切換到其他線程去進行其他線程內的網絡請求。
我們期望一個線程處理一個url,所以實例化線程類的時候我們傳了一個url。
線程運行意味著執行類里的run()方法。
無論如何我們想每個線程必須執行run()。
為每個url創建一個線程并且調用start()方法,這告訴了cpu可以執行線程中的run()方法了。
我們希望所有的線程執行完畢的時候再計算花費的時間,所以調用了join()方法。
join()可以通知主線程等待這個線程結束后,才可以執行下一條指令。
每個線程我們都調用了join()方法,所以我們是在所有線程執行完畢后計算的運行時間。
關于線程:
cpu可能不會在調用start()后馬上執行run()方法。
你不能確定run()在不同線程建間的執行順序。
對于單獨的一個線程,可以保證run()方法里的語句是按照順序執行的。
這就是因為線程內的url會首先被請求,然后打印出返回的結果。

示例2,全局變量的線程安全問題(race condition)

1、BUG 版

我們將會用一個程序演示一下多線程間的資源競爭,并修復這個問題。

from threading import Thread

define a global variable

some_var = 0

class IncrementThread(Thread): def run(self):

    #we want to read a global variable
    #and then increment it
    global some_var
    read_value = some_var
    print "some_var in %s is %d" % (self.name, read_value)
    some_var = read_value + 1
    print "some_var in %s after increment is %d" % (self.name, some_var)

def use_increment_thread(): threads = [] for i in range(50): t = IncrementThread() threads.append(t) t.start() for t in threads: t.join() print "After 50 modifications, some_var should have become 50" print "After 50 modifications, some_var is %d" % (some_var,)

use_increment_thread()</pre> 多次運行這個程序,你會看到多種不同的結果。
解釋:
有一個全局變量,所有的線程都想修改它。
所有的線程應該在這個全局變量上加 1 。
有50個線程,最后這個數值應該變成50,但是它卻沒有。
為什么沒有達到50?
在some_var是15的時候,線程t1讀取了some_var,這個時刻cpu將控制權給了另一個線程t2。
t2線程讀到的some_var也是15
t1和t2都把some_var加到16
當時我們期望的是t1 t2兩個線程使some_var + 2變成17
在這里就有了資源競爭。
相同的情況也可能發生在其它的線程間,所以出現了最后的結果小于50的情況。

2、解決資源競爭

from threading import Lock, Thread
lock = Lock()
some_var = 0

class IncrementThread(Thread): def run(self):

    #we want to read a global variable
    #and then increment it
    global some_var
    lock.acquire()
    read_value = some_var
    print "some_var in %s is %d" % (self.name, read_value)
    some_var = read_value + 1
    print "some_var in %s after increment is %d" % (self.name, some_var)
    lock.release()

def use_increment_thread(): threads = [] for i in range(50): t = IncrementThread() threads.append(t) t.start() for t in threads: t.join() print "After 50 modifications, some_var should have become 50" print "After 50 modifications, some_var is %d" % (some_var,)

use_increment_thread()</pre> 再次運行這個程序,達到了我們預期的結果。
解釋:
Lock 用來防止競爭條件
如果在執行一些操作之前,線程t1獲得了鎖。其他的線程在t1釋放Lock之前,不會執行相同的操作
我們想要確定的是一旦線程t1已經讀取了some_var,直到t1完成了修改some_var,其他的線程才可以讀取some_var
這樣讀取和修改some_var成了邏輯上的原子操作。

示例3,多線程環境下的原子操作

讓我們用一個例子來證明一個線程不能影響其他線程內的變量(非全局變量)。
time.sleep()可以使一個線程掛起,強制線程切換發生。

1、BUG 版

from threading import Thread
import time

class CreateListThread(Thread): def run(self): self.entries = [] for i in range(10): time.sleep(0.01) self.entries.append(i) print self.entries

def use_create_list_thread(): for i in range(3): t = CreateListThread() t.start()

use_create_list_thread()</pre>

運行幾次后發現并沒有打印出正確的結果。當一個線程正在打印的時候,cpu切換到了另一個線程,所以產生了不正確的結果。我們需要確保print self.entries是個邏輯上的原子操作,以防打印時被其他線程打斷。

2、加鎖保證操作的原子性

我們使用了Lock(),來看下邊的例子。
from threading import Thread, Lock
import time

lock = Lock()

class CreateListThread(Thread): def run(self): self.entries = [] for i in range(10): time.sleep(0.01) self.entries.append(i) lock.acquire() print self.entries lock.release()

def use_create_list_thread(): for i in range(3): t = CreateListThread() t.start()

use_create_list_thread()</pre>

這次我們看到了正確的結果。證明了一個線程不可以修改其他線程內部的變量(非全局變量)。

示例4,Python多線程簡易版:線程池 threadpool

上面的多線程代碼看起來有點繁瑣,下面我們用 treadpool 將案例 1 改寫下:

import threadpool
import time
import urllib2

urls = [ '

def myRequest(url): resp = urllib2.urlopen(url) print url, resp.getcode()

def timeCost(request, n): print "Elapsed time: %s" % (time.time()-start)

start = time.time() pool = threadpool.ThreadPool(5) reqs = threadpool.makeRequests(myRequest, urls, timeCost) [ pool.putRequest(req) for req in reqs ] pool.wait()</pre> 解釋關鍵代碼:

  • ThreadPool(poolsize)  

表示最多可以創建poolsize這么多線程;

  • makeRequests(some_callable, list_of_args, callback)  

makeRequests創建了要開啟多線程的函數,以及函數相關參數和回調函數,其中回調函數可以不寫,default是無,也就是說makeRequests只需要2個參數就可以運行;

注意:threadpool 是非線程安全的。

5、REF

http://agiliq.com/blog/2013/09/understanding-threads-in-python/

http://www.zhidaow.com/post/python-threadpool

6、推薦閱讀:

1、線程安全及Python中的GIL

http://www.cnblogs.com/mindsbook/archive/2009/10/15/thread-safety-and-GIL.html    

2、Python 不能利用多核的問題以后能被解決嗎?

本文主要討論了 python 中的 單線程、多線程、多進程、異步、協程、多核、VM、GIL、GC、greenlet、Gevent、性能、IO 密集型、CPU 密集型、業務場景 等問題,以這些方面來判斷去除 GIL 實現多線程的優劣:

http://www.zhihu.com/question/21219976

注:協程可以認為是一種用戶態的線程,與系統提供的線程不同點是,它需要主動讓出CPU時間,而不是由系統進行調度,即控制權在程序員手上,用來執行協作式多任務非常合適。

3、GIL 與線程調

             ——《Python源碼剖析--深度探索動態語言核心技術》第15章

大約在99年的時候,Greg Stein 和Mark Hammond 兩位老兄基于Python 1.5 創建了一份去除GIL 的branch,但是很不幸,這個分支在很多基準測試上,尤其是單線程操作的測試上,效率只有使用GIL 的Python 的一半左右。

http://book.51cto.com/art/200807/82530.htm

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!