零基礎編寫Python Redis Client(一)

jopen 8年前發布 | 15K 次閱讀 Redis Python Python開發

什么是AIO

AIO是Asynchronous Input/Output的簡寫,也就是異步IO。不過在談什么是AIO之前,我們可能要先介紹一下BIO。那么什么是BIO呢?簡單的說,BIO是Blocking Input/Output,也就是阻塞IO,他實現的通常是在線程池中找出一個線程處理IO,在IO過程中,其他線程都需要等待IO完成后才可以從中選取一個線程占用IO。這樣最大的問題是,當線程數量較多,并且需要大量的IO操作時,就會造成一個大量的阻塞,因為實際上每次只有一個線程在處理IO。

那么如何解決這個時候的問題呢?這時候就提出了AIO的概念。通常在IO處理過程中也會伴有一些其他的處理操作,假如把所有的操作都浪費在了等待IO釋放上,線程池中的線程利用率也太低了,因此我們需要一種方式,在申請IO處理之后,就去繼續做其他的事情,等IO操作完成了,然后通知我們已經OK,我們可以繼續處理了。這也就是我們常說的AIO的原型。

AIO的情況也說明了它適用的場景:長連接場景,或者重度的IO操作等等的情況。

如果找軟件來做案例,我們可以找一個可能大家熟知的:NGINX。正如我們所知,NGINX采用了 異步、事件驅動的方法來處理連接 。這種處理方式無需(像使用傳統架構的服務器一樣)為每個請求創建額外的專用進程或者線程,而是在一個工作進程中處理多個連接和請求。為此,NGINX工作在非阻塞的socket模式下,并使用了epoll 和 kqueue這樣有效的方法。

這部分的內容,在 NGINX引入線程池 性能提升9倍 中進行了詳細的介紹,包含了NGINX的異步應用經驗,同時介紹了NGINX中引入了阻塞的線程池用于解決某些特定場景問題下的效率。

如何實現Python的異步IO

這篇文章會以最新的Python 3.5為基礎來介紹實現一個異步的Python Redis Client。不過在此之前,我們先來看一下,怎么實現Python的aio。

Python的aio官方封裝了一個比較合適的基礎庫 asyncio 。

從一個例子開始簡單認識一下如何實現一個異步的aio client。這里以官方文檔中的例子為例:

import asyncio

async def tcp_echo_client(message, loop): reader, writer = await asyncio.open_connection('127.0.0.1', 8888, loop=loop)

print('Send: %r' % message)
writer.write(message.encode())

data = await reader.read(100)
print('Received: %r' % data.decode())

print('Close the socket')
writer.close()

message = 'Hello World!' loop = asyncio.get_event_loop() loop.run_until_complete(tcp_echo_client(message, loop)) loop.close()</pre>

這里面用到的Python 3.5中引入的 async/await 關鍵字,還有 asyncio 庫。這里面 asyncio.open_connection 會返回一個coroutine,這個可以使用await進行一個aio的調用,即,在收到返回信號之前,程序可以繼續去處理其他的任務。這里面真正核心的就是 EventLoop ,它負責監視發送這些信號,并且返回數據,它可以通過 asyncio.get_event_loop 獲取到。然后他會真正返回的數據是一個讀取 StreamReader 和寫入 StreamWriter 的對象。

接下來,就可以通過這個 reader 和 writer 進行數據的讀取和寫入。 writer 是可以直接寫入的,如果是 reader 的話,就需要aio的方式等待受到數據后返回。這樣看起來更接近于普通的socket編程。不過關閉連接時,僅僅需要關閉 writer 就足夠了。

從socket通訊到redis通訊

本質上來說,所有的網絡請求都可以看成是SocketIO的請求,因此,我們可以把Redis的請求當做是一個socket的通訊來進行,這樣就很方便了。

不過先等一等,那么通訊的數據格式怎么辦?沒關系,這里我們使用 hiredis-py 來解決協議解析的問題。不過,從庫設計的角度來說,我們需要封裝一個RedisConnection的類出來解決Redis的通訊協議。它可能傳入的參數包含,一個 StreamReader 、一個 StreamWriter ,一個 EventLoop ,哦,別忘記還有編碼 encoding 。其他的我們就用一個 * 來表示好了。

class RedisConnection(object):
    '''Redis Connection'''
    def init(self, reader, writer, *, encoding=None, loop=None):
        if loop is None:
            loop = asyncio.get_event_loop()
        self._reader = reader
        self._writer = writer
        self._encoding = encoding
        self._loop = loop
        self._db = 0

def __repr__(self):
    return '<RedisConnection [db:{}]>'.format(self._db)</pre> <p>記得加上 __repr__ 用來描述這個對象,這個可是一個好習慣。接下來就需要完善這個類了,比如,我們需要添加一個關閉連接的方法,這需要至少一個參數用于標記連接是否關閉,一個用于執行關閉操作,比如我們需要這樣子的: </p>

    def close(self):
        """Close connection."""
        self._do_close(None)

def _do_close(self, exc):
    if self._closed:
        return
    self._closed = True
    self._closing = False
    # 關閉寫入
    self._writer.transport.close()
    # 取消讀取任務
    self._reader_task.cancel()
    self._reader_task = None
    self._writer = None
    self._reader = None

@property
def closed(self):
    """True if connection is closed."""
    closed = self._closing or self._closed
    if not closed and self._reader and self._reader.at_eof():
        self._closing = closed = True
        self._loop.call_soon(self._do_close, None)
    return closed</pre> <p>連接這類的方法已經處理完了,接下來就應該是執行Redis命令了,我們可以叫它 execute 。那他需要幾個東西,一個是執行的指令 command ,一個是指令參數 *args ,還有一些其他的,比如編碼 encoding 。這里為了節省時間,只是考慮一些Set和Get的基本操作。哦,不過等等,那么Redis的數據結構是什么樣子的呢?我們還需要先把它編譯成Redis-server可以識別的形式,那么需要一個 encode_command 方法。 </p>

_converters = {
    bytes: lambda val: val,
    bytearray: lambda val: val,
    str: lambda val: val.encode('utf-8'),
    int: lambda val: str(val).encode('utf-8'),
    float: lambda val: str(val).encode('utf-8'),
}

def encode_command(*args): """Encodes arguments into redis bulk-strings array. Raises TypeError if any of args not of bytes, str, int or float type. """ buf = bytearray()

def add(data):
    return buf.extend(data + b'\r\n')

add(b'*' + _bytes_len(args))
for arg in args:
    if type(arg) in _converters:
        barg = _converters[type(arg)](arg)
        add(b'$' + _bytes_len(barg))
        add(barg)
    else:
        raise TypeError("Argument {!r} expected to be of bytes,"
                        " str, int or float type".format(arg))
return buf</pre> <p>這樣可以轉化為可以識別的形式了,接下來還有一個問題,那么怎么讓程序可以等待信號的生效呢?這里介紹一下 asyncio.Future 。這個 asyncio.Future 類是用于封裝回調函數的類,包含了一些更加方便使用的方法。通過這個類,可以實現aio的通知機制,也就是回調。這個類實例可以通過 await 返回我們需要的結果。不過這樣就還需要在項目中添加一些更多的變量,比如所有等待返回的 self._waiters 。 </p>

    def execute(self, command, *args, encoding=None):
        """Executes redis command and returns Future waiting for the answer.
        Raises:

    * TypeError if any of args can not be encoded as bytes.
    * ReplyError on redis '-ERR' resonses.
    * ProtocolError when response can not be decoded meaning connection
      is broken.
    """
    assert self._reader and not self._reader.at_eof(), (
        "Connection closed or corrupted")
    if command is None:
        raise TypeError("command must not be None")
    if None in set(args):
        raise TypeError("args must not contain None")
    # 這樣小寫也沒有問題了
    command = command.upper().strip()
    if encoding is None:
        encoding = self._encoding
    fut = asyncio.Future(loop=self._loop)
    self._writer.write(encode_command(command, *args))
    self._waiters.append((fut, encoding, cb))
    return fut</pre> <p>現在所有的命令都已經發送到了redis-server,接下來就需要讀取對應的結果了。</p>

    async def _read_data(self):
        """Response reader task."""
        while not self._reader.at_eof():
            try:
                data = await self._reader.read(65536)
            except asyncio.CancelledError:
                break
            except Exception as exc:

            # XXX: for QUIT command connection error can be received
            #       before response
            logger.error("Exception on data read %r", exc, exc_info=True)
            break
        self._parser.feed(data)
        while True:
            try:
                obj = self._parser.gets()
            except ProtocolError as exc:
                # ProtocolError is fatal
                # so connection must be closed
                self._closing = True
                self._loop.call_soon(self._do_close, exc)
                if self._in_transaction:
                    self._transaction_error = exc
                return
            else:
                if obj is False:
                    break
                else:
                    self._process_data(obj)
    self._closing = True
    self._loop.call_soon(self._do_close, None)

def _process_data(self, obj):
    """Processes command results."""
    waiter, encoding, cb = self._waiters.popleft()
    if waiter.done():
        logger.debug("Waiter future is already done %r", waiter)
        assert waiter.cancelled(), (
            "waiting future is in wrong state", waiter, obj)
        return
    if isinstance(obj, RedisError):
        waiter.set_exception(obj)
        if self._in_transaction:
            self._transaction_error = obj
    else:
        if encoding is not None:
            try:
                obj = decode(obj, encoding)
            except Exception as exc:
                waiter.set_exception(exc)
                return
        waiter.set_result(obj)
        if cb is not None:
            cb(obj)</pre> <p>有了這些之后,我們就可以簡單創建一個連接了:</p>

async def create_connection(address, *, db=None, password=None,
                      encoding=None, loop=None):
    """Creates redis connection.
    Opens connection to Redis server specified by address argument.
    Address argument is similar to socket address argument, ie:

* when address is a tuple it represents (host, port) pair;
* when address is a str it represents unix domain socket path.
(no other address formats supported)
Encoding argument can be used to decode byte-replies to strings.
By default no decoding is done.
Return value is RedisConnection instance.
This function is a coroutine.
"""
assert isinstance(address, (tuple, list, str)), "tuple or str expected"

if isinstance(address, (list, tuple)):
    host, port = address
    reader, writer = await asyncio.open_connection(
        host, port, loop=loop)
else:
    reader, writer = await asyncio.open_unix_connection(
        address, loop=loop)
conn = RedisConnection(reader, writer, encoding=encoding, loop=loop)

try:
    if password is not None:
        yield from conn.auth(password)
    if db is not None:
        yield from conn.select(db)
except Exception:
    conn.close()
return conn</pre> <p>這樣,連接部分的代碼基本上已經處理完成了,接下來要做的就是實現基于這個連接的命令執行了,下面的內容會下一個文章中繼續介紹,敬請期待。</p>

</div>

來自: http://ipfans.github.io/2015/10/write-aio-python-redis-client-as-dummy-1/

</code></code></code></code></code></code></code>

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!