優秀開源項目kombu源碼分析之registry和entrypoint
我曾經在一些公眾場合說過心中的優秀Python開發者。Flask和Requests的作者就不說了,21世紀最缺的就是idea,他們不僅有而且還都用非常優美的方式做出來了。另外我還提到了Celery作者Ask Solem,并不是因為Celery很有名它的主要作者就優秀了,我對ask的欣賞,完全是看Celery及其相關依賴的源代碼的時候產生的。
有多年后臺開發的工程師想必清楚,Celery本身涉及到的技術點其實在業界應用是很廣泛的。Celery能這么流行,我們先排除沒有進行技術深入下的盲從,和它誕生的非常早以外,我認為這和項目的內部設計的非常好也是有關的。
接下來的幾篇文章我將分析Celery使用的Kombu庫中的一些設計實現讓大家對這個優秀項目更了解,并從中學習可擴展開發的實踐。
Kombu是什么?
當一個項目變得越來越復雜,就要考慮只保留核心,并把其他部分分拆到不同的項目中以便減少未來的維護和開發的成本。Flask、IPython都是這樣做的。
Kombu是一個把消息傳遞封裝成統一接口的庫。 Celery一開始先支持的RabbitMQ,也就是使用AMQ協議。由于要支持越來越多的消息代理,但是這些消息代理是不支持AMQ協議的,需要一個東西把所有的消息代理的處理方式統一起來,甚至可以理解為把它們「偽裝成支持AMQ協議」。Kombu的最初的實現叫做carrot, 后來經過重構才成了Kombu。
registry
registry也就是「注冊」,有按需加入的意思,在Python標準庫和一些優秀開源項目中都有應用。我們先看個django的 場景 ,為了減少篇幅我沒有列出CheckRegistry類中其他方法:
### source code start
from itertools import chain
classCheckRegistry:
def__init__(self):
self.registered_checks = []
self.deployment_checks = []
defregister(self, check=None, *tags, **kwargs):
kwargs.setdefault('deploy', False)
definner(check):
check.tags = tags
if kwargs['deploy']:
if check not in self.deployment_checks:
self.deployment_checks.append(check)
elif check not in self.registered_checks:
self.registered_checks.append(check)
return check
if callable(check):
return inner(check)
else:
if check:
tags += (check, )
return inner
deftag_exists(self, tag, include_deployment_checks=False):
return tag in self.tags_available(include_deployment_checks)
deftags_available(self, deployment_checks=False):
return set(chain(*[check.tags for check in self.get_checks(deployment_checks) if hasattr(check, 'tags')]))
defget_checks(self, include_deployment_checks=False):
checks = list(self.registered_checks)
if include_deployment_checks:
checks.extend(self.deployment_checks)
return checks
registry = CheckRegistry()
register = registry.register
tag_exists = registry.tag_exists
### source code end
@register('mytag', 'another_tag')
defmy_check(apps, **kwargs):
pass
print tag_exists('another_tag')
print tag_exists('not_exists_tag')
可以看到每次用registry.register都能動態的添加新的tag,最后還用 register = registry.register 這樣的方式列了個別名。執行結果如下:
? python django_example.py
True
False
kombu庫包含對消息的序列化和反序列化工作的實現,可以同時支持多種序列化方案,如pickle、json、yaml和msgpack。假如你從前沒有寫過這樣可擴展的項目,可能想的是每種的方案的loads和dumps都封裝一遍,然后用一個大的if/elif/else來控制最后的序列化如何執行。
那么在kombu里面是怎么用的呢?我簡化下它的 實現 :
import codecs
from collections import namedtuple
codec = namedtuple('codec', ('content_type', 'content_encoding', 'encoder'))
classSerializerNotInstalled(Exception):
pass
classSerializerRegistry(object):
def__init__(self):
self._encoders = {}
self._decoders = {}
self._default_encode = None
self._default_content_type = None
self._default_content_encoding = None
defregister(self, name, encoder, decoder, content_type,
content_encoding='utf-8'):
if encoder:
self._encoders[name] = codec(
content_type, content_encoding, encoder,
)
if decoder:
self._decoders[content_type] = decoder
def_set_default_serializer(self, name):
try:
(self._default_content_type, self._default_content_encoding,
self._default_encode) = self._encoders[name]
except KeyError:
raise SerializerNotInstalled(
'No encoder installed for {0}'.format(name))
defdumps(self, data, serializer=None):
if serializer and not self._encoders.get(serializer):
raise SerializerNotInstalled(
'No encoder installed for {0}'.format(serializer))
if not serializer and isinstance(data, unicode):
payload = data.encode('utf-8')
return 'text/plain', 'utf-8', payload
if serializer:
content_type, content_encoding, encoder = \
self._encoders[serializer]
else:
encoder = self._default_encode
content_type = self._default_content_type
content_encoding = self._default_content_encoding
payload = encoder(data)
return content_type, content_encoding, payload
defloads(self, data, content_type, content_encoding):
content_type = (content_type if content_type
else 'application/data')
content_encoding = (content_encoding or 'utf-8').lower()
if data:
decode = self._decoders.get(content_type)
if decode:
return decode(data)
return data
registry = SerializerRegistry()
dumps = registry.dumps
loads = registry.loads
register = registry.register
其實kombu還實現了unregister限于篇幅我就不展開了。現在我們想添加yaml的支持,只需要加這樣一個函數:
defregister_yaml():
try:
import yaml
registry.register('yaml', yaml.safe_dump, yaml.safe_load,
content_type='application/x-yaml',
content_encoding='utf-8')
except ImportError:
defnot_available(*args, **kwargs):
"""Raise SerializerNotInstalled.
Used in case a client receives a yaml message, but yaml
isn't installed.
"""
raise SerializerNotInstalled(
'No decoder installed for YAML. Install the PyYAML library')
registry.register('yaml', None, not_available, 'application/x-yaml')
register_yaml()
這樣就支持yaml了。如果希望默認使用yaml來序列化,可以執行:
registry._set_default_serializer('yaml')
是不是非常好擴展,如果哪天我希望去掉對pickle(安全問題),就可以直接注釋對應的函數就好了。寫個小例子試驗下:
yaml_data = """\
float: 3.1415926500000002
int: 10
list: [george, jerry, elaine, cosmo]
string: The quick brown fox jumps over the lazy dog
unicode: "Th\\xE9 quick brown fox jumps over th\\xE9 lazy dog"
"""
content_type, content_encoding, payload = dumps(yaml_data, serializer='yaml')
print content_type, content_encoding
assert loads(payload, content_type=content_type, content_encoding=content_encoding) == yaml_data
運行的結果就是:
? python kombu_example.py
application/x-yaml utf-8
entrypoint
在我的書里面介紹過如果使用標準庫自帶的pkg_resources.iter_entry_points實現一個簡單的插件系統。這在kombu上面也有應用,在序列化實現模塊的最后加了這么幾句:
from pkg_resources import iter_entry_points
for ep in iter_entry_points('kombu.serializers'):
args = ep.load()
register(ep.name, *args)
這是什么東西呢?pkg_resources是一個用于包發現和資源訪問的模塊,我們可以實現不同的kombu擴展,如果在這個擴展項目的setup.py里面設置對應的entry_points,在安裝之后,運行上述代碼的時候就會自動找到這些擴展,并注冊進來。這就是一個擴展系統。Flake8就是最好的這個擴展玩法的范例。
kombu的擴展不多,我選擇 kombu-fernet-serializers 來進行介紹。首先看一下它的setup.py文件:
...
entry_points={
'kombu.serializers': [
'fernet_json = kombu_fernet.serializers.json:register_args',
'fernet_yaml = kombu_fernet.serializers.yaml:register_args',
'fernet_pickle = kombu_fernet.serializers.pickle:register_args',
'fernet_msgpack = kombu_fernet.serializers.msgpack:register_args',
]
}
...
注意到了吧,這個entry點就是kombu.serializers,安裝之后就多了4個序列化方案,我們看一下fernet_json的實現:
import anyjson as _json
from . import fernet_encode, fernet_decode
MIMETYPE = 'application/x-fernet-json'
register_args = (
fernet_encode(_json.dumps),
fernet_decode(_json.loads),
MIMETYPE,
'utf-8',
)
而fernet_yaml也被放進了模塊的方式,其實和在函數內殊途同歸:
rom kombu.exceptions import SerializerNotInstalled
from . import fernet_encode, fernet_decode
try:
import yaml
except ImportError:
defnot_available(*args, **kwargs):
"""In case a client receives a yaml message, but yaml
isn't installed."""
raise SerializerNotInstalled(
'No decoder installed for YAML. Install the PyYAML library')
yaml_encoder = not_available
yaml_decoder = None
else:
yaml_encoder = yaml.safe_dump
yaml_decoder = yaml.safe_load
MIMETYPE = 'application/x-fernet-yaml'
register_args = (
fernet_encode(yaml_encoder),
fernet_decode(yaml_decoder) if yaml_decoder else None,
MIMETYPE,
'utf-8',
)
事實上,我們并不需要了解fernet_encode和fernet_decode是如何對消息做對稱加密的,只是感受下這樣添加擴展的方式是不是很優雅呢?
來自:http://www.dongwm.com/archives/優秀開源項目kombu源碼分析之registry/