淺析tornado協程運行原理

zhang520 8年前發布 | 19K 次閱讀 Tornado Python開發

來自: http://xidui.github.io/2016/01/26/淺析tornado協程運行原理/

前言

去年有一段時間一直在研究各種python協程框架,包括gevent, asyncio, tornado。閱讀tornado的源碼還是兩個多月前的事了,一直想寫一篇文章出來整理整理,但不知道從何處開始下筆。如果貼上一段段源碼,然后通過語言來描述各種流程,這種類型的文章網上也有不少,況且這樣子的講解對于讀者來說可能會比較乏味。

我希望我對于源碼分析的博文能夠通過貼上更容易理解的圖(當然也會有一些代碼來輔助講解),這樣的分享對讀者來說會更加容易讀懂,也更有價值。對自己要求高了,反而更難下筆,在試圖畫圖的過程中,發現其實有好多細節自己也沒有琢磨透,導致在如何組織這幅流程圖的問題上斟酌了好久,不過好在最后終于捯飭出了一張自己覺得還算及格的流程圖,作完圖的時候我感覺比起之前剛閱讀完代碼時候的理解又上了一個層次。

流程圖

tornado執行協程的方式有很多,但協程內部的運行原理沒什么區別,這篇文章以 IOLoop 中的 run_sync 函數作為入口進行介紹。在開始進行分析之前,先把流程圖貼上,其中的細節后面會通過代碼輔助的方式一一講解。

在理解tornado運行原理的過程中,我是通過寫一個demo,然后在源碼中到處打斷點,然后調試的方式,一遍遍走,到最后慢慢地理解。順便也把我的demo代碼貼上吧(看過我之前的一篇譯文的讀者可能會發現,這個demo是從那兒仿照過來的)。

import random
import time
from tornado import gen
from tornado.ioloop import IOLoop


@gen.coroutine
def get_url(url):
    wait_time = random.randint(1, 4)
    yield gen.sleep(wait_time)
    print('URL {} took {}s to get!'.format(url, wait_time))
    raise gen.Return((url, wait_time))


@gen.coroutine
def outer_coroutine():
    before = time.time()
    coroutines = [get_url(url) for url in ['URL1', 'URL2', 'URL3']]
    result = yield coroutines
    after = time.time()
    print(result)
    print('total time: {} seconds'.format(after - before))

if __name__ == '__main__':
    IOLoop.current().run_sync(outer_coroutine)

</div>

有興趣的讀者可以自己去執行一下玩玩,輸出類似于這樣:

URL URL1 took 1s to get!
URL URL2 took 2s to get!
URL URL3 took 2s to get!
[('URL1', 1), ('URL2', 2), ('URL3', 2)]
total time: 2.00353884697 seconds

</div>

Coroutine

起初我以為調用協程后,返回的是一個生成器對象,畢竟 gen.coroutine 裝飾在一個函數或者生成器上。看了源碼發現,其實每次調用一個協程,它在獲取了生成器對象之后,同時又對它執行了 next 操作來獲取生成器內部yield出來的值,這個可以是一個值,當然也可以是一個由內部協程嵌套調用返回的future對象。

# gen.py
def _make_coroutine_wrapper(func, replace_callback):
 @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = TracebackFuture()
        # 省略n行
        try:
            result = func(*args, **kwargs)
        # 省略n個except
        else:
            if isinstance(result, types.GeneratorType):
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)    
                    # 如果func內部有yield關鍵字,result是一個生成器
                    # 如果func內部又調用了其它協程,yielded將會是由嵌套協程返回的future對象
                    # 省略n行
                # 省略n個except
                else:
                    Runner(result, future, yielded)
                try:
                    return future
                finally:
                    future = None
        future.set_result(result)
        return future
    return wrapper

</div>

Future

我覺得 Future 在tornado中是一個很奇妙的對象,它是一個穿梭于協程和調度器之間的信使。提供了回調函數注冊(當異步事件完成后,調用注冊的回調)、中間結果保存、嵌套協程喚醒父協程(通過Runner實現)等功能。Coroutine和Future是一一對應的,可以從上節gen.coroutine裝飾器的實現中看到。每調用一個協程,表達式所返回的就是一個Future對象,它所表達的意義為: 這個協程的內部各種異步邏輯執行完畢后,會把結果保存在這個Future中,同時調用這個Future中指定的回調函數 ,而future中的回調函數是什么時候被注冊的呢?那就是當前——你通過調用協程,返回了這個future對象的時候:

我們看看demo代碼中run_sync的實現:

# ioloop.py IOLoop
def run_sync(self, func, timeout=None):
    future_cell = [None]

    def run():
        try:
            result = func()
        except Exception:
            future_cell[0] = TracebackFuture()
            future_cell[0].set_exc_info(sys.exc_info())
        else:
            if is_future(result):
                future_cell[0] = result
            else:
                future_cell[0] = TracebackFuture()
                future_cell[0].set_result(result)
        self.add_future(future_cell[0], lambda future: self.stop())
    self.add_callback(run)
    if timeout is not None:
        timeout_handle = self.add_timeout(self.time() + timeout, self.stop)
    self.start()
    if timeout is not None:
        self.remove_timeout(timeout_handle)
    if not future_cell[0].done():
        raise TimeoutError('Operation timed out after %s seconds' % timeout)
    return future_cell[0].result()

</div>

代碼中先給 IOLoop 注冊一個回調函數,等下個事件循環再執行內部定義的run函數。在run中通過 result = func() 執行協程 outer_coroutine ,result則是該協程對應的future對象。如果這個時候不對future作任何操作,最后這個future完成后也不會執行任何回調。所以在源碼中通過 add_future 給這個future添加回調函數,也就是 self.stop() ,表明這個協程執行完畢后觸發的操作是退出事件循環。

其實IOLoop::add_future這個函數的命名會有些奇怪,剛讀代碼還不知道它是干嘛的(給IOLoop添加future是什么鬼?如果說是add_callback那還容易理解),看了add_future的實現就明白了:

# ioloop.py IOLoop
def add_future(self, future, callback):
    """Schedules a callback on the ``IOLoop`` when the given
 `.Future` is finished.

 The callback is invoked with one argument, the
 `.Future`.
 """
    assert is_future(future)
    callback = stack_context.wrap(callback)
    future.add_done_callback(
        lambda future: self.add_callback(callback, future))

</div>

它并不會給IOLoop添加future(也沒有什么意義),它只是給這個future添加回調函數而已,而這個回調函數是當這個future完成以后給IOLoop添加一個回調函數(有點繞,哈哈~給IOLoop添加的回調函數在這里就是stop)。 因此當一個future完成以后,到最后future的回調函數真正被執行將會隔著一個IOLoop的事件循環,而不是馬上會被執行 。

Runner

如果說tornado是一輛車,那么Runner對象就是它的發動機,由它來調度各種協程來完成異步事件的操作。Coroutine和Runner也是一一對應的,每個Coroutine都是由一個Runner實例去執行的。協程包裝著生成器(當然也有可能是函數,本文考慮比較復雜的協程嵌套調用的情況),在生成器內部,也有可能會調用其它的協程,從而把內部協程的future對象yield出來,這個runner就會通過調用返回的方式( future = next(gen) )接到內部出來的future,并把它納入執行的loop中,先是 handle_yielded ,再是 run (中間會隔著一個或者多個IOLoop的事件循環,因此圖中是用虛線表示的)。

調度器中有兩個比較重要的函數: handle_yielded 和 run ,先來看 handle_yielded :

# gen.py Runner
def handle_yield(self, yielded):
    # Lists containing YieldPoints require stack contexts;
    # other lists are handled via multi_future in convert_yielded.
    if (isinstance(yielded, list) and
            any(isinstance(f, YieldPoint) for f in yielded)):
        yielded = Multi(yielded)
    elif (isinstance(yielded, dict) and
          any(isinstance(f, YieldPoint) for f in yielded.values())):
        yielded = Multi(yielded)

    if isinstance(yielded, YieldPoint):
        # 省略n行
    else:
        try:
            self.future = convert_yielded(yielded)
        except BadYieldError:
            self.future = TracebackFuture()
            self.future.set_exc_info(sys.exc_info())

    if not self.future.done() or self.future is moment:
        self.io_loop.add_future(
            self.future, lambda f: self.run())
        return False
    return True

</div>

在runner中, handle_yielded 用于處理generator返回的內部協程future對象。因為協程處理的大部分是異步的事件,所以內部協程yield出來的future對象狀態多半還是處于未完成。這個時候收到該future的Runner所能做的也僅僅只是注冊一個回調函數而已(上面源碼的最后幾行)。

再來看看 run :

# gen.py Runner
def run(self):
    """Starts or resumes the generator, running until it reaches a
 yield point that is not ready.
 """
    if self.running or self.finished:
        return
    try:
        self.running = True
        while True:
            future = self.future
            if not future.done():
                return
            self.future = None
            try:
                orig_stack_contexts = stack_context._state.contexts
                exc_info = None

                try:
                    value = future.result()
                except Exception:
                    self.had_exception = True
                    exc_info = sys.exc_info()

                if exc_info is not None:
                    yielded = self.gen.throw(*exc_info)
                    exc_info = None
                else:
                    yielded = self.gen.send(value)

                if stack_context._state.contexts is not orig_stack_contexts:
                    self.gen.throw(
                        stack_context.StackContextInconsistentError(
                            'stack_context inconsistency (probably caused '
                            'by yield within a "with StackContext" block)'))
            except (StopIteration, Return) as e:
                self.finished = True
                self.future = _null_future
                if self.pending_callbacks and not self.had_exception:
                    # If we ran cleanly without waiting on all callbacks
                    # raise an error (really more of a warning). If we
                    # had an exception then some callbacks may have been
                    # orphaned, so skip the check in that case.
                    raise LeakedCallbackError(
                        "finished without waiting for callbacks %r" %
                        self.pending_callbacks)
                self.result_future.set_result(getattr(e, 'value', None))
                self.result_future = None
                self._deactivate_stack_context()
                return
            except Exception:
                self.finished = True
                self.future = _null_future
                self.result_future.set_exc_info(sys.exc_info())
                self.result_future = None
                self._deactivate_stack_context()
                return
            if not self.handle_yield(yielded):
                return
    finally:
        self.running = False

</div>

run函數中的注釋很好得詮釋了它的作用,它就是不斷地給傳入Runner的generator執行next或者send操作(next或send都會讓生成器繼續運行,區別就是send會傳一個參數進去),直到generator返回的future對象狀態還未完成,需要等待異步響應,這個時候它會調用handle_yielded。

異步響應來了以后,就會調用這個run,為什么呢?因為在 handle_yielded 中給這個future注冊了回調函數,回調函數就是 run 函數。然后在run函數中執行send(value),讓這個生成器繼續運行,如此往復循環,直到generator退出。

generator退出就代表著這個Runner引擎所跑的Coroutine完成了,然后再給這個Coroutine所對應的Future對象執行set_result操作,表示這個協程的Future已完成了,可以執行它的回調函數了。

這個回調函數對于outer_coroutine的future來說就是執行IOLoop的stop操作。對于inner_coroutine的future來說就是outer_coroutine對應的Runner的run操作。這句話很繞,但是要是真讀懂了,相信對于它的運行原理也就了解的差不多了。

IOLoop

IOLoop是一個很常見的模塊,就是多路復用IO機制,好多項目中都有這一塊的封裝,原理都差不多。也可以參考 shadowsocks 中的loop模塊,它也是用python實現的基于多種不同操作系統io多路復用的封裝。tornado的ioloop也是類似的,記錄了一個個文件描述符和handler的pair,每當有io事件發生,就會調用該文件描述符對應的handler。如果這個handler是對future執行set_result操作,那連鎖地就會執行Runner中的run,從而進入Runner的運行循環中,直到需要等待下一個異步事件,然后再向ioloop注冊事件。。。如此循環往復。

總結

我講的可能詞不達意,畢竟我自己也是看了好多遍源碼,才一步步理解清晰的。讀者也不妨運行我的例子,逐步調試看看,說不定會有意想不到的收獲。如果我哪些地方講的欠妥當,也歡迎大家來指正。

</div>

 本文由用戶 zhang520 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!