從asyncio簡單實現看異步是如何工作的

Jefferey245 8年前發布 | 24K 次閱讀 Socket Python Python開發

來自: http://ipfans.github.io/2016/02/simple-implement-asyncio-to-understand-how-async-works/

從asyncio簡單實現看異步是如何工作的

by ipfans

注:請使用 Python 3.5+ 版本運行以下代碼。

先從例子看起

首先我們來看一個socket通訊的例子,這個例子我們可以在官方 socket 模塊的文檔中找到部分原型代碼:

# echo.py
from socket import *  # 是的,這是一個不好的寫法

def echo_server(address): sock = socket(AF_INET, SOCK_STREAM) sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) sock.bind(address) sock.listen(5) while True: client, addr = sock.accept() print("connect from ", addr) echo_handler(client)

def echo_handler(client): while True: data = client.recv(10000) if not data: break client.send(str.encode("Got: ") + data) print("connection closed.")

if name == 'main': echo_server(('', 25000))</code></pre>

但是同步模式會有一個問題,當進行通訊是阻塞的,當一個連接占用時就會阻礙其他連接的繼續,這個時候應該怎么更快的運行呢?

回顧歷史

在asyncio出現之前,我們都是怎么提高效率的呢?首先想到的方法就是多線程處理:

# echo_thread.py
from socket import *
import _thread

def echo_server(address): sock = socket(AF_INET, SOCK_STREAM) sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) sock.bind(address) sock.listen(5) while True: client, addr = sock.accept() print("connect from ", addr) _thread.start_new_thread(echo_handler, (client, ))

def echo_handler(client): while True: data = client.recv(10000) if not data: break client.send(str.encode("Got: ") + data) print("connection closed.")

if name == 'main': echo_server(('', 25000))</code></pre>

當然了,我們都知道多線程之下總是會有一些問題的。那么還有更好的方案嗎?如果你了解過 C10k問題 ,你一定聽過 epoll 、 kqueue 之類的大名。那么,能在Python中使用這些功能嗎?答案是肯定的。那就是 select

# echo_select.py
from socket import *
import select

def echo_server(address): sock = socket(AF_INET, SOCK_STREAM) sock.setsockopt(SOL_SOCKET, SOREUSEADDR, 1) sock.bind(address) sock.listen(5) input = [sock, ] while True: r, , _ = select.select(input, [], []) for s in r: if s == sock: client, addr = sock.accept() print("connect from ", addr) echo_handler(client)

def echo_handler(client): while True: data = client.recv(10000) if not data: break client.send(str.encode("Got: ") + data) print("connection closed.")

if name == 'main': echo_server(('', 25000))</code></pre>

相比 _thread 來說, select 更加底層,提供了最基礎的等待IO完成功能。但是缺點是這個功能太單一了,這也就是為什么后面語言提供了 asyncio 。最早應該是 python-dev 中 提出了 要在標準庫中添加基于 select 的異步IO功能。之后Python在3.4版本之中就加入了 selectors asyncio 庫用于異步IO。

其他的方法還有 gevent 、 Twisted 、 Tornado 等等的方案,這里就不多贅述了。(在3.4的時候我一直覺得 yield form 太丑陋了,相對我寧愿繼續用 Tornado 的 yield 方式。當然這個更加主觀的原因吧,不過現在 async/await 方式明顯讓我又讓我愛上了。)

從同步到asyncio

那么如何在asyncio框架下如何實現異步socket通訊的例子呢?事實上官方文檔中提供了兩個比較高層封裝過的asyncio庫例子 TCP echo server protocolTCP echo server using streams 。這兩個例子采用的是 asyncio 的 socket 通訊高級別封裝,似乎與我們同步代碼相差有點遠。這里我們實際例子中使用了更加底層的 Low-level socket operations 。這個更接近于我們在同步狀態下使用 socket 的代碼。

# aecho.py
from socket import *
import asyncio

loop = asyncio.get_event_loop()

async def echo_server(address): sock = socket(AF_INET, SOCK_STREAM) sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) sock.bind(address) sock.listen(5) sock.setblocking(False) # 設置非阻塞 while True: client, addr = await loop.sock_accept(sock) print("connect from ", addr) loop.create_task(echo_handler(client))

async def echo_handler(client): with client: while True: data = await loop.sock_recv(client, 10000) if not data: break await loop.sock_sendall(client, str.encode("Got: ") + data) print("connection closed")

loop.create_task(echo_server(('', 25000))) loop.run_forever()</code></pre>

其中遇到的 create_task 會相對同步狀態下無法對應,這個方法用于安排一個異步任務的執行,將一個異步方法封裝為 future 對象。其他的 Event Loop 中的功能基本與傳統的程序相同。

從asyncio到自己的實現

那么在 asyncio.event_loop 中到底發生了什么呢?我們可以嘗試用自己的程序實現一下。

如果你閱讀過 PEP-0492 ,你就知道,實際上Python的協程是通過生成器實現的。

# async_yield.py
from types import coroutine

@coroutine def read_wait(sock): yield "read_wait", sock # 為什么有個read_wait?等下介紹</code></pre>

下面來模擬實際調用:

python -i async_yield.py
>>> f = read_wait("somesocket")
>>> f
<generator object read_wait at 0x10200d5c8>
>>> f.send(None)
('read_wait', 'somesocket')
>>> f.send(None)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

如果不了解 send() 與 StopIteration 作用的話,請參考 PEP-0492 中相關的描述。

接下來繼續完善 write 方法,并且實現我們自己的 Loop 。

# async_yield.py
from types import coroutine
from collections import deque
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

@coroutine def read_wait(sock): yield "read_wait", sock

@coroutine def write_wait(sock): yield "write_wait", sock

class Loop(object): def init(self): self.ready = deque() self.selector = DefaultSelector()

async def sock_recv(self, sock, maxbytes):
    await read_wait(sock)
    return sock.recv(maxbytes)

async def sock_accept(self, sock):
    await read_wait(sock)
    return sock.accept()

async def sock_sendall(self, sock, data):
    while data:
        await write_wait(sock)
        n = sock.send(data)
        data = data[n:]

def create_task(self, coro):
    self.ready.append(coro)

def run_forever(self):
    while True:
        while not self.ready:
            events = self.selector.select()
            for key, _ in events:
                self.ready.append(key.data)
                self.selector.unregister(key.fileobj)
        while self.ready:
            self.current_task = self.ready.popleft()
            try:
                op, *args = self.current_task.send(None)
                getattr(self, op)(*args)
            except StopIteration:
                pass

def read_wait(self, sock):
    self.selector.register(sock, EVENT_READ, self.current_task)

def write_wait(self, sock):
    self.selector.register(sock, EVENT_WRITE, self.current_task)</code></pre><code data-lang="python"> 

對于之前一節中的 aecho.py 文件,我們只需要修改一下導入模塊與loop的獲取方法即可:

# pecho.py
from socket import *
import async_yield

loop = async_yield.Loop()

async def echo_server(address): sock = socket(AF_INET, SOCK_STREAM) sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) sock.bind(address) sock.listen(5) sock.setblocking(False) # 設置非阻塞模式 while True: client, addr = await loop.sock_accept(sock) print("connect from ", addr) loop.create_task(echo_handler(client))

async def echo_handler(client): with client: while True: data = await loop.sock_recv(client, 10000) if not data: break await loop.sock_sendall(client, str.encode("Got: ") + data) print("connection closed")

loop.create_task(echo_server(('', 25000))) loop.run_forever()</code></pre>

async_yield發生了什么?

首先,我們定義了兩個協程函數 read_wait 和 write_wait ,分別用于相應處理讀取操作與寫入操作。其中返回了一個tuple類型數據,用于在 op, *args = self.current_task.send(None) 中填充方法名和參數,之后在 getattr(self, op)(*args) 中進行分別調用。

下面 Loop 類實現了在pecho中用到的所有異步函數。初始化時的 self.ready 用于存儲協程的調用序列。該序列通過 create_task 添加協程到隊列中。

在 run_forever 中,如果目前隊列為空,則通過 self.selector.select() 提取一個事件放入隊列處理,若隊列存在通過 self.current_task.send(None) 通知事件發送,從而調用對應的事件功能。你也可以在 op, *args = self.current_task.send(None) 后添加 print(op) 獲取實時的調用情況。

結語

事實上這篇文章的思路是基于 @dabeaz 在 Python Brasil 上的 keynote 整理而來。dabeaz還有另外一個非常不錯的基于select的異步庫,名字叫做 curio ,是一個了解實現異步庫的很好教程。

最后講個段子,之前有人開玩笑,蟒爹開發一個功能,之后大家都不會正確使用,直到dabeaz站出來告訴大家如何正確使用新功能。在寫這篇文章的時候雖然很想找出來出處,但是似乎找不到了...

</code></code></code></code></code></code></code></code></div>

 本文由用戶 Jefferey245 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!