Python多線程編程
Python線程編程(一)線程對象
我們在做軟件開發的時候很多要用到多線程技術。例如如果做一個下載軟件象flashget就要用到、象在線視頻工具realplayer也要用到因為要同時下載media stream還要播放。其實例子是很多的。
線程相對進程來說是“輕量級”的,操作系統用較少的資源創建和管理線程。程序中的線程在相同的內存空間中執行,并共享許多相同的資源。
在python中如何創建一個線程對象
如果你要創建一個線程對象,很簡單,只要你的類繼承threading.Thread,然后在__init__里首先調用threading.Thread的__init__方法即可
import threading
class mythread(threading.Thread):
def __init__(self, threadname):
threading.Thread.__init__(self, name = threadname)
....
這才僅僅是個空線程,我可不是要他拉空車的,他可得給我干點實在活。很簡單,重寫類的run()方法即可,把你要在線程執行時做的事情都放到里面
import threading
import time
class mythread(threading.Thread):
def __init__(...):
....
def run(self):
for i in range(10):
print self.getName, i
time.sleep(1)
以上代碼我們讓這個線程在執行之后每隔1秒輸出一次信息到屏幕,10次后結束
getName()是threading.Thread類的一個方法,用來獲得這個線程對象的name。還有一個方法setName()當然就是來設置這個線程對象的name的了。
如果要創建一個線程,首先就要先創建一個線程對象
mythread1 = mythread('mythread 1')
一個線程對象被創建后,他就處于“born”(誕生狀態)
如何讓這個線程對象開始運行呢?只要調用線程對象的start()方法即可
mythread1.start()
現在線程就處于“ready”狀態或者也稱為“runnable”狀態。
奇怪嗎?不是已經start了嗎?為什么不稱為“running”狀態呢?其實是有原因的。因為我們的計算機一般是不具有真正并行處理能力的。我們所謂的多線程只是把時間分成片段,然后隔一個時間段就讓一個線程執行一下,然后進入“sleeping ”狀態,然后喚醒另一個在“sleeping”的線程,如此循環runnable->sleeping->runnable... ,只是因為計算機執行速度很快,而時間片段間隔很小,我們感受不到,以為是同時進行的。所以說一個線程在start了之后只是處在了可以運行的狀態,他什么時候運行還是由系統來進行調度的。
那一個線程什么時候會“dead”呢?一般來說當線程對象的run方法執行結束或者在執行中拋出異常的話,那么這個線程就會結束了。系統會自動對“dead”狀態線程進行清理。
如果一個線程t1在執行的過程中需要等待另一個線程t2執行結束后才能運行的話那就可以在t1在調用t2的join()方法
....
def t1(...):
...
t2.join()
...
這樣t1在執行到t2.join()語句后就會等待t2結束后才會繼續運行。
但是假如t1是個死循環的話那么等待就沒有意義了,那怎么辦呢?可以在調用t2的join()方法的時候給一個浮點數做超時參數,這樣這個線程就不會等到花兒也謝了了。我等你10s,你不回來我還不允許我改嫁啊?:)
def t1(...):
...
t2.join(10)
...
如果一個進程的主線程運行完畢而子線程還在執行的話,那么進程就不會退出,直到所有子線程結束為止,如何讓主線程結束的時候其他子線程也乖乖的跟老大撤退呢?那就要把那些不聽話的人設置為聽話的小弟,使用線程對象的setDaemon()方法,參數為bool型。True的話就代表你要聽話,我老大(主線程)扯呼,你也要跟著撤,不能拖后腿。如果是False的話就不用那么聽話了,老大允許你們將在外軍命有所不受的。需要注意的是 setDaemon()方法必須在線程對象沒有調用start()方法之前調用,否則沒效果。
t1 = mythread('t1')
print t1.getName(),t1.isDaemon()
t1.setDaemon(True)
print t1.getName(),t1.isDaemon()
t1.start()
print 'main thread exit'
當執行到 print 'main thread exit' 后,主線程就退出了,當然t1這個線程也跟著結束了。但是如果不使用t1線程對象的setDaemon()方法的話,即便主線程結束了,還要等待t1線程自己結束才能退出進程。isDaemon()是用來獲得一個線程對象的Daemonflag狀態的。
如何來獲得與線程有關的信息呢?
獲得當前正在運行的線程的引用
running = threading.currentThread()
獲得當前所有活動對象(即run方法開始但是未終止的任何線程)的一個列表
threadlist = threading.enumerate()
獲得這個列表的長度
threadcount = threading.activeCount()
查看一個線程對象的狀態調用這個線程對象的isAlive()方法,返回1代表處于“runnable”狀態且沒有“dead”
threadflag = threading.isAlive()
Python線程編程(二)簡單的線程同步
多個執行線程經常要共享數據,如果僅僅讀取共享數據還好,但是如果多個線程要修改共享數據的話就可能出現無法預料的結果。
假如兩個線程對象t1和t2都要對數值num=0進行增1運算,那么t1和t2都各對num修改10次的話,那么num最終的結果應該為20。但是如果當t1取得num的值時(假如此時num為0),系統把t1調度為“sleeping”狀態,而此時t2轉換為“running”狀態,此時t2獲得的 num的值也為0,然后他把num+1的值1賦給num。系統又把t2轉化為“sleeping”狀態,t1為“running”狀態,由于t1已經得到 num值為0,所以他也把num+1的值賦給了num為1。本來是2次增1運行,結果卻是num只增了1次。類似這樣的情況在多線程同時執行的時候是有可能發生的。所以為了防止這類情況的出現就要使用線程同步機制。
最簡單的同步機制就是“鎖”
鎖對象用threading.RLock類創建
mylock = threading.RLock()
如何使用鎖來同步線程呢?線程可以使用鎖的acquire() (獲得)方法,這樣鎖就進入“locked”狀態。每次只有一個線程可以獲得鎖。如果當另一個線程試圖獲得這個鎖的時候,就會被系統變為 “blocked”狀態,直到那個擁有鎖的線程調用鎖的release() (釋放)方法,這樣鎖就會進入“unlocked”狀態。“blocked”狀態的線程就會收到一個通知,并有權利獲得鎖。如果多個線程處于 “blocked”狀態,所有線程都會先解除“blocked”狀態,然后系統選擇一個線程來獲得鎖,其他的線程繼續沉默(“blocked”)。
import threading
mylock = threading.RLock()
class mythread(threading.Thread)
...
def run(self ...):
... #此處 不可以 放置修改共享數據的代碼
mylock.acquire()
... #此處 可以 放置修改共享數據的代碼
mylock.release()
... #此處 不可以 放置修改共享數據的代碼
我們把修改共享數據的代碼稱為“臨界區”,必須將所有“臨界區”都封閉在同一鎖對象的acquire()和release()方法調用之間。
鎖只能提供最基本的同步級別。有時需要更復雜的線程同步,例如只在發生某些事件時才訪問一個臨界區(例如當某個數值改變時)。這就要使用“條件變量”。
條件變量用threading.Condition類創建
mycondition = threading.Condition()
條件變量是如何工作的呢?首先一個線程成功獲得一個條件變量后,調用此條件變量的wait()方法會導致這個線程釋放這個鎖,并進入“blocked” 狀態,直到另一個線程調用同一個條件變量的notify()方法來喚醒那個進入“blocked”狀態的線程。如果調用這個條件變量的 notifyAll()方法的話就會喚醒所有的在等待的線程。
如果程序或者線程永遠處于“blocked”狀態的話,就會發生死鎖。所以如果使用了鎖、條件變量等同步機制的話,一定要注意仔細檢查,防止死鎖情況的發生。對于可能產生異常的臨界區要使用異常處理機制中的finally子句來保證釋放鎖。等待一個條件變量的線程必須用notify()方法顯式的喚醒,否則就永遠沉默。保證每一個wait()方法調用都有一個相對應的notify()調用,當然也可以調用notifyAll()方法以防萬一。
Python線程編程(三)同步隊列
我們經常會采用生產者/消費者關系的兩個線程來處理一個共享緩沖區的數據。例如一個生產者線程接受用戶數據放入一個共享緩沖區里,等待一個消費者線程對數 據取出處理。但是如果緩沖區的太小而生產者和消費者兩個異步線程的速度不同時,容易出現一個線程等待另一個情況。為了盡可能的縮短共享資源并以相同速度工 作的各線程的等待時間,我們可以使用一個“隊列”來提供額外的緩沖區。
創建一個“隊列”對象
import Queue
myqueue = Queue.Queue(maxsize = 10)
Queue.Queue類即是一個隊列的同步實現。隊列長度可為無限或者有限。可通過Queue的構造函數的可選參數maxsize來設定隊列長度。如果maxsize小于1就表示隊列長度無限。
將一個值放入隊列中
myqueue.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item為必需的,為插入項目的值;第二個block為可選參數,默認為1。如果隊列當前為空且block為1,put()方法就使調用線程暫停,直到空出一個數據單元。如果block為0,put方法將引發Full異常。
將一個值從隊列中取出
myqueue.get()
調用隊列對象的get()方法從隊頭刪除并返回一個項目。可選參數為block,默認為1。如果隊列為空且block為1,get()就使調用線程暫停,直至有項目可用。如果block為0,隊列將引發Empty異常。
我們用一個例子來展示如何使用Queue
# queue_example.py
from Queue import Queue
import threading
import random
import time
# Producer thread
class Producer(threading.Thread):
def __init__(self, threadname, queue):
threading.Thread.__init__(self, name = threadname)
self.sharedata = queue
def run(self):
for i in range(20):
print self.getName(),'adding',i,'to queue'
self.sharedata.put(i)
time.sleep(random.randrange(10)/10.0)
print self.getName(),'Finished'
# Consumer thread
class Consumer(threading.Thread):
def __init__(self, threadname, queue):
threading.Thread.__init__(self, name = threadname)
self.sharedata = queue
def run(self):
for i in range(20):
print self.getName(),'got a value:',self.sharedata.get()
time.sleep(random.randrange(10)/10.0)
print self.getName(),'Finished'
# Main thread
def main():
queue = Queue()
producer = Producer('Producer', queue)
consumer = Consumer('Consumer', queue)
print 'Starting threads ...'
producer.start()
consumer.start()
producer.join()
consumer.join()
print 'All threads have terminated.'
if __name__ == '__main__':
main()
示例代碼中實現了兩個類:生產者類Producer和消費者類Consumer。前者在一個隨機的時間內放入一個值到隊列queue中然后顯示出來,后者在一定隨機的時間內從隊列queue中取出一個值并顯示出來。