基于HBase的消息隊列:HQueue

jopen 10年前發布 | 50K 次閱讀 HQueue 消息系統

1. HQueue簡介

HQueue是一淘搜索網頁抓取離線系統團隊基于HBase開發的一套分布式、持久化消息隊列。它利用HTable存儲消息數據,借助HBase Coprocessor將原始的KeyValue數據封裝成消息數據格式進行存儲,并基于HBase Client API封裝了HQueue Client API用于消息存取。

HQueue可以有效使用在需要存儲時間序列數據、作為MapReduce Job和iStream等輸入、輸出供上下游共享數據等場合。

2. HQueue特性

由于HQueue是基于HBase進行消息存取的,因此站在HDFS和HBase的肩膀上,使得其具備如下特點:

(1)支持多Partitions,可根據需求設置Queue的規模,支持高并發訪問(HBase的多Region);

(2)支持自動Failover,任何機器Down掉,Partition可自動遷移至其他機器(HBase的Failover機制);

(3)支持動態負載均衡,Partition可以動態被調度到最合理的機器上(HBase的LoadBalance機制,可動態調整);

(4)利用HBase進行消息的持久化存儲,不丟失數據(HBase HLog和HDFS Append);

(5)隊列的讀寫模式與HBase的存儲特性天然切合,具備良好的并發讀寫性能(最新消息存儲在MemStore中,寫消息直接寫入MemStore,通常場景下都是內存級操作);

(6)支持消息按Topic進行分類存取(HBase中的Qualifier);

(7)支持消息TTL,自動清理過期消息(HBase支持KeyValue級別的TTL);

(8)HQueue = HTable Schema Design + HQueue Coprocessor + HBase Client Wrapper,完全擴展開發,無任何Hack工作,可隨HBase自動升級;

(9)HQueue Client API基于HBase Client Wrapper進行簡單封裝,HBase的ThriftServer使得其支持多語言API,因此HQueue也很容易封裝出多語言API;
(10)HQueue Client API可以天然支持Hadoop MapReduce Job和iStream的InputFormat機制,利用Locality特性將計算調度到存儲最近的機器;

(11)HQueue支持消息訂閱機制(HQueue 0.3及后續版本)。

3. HQueue系統設計及處理流程

3.1. HQueue系統結構

HQueue系統結構如圖(1)所示:

HQueue系統結構圖(1):HQueue系統結構

其中:

(1)每個Queue對應一個HTable,創建Queue可以通過Presharding Table方式創建,有利于負載均衡。

(2)每個Queue可以有多個Partitions(HBase Regions),這些Partitions均勻分布在HBase集群中的多個Region Servers中。

(3)每個Partition可以在HBase集群的多個Region Servers中動態遷移。任何一臺Region Server掛掉,運行在其上的HQueue Partition可以自動遷移到其他Region Server上,并且數據不會丟失。當集群負載不均衡時,HQueue Partition會自動被HMaster遷移到負載低的Region Server。

(4)每個Message對應一個HBase KeyValue Pair,按MessageID即時間順序存儲在HBase Region中。MessageID由Timestamp和同一Timestamp下自增的SequenceID構成,詳細信息參見《Message存儲 結構》部分。

3.2. Message存儲結構

Message存儲結構如圖(2)所示:

Message存儲結構

圖(2):Message存儲結構

其中:

(1)RowKey:由PartitionID和MessageID構成。

  • PartitionID:一個Queue可以有多個Partitions,目前最多支持Short.MAX_VALUE個 Partitions。Partition ID可以不在創建Message對象時指定,而是在發送消息時設定,或者不指定而使用一個隨機Partition ID。

  • MessageID:即消息ID,它由Timestamp和SequenceID兩部分組成。Timestamp是消息寫入HQueue 時的時 間戳,單位為毫秒。SequenceID是同一Timestamp下消息的順序編號,目前最多支持同一Timestamp下 Short.MAX_VALUE個Messages。

(2)Column:由Column Family和Message Topic構成。

  • Column Family:HBase Column Family,此處為固定值“message”。

  • Message Topic :HBase Column Qualifier,消息Topic名稱。用戶可以根據需要將Message存儲在不同的Topics之下,也可以從Queue中獲取感興趣的Topics消息數據。

(3)Value:即消息內容。

3.3. HQueue消息寫入及Coprocessor處理流程

HQueue利用HQueue Client API寫入消息數據,為保證消息唯一和有序,HQueue利用Coprocessor處理用戶寫入消息的MessageID,然后立即放入HBase MemStore中,使其可以被訪問到,最后持久化的HLog中。具體的處理邏輯如圖(3)所示:

數據寫入及Coprocessor處理流程

圖(3):數據寫入及Coprocessor處理流程

其中:

(1)HQueue封裝了HQueue Client API,用戶可以使用其提供Put等方法向HQueue中寫入消息。

(2)HQueue Client會使用Message.makeKeyValueRow()用于完成將Message數據結構轉換成HBase Rowkey。HQueue所要求的RowKey格式可以參加上述內容。
(3)HQueue Client在完成RowKey的轉換后,會調用HTable的put方法按照HBase標準的寫入流程來完成消息的寫入。
(4)HQueue 上注冊有HQueueCoprocessor,它擴展自BaseRegionObserver。HRegion在真正寫入消息數據前, 會調用HQueueCoprocessor的preBatchMutate方法,該方法主要用于調整MessageID,保證MessageID唯一并且 有序。
(5)在HQueueCoprocessor的preBatchMutate方法中同時會調整Durability為SKIP_WAL,這樣HBase將不會主動將消息數據持久化進HLog。
(6)HRegion在寫入消息數據后,會調用HQueueCoprocessor的postBatchMutate方法,該方法主要完成將消息數據持久化進HLog的功能。

3.4. HQueue Scan處理流程

為了方便從Queue中Scan數據,HQueue封裝了ClientScanner,提供了QueueScanner、 PartitionScanner和CombinedPartitionScanner等Scanner,用于不同的場景。HQueue Scan的具體處理流程如圖(4)所示:

HQueue Scan處理流程

圖(4):HQueue Scan處理流程

其中:

(1)用戶可以根據需要從HQueue Client中獲取所需的Queue Scanner,目前主要提供三種Scanner:

  • QueueScanner:用于Scan Queue中全部Partitions的數據;

  • PartitionScanner:用于Scan Queue中指定Partition的數據;

  • CombinedPartitionScanner:用于Scan Queue中若干指定Partitions的數據。

(2)用戶獲取到Scanner之后,可以循環調用Scanner的next方法依次取出消息數據,直至無數據返回,本次Scan結束。Scan結束后,用戶應主動關閉Scanner以便及時釋放資源。
(3)用戶在不再使用先前創建的Queue對象時,應主動關閉Queue以便及時釋放資源。

3.5. HQueue訂閱流程

3.5.1. 整體流程

HQueue自0.3版本開始提供訂閱功能,一個訂閱者可以訂閱一個Queue的多個Partitions、多個Topics。與用戶使用 Scanner主動Scan消息數據的方式相比,訂閱方式具有(1)消息數據一旦寫入Queue便會被主動推送至訂閱者,消息送達更為及時;(2)訂閱者 被動接收新消息,可以省去HQueue無新消息數據時多余的Scan操作,減少系統開銷等優點。

HQueue訂閱流程處理邏輯如圖(5)所示:

HQueue訂閱流程處理邏輯

圖(5):HQueue訂閱流程處理邏輯

其中:

(1)HQueue訂閱主要由Subscriber、ZooKeeper和Coprocessor這三部分組成。其中:

  • Subscrier:即訂閱者。主要完成向ZoeoKeeper寫入訂閱信息、啟動監聽、接收新消息并回調注冊在其上的消息處理函數(MessageListener)等功能。

  • ZooKeeper:用于保存訂閱者提交的訂閱信息,主要包括訂閱者訂閱的Queue、Partitions和Topics;訂閱者的地址和Checkpoint等信息,更為詳細信息參見后續描述。

  • Coprocessor:主要完成從ZooKeeper獲取訂閱信息、使用InternalScanner從Queue中Scan最新的消息、將新消息發送至訂閱者并將當前Checkpoint更新至ZooKeeper等功能。

(2)Coprocessor的主要處理流程如下:
Step 1:創建Subscriber,添加訂閱信息和消息處理函數,將訂閱信息寫入ZooKeeper,啟動監聽等待接收新消息。寫入ZooKeeper中的訂閱信息主要包括:

  • 訂閱者訂閱的Queue名稱;

  • 訂閱者訂閱的Queuee Partitions以及各Partition上消息的起始ID。一個訂閱者可以訂閱多個Partitions,如果沒有指定,那么認為訂閱該Queue的所有Partitions。

  • 訂閱者訂閱的消息Topics。一個訂閱者可以訂閱多個主題,如果沒有指定,那么認為訂閱該Queue上的所有Topics。

  • 訂閱者的Addresss/Hostname和監聽端口。用戶創建訂閱者時可以指定監聽端口,如果沒有指定,那么會隨機選擇一個當前可用端口作為監聽端口。

Step 2:Coprocessor從ZooKeeper獲取訂閱信息并向ZooKeeper注冊相關Watcher,以便ZooKeeper中訂閱信息發生變化 時ZooKeeper能夠及時通知Coprocessor。Coprocessor在獲取到訂閱信息后,會根據需要創建 SubscriptionWorker等工作線程,以便從HQueue Partition中Scan消息并將消息發送至Subscriber。
Step 3:Coprocessor從HQueue Partition中Scan新消息。
Step 4:Coprocessor將新消息發送至Subscriber。
Step 5:Subscriber在接收到新消息時,會回調注冊在其上的回調函數。
Step 6:待新消息發送成功后,Coprocessor會將消息的Checkpoint更新至ZooKeeper以便后續使用。
Step 7:Subscriber取消訂閱,并從ZooKeeper中刪除必要的訂閱信息。
Step 8:ZooKeeper會通過注冊在其上的Watcher將Subscriber訂閱信息的變化通知至Coprocessor,Coprocessor根據訂閱信息的變化,暫停SubscriptionWorker等工作線程等。

3.5.2. HQueue Subscriber

HQueue Subscriber結構和主要處理邏輯如圖(6)所示:

HQueue Subscriber結構和主要處理邏輯

圖(6):HQueue Subscriber結構和主要處理邏輯

其中:

(1)Subscriber主要由兩部分組成:SubscriberZooKeeper和Thrift Server。其中,SubscriberZooKeeper主要完成與ZooKeeper相關的若干操作,包括寫入訂閱信息、刪除訂閱信息等。 Coprocessor與Subscriber之間的通訊通過Thrift來完成,Subscriber中啟動Thrift Server,監聽指定的端口,等待接收Coprocessor發送過來的新消息。

(2)Subscriber通過Thrift Server接收到新消息后,會回調注冊在其上的回調函數(MessageListeners),并將狀態碼返回給Coprocessor。
(3)可以在一個Subscriber上注冊多個MessageListeners,多個MessageListeners會被依次調用。

3.5.3. HQueue Coprocessor

HQueue Coprocessor結構和主要處理邏輯如圖(7)所示:

HQueue Coprocessor結構和主要處理邏輯

圖(7):HQueue Coprocessor結構和主要處理邏輯

其中:

(1)Coprocessor:主要由兩部分構成SubscriptionZooKeeper和SubscriptionWorker。

  • SubscriptionZooKeeper:主要完成與ZooKeeper相關的工作,包括從ZooKeeper獲取訂閱信息并注冊相關Watcher、SubscriptionWorker將Checkpoint更新至ZooKeeper等操作。

  • SubscriptionWorker又主要包括MessageScanner和MessageSender兩部分,主要完成Scan新消息、發送消息至Subscriber和更新Checkpoint等操作。

(2)MessageScanner主要完成創建InternalScanner,從Queue Partition中Scan新消息,并將其放入緩沖隊列中等操作。

  • 當緩沖隊列中沒有空閑空間時,MessageScanner會等待直至緩沖隊列中的消息被MessageSender消費掉,騰出剩余空間。

  • 當Queue Partition中沒有新消息時,MessageScanner會主動Sleep,當有新消息寫入時,Coprocessor會通過SubscriptionWorker喚醒MessageScanner,開始新一輪Scan。

(3)MessageSender主要完成從緩沖隊列中取出新消息,將其發送至Subscriber,并等待Subscriber發回響應等操作。當緩沖隊列中沒有新消息時,MessageSender會等待直至有新消息到來。
(4)MessageSender中的CheckpointUpdater會定時將當前的Checkpoint寫入ZooKeeper中的相關訂閱節點中,以便后續使用。

3.5.4. 訂閱信息層次結構

HQueue相關訂閱信息保存在ZooKeeper,ZooKeeper中訂閱信息的層次結構如圖(8)所示:

訂閱信息層次結構

圖(8):訂閱信息層次結構

其中:

(1)訂閱者節點(subscriber_x)上會記錄該訂閱者在Queue Partition上的Checkpoint。該Checkpoint由Subscriber在發起訂閱時寫入,并由 SubscriptionWorker MessageSender中的CheckpointUpdater來更新。
(2)訂閱者節點下會有兩個臨時性節點:address和topics,分別保存訂閱者的IP Address/Hostname:Port和訂閱的主題。當訂閱者主動取消訂閱時會刪除這兩個臨時節點,當訂閱者意外退出時,等Session失效 后,ZooKeeper會刪除該臨時節點。

3.5.5. 訂閱者Thrift Service

HQueue訂閱功能使用Thrift來簡化對多語言客戶端的支持。Subscriber啟動Thrift Server,監聽指定端口,接收消息,并回調MessageListeners以便處理消息。用于描述HQueue Subscriber所提供服務的接口定義如下所示:

namespace java com.etao.hadoop.hbase.queue.thrift.generated
/**
* HQueue MessageID
*/
struct TMessageID {
  1: i64 timestamp,
  2: i16 sequenceID
}
/**
* HQueue Message
*/
struct TMessage {
  1: optional TMessageID id,
  2: optional i16 partitionID,
  3: binary topic,
  4: binary value
}
/**
* HQueue Subscriber Service
*/
service HQueueSubscriberService {
  i32 consumeMessages(1:list<TMessage> messages)
}

4. HQueue使用

4.1. HQueue Toolkit

為方便用戶使用,HQueue封裝了HQueue Client API用于存取消息數據。自HQueue 0.3版本,HQueue日志運維工具集成進HQueue Shell中,構成HQueue Toolkit,為用戶提供一站式服務,方便用戶管理Queue以及Queue訂閱者。

同HBase Shell使用方式相似,用戶使用$ ${HBASE_HOME}/bin/hqueue shell便可以進入HQueue Shell命令行工具。需要注意的是,用戶在使用HQueue Toolkit之前需要確保已經部署HQueue Toolkit。

HQueue Toolkit中包括創建Queue、Disable Queue、Enable Queue、刪除Queue和清空Queue等命令。使用示例如下:

(1)創建隊列

USAGE:create ‘queue_name’, partition_count, ttl, [Configuration Dictionary]

DESCRIPTIONS:

  • queue_name:待創建的HQueue的名稱,必選參數。

  • partition_count:待創建的HQueue的Partition個數,必選參數。

  • ttl:失效時間,必選參數。

  • Configuration Dictonary:可選配置參數。目前支持的配置參數為:(1)hbase.hqueue.partitionsPerRegion; (2)hbase.hregion.memstore.flush.size;(3)hbase.hregion.majorcompaction; (4)hbase.hstore.compaction.min;(5)hbase.hstore.compaction.max; (6)hbase.hqueue.compression;(7)hbase.hstore.blockingStoreFiles等。

EXAMPLES:

  • hqueue> create ‘q1′, 32, 86400

  • hqueue> create ‘q1′, 32, 86400, {‘hbase.hqueue.partitionsPerRegion’ => ’4′, ‘hbase.hstore.compaction.min’ => ’16′, ‘hbase.hstore.compaction.max’ => ’32′}

(2)清空隊列

USAGE:truncate_queue 'queue_name'
DESCRIPTIONS:
  • queue_name:待清空的Queue名稱,必選參數。
EXAMPLES:
  • hqueue(main):013:0> truncate_queue 'replication_dev_2_test_queue'
需要注意的是:該命令與HBase Shell中的truncate有所不同,該命令僅會刪除Queue中的數據,而保留Queue的Presharding信息。
    更多操作請參閱:http://searchwiki.taobao.ali.com/index.php/HQueue_Toolkit#Queue.E7.AE.A1.E7.90.86
(3)新增訂閱者
USAGE:add_subscriber 'queue_name', 'subscriber_name'
DESCRIPTIONS:
  • queue_name:隊列名稱,必選參數。
  • subscriber_name:訂閱者名稱,必選參數。
EXAMPLES:
  • add_subscriber 'replication_dev_2_test_queue', 'subscriber_1'

(4)刪除訂閱者

USAGE:delete_subscriber 'subscriber_name', 'queue_name'
DESCRIPTIONS:
  • queue_name:訂閱者所訂閱的Queue名稱,必選參數。
  • subscriber_name:訂閱者名稱,必選參數。
EXAMPLES:
  • hqueue(main):040:0> delete_subscriber 'replication_dev_2_test_queue', 'subscriber_1'

更多信息可以參閱:http://searchwiki.taobao.ali.com/index.php/HQueue_Toolkit#.E8.AE.A2.E9.98.85.E8.80.85.E7.AE.A1.E7.90.86

4.2. Put

HQueue Client API中的Put相關操作可以完成將用戶消息數據寫入HQueue中,Put支持批量操作,具體使用方式示例如下:

HQueue queue = new HQueue(queueName);

String topic1 = "crawler";
String value1 = "http://www.360test.com";

// 寫入單條消息數據,不指定Partition ID。在不指定Partition ID的情況下,將會在Queue的所有Partitions中隨機選取一個。
Message message1 = new Message(Bytes.toBytes(topic1), Bytes.toBytes(value1));
queue.put(message);

// 寫入Message時,顯式指定PartitionID。
short partitionID = 10;
queue.put(partitionID, message1);

List<Message> messages = new ArrayList<Message>();
messages.add(message1);

String topic2 = "dump";
String value2 = "http://www.jd.com";
Message message2 = new Message(Bytes.toBytes(topic2), Bytes.toBytes(value2));
messages.add(message2);

// 寫入多條消息數據,不指定Partition ID。      
queue.put(messages);

// 寫入多條消息數據,指定Partition ID。
queue.put(partitionID, messages);

queue.close();

4.3. Scan

為方便用戶從Queue中Scan消息數據,HQueue Client API提供了三種自定義Scanner,分別為:QueueScanner、PartitionScanner和CombinedPartitionScanner,使用示例如下:

String queueName = "subscription_queue";
Queue queue = new HQueue(queueName);

// 起始時間戳
long currentTimestamp = System.currentTimeMillis();
MessageID startMessageID = new MessageID(currentTimestamp - 6000);
MessageID stopMessageID = new MessageID(currentTimestamp);

Scan scan = new Scan(startMessageID, stopMessageID);
// 添加主題
scan.addTopic(Bytes.toBytes("topic1"));
scan.addTopic(Bytes.toBytes("topic2"));

Message message = null;

// 使用QueueScanner,掃描Queue下全部Partitions中的數據
QueueScanner queueScanner = queue.getQueueScanner(scan);
while ((message = queueScanner.next()) != null) {
    // no-op
}
queueScanner.close();

short partitionID1 = 1;

// 使用PartitionScanner,掃描Queue中指定的Partition的數據
PartitionScanner partitionScanner = queue.getPartitionScanner(partitionID1, scan);
while ((message = partitionScanner.next()) != null) {
    // no-op
}
partitionScanner.close();

short partitionID2 = 2;
Map<Short, Scan> partitions = new HashMap<Short, Scan>();
// 添加多個Partitions
partitions.put(partitionID1, scan);
partitions.put(partitionID2, scan);

CombinedPartitionScanner combinedScanner = queue.getCombinedPartitionScanner(partitions);
while ((message = combinedScanner.next()) != null) {
    // no-op
}
combinedScanner.close();

queue.close();

4.4. 訂閱消息

HQueue自0.3版本開始提供訂閱功能,使用方式示例如下:

HQueue queue = null;
HQueueSubscriber subscriber = null;

try {
    String queueName = "subscription_queue";
    queue = new HQueue(queueName);

    Set<Pair<Short, MessageID>> partitions = new HashSet<Pair<Short, MessageID>>();

    // 添加所訂閱的Partitions        
    Pair<Short, MessageID> partition1 = new Pair<Short, MessageID>((short)0, null);
    partitions.add(partition1);
    Pair<Short, MessageID> partition2 = new Pair<Short, MessageID>((short)1, null);
    partitions.add(partition2);
    Pair<Short, MessageID> partition3 = new Pair<Short, MessageID>((short)2, null);
    partitions.add(partition3);

    // 添加所訂閱的Topics            
    Set<String> topics = new HashSet<String>();
    topics.add("topic_1");
    topics.add("topic_2");
    topics.add("topic_3");

    // 訂閱者名稱           
    String subscriberName = "subscriber_1";

    Subscription subscription = new Subscription(subscriberName, topics);
    subscription.addPartitions(partitions);

    // 添加回調函數          
    List<MessageListener> listeners = new LinkedList<MessageListener>();
    MessageListener blackHoleListener = new BlackHoleMessageListener(subscriberName);
    listeners.add(blackHoleListener);

    // 創建訂閱者           
    subscriber = queue.createSubscriber(subscription, listeners);

    subscriber.start();

    Thread.sleep(600000L);
    subscriber.stop("Time out, request to stop subscriber:" + subscriberName);
} catch (Exception ex) {
    LOG.error("Received unexpected exception when testing subscription.", ex);
} finally {
    if (queue != null) {
        try {
        queue.close();
        queue=null;
        } catch (IOException ex) {
            // ignore the exception
        }
    }
}

4.5. ThriftServer API

HBase自帶的ThriftServer實現了對HTable的多語言API支持,HQueue在HBase ThriftServer中擴展了對HQueue的支持,使得C++、Python和PHP等語言也可以方便地訪問HQueue。

HQueue目前提供的Thrift API如下所示:

1 ScannerID messageScannerOpen(1:Text queueName,2:i16 partitionID,3:TMessageScan messageScan) 根據Scan,打開Queue中某個Partition上的Scanner
2 TMessage messageScannerGet(1:ScannerID id) 逐條獲取Message
3 list<TMessage> messageScannerGetList(1:ScannerID id,2:i32 nbMessages) 批量獲取Messages
4 void messageScannerClose(1:ScannerID id) 關閉ScannerID
5 void putMessage(1:Text queueName,2:TMessage tMessage) 向Queue中寫入Message,使用隨機的Partition ID
6 void putMessages(1:Text queueName,2:list<TMessage> tMessages) 向Queue中批量寫入Messages,使用隨機的Partition ID
7 void putMessageWithPid(1:Text queueName,2:i16 partitionID,3:TMessage tMessage) 向Queue中寫入Message,使用指定的Partition ID
8 void putMessagesWithPid(1:Text queueName,2:i16 partitionID,3:list<TMessage> tMessages) 向Queue中批量寫入Messages,使用指定的Partition ID
9 list<Text> getQueueLocations(1:Text queueName) 獲取Queue中所有Partition所在主機的地址

5. 總結

以上是對HQueue概念、特性、系統設計、處理流程以及應用等方面的簡單闡述,希望對大家有所幫助。

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!