借助 coroutine 用同步的語法寫異步

nszhou 8年前發布 | 12K 次閱讀 軟件開發 Python開發

借助coroutine用同步的語法寫異步

首先我們構造一個耗時足夠久的服務器:

import tornado.gen
import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler): @tornado.gen.coroutine def get(self): yield tornado.gen.sleep(1) self.write("Hello, world\n")

if name == "main": application = tornado.web.Application([ (r"/", MainHandler), ]) application.listen(8888) tornado.ioloop.IOLoop.current().start()</code></pre>

每次請求都耗時一秒鐘:

root@arch tests: nohup python test.py > /dev/null &
[1] 15597
nohup: ignoring input and redirecting stderr to stdout
root@arch tests:
root@arch tests: ls
sama  test.py
root@arch tests:
root@arch tests: time curl localhost:8888
Hello, world

real 0m1.018s user 0m0.000s sys 0m0.007s root@arch tests: time curl localhost:8888 Hello, world

real 0m1.016s user 0m0.003s sys 0m0.003s</code></pre>

阻塞型請求

我們先來看代碼:

import socket
import time

PORT = 8888 CHUNK_SIZE = 4096

def request(): sock = socket.socket() sock.connect(("", PORT)) sock.send(b"GET / HTTP/1.1\r\n\r\n") data = sock.recv(CHUNK_SIZE) print(data.decode())

start = time.time() request() request() end = time.time() print("use time: %.2f second(s)" % (end - start))</code></pre>

這樣子的話,請求一次就需要花費一秒,請求是一個接著一個來的,在這中間的時間 進程被投入睡眠。

I/O多路復用

這個時候我們的老前輩們就有新辦法了,好,我們翻開《UNIX環境高級編程》,里面有 專門講select的,看完這個之后,我們來看看 Python 3 提供的 selectors 模塊

我們把上面的代碼改改:

import selectors
import socket
import time

PORT = 8888 CHUNK_SIZE = 4096 COUNT = 0

selector = selectors.DefaultSelector()

def request(selector): global COUNT sock = socket.socket() sock.connect(("", PORT)) selector.register(sock.fileno(), selectors.EVENT_WRITE, data=lambda: writable(selector, sock)) COUNT += 1

def writable(selector, sock): selector.unregister(sock.fileno()) sock.send(b"GET / HTTP/1.1\r\n\r\n") selector.register(sock.fileno(), selectors.EVENT_READ, data=lambda: readable(selector, sock))

def readable(selector, sock): global COUNT selector.unregister(sock.fileno()) COUNT -= 1 data = sock.recv(CHUNK_SIZE) print(data.decode())

start = time.time() request(selector) request(selector)

while COUNT: for key, _ in selector.select(): callback = key.data callback()

end = time.time() print("use time: %.1f second(s)" % (end - start))</code></pre>

root@arch tests: python client.py
HTTP/1.1 200 OK
Content-Type: text/html; charset=UTF-8
Etag: "7b4758d4baa20873585b9597c7cb9ace2d690ab8"
Server: TornadoServer/4.4.2
Content-Length: 13
Date: Sun, 27 Nov 2016 14:02:38 GMT

Hello, world

HTTP/1.1 200 OK Content-Type: text/html; charset=UTF-8 Etag: "7b4758d4baa20873585b9597c7cb9ace2d690ab8" Server: TornadoServer/4.4.2 Content-Length: 13 Date: Sun, 27 Nov 2016 14:02:38 GMT

Hello, world

use time: 1.0 second(s)</code></pre>

再運行一下發現兩次請求也只花一秒鐘時間。這就是I/O多路復用模型的作用~ 但是呢,大把大把的callback把函數拆的四分五散,很不利于閱讀。所以接下來我們 就要介紹主角出場: coroutine

coroutine

In [6]: def use_yield():
...:     print("enter the func")
...:     value = yield "hello"
...:     print("got: ", value)
...:     return value
...:

In [7]: gen = use_yield()

In [8]: gen.send(None) enter the func Out[8]: 'hello'

In [9]: gen.send("world")

got: world

StopIteration Traceback (most recent call last) <ipython-input-9-ffdc45971c0a> in <module>() ----> 1 gen.send("world")

StopIteration: world

In [10]: type(gen) Out[10]: generator</code></pre>

說好的coroutine呢?怎么最后輸出的是generator?別著急,且聽我慢慢說來。

首先我們先下個定義,包含了yield關鍵字的函數就叫generator。來我們先 默念三遍,包含了yield關鍵字的函數就叫generator; 包含了yield關鍵字的函數就叫generator; 包含了yield關鍵字的函數就叫generator。

什么叫generator呢?就是這個函數可以執行到中間某句話的時候,把控制權轉讓給別人。 并且在未來,別人可以讓這個函數從那句話處繼續執行。我們通過next讓generator執行 到下一個yield處,如果之后沒有了yield就會執行到函數結尾,然后拋一個 StopIteration 異常。而且我們還可以通過 .send 給generator發送數據,恢復它的執行。

個人的理解就是,在python的世界里,coroutine是建立在generator的語法基礎上的產物。 并沒有具體的形式,coroutine就是用戶來控制程序切換。具體在python里就是用戶通過 yield把控制權丟出去,通過 .send 或者 next 來切回那個函數里繼續執行。

注:接下來所有說用 next 的地方,實際代碼上我都是用的 .send

coroutine based I/O

我想在等待I/O的時候,把cpu控制權丟出去,讓別人繼續執行,等到I/O準備完成的時候, 再來執行我。這句話有點熟悉,就跟我們站在第一人稱描述I/O多路復用的時候一樣: 我想在等待I/O的時候把我掛起,讓別人執行,我給你一個回調函數,等到I/O準備完成的 時候,你去執行這個回調函數。

那如果我們想通過yield來抹平回調函數把原本一個函數切分成兩個函數的縫隙呢? 函數執行的一個缺點就是執行完之后,函數中的變量狀態就丟失了。

> 注:我們簡單說一下Python的VM,Python是有自己的指令的,就跟x86的cpu有 自己的指令一樣。我們來簡單看一下:

In [17]: def foo(): ...: bar() ...:

In [18]: def bar(): ...: pass ...:

In [19]: import dis

In [20]: dis.dis(foo) 2 0 LOAD_GLOBAL 0 (bar) 3 CALL_FUNCTION 0 (0 positional, 0 keyword pair) 6 POP_TOP 7 LOAD_CONST 0 (None) 10 RETURN_VALUE

In [21]: dis.dis(bar) 2 0 LOAD_CONST 0 (None) 3 RETURN_VALUE

首先執行foo函數的時候,會由其它函數把環境準備好,把回退指針準備好,然后 調用。

  • LOAD_GLOBAL 首先從global()里加載bar函數
  • CALL_FUNCTION 會調用該函數
  • POP_TOP 會把該函數的棧清掉
  • LOAD_CONST 把None加載到棧頂,因為這是foo函數的默認返回值
  • RETURN_VALUE 把None返回</code></pre>

    其實我們可以直接把一系列的函數存到 selector.register 的data里,但是我們 把它抽出來,就跟ES6里的 Promise 一樣,我們管它叫 Future 。就是一個 普通的類,用來保存回調函數和執行結果的。

    class Future:
      def init(self):

      self._reuslt = None
      self._callbacks = []
    
    

    def set_result(self, result):

      self._result = result
      for callback in self._callbacks:
          callback()
    
    

    def add_done_callback(self, callback):

      self._callbacks.append(callback)</code></pre> 
    

    所以我們把 register 改成:

    selector.register(sock.fileno(), selectors.EVENT_READ, data=fut)

    然后在下面的 select 處改成:

    for key, _ in selector.select():
      fut = key.data
      fut.set_result(None)

    因為在這里,key.data 已經不是回調函數,而是我們的Future了。

    但是我們希望的結果是能夠切回我們的函數繼續執行,這時候就靠 next 了,那我們 要想個辦法,讓future執行完之后調用 next(coro) 。首先我們要找個地方保存住 對coro的引用,所以和Future一樣,我們用一個類或者函數來保存都行。為了以后更方便 理解asyncio和tornado,我們用一個類,名字叫 Task :

    class Task:
      def init(self, coro):

      self.coro = coro
    
    

    def step(self):

      try:
          fut = self.coro.send(None)
      except StopIteration:
          return
      fut.add_done_callback(self.step)</code></pre> 
    

    這樣我們調用的時候就是 task = Task(request()) 然后 task.step() 了, 首先 task = Task(request()) 會執行 Task.__init__ 會把request()這個 generator保存下來,為啥參數里叫做coro呢?因為我們把它用作coroutine,好以后 我們統稱coroutine吧。

    接下來通過 task.step() 啟動coroutine,然后增加一個回調函數,一直執行 到 selctor.register ,然后yield。接著執行第二個 Task(request(selector)).step() 同樣yield。接著執行 while COUNT 循環,然后執行 selctor.select 并且阻塞 于此,當socket可讀時,就會執行 fut.set_result(None) 然后就會執行里面的 callback函數,其中有一個callback就是執行上面的 step ,借此執行了 self.coro.send(None) 從而恢復了coroutine的執行。

    如果使用函數的形式,可以通過閉包達到這一點。

    def task(coro):
      try:

      fut = coro.send(None)
    

    except StopIteration:

      return
    

    fut.add_done_callback(lambda: task(coro))</code></pre>

    結合上面所說,代碼應該是這樣的:

    import selectors
    import socket
    import time

PORT = 8888 CHUNK_SIZE = 4096 COUNT = 0

selector = selectors.DefaultSelector()

class Future: def init(self): self._result = None self._callbacks = []

def set_result(self, result):
    self._result = result
    for callback in self._callbacks:
        callback()

def add_done_callback(self, callback):
    self._callbacks.append(callback)


class Task(): def init(self, coro): self.coro = coro

def step(self):
    try:
        fut = self.coro.send(None)
    except StopIteration:
        return
    fut.add_done_callback(self.step)


def request(selector): global COUNT fut = Future()

sock = socket.socket()
sock.connect(("", PORT))
selector.register(sock.fileno(), selectors.EVENT_WRITE, data=fut)
COUNT += 1

yield fut

selector.unregister(sock.fileno())
sock.send(b"GET / HTTP/1.1\r\n\r\n")

fut = Future()  # 原來的fut已經用完了,我們要來個新的
selector.register(sock.fileno(), selectors.EVENT_READ, data=fut)

yield fut

selector.unregister(sock.fileno())
COUNT -= 1
data = sock.recv(CHUNK_SIZE)
print(data.decode())


start = time.time() Task(request(selector)).step() Task(request(selector)).step()

while COUNT: for key, _ in selector.select(): fut = key.data fut.set_result(None)

end = time.time() print("use time: %.1f second(s)" % (end - start))</code></pre>

另外, sock.connect 是阻塞的,這個時候我們需要把socket設置 成非阻塞的。 socket.setblocking(False) 可以把它設置成非阻塞的。

import selectors
import socket
import time

PORT = 8888 CHUNK_SIZE = 4096 COUNT = 0

selector = selectors.DefaultSelector()

class Future: def init(self): self._result = None self._callbacks = []

def set_result(self, result):
    self._result = result
    for callback in self._callbacks:
        callback()

def add_done_callback(self, callback):
    self._callbacks.append(callback)


class Task(): def init(self, coro): self.coro = coro

def step(self):
    try:
        fut = self.coro.send(None)
    except StopIteration:
        return
    fut.add_done_callback(self.step)


def request(selector): global COUNT COUNT += 1

fut = Future()

sock = socket.socket()
sock.setblocking(False)

try:
    sock.connect(("", PORT))
except BlockingIOError:
    pass

selector.register(sock.fileno(), selectors.EVENT_WRITE, data=fut)
yield fut
selector.unregister(sock.fileno())

sock.send(b"GET / HTTP/1.1\r\n\r\n")

fut = Future()  # 原來的fut已經用完了,我們要來個新的

selector.register(sock.fileno(), selectors.EVENT_READ, data=fut)
yield fut
selector.unregister(sock.fileno())

data = sock.recv(CHUNK_SIZE)
print(data.decode())
COUNT -= 1


start = time.time() Task(request(selector)).step() Task(request(selector)).step()

while COUNT: for key, _ in selector.select(): fut = key.data fut.set_result(None)

end = time.time() print("use time: %.1f second(s)" % (end - start))</code></pre>

這份代碼對比起一開始的阻塞型代碼,結構上就很類似了,不會因為回調而把一個 函數拆的四分五裂。

 

來自:https://github.com/jiajunhuang/blog/blob/master/articles/2016_11_27-python_coroutine.md

 

 本文由用戶 nszhou 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!