利用 Python yield 創建協程將異步編程同步化
在 Lua 和 Python 等腳本語言中,經常提到一個概念: 協程。也經常會有同學對協程的概念及其作用比較疑惑,本文今天就來探討下協程的前世今生。
0、首先回答兩個大家最關心的問題:
0.1 什么是協程?
本質上協程就是用戶空間下的線程。
0.2 協程的好處是什么?
通俗易懂的回答:
-
讓原來要使用 異步 + 回調 方式寫的非人類代碼,可以用看似同步的方式寫出來。
</li> </ul>-
無需線程上下文切換、原子操作鎖定及同步的開銷。
</li> </ul>1、回顧同步與異步編程
同步編程即線性化編程,代碼按照既定順序執行,上一條語句執行完才會執行下一條,否則就一直等在那里。
但是許多實際操作都是CPU 密集型任務和 IO 密集型任務,比如網絡請求,此時不能讓這些任務阻塞主線程的工作,于是就會采用異步編程。
異步的標準元素就是回調函數(Callback, 后來衍生出Promise/Deferred概念),主線程發起一個異步任務,讓其自己到一邊去工作,當其完成后,會通過執行預先指定的回調函數完成后續任務,然后返回主線程。在異步任務執行過程中,主線程無需等待和阻塞,可以繼續處理其他任務。
下例大家并不陌生,是jQuery標準發送http異步請求的方式。
$.ajax({ url:"/echo/json/", success: function(response) { console.info(response.name); } });
而并發的核心思想在于,大的任務可以分解成一系列的子任務,后者可以被調度成 同時執行或異步執行,而不是一次一個地或者同步地執行。兩個子任務之間的 切換也就是上下文切換。
2、回顧多線程編程
當主線程發起異步任務,這個任務跑到哪里去工作了呢?這就說到多線程(包括多進程)編程,一個主線程可以主動創建多個子線程,然后將任務交給子線程,每個子線程擁有自己的堆棧空間。操作系統可以通過分時的方式讓同一個CPU輪流調度各個線程,編程人員無需關心操作系統是如何工作的。
但是如果需要在多個線程之間通信,則需要編程人員自己寫代碼來控制線程之間的協作(利用鎖或信號量)以及通信(利用管道、隊列等)。
2.1 經典的Producer-Consumer問題
這個問題說的是有兩方進行通信和協作,一方只負責生產內容,另一方只負責消費內容。消費者并不知道,也無需知道生產者何時生產,只是當有內容生產出來負責消費即可,沒有內容時就等待。這是一個經典的異步問題。
2.1.1 Threading/Queue方案
傳統的解決方案即是采用多線程來實現,生產者和消費者分別處于不同的線程或進程中,由操作系統進行調度。來看一篇經典的多線程教程中的例子,是不是很像Java風格?——啰嗦。
import threading import time import logging import random import Queue
logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-9s) %(message)s',)
BUF_SIZE = 10 q = Queue.Queue(BUF_SIZE)
class ProducerThread(threading.Thread): def init(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None): super(ProducerThread,self).init() self.target = target self.name = name
def run(self): while True: if not q.full(): item = random.randint(1,10) q.put(item) logging.debug('Putting ' + str(item) + ' : ' + str(q.qsize()) + ' items in queue') time.sleep(random.random()) return
class ConsumerThread(threading.Thread): def init(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None): super(ConsumerThread,self).init() self.target = target self.name = name return
def run(self): while True: if not q.empty(): item = q.get() logging.debug('Getting ' + str(item) + ' : ' + str(q.qsize()) + ' items in queue') time.sleep(random.random()) return
if name == 'main': p = ProducerThread(name='producer') c = ConsumerThread(name='consumer')
p.start() time.sleep(2) c.start() time.sleep(2)</pre>
2.1.2 MessageQueue方案
基于多線程方案,這個問題已經演變成消息中介模式(有些公司喜歡稱之為”郵局”),有各種的商業MQ方案可以直接使用。
這里以RabbitMQ開源方案為例,Producer一方向名為隊列中發送”Hello World!”內容,而Consumer一方則監聽隊列,當有內容進入隊列時,就執行callback函數來收取并處理內容。發送與收取的動作是異步執行的,互不干擾。
###### Producer ########
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'" connection.close()
# Consumer
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body): print " [x] Received %r" % (body,)
channel.basic_consume(callback, queue='hello', no_ack=True)
channel.start_consuming()</pre>
3、yield與協程
3.1 何為協程(Coroutine)及yield
python采用了GIL(Global Interpretor Lock,全局解釋器鎖),默認所有任務都是在同一進程中執行的。(當然,可以借助多進程多線程來實現并行化。)我們調用一個普通的Python函數時,一般是從函數的第一行代碼開始執行,結束于return語句、異常或者函數結束(可以看作隱式的返回None)。一旦函數將控制權交還給調用者,就意味著全部結束。函數中做的所有工作以及保存在局部變量中的數據都將丟失。再次調用這個函數時,一切都將從頭創建。
實現一個用戶態線程有兩個必須要處理的問題:一是碰著阻塞式 I/O 會導致整個進程被掛起;二是由于缺乏時鐘阻塞,進程需要自己擁有調度線程的能力。如果一種實現使得每個線程需要自己通過調用某個方法,主動交出控制權。那么我們就稱這種用戶態線程是協作式的,即是協程。
本質上協程就是用戶空間下的線程。
在 Python 中,所謂協程(Coroutine)就是在同一進程/線程中,利用生成器(generator)來”同時”執行多個函數(routine)。
Python的中yield關鍵字與Coroutine說的是一件事情,先看看yield的基本用法。
任何包含yield關鍵字的函數都會自動成為生成器(generator)對象,里面的代碼一般是一個有限或無限循環結構,每當第一次調用該函數時,會執行到yield代碼為止并返回本次迭代結果,yield指令起到的是return關鍵字的作用。然后函數的堆棧會自動凍結(freeze)在這一行。當函數調用者的下一次利用next()或generator.send()或for-in來再次調用該函數時,就會從yield代碼的下一行開始,繼續執行,再返回下一次迭代結果。通過這種方式,迭代器可以實現無限序列和惰性求值。
看一個用生成器來計算100以內斐波那契數列的例子。我們先用普通遞歸方式來進行計算。
a = b = 1 while a < 100: a, b = b, a + b print a,
再來用yield和生成器來計算斐波那契數列,該函數形成一個無限循環的生成器,由函數調用者顯式地控制迭代次數。
#!/usr/bin/env python
coding=utf-8
測試utf-8編碼
import sys
reload(sys) sys.setdefaultencoding('utf-8')
def fibonacci(): a = b = 1 # yield則像是generator函數的返回結果 yield a yield b while True: a, b = b, a+b # yield唯一所做的另一件事就是保存一個generator函數的狀態, # generator就是一個特殊類型的迭代器(iterator) yield b
num = 0 fib = fibonacci() while num < 100: # 和迭代器相似,我們可以通過使用next()來從generator中獲取下一個值,也可以通過隱式地調用next()來忽略一些值 num = next(fib) print num, # 1 1 2 3 5 8 13 21 34 55 89 144</pre>
總而言之,生成器(以及yield語句)最初的引入是為了讓程序員可以更簡單的編寫用來產生值的序列的代碼。 以前,要實現類似隨機數生成器的東西,需要實現一個類或者一個模塊,在生成數據的同時保持對每次調用之間狀態的跟蹤。引入生成器之后,這變得非常簡單。
-
yield則像是generator函數的返回結果
</li> -
yield唯一所做的另一件事就是保存一個generator函數的狀態
</li> -
generator就是一個特殊類型的迭代器(iterator)
</li> -
和迭代器相似,我們可以通過使用next()來從generator中獲取下一個值
</li> -
通過隱式地調用next()來忽略一些值
</li> </ul>3.2 用yield實現協程調度的原理
我們現在利用yield關鍵字會自動凍結函數堆棧的特性,想象一下,假如現在有兩個函數f1()和f2(),各自包含yield語句,見下例。主線程先啟動f1(), 當f1()執行到yield的時候,暫時返回。這時主線程可以將執行權交給f2(),執行到f2()的yield后,可以再將執行權交給f1(),從而實現了在同一線程中交錯執行f1()和f2()。f1()與f2()就是協同執行的程序,故名協程。
我們嘗試用yield建立協程,來解決Producer-Consumer問題。
# -*- coding: utf-8 -*- import random def get_data(): """返回0到9之間的3個隨機數,模擬異步操作""" return random.sample(range(10), 3) def consume(): """顯示每次傳入的整數列表的動態平均值""" running_sum = 0 data_items_seen = 0 while True: print('Waiting to consume') data = yield data_items_seen += len(data) running_sum += sum(data) print('Consumed, the running average is {}'.format(running_sum / float(data_items_seen))) def produce(consumer): """產生序列集合,傳遞給消費函數(consumer)""" while True: data = get_data() print('Produced {}'.format(data)) consumer.send(data) yield if __name__ == '__main__': consumer = consume() consumer.send(None) producer = produce(consumer) for _ in range(10): print('Producing...') next(producer)
如果你沒看明白,那還可以把上面的例子再寫的通熟易懂些, 不做任何邏輯處理:
#!/usr/bin/env python
coding=utf-8
測試utf-8編碼
import sys, time
reload(sys) sys.setdefaultencoding('utf-8')
""" 傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。 如果改用協程,生產者生產消息后,直接通過yield跳轉到消費者開始執行,待消費者執行完畢后,切換回生產者繼續生產,效率極高。 """
注意到consumer函數是一個generator(生成器):
任何包含yield關鍵字的函數都會自動成為生成器(generator)對象
def consumer(): r = '' while True: # 3、consumer通過yield拿到消息,處理,又通過yield把結果傳回; # yield指令具有return關鍵字的作用。然后函數的堆棧會自動凍結(freeze)在這一行。 # 當函數調用者的下一次利用next()或generator.send()或for-in來再次調用該函數時, # 就會從yield代碼的下一行開始,繼續執行,再返回下一次迭代結果。通過這種方式,迭代器可以實現無限序列和惰性求值。 n = yield r if not n: return print('[CONSUMER] ←← Consuming %s...' % n) time.sleep(1) r = '200 OK'
def produce(c): # 1、首先調用c.next()啟動生成器 c.next() n = 0 while n < 5: n = n + 1 print('[PRODUCER] →→ Producing %s...' % n) # 2、然后,一旦生產了東西,通過c.send(n)切換到consumer執行; cr = c.send(n) # 4、produce拿到consumer處理的結果,繼續生產下一條消息; print('[PRODUCER] Consumer return: %s' % cr) # 5、produce決定不生產了,通過c.close()關閉consumer,整個過程結束。 c.close()
if name=='main': # 6、整個流程無鎖,由一個線程執行,produce和consumer協作完成任務,所以稱為“協程”,而非線程的搶占式多任務。 c = consumer() produce(c)
運行結果:
[PRODUCER] →→ Producing 1...
[CONSUMER] ←← Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 2...
[CONSUMER] ←← Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 3...
[CONSUMER] ←← Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 4...
[CONSUMER] ←← Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] →→ Producing 5...
[CONSUMER] ←← Consuming 5...
[PRODUCER] Consumer return: 200 OK</pre>
下圖將控制流形象化,就像在調試器中單步執行整個程序,以說明上下文切換如何發生。
注意:
-
在任何時刻,只有一個協程在運行。
</li> -
協程就是用戶態線程(邏輯流是共享一個地址空間的,不用特別麻煩的切換頁表、刷新TLB,避免反復系統調用,還有進程切換造成的開銷),比內核線程低廉(核心的操作需要陷入內核(kernel),切換到操作系統),切換阻塞成本低; 單調度器下,訪問共享資源無需上鎖,用于提高cpu單核的并發能力。
</li> </ul>4、異步編程同步化
4.1 不再需要回調
看一下Python官方的例子,利用一個@gen.coroutine裝飾器來簡化代碼編寫,原本調用-回調兩段邏輯,現在被放在了一起,yield充當了回調的入口。這就是異步編程同步化!
原始的回調編程模式:
class AsyncHandler(RequestHandler): @asynchronous def get(self): http_client = AsyncHTTPClient() http_client.fetch("
def on_fetch(self, response): do_something_with_response(response) self.render("template.html")</pre>
同步化編程后的結果:
class GenAsyncHandler(RequestHandler): @gen.coroutine def get(self): http_client = AsyncHTTPClient() response = yield http_client.fetch("http://example.com") do_something_with_response(response) self.render("template.html")
關于這個裝飾器的實現方式,可以參見 http://my.oschina.net/u/877348/blog/184058
4.2 Gevent 與 Greenlet 庫
看了上述代碼,你是不是覺得利用協程就可以將并發編程全部同步化了?錯!
仔細想想,即使用了協程,同一時間仍然只能有一段代碼得到執行,此時如果有同步的I/O任務,則仍會存在阻塞想象。除非…除非將I/O任務自動并發掉,才有可能真正利用協程來將大量異步并發任務同步化!注意這里的http_client是異步網絡庫,非同步阻塞庫。一般是需要回調,但利用協程對get()函數同步化以后,當執行到yield時,相當于發出了多個網絡請求,然后掛起這個get()函數,其他協程將得到調度。當異步網絡請求都已返回且協程調度有空閑時,會調用get.send(),繼續這個協程,以同步化編程的方式繼續完成原先放在回調函數中的邏輯。上例中網絡請求如果采用普通的urllib.urlopen()就不行了。
慢著,如果urllib.urlopen()能夠異步執行,那不就行了?
這就是Greenlet庫所做的,它是以C擴展模塊形式接入Python的輕量級協程,將一些原本同步運行的網絡庫以mockey_patch的方式進行了重寫。Greenlets全部運行在主程序操作系統進程的內部,但它們被協作式地調度。
而Gevent庫則是基于Greenlet,實現了協程調度功能。將多個函數spawn為協程,然后join到一起,如此簡單!
看一個Gevent的官方例子:
import gevent.monkey gevent.monkey.patch_socket()
import gevent import urllib2 import simplejson as json
def fetch(pid): response = urllib2.urlopen('
print('Process %s: %s' % (pid, datetime)) return json_result['datetime']
def synchronous(): for i in range(1,10): fetch(i)
def asynchronous(): threads = [] for i in range(1,10): threads.append(gevent.spawn(fetch, i)) gevent.joinall(threads)
print('Synchronous:') synchronous()
print('Asynchronous:') asynchronous()</pre>
4.3 multiprocessing.dummy.ThreadPool 庫
實現異步編程同步化還有一個方法,就是利用的map()函數。這個函數我們并不陌生,它可以在一個序列上實現某個函數之間的映射。
results = map(urllib2.urlopen, ['http://www.yahoo.com', 'http://www.reddit.com'])
上述代碼對會依次訪問每個url,不過因為只有一個進程,后一個urlopen仍然需要等待前一個urlopen完成后才會進行,仍然是一種串行的方式。但是,只要借助正確的庫,map()也可以輕松實現并行化操作,那就是multiprocessing庫。
這個庫以及其鮮為人知的子庫multiprocessing.dummy,一個用于多進程,一個用于多線程。后者提供改良的map()函數,可以自動將多個異步任務,分配到多個線程上,編程人員無需關注,也就自然地把異步編程轉為了同步編程的風格。IO 密集型任務選擇multiprocessing.dummy,CPU 密集型任務選擇multiprocessing。
前述那個教科書式的例子,可以改寫為:
import urllib2 from multiprocessing.dummy import Pool as ThreadPool urls = [ '
Make the Pool of workers
pool = ThreadPool()
Open the urls in their own threads and return the results
results = pool.map(urllib2.urlopen, urls)
close the pool and wait for the work to finish
print results pool.close() pool.join()</pre>
關于map()函數和yield關鍵字的解釋,請參考 @申導 的另一篇文章《Python函數式編程》
5、Refer:
[1] 利用python yielding創建協程將異步編程同步化
[2] 協程
[3] Python 中的進程、線程、協程、同步、異步、回調
http://segmentfault.com/a/1190000001813992
[4] 淺談coroutine與gevent
http://blog.ez2learn.com/2010/07/17/talk-about-coroutine-and-gevent/
[5] gevent程序員指南
http://xlambda.com/gevent-tutorial/
[6] 協程的好處是什么?
http://www.zhihu.com/question/20511233
[7] python中的協程(yield)內部是怎么實現的?python和lua在yield的實現原理上有什么區別?
http://www.zhihu.com/question/30133749
[8] Python中多繼承與super()用法
http://www.jackyshen.com/2015/08/19/multi-inheritance-with-super-in-Python/
[9] Python 并行任務技巧
-
-
-