gevent:輕松異步 I/O

yuja3163 8年前發布 | 24K 次閱讀 gevent C/C++開發

來自: http://python.jobbole.com/84301/

介紹

gevent 是一個使用完全同步編程模型的可擴展的異步I/O框架。

讓我們先來看一些示例,這里有一個 echo 服務器:

from gevent.server import StreamServer

def connection_handler(socket, address):
    for l in socket.makefile('r'):
        socket.sendall(l)

if __name__ == '__main__':
    server = StreamServer(('0.0.0.0', 8000), connection_handler)
    server.serve_forever()
from gevent.server import StreamServer
 
def connection_handler(socket, address):
    for l in socket.makefile('r'):
        socket.sendall(l)
 
if __name__ == '__main__':
    server = StreamServer(('0.0.0.0', 8000), connection_handler)
    server.serve_forever()

在這個例子中,我們并行發出100個web請求:

from gevent import monkey
monkey.patch_all()

import urllib2
from gevent.pool import Pool

def download(url):
    return urllib2.urlopen(url).read()

if __name__ == '__main__':
    urls = ['http://httpbin.org/get'] * 100
    pool = Pool(20)
    print pool.map(download, urls)
from gevent import monkey
monkey.patch_all()
 
import urllib2
from gevent.pool import Pool
 
def download(url):
    return urllib2.urlopen(url).read()
 
if __name__ == '__main__':
    urls = ['http://httpbin.org/get'] * 100
    pool = Pool(20)
    print pool.map(download, urls)

有些奇怪monkey.patch_all()的調用?不用擔心,這可不像你每天打的猴子補丁(譯注:monkey patching,即動態修改執行代碼)。這僅僅是Python發行版恰好要打的一組猴子補丁。

最后一個例子是一個聊天服務器:

import gevent from gevent.queue import Queue from gevent.server import StreamServer

users = {} # mapping of username -> Queue

def broadcast(msg): msg += '\n' for v in users.values(): v.put(msg)

def reader(username, f): for l in f: msg = '%s> %s' % (username, l.strip()) broadcast(msg)

def writer(q, sock): while True: msg = q.get() sock.sendall(msg)

def read_name(f, sock): while True: sock.sendall('Please enter your name: ') name = f.readline().strip() if name: if name in users: sock.sendall('That username is already taken.\n') else: return name

def handle(sock, client_addr): f = sock.makefile()

name = read_name(f, sock)

broadcast('## %s joined from %s.' % (name, client_addr[0]))

q = Queue()
users[name] = q

try:
    r = gevent.spawn(reader, name, f)
    w = gevent.spawn(writer, q, sock)
    gevent.joinall([r, w])
finally:
    del(users[name])
    broadcast('## %s left the chat.' % name)

if name == ' main ': import sys try: myip = sys.argv[1] except IndexError: myip = '0.0.0.0'

print 'To join, telnet %s 8001' % myip
s = StreamServer((myip, 8001), handle)
s.serve_forever()

importgevent
fromgevent.queueimportQueue
fromgevent.serverimportStreamServer
 
users = {}  # mapping of username -> Queue
 
defbroadcast(msg):
    msg += '\n'
    for v in users.values():
        v.put(msg)
 
defreader(username, f):
    for l in f:
        msg = '%s> %s' % (username, l.strip())
        broadcast(msg)
 
defwriter(q, sock):
    while True:
        msg = q.get()
        sock.sendall(msg)
 
defread_name(f, sock):
    while True:
        sock.sendall('Please enter your name: ')
        name = f.readline().strip()
        if name:
            if namein users:
                sock.sendall('That username is already taken.\n')
            else:
                return name
 
defhandle(sock, client_addr):
    f = sock.makefile()
 
    name = read_name(f, sock)
 
    broadcast('## %s joined from %s.' % (name, client_addr[0]))
 
    q = Queue()
    users[name] = q
 
    try:
        r = gevent.spawn(reader, name, f)
        w = gevent.spawn(writer, q, sock)
        gevent.joinall([r, w])
    finally:
        del(users[name])
        broadcast('## %s left the chat.' % name)
 
if __name__ == '__main__':
    importsys
    try:
        myip = sys.argv[1]
    exceptIndexError:
        myip = '0.0.0.0'
 
    print 'To join, telnet %s 8001' % myip
    s = StreamServer((myip, 8001), handle)
    s.serve_forever()
</div>

夠簡單吧?讓我們看看用gevent為什么要這樣寫。

同步 I/O

同步I/O是指每個I/O操作被允許阻塞,直到它完成。

為了在同一時間擴展用戶規模,我們需要的線程和進程。每個線程或進程被允許單獨阻塞等待I/O操作。因為我們有完整的并發性,這阻塞不影響其他操作;從每個線程/進程的角度看,世界將停止直到操作完成。當結果準備就緒時,操作系統會恢復線程/進程。

線程的缺點:糟糕的性能。請參閱 Dave Beazley 的 GIL 筆記。還有高內存使用。線程在Linux中分配堆棧內存(請參閱ulimit -s)。這是對 Python 是沒有用的 —— 相對較少線程就會讓你耗盡內存。

進程的缺點:沒有共享的內存空間。還有高內存使用,因為堆棧分配和寫入時復制(譯注:copy-on-write)。線程在Linux中就像一種特殊的進程;內核結構都或多或少是相同的。

異步I/O

所有的異步I/O都依賴于同一種模式.它不在于代碼如何運行,而在于在何處完成等待.多路I/O操作需要統一做等待處理,于是,等待只在代碼中的一個地方出現.當事件觸發的時候,異步系統需要恢復等待這個事件的代碼塊.

接下來的問題不在于在一個地方做等待,而在于如何恢復等待接收事件的代碼塊.

這里有一些方法,關于如何組織一個單線程程序,所有的等待只在代碼中的一個地方完成.在下面的事件循環代碼中,關于resume()和waiter有一些不同的實現方法:

read_waiters = {}
write_waiters = {}
timeout_waiters = []

def wait_for_read(fd, waiter):
    read_waiters[fd] = waiter

wait_for_write = write_waiters.___setitem__

def event_loop():
    while True:
        readfds = read_waiters.keys()
        writefds = write_waiters.keys()

        read, write, error = select.select(
            readfds,  # waiting for read
            writefds,  # waiting for write
            readfds + writefds,  # waiting for errors
        )

        for fd in read:
            resume(read_waiters.pop(fd))

        for fd in write:
            resume(write_waiters.pop(fd))

        # something about errors
read_waiters = {}
write_waiters = {}
timeout_waiters = []
 
def wait_for_read(fd, waiter):
    read_waiters[fd] = waiter
 
wait_for_write = write_waiters.___setitem__
 
def event_loop():
    while True:
        readfds = read_waiters.keys()
        writefds = write_waiters.keys()
 
        read, write, error = select.select(
            readfds,  # waiting for read
            writefds,  # waiting for write
            readfds + writefds,  # waiting for errors
        )
 
        for fd in read:
            resume(read_waiters.pop(fd))
 
        for fd in write:
            resume(write_waiters.pop(fd))
 
        # something about errors

我們可能希望在上面的代碼中增加timeouts,在這種情況下,我們可能會寫一些類似下面的代碼:

timeout_waiters = []

def wait_for_timeout(delay, waiter):
    when = time.time() + delay
    heapq.heappush(timeout_waiters, (when, waiter))

def event_loop():
    while True:
        now = time.time()
        read, write, error = select.select(
            rfds, wfds, efds,
            timeout_waiters[0][0] - time.time()
        )
        while timeout_waiters:
            if timeout_waiters[0][0] <= now:
                 _, waiter = heapq.heappop(timeout_waiters)
                 resume(waiter)
timeout_waiters = []
 
defwait_for_timeout(delay, waiter):
    when = time.time() + delay
    heapq.heappush(timeout_waiters, (when, waiter))
 
defevent_loop():
    while True:
        now = time.time()
        read, write, error = select.select(
            rfds, wfds, efds,
            timeout_waiters[0][0] - time.time()
        )
        while timeout_waiters:
            if timeout_waiters[0][0] <= now:
                _, waiter = heapq.heappop(timeout_waiters)
                resume(waiter)

所有的異步I/O框架都是建立在同樣的模型之上.只是采用了不同的方式構建代碼.這樣,當發出I/O操作請求的時候,可以暫停;當這個操作完成的時候,又可以恢復.

回調

例子:

  • Javascript/Node
  • Tornado IOStream
  • Twisted Deferred
  • asyncio, under the hood

有一種方式是,當有數據可讀的時候,只是調用一個可調用的函數. 通常,我們希望在更高層面去做處理,而不僅僅只是處理一塊一塊的數據.于是,我們用一個回調函數讀取和分析二進制數據塊,當分析數據內容的過程完成后,調用應用程序的回調函數(例如HTTP請求或是應答).

用戶代碼看起來像下面這樣:

def start_beer_request():
    http.get('/api/beer', handle_response)

def handle_response(resp):
    beer = load_beer(resp.json)
    do_something(beer)
def start_beer_request():
    http.get('/api/beer', handle_response)
 
def handle_response(resp):
    beer = load_beer(resp.json)
    do_something(beer)

我們如何才能把響應和特定的請求關聯起來呢?一種方式是使用閉包:

def get_fruit(beer_id, callback):
    def handle_response(resp):
        beer = load_beer(resp.json)
        callback(beer)

    http.get('/api/beer/%d' % beer_id, handle_response)
def get_fruit(beer_id, callback):
    def handle_response(resp):
        beer = load_beer(resp.json)
        callback(beer)
 
    http.get('/api/beer/%d' % beer_id, handle_response)

這兩種方式都比較丑陋,特別是當我們需要關聯更多的I/O調用的時候(有人喜歡這種更深層的嵌套嗎?).這將會無法逃脫嵌套以及碎片化的編程.就像人們常說的,”回調就是新的GOTO語句”.

回調堆棧看起來像這樣:

/presentations/gevent-talk/_static/callbacks.svg

注意,返回值別無用處,視覺上,唯一的傳遞結果的方式是編寫額外的回調函數.

基于方法的回調

例子:

  • Twisted Protocols
  • Tornado RequestHandler
  • asyncio Transports/Protocols

回調簡直是一團糟!但是我們可以將回調組織成接口,而這些接口的函數則自動注冊為回調,這樣,使用者只要繼承接口去實現即可.例如,asyncio的示例代碼:

import asyncio

class EchoClient(asyncio.Protocol):
    message = 'This is the message. It will be echoed.'

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('data sent: {}'.format(self.message))

    def data_received(self, data):
        print('data received: {}'.format(data.decode()))

    def connection_lost(self, exc):
        print('server closed the connection')
        asyncio.get_event_loop().stop()
import asyncio
 
class EchoClient(asyncio.Protocol):
    message = 'This is the message. It will be echoed.'
 
    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('data sent: {}'.format(self.message))
 
    def data_received(self, data):
        print('data received: {}'.format(data.decode()))
 
    def connection_lost(self, exc):
        print('server closed the connection')
        asyncio.get_event_loop().stop()

這種方式并沒有解決消除回調的問題,而只是提供了一種更為簡潔的框架,去避免回調散布在代碼的各個地方.它設置了對回調怎么調用以及在哪里定義的限制.假設,你希望把兩種協議連接起來,例如,你在處理一個來自用戶的請求過程中,需要將一個HTTP請求發送至后端的REST服務.這時,你就會碰到多個回調在邏輯上碎片化的問題.同時,你也無法使用異步函數的返回值.

回調中的錯誤處理

當使用一個基于回調的編程模型的時候,你不得不注冊額外的錯誤處理回調函數.

可惜的是,不是所有的框架都強化這一點.一個原因是,如果強化的話,將會使每個單獨的程序晦澀難懂.于是,它是一個可選項.結果是,程序員并不是總是會(甚至經常不)這樣做

這違反了 PEP20 :

錯誤應該顯示的傳遞,除非被顯示的忽略了.

沒有合理的處理錯誤的一個較大的風險是,程序內部狀態不再是同步的狀態,可能會因為等待永遠不會觸發的事件而死鎖,或者是無限的保留一個已經斷開的連接的資源.

基于生成器的協程

例子:

  • tornado.gen
  • asyncio/Tulip

生成器已經被用作實現類似協程的功能.它允許我們在某個事件循環系統中,在I/O操作完成后,恢復回調之后的代碼塊.

import asyncio

@asyncio.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)
    return x + y

@asyncio.coroutine
def print_sum(x, y):
    result = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
import asyncio
 
@asyncio.coroutine
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    yield from asyncio.sleep(1.0)
    return x + y
 
@asyncio.coroutine
def print_sum(x, y):
    result = yield from compute(x, y)
    print("%s + %s = %s" % (x, y, result))
 
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

(在這個例子中,需要指出的是,通過這種辦法,我們希望產生其他的同步操作,以便獲取一些規模效應的好處).

在PEP255中介紹生成器的時候,它們被描述為可提供類似協程的功能.PEP342和PEP380拓展了這種能力,在給生成器發送異常的同時,也可以讓子生成器產生迭代.

術語”協程”意味著多于一個常規程序的系統.實際上,調用棧每次調用活躍一次.因為每個調用棧是被保留的,一個常規程序可以暫停它的狀態,然后轉換到一個不同的協程.在一些編程語言中,有一個yield關鍵字,某些方式上表現出這樣的效果(和Python里面的yield關鍵字差別很大).

為什么你希望這么做呢?它提供了一種原始形態的多任務–合作的多任務.不像線程,它們不是搶占式的.這意味著它們不會被中斷(搶占),直到顯示的調用了yield.

生成器是這種行為的子集,正好契合’yield’術語.維基百科成它們是半協程.然而,生成器和協程有兩個重要的不同點:

1.生成器只能迭代到調用幀

2.在迭代到調用幀的時候,棧里面的每一幀都需要協作.頂部幀可能迭代,棧里面的其他調用也來自yield.

這個調用棧看起來像下面這樣:

/presentations/gevent-talk/_static/generators.svg

需要指出的是,這種使用yield的方式意味著,你不能使用yield編寫異步生成器,而必須返回列表.

Greenlets/green threads

一個greenlet是一個完整的協程.

例子:

  • gevent
  • greenlet
  • Stackless Python

讓我們用gevent重寫異步I/O的例子:

import gevent

def compute(x, y):
    print "Compute %s + %s ..." % (x, y)
    gevent.sleep(1.0)
    return x + y

def print_sum(x, y):
    result = compute(x, y)
    print "%s + %s = %s" % (x, y, result)

print_sum(1, 2)
import gevent
 
def compute(x, y):
    print "Compute %s + %s ..." % (x, y)
    gevent.sleep(1.0)
    return x + y
 
def print_sum(x, y):
    result = compute(x, y)
    print "%s + %s = %s" % (x, y, result)
 
print_sum(1, 2)

(同樣的,通過這種方法,我們將啟動其他的greenlet以獲得規模效應的優勢).

這個例子更加簡單了!我們可以簡單的忽略所有的生成器不一致的地方,因為gevent.sleep()可以觸發事件循環(在gevent稱作hub),而不需要引入調用幀.同樣的,因為我們有頂層的協程,hub可以在需要時實例化,它不需要被創建為棧的顯式的父類.

協程提供了從當前棧跳到事件循環,以及從事件循環返回當前棧的神奇功能.

/presentations/gevent-talk/_static/coroutines.svg

上面C語言級別的代碼,允許我們編寫看起來是同步的,但實際上卻所有擁有異步I/O特性的代碼.

Gevent: greenlets 和 monkey-patching

我們的代碼里要假定沒用sleep? 我們可以用gevent的猴子補丁來確保不需要更改代碼:

# These two lines have to be the first thing in your program, before
# anything else is imported
from gevent.monkey import patch_all
patch_all()

import time

def compute(x, y):
    print "Compute %s + %s ..." % (x, y)
    time.sleep(1.0)
    return x + y

def print_sum(x, y):
    result = compute(x, y)
    print "%s + %s = %s" % (x, y, result)

print_sum(1, 2)
# These two lines have to be the first thing in your program, before
# anything else is imported
from gevent.monkey import patch_all
patch_all()
 
import time
 
def compute(x, y):
    print "Compute %s + %s ..." % (x, y)
    time.sleep(1.0)
    return x + y
 
def print_sum(x, y):
    result = compute(x, y)
    print "%s + %s = %s" % (x, y, result)
 
print_sum(1, 2)

標準庫中的大部分阻塞調用都會被打補丁,這樣可以用hub調度而不必堵塞。同樣的,線程系統會用微線程代替線程。

monkey-patching很糟糕嗎?

在這種情況下,最好把gevent當做python的一部分,它使用了green thread,然后包含標準庫代碼的不同實現.

部分原因是,它必須在包含程序入口的模塊的最開頭.它是在代碼運行前,我們使用gevent版本stdlib的一個聲明.

monkey patching是優雅的,它允許純Python應用程序和庫在不做修改的情況下變成異步的.它是gevent的一個可選組件,不需要它也可以編寫異步程序.

  • 它可以和已存在的同步純Python代碼一起運行
  • 一般也可以和異步代碼一起運行,只模擬過select(),但是大多數異步框架都有一個基于select()的實現,像epoll()等函數也是可以實現的.

gevents線程原語的例子

因為gevent是基于輕量級的”線程”,所以gevent庫包含大量的并發工具來生成greenlet,實現臨界區(鎖和互斥變量)以及在greenlet之間傳遞消息.

因為它是一個網路庫,所以也包含了一些高級別的網路服務模塊,如TCP服務器和WSGI服務器.

生成和殺死greenlets

  • gevent.spawn(function, *args, **kwargs)  - 生成greenlet
  • gevent.kill(greenlet, exception=GreenletExit) – 殺死greenlet是通過在greenlet里面引發異常.

這里也有一些高級別的原語,像gevent.pool,它基于greenlet,與 multiprocessing.pool對等.

同步原語

  • gevent.lock.Sempahore
  • gevent.lock.RLock
  • gevent.event.Event

消息傳遞

  • gevent.queue.Queue
  • gevent.event.AsyncResult – 等待單一結果而阻塞,也允許引發異常.

超時

這里有一個有用的包裝器,可以用來殺死一個段時間內都沒有成功運行的greenlet.它可以用來在其他一系列操作中引入超時.

from gevent import Timeout

with Timeout(5):
    data = sock.recv()
from gevent import Timeout
 
with Timeout(5):
    data = sock.recv()

更高級別的服務器工具

  • gevent.server.StreamServer – TCP服務器,同時也支持SSL.
  • gevent.server.DatagramServer – UDP服務器.
  • gevent.pywsgi.WSGIServer – WSGI服務器,支持流,保持存活和SSL等.

Gevent I/O模式

使用gevent (避免使用select())時,推薦為各個方面的交互生成一個greenlet–包括讀和寫.每個greenlet的代碼只是一個簡單的循環,在需要時可以阻塞.查看早前的 chat server 的代碼作為示例.

為什么我們想要一個同步的編程模型?

首先,它讓代碼更易懂易讀.它和單線程是一樣的編程方式.使用yield后代碼不會分散.

更重要的是,上面描述的這些方法,只有gevent不需要改變代碼的調用約定.這個重要性不應該被低估,它意味著業務邏輯代碼可以通暢的進入到阻塞代碼.

一個簡單的例子,假設我們想開發一個流式API,之前存在的業務邏輯代碼(process_orders)需要一個迭代器.通過gevent,和遠程服務器交互過程中,我們可以異步的流化迭代器,而不需要修改代碼.

from gevent.socket import create_connection

def process_orders(lines):
    """Do something important with an iterable of lines."""
    for l in lines:
        ...

# Create an asynchronous file-like object with gevent
socket = create_connection((host, port))
f = socket.makefile(mode='r')
process_orders(f)
from gevent.socket import create_connection
 
def process_orders(lines):
    """Do something important with an iterable of lines."""
    for l in lines:
        ...
 
# Create an asynchronous file-like object with gevent
socket = create_connection((host, port))
f = socket.makefile(mode='r')
process_orders(f)

另一個優勢是,在敏感的地方總可以引發異常.

和真實的線程不同,greenlet不會在任意時間暫停,于是可以使用更少的鎖和互斥量.只是在原子操作過程中存在阻塞的風險的時候,需要一個互斥量.

和真實線程的另一個不同是,一個greenlet可以殺死另外一個greenlet–當greenlet下一次恢復的時候觸發一個異常.

缺點

壞消息:Python 3分支版本的gevent還沒有開發完成.我暫未查明是否它根本不可用.

異步I/O的缺陷

和其他異步I/O框架一樣,gevent也有一些缺陷:

  • 阻塞(真正的阻塞,在內核級別)在程序中的某個地方停止了所有的東西.這很像C代碼中monkey patch沒有生效.你需要采用更細致的方法讓C代碼庫”green”.
  • 保持CPU處于繁忙狀態.greenlet不是搶占式的,這可能導致其他greenlet不會被調度.
  • 在greenlet之間存在死鎖的可能.

總的來說,相對于其他異步I/O框架,gevent的缺陷更少(當然,你可能不能死鎖回調,這僅僅是因為,事件循環沒有提供同步原始,所以你無法實現臨界區).

一個gevent回避的缺陷是,你幾乎不會碰到一個和異步無關的Python庫–它將阻塞你的應用程序,因為純Python庫使用的是monkey patch的stdlib.

n到m的并發

對于事件而言,一種更具規模效應的方法是,在m個物理線程中運行n個greenlet.在Python中,我們需要進程來做到這點.在多核系統中,這增加了性能,也增加了靈活性.

這是Rust和Java在其他地方使用的模型.

使用gevent的經驗

我評估過gevent以及在2011提到的其他系統.Gevent無疑是勝者.雖然在性能上面沒太多選擇,但是gevent簡化編程模型卻是一個主要的賣點.不是所有的開發者,在使用生成器,閉包和回調函數的時候都處于相同的水平.但是gevent沒有這個要求,你可以使用任何技術讓代碼更易讀.

在已有的代碼或與I/O無關的業務邏輯中使用gevent也是很有價值的:你可能希望,在高性能的網絡應用程序和線下的批處理進程中重用某個業務邏輯庫.

在接下來的18個月里面,我一直在寫各種各樣的網絡應用程序.一個產品是名為 nucleon 的web服務框架,它的目標是連接RESTful JSON,PostgreSQL和AMQP,所有的都通過”綠色”驅動代碼保持高伸縮性.

盡管我們需要修改代碼,以非阻塞的方式使用PostgreSQL,但是gevent的mokey patching意味著純Python驅動在其他數據存儲過程中已經正常運行了–所以Redis,ElasticSearch和CouchDB都可以透明的使用.

AMQP庫最初是Puka (不是Pika)的一個復制, AMQP庫沒有強化異步的特性(像Pika一樣).我最終完全重寫了它,并把它單獨作為一個名為 nucleon.amqp 的項目. nucleon.amqp允許使用完整的同步編程模型和AMQP服務器交互–AMQP的遠程隊列可以通過Queue API暴露在本地.

在一些并發編程的項目中,也有一些讓人抓狂的時候.但是作為一個團隊,我們適應了gevent,并且開發了一種圖表語言.用它來向各個成員解釋,理解各個greenlet之間的數據流向,相互之間如何阻塞以及如何發送信號.

這個項目成功了.我們保持代碼簡潔和可維護,同時也保證服務高效和可伸縮(負載測試中保留的)

</div>

 本文由用戶 yuja3163 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!