Flink運行時之流處理程序生成流圖

紫丁香 7年前發布 | 7K 次閱讀 技術

DataStream API所編寫的流處理應用程序在生成作業圖(JobGraph)并提交給JobManager之前,會預先生成流圖(StreamGraph)。

什么是流圖

流圖(StreamGraph)是表示流處理程序拓撲的數據結構,它封裝了生成作業圖(JobGraph)的必要信息。它的類繼承關系如下圖所示:

當你基于 StreamGraph 的繼承鏈向上追溯,會發現它實現了 FlinkPlan 接口。

Flink效仿了傳統的關系型數據庫在執行SQL時生成執行計劃并對其進行優化的思路。FlinkPlan是Flink生成執行計劃的基接口,定義在Flink優化器模塊中,流處理程序對應的計劃是StreamingPlan,但是當前針對流處理程序沒有進行優化,因此這個類可看作是一個預留設計。

一個簡單的實現“word count”的流處理程序,其StreamGraph的形象化表示如下圖:

Flink官方提供了一個 計劃可視化器 來圖形化執行計劃,該計劃可視化器基于Flink API所生成的計劃的JSON格式表示繪制圖形。但是需要注意的是,計劃的JSON形式表示缺失了很多屬性以及部分節點(比如虛擬節點等);

上面的圖是由“節點”和“邊”組成的。節點在Flink中對應的數據結構是StreamNode,而邊對應的數據結構是StreamEdge,StreamNode和StreamEdge之間有著雙向的依賴關系。StreamEdge包含了其連接的源節點sourceVertex和目的節點targetVertex:

而StreamNode中包含了與其連接的入邊集合inEdges和出邊集合outEdges:

StreamEdge和StreamNode都有唯一的編號進行標識,但是各自編號的生成規則并不相同。

StreamNode的編號id的生成是通過調用StreamTransformation的靜態方法getNewNodeId獲得的,其實現是一個靜態計數器:

protected static Integer idCounter = 0;
publicstaticintgetNewNodeId(){   
    idCounter++;   
    return idCounter;
}

StreamEdge的編號edgeId是字符串類型,其生成的規則為:

this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames 
                + "_" + outputPartitioner;

它是由多個段連接起來的,語義的文字表述如下:

源頂點_目的頂點_輸入類型數量_輸出選擇器的名稱_輸出分區器

edgeId除了用來實現StreamEdge的hashCode及equals方法之外并沒有其他實際意義。

StreamNode是表示流處理中算子的數據結構,source和sink在StreamGraph中也是以StreamNode表示,它們也是一種算子,只是因為它們是流的輸入和輸出因而有特定的稱呼。

StreamNode除了存儲了輸入端和輸出端的StreamEdge集合,還封裝了算子的其他關鍵屬性,比如其并行度、分區的鍵信息、輸入與輸出類型的序列化器等。

從直觀上來看你已經知道了StreamNode和StreamEdge是StreamGraph的重要組成部分,但是為了生成JobGraph,StreamGraph很顯然必須得包含更多的內容。總結一下,StreamGraph中包含的屬性可分為三大類:

  • 流處理程序的執行配置;
  • 流處理程序拓撲中包含的節點和邊的信息;
  • 迭代相關的信息;

當然圍繞這些屬性的方法非常多,比如添加邊和節點,創建迭代的source/sink等。

其中的一個關鍵方法getJobGraph將用于生成JobGraph:

public JobGraphgetJobGraph(){     
    if (isIterative() && checkpointConfig.isCheckpointingEnabled() 
        && !checkpointConfig.isForceCheckpointing()) {      
        throw new UnsupportedOperationException(            
            "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "                  
            + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "                  
            + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");   
    }   
    StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);   
    returnjobgraphGenerator.createJobGraph();
}

從上面的代碼段也可見,當流處理程序中包含迭代邏輯時,檢查點功能暫時不被支持,在異常信息中Flink闡述了緣由:在迭代作業中無法保證“恰好一次”的語義。

流處理程序依賴StreamingJobGraphGenerator來生成JobGraph,至于如何生成,后續會進行剖析。

生成流圖的源碼分析

了解了什么是流圖(StreamGraph)之后,我們來分析它是如何生成的。流圖的生成是通過StreamExecutionEnvironment的getStreamGraph實例方法觸發的:

public StreamGraphgetStreamGraph(){   
    if (transformations.size() <= 0) {      
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");   
    }   
    returnStreamGraphGenerator.generate(this, transformations);
}

從代碼段中可見,StreamGraph的生成依賴于一個名為transformations的集合對象,它是環境對象所收集到的所有的轉換對象的集合,該集合中存儲著一個流處理程序中所有的轉換操作對應的StreamTransformation對象。

每當在DataStream對象上調用transform方法或者調用已經被實現了的一些內置的轉換函數(如map、filter等,這些轉換函數在內部也調用了transform方法),這些調用都會使得其對應的轉換對象被加入到transformations集合中去。StreamTransformation表示創建DataStream對象的轉換,流處理程序中存在多種DataStream,每種底層都對應著一個StreamTransformation。DataStream持有執行環境對象的引用,當調用transform方法時,它會調用執行環境對象的addOperator方法,將特定的StreamTransformation對象加入到transformations集合中去,這就是transformations集合中元素的來源。

DataStream API的設計存在著多重對象的封裝,我們以flatMap轉換操作為例圖示各種對象之間的構建關系:

在Flink的源碼中,這些對象的命名也并不是那么準確,比如上圖中的SingleOutputStreamOperator其實是一種DataStream,但卻以Operator結尾,讓人匪夷所思。因此較為準確的鑒定它們類型的方式是通過查看它們的繼承鏈來進行識別。

StreamGraph的生成依賴于生成器StreamGraphGenerator,每調用一次靜態方法generate才會在內部創建一個StreamGraphGenerator的實例,一個實例對應著一個StreamGraph對象。StreamGraphGenerator調用內部的實例方法generateInternal來遍歷transformations集合的每個對象:

private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {   
    for (StreamTransformation<?> transformation: transformations) {
        transform(transformation);   
    }   
    return streamGraph;
}

在transform方法中,它枚舉了Flink中每一種轉換類型,并對當前傳入的轉換類型進行判斷,然后將其分發給特定的轉換方法進行轉換,最終返回當前StreamGraph對象中跟該轉換有關的節點編號集合。

這里我們以常用的單輸入轉換方法transformOnInputTransform為例來進行分析:

private <IN, OUT> Collection<Integer> transformOnInputTransform(
    OneInputTransformation<IN, OUT> transform) {
    //遞歸地對該轉換的輸入端進行轉換
    Collection<Integer> inputIds = transform(transform.getInput());   
    // 遞歸調用可能會產生重復,這里需要以轉換過的對象進行檢查
    if (alreadyTransformed.containsKey(transform)) {      
        return alreadyTransformed.get(transform);   
    }

    //結合輸入端對應的節點編號來判斷并得出槽共享組的名稱
    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);   
    //將當前算子(節點)加入到流圖中
    streamGraph.addOperator(transform.getId(),         
        slotSharingGroup,         
        transform.getOperator(),         
        transform.getInputType(),         
        transform.getOutputType(),         
        transform.getName());
    //如果有鍵選擇器,則進行設置
    if (transform.getStateKeySelector() != null) {      
        TypeSerializer<?> keySerializer = 
            transform.getStateKeyType().createSerializer(env.getConfig());      
        streamGraph.setOneInputStateKey(transform.getId(), 
            transform.getStateKeySelector(), keySerializer);   
    }   
    streamGraph.setParallelism(transform.getId(), transform.getParallelism()); 
    //構建從當前轉換對應的節點到輸入轉換對應的節點之間的邊
    for (Integer inputId: inputIds) {      
        streamGraph.addEdge(inputId, transform.getId(), 0);   
    }   
    //返回當前轉換對應的節點編號
    return Collections.singleton(transform.getId());
}

每遍歷完一個轉換對象,就離構建完整的流圖更近一步。不同的轉換操作類型,它們為流圖提供的“部件”并不完全相同,有的轉換只構建節點(如SourceTransformation),有的轉換除了構建節點還構建邊(如SinkTransformation),有的只構建虛擬節點(如PartitionTransformation、SelectTransformation等)。

關于虛擬節點,這里需要說明的是并非所有轉換操作都具有實際的物理意義(即物理上對應具體的算子)。有些轉換操作只是邏輯概念(例如select,split,partition,union),它們不會構建真實的StreamNode對象。比如某個流處理應用對應的轉換樹如下圖:

但在運行時,其生成的StreamGraph卻是下面這種形式:

從圖中可以看到,轉換樹中對應的一些邏輯操作在StreamGraph中并不存在,Flink將這些邏輯轉換操作轉換成了虛擬節點,它們的信息會被綁定到從source到map轉換的邊上。

Flink當前對于流處理的程序是不作優化的,所以StreamGraph就是它的執行計劃。你可以通過Flink提供的執行計劃的可視化器將StreamGraph所表述的信息以圖形化的方式展示出來,就像上文我們展示的那幅圖一樣。那么我們如何查看我們自己所編寫的程序的執行計劃呢?其實很簡單,我們以Flink源碼中flink-examples-streaming模塊中的SocketTextStreamWordCount為例,來看一下如何生成執行計劃。

我們將SocketTextStreamWordCount最后一行代碼注釋掉:

env.execute("WordCount from SocketTextStream Example");

然后將其替換成下面這句:

System.out.println(env.getExecutionPlan());

這行語句的作用是打印當前這個程序的執行計劃,它將在控制臺產生該執行計劃的JSON格式表示:

{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream",
"parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":2,
"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":4,"type":"Keyed Aggregation",
"pact":"Operator","contents":"Keyed Aggregation","parallelism":2,"predecessors":[{"id":2,
"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Unnamed","pact":"Data Sink",
"contents":"Sink: Unnamed","parallelism":2,"predecessors":[{"id":4,"ship_strategy":"FORWARD",
"side":"second"}]}]}

把上面這段JSON字符串復制到Flink的執行計劃可視化器的輸入框中,然后點擊下方的“Draw”按鈕,即可生成。

 

 

來自:http://vinoyang.com/2017/02/05/flink-runtime-generate-stream-graph/

 

 本文由用戶 紫丁香 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!