深入理解python3.4中Asyncio庫與Node.js的異步IO機制

ECGBella 7年前發布 | 13K 次閱讀 Python Node.js IO Python開發

譯者前言

  • 如何用yield以及多路復用機制實現一個基于協程的異步事件框架?
  • 現有的組件中yield from是如何工作的,值又是如何被傳入yield from表達式的?
  • 在這個yield from之上,是如何在一個線程內實現一個調度機制去調度協程的?
  • 協程中調用協程的調用棧是如何管理的?
  • gevent和tornado是基于greenlet協程庫實現的異步事件框架,greenlet和asyncio在協程實現的原理又有什么區別?

去年稍微深入地了解了下nodejs,啃完了 樸靈《深入淺出Node.js》 ,自己也稍微看了看nodejs的源碼,對于它的異步事件機制還是有一個大致的輪廓的。雖然說讓自己寫一個類似的機制去實現異步事件比較麻煩,但也并不是完全沒有思路。

而對于python中并發我還僅僅停留在使用框架的水平,對于里面是怎么實現的一點想法也沒有,出于這部分實現原理的好奇,嘗試讀了一個晚上asyncio庫的源碼,感覺還是一頭霧水。像這樣一個較為成熟的庫內容太多了,有好多冗余的模塊擋在核心細節之前,確實會對學習有比較大的阻礙。

我也搜了很多國內的關于asyncio以及python中coroutine的文章,但都感覺還沒到那個意思,不解渴~在網上找到了這篇文章并閱讀之后,我頓時有種醍醐灌頂的感覺,因此決定把這篇長文翻譯出來,獻給國內同樣想了解這部分的朋友們。這篇文章能很好的解答我最前的4個問題,對于第5個問題,還有待去研究greenlet的實現原理。

前言

我花了一個夏天的時間在Node.js的web框架上,那是我第一次全職用Node.js工作。在使用了幾周后,有一件事變得很清晰,那就是我們組的工程師包括我都對Node.js中異步事件機制缺乏了解,也不清楚它底層是怎么實現的。我深信,對一個框架使用非常熟練高效,一定是基于對它的實現原理了解非常深刻之上的。所以我決定去深挖它。這份好奇和執著最后不僅停留在Node.js上,同時也延伸到了對其它語言中異步事件機制的實現,尤其是python。我也是拿python來開刀,去學習和實踐的。于是我接觸到了python 3.4的異步IO庫 asyncio ,它同時也和我對協程(coroutine)的興趣不謀而合, 可以參考我的那篇關于生成器和協程的博客 (譯者注:因為asyncio的異步IO是用協程實現的)。這篇博客是為了回答我在研究那篇博客時產生的問題,同時也希望能解答朋友們的一些疑惑。

這篇博客中所有的代碼都是基于Python 3.4的。這是因為Python 3.4同時引入了 selectorsasyncio 模塊。對于Python以前的版本, Twisted , geventtornado 都提供了類似的功能。

對于本文中剛開始的一些示例代碼,出于簡單易懂的原因,我并沒有引入錯誤處理和異常的機制。在實際編碼中,適當的異常處理是一個非常重要的編碼習慣。在本文的最后,我將用幾個例子來展示Python 3.4中的 asyncio 庫是如何處理異常的。

開始:重溫Hello World

我們來寫一個程序解決一個簡單的問題。本文后面篇幅的多個程序,都是在這題的基礎之上稍作改動,來闡述協程的思想。

寫一個程序每隔3秒打印“Hello World”,同時等待用戶命令行的輸入。用戶每輸入一個自然數n,就計算并打印斐波那契函數的值F(n),之后繼續等待下一個輸入

有這樣一個情況:在用戶輸入到一半的時候有可能就打印了“Hello World!”,不過這個case并不重要,不考慮它。

對于熟悉Node.js和JavaScript的同學可能很快能寫出類似下面的程序:

log_execution_time = require('./utils').log_execution_time;
varfib = functionfib(n) {
  if (n < 2) return n;
  return fib(n - 1) + fib(n - 2);
};
vartimed_fib = log_execution_time(fib);
varsayHello = functionsayHello() {
  console.log(Math.floor((new Date()).getTime() / 1000) + " - Hello world!");
};
varhandleInput = functionhandleInput(data) {
  n = parseInt(data.toString());
  console.log('fib(' + n + ') = ' + timed_fib(n));
};
process.stdin.on('data', handleInput);
setInterval(sayHello, 3000);

跟你所看到的一樣,這題使用Node.js很容易就可以做出來。我們所要做的只是設置一個周期性定時器去輸出“Hello World!”,并且在 process.stdindata 事件上注冊一個回調函數。非常容易,它就是這么工作了,但是原理如何呢?讓我們先來看看Python中是如何做這樣的事情的,再來回答這個問題。

在這里也使用了一個 log_execution_time 裝飾器來統計斐波那契函數的計算時間。

程序中采用的 斐波那契算法 是故意使用最慢的一種的(指數復雜度)。這是因為這篇文章的主題不是關于斐波那契的(可以參考我的 這篇文章 ,這是一個關于斐波那契對數復雜度的算法),同時因為比較慢,我可以更容易地展示一些概念。下面是Python的做法,它將使用數倍的時間。

from log_execution_timeimport log_execution_time
def fib(n):
    return fib(n - 1) + fib(n - 2) if n > 1 else n
timed_fib = log_execution_time(fib)

回到最初的問題,我們如何開始去寫這樣一個程序呢?Python內部并沒有類似于 setInterval 或者 setTimeOut 這樣的函數。

所以第一個可能的做法就是采用系統層的并發——多線程的方式:

from threading import Thread
from time import sleep
from time import time
from fibimport timed_fib
def print_hello():
    while True:
        print("{} - Hello world!".format(int(time())))
        sleep(3)
def read_and_process_input():
    while True:
        n = int(input())
        print('fib({}) = {}'.format(n, timed_fib(n)))
def main():
    # Second thread will print the hello message. Starting as a daemon means
    # the thread will not prevent the process from exiting.
    t = Thread(target=print_hello)
    t.daemon = True
    t.start()
    # Main thread will read and process input
    read_and_process_input()
if __name__ == '__main__':
    main()

同樣也不麻煩。但是它和Node.js版本的做法是否在效率上也是差不多的呢?來做個試驗。這個斐波那契計算地比較慢,我們嘗試一個較為大的數字就可以看到比較客觀的效果:Python中用37,Node.js中用45(JavaScript在數字計算上本身就比Python快一些)。

python3.4 hello_threads.py
1412360472 - Helloworld!
37
1412360475 - Helloworld!
1412360478 - Helloworld!
1412360481 - Helloworld!
Executingfibtook 8.96 seconds.
fib(37) = 24157817
1412360484 - Helloworld!

它花了將近9秒來計算,在計算的同時“Hello World!”的輸出并沒有被掛起。下面嘗試下Node.js:

nodehello.js
1412360534 - Helloworld!
1412360537 - Helloworld!
45
Calculationtook 12.793 seconds
fib(45) = 1134903170
1412360551 - Helloworld!
1412360554 - Helloworld!
1412360557 - Helloworld!

不過Node.js在計算斐波那契的時候,“Hello World!”的輸出卻被掛起了。我們來研究下這是為什么。

事件循環和線程

對于線程和事件循環我們需要有一個簡單的認識,來理解上面兩種解答的區別。先從線程說起,可以把線程理解成指令的序列以及CPU執行的上下文(CPU上下文就是寄存器的值,也就是下一條指令的寄存器)。

一個同步的程序總是在一個線程中運行的,這也是為什么在等待,比如說等待IO或者定時器的時候,整個程序會被阻塞。最簡單的掛起操作是 sleep ,它會把當前運行的線程掛起一段給定的時間。一個進程可以有多個線程,同一個進程中的線程共享了進程的一些資源,比如說內存、地址空間、文件描述符等。

線程是由操作系統的調度器來調度的,調度器統一負責管理調度進程中的線程(當然也包括不同進程中的線程,不過對于這部分我將不作過多描述,因為它超過了本文的范疇。),它來決定什么時候該把當前的線程掛起,并把CPU的控制權交給另一個線程來處理。這稱為上下文切換,包括對于當前線程上下文的保存、對目標線程上下文的加載。上下文切換會對性能造成一定的影響,因為它本身也需要CPU周期來執行。

操作系統切換線程有很多種原因:

1.另一個優先級更高的線程需要馬上被執行(比如處理硬件中斷的代碼)

2.線程自己想要被掛起一段時間(比如 sleep )

3.線程已經用完了自己時間片,這個時候線程就不得不再次進入隊列,供調度器調度

回到我們之前的代碼,Python的解答是多線程的。這也解釋了兩個任務可以并行的原因,也就是在計算斐波那契這樣的CPU密集型任務的時候,沒有把其它的線程阻塞住。

再來看Node.js的解答,從計算斐波那契把定時線程阻塞住可以看出它是單線程的,這也是Node.js實現的方式。從操作系統的角度,你的Node.js程序是在單線程上運行的(事實上,根據操作系統的不同, libuv 庫在處理一些IO事件的時候可能會使用線程池的方式,但這并不影響你的JavaScript代碼是跑在單線程上的事實)。

基于一些原因,你可能會考慮避免多線程的方式:

1.線程在計算和資源消耗的角度是較為昂貴的

2.線程并發所帶來的問題,比如因為共享的內存空間而帶來的死鎖和競態條件。這些又會導致更加復雜的代碼,在編寫代碼的時候需要時不時地注意一些線程安全的問題

當然以上這些都是相對的,線程也是有線程的好處的。但討論那些又與本文的主題偏離了,所以就此打住。

來嘗試一下不使用多線程的方式處理最初的問題。為了做到這個,我們需要模仿一下Node.js是怎么做的:事件循環。我們需要一種方式去poll(譯者注:沒想到對這個詞的比較合適的翻譯,輪訓?不合適。) stdin 看看它是否已經準備好輸入了。基于不同的操作系統,有很多不同的系統調用,比如 poll , select , kqueue 等。在Python 3.4中, select 模塊在以上這些系統調用之上提供了一層封裝,所以你可以在不同的操作系統上很放心地使用而不用擔心跨平臺的問題。

有了這樣一個polling的機制,事件循環的實現就很簡單了:每個循環去看看 stdin 是否準備好,如果已經準備好了就嘗試去讀取。之后去判斷上次輸出“Hello world!”是否3秒種已過,如果是那就再輸出一遍。

下面是代碼:

import selectors
import sys
from time import time
from fibimport timed_fib
def process_input(stream):
    text = stream.readline()
    n = int(text.strip())
    print('fib({}) = {}'.format(n, timed_fib(n)))
def print_hello():
    print("{} - Hello world!".format(int(time())))
def main():
    selector = selectors.DefaultSelector()
    # Register the selector to poll for "read" readiness on stdin
    selector.register(sys.stdin, selectors.EVENT_READ)
    last_hello = 0  # Setting to 0 means the timer will start right away
    while True:
        # Wait at most 100 milliseconds for input to be available
        for event, maskin selector.select(0.1):
            process_input(event.fileobj)
        if time() - last_hello > 3:
            last_hello = time()
            print_hello()
if __name__ == '__main__':
    main()

然后輸出:

$ python3.4 hello_eventloop.py
1412376429 - Helloworld!
1412376432 - Helloworld!
1412376435 - Helloworld!
37
Executingfibtook 9.7 seconds.
fib(37) = 24157817
1412376447 - Helloworld!
1412376450 - Helloworld!

跟預計的一樣,因為使用了單線程,該程序和Node.js的程序一樣,計算斐波那契的時候阻塞了“Hello World!”輸出。

Nice!但是這個解答還是有點hard code的感覺。下一部分,我們將使用兩種方式對這個event loop的代碼作一些優化,讓它功能更加強大、更容易編碼,分別是 回調協程

事件循環——回調

對于上面的事件循環的寫法一個比較好的抽象是加入事件的handler。這個用回調的方式很容易實現。對于每一種事件的類型(這個例子中只有兩種,分別是stdin的事件和定時器事件),允許用戶添加任意數量的事件處理函數。代碼不難,就直接貼出來了。這里有一點比較巧妙的地方是使用了 bisect.insort 來幫助處理時間的事件。算法描述如下:維護一個按時間排序的事件列表,最近需要運行的定時器在最前面。這樣的話每次只需要從頭檢查是否有超時的事件并執行它們。 bisect.insort 使得維護這個列表更加容易,它會幫你在合適的位置插入新的定時器事件回調函數。誠然,有多種其它的方式實現這樣的列表,只是我采用了這種而已。

from bisect import insort
from fibimport timed_fib
from time import time
import selectors
import sys
class EventLoop(object):
    """
    Implements a callback based single-threaded event loop as a simple
    demonstration.
    """
    def __init__(self, *tasks):
        self._running = False
        self._stdin_handlers = []
        self._timers = []
        self._selector = selectors.DefaultSelector()
        self._selector.register(sys.stdin, selectors.EVENT_READ)
    def run_forever(self):
        self._running = True
        while self._running:
            # First check for available IO input
            for key, maskin self._selector.select(0):
                line = key.fileobj.readline().strip()
                for callbackin self._stdin_handlers:
                    callback(line)
            # Handle timer events
            while self._timersand self._timers[0][0] < time():
                handler = self._timers[0][1]
                del self._timers[0]
                handler()
    def add_stdin_handler(self, callback):
        self._stdin_handlers.append(callback)
    def add_timer(self, wait_time, callback):
        insort(self._timers, (time() + wait_time, callback))
    def stop(self):
        self._running = False
def main():
    loop = EventLoop()
    def on_stdin_input(line):
        if line == 'exit':
            loop.stop()
            return
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))
    def print_hello():
        print("{} - Hello world!".format(int(time())))
        loop.add_timer(3, print_hello)
    def f(x):
        def g():
            print(x)
        return g
    loop.add_stdin_handler(on_stdin_input)
    loop.add_timer(0, print_hello)
    loop.run_forever()
if __name__ == '__main__':
    main()

代碼很簡單,實際上Node.js底層也是采用這種方式實現的。然而在更復雜的應用中,以這種方式來編寫異步代碼,尤其是又加入了異常處理機制,很快代碼就會變成所謂的回調地獄( callback hell )。引用 Guido van Rossum 關于回調方式的一段話:

要以回調的方式編寫可讀的代碼,你需要異于常人的編碼習慣。如果你不相信,去看看JavaScript的代碼就知道了——Guido van Rossum

寫異步回調代碼還有其它的方式,比如 promisecoroutine(協程) 。我最喜歡的方式(協程非常酷,我的博客中 這篇文章 就是關于它的)就是采用協程的方式。下一部分我們將展示使用協程封裝任務來實現事件循環的。

事件循環——協程

協程 也是一個函數,它在返回的同時,還可以保存返回前的運行上下文(本地變量,以及下一條指令),需要的時候可以重新加載上下文從上次離開的下一條命令繼續執行。這種方式的 return 一般叫做 yielding 。在 這篇文章 中我介紹了更多關于協程以及在Python中的如何使用的內容。在我們的例子中使用之前,我將對協程做一個更簡單的介紹:

Python中 yield 是一個關鍵詞,它可以用來創建協程。

1.當調用 yield value 的時候,這個 value 就被返回出去了,CPU控制權就交給了協程的調用方。調用 yield 之后,如果想要重新返回協程,需要調用Python中內置的 next 方法。

2.當調用 y = yield x 的時候,x被返回給調用方。要繼續返回協程上下文,調用方需要再執行協程的 send 方法。在這個列子中,給send方法的參數會被傳入協程作為這個表達式的值(本例中,這個值會被y接收到)。

這意味著我們可以用協程來寫異步代碼,當程序等待異步操作的時候,只需要使用yield把控制權交出去就行了,當異步操作完成了再進入協程繼續執行。這種方式的代碼看起來像同步的方式編寫的,非常流暢。下面是一個采用yield計算斐波那契的簡單例子:

def read_input():
    while True:
        line = yield sys.stdin
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))

僅僅這樣還不夠,我們需要一個能處理協程的事件循環。在下面的代碼中,我們維護了一個列表,列表里面保存了,事件循環要運行的 task 。當輸入事件或者定時器事件發生(或者是其它事件),有一些協程需要繼續執行(有可能也要往協程中傳入一些值)。每一個 task 里面都有一個 stack 變量保存了協程的調用棧,棧里面的每一個協程都依賴著后一個協程的完成。這個基于 PEP 342 中 “Trampoline”的例子實現的。代碼中我也使用了 functools.partial ,對應于JavaScript中的 Function.prototype.bind ,即把參數綁定( curry )在函數上,調用的時候不需要再傳參了。

下面是代碼:

from bisect import insort
from collections import deque
from fibimport timed_fib
from functools import partial
from time import time
import selectors
import sys
import types
class sleep_for_seconds(object):
    """
    Yield an object of this type from a coroutine to have it "sleep" for the
    given number of seconds.
    """
    def __init__(self, wait_time):
        self._wait_time = wait_time
class EventLoop(object):
    """
    Implements a simplified coroutine-based event loop as a demonstration.
    Very similar to the "Trampoline" example in PEP 342, with exception
    handling taken out for simplicity, and selectors added to handle file IO
    """
    def __init__(self, *tasks):
        self._running = False
        self._selector = selectors.DefaultSelector()
        # Queue of functions scheduled to run
        self._tasks = deque(tasks)
        # (coroutine, stack) pair of tasks waiting for input from stdin
        self._tasks_waiting_on_stdin = []
        # List of (time_to_run, task) pairs, in sorted order
        self._timers = []
        # Register for polling stdin for input to read
        self._selector.register(sys.stdin, selectors.EVENT_READ)
    def resume_task(self, coroutine, value=None, stack=()):
        result = coroutine.send(value)
        if isinstance(result, types.GeneratorType):
            self.schedule(result, None, (coroutine, stack))
        elif isinstance(result, sleep_for_seconds):
            self.schedule(coroutine, None, stack, time() + result._wait_time)
        elif resultis sys.stdin:
            self._tasks_waiting_on_stdin.append((coroutine, stack))
        elif stack:
            self.schedule(stack[0], result, stack[1])
    def schedule(self, coroutine, value=None, stack=(), when=None):
        """
        Schedule a coroutine task to be run, with value to be sent to it, and
        stack containing the coroutines that are waiting for the value yielded
        by this coroutine.
        """
        # Bind the parameters to a function to be scheduled as a function with
        # no parameters.
        task = partial(self.resume_task, coroutine, value, stack)
        if when:
            insort(self._timers, (when, task))
        else:
            self._tasks.append(task)
    def stop(self):
        self._running = False
    def do_on_next_tick(self, func, *args, **kwargs):
        self._tasks.appendleft(partial(func, *args, **kwargs))
    def run_forever(self):
        self._running = True
        while self._running:
            # First check for available IO input
            for key, maskin self._selector.select(0):
                line = key.fileobj.readline().strip()
                for task, stackin self._tasks_waiting_on_stdin:
                    self.schedule(task, line, stack)
                self._tasks_waiting_on_stdin.clear()
            # Next, run the next task
            if self._tasks:
                task = self._tasks.popleft()
                task()
            # Finally run time scheduled tasks
            while self._timersand self._timers[0][0] < time():
                task = self._timers[0][1]
                del self._timers[0]
                task()
        self._running = False
def print_every(message, interval):
    """
    Coroutine task to repeatedly print the message at the given interval
    (in seconds)
    """
    while True:
        print("{} - {}".format(int(time()), message))
        yield sleep_for_seconds(interval)
def read_input(loop):
    """
    Coroutine task to repeatedly read new lines of input from stdin, treat
    the input as a number n, and calculate and display fib(n).
    """
    while True:
        line = yield sys.stdin
        if line == 'exit':
            loop.do_on_next_tick(loop.stop)
            continue
        n = int(line)
        print("fib({}) = {}".format(n, timed_fib(n)))
def main():
    loop = EventLoop()
    hello_task = print_every('Hello world!', 3)
    fib_task = read_input(loop)
    loop.schedule(hello_task)
    loop.schedule(fib_task)
    loop.run_forever()
if __name__ == '__main__':
    main()

代碼中我們也實現了一個 do_on_next_tick 的函數,可以在下次事件循環的時候注冊想要執行的函數,這個跟Node.js中的process.nextTick多少有點像。我使用它來實現了一個簡單的 exit 特性(即便我可以直接調用 loop.stop() )。

我們也可以使用協程來重構斐波那契算法代替原有的遞歸方式。這么做的好處在于,協程間可以并發運行,包括輸出“Hello World!”的協程。

斐波那契算法重構如下:

from event_loop_coroutineimport EventLoop
from event_loop_coroutineimport print_every
import sys
def fib(n):
    if n <= 1:
        yield n
    else:
        a = yield fib(n - 1)
        b = yield fib(n - 2)
        yield a + b
def read_input(loop):
    while True:
        line = yield sys.stdin
        n = int(line)
        fib_n = yield fib(n)
        print("fib({}) = {}".format(n, fib_n))
def main():
    loop = EventLoop()
    hello_task = print_every('Hello world!', 3)
    fib_task = read_input(loop)
    loop.schedule(hello_task)
    loop.schedule(fib_task)
    loop.run_forever()
if __name__ == '__main__':
    main()

程序的輸出:

$ python3.4 fib_coroutine.py
1412727829 - Helloworld!
1412727832 - Helloworld!
28
1412727835 - Helloworld!
1412727838 - Helloworld!
fib(28) = 317811
1412727841 - Helloworld!
1412727844 - Helloworld!

不重復造車輪

前面兩個部分,我們分別使用了回調函數和協程實現了事件循環來寫異步的邏輯,對于實踐學習來說確實是一種不錯的方式,但是Python中已經有了非常成熟的庫提供事件循環。Python3.4中的 asyncio 模塊提供了事件循環和協程來處理IO操作、網絡操作等。在看更多有趣的例子前,針對上面的代碼我們用 asyncio 模塊來重構一下:

import asyncio
import sys
from time import time
from fibimport timed_fib
def process_input():
    text = sys.stdin.readline()
    n = int(text.strip())
    print('fib({}) = {}'.format(n, timed_fib(n)))
@asyncio.coroutine
def print_hello():
    while True:
        print("{} - Hello world!".format(int(time())))
        yield from asyncio.sleep(3)
def main():
    loop = asyncio.get_event_loop()
    loop.add_reader(sys.stdin, process_input)
    loop.run_until_complete(print_hello())
if __name__ == '__main__':
    main()

上面的代碼中 @asyncio.coroutine 作為裝飾器來裝飾協程, yield from 用來從其它協程中接收參數。

異常處理

Python中的協程允許異常在協程調用棧中傳遞,在協程掛起的地方捕獲到異常狀態。我們來看一個簡單的例子:

def coroutine():
    print("Starting")
    try:
        yield "Let's pause until continued."
        print("Continuing")
    except Exception as e:
        yield "Got an exception: " + str(e)
def main():
    c = coroutine()
    next(c)  # Execute until the first yield
    # Now throw an exception at the point where the coroutine has paused
    value = c.throw(Exception("Have an exceptional day!"))
    print(value)
if __name__ == '__main__':
    main()

輸出如下:

Starting
Gotanexception: Haveanexceptionalday!

這個特性使得用異常處理問題有一個統一的處理方式,不管是在同步還是異步的代碼中,因為事件循環可以合理地捕獲以及傳遞異常。我們來看一個事件循環和多層調用協程的例子:

import asyncio
@asyncio.coroutine
def A():
    raise Exception("Something went wrong in A!")
@asyncio.coroutine
def B():
    a = yield from A()
    yield a + 1
@asyncio.coroutine
def C():
    try:
        b = yield from B()
        print(b)
    except Exception as e:
        print("C got exception:", e)
def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(C())
if __name__ == '__main__':
    main()

輸出:

C gotexception: Somethingwentwrongin A!

在上面的例子中,協程C依賴B的結果,B又依賴A的結果,A最后拋出了一個異常。最后這個異常一直被傳遞到了C,然后被捕獲輸出。這個特性與同步的代碼的方式基本一致,并不用手動在B中捕獲、再拋出!

當然,這個例子非常理論化,沒有任何創意。讓我們來看一個更像生產環境中的例子:我們使用 ipify 寫一個程序異步地獲取本機的ip地址。因為 asyncio 庫并沒有HTTP客戶端,我們不得不在TCP層手動寫一個HTTP請求,并且解析返回信息。這并不難,因為API的內容都以及胸有成竹了(僅僅作為例子,不是產品代碼),說干就干。實際應用中,使用 aiohttp 模塊是一個更好的選擇。下面是實現代碼:

import asyncio
import json
host = 'api.ipify.org'
request_headers = {'User-Agent': 'python/3.4',
                  'Host': host,
                  'Accept': 'application/json',
                  'Accept-Charset': 'UTF-8'}
@asyncio.coroutine
def write_headers(writer):
    for key, valuein request_headers.items():
        writer.write((key + ': ' + value + '\r\n').encode())
    writer.write(b'\r\n')
    yield from writer.drain()
@asyncio.coroutine
def read_headers(reader):
    response_headers = {}
    while True:
        line_bytes = yield from reader.readline()
        line = line_bytes.decode().strip()
        if not line:
            break
        key, value = line.split(':', 1)
        response_headers[key.strip()] = value.strip()
    return response_headers
@asyncio.coroutine
def get_my_ip_address(verbose):
    reader, writer = yield from asyncio.open_connection(host, 80)
    writer.write(b'GET /?format=json HTTP/1.1\r\n')
    yield from write_headers(writer)
    status_line = yield from reader.readline()
    status_line = status_line.decode().strip()
    http_version, status_code, status = status_line.split(' ')
    if verbose:
        print('Got status {} {}'.format(status_code, status))
    response_headers = yield from read_headers(reader)
    if verbose:
        print('Response headers:')
        for key, valuein response_headers.items():
            print(key + ': ' + value)
    # Assume the content length is sent by the server, which is the case
    # with ipify
    content_length = int(response_headers['Content-Length'])
    response_body_bytes = yield from reader.read(content_length)
    response_body = response_body_bytes.decode()
    response_object = json.loads(response_body)
    writer.close()
    return response_object['ip']
@asyncio.coroutine
def print_my_ip_address(verbose):
    try:
        ip_address = yield from get_my_ip_address(verbose)
        print("My IP address is:")
        print(ip_address)
    except Exception as e:
        print("Error: ", e)
def main():
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(print_my_ip_address(verbose=True))
    finally:
        loop.close()
if __name__ == '__main__':
    main()

是不是跟同步代碼看著很像?:沒有回調函數,沒有復雜的錯誤處理邏輯,非常簡單、可讀性非常高的代碼。

下面是程序的輸出,沒有任何錯誤:

$ python3.4 ipify.py
Gotstatus 200 OK
Responseheaders:
Content-Length: 21
Server: Cowboy
Connection: keep-alive
Via: 1.1 vegur
Content-Type: application/json
Date: Fri, 10 Oct 2014 03:46:31 GMT
MyIPaddressis:

<my IP address here, hidden for privacy!>

</code></pre>

使用協程來處理異步邏輯的主要優勢在我看來就是:錯誤處理與同步代碼幾乎一致。比如在上面的代碼中,協程調用鏈中任意一環出錯,并不會導致什么問題,錯誤與同步代碼一樣被捕獲,然后處理。

依賴多個互不相關協程的返回結果

在上面的例子中,我們寫的程序是順序執行的,雖然使用了協程,但互不相關的協程并沒有完美地并發。也就是說,協程中的每一行代碼都依賴于前一行代碼的執行完畢。有時候我們需要一些互不相關的協程并發執行、等待它們的完成結果,并不在意它們的執行順序。比如,使用網絡爬蟲的時候,我們會給頁面上的所有外鏈發送請求,并把返回結果放入處理隊列中。

協程可以讓我們用同步的方式編寫異步的代碼,但是對于處理互不相關的任務不論是完成后馬上處理抑或是最后統一處理,回調的方式看上去是最好的選擇。但是,Python 3.4的 asyncio 模塊同時也提供了以上兩種情形的支持。分別是函數 asyncio.as_completedasyncio.gather

我們來一個例子,例子中需要同時加載3個URL。采用兩種方式:

1.使用 asyncio.as_completed 一旦請求完成就處理

2.使用 asyncio.gather 等待所有都完成一起處理

與其加載真的URL地址,我們采用一個更簡單的方式,讓協程掛起隨機長度的時間。

下面是代碼:

import asyncio
import random
@asyncio.coroutine
def get_url(url):
    wait_time = random.randint(1, 4)
    yield from asyncio.sleep(wait_time)
    print('Done: URL {} took {}s to get!'.format(url, wait_time))
    return url, wait_time
@asyncio.coroutine
def process_as_results_come_in():
    coroutines = [get_url(url) for urlin ['URL1', 'URL2', 'URL3']]
    for coroutinein asyncio.as_completed(coroutines):
        url, wait_time = yield from coroutine
        print('Coroutine for {} is done'.format(url))
@asyncio.coroutine
def process_once_everything_ready():
    coroutines = [get_url(url) for urlin ['URL1', 'URL2', 'URL3']]
    results = yield from asyncio.gather(*coroutines)
    print(results)
def main():
    loop = asyncio.get_event_loop()
    print("First, process results as they come in:")
    loop.run_until_complete(process_as_results_come_in())
    print("\nNow, process results once they are all ready:")
    loop.run_until_complete(process_once_everything_ready())
if __name__ == '__main__':
    main()

輸出如下:

$ python3.4 gather.py
First, processresultsas theycomein:
Done: URLURL2took 2s toget!
Coroutinefor URL2is done
Done: URLURL3took 3s toget!
Coroutinefor URL3is done
Done: URLURL1took 4s toget!
Coroutinefor URL1is done
Now, processresultsoncetheyareall ready:
Done: URLURL1took 1s toget!
Done: URLURL2took 3s toget!
Done: URLURL3took 4s toget!
[('URL1', 1), ('URL2', 3), ('URL3', 4)]

更加深入

有很多內容本篇文章并沒有涉及到,比如 Futures libuv 這個視頻(需要KX上網) 是介紹Python中的異步IO的。本篇文章中也有可能有很多我遺漏的內容,歡迎隨時在評論中給我補充。

 

來自:http://python.jobbole.com/87431/

 

 本文由用戶 ECGBella 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!