使用 Python 進行分布式系統協調

darcy_yang 8年前發布 | 12K 次閱讀 分布式系統 Python Python開發

隨著大數據時代的到來,分布式是解決大數據問題的一個主要手段,隨著越來越多的分布式的服務,如何在分布式的系統中對這些服務做協調變成了一個很棘手的問題。今天我們就來看看如何使用Python,利用開源對分布式服務做協調。

在對分布式的應用做協調的時候,主要會碰到以下的應用場景:

  • 業務發現(service discovery)找到分布式系統中存在那些可用的服務和節點
  • 名字服務 (name service)通過給定的名字知道到對應的資源
  • 配置管理 (configuration management)如何在分布式的節點中共享配置文件,保證一致性。
  • 故障發現和故障轉移 (failure detection and failover)當某一個節點出故障的時候,如何檢測到并通知其它節點, 或者把想用的服務轉移到其它的可用節點
  • 領導選舉(leader election)如何在眾多的節點中選舉一個領導者,來協調所有的節點
  • 分布式的鎖 (distributed exclusive lock)如何通過鎖在分布式的服務中進行同步
  • 消息和通知服務 (message queue and notification)如何在分布式的服務中傳遞消息,以通知的形式對事件作出主動的響應

有許多的開源軟件試圖解決以上的全部或者部分問題,例如ZooKeeper,consul,doozerd等等,我們現在就看看它們是如何做的。

ZooKeeper

ZooKeeper 是使用最廣泛,也是最有名的解決分布式服務的協調問題的開源軟件了,它最早和Hadoop一起開發,后來成為了Apache的頂級項目,很多開源的項目都在使用ZooKeeper,例如大名鼎鼎的Kafka。

Zookeeper本身是一個分布式的應用,通過對共享的數據的管理來實現對分布式應用的協調。

ZooKeeper使用一個樹形目錄作為數據模型,這個目錄和文件目錄類似,目錄上的每一個節點被稱作ZNodes。

ZooKeeper提供基本的API來操縱和控制Znodes,包括對節點的創建,刪除,設置和獲取數據,獲得子節點等。

除了這些基本的操作,ZooKeeper還提供了一些配方(Recipe),其實就是一些常見的用例,例如鎖,兩階段提交,領導選舉等等。

ZooKeeper本身是用Java開發的,所以對Java的支持是最自然的。它同時還提供了C語言的綁定。

Kazoo 是一個非常成熟的Zookeeper Python客戶端,我們這就看看如果使用Python來調用ZooKeeper。(注意,運行以下的例子,需要在本地啟動ZooKeeper的服務)

基本操作

以下的例子現實了對Znode的基本操作,首先要創建一個客戶端的連接,并啟動客戶端。然后我們可以利用該客戶端對Znode做增刪改,取內容的操作。最后推出客戶端。

from kazoo.client import KazooClient
 
import logging
logging.basicConfig()
 
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
 
# Ensure a path, create if necessary
zk.ensure_path("/test/zk1")
 
# Create a node with data
zk.create("/test/zk1/node", b"a test value")
 
# Determine if a node exists
if zk.exists("/test/zk1"):
    print "the node exist"
 
# Print the version of a node and its data
data, stat = zk.get("/test/zk1")
print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
 
# List the children
children = zk.get_children("/test/zk1")
print("There are %s children with names %s" % (len(children), children))
 
zk.stop()

通過對ZNode的操作,我們可以完成一些分布式服務協調的基本需求,包括名字服務,配置服務,分組等等。

故障檢測(Failure Detection)

在分布式系統中,一個最基本的需求就是當某一個服務出問題的時候,能夠通知其它的節點或者某個管理節點。

ZooKeeper提供ephemeral Node的概念,當創建該Node的服務退出或者異常中止的時候,該Node會被刪除,所以我們就可以利用這種行為來監控服務運行狀態。

以下是worker的代碼

from kazoo.client import KazooClient
import time
 
import logging
logging.basicConfig()
 
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
 
# Ensure a path, create if necessary
zk.ensure_path("/test/failure_detection")
 
# Create a node with data
zk.create("/test/failure_detection/worker",
          value=b"a test value", ephemeral=True)
 
while True:
    print "I am alive!"
    time.sleep(3)
 
zk.stop()

以下的monitor 代碼,監控worker服務是否運行。

from kazoo.client import KazooClient
 
import time
 
import logging
logging.basicConfig()
 
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
 
# Determine if a node exists
while True:
    if zk.exists("/test/failure_detection/worker"):
        print "the worker is alive!"
    else:
        print "the worker is dead!"
        break
    time.sleep(3)
 
zk.stop()

領導選舉

Kazoo直接提供了領導選舉的API,使用起來非常方便。

from kazoo.client import KazooClient
import time
import uuid
 
import logging
logging.basicConfig()
 
my_id = uuid.uuid4()
 
def leader_func():
    print "I am the leader {}".format(str(my_id))
    while True:
        print "{} is working! ".format(str(my_id))
        time.sleep(3)
 
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
 
election = zk.Election("/electionpath")
 
# blocks until the election is won, then calls
# leader_func()
election.run(leader_func)
 
zk.stop()

你可以同時運行多個worker,其中一個會獲得Leader,當你殺死當前的leader后,會有一個新的leader被選出。

分布式鎖

鎖的概念大家都熟悉,當我們希望某一件事在同一時間只有一個服務在做,或者某一個資源在同一時間只有一個服務能訪問,這個時候,我們就需要用到鎖。

from kazoo.client import KazooClient
import time
import uuid
 
import logging
logging.basicConfig()
 
my_id = uuid.uuid4()
 
def work():
    print "{} is working! ".format(str(my_id))
 
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
 
lock = zk.Lock("/lockpath", str(my_id))
 
print "I am {}".format(str(my_id))
 
while True:
    with lock:
        work()
    time.sleep(3)    
 
zk.stop()

當你運行多個worker的時候,不同的worker會試圖獲取同一個鎖,然而只有一個worker會工作,其它的worker必須等待獲得鎖后才能執行。

監視

ZooKeeper提供了監視(Watch)的功能,當節點的數據被修改的時候,監控的function會被調用。我們可以利用這一點進行配置文件的同步,發消息,或其他需要通知的功能。

from kazoo.client import KazooClient
import time
 
import logging
logging.basicConfig()
 
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
 
@zk.DataWatch('/path/to/watch')
def my_func(data, stat):
    if data:
        print "Data is %s" % data
        print "Version is %s" % stat.version 
    else :
        print "data is not available"
 
while True:
    time.sleep(10)
 
zk.stop()

除了我們上面列舉的內容外,Kazoo還提供了許多其他的功能,例如:計數,租約,隊列等等。

Consul

Consul 是用Go開發的分布式服務協調管理的工具,它提供了服務發現,健康檢查,Key/Value存儲等功能,并且支持跨數據中心的功能。

Consul提供ZooKeeper類似的功能,它的基于HTTP的API可以方便的和各種語言進行綁定。自然 Python 也在列。

與Zookeeper有所差異的是Consul通過基于Client/Server架構的Agent部署來支持跨Data Center的功能。

Consul在Cluster傷的每一個節點都運行一個Agent,這個Agent可以使Server或者Client模式。Client負責到Server的高效通信,相對為無狀態的。 Server負責包括選舉領導節點,維護cluster的狀態,對所有的查詢做響應,跨數據中心的通信等等。

KV基本操作

類似于Zookeeper,Consul支持對KV的增刪查改的操作。

import consul
 
c = consul.Consul()
 
# set data for key foo
c.kv.put('foo', 'bar')
 
# poll a key for updates
index = None
while True:
    index, data = c.kv.get('foo', index=index)
    print data['Value']
    
c.kv.delete('foo')

這里和ZooKeeper對Znode的操作幾乎是一樣的。

服務發現(Service Discovery)和健康檢查(Health Check)

Consul的另一個主要的功能是用于對分布式的服務做管理,用戶可以注冊一個服務,同時還提供對服務做健康檢測的功能。

首先,用戶需要定義一個服務。

{
  "service": {
    "name": "redis",
    "tags": ["master"],
    "address": "127.0.0.1",
    "port": 8000,
    "checks": [
      {
        "script": "/usr/local/bin/check_redis.py",
        "interval": "10s"
      }
    ]
  }}

其中,服務的名字是必須的,其它的字段可以自選,包括了服務的地址,端口,相應的健康檢查的腳本。當用戶注冊了一個服務后,就可以通過Consul來查詢該服務,獲得該服務的狀態。

Consul支持三種Check的模式:

  • 調用一個外部腳本(Script),在該模式下,consul定時會調用一個外部腳本,通過腳本的返回內容獲得對應服務的健康狀態。

  • 調用HTTP,在該模式下,consul定時會調用一個HTTP請求,返回2XX,則為健康;429 (Too many request)是警告。其它均為不健康

  • 主動上報,在該模式下,服務需要主動調用一個consul提供的HTTP PUT請求,上報健康狀態。

Python API提供對應的接口,大家可以參考 http://python-consul.readthedocs.org/en/latest/

  • Consul.Agent.Service

  • Consul.Agent.Check

Consul的Health Check和Zookeeper的Failure Detection略有不同,ZooKeeper可以利用ephemeral Node來檢測服務的狀態,Consul的Health Check,通過調用腳本,HTTP或者主動上報的方式檢查服務的狀態,更為靈活,可以獲得等多的信息,但是也需要做更多的工作。

故障檢測(Failure Detection)

Consul提供Session的概念,利用Session可以檢查服務是否存活。

對每一個服務我們都可以創建一個session對象,注意這里我們設置了ttl,consul會以ttl的數值為間隔時間,持續的對session的存活做檢查。對應的在服務中,我們需要持續的renew session,保證session是合法的。

import consul
import time
 
c = consul.Consul()
 
s = c.session.create(name="worker",behavior='delete',ttl=10)
 
print "session id is {}".format(s)
 
while True:
    c.session.renew(s)
    print "I am alive ..."
    time.sleep(3)

Moniter代碼用于監控worker相關聯的session的狀態,但發現worker session已經不存在了,就做出響應的處理。

import consul
import time
 
def is_session_exist(name, sessions):
    for s in sessions:
        if s['Name'] == name:
            return True
 
    return False
 
c = consul.Consul()
 
while True:
    index, sessions = c.session.list()
    if is_session_exist('worker', sessions):
        print "worker is alive ..."
    else:
        print 'worker is dead!'
        break
    time.sleep(3)

這里注意,因為是基于ttl(最小10秒)的檢測,從業務中斷到被檢測到,至少有10秒的時延,對應需要實時響應的情景,并不適用。Zookeeper使用ephemeral Node的方式時延相對短一點,但也非實時。

領導選舉和分布式的鎖

無論是Consul本身還是Python客戶端,都不直接提供Leader Election的功能,但是 這篇文檔 介紹了如何利用Consul的KV存儲來實現Leader Election,利用Consul的KV功能,可以很方便的實現領導選舉和鎖的功能。

當對某一個Key做put操作的時候,可以創建一個session對象,設置一個acquire標志為該 session,這樣就獲得了一個鎖,獲得所得客戶則是被選舉的leader。

代碼如下:

import consul
import time
 
c = consul.Consul()
 
def request_lead(namespace, session_id):
    lock = c.kv.put(leader_namespace,"leader check", acquire=session_id)
    return lock
 
def release_lead(session_id):
    c.session.destroy(session_id)
 
def whois_lead(namespace):
    index,value = c.kv.get(namespace)
    session = value.get('Session')
    if session is None:
        print 'No one is leading, maybe in electing'
    else:
        index, value = c.session.info(session)
        print '{} is leading'.format(value['ID'])
 
def work_non_block():
    print "working"
 
def work_block():
    while True:
        print "working"
        time.sleep(3)
 
leader_namespace = 'leader/test'
 
## initialize leader key/value node
leader_index, leader_node = c.kv.get(leader_namespace)
 
if leader_node is None:
    c.kv.put(leader_namespace,"a leader test")
 
while True:
    whois_lead(leader_namespace)
    session_id = c.session.create(ttl=10)
    if request_lead(leader_namespace,session_id):
        print "I am now the leader"
        work_block()
        release_lead(session_id)
    else:
        print "wait leader elected!"
    time.sleep(3)

利用同樣的機制,可以方便的實現鎖,信號量等分布式的同步操作。

監視

Consul的Agent提供了Watch的功能,然而Python客戶端并沒有相應的接口。

etcd

etcd 是另一個用GO開發的分布式協調應用,它提供一個分布式的Key/Value存儲來進行共享的配置管理和服務發現。

同樣的etcd使用基于HTTP的API,可以靈活的進行不同語言的綁定。

基本操作

import etcd
 
client = etcd.Client() 
client.write('/nodes/n1', 1)
print client.read('/nodes/n1').value

etcd對節點的操作和ZooKeeper類似,不過etcd不支持ZooKeeper的ephemeral Node的概念,要監控服務的狀態似乎比較麻煩。

分布式鎖

etcd支持分布式鎖,以下是一個例子。

import sys
 
sys.path.append("../../")
 
import etcd
 
import uuid
import time
 
my_id = uuid.uuid4()
 
def work():
    print "I get the lock {}".format(str(my_id))
 
client = etcd.Client() 
 
lock = etcd.Lock(client, '/customerlock', ttl=60)
 
with lock as my_lock:
    work()
    lock.is_locked()  # True
    lock.renew(60)
lock.is_locked()  # False

老版本的etcd支持leader election,但是在最新版該功能被deprecated了,參見https://coreos.com/etcd/docs/0.4.7/etcd-modules/

其它

我們針對分布式協調的功能討論了三個不同的開源應用,其實還有許多其它的選擇,我這里就不一一介紹,大家有興趣可以訪問以下的鏈接:

總結

ZooKeeper無疑是分布式協調應用的最佳選擇,功能全,社區活躍,用戶群體很大,對所有典型的用例都有很好的封裝,支持不同語言的綁定。缺點是,整個應用比較重,依賴于Java,不支持跨數據中心。

Consul作為使用Go語言開發的分布式協調,對業務發現的管理提供很好的支持,他的HTTP API也能很好的和不同的語言綁定,并支持跨數據中心的應用。缺點是相對較新,適合喜歡嘗試新事物的用戶。

etcd是一個更輕量級的分布式協調的應用,提供了基本的功能,更適合一些輕量級的應用來使用。

參考

如果大家對于分布式系統的協調想要進行更多的了解,可以閱讀一下的鏈接:

http://stackoverflow.com/questions/6047917/zookeeper-alternatives-cluster-coordination-service

http://txt.fliglio.com/2014/05/encapsulated-services-with-consul-and-confd/

http://txt.fliglio.com/2013/12/service-discovery-with-docker-docker-links-and-beyond/

http://www.serfdom.io/intro/vs-zookeeper.html

http://devo.ps/blog/zookeeper-vs-doozer-vs-etcd/

https://www.digitalocean.com/community/articles/how-to-set-up-a-serf-cluster-on-several-ubuntu-vps

http://www.slideshare.net/JyrkiPulliainen/taming-pythons-with-zoo-keeper-ep2013?qid=e1267f58-090d-4147-9909-ec673525e76b&v=qf1&b=&from_search=8

http://muratbuffalo.blogspot.com/2014/09/paper-summary-tango-distributed-data.html

https://developer.yahoo.com/blogs/hadoop/apache-zookeeper-making-417.html

http://www.knewton.com/tech/blog/2014/12/eureka-shouldnt-use-zookeeper-service-discovery/

http://codahale.com/you-cant-sacrifice-partition-tolerance/

 

來自:http://python.jobbole.com/86713/

 

 本文由用戶 darcy_yang 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!