Flink 原理與實現:如何生成 StreamGraph
繼上文 Flink 原理與實現:架構和拓撲概覽 中介紹了Flink的四層執行圖模型,本文將主要介紹 Flink 是如何根據用戶用Stream API編寫的程序,構造出一個代表拓撲結構的StreamGraph的。
注:本文比較偏源碼分析,所有代碼都是基于 flink-1.0.x 版本,建議在閱讀本文前先對Stream API有個了解,詳見 官方文檔 。
StreamGraph 相關的代碼主要在 org.apache.flink.streaming.api.graph 包中。構造StreamGraph的入口函數是 StreamGraphGenerator.generate(env, transformations) 。該函數會由觸發程序執行的方法 StreamExecutionEnvironment.execute() 調用到。也就是說 StreamGraph 是在 Client 端構造的,這也意味著我們可以在本地通過調試觀察 StreamGraph 的構造過程。
Transformation
StreamGraphGenerator.generate 的一個關鍵的參數是 List<StreamTransformation<?>> 。 StreamTransformation 代表了從一個或多個 DataStream 生成新 DataStream 的操作。 DataStream 的底層其實就是一個 StreamTransformation ,描述了這個 DataStream 是怎么來的。
StreamTransformation的類圖如下圖所示:
DataStream 上常見的 transformation 有 map、flatmap、filter等(見 DataStream Transformation 了解更多)。這些transformation會構造出一棵 StreamTransformation 樹,通過這棵樹轉換成 StreamGraph。比如 DataStream.map 源碼如下,其中 SingleOutputStreamOperator 為DataStream的子類:
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
// 通過java reflection抽出mapper的返回值類型
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
Utils.getCallLocationName(), true);
// 返回一個新的DataStream,SteramMap 為 StreamOperator 的實現類
return transform("Map", outType, new StreamMap<>(clean(mapper)));
}
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
// 新的transformation會連接上當前DataStream中的transformation,從而構建成一棵樹
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operator,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
// 所有的transformation都會存到 env 中,調用execute時遍歷該list生成StreamGraph
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
從上方代碼可以了解到,map轉換將用戶自定義的函數 MapFunction 包裝到 StreamMap 這個Operator中,再將 StreamMap 包裝到 OneInputTransformation ,最后該transformation存到env中,當調用 env.execute 時,遍歷其中的transformation集合構造出StreamGraph。其分層實現如下圖所示:
另外,并不是每一個 StreamTransformation 都會轉換成 runtime 層中物理操作。有一些只是邏輯概念,比如 union、split/select、partition等。如下圖所示的轉換樹,在運行時會優化成下方的操作圖。
union、split/select、partition中的信息會被寫入到 Source –> Map 的邊中。通過源碼也可以發現, UnionTransformation , SplitTransformation , SelectTransformation , PartitionTransformation 由于不包含具體的操作所以都沒有StreamOperator成員變量,而其他StreamTransformation的子類基本上都有。
StreamOperator
DataStream 上的每一個 Transformation 都對應了一個 StreamOperator,StreamOperator是運行時的具體實現,會決定UDF(User-Defined Funtion)的調用方式。下圖所示為 StreamOperator 的類圖(點擊查看大圖):
可以發現,所有實現類都繼承了 AbstractStreamOperator 。另外除了 project 操作,其他所有可以執行UDF代碼的實現類都繼承自 AbstractUdfStreamOperator ,該類是封裝了UDF的StreamOperator。UDF就是實現了 Function 接口的類,如 MapFunction , FilterFunction 。
生成 StreamGraph 的源碼分析
我們通過在DataStream上做了一系列的轉換(map、filter等)得到了StreamTransformation集合,然后通過 StreamGraphGenerator.generate 獲得StreamGraph,該方法的源碼如下:
// 構造 StreamGraph 入口函數
public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
return new StreamGraphGenerator(env).generateInternal(transformations);
}
// 自底向上(sink->source)對轉換樹的每個transformation進行轉換。
private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
for (StreamTransformation<?> transformation: transformations) {
transform(transformation);
}
return streamGraph;
}
// 對具體的一個transformation進行轉換,轉換成 StreamGraph 中的 StreamNode 和 StreamEdge
// 返回值為該transform的id集合,通常大小為1個(除FeedbackTransformation)
private Collection<Integer> transform(StreamTransformation<?> transform) {
// 跳過已經轉換過的transformation
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
LOG.debug("Transforming " + transform);
// 為了觸發 MissingTypeInfo 的異常
transform.getOutputType();
Collection<Integer> transformedIds;
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOnInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
// need this check because the iterate transformation adds itself before
// transforming the feedback edges
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
if (transform.getBufferTimeout() > 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
}
if (transform.getUid() != null) {
streamGraph.setTransformationId(transform.getId(), transform.getUid());
}
return transformedIds;
}
最終都會調用 transformXXX 來對具體的StreamTransformation進行轉換。我們可以看下 transformOnInputTransform(transform) 的實現:
private <IN, OUT> Collection<Integer> transformOnInputTransform(OneInputTransformation<IN, OUT> transform) {
// 遞歸對該transform的直接上游transform進行轉換,獲得直接上游id集合
Collection<Integer> inputIds = transform(transform.getInput());
// 遞歸調用可能已經處理過該transform了
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
// 添加 StreamNode
streamGraph.addOperator(transform.getId(),
slotSharingGroup,
transform.getOperator(),
transform.getInputType(),
transform.getOutputType(),
transform.getName());
if (transform.getStateKeySelector() != null) {
TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
}
streamGraph.setParallelism(transform.getId(), transform.getParallelism());
// 添加 StreamEdge
for (Integer inputId: inputIds) {
streamGraph.addEdge(inputId, transform.getId(), 0);
}
return Collections.singleton(transform.getId());
}
該函數首先會對該transform的上游transform進行遞歸轉換,確保上游的都已經完成了轉化。然后通過transform構造出StreamNode,最后與上游的transform進行連接,構造出StreamNode。
最后再來看下對邏輯轉換(partition、union等)的處理,如下是 transformPartition 函數的源碼:
private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
StreamTransformation<T> input = partition.getInput();
List<Integer> resultIds = new ArrayList<>();
// 直接上游的id
Collection<Integer> transformedIds = transform(input);
for (Integer transformedId: transformedIds) {
// 生成一個新的虛擬id
int virtualId = StreamTransformation.getNewNodeId();
// 添加一個虛擬分區節點,不會生成 StreamNode
streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());
resultIds.add(virtualId);
}
return resultIds;
}
對partition的轉換沒有生成具體的StreamNode和StreamEdge,而是添加一個虛節點。當partition的下游transform(如map)添加edge時(調用 StreamGraph.addEdge ),會把partition信息寫入到edge中。如 StreamGraph.addEdgeInternal 所示:
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, null, new ArrayList<String>());
}
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames) {
// 當上游是select時,遞歸調用,并傳入select信息
if (virtualSelectNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
// select上游的節點id
upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
if (outputNames.isEmpty()) {
// selections that happen downstream override earlier selections
outputNames = virtualSelectNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
}
// 當上游是partition時,遞歸調用,并傳入partitioner信息
else if (virtuaPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
// partition上游的節點id
upStreamVertexID = virtuaPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtuaPartitionNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
} else {
// 真正構建StreamEdge
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// 未指定partitioner的話,會為其選擇 forward 或 rebalance 分區。
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}
// 健康檢查, forward 分區必須要上下游的并發度一致
if (partitioner instanceof ForwardPartitioner) {
if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
throw new UnsupportedOperationException("Forward partitioning does not allow " +
"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
}
}
// 創建 StreamEdge
StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner);
// 將該 StreamEdge 添加到上游的輸出,下游的輸入
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
總結
本文主要介紹了 Stream API 中 Transformation 和 Operator 的概念,以及如何根據Stream API編寫的程序,構造出一個代表拓撲結構的StreamGraph的。本文的源碼分析涉及到較多代碼,如果有興趣建議結合完整源碼進行學習。下一篇文章將介紹 StreamGraph 如何轉換成 JobGraph 的,其中設計到了圖優化的技巧。
來自: http://wuchong.me/blog/2016/05/04/flink-internal-how-to-build-streamgraph