從零實現一個Redis客戶端(二)
從Call到命令端
在第一個文章中,我們介紹了實現一個Call的客戶端基本模型,但只是Call怎么能滿足需求呢?比如在redis-py中,一個完整的客戶端應該是這樣的:
client = redis.StrictRedis()
client.setex("key", 10, "value")
接下來作為一個程序的客戶端,需要去做的就是封裝出一個Redis Client。比如setex方法:
def setex(self, key, seconds, value):
"""Set the value and expiration of a key.
:raises TypeError: if seconds is neither int
"""
if not isinstance(seconds, int):
raise TypeError("milliseconds argument must be int")
fut = self._conn.execute(b'SETEX', key, seconds, value)
return wait_ok(fut)</pre> <p>剩下的就是一個個方法逐個完善。</p>
什么是連接池
我們會看到,無論那個數據庫客戶端,總是會有連接池機制。那么連接池是什么呢?我們為什么需要連接池呢?
首先,我們都知道,對連接而言,創建是必要重型的操作。比如說,TCP連接,接下來之后是登錄認證等等過程,最后才會執行命令。這也就是我們通常計算庫性能時,很多時候會把建立連接的時候去掉。但是這就出現了一個問題,當一個連接被占用時,其他的操作仍舊是不能夠完成操作了,只能等待前一個操作完成。但是假如我們一次性創建一堆連接呢?從一堆連接中找到空閑的連接,使用完成后釋放成空閑的狀態,這就是線程池的本質。因為減少了每次創建連接的過程,所以對性能提升也非常有幫助。
從單連接到連接池
首先,還是創建一個RedisPool類,用于管理Redis的連接池。
class RedisPool:
"""Redis connections pool.
"""
def __init__(self, address, db=0, password=None, encoding=None,
*, minsize, maxsize, commands_factory, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
self._address = address
self._db = db
self._password = password
self._encoding = encoding
self._minsize = minsize
self._factory = commands_factory
self._loop = loop # 連接池數組
self._pool = collections.deque(maxlen=maxsize)
self._used = set()
self._acquiring = 0
self._cond = asyncio.Condition(loop=loop)
def _create_new_connection(self):
return create_redis(self._address,
db=self._db,
password=self._password,
encoding=self._encoding,
commands_factory=self._factory,
loop=self._loop)</pre> <p>接下來,就需要創建大量的連接了:</p>
async def create_pool(self, *, override_min):
# todo: drop closed connections first
# 判斷是否達到了連接池數量限制
while not self._pool and self.size < self.maxsize:
self._acquiring += 1
try:
conn = await self._create_new_connection()
self._pool.append(conn)
finally:
self._acquiring -= 1
# connection may be closed at yeild point
self._drop_closed()</pre> <p>那么怎么從這些連接中抽取連接并且進行連接呢:</p>
@asyncio.coroutine
def acquire(self):
"""Acquires a connection from free pool.
Creates new connection if needed.
"""
with await self._cond:
while True:
await self._fill_free(override_min=True)
if self.freesize:
conn = self._pool.popleft()
assert not conn.closed, conn
assert conn not in self._used, (conn, self._used)
self._used.add(conn)
return conn
else:
await self._cond.wait()
接下來就是使用完成后進行釋放即可:
def release(self, conn):
"""Returns used connection back into pool.
When returned connection has db index that differs from one in pool
the connection will be closed and dropped.
When queue of free connections is full the connection will be dropped.
"""
assert conn in self._used, "Invalid connection, maybe from other pool"
self._used.remove(conn)
if not conn.closed:
if conn.in_transaction:
logger.warning("Connection %r in transaction, closing it.",
conn)
conn.close()
elif conn.db == self.db:
if self.maxsize and self.freesize < self.maxsize:
self._pool.append(conn)
else:
# consider this connection as old and close it.
conn.close()
else:
conn.close()
# FIXME: check event loop is not closed
asyncio.async(self._wakeup(), loop=self._loop)</pre> <p>至此,你已經可以實現一個基本的Redis客戶端了,還在猶豫什么?快自己動手吧!</p>
注: 文中Redis庫參考了aio-lib/aioredis庫。
</div>
來自: http://ipfans.github.io/2015/10/write-aio-python-redis-client-as-dummy-2/
</code></code></code></code></code></code>
本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!