storm拓撲的并行度(parallelism)概念

MillieSpurl 8年前發布 | 23K 次閱讀 分布式/云計算/大數據

來自: http://my.oschina.net/u/2000675/blog/613142


1 storm并行的基本概念

一個運行中的拓撲是由什么構成的:工作進程(worker processes),執行器(executors)和任務(tasks)
在一個 Storm 集群中,Storm 主要通過以下三個部件來運行拓撲:
工作進程(worker processes)(進程數)
執行器(executors)(線程數)
任務(tasks)(實例組件數)

storm集群中的一個機器可以運行一個或者多個worker,對應于一個或者多個topologies.1個worker進程運行1個或多個excutor線程。每個worker從屬于一個topology.executor是單線程。每1個executor運行著相同的組件(spout或bolt)的1個或多個task。1個task執行著實際的數據處理。

下面是他們之間相互關系的簡單圖示。

2  是否需要提高workers數目

(1)  最好 一臺機器上的一個topology只使用一個worker,主要原因時減少了worker之間的數據傳輸

(2)  有更多的worker可能會有更好的表現,這取決于你的瓶頸所在。每個worker都有通過一個線程將元組轉移到其他的worker,所以如果你的瓶頸在CPU和每個worker正在處理大量的元組,更多的worker可能會提高你的吞吐量。

所以基本上沒有明確的答案,你應該根據你的環境和設計來嘗試不同的配置。

3 executor的數目
executor是真正的并行度(事實上的并行度)。(task數目是想要設置的并行度)
executor初始數目=spout數目+bolt數目+acker數目 (這些加起來也就是task數目。)
spout數目,bolt數目,acker數目運行時是不會變化的,但是executor數目可以變化。

4   是否需要提高TASK數目
TASK的存在只是為了topology擴展的靈活性,與并行度無關。

1個task執行著實際的數據處理。
1個worker進程執行一個拓撲的子集。1個worker進程從屬于1個特定的拓撲,并運行著這個拓撲的1個或多個組件(spout或bolt)的1個或多個executor。一個運行中的拓撲包括集群中的許多臺機器上的許多個這樣的進程。
1個executor是1個worker進程生成的1個線程。它可能運行著1個相同的組件(spout或bolt)的1個或多個task。
1 個task執行著實際的數據處理,你用代碼實現的每一個spout或bolt就相當于分布于整個集群中的許多個task。在1個拓撲的生命周期中,1個組 件的task的數量總是一樣的,但是1個組件的executor(線程)的數量可以隨著時間而改變。這意味著下面的條件總是成立:thread的數量 <= task的數量。默認情況下,task的數量與executor的數量一樣,例如,Storm會在每1個線程運行1個task。

配置拓撲的并發度

注意Storm的術語"并發度(parallelism)"是特別用來描述所謂的parallelism hint的,這代表1個組件的初始的executor(線程)的數量。在此文檔中我們使用術語"并發度"的一般意義來描述你不但可以配置executor的數量,還可以配置worker進程的數量,還可以是1個拓撲的task的數量。在用到并發度的狹義的定義時我們會特別提出。
下面的小節給出了一些不同的配置選項,還有你如何在代碼中設置它們。有多種方法可以進行設置,表格列舉了其中幾種。Storm目前有下列的配置優先級:defaults.yaml < storm.yaml < 特定拓撲的配置 < 內部特定組件的配置 < 外部特定組件的配置。

Worker 數量
說明:拓撲在集群中運行所需要的工作進程數
配置選項:TOPOLOGY_WORKERS

在代碼中如何使用(示例):
Config#setNumWorkers

Executors(線程)數量
說明:每個組件需要的執行線程數
配置選項:(沒有拓撲級的通用配置項)
在代碼中如何使用(示例):
TopologyBuilder#setSpout()

TopologyBuilder#setBolt()

注意:從 Storm 0.8 開始 parallelism_hint 參數代表 executor 的數量,而不是 task 的數量
Tasks 數量
說明:每個組件需要的執行任務數
配置選項:TOPOLOGY_TASKS

在代碼中如何使用(示例):
ComponentConfigurationDeclarer#setNumTasks()

以下是配置上述參數的一個簡單示例代碼:

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout);

在上面的代碼中,我們為 GreenBolt 配置了 2 個初始執行線程(executor)和 4 個關聯任務(task)。這樣,每個執行線程中會運行 2 個任務。如果你在設置 bolt 的時候不指定 task 的數量,那么每個 executor 的 task 數會默認設置為 1。

拓撲示例

下圖顯示了一個與實際應用場景很接近的簡單拓撲的結構。這個拓撲由三個組件構成:一個名為 BlueSpout 的 spout,和兩個名為 GreenBolt 和 YellowBolt 的 bolt。這些組件之間的關系是:BlueSpout 將它的輸出發送到 GreenBolt 中,然后GreenBolt 將消息繼續發送到 YellowBolt 中。

圖中是一個包含有兩個 worker 進程的拓撲。其中,藍色的 BlueSpout 有兩個 executor,每個 executor 中有一個 task,并行度為 2;綠色的 GreenBolt 有兩個 executor,每個 executor 有兩個 task,并行度也為2;而黃色的YellowBolt 有 6 個 executor,每個 executor 中有一個 task,并行度為 6,因此,這個拓撲的總并行度就是 2 + 2 + 6 = 10。具體分配到每個 worker 就有 10 / 2 = 5 個 executor。 

上圖中,GreenBolt 配置了 task 數,而 BlueSpout 和 YellowBolt 僅僅配置了 executor 數。下面是相關代碼:

Config conf = new Config();
conf.setNumWorkers(2); // use two worker processestopologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // set parallelism hint to 2topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
               .setNumTasks(4)
               .shuffleGrouping("blue-spout");
topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
               .shuffleGrouping("green-bolt");StormSubmitter.submitTopology(        "mytopology",
        conf,
        topologyBuilder.createTopology()
    );

當然,Storm 還有一些其他的配置項可以控制拓撲的并行度,包括:
TOPOLOGY_MAX_TASK_PARALLELISM:該選項設置了一個組件最多能夠分配的 executor 數(線程數上限),一般用于在本地模式運行拓撲時測試分配線程的數量限制。你可以通過 Config#setMaxTaskParallelism() 來配置該參數。

如何修改運行中的拓撲的并行度

Storm 的一個很有意思的特點是你可以隨時增加或者減少 worker 或者 executor 的數量,而不需要重啟集群或者拓撲。這個方法就叫做再平衡(rebalance)。
有兩種方法可以對一個拓撲執行再平衡操作:
使用 Storm UI
使用以下所示的客戶端(CLI)工具
下面是使用 CLI 工具的一個簡單示例: 

## 重新配置拓撲 "mytopology",使得該拓撲擁有 5 個 worker processes,## 另外,配置名為 "blue-spout" 的 spout 使用 3 個 executor,## 配置名為 "yellow-bolt" 的 bolt 使用 10 個 executor。$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10

 本文由用戶 MillieSpurl 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!