使用 Python 進行分布式系統協調
隨著大數據時代的到來,分布式是解決大數據問題的一個主要手段,隨著越來越多的分布式的服務,如何在分布式的系統中對這些服務做協調變成了一個很棘手的問題。今天我們就來看看如何使用Python,利用開源對分布式服務做協調。
在對分布式的應用做協調的時候,主要會碰到以下的應用場景:
- 業務發現(service discovery)找到分布式系統中存在那些可用的服務和節點
- 名字服務 (name service)通過給定的名字知道到對應的資源
- 配置管理 (configuration management)如何在分布式的節點中共享配置文件,保證一致性。
- 故障發現和故障轉移 (failure detection and failover)當某一個節點出故障的時候,如何檢測到并通知其它節點, 或者把想用的服務轉移到其它的可用節點
- 領導選舉(leader election)如何在眾多的節點中選舉一個領導者,來協調所有的節點
- 分布式的鎖 (distributed exclusive lock)如何通過鎖在分布式的服務中進行同步
- 消息和通知服務 (message queue and notification)如何在分布式的服務中傳遞消息,以通知的形式對事件作出主動的響應
有許多的開源軟件試圖解決以上的全部或者部分問題,例如ZooKeeper,consul,doozerd等等,我們現在就看看它們是如何做的。
ZooKeeper
ZooKeeper 是使用最廣泛,也是最有名的解決分布式服務的協調問題的開源軟件了,它最早和Hadoop一起開發,后來成為了Apache的頂級項目,很多開源的項目都在使用ZooKeeper,例如大名鼎鼎的Kafka。
Zookeeper本身是一個分布式的應用,通過對共享的數據的管理來實現對分布式應用的協調。
ZooKeeper使用一個樹形目錄作為數據模型,這個目錄和文件目錄類似,目錄上的每一個節點被稱作ZNodes。
ZooKeeper提供基本的API來操縱和控制Znodes,包括對節點的創建,刪除,設置和獲取數據,獲得子節點等。
除了這些基本的操作,ZooKeeper還提供了一些配方(Recipe),其實就是一些常見的用例,例如鎖,兩階段提交,領導選舉等等。
ZooKeeper本身是用Java開發的,所以對Java的支持是最自然的。它同時還提供了C語言的綁定。
Kazoo 是一個非常成熟的Zookeeper Python客戶端,我們這就看看如果使用Python來調用ZooKeeper。(注意,運行以下的例子,需要在本地啟動ZooKeeper的服務)
基本操作
以下的例子現實了對Znode的基本操作,首先要創建一個客戶端的連接,并啟動客戶端。然后我們可以利用該客戶端對Znode做增刪改,取內容的操作。最后推出客戶端。
from kazoo.client import KazooClient
import logging
logging.basicConfig()
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
# Ensure a path, create if necessary
zk.ensure_path("/test/zk1")
# Create a node with data
zk.create("/test/zk1/node", b"a test value")
# Determine if a node exists
if zk.exists("/test/zk1"):
print "the node exist"
# Print the version of a node and its data
data, stat = zk.get("/test/zk1")
print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
# List the children
children = zk.get_children("/test/zk1")
print("There are %s children with names %s" % (len(children), children))
zk.stop()
通過對ZNode的操作,我們可以完成一些分布式服務協調的基本需求,包括名字服務,配置服務,分組等等。
故障檢測(Failure Detection)
在分布式系統中,一個最基本的需求就是當某一個服務出問題的時候,能夠通知其它的節點或者某個管理節點。
ZooKeeper提供ephemeral Node的概念,當創建該Node的服務退出或者異常中止的時候,該Node會被刪除,所以我們就可以利用這種行為來監控服務運行狀態。
以下是worker的代碼
from kazoo.client import KazooClient
import time
import logging
logging.basicConfig()
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
# Ensure a path, create if necessary
zk.ensure_path("/test/failure_detection")
# Create a node with data
zk.create("/test/failure_detection/worker",
value=b"a test value", ephemeral=True)
while True:
print "I am alive!"
time.sleep(3)
zk.stop()
以下的monitor 代碼,監控worker服務是否運行。
from kazoo.client import KazooClient
import time
import logging
logging.basicConfig()
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
# Determine if a node exists
while True:
if zk.exists("/test/failure_detection/worker"):
print "the worker is alive!"
else:
print "the worker is dead!"
break
time.sleep(3)
zk.stop()
領導選舉
Kazoo直接提供了領導選舉的API,使用起來非常方便。
from kazoo.client import KazooClient
import time
import uuid
import logging
logging.basicConfig()
my_id = uuid.uuid4()
def leader_func():
print "I am the leader {}".format(str(my_id))
while True:
print "{} is working! ".format(str(my_id))
time.sleep(3)
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
election = zk.Election("/electionpath")
# blocks until the election is won, then calls
# leader_func()
election.run(leader_func)
zk.stop()
你可以同時運行多個worker,其中一個會獲得Leader,當你殺死當前的leader后,會有一個新的leader被選出。
分布式鎖
鎖的概念大家都熟悉,當我們希望某一件事在同一時間只有一個服務在做,或者某一個資源在同一時間只有一個服務能訪問,這個時候,我們就需要用到鎖。
from kazoo.client import KazooClient
import time
import uuid
import logging
logging.basicConfig()
my_id = uuid.uuid4()
def work():
print "{} is working! ".format(str(my_id))
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
lock = zk.Lock("/lockpath", str(my_id))
print "I am {}".format(str(my_id))
while True:
with lock:
work()
time.sleep(3)
zk.stop()
當你運行多個worker的時候,不同的worker會試圖獲取同一個鎖,然而只有一個worker會工作,其它的worker必須等待獲得鎖后才能執行。
監視
ZooKeeper提供了監視(Watch)的功能,當節點的數據被修改的時候,監控的function會被調用。我們可以利用這一點進行配置文件的同步,發消息,或其他需要通知的功能。
from kazoo.client import KazooClient
import time
import logging
logging.basicConfig()
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
@zk.DataWatch('/path/to/watch')
def my_func(data, stat):
if data:
print "Data is %s" % data
print "Version is %s" % stat.version
else :
print "data is not available"
while True:
time.sleep(10)
zk.stop()
除了我們上面列舉的內容外,Kazoo還提供了許多其他的功能,例如:計數,租約,隊列等等。
Consul
Consul 是用Go開發的分布式服務協調管理的工具,它提供了服務發現,健康檢查,Key/Value存儲等功能,并且支持跨數據中心的功能。
Consul提供ZooKeeper類似的功能,它的基于HTTP的API可以方便的和各種語言進行綁定。自然 Python 也在列。
與Zookeeper有所差異的是Consul通過基于Client/Server架構的Agent部署來支持跨Data Center的功能。
Consul在Cluster傷的每一個節點都運行一個Agent,這個Agent可以使Server或者Client模式。Client負責到Server的高效通信,相對為無狀態的。 Server負責包括選舉領導節點,維護cluster的狀態,對所有的查詢做響應,跨數據中心的通信等等。
KV基本操作
類似于Zookeeper,Consul支持對KV的增刪查改的操作。
import consul
c = consul.Consul()
# set data for key foo
c.kv.put('foo', 'bar')
# poll a key for updates
index = None
while True:
index, data = c.kv.get('foo', index=index)
print data['Value']
c.kv.delete('foo')
這里和ZooKeeper對Znode的操作幾乎是一樣的。
服務發現(Service Discovery)和健康檢查(Health Check)
Consul的另一個主要的功能是用于對分布式的服務做管理,用戶可以注冊一個服務,同時還提供對服務做健康檢測的功能。
首先,用戶需要定義一個服務。
{
"service": {
"name": "redis",
"tags": ["master"],
"address": "127.0.0.1",
"port": 8000,
"checks": [
{
"script": "/usr/local/bin/check_redis.py",
"interval": "10s"
}
]
}}
其中,服務的名字是必須的,其它的字段可以自選,包括了服務的地址,端口,相應的健康檢查的腳本。當用戶注冊了一個服務后,就可以通過Consul來查詢該服務,獲得該服務的狀態。
Consul支持三種Check的模式:
-
調用一個外部腳本(Script),在該模式下,consul定時會調用一個外部腳本,通過腳本的返回內容獲得對應服務的健康狀態。
-
調用HTTP,在該模式下,consul定時會調用一個HTTP請求,返回2XX,則為健康;429 (Too many request)是警告。其它均為不健康
-
主動上報,在該模式下,服務需要主動調用一個consul提供的HTTP PUT請求,上報健康狀態。
Python API提供對應的接口,大家可以參考 http://python-consul.readthedocs.org/en/latest/
-
Consul.Agent.Service
-
Consul.Agent.Check
Consul的Health Check和Zookeeper的Failure Detection略有不同,ZooKeeper可以利用ephemeral Node來檢測服務的狀態,Consul的Health Check,通過調用腳本,HTTP或者主動上報的方式檢查服務的狀態,更為靈活,可以獲得等多的信息,但是也需要做更多的工作。
故障檢測(Failure Detection)
Consul提供Session的概念,利用Session可以檢查服務是否存活。
對每一個服務我們都可以創建一個session對象,注意這里我們設置了ttl,consul會以ttl的數值為間隔時間,持續的對session的存活做檢查。對應的在服務中,我們需要持續的renew session,保證session是合法的。
import consul
import time
c = consul.Consul()
s = c.session.create(name="worker",behavior='delete',ttl=10)
print "session id is {}".format(s)
while True:
c.session.renew(s)
print "I am alive ..."
time.sleep(3)
Moniter代碼用于監控worker相關聯的session的狀態,但發現worker session已經不存在了,就做出響應的處理。
import consul
import time
def is_session_exist(name, sessions):
for s in sessions:
if s['Name'] == name:
return True
return False
c = consul.Consul()
while True:
index, sessions = c.session.list()
if is_session_exist('worker', sessions):
print "worker is alive ..."
else:
print 'worker is dead!'
break
time.sleep(3)
這里注意,因為是基于ttl(最小10秒)的檢測,從業務中斷到被檢測到,至少有10秒的時延,對應需要實時響應的情景,并不適用。Zookeeper使用ephemeral Node的方式時延相對短一點,但也非實時。
領導選舉和分布式的鎖
無論是Consul本身還是Python客戶端,都不直接提供Leader Election的功能,但是 這篇文檔 介紹了如何利用Consul的KV存儲來實現Leader Election,利用Consul的KV功能,可以很方便的實現領導選舉和鎖的功能。
當對某一個Key做put操作的時候,可以創建一個session對象,設置一個acquire標志為該 session,這樣就獲得了一個鎖,獲得所得客戶則是被選舉的leader。
代碼如下:
import consul
import time
c = consul.Consul()
def request_lead(namespace, session_id):
lock = c.kv.put(leader_namespace,"leader check", acquire=session_id)
return lock
def release_lead(session_id):
c.session.destroy(session_id)
def whois_lead(namespace):
index,value = c.kv.get(namespace)
session = value.get('Session')
if session is None:
print 'No one is leading, maybe in electing'
else:
index, value = c.session.info(session)
print '{} is leading'.format(value['ID'])
def work_non_block():
print "working"
def work_block():
while True:
print "working"
time.sleep(3)
leader_namespace = 'leader/test'
## initialize leader key/value node
leader_index, leader_node = c.kv.get(leader_namespace)
if leader_node is None:
c.kv.put(leader_namespace,"a leader test")
while True:
whois_lead(leader_namespace)
session_id = c.session.create(ttl=10)
if request_lead(leader_namespace,session_id):
print "I am now the leader"
work_block()
release_lead(session_id)
else:
print "wait leader elected!"
time.sleep(3)
利用同樣的機制,可以方便的實現鎖,信號量等分布式的同步操作。
監視
Consul的Agent提供了Watch的功能,然而Python客戶端并沒有相應的接口。
etcd
etcd 是另一個用GO開發的分布式協調應用,它提供一個分布式的Key/Value存儲來進行共享的配置管理和服務發現。
同樣的etcd使用基于HTTP的API,可以靈活的進行不同語言的綁定。
基本操作
import etcd
client = etcd.Client()
client.write('/nodes/n1', 1)
print client.read('/nodes/n1').value
etcd對節點的操作和ZooKeeper類似,不過etcd不支持ZooKeeper的ephemeral Node的概念,要監控服務的狀態似乎比較麻煩。
分布式鎖
etcd支持分布式鎖,以下是一個例子。
import sys
sys.path.append("../../")
import etcd
import uuid
import time
my_id = uuid.uuid4()
def work():
print "I get the lock {}".format(str(my_id))
client = etcd.Client()
lock = etcd.Lock(client, '/customerlock', ttl=60)
with lock as my_lock:
work()
lock.is_locked() # True
lock.renew(60)
lock.is_locked() # False
老版本的etcd支持leader election,但是在最新版該功能被deprecated了,參見https://coreos.com/etcd/docs/0.4.7/etcd-modules/
其它
我們針對分布式協調的功能討論了三個不同的開源應用,其實還有許多其它的選擇,我這里就不一一介紹,大家有興趣可以訪問以下的鏈接:
- eureka https://github.com/Netflix/eureka Netflix開發的定位服務,應用于fail over和load balance的功能
- curator http://curator.apache.org/ 基于ZooKeeper的更高層次的封裝
- doozerd https://github.com/ha/doozerd 基于GO的高可靠,分布式的數據存儲,過去兩年已經不活躍
- openreplica http://openreplica.org/ 基于Python開發的,面向對象的接口的分布式應用協調的工具
- serf http://www.serfdom.io/ serf提供輕量級的cluster成員管理,故障檢測(failure detection)和協調。開發基于GO語言。Consul使用了serf提供的功能
- noah https://github.com/lusis/Noah 基于ruby的ZooKeeper實現,過去三年不活躍
- copy cat https://github.com/kuujo/copycat 基于日志的分布式協調的框架,使用Java開發
總結
ZooKeeper無疑是分布式協調應用的最佳選擇,功能全,社區活躍,用戶群體很大,對所有典型的用例都有很好的封裝,支持不同語言的綁定。缺點是,整個應用比較重,依賴于Java,不支持跨數據中心。
Consul作為使用Go語言開發的分布式協調,對業務發現的管理提供很好的支持,他的HTTP API也能很好的和不同的語言綁定,并支持跨數據中心的應用。缺點是相對較新,適合喜歡嘗試新事物的用戶。
etcd是一個更輕量級的分布式協調的應用,提供了基本的功能,更適合一些輕量級的應用來使用。
參考
如果大家對于分布式系統的協調想要進行更多的了解,可以閱讀一下的鏈接:
http://stackoverflow.com/questions/6047917/zookeeper-alternatives-cluster-coordination-service
http://txt.fliglio.com/2014/05/encapsulated-services-with-consul-and-confd/
http://txt.fliglio.com/2013/12/service-discovery-with-docker-docker-links-and-beyond/
http://www.serfdom.io/intro/vs-zookeeper.html
http://devo.ps/blog/zookeeper-vs-doozer-vs-etcd/
https://www.digitalocean.com/community/articles/how-to-set-up-a-serf-cluster-on-several-ubuntu-vps
http://muratbuffalo.blogspot.com/2014/09/paper-summary-tango-distributed-data.html
https://developer.yahoo.com/blogs/hadoop/apache-zookeeper-making-417.html
http://www.knewton.com/tech/blog/2014/12/eureka-shouldnt-use-zookeeper-service-discovery/
http://codahale.com/you-cant-sacrifice-partition-tolerance/
來自:http://python.jobbole.com/86713/