Storm在線業務實踐-集群空閑CPU飆高問題排查

jopen 9年前發布 | 29K 次閱讀 Storm 分布式/云計算/大數據

最近將公司的在線業務遷移到Storm集群上,上線后遇到低峰期CPU耗費嚴重的情況。在解決問題的過程中深入了解了storm的內部實現原 理,并且解決了一個storm0.9-0.10版本一直存在的嚴重bug,目前代碼已經合并到了storm新版本中,在這篇文章里會介紹這個問題出現的場 景、分析思路、解決的方式和一些個人的收獲。

背景

首先簡單介紹一下Storm,熟悉的同學可以直接跳過這段。

Storm是推ter開源的一個大數據處理框架,專注于流式數據的處理。Storm通過創建拓撲結構(Topology)來轉換數據流。和Hadoop的作業(Job)不同,Topology會持續轉換數據,除非被集群關閉。

下圖是一個簡單的Storm Topology結構圖。

Storm在線業務實踐-集群空閑CPU飆高問題排查

可以看出Topology是由不同組件(Component)串/并聯形成的有向圖。數據元組(Tuple)會在Component之間通過數據流的形式進行有向傳遞。Component有兩種

  • Spout:Tuple來源節點,持續不斷的產生Tuple,形成數據流
  • Bolt:Tuple處理節點,處理收到的Tuple,如果有需要,也可以生成新的Tuple傳遞到其他Bolt

目前業界主要在離線或者對實時性要求不高業務中使用Storm。隨著Storm版本的更迭,可靠性和實時性在逐漸增強,已經有運行在線業務的能力。因此我們嘗試將一些實時性要求在百毫秒級的在線業務遷入Storm集群。

現象

  1. 某次高峰時,Storm上的一個業務拓撲頻繁出現消息處理延遲。延時達到了10s甚至更高。查看高峰時的物理機指標監控,CPU、內存和IO都有很大的余量。判斷是隨著業務增長,服務流量逐漸增加,某個Bolt之前設置的并行度不夠,導致消息堆積了。
  2. 臨時增加該Bolt并行度,解決了延遲的問題,但是第二天的低峰期,服務突然報警,CPU負載過高,達到了100%。

排查

  1. 用Top看了下CPU占用,系統調用占用了70%左右。再用wtool對Storm的工作進程進行分析,找到了CPU占用最高的線程
    java.lang.Thread.State: TIMED_WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
            - parking to wait for  <0x0000000640a248f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
            at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
            at com.lmax.disruptor.BlockingWaitStrategy.waitFor(BlockingWaitStrategy.java:87)
            at com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:54)
            at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:97)
            at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
            at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
            at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
            at clojure.lang.AFn.run(AFn.java:24)
            at java.lang.Thread.run(Thread.java:745)

    我們可以看到這些線程都在信號量上等待。調用的來源是disruptor$consume_batch_when_available。

  2. disruptor是Storm內部消息隊列的封裝。所以先了解了一下Storm內部的消息傳輸機制。

    Storm在線業務實踐-集群空閑CPU飆高問題排查(圖片來源Understanding the Internal Message Buffers of Storm

    Storm的工作節點稱為Worker(其實就是一個JVM進程)。不同Worker之間通過Netty(舊版Storm使用ZeroMQ)進行通訊。

    每個Worker內部包含一組Executor。Strom會為拓撲中的每個Component都分配一個Executor。在實際的數據處理流程 中,數據以消息的形式在Executor之間流轉。Executor會循環調用綁定的Component的處理方法來處理收到的消息。

    Executor之間的消息傳輸使用隊列作為消息管道。Storm會給每個Executor分配兩個隊列和兩個處理線程。

    • 工作線程:讀取接收隊列,對消息進行處理,如果產生新的消息,會寫入發送隊列
    • 發送線程:讀取發送隊列,將消息發送其他Executor

    當Executor的發送線程發送消息時,會判斷目標Executor是否在同一Worker內,如果是,則直接將消息寫入目標Executor的接收隊列,如果不是,則將消息寫入Worker的傳輸隊列,通過網絡發送。

    Executor工作/發送線程讀取隊列的代碼如下,這里會循環調用consume-batch-when-available讀取隊列中的消息,并對消息進行處理。

    (async-loop
      (fn []
        ...
        (disruptor/consume-batch-when-available receive-queue event-handler)            
        ...
        ))
  3. 我們再來看一下consume_batch_when_available這個函數里做了什么。
    (defn consume-batch-when-available
      [^DisruptorQueue queue handler]
      (.consumeBatchWhenAvailable queue handler))

    前面提到Storm使用隊列作為消息管道。Storm作為流式大數據處理框架,對消息傳輸的性能很敏感,因此使用了高效內存隊列Disruptor Queue作為消息隊列。

    Storm在線業務實踐-集群空閑CPU飆高問題排查

    Disruptor Queue是LMAX開源的一個無鎖內存隊列。內部實現如下。

    Storm在線業務實踐-集群空閑CPU飆高問題排查(圖片來源Disruptor queue Introduction

    Disruptor Queue通過Sequencer來管理隊列,Sequencer內部使用RingBuffer存儲消息。RingBuffer中消息的位置使用Sequence表示。隊列的生產消費過程如下

    • Sequencer使用一個Cursor來保存寫入位置。
    • 每個Consumer都會維護一個消費位置,并注冊到Sequencer。
    • Consumer通過SequenceBarrier和Sequencer進行交互。Consumer每次消費時,SequenceBarrier會比較消費位置和Cursor來判斷是否有可用消息:如果沒有,會按照設定的策略等待消息;如果有,則讀取消息,修改消費位置。
    • Producer在寫入前會查看所有消費者的消費位置,在有可用位置時會寫入消息,更新Cursor。

    查看DisruptorQueue.consumeBatchWhenAvailable實現如下

    final long nextSequence = _consumer.get() + 1;
    final long availableSequence = _barrier.waitFor(nextSequence, 10, TimeUnit.MILLISECONDS);
    if (availableSequence >= nextSequence) {
        consumeBatchToCursor(availableSequence, handler);
    }

    繼續查看_barrier.waitFor方法

    public long waitFor(final long sequence, final long timeout, final TimeUnit units) throws AlertException, InterruptedException {
        checkAlert();
        return waitStrategy.waitFor(sequence, cursorSequence, dependentSequences, this, timeout, units);
    }

    Disruptor Queue為消費者提供了若干種消息等待策略

    • BlockingWaitStrategy:阻塞等待,CPU占用小,但是會切換線程,延遲較高
    • BusySpinWaitStrategy:自旋等待,CPU占用高,但是無需切換線程,延遲低
    • YieldingWaitStrategy:先自旋等待,然后使用Thread.yield()喚醒其他線程,CPU占用和延遲比較均衡
    • SleepingWaitStrategy:先自旋,然后Thread.yield(),最后調用LockSupport.parkNanos(1L),CPU占用和延遲比較均衡

    Storm的默認等待策略為BlockingWaitStrategy。BlockingWaitStrategy的waitFor函數實現如下

    if ((availableSequence = cursor.get()) < sequence) {
            lock.lock();
            try {
                ++numWaiters;
                while ((availableSequence = cursor.get()) < sequence) {
                    barrier.checkAlert();
    
                    if (!processorNotifyCondition.await(timeout, sourceUnit)) {
                        break;
                    }
                }
            }
            finally {
                --numWaiters;
                lock.unlock();
            }
    }

    BlockingWaitStrategy內部使用信號量來阻塞Consumer,當await超時后,Consumer線程會被自動喚醒,繼續循環查詢可用消息。

  4. 而DisruptorQueue.consumeBatchWhenAvailable方法中可以看到,Storm此處設置超時為10ms。推測在 沒有消息或者消息量較少時,Executor在消費隊列時會被阻塞,由于超時時間很短,工作線程會頻繁超時然后重新阻塞,導致CPU占用飆高。

    嘗試將10ms修改成100ms,編譯Storm后重新部署集群,使用Storm的demo拓撲,將bolt并發度調到1000,修改spout代碼為10s發一條消息。經測試CPU占用大幅減少。

    再將100ms改成1s,測試CPU占用基本降為零。

  5. 但是隨著調高超時,測試時并沒有發現消息處理有延時。繼續查看BlockingWaitStrategy代碼,發現Disruptor Queu的Producer在寫入消息后會喚醒等待的Consumer。

    if (0 != numWaiters)
    {
        lock.lock();
        try
        {
            processorNotifyCondition.signalAll();
        }
        finally
        {
            lock.unlock();
        }
    }

    這樣,Storm的10ms超時就很奇怪了,沒有減少消息延時,反而增加了系統負載。帶著這個疑問查看代碼的上下文,發現在構造DisruptorQueue對象時有這么一句注釋

    ;; :block strategy requires using a timeout on waitFor (implemented in DisruptorQueue), 
            as sometimes the consumer stays blocked even when there's an item on the queue.
    (defnk disruptor-queue
        [^String queue-name buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
        (DisruptorQueue. queue-name
                    ((CLAIM-STRATEGY claim-strategy) buffer-size)
                    (mk-wait-strategy wait-strategy)))

    Storm使用的Disruptor Queue版本為2.10.1。查看Disruptor Queue的change log,發現該版本的BlockingWaitStrategy有潛在的并發問題,可能導致某條消息在寫入時沒有喚醒等待的消費者。

    2.10.2 Released (21-Aug-2012)

    • Bug fix, potential race condition in BlockingWaitStrategy.
    • Bug fix set initial SequenceGroup value to -1 (Issue #27).
    • Deprecate timeout methods that will be removed in version 3.

    因此Storm使用了短超時,這樣在出現并發問題時,沒有被喚醒的消費方也會很快因為超時重新查詢可用消息,防止出現消息延時。

    這樣如果直接修改超時到1000ms,一旦出現并發問題,最壞情況下消息會延遲1000ms。在權衡性能和延時之后,我們在Storm的配置文件中增加配置項來修改超時參數。這樣使用者可以自己選擇保證低延時還是低CPU占用率。

  6. 就BlockingWaitStrategy的潛在并發問題咨詢了Disruptor Queue的作者,得知2.10.4版本已經修復了這個并發問題(Race condition in 2.10.1 release
    )。

    將Storm依賴升級到此版本。但是對Disruptor Queue的2.10.1做了并發測試,無法復現這個并發問題,因此也無法確定2.10.4是否徹底修復。謹慎起見,在升級依賴的同時保留了之前的超時配 置項,并將默認超時調整為1000ms。經測試,在集群空閑時CPU占用正常,并且壓測也沒有出現消息延時。

總結

  1. 關于集群空閑CPU反而飆高的問題,已經向Storm社區提交PR并且已被接受[STORM-935] Update Disruptor queue version to 2.10.4。在線業務流量通常起伏很大,如果被這個問題困擾,可以考慮應用此patch。
  2. Storm UI中可以看到很多有用的信息,但是缺乏記錄,最好對其進行二次開發(或者直接讀取ZooKeeper中信息),記錄每個時間段的數據,方便分析集群和拓撲運行狀況。

來自:http://daiwa.ninja/index.php/2015/07/18/storm-cpu-overload/

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!