gevent:輕松異步 I/O

jopen 10年前發布 | 133K 次閱讀 gevent C/C++開發

介紹

gevent是一個使用完全同步編程模型的可擴展的異步I/O框架。

讓我們先來看一些示例,這里有一個 echo 服務器:

from gevent.server import StreamServer

def connection_handler(socket, address):     for l in socket.makefile('r'):         socket.sendall(l)

if name == 'main':     server = StreamServer(('0.0.0.0', 8000), connection_handler)     server.serve_forever()</pre>

在這個例子中,我們并行發出100個web請求:

from gevent import monkey
monkey.patch_all()

import urllib2 from gevent.pool import Pool

def download(url):     return urllib2.urlopen(url).read()

if name == 'main':     urls = ['     pool = Pool(20)     print pool.map(download, urls)</pre>

有些奇怪monkey.patch_all()的調用?不用擔心,這可不像你每天打的猴子補丁(譯注:monkey patching,即動態修改執行代碼)。這僅僅是Python發行版恰好要打的一組猴子補丁。

最后一個例子是一個聊天服務器:

import gevent
from gevent.queue import Queue
from gevent.server import StreamServer

users = {}  # mapping of username -> Queue

def broadcast(msg):     msg += '\n'     for v in users.values():         v.put(msg)

def reader(username, f):     for l in f:         msg = '%s> %s' % (username, l.strip())         broadcast(msg)

def writer(q, sock):     while True:         msg = q.get()         sock.sendall(msg)

def read_name(f, sock):     while True:         sock.sendall('Please enter your name: ')         name = f.readline().strip()         if name:             if name in users:                 sock.sendall('That username is already taken.\n')             else:                 return name

def handle(sock, client_addr):     f = sock.makefile()

    name = read_name(f, sock)

    broadcast('## %s joined from %s.' % (name, client_addr[0]))

    q = Queue()     users[name] = q

    try:         r = gevent.spawn(reader, name, f)         w = gevent.spawn(writer, q, sock)         gevent.joinall([r, w])     finally:         del(users[name])         broadcast('## %s left the chat.' % name)

if name == 'main':     import sys     try:         myip = sys.argv[1]     except IndexError:         myip = '0.0.0.0'

    print 'To join, telnet %s 8001' % myip     s = StreamServer((myip, 8001), handle)     s.serve_forever()</pre>

夠簡單吧?讓我們看看用gevent為什么要這樣寫。

同步 I/O

同步I/O是指每個I/O操作被允許阻塞,直到它完成。 

為了在同一時間擴展用戶規模,我們需要的線程和進程。每個線程或進程被允許單獨阻塞等待I/O操作。因為我們有完整的并發性,這阻塞不影響其他操作;從每個線程/進程的角度看,世界將停止直到操作完成。當結果準備就緒時,操作系統會恢復線程/進程。 





線程的缺點:糟糕的性能。請參閱 Dave Beazley 的 GIL 筆記。還有高內存使用。線程在Linux中分配堆棧內存(請參閱ulimit -s)。這是對 Python 是沒有用的 —— 相對較少線程就會讓你耗盡內存。

進程的缺點:沒有共享的內存空間。還有高內存使用,因為堆棧分配和寫入時復制(譯注:copy-on-write)。線程在Linux中就像一種特殊的進程;內核結構都或多或少是相同的。

異步I/O

所有的異步I/O都依賴于同一種模式.它不在于代碼如何運行,而在于在何處完成等待.多路I/O操作需要統一做等待處理,于是,等待只在代碼中的一個地方出現.當事件觸發的時候,異步系統需要恢復等待這個事件的代碼塊.

接下來的問題不在于在一個地方做等待,而在于如何恢復等待接收事件的代碼塊.

這里有一些方法,關于如何組織一個單線程程序,所有的等待只在代碼中的一個地方完成.在下面的事件循環代碼中,關于resume()和waiter有一些不同的實現方法:

read_waiters = {}
write_waiters = {}
timeout_waiters = []

def wait_for_read(fd, waiter):     read_waiters[fd] = waiter

wait_for_write = write_waiters._setitem

def event_loop():     while True:         readfds = read_waiters.keys()         writefds = write_waiters.keys()

        read, write, error = select.select(             readfds,  # waiting for read             writefds,  # waiting for write             readfds + writefds,  # waiting for errors         )

        for fd in read:             resume(read_waiters.pop(fd))

        for fd in write:             resume(write_waiters.pop(fd))

        # something about errors</pre>

我們可能希望在上面的代碼中增加timeouts,在這種情況下,我們可能會寫一些類似下面的代碼:

timeout_waiters = []

def wait_for_timeout(delay, waiter):     when = time.time() + delay     heapq.heappush(timeout_waiters, (when, waiter))

def event_loop():     while True:         now = time.time()         read, write, error = select.select(             rfds, wfds, efds,             timeout_waiters[0][0] - time.time()         )         while timeout_waiters:             if timeoutwaiters[0][0] <= now:                  , waiter = heapq.heappop(timeout_waiters)                  resume(waiter)</pre>

所有的異步I/O框架都是建立在同樣的模型之上.只是采用了不同的方式構建代碼.這樣,當發出I/O操作請求的時候,可以暫停;當這個操作完成的時候,又可以恢復.

回調

例子:

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!