消息總線重構之簡化客戶端

jopen 9年前發布 | 12K 次閱讀 消息總線 消息系統
 

這段時間對消息總線進行了再次重構。本次重構主要針對消息總線的pubsub組件以及對client的簡化,同時談談對消息總線的一些想法。

簡化client的復雜度

之前的client需要同時連接兩個分布式組件。消息總線的訪問需要用戶提供pubsuberHost,pubsuberPort參數,因此它首先連接的就是pubsuber。而消息總線是基于RabbitMQ構建的,因此它必然還需要連接RabbitMQ。而之所以沒有需要用戶程序提供RabbitMQ Server的地址信息,是因為它是通過pubsuber間接獲得的。

當時的想法是出于安全的角度考慮,不讓用戶直面MQ Server,而MQ的選擇理論上可以有多種,這些對用戶都是透明的。但作為一個后端組件,安全問題本不是最重要的關注目標,而替換MQ這樣的成本無異于 重寫消息總線,這樣的可能性也不大。除此之外這還帶來了額外的復雜度與高失敗率(當pubsuber與RabbitMQ兩者中有其一失敗,消息總線就將陷 入混亂),結合到消息總線較多的長連接場景(比如,push模式的consume),一旦一個組件失效就可能導致客戶程序的重啟(為了重新初始化連接)。

讓消息總線客戶端只連接單一的RabbitMQ組件,可以大大降低失效的概率,而且RabbitMQ官方client提供的失效重試機制也能更好得發揮作用。

用RPC獲取授權信息

因為之前的pubsuber部分承擔了授權信息數據源的角色,移除之前的pubsuber組件,那么就需要重新設計獲取遠程授權信息的方案。因為 RabbitMQ正好提供了基于JSON的輕量級RPC機制,我們就可以通過RPC從后端獲取授權信息,而讓后端去跟pubsuber交互。之前就曾有過 這個想法,后來在使用HBase時,發現其java client內部也有通過RPC跟Master節點交互,于是這次就確定用這種方式來實現。其實采用RPC的形式可以大大簡化客戶端的邏輯實現,而且也大 大降低了升級成本。

修改broadcast和pubsub的實現

之前pubsuber在客戶端還起到了兩個作用:實現廣播機制和實現實時控制。所以,如果要將pubsuber從client里移除,就要重新實現這兩個功能。也就是說,要找到另一種支持 實時push 的機制,考慮到其實RabbitMQ本身就可以實現長連接的即時消費功能,這里選擇直接基于RabbitMQ本身來實現。

我們新建了一個內部使用的exchange來實現消息路由。跟其他topic類型的exchange不同,這里我們采用并不常見的 headers 類型的exchange。

消息總線重構之簡化客戶端

header類型:根據消息的消息頭里包含特殊的key-value對來進行路由

考慮到我們需要重新實現上面兩個功能,所以,我們將消息分成兩類:event和notice。

  • event:內部控制消息
  • notice:廣播消息

在發送這兩個消息時,只需要在消息頭中指定對應的header的key-value即可實現自動路由。這兩種消息類型分別對應的是綁定在inner exchange后的隊列的類型。

如何節省RabbitMQ Server的資源

考慮到幾乎每個client都有接受這兩種類型的消息的需求,而我們為每個客戶端在該exchange下創建兩個queue多少有些過于浪費,最好的做法當然是在客戶端使用的會話周期內建立兩個臨時隊列,等客戶端使用結束就可立即銷毀隊列回收資源。

得益于RabbitMQ豐富的特性,我們可以很容易做到這一點。當我們實例化客戶端的時候,我們在內部創建兩個臨時且排他的隊列。所謂臨時且排他即一種特殊的隊列,它只對創建它的連接可見,當創建它的連接斷開或者消費者個數從大于零降到零時,該隊列就會被刪除,具備這種屬性的隊列幾乎是為一次會話而創建的。

臨時且排他屬性是通過在創建隊列時,指定隊列的auto-delete以及exclusive屬性同時為true來實現的。

這兩個隊列被創建后,當前客戶端會作為消費者立即掛載在隊列上等待event和notice。

發送端無需知曉上述兩個隊列的具體名稱,它只需知道代理exchange以及inner exchange的routing-key即可,然后在發送消息的消息頭中指定需要發送的是event還是notice。

代碼片段:

InnerEventEntity eventEntity = new InnerEventEntity();
eventEntity.setIdentifier(channel);
eventEntity.setValue(new String(data));
eventEntity.setType("event");
String jsonObjStr = GSON.toJson(eventEntity);

Message eventMsg = MessageFactory.createMessage(MessageType.QueueMessage);
Map<String, Object> map = new HashMap<String, Object>(1);
map.put("type", "event");
eventMsg.setHeaders(map);
eventMsg.setContent(jsonObjStr.getBytes());
AMQP.BasicProperties properties = MessageHeaderTransfer.box(eventMsg);

ProxyProducer.produce(Constants.PROXY_EXCHANGE_NAME,
                      mqChannel,
                      EVENT_ROUTING_KEY_NAME,
                      eventMsg.getContent(),
                      properties);

去除pubsuber的封裝

為什么要封裝

在去除之前,我想談談當初為什么要封裝。在最初封裝消息總線的時候,我對redis和zookeeper都有所了解,它們都有一些共同的特性,比如:

  • 都能夠以key-value的形式存取少量數據
  • 都能提供Pub/Sub的實時push變更功能

這是消息總線客戶端的pubsuber需要的,但為了提供可選性,我在這兩個特性上做了一層封裝,可以使得這種配置變更組件無論選擇哪一個,無需修改代碼,兩者都可適配。這是當初封裝的目的。

為什么要去除封裝

首先,去除封裝是回到了zookeeper而排除了redis。這么除了發現太多的開源軟件都在使用zookeeper來實現這個需求,除了發現這是zookeeper的專長,而redis只是能提供這些功能而已。除了這些,最關鍵的問題是我發現當涉及到命名服務的特性時,redis將變得不再適合。

在分布式的服務中,很可能會存在多個組件,而這些組件跟應用之間會存在一些邏輯關系,而不都是簡單的扁平關系。很多情況下,我們需要將一些關系構 建成樹狀結構。比如,現在消息總線只變成了平臺中的一個組件,我們需要在配置上體現這種關系,所以可能會由原先的扁平關系修改為如下圖這樣的形式:

消息總線重構之簡化客戶端

在這種類似于文件系統的樹狀結構下,要實現諸如獲取子節點的變更事件這樣的聯動行為redis將無能為力。這是因為redis的pubsub功能,只提供在key-value(String)類型上。也就是說,它的 value只能是一級關系。當然,為了表示多層關系,你是可以在key里以“.”進行區分,比如”app1”,”app1.message”,雖然你能知 道他們之間的關系,但它們在技術層面上是一樣的,無法產生聯動變更功能。所以在一些場景下,zookeeper是無法取代的。

一些思考

拓撲的權衡

消息總線最初的目標主要偏向消息傳遞。但在實現中的添加了一些額外的特性,比如之前的RPC功能。其實,如果單從技術層面上來看消息總線就是收發消息。但如果你將收發消息的主體包含進來(也就是 發送者消費者 )會有一些新的定位。如果有一些消費者做的事情是很通用的,基礎的,很多人都需要的或者純技術性的。那么處理這類消息的消費者就是在提供服務。比如,下面這些:

  • 將數據存入ElasticSearch
  • 發送短信
  • 發送郵件
  • 往移動端推送消息

消息總線可以直接提供這些服務以供第三方申請使用,當然如果帶上語義來看,RPC的服務端也是一種服務(只不過是同步的服務而已),其他隊列也可能在提供某種服務,只是它們的專有性更強,所以消息總線也具有提供服務的能力以及構建服務的基礎。

所以我一直在考慮,整個路由圖應該是這樣構建:

消息總線重構之簡化客戶端

還是這樣來構建:

消息總線重構之簡化客戶端

消息租賃

因為消息的通信模型生來就具有異步性。那么消息的消費時機,對于消息總線本身而言是無法知曉的,這就產生了消息長期堆積壓垮消息總線的可能。所 以,可能會考慮將消息的永久駐留改為按序駐留。這取決于業務,有些業務消息是具有時效性的,這樣的消息如果隔個幾周還沒有被消費掉,那么它存在的意義幾乎 沒有了,而它卻白白占據著總線服務器的內存或磁盤資源,甚至這些消息將永遠得不到消費也有可能。

所謂消息租賃,其實是將現在的永久留存的模式改為臨時駐留隊列的模式,具體消息能夠存活多久,這取決于給消息設置的TTL(time to live)時間,而對于TTL的評估來自于隊列申請方根據自身的業務特點而定。當然,TTL可以也可以設置為永久,這需要接收審核。

proxy的必要性

對于一個組件的擴展有很多種模式,比如proxy,smart client,plugin。消息總線封裝自RabbitMQ,其實RabbitMQ官方是帶plugin模式的擴展機制的,無奈語言所限,力所不及。

而對于proxy跟smart client的兩種模式對比來看,可算是各有所長,優勢互補。比如在侵入性上proxy侵入性更小,在掌控性上smart client掌控力更強。這里就不再過多比較。

現在對RabbitMQ的擴展采用的是smart client的形式。但這種方式總有它的受限之處,當你處于一個分布式的環境中,服務器上的資源在很多時候是被共享的(比如RabbitMQ里的隊列,它可以同時被多 個client消費),你可以將它看成是多個分支河流匯聚的場景,分支是沒辦法掌控全部的,你只有依靠Proxy Server。

我曾看到攜程開源的消息系統是在kafka和mysql前端做了一個proxy(它們稱之為broker)。會不會構建一個proxy,看進度吧。但proxy的存在缺失能帶來非常多的好處。

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!