初探 Python 3 的異步 IO 編程
上周終于把知乎日報的新版本做完了,于是趁著這幾天的休息,有精力折騰一些感興趣的玩意了。
雖然工作時并不會接觸到 Python 3,但還是對它抱有不少好奇心,于是把 Python 版本更新到了 3.4,開始了折騰之旅。
在各種更新中,我最感興趣的當屬 asyncio 模塊了,所以就從異步 IO 開始探索吧。
探索之前,先簡單介紹下各種 IO 模型:
最容易做的是阻塞 IO,即讀寫數據時,需要等待操作完成,才能繼續執行。進階的做法就是用多線程來處理需要 IO 的部分,缺點是開銷會有些大。
接著是非阻塞 IO,即讀寫數據時,如果暫時不可讀寫,則立刻返回,而不等待。因為不知道什么時候是可讀寫的,所以輪詢時可能會浪費 CPU 時間。
然后是 IO 復用,即在讀寫數據前,先檢查哪些描述符是可讀寫的,再去讀寫。select 和 poll 就是這樣做的,它們會遍歷所有被監視的描述符,查看是否滿足,這個檢查的過程是阻塞的。而 epoll、kqueue 和 /dev/poll 則做了些改進,事先注冊需要檢查哪些描述符的哪些事件,當狀態發生變化時,內核會調用對應的回調函數,將這些描述符保存下來;下次獲取可用的描述符時,直接返回這些發生變化的描述符即可。
再之后是信號驅動,即描述符就緒時,內核發送 SIGIO 信號,再由信號處理程序去處理這些信號即可。不過信號處理的時機是從內核態返回用戶態時,感覺也得把這些事件收集起來才好處理,有點像模擬 IO 復用了。
最后是異步 IO,即讀寫數據時,只注冊事件,內核完成讀寫后(讀取的數據會復制到用戶態),再調用事件處理函數。這整個過程都不會阻塞調用線程,不過實現它的操作系統比較少,Windows 上有比較成熟的 IOCP,Linux 上的 AIO 則有不少缺點。
雖然真正的異步 IO 需要中間任何步驟都沒有阻塞,這對于某些只是偶爾需要處理 IO 請求的情況確實有用(比如文本編輯器偶爾保存一下文件);但對于服務器端編程的大多數情況而言,它的主線程就是用來處理 IO 請求的,如果在空閑時不阻塞在 IO 等待上,也沒有別的事情能做,所以本文就不糾結這個異步是否名副其實了。
在 Python 2 的時代,高性能的網絡編程主要是使用 Twisted、Tornado 和 gevent 這三個庫。
我對 Twisted 不熟,只知道它的缺點是比較重,性能相對而言并不算好。
Tornado 平時用得比較多,缺點是寫異步調用時特別麻煩。
gevent 我只能算接觸過,缺點是不太干凈。
由于它們都各自有一個 IO loop,不好混用,而 Tornado 的 web 框架相對而言比較完善,因此成了我的首選。
而從 Python 3.4 開始,標準庫里又新增了 asyncio 這個模塊。
從原理上來說,它和 Tornado 其實差不多,都是注冊 IO 事件,然后在 IO loop 中等待事件發生,然后調用相應的處理函數。
不同之處在于 Python 3 增加了一些新的特性,而 Tornado 需要兼容 Python 2,所以寫起來會比較麻煩。
舉例來說,Python 3.3 可以在 generator 中 return 返回值(相當于 raise StopIteration),而 Tornado 中需要 raise 一個 Return 對象。此外,Python 3.3 還增加了 yield from 語法,減輕了在 generator 中處理另一個 generator 的工作量(省去了循環和 try ... except ...)。
不過,雖然 asyncio 有那么多得天獨厚的優勢,卻不一定比 Tornado 的性能更好,所以我寫個簡單的例子測試一下。
比較方法就是寫個最簡單的 HTTP 服務器,不做任何檢查,讀取到任何內容都輸出一個 hello world,并斷開連接。
測試的客戶端就懶得寫了,直接用 ab 即可:
ab -n 10000 -c 10 "http://0.0.0.0:8000/"
Tornado 版是這樣:
from tornado.gen import coroutine from tornado.ioloop import IOLoop from tornado.tcpserver import TCPServerclass Server(TCPServer): @coroutine def handle_stream(self, stream, address): try: yield stream.read_bytes(1024, partial=True) yield stream.write(b'HTTP 1.0 200 OK\r\n\r\nhello world') finally: stream.close()
server = Server() server.bind(8000) server.start(1) IOLoop.current().start()</pre>在我的電腦上大概 4000 QPS。
asyncio 版是這樣:import asyncioclass Server(asyncio.Protocol): def connection_made(self, transport): self.transport = transport
def data_received(self, data): try: self.transport.write(b'HTTP/1.1 200 OK\r\n\r\nhello world') finally: self.transport.close()
loop = asyncio.get_event_loop() server = loop.create_server(Server, '', 8000) loop.run_until_complete(server) loop.run_forever()</pre>在我的電腦上大概 3000 QPS,比 Tornado 版慢了一些。此外,asyncio 的 transport 在 write 時不用 yield from,這點可能有些不一致。
asyncio 還有個高級版的 API:import asyncio@asyncio.coroutine def handle(reader, writer): yield from reader.read(1024) writer.write(b'HTTP/1.1 200 OK\r\n\r\nhello world') yield from writer.drain() writer.close()
loop = asyncio.get_event_loop() task = asyncio.start_server(handle, '', 8000, loop=loop) server = loop.run_until_complete(task) loop.run_forever()</pre>在我的電腦上大概 2200 QPS。這下讀寫都要 yield from 了,一致性上來說會好些。
以框架的性能而言,其實都夠用,開銷都不超過 1 毫秒,而 web 請求一般都需要 10 毫秒的以上的處理時間。
于是順便再測一下和 MySQL 的搭配,即在每個請求內調用一下 SELECT 1,然后輸出返回值。
因為自己懶得寫客戶端了,于是就用現成的 tornado_mysql 和 aiomysql 來測試了。原理應該都差不多,發送寫請求后就返回,等收到可讀事件時再獲取內容。
Tornado 版是這樣:from tornado.gen import coroutine from tornado.ioloop import IOLoop from tornado.tcpserver import TCPServer from tornado_mysql import poolsclass Server(TCPServer): @coroutine def handle_stream(self, stream, address): try: yield stream.read_bytes(1024, partial=True) cursor = yield POOL.execute(b'SELECT 1') data = cursor.fetchone() yield stream.write('HTTP/1.1 200 OK\r\n\r\n{0[0]}'.format(data).encode()) # Python 3.5 的 bytes 才能用 % 格式化 finally: stream.close()
POOL = pools.Pool( dict(host='127.0.0.1', port=3306, user='root', passwd='123', db='mysql'), max_idle_connections=10, max_open_connections=10)
server = Server() server.bind(8000) server.start(1) IOLoop.current().start()</pre>在我的電腦上大概 680 QPS。
asyncio 版是這樣:import asyncioimport aiomysql
class Server(asyncio.Protocol): def connection_made(self, transport): self.transport = transport
class Server(asyncio.Protocol): def connection_made(self, transport): self.transport = transport
def data_received(self, data): @asyncio.coroutine def handle(): with (yield from pool) as conn: cursor = yield from conn.cursor() yield from cursor.execute(b'SELECT 1') result = yield from cursor.fetchone() try: self.transport.write('HTTP/1.1 200 OK\r\n\r\n{0[0]}'.format(result).encode()) finally: self.transport.close() loop.create_task(handle()) # 或者 asyncio.async(handle())
@asyncio.coroutine def get_pool(): return(yield from aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='123', loop=loop))
loop = asyncio.get_event_loop() pool = loop.run_until_complete(get_pool())
server = loop.create_server(Server, '', 8000) loop.run_until_complete(server) loop.run_forever()</pre>在我的電腦上大概 1250 QPS,比 Tornado 版快了不少。不過寫起來比較蛋疼,因為 data_received 方法里不能直接用 yield from。
用 cProfile 看了下,Tornado 版在 tornado.gen 和 functools 模塊里花了不少時間,可能是異步調用過多了吧。
但如果不做異步庫的開發者,而只就使用者的體驗而言,Tornado 會顯得更加靈活和易用。不過 asyncio 的高級 API 應該也能提供類似的體驗。
順便再用底層 socket 模塊寫個服務器試試。
先用 poll 看看,錯誤處理什么的就先不做了:from functools import partial import select import socketclass Server: def init(self): self._sock = socket.socket() self._poll = select.poll() self._handlers = {} self._fd_events = {}
def start(self): sock = self._sock sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(0) sock.bind(('', 8000)) sock.listen(100) handlers = self._handlers poll = self._poll self.add_handler(sock.fileno(), self._accept, select.POLLIN) while True: poll_events = poll.poll(1) for fd, event in poll_events: handler = handlers.get(fd) if handler: handler() def _accept(self): for i in range(100): try: conn, address = self._sock.accept() except OSError: break else: conn.setblocking(0) fd = conn.fileno() self.add_handler(fd, partial(self._read, conn), select.POLLIN) def _read(self, conn): fd = conn.fileno() self.remove_handler(fd) try: conn.recv(1024) except: conn.close() raise else: self.add_handler(fd, partial(self._write, conn), select.POLLOUT) def _write(self, conn): fd = conn.fileno() self.remove_handler(fd) try: conn.send(b'HTTP 1.0 200 OK\r\n\r\nhello world') finally: conn.close() def add_handler(self, fd, handler, event): self._handlers[fd] = handler self.register(fd, event) def remove_handler(self, fd): self._handlers.pop(fd, None) self.unregister(fd) def register(self, fd, event): if fd in self._fd_events: raise IOError("fd %s already registered" % fd) self._poll.register(fd, event) self._fd_events[fd] = event def unregister(self, fd): event = self._fd_events.pop(fd, None) if event is not None: self._poll.unregister(fd)
Server().start()</pre>在我的電腦上大概 7700 QPS,優勢巨大。
再用 kqueue 試試(我用的是 OS X):from functools import partial import select import socketclass Server: def init(self): self._sock = socket.socket() self._kqueue = select.kqueue() self._handlers = {} self._fd_events = {}
def start(self): sock = self._sock sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(0) sock.bind(('', 8000)) sock.listen(100) self.add_handler(sock.fileno(), self._accept, select.KQ_FILTER_READ) handlers = self._handlers while True: kevents = self._kqueue.control(None, 1000, 1) for kevent in kevents: fd = kevent.ident handler = handlers.get(fd) if handler: handler() def _accept(self): for i in range(100): try: conn, address = self._sock.accept() except OSError: break else: conn.setblocking(0) fd = conn.fileno() self.add_handler(fd, partial(self._read, conn), select.KQ_FILTER_READ) def _read(self, conn): fd = conn.fileno() self.remove_handler(fd) try: conn.recv(1024) except: conn.close() raise else: self.add_handler(fd, partial(self._write, conn), select.KQ_FILTER_WRITE) def _write(self, conn): fd = conn.fileno() self.remove_handler(fd) try: conn.send(b'HTTP 1.0 200 OK\r\n\r\nhello world') finally: conn.close() def add_handler(self, fd, handler, event): self._handlers[fd] = handler self.register(fd, event) def remove_handler(self, fd): self._handlers.pop(fd, None) self.unregister(fd) def register(self, fd, event): if fd in self._fd_events: raise IOError("fd %s already registered" % fd) self._control(fd, event, select.KQ_EV_ADD) self._fd_events[fd] = event def unregister(self, fd): event = self._fd_events.pop(fd, None) if event is not None: self._control(fd, event, select.KQ_EV_DELETE) def _control(self, fd, event, flags): change_list = (select.kevent(fd, event, flags),) self._kqueue.control(change_list, 0)
Server().start()</pre>在我的電腦上大概 7200 QPS,比 poll 版稍慢。不過因為只有 10 個并發連接,而且沒有慢速網絡的影響,所以 poll 的性能好并不奇怪。
再試試 Python 3.4 新增的 selectors 模塊,它的 DefaultSelector 會自動選擇所在平臺最高效的實現,asyncio 就用到了這個模塊。import selectors import socketclass Server: def init(self): self._sock = socket.socket() self._selector = selectors.DefaultSelector()
def start(self): sock = self._sock sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(0) sock.bind(('', 8000)) sock.listen(100) selector = self._selector self.add_handler(sock.fileno(), self._accept, selectors.EVENT_READ) while True: events = selector.select(1) for key, event in events: handler, data = key.data if data: handler(**data) else: handler() def _accept(self): for i in range(100): try: conn, address = self._sock.accept() except OSError: break else: conn.setblocking(0) fd = conn.fileno() self.add_handler(fd, self._read, selectors.EVENT_READ, {'conn': conn}) def _read(self, conn): fd = conn.fileno() self.remove_handler(fd) try: conn.recv(1024) except: conn.close() raise else: self.add_handler(fd, self._write, selectors.EVENT_WRITE, {'conn': conn}) def _write(self, conn): fd = conn.fileno() self.remove_handler(fd) try: conn.send(b'HTTP 1.0 200 OK\r\n\r\nhello world') finally: conn.close() def add_handler(self, fd, handler, event, data=None): self._selector.register(fd, event, (handler, data)) def remove_handler(self, fd): self._selector.unregister(fd)
Server().start()</pre>在我的電腦上大概 6100 QPS,成績也還不錯。
從這些測試來看,如果想自己實現一個舍棄了一些功能和兼容性的 Tornado,應該能比它稍快一點,不過似乎沒多大必要。
所以暫時不糾結性能了,還是從使用的便利性上來考慮。Tornado 可以用 yield 取代 callback,我們也來實現這個 feature。
實現前先得了解下 yield。
當一個函數內部出現了 yield 語句時,它就不再是一個單純的函數了,而是一個生成器函數,調用它并不會執行它的代碼,而是返回一個生成器。
調用這個生成器的 send 方法時,才會執行內部的代碼。當執行到 yield 時,這個 send 方法就返回了,調用者可以得到其返回值。
send 方法在第一次調用時,參數必須為 None。Python 2 中可以用它的 next 方法,Python 3 中改成了 next 方法,還可以用內置的 next 函數來調用。
send 方法可以被多次調用,參數會作為 yield 的返回值,回到生成器內上一次執行的地方,并繼續執行下去。
當生成器的代碼執行完時,會拋出一個 StopIteration 的異常。Python 3.3 開始可以在生成器里使用 return,返回值可以從 StopIteration 異常的 value 屬性獲取。
for ... in ... 循環會自動捕獲 StopIteration 異常,并作為循環停止的條件。
由此可見,yield 可以用于跳轉。而我們要做的,則是在遇到 IO 請求時,用 yield 返回 IO loop;當事件發生時,找到對應的生成器,用 send 方法繼續執行即可。
為了簡單起見,我就在 poll 版的基礎上進行改造了:from collections import deque import select import socket from types import GeneratorTypeclass Stream: def init(self, sock, loop): sock.setblocking(0) self._sock = sock self._loop = loop
def close(self): self._sock.close() def read(self, size=1024): sock = self._sock fd = sock.fileno() try: data = sock.recv(size) except OSError as e: if e.errno == socket.EAGAIN or socket.EWOULDBLOCK: self._loop.add_handler(fd, self.read(size), select.POLLIN) yield else: raise else: return data finally: self._loop.remove_handler(fd) def write(self, data): sock = self._sock fd = sock.fileno() try: try: sent_bytes = sock.send(data) except OSError as e: if e.errno not in (socket.EAGAIN, socket.EWOULDBLOCK): raise else: if sent_bytes == len(data): return data = data[sent_bytes:] self._loop.add_handler(fd, self.write(data), select.POLLOUT) yield while data: try: sent_bytes = sock.send(data) except OSError as e: if e.errno not in (socket.EAGAIN, socket.EWOULDBLOCK): raise else: if sent_bytes == len(data): return data = data[sent_bytes:] yield finally: self._loop.remove_handler(fd)
class IOLoop: def init(self): self._poll = select.poll() self._handlers = {} self._fd_events = {}
def start(self): handlers = self._handlers poll = self._poll while True: poll_events = poll.poll(1) for fd, event in poll_events: handler = handlers.get(fd) if handler: if callable(handler): handler() else: stack = handler while True: generator, value = stack[-1] try: value = generator.send(value) if isinstance(value, GeneratorType): stack.append([value, None]) else: break except StopIteration as e: stack.pop() if stack: stack[-1][-1] = e.value else: break def add_handler(self, fd, handler, event): if isinstance(handler, GeneratorType): self._handlers[fd] = deque([[handler, None]]) else: self._handlers[fd] = handler self.register(fd, event) def remove_handler(self, fd): self._handlers.pop(fd, None) self.unregister(fd) def update_handler(self, fd, event): self.modify(fd, event) def register(self, fd, event): if fd in self._fd_events: raise IOError("fd %s already registered" % fd) self._poll.register(fd, event) self._fd_events[fd] = event def unregister(self, fd): event = self._fd_events.pop(fd, None) if event is not None: self._poll.unregister(fd) def modify(self, fd, event): self._poll.modify(fd, event) self._fd_events[fd] = event
class Server: def init(self): self._sock = socket.socket() self._loop = IOLoop() self._stream = Stream(self._sock, self._loop)
def start(self): sock = self._sock sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setblocking(0) sock.bind(('', 8000)) sock.listen(100) self._loop.add_handler(sock.fileno(), self._accept, select.POLLIN) self._loop.start() def _accept(self): for i in range(100): try: conn, address = self._sock.accept() except OSError: break else: stream = Stream(conn, self._loop) fd = conn.fileno() self._loop.add_handler(fd, self._handle(stream), select.POLLIN) def _handle(self, stream): yield stream.read() yield stream.write(b'HTTP 1.0 200 OK\r\n\r\nhello world')
Server().start()</pre>在我的電腦上大概 5300 QPS。
雖然成績比較尷尬,但畢竟用起來比前一個版本好多了。至于慢的原因,我估計是自己維護了一個堆棧的原因(也可能是有什么 bug,畢竟寫這個感覺太跳躍了,能運行起來就謝天謝地了)。
實現時做了兩點假設:
- handler 為 generator 時,視為異步方法。
- 在異步方法中 yield None 時,視為等待 IO;yield / yield from 異步方法時,則是等待方法返回。
</ol> 實現細節也沒什么好說的了,只是覺得在實現 Stream 的 read / write 方法時,調用 IOLoop.add_handler 方法不太優雅。其實可以直接 yield 一個 fd 和 event,在 IOLoop.start 方法中再去注冊。不過這個重構其實蠻小的,我就不再貼一次代碼了,感興趣的可以自己試試。
于是這次初探就到此為止了,有空我也許會繼續完善它。至少這次探索,讓我覺得 Python 3 還是蠻有意思的。