基于HBase的消息隊列:HQueue
1. HQueue簡介
HQueue是一淘搜索網頁抓取離線系統團隊基于HBase開發的一套分布式、持久化消息隊列。它利用HTable存儲消息數據,借助HBase Coprocessor將原始的KeyValue數據封裝成消息數據格式進行存儲,并基于HBase Client API封裝了HQueue Client API用于消息存取。
HQueue可以有效使用在需要存儲時間序列數據、作為MapReduce Job和iStream等輸入、輸出供上下游共享數據等場合。
2. HQueue特性
由于HQueue是基于HBase進行消息存取的,因此站在HDFS和HBase的肩膀上,使得其具備如下特點:
(1)支持多Partitions,可根據需求設置Queue的規模,支持高并發訪問(HBase的多Region);
(2)支持自動Failover,任何機器Down掉,Partition可自動遷移至其他機器(HBase的Failover機制);
(3)支持動態負載均衡,Partition可以動態被調度到最合理的機器上(HBase的LoadBalance機制,可動態調整);
(4)利用HBase進行消息的持久化存儲,不丟失數據(HBase HLog和HDFS Append);
(5)隊列的讀寫模式與HBase的存儲特性天然切合,具備良好的并發讀寫性能(最新消息存儲在MemStore中,寫消息直接寫入MemStore,通常場景下都是內存級操作);
(6)支持消息按Topic進行分類存取(HBase中的Qualifier);
(7)支持消息TTL,自動清理過期消息(HBase支持KeyValue級別的TTL);
(8)HQueue = HTable Schema Design + HQueue Coprocessor + HBase Client Wrapper,完全擴展開發,無任何Hack工作,可隨HBase自動升級;
(9)HQueue Client API基于HBase Client Wrapper進行簡單封裝,HBase的ThriftServer使得其支持多語言API,因此HQueue也很容易封裝出多語言API;
(10)HQueue Client API可以天然支持Hadoop MapReduce Job和iStream的InputFormat機制,利用Locality特性將計算調度到存儲最近的機器;
(11)HQueue支持消息訂閱機制(HQueue 0.3及后續版本)。
3. HQueue系統設計及處理流程
3.1. HQueue系統結構
HQueue系統結構如圖(1)所示:
其中:
(1)每個Queue對應一個HTable,創建Queue可以通過Presharding Table方式創建,有利于負載均衡。
(2)每個Queue可以有多個Partitions(HBase Regions),這些Partitions均勻分布在HBase集群中的多個Region Servers中。
(3)每個Partition可以在HBase集群的多個Region Servers中動態遷移。任何一臺Region Server掛掉,運行在其上的HQueue Partition可以自動遷移到其他Region Server上,并且數據不會丟失。當集群負載不均衡時,HQueue Partition會自動被HMaster遷移到負載低的Region Server。
(4)每個Message對應一個HBase KeyValue Pair,按MessageID即時間順序存儲在HBase Region中。MessageID由Timestamp和同一Timestamp下自增的SequenceID構成,詳細信息參見《Message存儲 結構》部分。
3.2. Message存儲結構
Message存儲結構如圖(2)所示:
圖(2):Message存儲結構
其中:
(1)RowKey:由PartitionID和MessageID構成。
-
PartitionID:一個Queue可以有多個Partitions,目前最多支持Short.MAX_VALUE個 Partitions。Partition ID可以不在創建Message對象時指定,而是在發送消息時設定,或者不指定而使用一個隨機Partition ID。
-
MessageID:即消息ID,它由Timestamp和SequenceID兩部分組成。Timestamp是消息寫入HQueue 時的時 間戳,單位為毫秒。SequenceID是同一Timestamp下消息的順序編號,目前最多支持同一Timestamp下 Short.MAX_VALUE個Messages。
(2)Column:由Column Family和Message Topic構成。
-
Column Family:HBase Column Family,此處為固定值“message”。
-
Message Topic :HBase Column Qualifier,消息Topic名稱。用戶可以根據需要將Message存儲在不同的Topics之下,也可以從Queue中獲取感興趣的Topics消息數據。
(3)Value:即消息內容。
3.3. HQueue消息寫入及Coprocessor處理流程
HQueue利用HQueue Client API寫入消息數據,為保證消息唯一和有序,HQueue利用Coprocessor處理用戶寫入消息的MessageID,然后立即放入HBase MemStore中,使其可以被訪問到,最后持久化的HLog中。具體的處理邏輯如圖(3)所示:
圖(3):數據寫入及Coprocessor處理流程
其中:
(1)HQueue封裝了HQueue Client API,用戶可以使用其提供Put等方法向HQueue中寫入消息。
(2)HQueue Client會使用Message.makeKeyValueRow()用于完成將Message數據結構轉換成HBase Rowkey。HQueue所要求的RowKey格式可以參加上述內容。
(3)HQueue Client在完成RowKey的轉換后,會調用HTable的put方法按照HBase標準的寫入流程來完成消息的寫入。
(4)HQueue 上注冊有HQueueCoprocessor,它擴展自BaseRegionObserver。HRegion在真正寫入消息數據前, 會調用HQueueCoprocessor的preBatchMutate方法,該方法主要用于調整MessageID,保證MessageID唯一并且 有序。
(5)在HQueueCoprocessor的preBatchMutate方法中同時會調整Durability為SKIP_WAL,這樣HBase將不會主動將消息數據持久化進HLog。
(6)HRegion在寫入消息數據后,會調用HQueueCoprocessor的postBatchMutate方法,該方法主要完成將消息數據持久化進HLog的功能。
3.4. HQueue Scan處理流程
為了方便從Queue中Scan數據,HQueue封裝了ClientScanner,提供了QueueScanner、 PartitionScanner和CombinedPartitionScanner等Scanner,用于不同的場景。HQueue Scan的具體處理流程如圖(4)所示:
圖(4):HQueue Scan處理流程
其中:
(1)用戶可以根據需要從HQueue Client中獲取所需的Queue Scanner,目前主要提供三種Scanner:
-
QueueScanner:用于Scan Queue中全部Partitions的數據;
-
PartitionScanner:用于Scan Queue中指定Partition的數據;
-
CombinedPartitionScanner:用于Scan Queue中若干指定Partitions的數據。
(2)用戶獲取到Scanner之后,可以循環調用Scanner的next方法依次取出消息數據,直至無數據返回,本次Scan結束。Scan結束后,用戶應主動關閉Scanner以便及時釋放資源。
(3)用戶在不再使用先前創建的Queue對象時,應主動關閉Queue以便及時釋放資源。
3.5. HQueue訂閱流程
3.5.1. 整體流程
HQueue自0.3版本開始提供訂閱功能,一個訂閱者可以訂閱一個Queue的多個Partitions、多個Topics。與用戶使用 Scanner主動Scan消息數據的方式相比,訂閱方式具有(1)消息數據一旦寫入Queue便會被主動推送至訂閱者,消息送達更為及時;(2)訂閱者 被動接收新消息,可以省去HQueue無新消息數據時多余的Scan操作,減少系統開銷等優點。
HQueue訂閱流程處理邏輯如圖(5)所示:
圖(5):HQueue訂閱流程處理邏輯
其中:
(1)HQueue訂閱主要由Subscriber、ZooKeeper和Coprocessor這三部分組成。其中:
-
Subscrier:即訂閱者。主要完成向ZoeoKeeper寫入訂閱信息、啟動監聽、接收新消息并回調注冊在其上的消息處理函數(MessageListener)等功能。
-
ZooKeeper:用于保存訂閱者提交的訂閱信息,主要包括訂閱者訂閱的Queue、Partitions和Topics;訂閱者的地址和Checkpoint等信息,更為詳細信息參見后續描述。
-
Coprocessor:主要完成從ZooKeeper獲取訂閱信息、使用InternalScanner從Queue中Scan最新的消息、將新消息發送至訂閱者并將當前Checkpoint更新至ZooKeeper等功能。
(2)Coprocessor的主要處理流程如下:
Step 1:創建Subscriber,添加訂閱信息和消息處理函數,將訂閱信息寫入ZooKeeper,啟動監聽等待接收新消息。寫入ZooKeeper中的訂閱信息主要包括:
-
訂閱者訂閱的Queue名稱;
-
訂閱者訂閱的Queuee Partitions以及各Partition上消息的起始ID。一個訂閱者可以訂閱多個Partitions,如果沒有指定,那么認為訂閱該Queue的所有Partitions。
-
訂閱者訂閱的消息Topics。一個訂閱者可以訂閱多個主題,如果沒有指定,那么認為訂閱該Queue上的所有Topics。
-
訂閱者的Addresss/Hostname和監聽端口。用戶創建訂閱者時可以指定監聽端口,如果沒有指定,那么會隨機選擇一個當前可用端口作為監聽端口。
Step 2:Coprocessor從ZooKeeper獲取訂閱信息并向ZooKeeper注冊相關Watcher,以便ZooKeeper中訂閱信息發生變化 時ZooKeeper能夠及時通知Coprocessor。Coprocessor在獲取到訂閱信息后,會根據需要創建 SubscriptionWorker等工作線程,以便從HQueue Partition中Scan消息并將消息發送至Subscriber。
Step 3:Coprocessor從HQueue Partition中Scan新消息。
Step 4:Coprocessor將新消息發送至Subscriber。
Step 5:Subscriber在接收到新消息時,會回調注冊在其上的回調函數。
Step 6:待新消息發送成功后,Coprocessor會將消息的Checkpoint更新至ZooKeeper以便后續使用。
Step 7:Subscriber取消訂閱,并從ZooKeeper中刪除必要的訂閱信息。
Step 8:ZooKeeper會通過注冊在其上的Watcher將Subscriber訂閱信息的變化通知至Coprocessor,Coprocessor根據訂閱信息的變化,暫停SubscriptionWorker等工作線程等。
3.5.2. HQueue Subscriber
HQueue Subscriber結構和主要處理邏輯如圖(6)所示:
圖(6):HQueue Subscriber結構和主要處理邏輯
其中:
(1)Subscriber主要由兩部分組成:SubscriberZooKeeper和Thrift Server。其中,SubscriberZooKeeper主要完成與ZooKeeper相關的若干操作,包括寫入訂閱信息、刪除訂閱信息等。 Coprocessor與Subscriber之間的通訊通過Thrift來完成,Subscriber中啟動Thrift Server,監聽指定的端口,等待接收Coprocessor發送過來的新消息。
(2)Subscriber通過Thrift Server接收到新消息后,會回調注冊在其上的回調函數(MessageListeners),并將狀態碼返回給Coprocessor。
(3)可以在一個Subscriber上注冊多個MessageListeners,多個MessageListeners會被依次調用。
3.5.3. HQueue Coprocessor
HQueue Coprocessor結構和主要處理邏輯如圖(7)所示:
圖(7):HQueue Coprocessor結構和主要處理邏輯
其中:
(1)Coprocessor:主要由兩部分構成SubscriptionZooKeeper和SubscriptionWorker。
-
SubscriptionZooKeeper:主要完成與ZooKeeper相關的工作,包括從ZooKeeper獲取訂閱信息并注冊相關Watcher、SubscriptionWorker將Checkpoint更新至ZooKeeper等操作。
-
SubscriptionWorker又主要包括MessageScanner和MessageSender兩部分,主要完成Scan新消息、發送消息至Subscriber和更新Checkpoint等操作。
(2)MessageScanner主要完成創建InternalScanner,從Queue Partition中Scan新消息,并將其放入緩沖隊列中等操作。
-
當緩沖隊列中沒有空閑空間時,MessageScanner會等待直至緩沖隊列中的消息被MessageSender消費掉,騰出剩余空間。
-
當Queue Partition中沒有新消息時,MessageScanner會主動Sleep,當有新消息寫入時,Coprocessor會通過SubscriptionWorker喚醒MessageScanner,開始新一輪Scan。
(3)MessageSender主要完成從緩沖隊列中取出新消息,將其發送至Subscriber,并等待Subscriber發回響應等操作。當緩沖隊列中沒有新消息時,MessageSender會等待直至有新消息到來。
(4)MessageSender中的CheckpointUpdater會定時將當前的Checkpoint寫入ZooKeeper中的相關訂閱節點中,以便后續使用。
3.5.4. 訂閱信息層次結構
HQueue相關訂閱信息保存在ZooKeeper,ZooKeeper中訂閱信息的層次結構如圖(8)所示:
圖(8):訂閱信息層次結構
其中:
(1)訂閱者節點(subscriber_x)上會記錄該訂閱者在Queue Partition上的Checkpoint。該Checkpoint由Subscriber在發起訂閱時寫入,并由 SubscriptionWorker MessageSender中的CheckpointUpdater來更新。
(2)訂閱者節點下會有兩個臨時性節點:address和topics,分別保存訂閱者的IP Address/Hostname:Port和訂閱的主題。當訂閱者主動取消訂閱時會刪除這兩個臨時節點,當訂閱者意外退出時,等Session失效 后,ZooKeeper會刪除該臨時節點。
3.5.5. 訂閱者Thrift Service
HQueue訂閱功能使用Thrift來簡化對多語言客戶端的支持。Subscriber啟動Thrift Server,監聽指定端口,接收消息,并回調MessageListeners以便處理消息。用于描述HQueue Subscriber所提供服務的接口定義如下所示:
namespace java com.etao.hadoop.hbase.queue.thrift.generated /** * HQueue MessageID */ struct TMessageID { 1: i64 timestamp, 2: i16 sequenceID } /** * HQueue Message */ struct TMessage { 1: optional TMessageID id, 2: optional i16 partitionID, 3: binary topic, 4: binary value } /** * HQueue Subscriber Service */ service HQueueSubscriberService { i32 consumeMessages(1:list<TMessage> messages) }
4. HQueue使用
4.1. HQueue Toolkit
為方便用戶使用,HQueue封裝了HQueue Client API用于存取消息數據。自HQueue 0.3版本,HQueue日志運維工具集成進HQueue Shell中,構成HQueue Toolkit,為用戶提供一站式服務,方便用戶管理Queue以及Queue訂閱者。
同HBase Shell使用方式相似,用戶使用$ ${HBASE_HOME}/bin/hqueue shell便可以進入HQueue Shell命令行工具。需要注意的是,用戶在使用HQueue Toolkit之前需要確保已經部署HQueue Toolkit。
HQueue Toolkit中包括創建Queue、Disable Queue、Enable Queue、刪除Queue和清空Queue等命令。使用示例如下:
(1)創建隊列
USAGE:create ‘queue_name’, partition_count, ttl, [Configuration Dictionary]
DESCRIPTIONS:
-
queue_name:待創建的HQueue的名稱,必選參數。
-
partition_count:待創建的HQueue的Partition個數,必選參數。
-
ttl:失效時間,必選參數。
-
Configuration Dictonary:可選配置參數。目前支持的配置參數為:(1)hbase.hqueue.partitionsPerRegion; (2)hbase.hregion.memstore.flush.size;(3)hbase.hregion.majorcompaction; (4)hbase.hstore.compaction.min;(5)hbase.hstore.compaction.max; (6)hbase.hqueue.compression;(7)hbase.hstore.blockingStoreFiles等。
EXAMPLES:
-
hqueue> create ‘q1′, 32, 86400
-
hqueue> create ‘q1′, 32, 86400, {‘hbase.hqueue.partitionsPerRegion’ => ’4′, ‘hbase.hstore.compaction.min’ => ’16′, ‘hbase.hstore.compaction.max’ => ’32′}
(2)清空隊列
USAGE:truncate_queue 'queue_name' DESCRIPTIONS:
queue_name:待清空的Queue名稱,必選參數。
EXAMPLES:
hqueue(main):013:0> truncate_queue 'replication_dev_2_test_queue'
需要注意的是:該命令與HBase Shell中的truncate有所不同,該命令僅會刪除Queue中的數據,而保留Queue的Presharding信息。 更多操作請參閱:http://searchwiki.taobao.ali.com/index.php/HQueue_Toolkit#Queue.E7.AE.A1.E7.90.86 (3)新增訂閱者 USAGE:add_subscriber 'queue_name', 'subscriber_name' DESCRIPTIONS:
queue_name:隊列名稱,必選參數。
subscriber_name:訂閱者名稱,必選參數。
EXAMPLES:
add_subscriber 'replication_dev_2_test_queue', 'subscriber_1'
(4)刪除訂閱者
USAGE:delete_subscriber 'subscriber_name', 'queue_name' DESCRIPTIONS:
queue_name:訂閱者所訂閱的Queue名稱,必選參數。
subscriber_name:訂閱者名稱,必選參數。
EXAMPLES:
hqueue(main):040:0> delete_subscriber 'replication_dev_2_test_queue', 'subscriber_1'
更多信息可以參閱:http://searchwiki.taobao.ali.com/index.php/HQueue_Toolkit#.E8.AE.A2.E9.98.85.E8.80.85.E7.AE.A1.E7.90.86
4.2. Put
HQueue Client API中的Put相關操作可以完成將用戶消息數據寫入HQueue中,Put支持批量操作,具體使用方式示例如下:
HQueue queue = new HQueue(queueName); String topic1 = "crawler"; String value1 = "http://www.360test.com"; // 寫入單條消息數據,不指定Partition ID。在不指定Partition ID的情況下,將會在Queue的所有Partitions中隨機選取一個。 Message message1 = new Message(Bytes.toBytes(topic1), Bytes.toBytes(value1)); queue.put(message); // 寫入Message時,顯式指定PartitionID。 short partitionID = 10; queue.put(partitionID, message1); List<Message> messages = new ArrayList<Message>(); messages.add(message1); String topic2 = "dump"; String value2 = "http://www.jd.com"; Message message2 = new Message(Bytes.toBytes(topic2), Bytes.toBytes(value2)); messages.add(message2); // 寫入多條消息數據,不指定Partition ID。 queue.put(messages); // 寫入多條消息數據,指定Partition ID。 queue.put(partitionID, messages); queue.close();
4.3. Scan
為方便用戶從Queue中Scan消息數據,HQueue Client API提供了三種自定義Scanner,分別為:QueueScanner、PartitionScanner和CombinedPartitionScanner,使用示例如下:
String queueName = "subscription_queue"; Queue queue = new HQueue(queueName); // 起始時間戳 long currentTimestamp = System.currentTimeMillis(); MessageID startMessageID = new MessageID(currentTimestamp - 6000); MessageID stopMessageID = new MessageID(currentTimestamp); Scan scan = new Scan(startMessageID, stopMessageID); // 添加主題 scan.addTopic(Bytes.toBytes("topic1")); scan.addTopic(Bytes.toBytes("topic2")); Message message = null; // 使用QueueScanner,掃描Queue下全部Partitions中的數據 QueueScanner queueScanner = queue.getQueueScanner(scan); while ((message = queueScanner.next()) != null) { // no-op } queueScanner.close(); short partitionID1 = 1; // 使用PartitionScanner,掃描Queue中指定的Partition的數據 PartitionScanner partitionScanner = queue.getPartitionScanner(partitionID1, scan); while ((message = partitionScanner.next()) != null) { // no-op } partitionScanner.close(); short partitionID2 = 2; Map<Short, Scan> partitions = new HashMap<Short, Scan>(); // 添加多個Partitions partitions.put(partitionID1, scan); partitions.put(partitionID2, scan); CombinedPartitionScanner combinedScanner = queue.getCombinedPartitionScanner(partitions); while ((message = combinedScanner.next()) != null) { // no-op } combinedScanner.close(); queue.close();
4.4. 訂閱消息
HQueue自0.3版本開始提供訂閱功能,使用方式示例如下:
HQueue queue = null; HQueueSubscriber subscriber = null; try { String queueName = "subscription_queue"; queue = new HQueue(queueName); Set<Pair<Short, MessageID>> partitions = new HashSet<Pair<Short, MessageID>>(); // 添加所訂閱的Partitions Pair<Short, MessageID> partition1 = new Pair<Short, MessageID>((short)0, null); partitions.add(partition1); Pair<Short, MessageID> partition2 = new Pair<Short, MessageID>((short)1, null); partitions.add(partition2); Pair<Short, MessageID> partition3 = new Pair<Short, MessageID>((short)2, null); partitions.add(partition3); // 添加所訂閱的Topics Set<String> topics = new HashSet<String>(); topics.add("topic_1"); topics.add("topic_2"); topics.add("topic_3"); // 訂閱者名稱 String subscriberName = "subscriber_1"; Subscription subscription = new Subscription(subscriberName, topics); subscription.addPartitions(partitions); // 添加回調函數 List<MessageListener> listeners = new LinkedList<MessageListener>(); MessageListener blackHoleListener = new BlackHoleMessageListener(subscriberName); listeners.add(blackHoleListener); // 創建訂閱者 subscriber = queue.createSubscriber(subscription, listeners); subscriber.start(); Thread.sleep(600000L); subscriber.stop("Time out, request to stop subscriber:" + subscriberName); } catch (Exception ex) { LOG.error("Received unexpected exception when testing subscription.", ex); } finally { if (queue != null) { try { queue.close(); queue=null; } catch (IOException ex) { // ignore the exception } } }
4.5. ThriftServer API
HBase自帶的ThriftServer實現了對HTable的多語言API支持,HQueue在HBase ThriftServer中擴展了對HQueue的支持,使得C++、Python和PHP等語言也可以方便地訪問HQueue。
HQueue目前提供的Thrift API如下所示:
1 | ScannerID messageScannerOpen(1:Text queueName,2:i16 partitionID,3:TMessageScan messageScan) | 根據Scan,打開Queue中某個Partition上的Scanner |
2 | TMessage messageScannerGet(1:ScannerID id) | 逐條獲取Message |
3 | list<TMessage> messageScannerGetList(1:ScannerID id,2:i32 nbMessages) | 批量獲取Messages |
4 | void messageScannerClose(1:ScannerID id) | 關閉ScannerID |
5 | void putMessage(1:Text queueName,2:TMessage tMessage) | 向Queue中寫入Message,使用隨機的Partition ID |
6 | void putMessages(1:Text queueName,2:list<TMessage> tMessages) | 向Queue中批量寫入Messages,使用隨機的Partition ID |
7 | void putMessageWithPid(1:Text queueName,2:i16 partitionID,3:TMessage tMessage) | 向Queue中寫入Message,使用指定的Partition ID |
8 | void putMessagesWithPid(1:Text queueName,2:i16 partitionID,3:list<TMessage> tMessages) | 向Queue中批量寫入Messages,使用指定的Partition ID |
9 | list<Text> getQueueLocations(1:Text queueName) | 獲取Queue中所有Partition所在主機的地址 |
5. 總結
以上是對HQueue概念、特性、系統設計、處理流程以及應用等方面的簡單闡述,希望對大家有所幫助。