分析Flink的侧流输出源代码Flink的侧流输出为我们提供了侧流(拆分)输出的功能。根据条件,一个流可以分成几个不同的流,然后会做不同的处理逻辑。让我们看看与下一个侧流输出相关的源代码。
让我们看看下面的演示。一条溪流分为三条,一条主流和两条支流。
SingleOutputStreamOperator & ltJasonLeePOJO & gtprocess = Kafka _ source 1 . process(new process function & lt;JasonLeePOJO,JasonLeePOJO & gt(){ @ Override public void process element(JasonLeePOJO value,ProcessFunction & ltJasonLeePOJO,JasonLeePOJO & gt。上下文ctx,收集器& ltJasonLeePOJO & gtOut)抛出异常{//这是主流输出if (value.getname()。equals(" flink "){ out . collect(value);//下面两个是流量测量输出} else if (value.getname()。equals(" spark "){ CTX . output(test,value);//测量流量}否则if (value.getname()。equals(" Hadoop "){ CTX . output(test1,value);} } });为了更清楚地看到每个操作符,我禁用了操作符链,该任务的DAG图如下:
这就更清楚了。显然,从流程操作符开始,一个数据流被分成三个数据流。当然也不是默认禁止。
运算符链所有运算符都链接在一起。
源代码分析我们先来看第一个主流输出的源代码,即out.collect(value),其中out实际上是一个TimestampedCollector对象。
TimestampedCollector # collect @ override public void collect(T record){ output . collect(reuse . replace(record));}在collect方法中保存一个Output对象,用于输出数据。在这里,它实际上是一个计数输出。它是一个用output包装的对象,主要用于更新发送数据和输出数据的度量。
counting output # collect @ override public void collect(stream record & lt;OUT & gtrecord){ numrecordsout . Inc();output.collect(记录);}在Countingoutput中也保存一个output对象,但这里的输出是BroadcastingOutputCollector对象。从它的名字可以看出,它是向下游广播数据的。这里有个问题?把数据广播到下游,那岂不是每个下游数据流都有这个数据?这种情况下,如何实现导流?带着这个问题,让我们看看BroadcastingOutputCollector的collect方法是如何实现的。
BroadcastingOutputCollector # collect @ override public void collect(stream record & lt;T & gtRecord) {//这里的outputs数组有三个输出,对应于上述三个输出流for(output < stream record & lt;T & gt& gtoutput : outputs) { output.collect(记录);}}还在BroadcastingOutputCollector对象中保存一个输出对象。实际上,它们都实现了输出接口,并向下游发送数据。这里的输出是一个输出数组,它表示所有的下游输出。因为上面有三个输出流,所以数组包含三个输出对象。
循环调用output的collect方法向下游发送数据。因为我中断了operatorChain,流程操作符和下游打印操作符不在同一个操作符链上,所以上下游操作符之间的数据传输是RecordWriterOutput。否则,使用CopyingChainingOutput或ChainingOutput。具体用哪个输出这里就不介绍了,以后有时间再单独介绍。
RecordWriterOutput # collect @ override public void collect(stream record & lt;OUT & gtRecord) {//主流没有outputTag,只有outputTag if (this.outputTag!= null) { //我们不负责发射到主输出。返回;} pushToRecordWriter(记录);}接下来看看RecordWriterOutput的collect方法。在collect方法中,我们将首先确定outputTag是否为空。如果不是空的,我们会直接退回。否则,我们会将数据推送给下游运营商。只有侧流输出需要定义outputTag。主流(正常流)没有outputTag。所以这里会使用pushToRecordWriter方法将数据写到下游,也就是说虽然数据会广播到所有下游,但实际上其他两个侧流都是直接返回的,只有主流会将数据推送到下游,这就解释了上面的问题。
然后看第二侧流输出ctx.output(test,value)的源代码。这里的ctx实际上是ProcessOperator#ContextImpl对象。
process operator # context impl # output @ override public & lt;X & gtvoid输出(OutputTag & ltX & gtoutputTag,X value){ if(output tag = = null){ throw new IllegalArgumentException(" output tag不得为null。");} output.collect(outputTag,new StreamRecord & lt& gt(value,element . gettimestamp());}如果outputTag为空,直接抛出异常。因为这是一个侧流,所以必须定义OutputTag。这里的输出实际上是父类AbstractStreamOperator持有的一个变量。如果outputTag不为空,则调用输出的collect方法将数据发送到下游。这里的输出是如上所述的CountingOutput,但是collect方法是另一个重载方法。
计数输出# collect @ Overridepublic & ltX & gtvoid collect(output tag & lt;X & gtoutputTag,StreamRecord & ltX & gtrecord){ numrecordsout . Inc();output.collect(输出标签,记录);}可以发现这个collect方法比上面多了一个OutputTag参数,就是使用了边流输出时定义的OutputTag对象,然后调用output的collect方法发送数据。与上面的方法一样,这个方法也是BroadcastingOutputCollector对象的另一个重载方法,多了一个OutputTag参数。
BroadcastingOutputCollector # collect @ override public & lt;X & gtvoid collect(output tag & lt;X & gtoutputTag,StreamRecord & ltX & gt记录){ for(输出& ltStreamRecord & ltT & gt& gtoutput:outputs){ output . collect(output tag,record);}}这里的逻辑同上,同样的循环调用collect方法发送数据。
RecordWriterOutput # collect @ override public & lt;X & gtvoid collect(output tag & lt;X & gtoutputTag,StreamRecord & ltX & gtRecord) {//首先判断两个OutputTags是否相同if (output tag。对此负责。output tag,output tag)) {pushtorecordwriter(记录);}}}在这个collect方法中,我们会先判断传递过来的OutputTag对象和成员变量this.outputTag是否相等。如果是,我们将发送数据,否则,我们将不做任何处理。因此,这里一次只会选择一个下游侧流输出数据,从而实现所谓的分流。
output tag # isResponsibleForpublic静态布尔值isResponsibleFor(@ Nullable output tag & lt;?& gt所有者,@ Nonnull OutputTag & lt?& gtother){ return other . equals(owner);}可以看到,在isResponsibleFor方法内部,直接调用OutputTag的equals方法来判断两个对象是否相等。
第三个侧流test1 ctx.output(test1,value)和第二个侧流test完全一样,这里就不看代码了。
以上是调车操作的完成,那么如何才能得到调车后的结果(数据流)?我们可以通过getSideOutput方法得到。
数据流& ltJasonLeePOJO & gtside output = process . getsideoutput(test);数据流& ltJasonLeePOJO & gtside output 1 = process . getsideoutput(test1);
GetSideOutput源代码公开
综上所述,通过分析侧流输出的源代码,在调车时通过广播将数据发送给下游操作员。对于主流数据,只有OutputTag是空的,所以侧流直接返回,不做任何处理,因为OutputTag不是空的。对于侧流数据,确定两个输出标签是否相等,所以每次数据只会发送到对应的下游侧流,实现分流逻辑。
以上是详细分析Flink端流输出源代码的例子。更多关于Flink侧流输出的信息,请关注主机频道zhujipindao的其他相关文章。com!
评论前必须登录!
注册