借助 coroutine 用同步的語法寫異步
借助coroutine用同步的語法寫異步
首先我們構造一個耗時足夠久的服務器:
import tornado.gen
import tornado.ioloop
import tornado.web
class MainHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
yield tornado.gen.sleep(1)
self.write("Hello, world\n")
if name == "main":
application = tornado.web.Application([
(r"/", MainHandler),
])
application.listen(8888)
tornado.ioloop.IOLoop.current().start()</code></pre>
每次請求都耗時一秒鐘:
root@arch tests: nohup python test.py > /dev/null &
[1] 15597
nohup: ignoring input and redirecting stderr to stdout
root@arch tests:
root@arch tests: ls
sama test.py
root@arch tests:
root@arch tests: time curl localhost:8888
Hello, world
real 0m1.018s
user 0m0.000s
sys 0m0.007s
root@arch tests: time curl localhost:8888
Hello, world
real 0m1.016s
user 0m0.003s
sys 0m0.003s</code></pre>
阻塞型請求
我們先來看代碼:
import socket
import time
PORT = 8888
CHUNK_SIZE = 4096
def request():
sock = socket.socket()
sock.connect(("", PORT))
sock.send(b"GET / HTTP/1.1\r\n\r\n")
data = sock.recv(CHUNK_SIZE)
print(data.decode())
start = time.time()
request()
request()
end = time.time()
print("use time: %.2f second(s)" % (end - start))</code></pre>
這樣子的話,請求一次就需要花費一秒,請求是一個接著一個來的,在這中間的時間 進程被投入睡眠。
I/O多路復用
這個時候我們的老前輩們就有新辦法了,好,我們翻開《UNIX環境高級編程》,里面有 專門講select的,看完這個之后,我們來看看 Python 3 提供的 selectors 模塊
我們把上面的代碼改改:
import selectors
import socket
import time
PORT = 8888
CHUNK_SIZE = 4096
COUNT = 0
selector = selectors.DefaultSelector()
def request(selector):
global COUNT
sock = socket.socket()
sock.connect(("", PORT))
selector.register(sock.fileno(), selectors.EVENT_WRITE, data=lambda: writable(selector, sock))
COUNT += 1
def writable(selector, sock):
selector.unregister(sock.fileno())
sock.send(b"GET / HTTP/1.1\r\n\r\n")
selector.register(sock.fileno(), selectors.EVENT_READ, data=lambda: readable(selector, sock))
def readable(selector, sock):
global COUNT
selector.unregister(sock.fileno())
COUNT -= 1
data = sock.recv(CHUNK_SIZE)
print(data.decode())
start = time.time()
request(selector)
request(selector)
while COUNT:
for key, _ in selector.select():
callback = key.data
callback()
end = time.time()
print("use time: %.1f second(s)" % (end - start))</code></pre>
root@arch tests: python client.py
HTTP/1.1 200 OK
Content-Type: text/html; charset=UTF-8
Etag: "7b4758d4baa20873585b9597c7cb9ace2d690ab8"
Server: TornadoServer/4.4.2
Content-Length: 13
Date: Sun, 27 Nov 2016 14:02:38 GMT
Hello, world
HTTP/1.1 200 OK
Content-Type: text/html; charset=UTF-8
Etag: "7b4758d4baa20873585b9597c7cb9ace2d690ab8"
Server: TornadoServer/4.4.2
Content-Length: 13
Date: Sun, 27 Nov 2016 14:02:38 GMT
Hello, world
use time: 1.0 second(s)</code></pre>
再運行一下發現兩次請求也只花一秒鐘時間。這就是I/O多路復用模型的作用~ 但是呢,大把大把的callback把函數拆的四分五散,很不利于閱讀。所以接下來我們 就要介紹主角出場: coroutine
coroutine
In [6]: def use_yield():
...: print("enter the func")
...: value = yield "hello"
...: print("got: ", value)
...: return value
...:
In [7]: gen = use_yield()
In [8]: gen.send(None)
enter the func
Out[8]: 'hello'
In [9]: gen.send("world")
got: world
StopIteration Traceback (most recent call last)
<ipython-input-9-ffdc45971c0a> in <module>()
----> 1 gen.send("world")
StopIteration: world
In [10]: type(gen)
Out[10]: generator</code></pre>
說好的coroutine呢?怎么最后輸出的是generator?別著急,且聽我慢慢說來。
首先我們先下個定義,包含了yield關鍵字的函數就叫generator。來我們先 默念三遍,包含了yield關鍵字的函數就叫generator; 包含了yield關鍵字的函數就叫generator; 包含了yield關鍵字的函數就叫generator。
什么叫generator呢?就是這個函數可以執行到中間某句話的時候,把控制權轉讓給別人。 并且在未來,別人可以讓這個函數從那句話處繼續執行。我們通過next讓generator執行 到下一個yield處,如果之后沒有了yield就會執行到函數結尾,然后拋一個 StopIteration 異常。而且我們還可以通過 .send 給generator發送數據,恢復它的執行。
個人的理解就是,在python的世界里,coroutine是建立在generator的語法基礎上的產物。 并沒有具體的形式,coroutine就是用戶來控制程序切換。具體在python里就是用戶通過 yield把控制權丟出去,通過 .send 或者 next 來切回那個函數里繼續執行。
注:接下來所有說用 next 的地方,實際代碼上我都是用的 .send
coroutine based I/O
我想在等待I/O的時候,把cpu控制權丟出去,讓別人繼續執行,等到I/O準備完成的時候, 再來執行我。這句話有點熟悉,就跟我們站在第一人稱描述I/O多路復用的時候一樣: 我想在等待I/O的時候把我掛起,讓別人執行,我給你一個回調函數,等到I/O準備完成的 時候,你去執行這個回調函數。
那如果我們想通過yield來抹平回調函數把原本一個函數切分成兩個函數的縫隙呢? 函數執行的一個缺點就是執行完之后,函數中的變量狀態就丟失了。
> 注:我們簡單說一下Python的VM,Python是有自己的指令的,就跟x86的cpu有 自己的指令一樣。我們來簡單看一下:
In [17]: def foo():
...: bar()
...:
In [18]: def bar():
...: pass
...:
In [19]: import dis
In [20]: dis.dis(foo)
2 0 LOAD_GLOBAL 0 (bar)
3 CALL_FUNCTION 0 (0 positional, 0 keyword pair)
6 POP_TOP
7 LOAD_CONST 0 (None)
10 RETURN_VALUE
In [21]: dis.dis(bar)
2 0 LOAD_CONST 0 (None)
3 RETURN_VALUE
首先執行foo函數的時候,會由其它函數把環境準備好,把回退指針準備好,然后 調用。
LOAD_GLOBAL
首先從global()里加載bar函數
CALL_FUNCTION
會調用該函數
POP_TOP
會把該函數的棧清掉
LOAD_CONST
把None加載到棧頂,因為這是foo函數的默認返回值
RETURN_VALUE
把None返回</code></pre>
其實我們可以直接把一系列的函數存到 selector.register 的data里,但是我們 把它抽出來,就跟ES6里的 Promise 一樣,我們管它叫 Future 。就是一個 普通的類,用來保存回調函數和執行結果的。
class Future:
def init(self):
self._reuslt = None
self._callbacks = []
def set_result(self, result):
self._result = result
for callback in self._callbacks:
callback()
def add_done_callback(self, callback):
self._callbacks.append(callback)</code></pre>
所以我們把 register 改成:
selector.register(sock.fileno(), selectors.EVENT_READ, data=fut)
然后在下面的 select 處改成:
for key, _ in selector.select():
fut = key.data
fut.set_result(None)
因為在這里,key.data 已經不是回調函數,而是我們的Future了。
但是我們希望的結果是能夠切回我們的函數繼續執行,這時候就靠 next 了,那我們 要想個辦法,讓future執行完之后調用 next(coro) 。首先我們要找個地方保存住 對coro的引用,所以和Future一樣,我們用一個類或者函數來保存都行。為了以后更方便 理解asyncio和tornado,我們用一個類,名字叫 Task :
class Task:
def init(self, coro):
self.coro = coro
def step(self):
try:
fut = self.coro.send(None)
except StopIteration:
return
fut.add_done_callback(self.step)</code></pre>
這樣我們調用的時候就是 task = Task(request()) 然后 task.step() 了, 首先 task = Task(request()) 會執行 Task.__init__ 會把request()這個 generator保存下來,為啥參數里叫做coro呢?因為我們把它用作coroutine,好以后 我們統稱coroutine吧。
接下來通過 task.step() 啟動coroutine,然后增加一個回調函數,一直執行 到 selctor.register ,然后yield。接著執行第二個 Task(request(selector)).step() 同樣yield。接著執行 while COUNT 循環,然后執行 selctor.select 并且阻塞 于此,當socket可讀時,就會執行 fut.set_result(None) 然后就會執行里面的 callback函數,其中有一個callback就是執行上面的 step ,借此執行了 self.coro.send(None) 從而恢復了coroutine的執行。
如果使用函數的形式,可以通過閉包達到這一點。
def task(coro):
try:
fut = coro.send(None)
except StopIteration:
return
fut.add_done_callback(lambda: task(coro))</code></pre>
結合上面所說,代碼應該是這樣的:
import selectors
import socket
import time
PORT = 8888
CHUNK_SIZE = 4096
COUNT = 0
selector = selectors.DefaultSelector()
class Future:
def init(self):
self._result = None
self._callbacks = []
def set_result(self, result):
self._result = result
for callback in self._callbacks:
callback()
def add_done_callback(self, callback):
self._callbacks.append(callback)
class Task():
def init(self, coro):
self.coro = coro
def step(self):
try:
fut = self.coro.send(None)
except StopIteration:
return
fut.add_done_callback(self.step)
def request(selector):
global COUNT
fut = Future()
sock = socket.socket()
sock.connect(("", PORT))
selector.register(sock.fileno(), selectors.EVENT_WRITE, data=fut)
COUNT += 1
yield fut
selector.unregister(sock.fileno())
sock.send(b"GET / HTTP/1.1\r\n\r\n")
fut = Future() # 原來的fut已經用完了,我們要來個新的
selector.register(sock.fileno(), selectors.EVENT_READ, data=fut)
yield fut
selector.unregister(sock.fileno())
COUNT -= 1
data = sock.recv(CHUNK_SIZE)
print(data.decode())
start = time.time()
Task(request(selector)).step()
Task(request(selector)).step()
while COUNT:
for key, _ in selector.select():
fut = key.data
fut.set_result(None)
end = time.time()
print("use time: %.1f second(s)" % (end - start))</code></pre>
另外, sock.connect 是阻塞的,這個時候我們需要把socket設置 成非阻塞的。 socket.setblocking(False) 可以把它設置成非阻塞的。
import selectors
import socket
import time
PORT = 8888
CHUNK_SIZE = 4096
COUNT = 0
selector = selectors.DefaultSelector()
class Future:
def init(self):
self._result = None
self._callbacks = []
def set_result(self, result):
self._result = result
for callback in self._callbacks:
callback()
def add_done_callback(self, callback):
self._callbacks.append(callback)
class Task():
def init(self, coro):
self.coro = coro
def step(self):
try:
fut = self.coro.send(None)
except StopIteration:
return
fut.add_done_callback(self.step)
def request(selector):
global COUNT
COUNT += 1
fut = Future()
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(("", PORT))
except BlockingIOError:
pass
selector.register(sock.fileno(), selectors.EVENT_WRITE, data=fut)
yield fut
selector.unregister(sock.fileno())
sock.send(b"GET / HTTP/1.1\r\n\r\n")
fut = Future() # 原來的fut已經用完了,我們要來個新的
selector.register(sock.fileno(), selectors.EVENT_READ, data=fut)
yield fut
selector.unregister(sock.fileno())
data = sock.recv(CHUNK_SIZE)
print(data.decode())
COUNT -= 1
start = time.time()
Task(request(selector)).step()
Task(request(selector)).step()
while COUNT:
for key, _ in selector.select():
fut = key.data
fut.set_result(None)
end = time.time()
print("use time: %.1f second(s)" % (end - start))</code></pre>
這份代碼對比起一開始的阻塞型代碼,結構上就很類似了,不會因為回調而把一個 函數拆的四分五裂。
來自:https://github.com/jiajunhuang/blog/blob/master/articles/2016_11_27-python_coroutine.md