零基礎編寫Python Redis Client(一)
什么是AIO
AIO是Asynchronous Input/Output的簡寫,也就是異步IO。不過在談什么是AIO之前,我們可能要先介紹一下BIO。那么什么是BIO呢?簡單的說,BIO是Blocking Input/Output,也就是阻塞IO,他實現的通常是在線程池中找出一個線程處理IO,在IO過程中,其他線程都需要等待IO完成后才可以從中選取一個線程占用IO。這樣最大的問題是,當線程數量較多,并且需要大量的IO操作時,就會造成一個大量的阻塞,因為實際上每次只有一個線程在處理IO。
那么如何解決這個時候的問題呢?這時候就提出了AIO的概念。通常在IO處理過程中也會伴有一些其他的處理操作,假如把所有的操作都浪費在了等待IO釋放上,線程池中的線程利用率也太低了,因此我們需要一種方式,在申請IO處理之后,就去繼續做其他的事情,等IO操作完成了,然后通知我們已經OK,我們可以繼續處理了。這也就是我們常說的AIO的原型。
AIO的情況也說明了它適用的場景:長連接場景,或者重度的IO操作等等的情況。
如果找軟件來做案例,我們可以找一個可能大家熟知的:NGINX。正如我們所知,NGINX采用了 異步、事件驅動的方法來處理連接 。這種處理方式無需(像使用傳統架構的服務器一樣)為每個請求創建額外的專用進程或者線程,而是在一個工作進程中處理多個連接和請求。為此,NGINX工作在非阻塞的socket模式下,并使用了epoll 和 kqueue這樣有效的方法。
這部分的內容,在 NGINX引入線程池 性能提升9倍 中進行了詳細的介紹,包含了NGINX的異步應用經驗,同時介紹了NGINX中引入了阻塞的線程池用于解決某些特定場景問題下的效率。
如何實現Python的異步IO
這篇文章會以最新的Python 3.5為基礎來介紹實現一個異步的Python Redis Client。不過在此之前,我們先來看一下,怎么實現Python的aio。
Python的aio官方封裝了一個比較合適的基礎庫 asyncio 。
從一個例子開始簡單認識一下如何實現一個異步的aio client。這里以官方文檔中的例子為例:
import asyncio
async def tcp_echo_client(message, loop):
reader, writer = await asyncio.open_connection('127.0.0.1', 8888,
loop=loop)
print('Send: %r' % message)
writer.write(message.encode())
data = await reader.read(100)
print('Received: %r' % data.decode())
print('Close the socket')
writer.close()
message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message, loop))
loop.close()</pre>
這里面用到的Python 3.5中引入的 async/await 關鍵字,還有 asyncio 庫。這里面 asyncio.open_connection 會返回一個coroutine,這個可以使用await進行一個aio的調用,即,在收到返回信號之前,程序可以繼續去處理其他的任務。這里面真正核心的就是 EventLoop ,它負責監視發送這些信號,并且返回數據,它可以通過 asyncio.get_event_loop 獲取到。然后他會真正返回的數據是一個讀取 StreamReader 和寫入 StreamWriter 的對象。
接下來,就可以通過這個 reader 和 writer 進行數據的讀取和寫入。 writer 是可以直接寫入的,如果是 reader 的話,就需要aio的方式等待受到數據后返回。這樣看起來更接近于普通的socket編程。不過關閉連接時,僅僅需要關閉 writer 就足夠了。
從socket通訊到redis通訊
本質上來說,所有的網絡請求都可以看成是SocketIO的請求,因此,我們可以把Redis的請求當做是一個socket的通訊來進行,這樣就很方便了。
不過先等一等,那么通訊的數據格式怎么辦?沒關系,這里我們使用 hiredis-py 來解決協議解析的問題。不過,從庫設計的角度來說,我們需要封裝一個RedisConnection的類出來解決Redis的通訊協議。它可能傳入的參數包含,一個 StreamReader 、一個 StreamWriter ,一個 EventLoop ,哦,別忘記還有編碼 encoding 。其他的我們就用一個 * 來表示好了。
class RedisConnection(object):
'''Redis Connection'''
def init(self, reader, writer, *, encoding=None, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
self._reader = reader
self._writer = writer
self._encoding = encoding
self._loop = loop
self._db = 0
def __repr__(self):
return '<RedisConnection [db:{}]>'.format(self._db)</pre> <p>記得加上 __repr__ 用來描述這個對象,這個可是一個好習慣。接下來就需要完善這個類了,比如,我們需要添加一個關閉連接的方法,這需要至少一個參數用于標記連接是否關閉,一個用于執行關閉操作,比如我們需要這樣子的: </p>
def close(self):
"""Close connection."""
self._do_close(None)
def _do_close(self, exc):
if self._closed:
return
self._closed = True
self._closing = False
# 關閉寫入
self._writer.transport.close()
# 取消讀取任務
self._reader_task.cancel()
self._reader_task = None
self._writer = None
self._reader = None
@property
def closed(self):
"""True if connection is closed."""
closed = self._closing or self._closed
if not closed and self._reader and self._reader.at_eof():
self._closing = closed = True
self._loop.call_soon(self._do_close, None)
return closed</pre> <p>連接這類的方法已經處理完了,接下來就應該是執行Redis命令了,我們可以叫它 execute 。那他需要幾個東西,一個是執行的指令 command ,一個是指令參數 *args ,還有一些其他的,比如編碼 encoding 。這里為了節省時間,只是考慮一些Set和Get的基本操作。哦,不過等等,那么Redis的數據結構是什么樣子的呢?我們還需要先把它編譯成Redis-server可以識別的形式,那么需要一個 encode_command 方法。 </p>
_converters = {
bytes: lambda val: val,
bytearray: lambda val: val,
str: lambda val: val.encode('utf-8'),
int: lambda val: str(val).encode('utf-8'),
float: lambda val: str(val).encode('utf-8'),
}
def encode_command(*args):
"""Encodes arguments into redis bulk-strings array.
Raises TypeError if any of args not of bytes, str, int or float type.
"""
buf = bytearray()
def add(data):
return buf.extend(data + b'\r\n')
add(b'*' + _bytes_len(args))
for arg in args:
if type(arg) in _converters:
barg = _converters[type(arg)](arg)
add(b'$' + _bytes_len(barg))
add(barg)
else:
raise TypeError("Argument {!r} expected to be of bytes,"
" str, int or float type".format(arg))
return buf</pre> <p>這樣可以轉化為可以識別的形式了,接下來還有一個問題,那么怎么讓程序可以等待信號的生效呢?這里介紹一下 asyncio.Future 。這個 asyncio.Future 類是用于封裝回調函數的類,包含了一些更加方便使用的方法。通過這個類,可以實現aio的通知機制,也就是回調。這個類實例可以通過 await 返回我們需要的結果。不過這樣就還需要在項目中添加一些更多的變量,比如所有等待返回的 self._waiters 。 </p>
def execute(self, command, *args, encoding=None):
"""Executes redis command and returns Future waiting for the answer.
Raises:
* TypeError if any of args can not be encoded as bytes.
* ReplyError on redis '-ERR' resonses.
* ProtocolError when response can not be decoded meaning connection
is broken.
"""
assert self._reader and not self._reader.at_eof(), (
"Connection closed or corrupted")
if command is None:
raise TypeError("command must not be None")
if None in set(args):
raise TypeError("args must not contain None")
# 這樣小寫也沒有問題了
command = command.upper().strip()
if encoding is None:
encoding = self._encoding
fut = asyncio.Future(loop=self._loop)
self._writer.write(encode_command(command, *args))
self._waiters.append((fut, encoding, cb))
return fut</pre> <p>現在所有的命令都已經發送到了redis-server,接下來就需要讀取對應的結果了。</p>
async def _read_data(self):
"""Response reader task."""
while not self._reader.at_eof():
try:
data = await self._reader.read(65536)
except asyncio.CancelledError:
break
except Exception as exc:
# XXX: for QUIT command connection error can be received
# before response
logger.error("Exception on data read %r", exc, exc_info=True)
break
self._parser.feed(data)
while True:
try:
obj = self._parser.gets()
except ProtocolError as exc:
# ProtocolError is fatal
# so connection must be closed
self._closing = True
self._loop.call_soon(self._do_close, exc)
if self._in_transaction:
self._transaction_error = exc
return
else:
if obj is False:
break
else:
self._process_data(obj)
self._closing = True
self._loop.call_soon(self._do_close, None)
def _process_data(self, obj):
"""Processes command results."""
waiter, encoding, cb = self._waiters.popleft()
if waiter.done():
logger.debug("Waiter future is already done %r", waiter)
assert waiter.cancelled(), (
"waiting future is in wrong state", waiter, obj)
return
if isinstance(obj, RedisError):
waiter.set_exception(obj)
if self._in_transaction:
self._transaction_error = obj
else:
if encoding is not None:
try:
obj = decode(obj, encoding)
except Exception as exc:
waiter.set_exception(exc)
return
waiter.set_result(obj)
if cb is not None:
cb(obj)</pre> <p>有了這些之后,我們就可以簡單創建一個連接了:</p>
async def create_connection(address, *, db=None, password=None,
encoding=None, loop=None):
"""Creates redis connection.
Opens connection to Redis server specified by address argument.
Address argument is similar to socket address argument, ie:
* when address is a tuple it represents (host, port) pair;
* when address is a str it represents unix domain socket path.
(no other address formats supported)
Encoding argument can be used to decode byte-replies to strings.
By default no decoding is done.
Return value is RedisConnection instance.
This function is a coroutine.
"""
assert isinstance(address, (tuple, list, str)), "tuple or str expected"
if isinstance(address, (list, tuple)):
host, port = address
reader, writer = await asyncio.open_connection(
host, port, loop=loop)
else:
reader, writer = await asyncio.open_unix_connection(
address, loop=loop)
conn = RedisConnection(reader, writer, encoding=encoding, loop=loop)
try:
if password is not None:
yield from conn.auth(password)
if db is not None:
yield from conn.select(db)
except Exception:
conn.close()
return conn</pre> <p>這樣,連接部分的代碼基本上已經處理完成了,接下來要做的就是實現基于這個連接的命令執行了,下面的內容會下一個文章中繼續介紹,敬請期待。</p>
</div>
來自: http://ipfans.github.io/2015/10/write-aio-python-redis-client-as-dummy-1/
</code></code></code></code></code></code></code>