No-SQL數據庫中的事務性設計—VLCP中的ObjectDB簡要介紹
之前的一篇文章 No-SQL數據庫中的事務性設計 中,我們簡單介紹了一種在No-SQL數據庫中實現事務性的方法,這種方法也是VLCP內置的ObjectDB模塊的基本原理。ObjectDB是VLCP的核心模塊之一,它將一切配置存儲、狀態返回、分布式協調、事務性的問題都統一到了同一組數據庫風格的接口上,同時,它可以對下對接實現了相應接口的任意存儲(目前支持Redis和ZooKeeper),只需要實現相對簡單的一組無狀態接口即可。
VLCP是一個開源的基于協程模型的異步IO框架, 它是一個開源SDN控制器,不過同時也可以用于快速的Web開發,尤其適合編寫微服務,協程對于高并發和協調一致性的良好支持、ObjectDB對于數據一致性的高度抽象,再加上配置子系統、“搭積木”式的模塊管理、以及一些稱手的小工具,可以讓你在幾個小時之內完成一個簡單系統的開發測試部署工作。題外話,前段時間剛剛為公司內部的kibana寫了一個提供權限控制的攔截器,通過介入HTTP請求的方式控制不同用戶能夠看到的頁面內容,包含了用戶權限控制的API——使用VLCP框架,僅僅用了5個小時的時間就完成了開發、測試到上線,甚至還自帶了無狀態、高可用的時髦能力。
VLCP的項目地址是 https:// github.com/hubo1016/vlc p
ObjectDB的設計目標與保證
ObjectDB的接口,在上一篇文章提到的WALK和MUPDATE的基礎上,做了一部分改進,最重要的一點是實現了數據的 主動推送 機制: 允許用戶獲取一組數據,之后保持對這些key上的數據的監聽,隨時得到最新的值,并在數據發生變化時收到推送通知。 這個功能的靈感來源于OVSDB,不過保持更新的方法上,ObjectDB要簡單得多,而且ObjectDB充分利用了VLCP的協程設計,對編程更加友好。
ObjectDB也擴展了對事務一致性的保證,將這個保證擴展到了整個程序中的不同協程,以及數據推送上:
- 在整個程序中,任意時間點上,所有協程對同一個key獲取到的數據都是完全相同的同一份;這意味著即使是不同批次WALK(MGET)得到的數據,也可以隨時保持事務一致性。
- 同一個MUPDATE事務寫入的多個key,總是被同時推送到任意一個客戶端上,并被一致性地更新,因此客戶端以任意方式獲得的數據,都永遠不會看到它們出現事務不一致的情況。當等待更新通知時,如果多個key是在同一個事務中被更新的,可以保證等待這個通知的協程只會收到一次通知。
這個保證,加上前一篇文章中論述的事務性的保證,加在一起構成了非常強的一致性保證,實際上可以以任意的方式來使用和修改數據,而不用擔心出現任何形式的Race Condition,也不需要增加復雜的鎖機制。我將這種一致性稱為 視圖一致性 。后端存儲中存儲的數據的量可能很大,任意時刻,我們都可以選擇監控這個后端存儲中的一個子集的數據,這些數據以事務一致的方式展現在整個程序中的所有協程面前,并且隨時跟隨其他客戶端的寫入而更新,同時這個選定的子集也可以隨著我們要求的變化而隨時擴展或縮小。對于許多應用來說,這是夢寐以求的理想模型。
設計這樣嚴格的指標,主要是因為VLCP的設計目標:SDN控制器。基于OpenFlow的SDN控制器,需要隨時隨著網絡拓撲的變化,將實時的流表下發到OpenFlow交換機中,實時性、一致性、正確性、穩定性都是最基本的要求,同時,也要求一定程度的擴展性,保證性能隨著網絡拓撲復雜性的增加而不急劇下降(這就要求視圖可以只選擇需要的子集)。
ObjectDB的接口簡介
在VLCP中,與其他服務一樣,ObjectDB以一個獨立模塊的方式發布(參考 https:// zhuanlan.zhihu.com/p/21 813920?refer=sdnnetwork ),調用ObjectDB的接口也使用統一的ModuleAPI的風格。也就是說,在任意協程中調用ObjectDB的API,都是通過callAPI的方式:
for m in callAPI(container, 'objectdb', method_name, method_params):
yield m
result = container.retvalue
關于VLCP的協程編程模型可以參考(VLCP協程框架簡介)
存儲類型
首先是ObjectDB中存儲的數據類型的約定,ObjectDB通常使用JSON序列化的方式存儲數據(也可以通過配置選擇改用pickle),為了方便起見,所有存儲的對象都是DataObject類的一個子類,類似于下面的方式定義:
from vlcp.utils.dataobject import DataObject
class PhysicalNetwork(DataObject):
_prefix = 'viperflow.physicalnetwork'
_indices = ("id",)
它需要聲明_prefix和_indices兩個屬性。ObjectDB本質上來說仍然是KVDB,每個數據都會關聯到一個唯一的key上,這個key的構造方式是_prefix + _indices中的屬性值,比如說上面定義的結構,如果有某個PhysicalNetwork對象,它的id屬性的值是my_vlan_network,那么這個對象在存儲的時候就會自動選擇viperflow.physicalnetwork.my_vlan_network作為自己的key,以后獲取的時候也需要用這個key來獲取。除此以外的屬性可以任意賦值,只要這些屬性值可以正確被JSON序列化就可以,包括字符串、數字、列表、字典(只能以字符串為key)、元組(會轉換為列表)以及定義了jsonencode/jsondecode方法的其他對象(這是VLCP規定的一種簡單的JSON序列化協議)
基本上來說,我們定義的類型可以存進任意的數據,只要組織成合適的形式就可以,也很容易在以后進行擴展。
事務性的級聯查詢——walk方法
ObjectDB基本的查詢方法是walk方法,這是個很有特色的方法,與基于SQL的數據庫截然不同,很適合查詢有關聯性的多個key。它的接口聲明是這樣的:
def walk(self, keys, walkerdict, requestid, nostale = False):
"Recursively retrieve keys with customized functions. walkerdict is a dictionary key->walker(key, obj, walk, save)."
它是我們上一篇文章中論述的WALK原語的一種貼近實際應用的變種。比起上一篇文章中輸入單一的walker,walker接受keys、values作為參數,這里的walk方法,接受一個字典,其中指定對于每個key使用哪個walker。這樣的好處是我們可以針對不同的數據類型寫不同的walker,然后在一個walk過程中將多個walker組合起來。同時,將大的walker拆分成比較小的walker,也可以提高執行效率。keys是要初始獲取的列表,它可以比walkerdict中指定的key更多,這樣如果我們確信walker會在后續過程中使用某個key,就可以提前將它獲取回來,提高執行效率。
字典中傳入的walker應當是以下形式的函數:
def my_walker(key, value, walk, save):
...
key和value是初始的key和對應的(DataObject類型的)值。walk和save是兩個從外部傳入的函數,調用walk方法可以獲取某個新的key的值,而調用save方法可以告知系統某個key是需要返回的。當調用walk方法時,有可能會拋出KeyError,表示目前這個key的值還沒有取回,一般我們需要捕獲這個異常,暫時停止進一步的walk過程。典型的寫法如下:
def _walk_lognet(self, key, value, walk, save):
save(key)
if value is None:
return
if self._parent.prepush:
# Acquire all logical ports
try:
netmap = walk(LogicalNetworkMap.default_key(value.id))
except KeyError:
pass
else:
save(netmap.getkey())
for logport in netmap.ports.dataset():
try:
p = walk(logport.getkey())
except KeyError:
pass
else:
#if p is not None and hasattr(p, 'mac_address') and hasattr(p, 'ip_address'):
save(logport.getkey())
注意到我們可以充分利用Python的語言特性,這個walker可以是某個類的方法,也可以是一個閉包,其中可以使用外部的參數值。一般我們在walk外面使用try結構,處理掉KeyError,當沒有返回KeyError的時候,使用save來保存成功獲取的值。在walk的同時,可以通過獲取到的對象的值來進行篩選,只返回需要的部分。這樣就可以實現很復雜的級聯查詢。
我們常見的帶有JOIN和WHERE的SQL語句可以很容易轉換成walker,并且更容易理解。比如說:
SELECT * FROM student JOIN grade ON student.grade = grade.id
WHERE student.name = "LiLei"
我們在關系型數據庫中有grade和student兩個表,在student的grade字段上建立外鍵。在ObjectDB中,這種查詢可以以更自由的方式進行:
class Student(DataObject):
_prefix = "school.student"
_indices = ("name",)
class Grade(DataObject):
_prefix = "school.grade"
_indices = ("id",)
def query_student(name, container, requestid):
def _walker(key, value, walk, save):
# Start with Student
if value is None:
# Student is not found
return
save(key)
if hasattr(value, 'grade'):
# Query grade
try:
grade = walk(Grade.default_key(value.grade))
except KeyError:
# Not retrieved, will automatically retry
pass
else:
save(grade.getkey())
student_key = Student.default_key(name)
for m in callAPI(container, 'objectdb', 'walk',
{'keys': (student_key,),
'walkerdict': {student_key: _walker},
'requestid': requestid}):
yield m
# Return values are: (saved_keys, saved_values)
# There is no guarentees for the order
keys, values = container.retvalue
if not keys:
raise ValueError('Student is not found')
elif len(keys) == 1:
return (values[0],)
elif values[0].isinstance(Student): # Notice, not isinstance(values[0], Student)
return (values[0], values[1])
else:
return (values[1], values[0])
當然,對于這種比較簡單的外鍵,后面我們會提到一個更簡單的方案。
requestid參數與監聽功能有關。它是一個由調用方提供的值,必須是不可變的,可以是字符串、數字、元組甚至對象。當walk成功返回的時候,所有返回的key會被加入到監控列表中,并標記由requestid監控。當這個數據不再需要監聽最新值的時候,可以通過unwatch方法取消監控。通常,我們會使用一個和當前功能有關的字符串(比如模塊名 + 方法名),加上一個遞增的計數器,構成一個元組,來生成一個唯一的requestid,這樣比uuid快一些。
注意walker函數可能被多次調用,最終結果是由最后一次調用的結果來決定的。一般我們需要保證walker函數的冪等性——使用相同的數據進行多次調用,產生的結果是相同的。如果某次調用中walker拋出了異常,整個walk方法會失敗,并且拋出相應的異常——當出現問題的時候,通過這種方法中斷一個查詢也是很不錯的。
另外,如果walk返回了None,或者value的初始值就是None,表示當前數據庫中不存在這個key。不存在的key也同樣可以通過save()返回并監控,在這種情況下,我們可以等待這個key被創建出來,并且得到相應的更新通知。
nostale方法控制當數據庫連接中斷時,是否允許返回已經緩存的結果(而非最新結果)。默認情況下,當數據庫連接不可用時,接口會返回當前已經緩存了的內容,來最大程度維持系統運行——但這也可能讓系統因為過期的數據而產生不正常的行為。使用nostale參數要求接口在沒有最新數據可用的情況下拋出異常。
監控的取消——unwatch/munwatch
def unwatch(self, key, requestid):
"Cancel management of a key"
def munwatch(self, keys, requestid):
"Cancel management of keys"
這兩個接口很簡單直接,通過keys和requestid,指定取消某一組keys的監控。unwatch是munwatch的單個key的版本。
有的時候我們希望用完之后無論如何都取消監聽,防止在出現異常的時候永遠監聽下去,可以用watch_context幫助方法:
from vlcp.utils.dataobject import watch_context
with watch_context(keys, values, requestid, container):
...
當以任意方式離開watch_context的范圍(正常,或者拋出異常),會自動啟動新的協程并且調用munwatch來正確取消對這些key的監聽。
簡便方法——get,mget,getonce,mgetonce,watch,mwatch
有時候我們需求比較簡單,僅僅是獲取一組key的值,并不需要walk進行級聯查詢,雖然這也可以通過一個立即save()的walk方法實現,但更簡單的還是直接調用這組簡便方法。所有m開頭的接口接受多個key的元組作為參數,而非m開頭的接受單個key。
def mget(self, keys, requestid, nostale = False):
"Get multiple objects and manage them. Return references to the objects."
def get(self, key, requestid, nostale = False):
"Get an object from specified key, and manage the object. Return a reference to the object or None if not exists."
def mgetonce(self, keys, nostale = False):
"Get multiple objects, return copies of them. Referenced objects are not retrieved."
def getonce(self, key, nostale = False):
"Get a object without manage it. Return a copy of the object, or None if not exists. Referenced objects are not retrieved."
def watch(self, key, requestid, nostale = False):
"Try to find an object and return a reference. Use reference.isdeleted() to test whether the object exists. "\
"Use reference.wait(container) to wait for the object to be existed."
def mwatch(self, keys, requestid, nostale = False):
"Try to return all the references, see watch()"
參數的意義與walk中相似。它們之間有一些細微的差異,watch/mwatch與walk的特性最為接近,相當于對其中每個key執行立即save()的操作,如果這個key對應的值不存在,會返回一個空的數據引用(ReferenceObject,后面會提到),可以通過這個引用接收key的更新通知,在key被創建出來時收到通知。get/mget在key對應數據存在時與watch的行為一致,但如果key對應的值不存在,會返回None,并且不會監控相應的值。getonce/mgetonce有比較不一樣的特性,它會立即返回這組key當前值的一個副本,并不會監控相應的值,這個副本也不會隨著視圖進行更新,這在需要快速測試某些key的當前值、或者調試時有用。
檢查監控列表——watchlist
def watchlist(self, requestid = None):
"Return a dictionary whose keys are database keys, and values are lists of request ids. Optionally filtered by request id"
通常用于調試,可以用來觀察當前監控的key的列表,以及它們分別被哪些requestid保留著。VLCP中所有的ModuleAPI,都可以通過vlcp.service.manage.webapi.WebAPI模塊導出到Web接口中,這樣需要的時候可以隨時查看當前ObjectDB的工作狀態,比如假如在8181端口上開啟了WebAPI:
curl http://localhost:8181/objectdb/watchlist | python -m json.tool
寫入數據——transact
def transact(self, keys, updater, withtime = False):
"Try to update keys in a transact, with an updater(keys, values), which returns (updated_keys, updated_values). "\
"The updater may be called more than once. If withtime = True, the updater should take three parameters: "\
"(keys, values, timestamp) with timestamp as the server time"
基本上來說是上一篇文章中的MUPDATE原語的翻版。它需要傳入一個updater函數作為參數,這個函數描述了需要進行的事務操作,它接受keys,values兩個參數,如果withtime=True,還接受一個額外的timestamp參數,代表來自于數據庫服務器的時間戳,單位是微秒(對ZooKeeper來說實際精度只有毫秒)。使用timestamp可以保證在任意客戶端上使用的時間都是連續的數據庫服務器時間,防止因為客戶端本地時間的差異而導致順序問題。通常會選擇使用timestamp來實現超時邏輯:在寫入數據時,同時在某個字段中記錄超時時間;在讀取數據時,如果發現超時,則忽略,同時刪除已經超時的數據。再配合定時器不斷更新超時時間,可以實現一個可靠的發現(Discover)協議。
一個常見的updater寫法的例子:
# Using network driver together with IPAM driver
rets = []
def _ipam_stage(keys, values, timestamp):
reservepool = values[0]
removed_keys = self._remove_staled_pools(reservepool, timestamp)
poolids = [poolid for poolid, (cidr, _) in reservepool.reserved_pools.items()
if cidr == request_cidr]
if not poolids:
raise ValueError('Pool %r is not reserved by VLCP IPAM plugin' % (request_cidr,))
docker_ipam_poolid = poolids[0]
rets[:] = [docker_ipam_poolid]
removed_keys.append(IPAMReserveMarker.default_key(reservepool.reserved_pools[docker_ipam_poolid][0]))
del reservepool.reserved_pools[docker_ipam_poolid]
return ((keys[0],) + tuple(removed_keys), (reservepool,) + (None,) * len(removed_keys))
for m in callAPI(self, 'objectdb', 'transact', {'keys': (IPAMPoolReserve.default_key(),),
'updater': _ipam_stage,
'withtime': True}):
yield m
self.retvalue = (rets[0],)
它的參數和返回值都包含(keys, values)的鍵值對,這個結構是比較適合級聯的,可以在已有的updater外層包上新的邏輯,添加新的key和value,增加額外的功能,這樣可以很容易將多個updater合并成為一個transact過程。updater傳入的keys的順序與transact方法接收到的keys的順序相同,而values的順序與keys一一對應。updater返回的keys的列表與傳入的keys的列表不需要有關聯,既可以修改傳入的keys,也可以直接修改沒有傳入的keys。如果updater對某個key返回None作為相應的value,表示從數據庫中刪除這個key。
updater可以很容易地替代關系型數據庫中的BEGIN/COMMIT之間的多條語句,以非常自由、一致、Pythonic的方式進行事務操作。它的局限性是必須提前指定需要進行update的key的列表,不過這可以通過上一篇文章中提到的WRITEWALK的邏輯,用walk配合update來完成(目前還沒有相應的writewalk接口,以后可能會增加)。updater一般通過閉包的形式提供,它也可以使用可變對象或者Python3中的nonlocal變量額外返回一些值(比如上面例子中的rets列表就是用來返回值的)
一次update會自動創建一次事務寫入,并且推送這些key的更新到已經監聽了這些key的客戶端上,從而將自己的更改推送到所有的其他客戶端的視圖中。如果在updater中拋出異常,這個異常會中斷事務的執行,這種情況下不會寫入任何內容,異常會拋出到transact方法的調用方,通過這種方式可以安全地中斷一個事務的執行,比如發現不滿足事務的執行條件的情況(相當于T-SQL中的Rollback)。
等待更新——DataObjectUpdateEvent
當ObjectDB第一次獲取某個key、或者收到某個key的更新通知、或者從數據庫連接斷開的狀態下恢復時,ObjectDB會在完成對這個key的獲取后,發送DataObjectUpdateEvent,通過接收并處理這個Event,其他協程可以等待某個或者某些特定key的更新。一般來說不需要直接處理這個事件,而是通過以下三個方法之一進行處理:
for m in refobj.wait(container):
yield m
for m in refobj.waitif(container, expr):
yield m
for m in refobj.waitif(container, expr, nextchange = True):
yield m
for m in multiwaitif(references, container, expr):
yield m
for m in multiwaitif(references, container, expr, nextchange = True):
yield m
wait()和waitif()方法可以等待單個的對象的狀態變化。wait()方法等待一個被刪除的對象重新創建起來。waitif()方法通過傳入一個expr函數,來指定等待的條件:expr是一個接受單個參數的函數,通常是個lambda表達式,它的第一個參數接受更新之后的新的值,如果expr(refobj)返回真值則停止等待,否則繼續等待;如果使用lambda x: x.isdeleted(),就可以等待一個對象直到它被刪除。
multiwaitif是waitif的多個key的版本,它的第一個參數是需要同時等待的多個對象,expr是接受兩個參數的函數:
def my_expr(references, updated_refs):
...
第一個參數是references列表,可以用來獲取最新的值;第二個是這一次更新的值的列表,可以用來判斷哪些值發生了更新,哪些值沒有發生更新。
nextchange參數控制等待的行為,如果nextchange = True,則忽略當前狀態,至少等待到下一次變化再返回。這種情況下可以用waitif(container, lambda x: True, True)來等待任意的變化。
技術細節
ObjectDB模塊使用上圖的結構進行搭建。對外,它提供了一組ModuleAPI;而內部來說,它也同樣依賴更為底層的模塊,這些模塊中,負責KVStorage的模塊提供了最基礎的MGET和MUPDATE方法(參考上篇文章) ,它可以是RedisDB與ZooKeeperDB中的一個;Notifier提供了一個訂閱/發布接口,可以讓ObjectDB將數據更新的消息推送到其他客戶端,底層實現目前是Redis的Pub/Sub或ZooKeeper的WATCH機制,但也可以選擇其他實現,從邏輯上來說,KVStorage和Notifier可以無關。
寫入過程
寫入的過程基本遵循前一篇文章中的方法,區別在于在MUPDATE成功執行完成之后,會通過Notifier模塊執行一個PUBLISH過程。這個PUBLISH過程會將這一次更新的所有的key的列表,打包成一個消息,推送給其他客戶端;于是收到消息的客戶端會同時收到這一批寫入的所有key的列表,保證使用一個MGET將它們取回,從而保證取回數據的視圖一致性。
當數據庫中有非常多的key的時候,我們通常不希望收到所有的通知,而是只收到與我們監控的key有關的通知,這樣在共用一套數據庫的客戶端很多的時候,不至于所有的寫入都發送都其他客戶端上,引起性能下降。為了實現這個目標,我們會將這一次更新的keys列表發送到每個key命名的channel上,這樣只有訂閱了這個key的客戶端才會收到通知,多個key的情況下可能會收到一次更新的多個通知,會有一個去重復的機制。但是,如果一次更新的key過多,由于要發送的數據量是key * key的量級,會造成嚴重的性能問題,因此當一次更新的key過多時,會選擇發送到一個特殊的全局channel上,一次通知所有的客戶端,這也是一種權衡。
注意到PUBLISH在MUPDATE成功之后執行,這意味著如果更新通知到達,我們一定可以獲取到至少是這一次更新時的數據,但是,我們也可能:直接獲取到了更加新的數據;或者,在更新通知到達之前,就獲取到了這一次更新的數據。后續讀取過程中會充分考慮到這些因素。
讀取過程
讀取的過程與前一篇文章中描述的方法略有區別,由于數據更新推送的需求,ObjectDB中有一層有狀態的邏輯。具體來說,所有取回的數據,會在當前進程中保存一個全局唯一的副本,這個副本叫做數據鏡像,它是實際的后端存儲中相應數據的實時鏡像。實際返回給調用方的則是指向這個數據的引用(通過ReferenceObject類提供,其中通過__getattr__方法將屬性的獲取重定向到原始的DataObject中)。
當有新的walk方法調用,或者有來自Notifier的更新通知時,ObjectDB會開啟一個新的更新循環,在循環結束之前,新的更新通知、新的walk方法都會統一由這個循環進行執行。在這個過程中,新循環會將獲取到的新數據,暫存在一個臨時數據層中,暫時不更新到數據鏡像中,直到循環執行完畢,再將臨時數據層中的數據更新到當前數據鏡像層中,形成一個新的視圖一致的數據鏡像,同時返回之前查詢的結果。
具體來說,循環中每次獲取回來的最新數據,與之前保存的臨時數據,也分到不同的層中,如下圖所示:
數據空間劃分
在整個循環過程中,由于walker可能不能一次成功,同時也可能有新的請求、新的更新加入,因此每次獲取到的數據的列表不完全一致。最簡單的保證視圖一致性的方法是每次都將當前臨時數據層中所有的key,再加上新的需要獲取的key,全部通過一個MGET獲取回來,但這樣當同時執行的walker很多時,每次需要重新獲取的key的數量就會很多,對性能有負面影響;變通的方法是,對于每個獨立的walker,我們都讓它在某個部分一致性的子集中執行即可。
上圖中,我們將最新更新到的數據,與之前更新的數據+已經緩存的數據鏡像中的數據,拆分成兩個數據空間。如果最新獲取到的數據中包含了某個key,我們就在現有數據中屏蔽這個key,不允許walker訪問。執行walker時,我們會根據第一個key的所在空間,來確定walker需要執行的空間:
如果當前walker使用的是舊數據,則讓walker執行過程中,只允許完全使用舊數據,如果試圖取回新獲取的數據,則失敗;
如果當前walker使用的是新數據,則讓walker執行過程中,只允許使用上一次取回的數據,如果試圖使用此范圍外的數據,則失敗。
一旦某個walker失敗,我們就將walker在執行過程中用到的所有的key,都加入到下一次要取回的列表中。這樣一旦某個walker執行中遇到了需要更新的數據,它就會一直在最新數據空間中執行,直到成功一次為止。
這樣我們可以保證按我們的順序獲取數據時,walker永遠都在一致的數據中執行。
完整邏輯
整個循環的邏輯可以歸納如下:
- 如果有新請求,將新的walker加入到walkers集合;
- 當前正在等待處理的更新通知 加入到 update_list集合;
- 對update_list中所有的key訂閱更新通知(在獲取值之前訂閱更新通知,這樣可以保證不遺漏后續更新)
- 對update_list中所有的key,執行MGET,取回一致的結果;
- 對walkers中所有的walker,依起始的key所在空間選擇執行的數據空間,然后依次執行,如果walker請求了不存在于當前數據空間中的key(包括尚未取回,和不在當前數據空間的情況),將這個walker所有用到的key加入update_list集合
- 將MGET取回的結果更新到update_result(臨時數據層)中
- 如果update_list集合為空,且沒有正在等待處理的更新通知,轉到8,否則回到1
- 將update_result更新到managed_result(數據鏡像層)中,并且依照managed_result中的數據,依次返回給每個walk請求;對update_result中所有的key,發出數據已變更的通知,并且將所有的通知標記為同一個批次,這樣多個key收到的更新通知會被去重。
這個邏輯的實現非常依賴于VLCP的協程特性,通過協程調度可以高效地讓消息在不同協程之間傳遞,而不依賴各種各樣的鎖,這也是ObjectDB得以成立的一個重要原因。
額外功能
為了使用方便,ObjectDB在上一篇文章的基礎上,增加了許多額外的輔助功能,這些功能內部都是依賴上篇文章中的WALK/MUPDATE邏輯進行的,使用這些功能可以大大簡化常規的數據操作,降低walker/updater編寫的復雜性
自動引用、ReferenceObject與鍵值的分拆
前面我們舉過一個Student與Grade的關聯查詢的例子。在許多場景中,我們存儲的數據對象都以外鍵的形式關聯到了其他的key,如果每次都需要手寫walker來取回這些key是很無趣的,幸運的是ObjectDB提供了一種機制來快速創建外鍵,并且自動將外鍵指向的對象取回。
ReferenceObject是ObjectDB使用的內部類型之一,它代表一個對實際DataObject的引用。實際上ObjectDB幾乎所有接口,除了getonce返回DataObject的副本以外,其他都返回ReferenceObject。wait(), waitif()也是ReferenceObject的接口。除了這些接口外,對ReferenceObject的屬性的獲取都會被轉到相應的DataObject中。
除了作為ObjectDB的返回值,ReferenceObject的另一項作用是可以保存在某個DataObject中,表示一個當前DataObject的外鍵。當我們用getonce以外的任意方法(get, mget, walk)返回這個key對應的值的時候,如果這個值包含了ReferenceObject,這些ReferenceObject指向的值也會被自動取回,并且監聽;如果包含了ReferenceObject的key停止監聽,則這些被自動引用的key也會停止監聽。這樣就大大簡化了外鍵的使用。
回到前面的Student和Grade的例子中,現在我們可以簡單這樣做:
from vlcp.utils.dataobject import updater, set_new
def create_student(name, gradeid, container):
@updater
def _updater(student, grade):
student = set_new(student, Student.create_instance(name))
student.grade = grade.create_reference()
return (student,)
for m in callAPI(container, 'objectdb', 'transact',
{'keys': (Student.default_key(name),
Grade.default_key(gradeid)),
'updater': _updater}):
yield m
def get_student(name, container, requestid):
for m in callAPI(container, 'objectdb', 'get',
{'key': Student.default_key(name),
'requestid': requestid}):
yield m
student = container.retvalue
print "Student %r is in grade %r" % (student.name, student.grade.id)
updater注解是書寫updater的一個幫助方法,它可以將keys, values形式傳入的數據,通過*values的形式傳遞給參數列表,這樣就提高了代碼可讀性。不過使用這種形式書寫的updater必須從左到右寫入結果,返回的值的列表會從左到右依次匹配原始的key。set_new會判斷原來的值,如果非空則立即報錯,否則替換成新的值,在想要創建新的值又要避免因為主鍵重復而覆蓋舊的值的時候很方便。
取回的Student對象中,Student的grade是個ReferenceObject,這個ReferenceObject指向的Grade對象會自動被取回,其中的屬性也可以直接通過student.grade.id的方式進行訪問。如果取回的對象還有其他ReferenceObject,這些對象也可以自動再次取回,這樣很容易取回一整個級聯的結果。
內部實現上,ObjectDB會自動為每個key添加一個取回所有Reference的walker,因此一致性的保證與直接使用walk方法是一樣的。
對于非常大、包含非常多屬性的鍵值來說,維護起來是比較困難的,每次修改都需要重新讀取再重新寫入整個值,而且只能整體監聽,不能只監聽一部分屬性。這時候將鍵值拆分成多個鍵會是個好主意。使用ReferenceObject的方法,甚至可以在不影響使用的情況下,將一部分鍵值拆分到獨立的key當中,實現多個key之間的共享,或者獨立的更新通知。
自動刪除關聯鍵
這也是傳統的關系型數據庫中的一個功能,當有外鍵關聯時,如果刪除外鍵相應的行,則自動刪除關聯行。在上一節中我們也提到了分拆鍵值的問題,但是如果分拆出去的鍵值不能隨著主要的鍵值刪除而刪除,我們就可能在數據庫中殘留數據。好在ObjectDB提供了同等的自動刪除鍵值的功能:
class LogicalPortVXLANInfo(DataObject):
_prefix = 'viperflow.logicalportvxlaninfo'
_indices = ('id',)
def __init__(self, prefix=None, deleted=False):
super(LogicalPortVXLANInfo, self).__init__(prefix=prefix, deleted=deleted)
self.endpoints = []
LogicalPort._register_auto_remove('LogicalPortVXLANInfo', lambda x: [LogicalPortVXLANInfo.default_key(x.id)])
使用_register_auto_remove方法,可以在已經定義的DataObject類型上,添加一個附加的自動刪除功能。它有兩個參數,第一個是這個自動刪除過程的名稱,可以用來調試,相同名稱只會保留一個;第二個參數是一個函數(lambda表達式),它接受已經定義的舊類型的對象作為參數,然后返回當這個對象刪除時,需要一起刪除的key的列表。這些會被一起刪除的key如果也注冊了auto_remove功能,會進一步觸發,從而一次刪除更多的key。當然,這些刪除動作全部都是在一個事務中完成的,如果某一步出錯,整個刪除過程都會回滾,因此要么都刪除,要么都不刪除。也不用擔心自動刪除會有循環引用的問題,可以放心讓兩個key互相關聯為自動刪除的關系,保證其中一個刪除時另一個一定跟著刪除。
從內部實現上來說,這是通過在用戶傳入的updater外側級聯系統的updater來實現的。
WeakReference與DataObjectSet
WeakReference對象不帶有自動引用的功能,它可以用來單純保存一個key,但不自動取回,這樣就防止了取回的對象數量太多的問題。可以使用walk方法來取回需要的key。
DataObjectSet是一個預定義的用來保存WeakReference的集合類型,可以在其中添加或者刪除WeakReference,起到保存對象集合的目的。可以用find方法按照保存對象的主鍵(_indices)進行查找或者篩選。
自動的反向索引
這是一個非常重要的功能。除了使用外鍵進行索引以外,我們還經常需要按照對象的屬性值來反查對象,為了加速這個查找過程,經常需要對對象的屬性創建索引,這個索引叫做反向索引(也叫倒排索引)。一般對于No-SQL來說,創建這種索引是需要代碼實現的,但對于ObjectDB來說,只需要簡單聲明就可以了:
class LogicalPort(DataObject):
_prefix = 'viperflow.logicalport'
_indices = ("id",)
_unique_keys = (('_mac_address_index', ('network', 'mac_address')),
('_ip_address_index', ('network', 'ip_address')))
這個聲明的方式與SQL中創建索引的方式極其相似,首先起一個索引的名稱(不能和同一對象下的其他索引重復),然后聲明需要使用的屬性值。創建自動索引的時候,會自動形成這樣一個key:
indices . DataObject的_prefix . 索引名稱 . 屬性值拼接成字符串
比如對于network = some_network, mac_address = 02:00:01:02:03:04的某個LogicalPort對象,它的_mac_address_index的索引就是:
indices.viperflow++logicalport._mac_address_index.some_network++02:00:01:02:03:04
中間的++是字符轉義的結果。實際上我們不用關心這些細節,簡單用
mac_key = LogicalPort.unique_key('_ip_address_index', 'some_network', '02:00:01:02:03:04')
就可以生成這個key,用于walk方法。返回對象是UniqueKeyReference類型,它的ref屬性是原始DataObject的引用。
除了用于查找以外,unique key的另一重作用跟關系數據庫中一樣,如果發生了鍵值重復的情況,會阻止transact寫入并且返回鍵值重復的錯誤,這樣就可以很容易防止MAC地址重復、IP地址重復之類的情況發生。
允許重復的key可以使用_multi_keys來聲明, 與_unique_keys類似,不過允許鍵值重復。multi_key()方法可以返回相應的索引的key,對應的值是MultiKeyReference類型,它的set屬性是相應屬性值的所有的對象的WeakReference構成的DataObjectSet。
每一個索引依不同屬性值創建不同的key,如果想要按屬性值的范圍查找,比如說查找IP地址在192.168.1.100到192.168.1.200之間的對象,要怎么辦呢?反向索引本身也有一個集合用來保存不同屬性值的索引的WeakReference,可以用
UniqueKeyReference.get_keyset_from_key(mac_key)
MultiKeyReference.get_keyset_from_key(multi_key)
返回這個集合的key,它對應的是UniqueKeySet/MultiKeySet對象,它的set屬性是UniqueKeyReference/MultiKeyReference對象的WeakReference組成的DataObjectSet。
所有這些自動創建的索引,會隨著DataObject的創建而創建,隨著DataObject的刪除而刪除,可以像普通的key一樣get、walk或者監聽。內部實現上,它是通過在用戶傳入的updater外層,級聯一層系統的updater來實現的。
轉化為可視結果
使用vlcp.utils.dataobject.dump方法,可以簡單地將DataObject對象轉化為字典形式的描述,這個形式可以進一步轉化為JSON,寫入日志,或者使用pformat格式化成較為容易閱讀的形式。
ObjectDB與React模型、GraphQL等新技術
一個穩定、可靠的系統,最佳的一種設計方式是這樣的:
有一個單一、一致的狀態 ;
所有其他輸出都是這個狀態的函數,狀態變化則輸出變化,狀態相同則輸出相同。
這樣的系統可以很輕易地實現高可擴展性、高一致性,以及無狀態、隨時從任意狀態中恢復等優良特性。但是實現上的要求比較高,比起傳統的workflow的模型,系統需要能夠感知狀態的變化,跟隨狀態變化而進行改變。這種模型我稱之為React模型。流行的ReactJS就是個不錯的例子。
我們可以看到ObjectDB的推送模型就非常適合這種React模型。實際上,VLCP的SDN控制器邏輯基本就是基于React模型的,這樣我們可以很容易保證系統從任意狀態中恢復。
另一項新技術GraphQL,是一種REST API的替代,它可以在WebService接口上使用關聯查詢。我們可以看到,它幾乎就可以對應ObjectDB的walk方法,如果使用ObjectDB的walk來實現,GraphQL的查詢結果將可以保證事務一致性。當然并不是說就一定應當這么實現,因為還有性能上的考慮。
結論
VLCP中的ObjectDB是一個重磅級的設計,它完美滿足了SDN開發的要求,由它可以開發出一個完全去中心、完全高可用、完全可擴展的系統,與它提供的實時性、一致性相比,現有其他SDN的定時抓取新數據之類的策略就像是個玩具,這也是VLCP作為產品級SDN控制器的牢不可破的一層保障。除了SDN以外,其他任何需要比較強的一致性,或者哪怕只是你懶得處理一致性問題、又懶得寫SQL、懶得維護關系型數據庫、懶得為每次數據結構變動寫升級腳本的情況下,這都會是你一個應當考慮的選擇,它會帶給你非常多的驚喜。在分布一致性的系統中,需要進行分布式鎖、節點發現、主節點競選之類的過程的時候,也可以很容易派上用場。當底層使用ZooKeeper作為存儲的時候,整個系統是高可用的,不會出現任何的單點故障,而且任意一點都可以隨時斷開或重啟,并在重新連接或重新啟動之后恢復。
ObjectDB也有它的缺點,與其他數據庫方案相比,它的單機讀寫性能受到Python實現(單核心)的限制,比MySQL等數據庫要低,多個事務要使用相同的key時,也會像其他數據庫一樣的性能下降。但在集群中,不相關的key的寫入不會占用相同的資源,總的性能還是比較高的。它比較適合系統配置或者少量的協調一致性這樣的并不頻繁寫入、但對一致性要求比較高的情形。由于數據鏡像與主動推送機制的存在,當多個協程讀取相同的數據的時候,緩存會生效,因此讀取的性能比較好,適合寫入少而讀取多的系統。對于頻繁寫入但很少讀取的系統則不是特別適用。
來自:https://zhuanlan.zhihu.com/p/23747209