Flink運行時之生成作業圖

JeremiahF63 7年前發布 | 7K 次閱讀 技術

生成作業圖

在分析完了流處理程序生成的流圖(StreamGraph)以及批處理程序生成的優化后的計劃(OptimizedPlan)之后,下一步就是生成它們面向Flink運行時執行引擎的共同抽象——作業圖(JobGraph)。

什么是作業圖

作業圖(JobGraph)是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一。

相比流圖(StreamGraph)以及批處理優化計劃(OptimizedPlan),JobGraph發生了一些變化,已經不完全是“靜態”的數據結構了,因為它加入了中間結果集(IntermediateDataSet)這一“動態”概念。

作業頂點(JobVertex)、中間數據集(IntermediateDataSet)、作業邊(JobEdge)是組成JobGraph的基本元素。這三個對象彼此之間互為依賴:

  • 一個JobVertex關聯著若干個JobEdge作為輸入端以及若干個IntermediateDataSet作為其生產的結果集;
  • 一個IntermediateDataSet關聯著一個JobVertex作為生產者以及若干個JobEdge作為消費者;
  • 一個JobEdge關聯著一個IntermediateDataSet可認為是源以及一個JobVertex可認為是目標消費者;

因此一個JobGraph可能的圖形化表示如下:

那么JobGraph是怎么組織并存儲這些元素的呢?其實JobGraph只以Map的形式存儲了所有的JobVertex,鍵是JobVertexID:

private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();

至于其它的元素,通過JobVertex都可以根據關系找尋到。

JobGraph包含了如下這些屬性:

  • 描述作業相關的信息,比如上面的頂點、作業編號、名稱等;
  • 用戶程序包相關的信息,比如類路徑等;
  • 執行的一些配置信息,比如異步快照的配置、會話超時、是否允許排隊調度等;

絕大部分的實例方法都是維護這些屬性的。

需要注意的是,用于迭代的反饋邊(feedback edge)當前并不體現在JobGraph中,而是被內嵌在特殊的JobVertex中通過反饋信道(feedback channel)在它們之間建立關系。

流圖生成作業圖

這篇文章我們來分析流處理程序是如何從之前的Stream生成JobGraph的。這部分的實現位于類StreamingJobGraphGenerator中,它是流處理程序的JobGraph生成器,其核心是createJobGraph方法,它體現了生成JobGraph的主干調用,實現代碼如下:

public JobGraph createJobGraph() {
//創建一個JobGraph實例對象 jobGraph = new JobGraph(streamGraph.getJobName());

//設置對task的調度模式為ALL,即所有的算子立即同時啟動
jobGraph.setScheduleMode(ScheduleMode.ALL);   

//對用于輔助生成JobGraph的一些實例變量進行初始化
init();   

//給StreamGraph的每個StreamNode生成一個hash值,該hash值在節點不發生改變的情況下多次生成始終是一致的,
//可用來判斷節點在多次提交時是否產生了變化并且該值也將作為JobVertex的ID
Map<Integer, byte[]> hashes = traverseStreamGraphAndGenerateHashes();   

//基于StreamGraph從所有的source開始構建task chain
setChaining(hashes);   

//給頂點設置物理邊(入邊)
setPhysicalEdges();   

//為每個JobVertex設置slotShareGroup,同時為迭代的source/sink對設置CoLocationGroup
setSlotSharing();      

//配置檢查點
configureCheckpointing();   

//配置重啟策略
configureRestartStrategy();   

//傳遞執行配置
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());    

return jobGraph;

}</code></pre>

接下來我們挨個對幾個關鍵的方法進行分析。第一個要分析的方法是traverseStreamGraphAndGenerateHashes,它會對StreamGraph進行遍歷并為每一個StreamNode都生成其哈希值,生成的哈希值將用于為每個JobVertex創建JobVertexID。方法的完整實現如下:

private Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes() {
    //hash函數
final HashFunction hashFunction = Hashing.murmur3_128(0);
final Map<Integer, byte[]> hashes = new HashMap<>();
//存儲訪問過了的節點編號 Set<Integer> visited = new HashSet<>();
//入隊即將訪問的節點對象 Queue<StreamNode> remaining = new ArrayDeque<>();

//source是一個流拓撲的起點,從source開始遍歷
//hash值的生成是順序敏感的(依賴于順序),因此首先要對source ID集合進行排序
//因為如果source的ID集合順序不固定,那意味著多次提交包含該source ID集合的程序時可能導致不同的遍歷路徑,
//從而破壞了hash生成的因素
List<Integer> sources = new ArrayList<>();   
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {      
    sources.add(sourceNodeId);   
}   
Collections.sort(sources);   

//按照排好的順序,進行廣度遍歷,注意這不是樹結構,而是圖,因為就一個節點而言,其輸入和輸出都可能有多條路徑
for (Integer sourceNodeId : sources) {      
    remaining.add(streamGraph.getStreamNode(sourceNodeId));      
    visited.add(sourceNodeId);   
}   
StreamNode currentNode;   
//從即將訪問的節點隊列中出隊首部的一個元素,沒有元素了則結束
while ((currentNode = remaining.poll()) != null) {      
    // 給當前節點生成哈希值,并返回是否生成成功
    if (generateNodeHash(currentNode, hashFunction, hashes)) {         
        //遍歷當前節點的所有輸出邊
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            //獲取輸出邊的目標頂點(該邊另一頭的頂點)            
            StreamNode child = outEdge.getTargetVertex();
            //如果目標頂點沒被訪問過,則加入待訪問隊列和易訪問元素集合            
            if (!visited.contains(child.getId())) {               
                remaining.add(child);               
                visited.add(child.getId());            
            }         
        }      
    }      
    else {         
        //如果對當前節點的哈希值生成操作失敗,則將其從已訪問的節點中移除,等待后續再次訪問        
        visited.remove(currentNode.getId());      
    }   
}   
return hashes;

}</code></pre>

在上面代碼段中調用的generateNodeHash方法,其實現邏輯大致分為兩大部分,這兩部分對應了生成哈希的兩種方式:

  • 根據StreamTransformation的編號進行計算
  • 根據一些因素來綜合計算

第二種方式對應的因素有如下三種:

  • 節點相關的屬性(ID、并行度、UDF的類名)
  • 鏈接在一起的輸出節點相關的屬性
  • 輸入節點的哈希值

這里值得注意的是節點相關的ID屬性,它并不是StreamTransformation的ID,因為StreamTransformation的ID是一個靜態計數器,它可能會導致邏輯相同的Job最終生成的哈希值卻不同。考慮下面的示例:

//program 1
DataStream<String> s1 = ...;     //s1.ID = 1
DataStream<String> s2 = ...;     //s2.ID = 2
s1.union(s2).print();
//program 2
DataStream<String> s2 = ...;     //s2.ID = 1
DataStream<String> s1 = ...;     //s1.ID = 2
s1.union(s2).print();

對于上面示例代碼中的兩個語義等價的程序,當借助StreamTransformation的ID屬性來生成哈希值時會出現不一致。因此,Flink所使用的ID值其實是已完成哈希值計算的節點數目。這樣就不會出現上述因為source定義的順序不同而導致語義上等價的程序產生不一致哈希值的情況。最終traverseStreamGraphAndGenerateHashes方法將會為所有的StreamNode生成對應的哈希值。

為了更高效得執行,Flink對DAG在調度上進行了優化,該優化稱之為 算子鏈接 (operator chain)。它允許某些算子可以“鏈接”在一起,在調度時這些被鏈接到一起的算子會被視為一個任務(Task)。而在執行時,一個Task會被并行化成若干個subTask實例進行執行,一個subTask對應一個執行線程。算子鏈接的示意圖如下:

這種優化能減少線程之間的切換和跨節點的數據交換從而在減少時延的同時提升吞吐量。

當算子互相鏈接之后,原先存在于互相鏈接的算子之間的邊就只是邏輯上存在的。而被鏈接的算子整體跟其他無法與其鏈接的算子之間的邊才是真正的物理邊。另外,為了方便源碼解讀,需要對“鏈接”和“連接”加以區分。在當前的上下文中,“鏈接”指的是“算子鏈”的形成方式,而“連接”指的是在算子之間建立關系。

接下來我們就來分析,將算子鏈接起來的setChaining方法。setChaining會沿著source生成算子鏈(但不要被其方法名誤導,它其實還完成了很多額外的工作,比如創建JobVertex)。

setChaining會遍歷StreamGraph中的sourceID集合。為每個source調用createChain方法,該方法以當前source為起點向后遍歷并創建算子鏈。createChain方法會收集當前節點所連接的物理邊,并為鏈接頭節點與物理邊下游的算子建立連接關系。

/**

  • @param startNodeId : 起始節點編號
  • @param currentNodeId : 當前遍歷節點編號
  • @param hashes : 節點編號與hash值映射表
  • @return 遍歷過的邊集合 */ private List<StreamEdge> createChain(Integer startNodeId,
                                 Integer currentNodeId, 
                                 Map<Integer, byte[]> hashes,
                                 int chainIndex) {   
    
    //如果起始節點沒有被構建過,才進入分支;否則直接返回一個空List(遞歸結束條件) if (!builtVertices.contains(startNodeId)) {
     //存儲遍歷過的邊,該對象被作為最終結果返回  
     List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
     //存儲可以被鏈接的出邊      
     List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();      
     //存儲不可被鏈接的出邊
     List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();      
     //遍歷當前節點的每個出邊
     for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
         //如果該出邊是可被鏈接的,則加入可被鏈接的出邊集合,否則加入不可被鏈接的出邊集合         
         if (isChainable(outEdge)) {            
             chainableOutputs.add(outEdge);         
         } else {            
             nonChainableOutputs.add(outEdge);         
         }      
     }      
     //遍歷每個可被鏈接的出邊,然后進行遞歸遍歷
     for (StreamEdge chainable : chainableOutputs) {      
         //起始節點不變,以該可被鏈接的出邊的目標節點作為“當前”節點進行遞歸遍歷并將遍歷過的邊集合加入到當前集合中  
         //這里值得注意的是所有可鏈接的邊本身并不會被加入這個集合! 
         transitiveOutEdges.addAll(
             createChain(startNodeId, chainable.getTargetId(), hashes, chainIndex + 1));      
     }      
     //遍歷不可鏈接的出邊,同樣進行遞歸遍歷
     for (StreamEdge nonChainable : nonChainableOutputs) {       
         //將當前不可鏈接的出邊加入到遍歷過的邊集合中  
         transitiveOutEdges.add(nonChainable);    
         //同樣進行遞歸遍歷,不過這里的起始節點和當前節點都被設置為該邊的目標節點     
         createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, 0);      
     }      
     //為當前節點創建鏈接的完整名稱,如果當前節點沒有可鏈接的邊,那么其名稱將直接是當前節點的operator名稱
     chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));   
     //創建流配置對象,流配置對象針對單個作業頂點而言,包含了頂點相關的所有信息。
     //當創建配置對象的時候,如果當前節點即為起始節點(鏈接頭),會先為該節點創建JobVertex對象
     StreamConfig config = currentNodeId.equals(startNodeId)            
                             ? createJobVertex(startNodeId, hashes)            
                             : new StreamConfig(new Configuration());  
     //然后為當前節點初始化流配置對象里的一系列屬性    
     setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);  
     //如果當前節點是起始節點(chain頭節點)    
     if (currentNodeId.equals(startNodeId)) {         
         //設置該節點是chain的開始
         config.setChainStart();
         config.setChainIndex(0);
         //設置不可鏈接的出邊         
         config.setOutEdgesInOrder(transitiveOutEdges);         
         //設置所有出邊
         config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges()); 
         //遍歷當前節點的所有不可鏈接的出邊集合        
         for (StreamEdge edge : transitiveOutEdges) {            
             //給當前節點到不可鏈接的出邊之間建立連接
             //通過出邊找到其下游流節點,根據邊的分區器類型,構建下游流節點跟輸入端上游流節點(也即起始節點)
             //的連接關系。在這個構建的過程中也就創建了IntermediateDataSet及JobEdge并跟當前節點的JobVertex
             //三者建立了關聯關系
             connect(startNodeId, edge);         
         }         
         //將當前節點的所有子節點的流配置對象進行序列化
         config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));      
     } else {         //如果當前節點是chain中的節點,而非chain的頭節點
         Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);         
         if (chainedConfs == null) {            
             chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());         
         }
         config.setChainIndex(chainIndex);         
         //將當前節點的流配置對象加入到chain頭節點點相關的配置中
         chainedConfigs.get(startNodeId).put(currentNodeId, config);      
     }      
     //返回所有不可鏈接的邊
     return transitiveOutEdges;   
    
    } else {
     return new ArrayList<>();   
    
    } }</code></pre>

    上面的代碼段中會先將當前節點的出邊按照它們是否是可被鏈接進行分類,isChainable方法包含了判斷邏輯,一個出邊如果是可鏈接的,它需要滿足的條件如下:

    return downStreamVertex.getInEdges().size() == 1         //如果邊的下游流節點的入邊數目為1(也即其為單輸入算子)
    
     && outOperator != null                           //邊的下游節點對應的算子不為null
     && headOperator != null                          //邊的上游節點對應的算子不為null
     && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)      //邊兩端節點有相同的槽共享組名稱
     && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS //邊下游算子的鏈接策略為ALWAYS     
     && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||         
     headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)//上游算子的鏈接策略為HEAD或者ALWAYS   
     && (edge.getPartitioner() instanceof ForwardPartitioner)      //邊的分區器類型是ForwardPartitioner
     && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()   //上下游節點的并行度相等   
     && streamGraph.isChainingEnabled();        //當前的streamGraph允許鏈接的</code></pre> 
    

    在createChain中會調用createJobVertex為鏈接頭節點或者無法鏈接的節點創建JobVertex對象,創建完成之后會將它加入JobGraph并為當前的這個JobVertex創建流配置對象(StreamConfig)。

    對于無法鏈接的物理邊,Flink會將鏈頭(chain header)與這些物理邊(以及物理邊所連接著的目標算子)進行連接(代碼段中的connect方法),連接的過程也是創建JobEdge與IntermediateDataSet并跟它們建立關系的過程。

    現在讓我們回到createJobGraph方法的上下文中來,在setChaining方法調用中找出了物理出邊以及從源到目的節點之間建立了連接。接著,會調用setPhysicalEdges從目標節點向源節點之間建立入邊的連接。

    接下來,為相關的節點設置槽共享組(SlotSharingGroup)以及同位組(CoLocationGroup),這兩種機制都用于限制算子的部署。其中,CoLocationGroup主要用于迭代算子的執行。

    當用戶的Flink程序配置了檢查點信息,那么需要將檢查點相關的配置加入到JobGraph中去,這部分邏輯通過方法configureCheckpointing來完成,它將JobVertex劃分成三類:

    • triggerVertices:存儲接收“觸發檢查點”消息的JobVertex集合,當前只收集source頂點;
    • ackVertices:收集需要應答檢查點消息的JobVertex集合,當前收集所有的JobVertex;
    • commitVertices:存儲接收“提交檢查點”消息的JobVertex集合,當前收集所有JobVertex;

    這些信息都被封裝在JobSnapshottingSettings對象中,然后被存儲到JobGraph。

    基本生成JobGraph的主要步驟就是這些。接下來,我們將分析批處理程序在優化器生成的OptimizedPlan的基礎之上如何生成的JobGraph。

    優化后的計劃生成作業圖

    分析完了流圖如何生成作業圖,下面我們來分析批處理程序經過優化后的計劃如何生成作業圖。其核心代碼位于flink-clients模塊下的ClusterClient類中的getJobGraph方法中:

    JobGraphGenerator gen = new JobGraphGenerator(this.config);
    job = gen.compileJobGraph((OptimizedPlan) optPlan);

    這里的JobGraphGenerator位于optimizer模塊中(注意跟流處理中生成JobGraph的StreamingJobGraphGenerator進行區別),它用于將優化器優化后的OptimizedPlan編譯成JobGraph。編譯的過程不作任何決策與假設,也就是說作業最終如何被執行早已被優化器確定,而編譯也是在此基礎上做確定性的映射。

    JobGraphGenerator實現了Visitor接口,因此它是一個遍歷器,遍歷的對象是計劃節點(PlanNode)。

    關于遍歷器、計劃節點等更多的細節請參考“優化器”相關的文章。

    compileJobGraph方法在內部調用OptimizedPlan的accept方法遍歷它,而遍歷訪問器就是JobGraphGenerator自身:

    program.accept(this);

    在OptimizedPlan中,accept會挨個在每個sink上調用accept:

    public void accept(Visitor<PlanNode> visitor) {
    for (SinkPlanNode node : this.dataSinks) {
     node.accept(visitor);   
    
    } }</code></pre>

    批處理中的計劃是以sink作為起始點,然后通過遍歷訪問器逆向遍歷直至source。

    從sink開始的逆向遍歷是符合特定的模式的:

    public void accept(Visitor<PlanNode> visitor) {
     //前置遍歷,如果返回值為true,才會進行更進一步的后續操作
    if (visitor.preVisit(this)) {
     //獲取到當前sink的輸入端繼續遍歷,該調用會引發遞歸調用      
     this.input.getSource().accept(visitor);      
     //獲得所有的廣播輸入通道,對所有的廣播輸入通道源進行遍歷      
     for (Channel broadcastInput : getBroadcastInputs()) {         
         broadcastInput.getSource().accept(visitor);      
     }
     //進行后置遍歷            
     visitor.postVisit(this);   
    
    } }</code></pre>

    先來分析一下preVisit方法,它是遍歷時的“前進”方法,它會對要遍歷的PlanNode的具體類型進行枚舉推斷,針對不同的類型為其創建對應的JobVertex對象,接著為JobVertex對象設置相關屬性,最后將其加入到一個公共的PlanNode與JobVertex的映射字典中去。

    接下來是postVisit方法,它可以看成是遍歷時的“后退”方法,當在某個節點上調用到postVisit方法時,表明該節點的前任(從正常的source往sink方向)都已經遍歷完成。因此該方法在這里用來將當前節點與其前任建立連接。

    postVisit方法同樣會判斷節點的類型,特殊節點特殊處理。例如,如果節點的類型是IterationPlanNode,那么它將立即遍歷迭代路徑中的節點。這里有可能存在遞歸遍歷,所以使用了一個“棧”結構來保存當前節點。

    if (this.currentIteration != null) {
    this.iterationStack.add(this.currentIteration); } this.currentIteration = (IterationPlanNode) node; this.currentIteration.acceptForStepFunction(this);

if (this.iterationStack.isEmpty()) {
this.currentIteration = null; } else {
this.currentIteration = this.iterationStack.remove(this.iterationStack.size() - 1); }</code></pre>

回到compileJobGraph方法的上下文中,在對OptimizedPlan進行遍歷之后,會對收集到的迭代節點進行處理。通過遍歷迭代描述符(IterationDescriptor)并判斷其代表的節點屬于哪種迭代類型來進行特定的處理:

for (IterationDescriptor iteration : this.iterations.values()) {   
    if (iteration.getIterationNode() instanceof BulkIterationPlanNode) {      
        finalizeBulkIteration(iteration);   
    } else if (iteration.getIterationNode() instanceof WorksetIterationPlanNode) {      
        finalizeWorksetIteration(iteration);   
    } else {      
        throw new CompilerException();   
    }
}

到此,遍歷工作已經完成。下面會把鏈接任務的配置寫入其父節點(也就是容器節點)的配置中。接著新建JobGraph對象并進行一系列設置,比如添加JobVertex、為JobVertex設置SlotSharingGroup等。然后將之前注冊的緩存文件加入到Job的配置中,釋放相關資源后返回JobGraph對象。

 

 

來自:http://blog.csdn.net/yanghua_kobe/article/details/56321793

 

 本文由用戶 JeremiahF63 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!