Python3中的asyncio
來自: http://blog.theerrorlog.com/asyncio-in-python-3.html
最近想換換口味,在用 asyncio 寫一個小東西,過程中碰到各種概念上、實踐上的問題,悄悄 記在這里XD.
所謂的“異步”
回歸到最初的定義的話,“異步”是指不同硬件之間可以工作在 不同的時鐘信號下——試想一下要求所有硬件工作在相同時鐘信 號下的系統該有多脆弱。所以同步總線通常出現在與系統本身 工作時鐘接近的硬件接口上,而異步總線正好相反,用來連接 遠遠達不到系統工作頻率的硬件。
這些概念投射到同根同源的軟件上的話,由于軟件命令最終都 是由硬件去執行的,所以軟件的操作也有快有慢,“異步操作” 的重點就是協調“快”的操作和“慢”的操作。然后眾所周知,最 慢的操作是IO.
與PC硬件里琳瑯滿目的總線速度不同,軟件里一般只有兩種速 度:指令執行速度和IO速度——像Erlang計算reduction的時候 也只考慮指令執行時間和IO時間,所有的指令和IO類型都是一 視同仁的。這是為神馬呢?大概是因為人腦跟不上電腦吧……
“正確”的異步操作
網上到處都是這樣的例子:
import socket
import multiprocessing
def handler(conn, addr):
message = conn.recv(1024)
while message:
conn.send(message)
conn.close()
def server(host, port):
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind((host, port))
listener.listen(10)
while True:
conn, addr = listener.accept()
process = multiprocessing.Process(target=handler, args=(conn, addr))
process.start() |
這是異步操作沒錯,用另一個進程來處理請求,只是粒度略大, 因為這只是將快的進程(handler進程)和慢的進程(server進 程)分開了而已,無論在handler還是server里都要等待IO. 而 且我在#python上貼出 這個程序的時候馬上有一堆人出來說服務器不應該這樣寫之類 的XD.
于是武林中就有了下面這種模式:
import socket
import selectors
def handler(conn):
message = conn.recv(1024)
conn.send(message)
def server(host, port):
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind((host, port))
listener.listen(10)
selector = selectors.DefaultSelector()
selector.register(listener, selectors.EVENT_READ)
while True:
event_list = selector.select()
for key, events in event_list:
conn = key.fileobj
if conn == listener:
new_conn, addr = conn.accept()
selector.register(new_conn, selectors.EVENT_READ)
else:
handler(conn) |
我將這個模式稱為“在忙完指令之后等待IO”,坊間說的“異步IO” 一般也是特指這種兩段式模式:一段等待IO,一段執行指令。別 看上面的程序很簡單,很大一部分高性能的服務器或者框架(像 Tornado、Twisted和Erlang)都是基于這個模式的,因為它的資 源消耗比基于多進程/多線程的服務器實在是少太多了(參考Nginx 和Apache的對比),所以擴展性(scalability)也好太多了。
asyncio模塊
asyncio是在Python 3.4中添加的新模塊,實現了上面的“忙完指 令之后等待IO”模式。
這世上已經有好多異步框架和庫了,Guido老爺子為什么要推行這 樣一個新模塊?他在 PEP 里說的原因是,這些第三方異步代碼相互之間不兼容不能移植 blahblah……我倒覺得是Twisted的camelCaseNaming不合老爺子胃 口而已……
還是上面的echo server例子,用外星科技實現:
import asyncio
class EchoProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
def connection_lost(self, exc):
self.transport.close()
def data_received(self, data):
self.transport.write(data)
def server(host, port):
loop = asyncio.get_event_loop()
srv = loop.create_server(EchoProtocol, host, port)
asyncio.async(srv)
loop.run_forever() |
熟悉Twisted的同學應該會有deja vu的感覺,我們提供一個接收 事件的對象(實際上是產生對象的factory),然后事件就源源不 斷地自動出現了。
但是這看起來和之前selector的例子不像啊?提示:真正的循環 在 BaseEventLoop的_run_once方法里 :)
實踐中使用asyncio的要點
好了上面的例子很美好,但其實只展示了asyncio一小部分的威力, 下面來一些私人干貨。
在事件處理方法中創建TCP連接
由于實際上asyncio是在事件循環中調用asyncio.Protocol類 (或者子類)的data_received等方法的,這些事件處理方法 如果阻塞的話,會將整個事件循環也阻塞住,失去了所有“異 步IO”模式帶來的好處,所以所有的事件處理方法——包括 data_received、connection_made、connection_lost等——都 不能調用任何可能阻塞的函數,包括socket對象的recv方法、 文件對象的read方法等,當然socket的connect方法由于域名 解析和網絡延遲等也是會阻塞的……那我們要怎么從事件處理 方法里做connect操作呢?
答案是asyncio提供了一系列異步操作的、不會阻塞的接口, 當然也包括“創建TCP連接”。這些接口全部以coroutine的形 式提供,調用時要使用 yield from 語法。例如可以這樣搭 建一個簡單的代理服務器:
HOST = 'www.google.com'
PORT = '80'
@asyncio.coroutine
def new_connection(host, port):
loop = asyncio.get_event_loop()
client_transport, client_proto = yield from \
loop.create_connection(ClientProtocol, host, port)
return client_transport, client_proto
class ClientProtocol(asyncio.Protocol):
....
class ServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
self.client_task = asyncio.Task(new_connection(HOST, PORT))
self.client_task.add_done_callback(self.client_connect_done)
....
def client_connect_done(self, future):
client_transport, client_proto = future.result()
....
def server(host, port):
loop = asyncio.get_event_loop()
srv = loop.create_server(ProxyProtocol, host, port)
asyncio.async(srv)
loop.run_forever() |
ServerProtocol在收到一個新連接(connection_made)的時候用 asyncio.Task調度一個創建新連接的異步函數,這個函數會由asyncio 的事件循環在connection_made返回后擇機執行,執行完成后事 件循環再去調用通過add_done_callback注冊的處理函數 (client_connect_done),滿滿的javascript既視感啊有木有。
為什么yield from不能直接寫在connection_made里,而需要另外 封裝一個函數呢?因為asyncio的事件循環認定了Protocol對象的 事件處理方法是普通函數,如果yield from直接出現在 connection_made中的話,事件循環調用connection_made的時候 只會返回一個generator,connection_made的函數體完全不會被 執行,所以在事件處理方法中只能通過調度Task(或者使用 asyncio.async(...),效果一樣)的方式執行異步操作。
替換事件循環使用的selector
Python 3.4中還有一個和asyncio配套的新模塊:selectors. 這個模塊將select、epoll、kqueue等等系統級異步IO接口抽象 成“selector”類型,規定了統一的對外接口,于是程序只管使 用selector的接口就行了,不用管底層的實現到底是select還 是epoll.
asyncio中用的就是selector模塊, asyncio.selector_events.BaseSelectorEventLoop類的構造 函數有一個selector參數,通常使用默認值就可以了,但是 當然我們也可以把它給換成我們自己的類。比方說如果我們希 望事件循環能支持 ZeroMQ 的socket, 可以把selector的底層實現換成zmq.Poller():
class ZmqSelector(selectors._BaseSelectorImpl):
def __init__(self, poller=None):
super().__init__()
if poller is not None:
self._zmq_poller = poller
else:
self._zmq_poller = zmq.Poller()
def _fileobj_lookup(self, fileobj):
if isinstance(fileobj, zmq.Socket):
return fileobj
else:
return super()._fileobj_lookup(fileobj)
def register(self, fileobj, events, data=None):
key = super().register(fileobj, events, data)
flags = 0
if events & selectors.EVENT_READ:
flags |= zmq.POLLIN
if events & selectors.EVENT_WRITE:
flags |= zmq.POLLOUT
self._zmq_poller.register(fileobj, flags)
return key
def unregister(self, fileobj):
key = super().unregister(fileobj)
self._zmq_poller.unregister(fileobj)
return key
def select(self, timeout=None):
if timeout is not None:
poll_timeout = max(0, math.ceil(timeout * 1e3))
else:
poll_timeout = None
select_ready = []
try:
zmq_events = self._zmq_poller.poll(poll_timeout)
except zmq.ZMQError as e:
if e.errno == errno.EINTR:
return select_ready
else:
raise e
for sock, ev in zmq_events:
key = self._key_from_fd(sock)
if key is not None:
events = 0
if ev & zmq.POLLIN:
events |= selectors.EVENT_READ
if ev & zmq.POLLOUT:
events |= selectors.EVENT_WRITE
if ev & zmq.POLLERR:
events = selectors.EVENT_READ | selectors.EVENT_WRITE
select_ready.append((key, events & key.events))
return select_ready
def install_zmq_event_loop():
event_loop = asyncio.SelectorEventLoop(ZmqSelector())
asyncio.set_event_loop(event_loop) |
將大文件的讀寫拆成小塊
在事件循環里做任何耗時的操作都是不對的,尤其是IO, 即便是可以隨時讀寫的本地文件,內存里裝不下的話還是 會啟動硬盤馬達讓你等個半天。最簡單的方法是將大文件 的讀寫拆分成小塊,例如每次只讀一頁的內容:
class AsyncFileWrapper(object):
DEFAULT_BLOCK_SIZE = 4096
def __init__(self, loop=None, filename=None,
fileobj=None, mode='rb'):
if (filename is None and fileobj is None) or \
(filename is not None and fileobj is not None):
raise RuntimeError('Confilicting arguments')
if filename is not None:
if 'b' not in mode:
raise RuntimeError('Only binary mode is supported')
fileobj = open(filename, mode=mode)
elif 'b' not in fileobj.mode:
raise RuntimeError('Only binary mode is supported')
self.fileobj = fileobj
if loop is None:
loop = asyncio.get_event_loop()
self.loop = loop
self.rbuffer = bytearray()
def read_ready(self, future, n, total):
res = self.fileobj.read1(n)
if not res: # EOF
future.set_result(bytes(self.rbuffer))
return
self.rbuffer.extend(res)
if total > 0:
more_to_go = total - len(self.rbuffer)
if more_to_go <= 0: # enough
res, self.rbuffer = self.rbuffer[:n], self.rbuffer[n:]
future.set_result(bytes(res))
else:
self.loop.call_soon(self.read_ready, future, more_to_go, total)
else: # < 0
self.loop.call_soon(self.read_ready, future, self.DEFAULT_BLOCK_SIZE, total)
@asyncio.coroutine
def read(self, n=-1):
future = asyncio.Future(loop=self.loop)
if n == 0:
future.set_result(b'')
return future
elif n < 0:
self.loop.call_soon(self.read_ready, future, self.DEFAULT_BLOCK_SIZE, n)
else:
self.loop.call_soon(self.read_ready, future, n, n)
return future
def write(self, data):
# XXX: big data?
return self.fileobj.write(data)
def close(self):
self.fileobj |
上面這個類通過不斷使用asyncio事件循環的call_soon方法, 重復執行讀取小塊文件內容的代碼(read_ready方法),使得 循環內的其他代碼有更多執行機會,典型的以吞吐量換響應速 度。可以在coroutine中這樣使用此類:
@asyncio.coroutine
def some_func(...):
...
afile = AsyncFileWrapper(filename='some_file.txt')
content = yield from afile.read()
... |
Errata
AsyncFileWrapper
多謝Robber Phex評論指正,AsyncFileWrapper wrap起來的文 件對象其實還是工作在同步狀態下的,需要指定O_NONBLOCK.
另外這個類實際工作時會出現調用close方法之后read_ready方 法又被事件循環回調的情況,所以close方法中還要做額外的清 理工作。
以上修改都在 這個gist里 .
</div>