gevent程序員指南
<header> <h1><span class="green">gevent</span>程序員指南</h1>
<h3 class="author">
由Gevent社區編寫
</h3>
<blockquote>
gevent是一個基于<a href="/misc/goto?guid=4958852232882669364">libev</a>的并發庫。它為各種并發和網絡相關的任務提供了整潔的API。
</blockquote>
</header> <div class="toc">
- 介紹
- 貢獻者 </ul> </li>
- 核心部分
- Greenlets
- 同步和異步執行
- 確定性
- 創建Greenlets
- Greenlet狀態
- 程序停止
- 超時
- 猴子補丁(Monkey patching) </ul> </li>
- 數據結構
- 事件
- 隊列
- 組和池
- 鎖和信號量
- 線程局部變量
- 子進程
- Actors </ul> </li>
- 真實世界的應用
- Gevent ZeroMQ
- 簡單server
- WSGI Servers
- 流式server
- Long Polling
- Websockets
- 聊天server </ul> </li> </ul> </div>
started
-- Boolean, 指示此Greenlet是否已經啟動ready()
-- Boolean, 指示此Greenlet是否已經停止successful()
-- Boolean, 指示此Greenlet是否已經停止而且沒拋異常value
-- 任意值, 此Greenlet代碼返回的值exception
-- 異常, 此Greenlet內拋出的未捕獲異常
</ul>
-
在兼容POSIX的系統創建子進程(forking)之后, 在子進程的gevent的狀態是不適定的(ill-posed)。一個副作用就是,
</li>multiprocessing.Process
創建之前的greenlet創建動作,會在父進程和子進程兩 方都運行。 -
上例的
</li>put_msg()
中的a.send()
可能依然非協作式地阻塞調用的線程:一個 ready-to-write事件只保證寫了一個byte。在嘗試寫完成之前底下的buffer可能是滿的。 -
上面表示的基于
</li> </ul>wait_write()
/wait_read()
的方法在Windows上不工作 (IOError: 3 is not a socket (files are not supported)
),因為Windows不能監視 pipe事件。Python包gipc以大體上透明的方式在 兼容POSIX系統和Windows上克服了這些挑戰。它提供了gevent感知的基于
multiprocessing.Process
的子進程和gevent基于pipe的協作式進程間通信。Actors
actor模型是一個由于Erlang變得普及的更高層的并發模型。 簡單的說它的主要思想就是許多個獨立的Actor,每個Actor有一個可以從 其它Actor接收消息的收件箱。Actor內部的主循環遍歷它收到的消息,并 根據它期望的行為來采取行動。
Gevent沒有原生的Actor類型,但在一個子類化的Greenlet內使用隊列, 我們可以定義一個非常簡單的。
import gevent from gevent.queue import Queue
class Actor(gevent.Greenlet):
def __init__(self): self.inbox = Queue() Greenlet.__init__(self) def receive(self, message): """ Define in your subclass. """ raise NotImplemented() def _run(self): self.running = True while self.running: message = self.inbox.get() self.receive(message)</code> </pre> <p>下面是一個使用的例子:</p>
import gevent from gevent.queue import Queue from gevent import Greenlet
class Pinger(Actor): def receive(self, message): print(message) pong.inbox.put('ping') gevent.sleep(0)
class Ponger(Actor): def receive(self, message): print(message) ping.inbox.put('pong') gevent.sleep(0)
ping = Pinger() pong = Ponger()
ping.start() pong.start()
ping.inbox.put('start') gevent.joinall([ping, pong])</code> </pre>
真實世界的應用
Gevent ZeroMQ
ZeroMQ 被它的作者描述為 “一個表現得像一個并發框架的socket庫”。 它是一個非常強大的,為構建并發和分布式應用的消息傳遞層。
ZeroMQ提供了各種各樣的socket原語。最簡單的是請求-應答socket對 (Request-Response socket pair)。一個socket有兩個方法
send
和recv
, 兩者一般都是阻塞操作。但是Travis Cline 的一個杰出的庫彌補了這一點,這個庫使用gevent.socket來以非阻塞的方式 輪詢ZereMQ socket。通過命令:pip install gevent-zeromq
你可以從PyPi安裝gevent-zeremq。
# Note: Remember to
pip install pyzmq gevent_zeromq
import gevent from gevent_zeromq import zmqGlobal Context
context = zmq.Context()
def server(): server_socket = context.socket(zmq.REQ) server_socket.bind("tcp://127.0.0.1:5000")
for request in range(1,10): server_socket.send("Hello") print('Switched to Server for %s' % request) # Implicit context switch occurs here server_socket.recv()
def client(): client_socket = context.socket(zmq.REP) client_socket.connect("tcp://127.0.0.1:5000")
for request in range(1,10): client_socket.recv() print('Switched to Client for %s' % request) # Implicit context switch occurs here client_socket.send("World")
publisher = gevent.spawn(server) client = gevent.spawn(client)
gevent.joinall([publisher, client])</code></pre>
Switched to Server for 1 Switched to Client for 1 Switched to Server for 2 Switched to Client for 2 Switched to Server for 3 Switched to Client for 3 Switched to Server for 4 Switched to Client for 4 Switched to Server for 5 Switched to Client for 5 Switched to Server for 6 Switched to Client for 6 Switched to Server for 7 Switched to Client for 7 Switched to Server for 8 Switched to Client for 8 Switched to Server for 9 Switched to Client for 9
簡單server
# On Unix: Access with
$ nc 127.0.0.1 5000
On Window: Access with
$ telnet 127.0.0.1 5000
from gevent.server import StreamServer
def handle(socket, address): socket.send("Hello from a telnet!\n") for i in range(5): socket.send(str(i) + '\n') socket.close()
server = StreamServer(('127.0.0.1', 5000), handle) server.serve_forever()</code> </pre>
WSGI Servers
Gevent為HTTP內容服務提供了兩種WSGI server。從今以后就稱為
wsgi
和pywsgi
:- gevent.wsgi.WSGIServer
- gevent.pywsgi.WSGIServer </ul>
在1.0.x之前更早期的版本里,gevent使用libevent而不是libev。 Libevent包含了一個快速HTTP server,它被用在gevent的
wsgi
server。在gevent 1.0.x版本,沒有包括http server了。作為替代,
gevent.wsgi
現在是純Python servergevent.pywsgi
的一個別名。流式server
這個章節不適用于gevent 1.0.x版本
熟悉流式HTTP服務(streaming HTTP service)的人知道,它的核心思想 就是在頭部(header)不指定內容的長度。反而,我們讓連接保持打開, 在每塊數據前加一個16進制字節來指示數據塊的長度,并將數據刷入pipe中。 當發出一個0長度數據塊時,流會被關閉。
HTTP/1.1 200 OK Content-Type: text/plain Transfer-Encoding: chunked
8 <p>Hello
9 World</p>
0</code></pre>
上述的HTTP連接不能在wsgi中創建,因為它不支持流式。 請求只有被緩沖(buffered)下來。
from gevent.wsgi import WSGIServer
def application(environ, start_response): status = '200 OK' body = '<p>Hello World</p>'
headers = [ ('Content-Type', 'text/html') ] start_response(status, headers) return [body]
WSGIServer(('', 8000), application).serve_forever()</code> </pre>
然而使用pywsgi我們可以將handler寫成generator,并以塊的形式yield出結果。
from gevent.pywsgi import WSGIServer
def application(environ, start_response): status = '200 OK'
headers = [ ('Content-Type', 'text/html') ] start_response(status, headers) yield "<p>Hello" yield "World</p>"
WSGIServer(('', 8000), application).serve_forever()</code> </pre>
但無論如何,與其它Python server相比gevent server性能是顯勝的。 Libev是得到非常好審查的技術,由它寫出的server在大規模上表現優異為人熟知。
為了測試基準,試用Apache Benchmark
ab
或瀏覽Benchmark of Python WSGI Servers 來與其它server作對比。$ ab -n 10000 -c 100 http://127.0.0.1:8000/
Long Polling
import gevent from gevent.queue import Queue, Empty from gevent.pywsgi import WSGIServer import simplejson as json
data_source = Queue()
def producer(): while True: data_source.put_nowait('Hello World') gevent.sleep(1)
def ajax_endpoint(environ, start_response): status = '200 OK' headers = [ ('Content-Type', 'application/json') ]
start_response(status, headers) while True: try: datum = data_source.get(timeout=5) yield json.dumps(datum) + '\n' except Empty: pass
gevent.spawn(producer)
WSGIServer(('', 8000), ajax_endpoint).serve_forever()</code> </pre>
Websockets
運行Websocket的例子需要gevent-websocket包。
# Simple gevent-websocket server import json import random
from gevent import pywsgi, sleep from geventwebsocket.handler import WebSocketHandler
class WebSocketApp(object): '''Send random data to the websocket'''
def __call__(self, environ, start_response): ws = environ['wsgi.websocket'] x = 0 while True: data = json.dumps({'x': x, 'y': random.randint(1, 5)}) ws.send(data) x += 1 sleep(0.5)
server = pywsgi.WSGIServer(("", 10000), WebSocketApp(), handler_class=WebSocketHandler) server.serve_forever()</code> </pre>
HTML Page:
<html> <head> <title>Minimal websocket application</title> <script type="text/javascript" src="jquery.min.js"></script> <script type="text/javascript"> $(function() { // Open up a connection to our server var ws = new WebSocket("ws://localhost:10000/");
// What do we do when we get a message? ws.onmessage = function(evt) { $("#placeholder").append('<p>' + evt.data + '</p>') } // Just update our conn_status field with the connection status ws.onopen = function(evt) { $('#conn_status').html('<b>Connected</b>'); } ws.onerror = function(evt) { $('#conn_status').html('<b>Error</b>'); } ws.onclose = function(evt) { $('#conn_status').html('<b>Closed</b>'); } }); </script> </head> <body> <h1>WebSocket Example</h1> <div id="conn_status">Not Connected</div> <div id="placeholder" style="width:600px;height:300px;"></div> </body>
</html></code></pre>
聊天server
最后一個生動的例子,實現一個實時聊天室。運行這個例子需要Flask (你可以使用Django, Pyramid等,但不是必須的)。 對應的Javascript和HTML文件可以在這里找到。
# Micro gevent chatroom.
----------------------
from flask import Flask, render_template, request
from gevent import queue from gevent.pywsgi import WSGIServer
import simplejson as json
app = Flask(name) app.debug = True
rooms = { 'topic1': Room(), 'topic2': Room(), }
users = {}
class Room(object):
def __init__(self): self.users = set() self.messages = [] def backlog(self, size=25): return self.messages[-size:] def subscribe(self, user): self.users.add(user) def add(self, message): for user in self.users: print(user) user.queue.put_nowait(message) self.messages.append(message)
class User(object):
def __init__(self): self.queue = queue.Queue()
@app.route('/') def choose_name(): return render_template('choose.html')
@app.route('/<uid>') def main(uid): return render_template('main.html', uid=uid, rooms=rooms.keys() )
@app.route('/<room>/<uid>') def join(room, uid): user = users.get(uid, None)
if not user: users[uid] = user = User() active_room = rooms[room] active_room.subscribe(user) print('subscribe %s %s' % (active_room, user)) messages = active_room.backlog() return render_template('room.html', room=room, uid=uid, messages=messages)
@app.route("/put/<room>/<uid>", methods=["POST"]) def put(room, uid): user = users[uid] room = rooms[room]
message = request.form['message'] room.add(':'.join([uid, message])) return ''
@app.route("/poll/<uid>", methods=["POST"]) def poll(uid): try: msg = users[uid].queue.get(timeout=10) except queue.Empty: msg = [] return json.dumps(msg)
if name == "main": http = WSGIServer(('', 5000), app) http.serve_forever()</code> </pre> </code></code></code></code></code></div>
介紹
本指南假定讀者有中級Python水平,但不要求有其它更多的知識,不期待讀者有 并發方面的知識。本指南的目標在于給予你需要的工具來開始使用gevent,幫助你 馴服現有的并發問題,并從今開始編寫異步應用程序。
貢獻者
按提供貢獻的時間先后順序列出如下:Stephen Diehl Jérémy Bethmont sww Bruno Bigras David Ripton Travis Cline Boris Feld youngsterxyf Eddie Hebert Alexis Metaireau Daniel Velkov
同時感謝Denis Bilenko寫了gevent和相應的指導以形成本指南。
這是一個以MIT許可證發布的協作文檔。你想添加一些內容?或看見一個排版錯誤? Fork一個分支發布一個request到Github. 我們歡迎任何貢獻。
本頁也有日文版本。
核心部分
Greenlets
在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程。 Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。
在任何時刻,只有一個協程在運行。
</blockquote>這與
multiprocessing
或threading
等提供真正并行構造的庫是不同的。 這些庫輪轉使用操作系統調度的進程和線程,是真正的并行。同步和異步執行
并發的核心思想在于,大的任務可以分解成一系列的子任務,后者可以被調度成 同時執行或異步執行,而不是一次一個地或者同步地執行。兩個子任務之間的 切換也就是上下文切換。
在gevent里面,上下文切換是通過yielding來完成的. 在下面的例子里, 我們有兩個上下文,通過調用
gevent.sleep(0)
,它們各自yield向對方。import gevent
def foo(): print('Running in foo') gevent.sleep(0) print('Explicit context switch to foo again')
def bar(): print('Explicit context to bar') gevent.sleep(0) print('Implicit context switch back to bar')
gevent.joinall([ gevent.spawn(foo), gevent.spawn(bar), ])</code></pre>
Running in foo Explicit context to bar Explicit context switch to foo again Implicit context switch back to bar
下圖將控制流形象化,就像在調試器中單步執行整個程序,以說明上下文切換如何發生。
當我們在受限于網絡或IO的函數中使用gevent,這些函數會被協作式的調度, gevent的真正能力會得到發揮。Gevent處理了所有的細節, 來保證你的網絡庫會在可能的時候,隱式交出greenlet上下文的執行權。 這樣的一種用法是如何強大,怎么強調都不為過。或者我們舉些例子來詳述。
下面例子中的
select()
函數通常是一個在各種文件描述符上輪詢的阻塞調用。import time import gevent from gevent import select
start = time.time() tic = lambda: 'at %1.1f seconds' % (time.time() - start)
def gr1():
# Busy waits for a second, but we don't want to stick around... print('Started Polling: %s' % tic()) select.select([], [], [], 2) print('Ended Polling: %s' % tic())
def gr2():
# Busy waits for a second, but we don't want to stick around... print('Started Polling: %s' % tic()) select.select([], [], [], 2) print('Ended Polling: %s' % tic())
def gr3(): print("Hey lets do some stuff while the greenlets poll, %s" % tic()) gevent.sleep(1)
gevent.joinall([ gevent.spawn(gr1), gevent.spawn(gr2), gevent.spawn(gr3), ])</code></pre>
Started Polling: at 0.0 seconds Started Polling: at 0.0 seconds Hey lets do some stuff while the greenlets poll, at 0.0 seconds Ended Polling: at 2.0 seconds Ended Polling: at 2.0 seconds
下面是另外一個多少有點人造色彩的例子,定義一個非確定性的(non-deterministic) 的
task
函數(給定相同輸入的情況下,它的輸出不保證相同)。 此例中執行這個函數的副作用就是,每次task在它的執行過程中都會隨機地停某些秒。import gevent import random
def task(pid): """ Some non-deterministic task """ gevent.sleep(random.randint(0,2)*0.001) print('Task %s done' % pid)
def synchronous(): for i in range(1,10): task(i)
def asynchronous(): threads = [gevent.spawn(task, i) for i in xrange(10)] gevent.joinall(threads)
print('Synchronous:') synchronous()
print('Asynchronous:') asynchronous()</code></pre>
Synchronous: Task 1 done Task 2 done Task 3 done Task 4 done Task 5 done Task 6 done Task 7 done Task 8 done Task 9 done Asynchronous: Task 3 done Task 7 done Task 9 done Task 2 done Task 4 done Task 1 done Task 8 done Task 6 done Task 0 done Task 5 done
上例中,在同步的部分,所有的task都同步的執行, 結果當每個task在執行時主流程被阻塞(主流程的執行暫時停住)。
程序的重要部分是將task函數封裝到Greenlet內部線程的
gevent.spawn
。 初始化的greenlet列表存放在數組threads
中,此數組被傳給gevent.joinall
函數,后者阻塞當前流程,并執行所有給定的greenlet。執行流程只會在 所有greenlet執行完后才會繼續向下走。要重點留意的是,異步的部分本質上是隨機的,而且異步部分的整體運行時間比同步 要大大減少。事實上,同步部分的最大運行時間,即是每個task停0.002秒,結果整個 隊列要停0.02秒。而異步部分的最大運行時間大致為0.002秒,因為沒有任何一個task會 阻塞其它task的執行。
一個更常見的應用場景,如異步地向服務器取數據,取數據操作的執行時間 依賴于發起取數據請求時遠端服務器的負載,各個請求的執行時間會有差別。
import gevent.monkey gevent.monkey.patch_socket()
import gevent import urllib2 import simplejson as json
def fetch(pid): response = urllib2.urlopen('
print('Process %s: %s' % (pid, datetime)) return json_result['datetime']
def synchronous(): for i in range(1,10): fetch(i)
def asynchronous(): threads = [] for i in range(1,10): threads.append(gevent.spawn(fetch, i)) gevent.joinall(threads)
print('Synchronous:') synchronous()
print('Asynchronous:') asynchronous()</code> </pre>
確定性
就像之前所提到的,greenlet具有確定性。在相同配置相同輸入的情況下,它們總是 會產生相同的輸出。下面就有例子,我們在multiprocessing的pool之間執行一系列的 任務,與在gevent的pool之間執行作比較。
import time
def echo(i): time.sleep(0.001) return i
Non Deterministic Process Pool
from multiprocessing.pool import Pool
p = Pool(10) run1 = [a for a in p.imap_unordered(echo, xrange(10))] run2 = [a for a in p.imap_unordered(echo, xrange(10))] run3 = [a for a in p.imap_unordered(echo, xrange(10))] run4 = [a for a in p.imap_unordered(echo, xrange(10))]
print(run1 == run2 == run3 == run4)
Deterministic Gevent Pool
from gevent.pool import Pool
p = Pool(10) run1 = [a for a in p.imap_unordered(echo, xrange(10))] run2 = [a for a in p.imap_unordered(echo, xrange(10))] run3 = [a for a in p.imap_unordered(echo, xrange(10))] run4 = [a for a in p.imap_unordered(echo, xrange(10))]
print(run1 == run2 == run3 == run4)</code> </pre>
False True
即使gevent通常帶有確定性,當開始與如socket或文件等外部服務交互時, 不確定性也可能溜進你的程序中。因此盡管gevent線程是一種“確定的并發”形式, 使用它仍然可能會遇到像使用POSIX線程或進程時遇到的那些問題。
涉及并發長期存在的問題就是競爭條件(race condition)。簡單來說, 當兩個并發線程/進程都依賴于某個共享資源同時都嘗試去修改它的時候, 就會出現競爭條件。這會導致資源修改的結果狀態依賴于時間和執行順序。 這是個問題,我們一般會做很多努力嘗試避免競爭條件, 因為它會導致整個程序行為變得不確定。
最好的辦法是始終避免所有全局的狀態。全局狀態和導入時(import-time)副作用總是會 反咬你一口!
創建Greenlets
gevent對Greenlet初始化提供了一些封裝,最常用的使用模板之一有
import gevent from gevent import Greenlet
def foo(message, n): """ Each thread will be passed the message, and n arguments in its initialization. """ gevent.sleep(n) print(message)
Initialize a new Greenlet instance running the named function
foo
thread1 = Greenlet.spawn(foo, "Hello", 1)
Wrapper for creating and running a new Greenlet from the named
function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)
Lambda expressions
thread3 = gevent.spawn(lambda x: (x+1), 2)
threads = [thread1, thread2, thread3]
Block until all threads complete.
gevent.joinall(threads)</code></pre>
Hello I live!
除使用基本的Greenlet類之外,你也可以子類化Greenlet類,重載它的
_run
方法。import gevent from gevent import Greenlet
class MyGreenlet(Greenlet):
def __init__(self, message, n): Greenlet.__init__(self) self.message = message self.n = n def _run(self): print(self.message) gevent.sleep(self.n)
g = MyGreenlet("Hi there!", 3) g.start() g.join()</code></pre>
Hi there!
Greenlet狀態
就像任何其他成段代碼,Greenlet也可能以不同的方式運行失敗。 Greenlet可能未能成功拋出異常,不能停止運行,或消耗了太多的系統資源。
一個greenlet的狀態通常是一個依賴于時間的參數。在greenlet中有一些標志, 讓你可以監視它的線程內部狀態:
import gevent
def win(): return 'You win!'
def fail(): raise Exception('You fail at failing.')
winner = gevent.spawn(win) loser = gevent.spawn(fail)
print(winner.started) # True print(loser.started) # True
Exceptions raised in the Greenlet, stay inside the Greenlet.
try: gevent.joinall([winner, loser]) except Exception as e: print('This will never be reached')
print(winner.value) # 'You win!' print(loser.value) # None
print(winner.ready()) # True print(loser.ready()) # True
print(winner.successful()) # True print(loser.successful()) # False
The exception raised in fail, will not propogate outside the
greenlet. A stack trace will be printed to stdout but it
will not unwind the stack of the parent.
print(loser.exception)
It is possible though to raise the exception again outside
raise loser.exception
or with
loser.get()</code></pre>
True True You win! None True True True False You fail at failing.
程序停止
當主程序(main program)收到一個SIGQUIT信號時,不能成功做yield操作的 Greenlet可能會令意外地掛起程序的執行。這導致了所謂的僵尸進程, 它需要在Python解釋器之外被kill掉。
對此,一個通用的處理模式就是在主程序中監聽SIGQUIT信號,在程序退出 調用
gevent.shutdown
。import gevent import signal
def run_forever(): gevent.sleep(1000)
if name == 'main': gevent.signal(signal.SIGQUIT, gevent.shutdown) thread = gevent.spawn(run_forever) thread.join()</code> </pre>
超時
超時是一種對一塊代碼或一個Greenlet的運行時間的約束。
import gevent from gevent import Timeout
seconds = 10
timeout = Timeout(seconds) timeout.start()
def wait(): gevent.sleep(10)
try: gevent.spawn(wait).join() except Timeout: print('Could not complete')</code> </pre>
超時類也可以用在上下文管理器(context manager)中, 也就是with語句內。
import gevent from gevent import Timeout
time_to_wait = 5 # seconds
class TooLong(Exception): pass
with Timeout(time_to_wait, TooLong): gevent.sleep(10)</code> </pre>
另外,對各種Greenlet和數據結構相關的調用,gevent也提供了超時參數。 例如:
import gevent from gevent import Timeout
def wait(): gevent.sleep(2)
timer = Timeout(1).start() thread1 = gevent.spawn(wait)
try: thread1.join(timeout=timer) except Timeout: print('Thread 1 timed out')
--
timer = Timeout.start_new(1) thread2 = gevent.spawn(wait)
try: thread2.get(timeout=timer) except Timeout: print('Thread 2 timed out')
--
try: gevent.with_timeout(1, wait) except Timeout: print('Thread 3 timed out')</code></pre>
Thread 1 timed out Thread 2 timed out Thread 3 timed out
猴子補丁(Monkey patching)
我們現在來到gevent的死角了. 在此之前,我已經避免提到猴子補丁(monkey patching) 以嘗試使gevent這個強大的協程模型變得生動有趣,但現在到了討論猴子補丁的黑色藝術 的時候了。你之前可能注意到我們提到了
monkey.patch_socket()
這個命令,這個 純粹副作用命令是用來改變標準socket庫的。import socket print(socket.socket)
print("After monkey patch") from gevent import monkey monkey.patch_socket() print(socket.socket)
import select print(select.select) monkey.patch_select() print("After monkey patch") print(select.select)</code> </pre>
class 'socket.socket' After monkey patch class 'gevent.socket.socket'
built-in function select After monkey patch function select at 0x1924de8</code> </pre>
Python的運行環境允許我們在運行時修改大部分的對象,包括模塊,類甚至函數。 這是個一般說來令人驚奇的壞主意,因為它創造了“隱式的副作用”,如果出現問題 它很多時候是極難調試的。雖然如此,在極端情況下當一個庫需要修改Python本身 的基礎行為的時候,猴子補丁就派上用場了。在這種情況下,gevent能夠 修改標準庫里面大部分的阻塞式系統調用,包括
socket
、ssl
、threading
和select
等模塊,而變為協作式運行。例如,Redis的python綁定一般使用常規的tcp socket來與
redis-server
實例通信。 通過簡單地調用gevent.monkey.patch_all()
,可以使得redis的綁定協作式的調度 請求,與gevent棧的其它部分一起工作。這讓我們可以將一般不能與gevent共同工作的庫結合起來,而不用寫哪怕一行代碼。 雖然猴子補丁仍然是邪惡的(evil),但在這種情況下它是“有用的邪惡(useful evil)”。
數據結構
事件
事件(event)是一個在Greenlet之間異步通信的形式。
import gevent from gevent.event import Event
''' Illustrates the use of events '''
evt = Event()
def setter(): '''After 3 seconds, wake all threads waiting on the value of evt''' print('A: Hey wait for me, I have to do something') gevent.sleep(3) print("Ok, I'm done") evt.set()
def waiter(): '''After 3 seconds the get call will unblock''' print("I'll wait for you") evt.wait() # blocking print("It's about time")
def main(): gevent.joinall([ gevent.spawn(setter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter), gevent.spawn(waiter) ])
if name == 'main': main()</code> </pre>
事件對象的一個擴展是AsyncResult,它允許你在喚醒調用上附加一個值。 它有時也被稱作是future或defered,因為它持有一個指向將來任意時間可設置 為任何值的引用。
import gevent from gevent.event import AsyncResult a = AsyncResult()
def setter(): """ After 3 seconds set the result of a. """ gevent.sleep(3) a.set('Hello!')
def waiter(): """ After 3 seconds the get call will unblock after the setter puts a value into the AsyncResult. """ print(a.get())
gevent.joinall([ gevent.spawn(setter), gevent.spawn(waiter), ])</code> </pre>
隊列
隊列是一個排序的數據集合,它有常見的
put
/get
操作, 但是它是以在Greenlet之間可以安全操作的方式來實現的。舉例來說,如果一個Greenlet從隊列中取出一項,此項就不會被 同時執行的其它Greenlet再取到了。
import gevent from gevent.queue import Queue
tasks = Queue()
def worker(n): while not tasks.empty(): task = tasks.get() print('Worker %s got task %s' % (n, task)) gevent.sleep(0)
print('Quitting time!')
def boss(): for i in xrange(1,25): tasks.put_nowait(i)
gevent.spawn(boss).join()
gevent.joinall([ gevent.spawn(worker, 'steve'), gevent.spawn(worker, 'john'), gevent.spawn(worker, 'nancy'), ])</code></pre>
Worker steve got task 1 Worker john got task 2 Worker nancy got task 3 Worker steve got task 4 Worker nancy got task 5 Worker john got task 6 Worker steve got task 7 Worker john got task 8 Worker nancy got task 9 Worker steve got task 10 Worker nancy got task 11 Worker john got task 12 Worker steve got task 13 Worker john got task 14 Worker nancy got task 15 Worker steve got task 16 Worker nancy got task 17 Worker john got task 18 Worker steve got task 19 Worker john got task 20 Worker nancy got task 21 Worker steve got task 22 Worker nancy got task 23 Worker john got task 24 Quitting time! Quitting time! Quitting time!
如果需要,隊列也可以阻塞在
put
或get
操作上。put
和get
操作都有非阻塞的版本,put_nowait
和get_nowait
不會阻塞, 然而在操作不能完成時拋出gevent.queue.Empty
或gevent.queue.Full
異常。在下面例子中,我們讓boss與多個worker同時運行,并限制了queue不能放入多于3個元素。 這個限制意味著,直到queue有空余空間之間,
put
操作會被阻塞。相反地,如果隊列中 沒有元素,get
操作會被阻塞。它同時帶一個timeout參數,允許在超時時間內如果 隊列沒有元素無法完成操作就拋出gevent.queue.Empty
異常。import gevent from gevent.queue import Queue, Empty
tasks = Queue(maxsize=3)
def worker(n): try: while True: task = tasks.get(timeout=1) # decrements queue size by 1 print('Worker %s got task %s' % (n, task)) gevent.sleep(0) except Empty: print('Quitting time!')
def boss(): """ Boss will wait to hand out work until a individual worker is free since the maxsize of the task queue is 3. """
for i in xrange(1,10): tasks.put(i) print('Assigned all work in iteration 1') for i in xrange(10,20): tasks.put(i) print('Assigned all work in iteration 2')
gevent.joinall([ gevent.spawn(boss), gevent.spawn(worker, 'steve'), gevent.spawn(worker, 'john'), gevent.spawn(worker, 'bob'), ])</code></pre>
Worker steve got task 1 Worker john got task 2 Worker bob got task 3 Worker steve got task 4 Worker bob got task 5 Worker john got task 6 Assigned all work in iteration 1 Worker steve got task 7 Worker john got task 8 Worker bob got task 9 Worker steve got task 10 Worker bob got task 11 Worker john got task 12 Worker steve got task 13 Worker john got task 14 Worker bob got task 15 Worker steve got task 16 Worker bob got task 17 Worker john got task 18 Assigned all work in iteration 2 Worker steve got task 19 Quitting time! Quitting time! Quitting time!
組和池
組(group)是一個運行中greenlet的集合,集合中的greenlet像一個組一樣 會被共同管理和調度。 它也兼飾了像Python的
multiprocessing
庫那樣的 平行調度器的角色。import gevent from gevent.pool import Group
def talk(msg): for i in xrange(3): print(msg)
g1 = gevent.spawn(talk, 'bar') g2 = gevent.spawn(talk, 'foo') g3 = gevent.spawn(talk, 'fizz')
group = Group() group.add(g1) group.add(g2) group.join()
group.add(g3) group.join()</code></pre>
bar bar bar foo foo foo fizz fizz fizz
在管理異步任務的分組上它是非常有用的。
就像上面所說,
Group
也以不同的方式為分組greenlet/分發工作和收集它們的結果也提供了API。import gevent from gevent import getcurrent from gevent.pool import Group
group = Group()
def hello_from(n): print('Size of group %s' % len(group)) print('Hello from Greenlet %s' % id(getcurrent()))
group.map(hello_from, xrange(3))
def intensive(n): gevent.sleep(3 - n) return 'task', n
print('Ordered')
ogroup = Group() for i in ogroup.imap(intensive, xrange(3)): print(i)
print('Unordered')
igroup = Group() for i in igroup.imap_unordered(intensive, xrange(3)): print(i)</code></pre>
Size of group 3 Hello from Greenlet 31048720 Size of group 3 Hello from Greenlet 31049200 Size of group 3 Hello from Greenlet 31049040 Ordered ('task', 0) ('task', 1) ('task', 2) Unordered ('task', 2) ('task', 1) ('task', 0)
池(pool)是一個為處理數量變化并且需要限制并發的greenlet而設計的結構。 在需要并行地做很多受限于網絡和IO的任務時常常需要用到它。
import gevent from gevent.pool import Pool
pool = Pool(2)
def hello_from(n): print('Size of pool %s' % len(pool))
pool.map(hello_from, xrange(3))</code></pre>
Size of pool 2 Size of pool 2 Size of pool 1
當構造gevent驅動的服務時,經常會將圍繞一個池結構的整個服務作為中心。 一個例子就是在各個socket上輪詢的類。
from gevent.pool import Pool
class SocketPool(object):
def __init__(self): self.pool = Pool(1000) self.pool.start() def listen(self, socket): while True: socket.recv() def add_handler(self, socket): if self.pool.full(): raise Exception("At maximum pool size") else: self.pool.spawn(self.listen, socket) def shutdown(self): self.pool.kill()</code> </pre> <h2 id="_11">鎖和信號量</h2>
信號量是一個允許greenlet相互合作,限制并發訪問或運行的低層次的同步原語。 信號量有兩個方法,
acquire
和release
。在信號量是否已經被 acquire或release,和擁有資源的數量之間不同,被稱為此信號量的范圍 (the bound of the semaphore)。如果一個信號量的范圍已經降低到0,它會 阻塞acquire操作直到另一個已經獲得信號量的greenlet作出釋放。from gevent import sleep from gevent.pool import Pool from gevent.coros import BoundedSemaphore
sem = BoundedSemaphore(2)
def worker1(n): sem.acquire() print('Worker %i acquired semaphore' % n) sleep(0) sem.release() print('Worker %i released semaphore' % n)
def worker2(n): with sem: print('Worker %i acquired semaphore' % n) sleep(0) print('Worker %i released semaphore' % n)
pool = Pool() pool.map(worker1, xrange(0,2)) pool.map(worker2, xrange(3,6))</code></pre>
Worker 0 acquired semaphore Worker 1 acquired semaphore Worker 0 released semaphore Worker 1 released semaphore Worker 3 acquired semaphore Worker 4 acquired semaphore Worker 3 released semaphore Worker 4 released semaphore Worker 5 acquired semaphore Worker 5 released semaphore
范圍為1的信號量也稱為鎖(lock)。它向單個greenlet提供了互斥訪問。 信號量和鎖常常用來保證資源只在程序上下文被單次使用。
線程局部變量
Gevent也允許你指定局部于greenlet上下文的數據。 在內部,它被實現為以greenlet的
getcurrent()
為鍵, 在一個私有命名空間尋址的全局查找。import gevent from gevent.local import local
stash = local()
def f1(): stash.x = 1 print(stash.x)
def f2(): stash.y = 2 print(stash.y)
try: stash.x except AttributeError: print("x is not local to f2")
g1 = gevent.spawn(f1) g2 = gevent.spawn(f2)
gevent.joinall([g1, g2])</code></pre>
1 2 x is not local to f2
很多集成了gevent的web框架將HTTP會話對象以線程局部變量的方式存儲在gevent內。 例如使用Werkzeug實用庫和它的proxy對象,我們可以創建Flask風格的請求對象。
from gevent.local import local from werkzeug.local import LocalProxy from werkzeug.wrappers import Request from contextlib import contextmanager
from gevent.wsgi import WSGIServer
_requests = local() request = LocalProxy(lambda: _requests.request)
@contextmanager def sessionmanager(environ): _requests.request = Request(environ) yield _requests.request = None
def logic(): return "Hello " + request.remote_addr
def application(environ, start_response): status = '200 OK'
with sessionmanager(environ): body = logic() headers = [ ('Content-Type', 'text/html') ] start_response(status, headers) return [body]
WSGIServer(('', 8000), application).serve_forever()
Flask系統比這個例子復雜一點,然而使用線程局部變量作為局部的會話存儲, 這個思想是相同的。
子進程
自gevent 1.0起,
gevent.subprocess
,一個Pythonsubprocess
模塊 的修補版本已經添加。它支持協作式的等待子進程。import gevent from gevent.subprocess import Popen, PIPE
def cron(): while True: print("cron") gevent.sleep(0.2)
g = gevent.spawn(cron) sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True) out, err = sub.communicate() g.kill() print(out.rstrip())</code></pre>
cron cron cron cron cron Linux
很多人也想將
gevent
和multiprocessing
一起使用。最明顯的挑戰之一 就是multiprocessing
提供的進程間通信默認不是協作式的。由于基于multiprocessing.Connection
的對象(例如Pipe
)暴露了它們下面的 文件描述符(file descriptor),gevent.socket.wait_read
和wait_write
可以用來在直接讀寫之前協作式的等待ready-to-read/ready-to-write事件。</p>import gevent from multiprocessing import Process, Pipe from gevent.socket import wait_read, wait_write
To Process
a, b = Pipe()
From Process
c, d = Pipe()
def relay(): for i in xrange(10): msg = b.recv() c.send(msg + " in " + str(i))
def put_msg(): for i in xrange(10): wait_write(a.fileno()) a.send('hi')
def get_msg(): for i in xrange(10): wait_read(d.fileno()) print(d.recv())
if name == 'main': proc = Process(target=relay) proc.start()
g1 = gevent.spawn(get_msg) g2 = gevent.spawn(put_msg) gevent.joinall([g1, g2], timeout=1)</code> </pre> <p>然而要注意,組合<code>multiprocessing</code>和gevent必定帶來
依賴于操作系統(os-dependent)的缺陷,其中有:</p>