2。1。3虚拟Transformation的转换 虚拟的Transformation生成的时候不会转换为SteramNode,而是添加为虚拟节点。 privatevoidaddEdgeInternal(IntegerupStreamVertexID,IntegerdownStreamVertexID,inttypeNumber,StreamPartitionerlt;?partitioner,ListStringoutputNames,OutputTagoutputTag,ShuffleModeshuffleMode){当上游是sideoutput时,递归调用,并传入sideoutput信息if(virtualSideOutputNodes。containsKey(upStreamVertexID)){intvirtualIdupStreamVertexID;upStreamVertexIDvirtualSideOutputNodes。get(virtualId)。f0;if(outputTagnull){outputTagvirtualSideOutputNodes。get(virtualId)。f1;}addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,partitioner,null,outputTag,shuffleMode);}当上游是select时,递归调用,并传入select信息elseif(virtualSelectNodes。containsKey(upStreamVertexID)){intvirtualIdupStreamVertexID;upStreamVertexIDvirtualSelectNodes。get(virtualId)。f0;if(outputNames。isEmpty()){selectionsthathappendownstreamoverrideearlierselectionsoutputNamesvirtualSelectNodes。get(virtualId)。f1;}addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,partitioner,outputNames,outputTag,shuffleMode);}当上游是Partition时,递归调用,并传入Partition信息elseif(virtualPartitionNodes。containsKey(upStreamVertexID)){intvirtualIdupStreamVertexID;upStreamVertexIDvirtualPartitionNodes。get(virtualId)。f0;if(partitionernull){partitionervirtualPartitionNodes。get(virtualId)。f1;}shuffleModevirtualPartitionNodes。get(virtualId)。f2;addEdgeInternal(upStreamVertexID,downStreamVertexID,typeNumber,partitioner,outputNames,outputTag,shuffleMode);}不是以上逻辑转换的情况,真正构建StreamEdgeelse{StreamNodeupstreamNodegetStreamNode(upStreamVertexID);StreamNodedownstreamNodegetStreamNode(downStreamVertexID);Ifnopartitionerwasspecifiedandtheparallelismofupstreamanddownstreamoperatormatchesuseforwardpartitioning,userebalanceotherwise。没有指定partitioner时,会为其选择forward或者rebalanceif(partitionernullupstreamNode。getParallelism()downstreamNode。getParallelism()){partitionernewForwardPartitionerObject();}elseif(partitionernull){partitionernewRebalancePartitionerObject();}if(partitionerinstanceofForwardPartitioner){if(upstreamNode。getParallelism()!downstreamNode。getParallelism()){thrownewUnsupportedOperationException(Forwardpartitioningdoesnotallowchangeofparallelism。Upstreamoperation:upstreamNodeparallelism:upstreamNode。getParallelism(),downstreamoperation:downstreamNodeparallelism:downstreamNode。getParallelism()Youmustuseanotherpartitioningstrategy,suchasbroadcast,rebalance,shuffleorglobal。);}}if(shuffleModenull){shuffleModeShuffleMode。UNDEFINED;}创建StreamEdge,并将该SteramEdge添加到上游的输出,下游的输入。StreamEdgeedgenewStreamEdge(upstreamNode,downstreamNode,typeNumber,outputNames,partitioner,outputTag,shuffleMode);getStreamNode(edge。getSourceId())。addOutEdge(edge);getStreamNode(edge。getTargetId())。addInEdge(edge);}}2。2作业图 JobGraph可以由流计算的StreamGraph和批处理的OptimizedPlan转换而来。流计算中,在StreamGraph的基础上进行了一些优化,如果通过OperatorChain机制将算子合并起来,在执行时,调度在同一个Task线程上,避免数据的跨线程、跨网段的传递。 2。2。1JobGraph核心对象JobVertex 经过算子融合优化后符合条件的多个SteramNode可能会融合在一起生成一个JobVertex,即一个JobVertex包含一个或多个算子,JobVertex的输入是JobEdge,输出是IntermediateDataSet。JobEdge JobEdge是JobGraph中连接IntermediateDataSet和JobVertex的边,表示JobGraph中的一个数据流转通道,其上游数据源是IntermediateDataSet,下游消费者是JobVertex。数据通过JobEdge由IntermediateDataSet传递给JobVertex。IntermediateDataSet 中间数据集IntermediateDataSet是一种逻辑结构,用来表示JobVertex的输出,即该JobVertex中包含的算子会产生的数据集。不同的执行模式下,其对应的结果分区类型不同,决定了在执行时刻数据交换的模式。 IntermediateDataSet的个数与该JobVertex对应的StreamNode的出边数量相同,可以是一个或者多个。2。2。2JobGraph生成过程 StreamingJobGraphGenerator负责流计算JobGraph的生成,在转换前需要进行一系列的预处理。privateJobGraphcreateJobGraph(){preValidate();makesurethatallverticesstartimmediately设置调度模式jobGraph。setScheduleMode(streamGraph。getScheduleMode());Generatedeterministichashesforthenodesinordertoidentifythemacrosssubmissionifftheydidntchange。为每个节点生成确定的hashid作为唯一表示,在提交和执行过程中保持不变。MapInteger,byte〔〕hashesdefaultStreamGraphHasher。traverseStreamGraphAndGenerateHashes(streamGraph);Generatelegacyversionhashesforbackwardscompatibility为了向后保持兼容,为每个节点生成老版本的hashidListMapInteger,byte〔〕legacyHashesnewArrayList(legacyStreamGraphHashers。size());for(StreamGraphHasherhasher:legacyStreamGraphHashers){legacyHashes。add(hasher。traverseStreamGraphAndGenerateHashes(streamGraph));}MapInteger,ListTuple2byte〔〕,byte〔〕chainedOperatorHashesnewHashMap();真正对SteramGraph进行转换,生成JobGraph图setChaining(hashes,legacyHashes,chainedOperatorHashes);setPhysicalEdges();设置共享slotgroupsetSlotSharingAndCoLocation();setManagedMemoryFraction(Collections。unmodifiableMap(jobVertices),Collections。unmodifiableMap(vertexConfigs),Collections。unmodifiableMap(chainedConfigs),idstreamGraph。getStreamNode(id)。getMinResources(),idstreamGraph。getStreamNode(id)。getManagedMemoryWeight());配置checkpointconfigureCheckpointing();jobGraph。setSavepointRestoreSettings(streamGraph。getSavepointRestoreSettings());如果有之前的缓存文件的配置,则重新读入JobGraphGenerator。addUserArtifactEntries(streamGraph。getUserArtifacts(),jobGraph);settheExecutionConfiglastwhenithasbeenfinalizedtry{设置执行环境配置jobGraph。setExecutionConfig(streamGraph。getExecutionConfig());}catch(IOExceptione){thrownewIllegalConfigurationException(CouldnotserializetheExecutionConfig。Thisindicatesthatnonserializabletypes(likecustomserializers)wereregistered);}returnjobGraph;} 预处理完毕后,开始构建JobGraph中的点和边,从Source向下遍历StreamGraph,逐步创建JObGraph,在创建的过程中同事完成算子融合(OperatorChain)优化。 执行具体的Chain和JobVertex生成、JobEdge的关联、IntermediateDataSet。从StreamGraph读取数据的StreamNode开始,递归遍历同时将StreamOperator连接在一起。 整理构建的逻辑如下(看上图!!!): 1)从Source开始,Source与下游的FlatMap不可连接,Source是起始节点,自己成为一个JobVertx。 2)此时开始一个新的连接分析,FlatMap是起始节点,与下游的KeyedAgg也不可以连接,那么FlatMap自己成为一个JobVertex。 3)此时开始一个新的连接分析。KeyedAgg是起始节点,并且与下游的Sink可以连接,那么递归地分析Sink节点,构造Sink与其下游是否可以连接,因为Slink没有下游,所以KeyedAgg和Sink节点连接在一起,共同构成了一个JobVertex。在这个JobVertex中,KeyedAgg是起始节点,index编号为0,sink节点index编号为1。 构建JobVertex的时候需要将StreamNode中的重要配置信息复制到JobVertex中。构建好JobVertex之后,需要构建JobEdge将JobVertex连接起来。KeyedAgg和Sink之间构成了一个算子连接,连接内部的算子之间无序构成JobEdge进行连接。 在构建JobEdge的时候,很重要的一点是确定上游JobVertex和下游JobVertex的数据交换方式。此时根据ShuffleMode来确定ResultPartition类型,用FlinkPartition来确定JobVertex的连接方式。 Shuffle确定了ResultPartition,那么就可以确定上游JobVertex输出的IntermediateDataSet的类型了,也就知道JobEdge的输入IntermediateDataSet。 ForwardPartitioner和RescalePartitioner两种类型的Partitioner转换为DistributionPattern。POINTWISE的分发模式。其他类型的Partitioner统一转换为DistributionPattern。ALLTOALL模式。 JobGraph的构建和OperatorChain优化:privateListStreamEdgecreateChain(IntegerstartNodeId,IntegercurrentNodeId,MapInteger,byte〔〕hashes,ListMapInteger,byte〔〕legacyHashes,intchainIndex,MapInteger,ListTuple2byte〔〕,byte〔〕chainedOperatorHashes){if(!builtVertices。contains(startNodeId)){ListStreamEdgetransitiveOutEdgesnewArrayListStreamEdge();ListStreamEdgechainableOutputsnewArrayListStreamEdge();ListStreamEdgenonChainableOutputsnewArrayListStreamEdge();StreamNodecurrentNodestreamGraph。getStreamNode(currentNodeId);获取当前节点的出边,判断是否符合OperatorChain的条件分为两类:chainableoutputs,nonchainableoutputsfor(StreamEdgeoutEdge:currentNode。getOutEdges()){if(isChainable(outEdge,streamGraph)){chainableOutputs。add(outEdge);}else{nonChainableOutputs。add(outEdge);}}对于chainable的边,递归调用createchain返回值添加到transitiveOutEdges中for(StreamEdgechainable:chainableOutputs){transitiveOutEdges。addAll(createChain(startNodeId,chainable。getTargetId(),hashes,legacyHashes,chainIndex1,chainedOperatorHashes));}对于无法chain在一起的边,边的下游节点作为Operatorchain的Head节点进行递归调用,返回值添加到transitiveOutEdges中for(StreamEdgenonChainable:nonChainableOutputs){transitiveOutEdges。add(nonChainable);createChain(nonChainable。getTargetId(),nonChainable。getTargetId(),hashes,legacyHashes,0,chainedOperatorHashes);}ListTuple2byte〔〕,byte〔〕operatorHasheschainedOperatorHashes。computeIfAbsent(startNodeId,knewArrayList());byte〔〕primaryHashByteshashes。get(currentNodeId);OperatorIDcurrentOperatorIdnewOperatorID(primaryHashBytes);for(MapInteger,byte〔〕legacyHash:legacyHashes){operatorHashes。add(newTuple2(primaryHashBytes,legacyHash。get(currentNodeId)));}chainedNames。put(currentNodeId,createChainedName(currentNodeId,chainableOutputs));chainedMinResources。put(currentNodeId,createChainedMinResources(currentNodeId,chainableOutputs));chainedPreferredResources。put(currentNodeId,createChainedPreferredResources(currentNodeId,chainableOutputs));if(currentNode。getInputFormat()!null){getOrCreateFormatContainer(startNodeId)。addInputFormat(currentOperatorId,currentNode。getInputFormat());}if(currentNode。getOutputFormat()!null){getOrCreateFormatContainer(startNodeId)。addOutputFormat(currentOperatorId,currentNode。getOutputFormat());}如果当前节点是起始节点,则直接创建JobVertex,否则返回一个空的StreamConfigStreamConfigconfigcurrentNodeId。equals(startNodeId)?createJobVertex(startNodeId,hashes,legacyHashes,chainedOperatorHashes):newStreamConfig(newConfiguration());将StreamNode中的配置信息序列化到Streamconfig中。setVertexConfig(currentNodeId,config,chainableOutputs,nonChainableOutputs);再次判断,如果是Chain的起始节点,执行connect()方法,创建JobEdge和IntermediateDataset否则将当前节点的StreamConfig添加到chainedConfig中。if(currentNodeId。equals(startNodeId)){config。setChainStart();config。setChainIndex(0);config。setOperatorName(streamGraph。getStreamNode(currentNodeId)。getOperatorName());for(StreamEdgeedge:transitiveOutEdges){connect(startNodeId,edge);}config。setOutEdgesInOrder(transitiveOutEdges);config。setTransitiveChainedTaskConfigs(chainedConfigs。get(startNodeId));}else{chainedConfigs。computeIfAbsent(startNodeId,knewHashMapInteger,StreamConfig());config。setChainIndex(chainIndex);StreamNodenodestreamGraph。getStreamNode(currentNodeId);config。setOperatorName(node。getOperatorName());chainedConfigs。get(startNodeId)。put(currentNodeId,config);}config。setOperatorID(currentOperatorId);if(chainableOutputs。isEmpty()){config。setChainEnd();}returntransitiveOutEdges;}else{returnnewArrayList();}}2。2。3算子融合 一个Operatorchain在同一个Task线程内执行。OperatorChain内的算子之间,在同一个线程内通过方法调用的方式传递数据,能减少线程之间的切换,减少消息的序列化反序列化,无序借助内存缓存区,也无须通过网络在算子间传递数据,可在减少延迟的同时提高整体吞吐量 operatorchain的条件: 1)下游节点的入度为1 2)SteramEdge的下游节点对应的算子不为null 3)StreamEdge的上游节点对应的算子不为null 4)StreamEdge的上下游节点拥有相同的slotSharingGroup,默认都是default。 5)下游算子的连接策略为ALWAYS。 6)上游算子的连接策略为ALWAYS或者HEAD。 7)StreamEdge的分区类型为ForwardPartitioner 8)上下游节点的并行度一致 9)当前StreamGraph允许chain2。3执行图 2。3。1ExecutionGraph核心对象ExecutionJobVertex 该对象和JobGraph中的JobVertex一一对应。该对象还包含了一组ExecutionVertex,数量与该JobVertex中所包含的SteramNode的并行度一致。 ExecutionJobVertex用来将一个JobVertex封装成一ExecutionJobVertex,并以此创建ExecutionVertex、Execution、IntermediateResult和IntermediateResultPartition,用于丰富ExecutionGraph。 在ExecutionJobVertex的构造函数中,首先是依据对应的JobVertex的并发度,生成对应个数的ExecutionVertex。其中,一个ExecutionVertex代表一个ExecutionJobVertex的并发子Task。然后是将原来JobVertex的中间结果IntermediateDataSet转化为ExecutionGrap中IntermediateResultExecutionVertex ExecutionJobVertex中会对作业进行并行化处理,构造可以并行执行的实例,每个并行执行的实例就是ExecutionVertex。 构建ExecutionVertex的同时,也回构建ExecutionVertex的输出IntermediateResult。并且将ExecutionEdge输出为IntermediatePartition。 ExecutionVertex的构造函数中,首先会创建IntermediatePartition,并通过IntermediateResult。setPartition()建立IntermediateResult和IntermediateResultPartition之间的关系,然后生成Execution,并配置资源相关。IntermediateResult IntermediateResult又叫做中间结果集,该对象是个逻辑概念,表示ExecutionJobVertex的输出,和JobGraph中的IntermediateDataSet一一对应,同样,一个ExecutionJobVertex可以有多个中间二级果,取决于当前JobVertex有几个出边。 一个中间结果集包含多个中间结果分区IntermediateResultPartition,其个数等于该JobVertex的并发度。IntermediateResultPartitionIntermediateResultPartition又叫做中间结果分区,表示1个ExecutionVertex输出结果,与ExecutionEdge相关联。ExecutionEdge 表示ExecutionVertex的输入,连接到上游产生的IntermediateResultPartition。一个Execution对应于唯一的一个IntermediateResultPartition和一个ExecutionVertex。一个ExecutionVertex可以有多个ExecutionEdge。Execution ExecutionVertex相当于每个Task的模板,在真正执行的时候,会将ExecutionVertex中的信息包装为一个Execution,执行一个ExecutionVertex的一次尝试。JobManager和TaskManager之间关于Task的部署和Task执行状态的更新都是通过ExecutionAttemptID来标识实例的。在故障或者数据需要重算的情况下,ExecutionVertex可能会有多个ExecutionAttemptID。一个Execution通过ExecutionAttemptID标识。2。3。2ExecutionGrap生成过程 初始话作业调度器的时候,根据JobGraph生活ExecutionGraph。在SchedulerBase的构造方法中触发构建,最终调用SchedulerBasecreateExecutionGraph触发实际的构建动作,使用ExecutionGraphBuiler构建ExecutionGraph。 核心代码attachJobGraph: 构建ExecutionEdge的连接策略:点对点连接(DistributionPattern。POINTWISE) 该策略用来连接当前ExecutionVertex与上游的IntermediataeResultParition。 连接分三种情况 1)一对一连接:并发的Task数量与分区数相等。 2)多对一连接:下游的Task数量小于上游的分区数,此时分两种情况: a:下游Task可以分配同数量的结果分区IntermediataeResultParition。如上游有4个结果分区,下游有2个Task,那么每个Task会分配两个结果分区进行消费。 b:每个Task消费的上游分区结果数据不均,如上游有3个结果分区,下游有两个Task,那么一个Task分配2个结果分区消费,另一个分配一个结果分区消费。 3)一对多连接:下游的Task数量多余上游的分区数,此时两种情况:a:每个结果分区的下游消费Task数据量相同,如上游有两个结果分区,下游有4个Task,每个结果分区被两个Task消费。b:每个结果分区的下游消费Task数量不相同,如上游有两个结果分区,下游有3个Task,那么一个结果分区分配2个Task消费,另一个结果分区分配一个Task消费。全连接(DistributionPattern。ALLTOALL) 该策略下游的ExecutionVertex与上游的所有IntermediataeResultParition建立连接,消费其生产的数据。一般全连接的情况意味着数据在Shuffle。