從asyncio簡單實現看異步是如何工作的
來自: http://ipfans.github.io/2016/02/simple-implement-asyncio-to-understand-how-async-works/
從asyncio簡單實現看異步是如何工作的
by ipfans
注:請使用 Python 3.5+ 版本運行以下代碼。
先從例子看起
首先我們來看一個socket通訊的例子,這個例子我們可以在官方 socket 模塊的文檔中找到部分原型代碼:
# echo.py
from socket import * # 是的,這是一個不好的寫法
def echo_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
while True:
client, addr = sock.accept()
print("connect from ", addr)
echo_handler(client)
def echo_handler(client):
while True:
data = client.recv(10000)
if not data:
break
client.send(str.encode("Got: ") + data)
print("connection closed.")
if name == 'main':
echo_server(('', 25000))</code></pre>
但是同步模式會有一個問題,當進行通訊是阻塞的,當一個連接占用時就會阻礙其他連接的繼續,這個時候應該怎么更快的運行呢?
回顧歷史
在asyncio出現之前,我們都是怎么提高效率的呢?首先想到的方法就是多線程處理:
# echo_thread.py
from socket import *
import _thread
def echo_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
while True:
client, addr = sock.accept()
print("connect from ", addr)
_thread.start_new_thread(echo_handler, (client, ))
def echo_handler(client):
while True:
data = client.recv(10000)
if not data:
break
client.send(str.encode("Got: ") + data)
print("connection closed.")
if name == 'main':
echo_server(('', 25000))</code></pre>
當然了,我們都知道多線程之下總是會有一些問題的。那么還有更好的方案嗎?如果你了解過 C10k問題 ,你一定聽過 epoll 、 kqueue 之類的大名。那么,能在Python中使用這些功能嗎?答案是肯定的。那就是 select 。
# echo_select.py
from socket import *
import select
def echo_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SOREUSEADDR, 1)
sock.bind(address)
sock.listen(5)
input = [sock, ]
while True:
r, , _ = select.select(input, [], [])
for s in r:
if s == sock:
client, addr = sock.accept()
print("connect from ", addr)
echo_handler(client)
def echo_handler(client):
while True:
data = client.recv(10000)
if not data:
break
client.send(str.encode("Got: ") + data)
print("connection closed.")
if name == 'main':
echo_server(('', 25000))</code></pre>
相比 _thread 來說, select 更加底層,提供了最基礎的等待IO完成功能。但是缺點是這個功能太單一了,這也就是為什么后面語言提供了 asyncio 。最早應該是 python-dev 中 提出了 要在標準庫中添加基于 select 的異步IO功能。之后Python在3.4版本之中就加入了 selectors 與 asyncio 庫用于異步IO。
其他的方法還有 gevent 、 Twisted 、 Tornado 等等的方案,這里就不多贅述了。(在3.4的時候我一直覺得 yield form 太丑陋了,相對我寧愿繼續用 Tornado 的 yield 方式。當然這個更加主觀的原因吧,不過現在 async/await 方式明顯讓我又讓我愛上了。)
從同步到asyncio
那么如何在asyncio框架下如何實現異步socket通訊的例子呢?事實上官方文檔中提供了兩個比較高層封裝過的asyncio庫例子 TCP echo server protocol 和 TCP echo server using streams 。這兩個例子采用的是 asyncio 的 socket 通訊高級別封裝,似乎與我們同步代碼相差有點遠。這里我們實際例子中使用了更加底層的 Low-level socket operations 。這個更接近于我們在同步狀態下使用 socket 的代碼。
# aecho.py
from socket import *
import asyncio
loop = asyncio.get_event_loop()
async def echo_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
sock.setblocking(False) # 設置非阻塞
while True:
client, addr = await loop.sock_accept(sock)
print("connect from ", addr)
loop.create_task(echo_handler(client))
async def echo_handler(client):
with client:
while True:
data = await loop.sock_recv(client, 10000)
if not data:
break
await loop.sock_sendall(client, str.encode("Got: ") + data)
print("connection closed")
loop.create_task(echo_server(('', 25000)))
loop.run_forever()</code></pre>
其中遇到的 create_task 會相對同步狀態下無法對應,這個方法用于安排一個異步任務的執行,將一個異步方法封裝為 future 對象。其他的 Event Loop 中的功能基本與傳統的程序相同。
從asyncio到自己的實現
那么在 asyncio.event_loop 中到底發生了什么呢?我們可以嘗試用自己的程序實現一下。
如果你閱讀過 PEP-0492 ,你就知道,實際上Python的協程是通過生成器實現的。
# async_yield.py
from types import coroutine
@coroutine
def read_wait(sock):
yield "read_wait", sock # 為什么有個read_wait?等下介紹</code></pre>
下面來模擬實際調用:
python -i async_yield.py
>>> f = read_wait("somesocket")
>>> f
<generator object read_wait at 0x10200d5c8>
>>> f.send(None)
('read_wait', 'somesocket')
>>> f.send(None)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
如果不了解 send() 與 StopIteration 作用的話,請參考 PEP-0492 中相關的描述。
接下來繼續完善 write 方法,并且實現我們自己的 Loop 。
# async_yield.py
from types import coroutine
from collections import deque
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
@coroutine
def read_wait(sock):
yield "read_wait", sock
@coroutine
def write_wait(sock):
yield "write_wait", sock
class Loop(object):
def init(self):
self.ready = deque()
self.selector = DefaultSelector()
async def sock_recv(self, sock, maxbytes):
await read_wait(sock)
return sock.recv(maxbytes)
async def sock_accept(self, sock):
await read_wait(sock)
return sock.accept()
async def sock_sendall(self, sock, data):
while data:
await write_wait(sock)
n = sock.send(data)
data = data[n:]
def create_task(self, coro):
self.ready.append(coro)
def run_forever(self):
while True:
while not self.ready:
events = self.selector.select()
for key, _ in events:
self.ready.append(key.data)
self.selector.unregister(key.fileobj)
while self.ready:
self.current_task = self.ready.popleft()
try:
op, *args = self.current_task.send(None)
getattr(self, op)(*args)
except StopIteration:
pass
def read_wait(self, sock):
self.selector.register(sock, EVENT_READ, self.current_task)
def write_wait(self, sock):
self.selector.register(sock, EVENT_WRITE, self.current_task)</code></pre><code data-lang="python">
對于之前一節中的 aecho.py 文件,我們只需要修改一下導入模塊與loop的獲取方法即可:
# pecho.py
from socket import *
import async_yield
loop = async_yield.Loop()
async def echo_server(address):
sock = socket(AF_INET, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.bind(address)
sock.listen(5)
sock.setblocking(False) # 設置非阻塞模式
while True:
client, addr = await loop.sock_accept(sock)
print("connect from ", addr)
loop.create_task(echo_handler(client))
async def echo_handler(client):
with client:
while True:
data = await loop.sock_recv(client, 10000)
if not data:
break
await loop.sock_sendall(client, str.encode("Got: ") + data)
print("connection closed")
loop.create_task(echo_server(('', 25000)))
loop.run_forever()</code></pre>
async_yield發生了什么?
首先,我們定義了兩個協程函數 read_wait 和 write_wait ,分別用于相應處理讀取操作與寫入操作。其中返回了一個tuple類型數據,用于在 op, *args = self.current_task.send(None) 中填充方法名和參數,之后在 getattr(self, op)(*args) 中進行分別調用。
下面 Loop 類實現了在pecho中用到的所有異步函數。初始化時的 self.ready 用于存儲協程的調用序列。該序列通過 create_task 添加協程到隊列中。
在 run_forever 中,如果目前隊列為空,則通過 self.selector.select() 提取一個事件放入隊列處理,若隊列存在通過 self.current_task.send(None) 通知事件發送,從而調用對應的事件功能。你也可以在 op, *args = self.current_task.send(None) 后添加 print(op) 獲取實時的調用情況。
結語
事實上這篇文章的思路是基于 @dabeaz 在 Python Brasil 上的 keynote 整理而來。dabeaz還有另外一個非常不錯的基于select的異步庫,名字叫做 curio ,是一個了解實現異步庫的很好教程。
最后講個段子,之前有人開玩笑,蟒爹開發一個功能,之后大家都不會正確使用,直到dabeaz站出來告訴大家如何正確使用新功能。在寫這篇文章的時候雖然很想找出來出處,但是似乎找不到了...
</code></code></code></code></code></code></code></code></div>