

  1 基于时间的双流Join
  数据流操作的另一个常见需求是对两条数据流中的事件进行联结(connect)或Join。Flink DataStream API中内置有两个可以根据时间条件对数据流进行Join的算子:基于间隔的Join和基于窗口的Join。本节我们会对它们进行介绍。
  注意,在设计Join算子时,需要保证高效的状态访问模式以及有效的状态清理策略,因为Join需要把两条流中等待匹配的事件缓存在状态中。

1.1 基于间隔的Join
  基于间隔的Join目前只支持事件时间以及INNER JOIN语义(无法发出未匹配成功的事件)。两条输入流都必须先经过keyBy,只有键值相同的事件才会被Join。下面的例子定义了一个基于间隔的Join。

input1
  .intervalJoin(input2)
  .between(<lower-bound>, <upper-bound>) // 相对于input1的上下界
  .process(ProcessJoinFunction)          // 处理匹配的事件对
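  Join成功的事件对会发送给ProcessJoinFunction。下界和上界通常分别是一个负的时间间隔和一个正的时间间隔,例如between(Time.hours(-1), Time.minutes(15))。对于input1中的事件a,只有满足 a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound 的input2事件b才会与它匹配。在满足下界值不大于上界值的前提下,你可以任意对它们赋值。例如,可以要求B中事件的时间戳比A中事件的时间戳早1~2小时,即between(Time.hours(-2), Time.hours(-1))。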
  scala version
/**
  * Interval join demo: joins the clicks of a user with the browse events of the same user.
  *
  * The same matches can be produced from either side, because
  *   A.intervalJoin(B).between(lowerBound, upperBound)
  * is equivalent to
  *   B.intervalJoin(A).between(-upperBound, -lowerBound)
  * Both pipelines below therefore print the same three matches
  * (click@10 with browse@5 and browse@6, click@16 with browse@6),
  * only with the left/right order of each pair swapped.
  */
object IntervalJoinExample {

  /** Emits every matched pair as "left => right". Shared by both pipelines. */
  private class PairToString
      extends ProcessJoinFunction[(String, Long, String), (String, Long, String), String] {
    override def processElement(
        in1: (String, Long, String),
        in2: (String, Long, String),
        context: ProcessJoinFunction[(String, Long, String), (String, Long, String), String]#Context,
        collector: Collector[String]): Unit =
      collector.collect(s"$in1 => $in2")
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Clicks at minute 10 and minute 16.
    val stream1 = env
      .fromElements(
        ("user_1", 10 * 60 * 1000L, "click"),
        ("user_1", 16 * 60 * 1000L, "click")
      )
      .assignAscendingTimestamps(_._2)
      .keyBy(r => r._1)

    // Browse events at minute 5 and minute 6.
    val stream2 = env
      .fromElements(
        ("user_1", 5 * 60 * 1000L, "browse"),
        ("user_1", 6 * 60 * 1000L, "browse")
      )
      .assignAscendingTimestamps(_._2)
      .keyBy(r => r._1)

    // For each click, match browse events in [click - 10 min, click].
    stream1
      .intervalJoin(stream2)
      .between(Time.minutes(-10), Time.minutes(0))
      .process(new PairToString)
      .print()

    // The same condition seen from the browse side: clicks in [browse, browse + 10 min].
    stream2
      .intervalJoin(stream1)
      .between(Time.minutes(0), Time.minutes(10))
      .process(new PairToString)
      .print()

    env.execute()
  }
}
  1.2 基于窗口的Join

基于窗口的Join需要用到Flink中的窗口机制:它将两条输入流中的元素分配到公共窗口中,并在窗口触发计算时对窗口中的元素进行Join。下面的例子展示了如何定义基于窗口的Join。

input1.join(input2)
  .where(...)       // 为input1指定键值属性
  .equalTo(...)     // 为input2指定键值属性
  .window(...)      // 指定WindowAssigner
  [.trigger(...)]   // 可选:指定Trigger
  [.evictor(...)]   // 可选:指定Evictor
  .apply(...)       // 指定JoinFunction
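  下图展示了DataStream API中基于窗口的Join是如何工作的。具体来说,两条输入流中的元素先根据where/equalTo指定的键值进行分区,再由WindowAssigner分配到窗口中;当窗口触发计算时,同一键值、同一窗口内来自两条流的元素会两两组合,每一对元素都会调用一次JoinFunction。如果某个窗口中只有其中一条流的元素,则不会产生任何输出。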
  scala version
/**
  * Tumbling event-time window join demo.
  *
  * All four elements (timestamps 1s to 4s) fall into the same 5-second window [0s, 5s)
  * and share the key "a". The join therefore emits every (stream1, stream2)
  * combination inside that window: 2 x 2 = 4 lines of the form "left => right".
  */
object TwoWindowJoinExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream1 = env
      .fromElements(
        ("a", 1000L),
        ("a", 2000L)
      )
      .assignAscendingTimestamps(_._2)

    val stream2 = env
      .fromElements(
        ("a", 3000L),
        ("a", 4000L)
      )
      .assignAscendingTimestamps(_._2)

    stream1
      .join(stream2)
      // on A.id = B.id
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .apply(new JoinFunction[(String, Long), (String, Long), String] {
        override def join(in1: (String, Long), in2: (String, Long)): String =
          s"$in1 => $in2"
      })
      .print()

    env.execute()
  }
}
  java version
/**
 * Tumbling event-time window join demo (Java API).
 *
 * <p>Elements of both streams that have the same key (field f0) and fall into the same
 * 5-second event-time window are paired; each pair is printed as "left => right".
 */
public class TwoWindowJoinExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(
                        Tuple2.of("a", 1000L),
                        Tuple2.of("b", 1000L),
                        Tuple2.of("a", 2000L),
                        Tuple2.of("b", 2000L)
                )
                .assignTimestampsAndWatermarks(timestampsFromF1());

        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(
                        Tuple2.of("a", 3000L),
                        Tuple2.of("b", 3000L),
                        Tuple2.of("a", 4000L),
                        Tuple2.of("b", 4000L)
                )
                .assignTimestampsAndWatermarks(timestampsFromF1());

        stream1
                .join(stream2)
                .where(r -> r.f0)
                .equalTo(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> left, Tuple2<String, Long> right) throws Exception {
                        return left + " => " + right;
                    }
                })
                .print();

        env.execute();
    }

    /**
     * Watermark strategy for ascending timestamps read from field f1
     * (the Java counterpart of Scala's assignAscendingTimestamps(_._2)).
     */
    private static WatermarkStrategy<Tuple2<String, Long>> timestampsFromF1() {
        return WatermarkStrategy
                .<Tuple2<String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        return element.f1;
                    }
                });
    }
}

2 处理迟到的元素
  DataStream API提供了三种策略来处理迟到元素:

- 直接抛弃迟到的元素;
- 将迟到的元素发送到另一条流中去;
- 根据迟到的元素更新窗口已经计算完的结果,并发出更新后的结果。

2.1 抛弃迟到元素
  抛弃迟到元素是事件时间窗口算子(event time window operator)的默认行为。也就是说,迟到的元素既不会被加入已经计算完毕的窗口,也不会为它创建一个新的窗口,而是被直接丢弃。
  在ProcessFunction中,可以通过比较元素的时间戳和当前水位线的大小来轻松地过滤掉迟到元素:时间戳小于当前水位线的元素就是迟到元素。

2.2 重定向迟到元素
  迟到的元素也可以使用侧输出(side output)特性被重定向到另外的一条流中去。迟到元素所组成的侧输出流可以继续处理或者sink到持久化设施中去。
  scala version
// One tag instance shared by sideOutputLateData and getSideOutput, so both
// sides are guaranteed to use the same id and element type.
val lateReadingsTag = new OutputTag[(String, Long)]("late-readings")

// Input: newline-delimited lines of the form "<key> <timestamp in seconds>".
// The Scala API's delimiter parameter is a Char that defaults to '\n'; passing " "
// (a String) does not compile, and a space delimiter would split every line in two.
val readings = env
  .socketTextStream("localhost", 9999)
  .map(line => {
    val arr = line.split(" ")
    (arr(0), arr(1).toLong * 1000)
  })
  .assignAscendingTimestamps(_._2)

// Count elements per key in 10-second windows; elements that arrive after
// their window was evaluated go to the side output instead of being dropped.
val countPer10Secs = readings
  .keyBy(_._1)
  .timeWindow(Time.seconds(10))
  .sideOutputLateData(lateReadingsTag)
  .process(new CountFunction())

val lateStream = countPer10Secs.getSideOutput(lateReadingsTag)

lateStream.print()
  实现 CountFunction:
/** For every window, emits one line reporting how many elements the window holds. */
class CountFunction
    extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  override def process(key: String,
                       context: Context,
                       elements: Iterable[(String, Long)],
                       out: Collector[String]): Unit = {
    val windowSize = elements.size
    out.collect(s"窗口共有${windowSize}条数据")
  }
}
  java version public class RedirectLateEvent {      private static OutputTag> output = new OutputTag>("late-readings"){};      public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1);         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);          DataStream> stream = env                 .socketTextStream("localhost", 9999)                 .map(new MapFunction>() {                     @Override                     public Tuple2 map(String s) throws Exception {                         String[] arr = s.split(" ");                         return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);                     }                 })                 .assignTimestampsAndWatermarks(                         WatermarkStrategy.                                 // like scala: assignAscendingTimestamps(_._2)                                 >forMonotonousTimestamps()                                 .withTimestampAssigner(new SerializableTimestampAssigner>() {                                     @Override                                     public long extractTimestamp(Tuple2 value, long l) {                                         return value.f1;                                     }                                 })                 );          SingleOutputStreamOperator lateReadings = stream                 .keyBy(r -> r.f0)                 .timeWindow(Time.seconds(5))                 .sideOutputLateData(output) // use after keyBy and timeWindow                 .process(new ProcessWindowFunction, String, String, TimeWindow>() {                     @Override                     public void process(String s, Context context, Iterable> iterable, Collector collector) throws Exception {                         long exactSizeIfKnown = iterable.spliterator().getExactSizeIfKnown();                         
collector.collect(exactSizeIfKnown + " of elements");                     }                 });          lateReadings.print();         lateReadings.getSideOutput(output).print();          env.execute();     } }
  scala version
val readings: DataStream[SensorReading] = ...
val filteredReadings: DataStream[SensorReading] = readings
  .process(new LateReadingsFilter)

// retrieve late readings
val lateReadings: DataStream[SensorReading] = filteredReadings
  .getSideOutput(new OutputTag[SensorReading]("late-readings"))

/** A ProcessFunction that filters out late sensor readings and
  * re-directs them to a side output.
  *
  * A reading is treated as late when its timestamp is strictly smaller than
  * the operator's current watermark; it is then emitted unchanged to the
  * "late-readings" side output. All other readings are forwarded as-is. */
class LateReadingsFilter
    extends ProcessFunction[SensorReading, SensorReading] {

  val lateReadingsOut = new OutputTag[SensorReading]("late-readings")

  override def processElement(
      r: SensorReading,
      ctx: ProcessFunction[SensorReading, SensorReading]#Context,
      out: Collector[SensorReading]): Unit = {

    // compare record timestamp with current watermark
    if (r.timestamp < ctx.timerService().currentWatermark()) {
      // this is a late reading => redirect it to the side output
      ctx.output(lateReadingsOut, r)
    } else {
      out.collect(r)
    }
  }
}
  java version
/**
 * Uses a ProcessFunction to split a stream into on-time elements (main output)
 * and late elements (side output).
 *
 * <p>Input: socket lines of the form "&lt;key&gt; &lt;timestamp in seconds&gt;".
 */
public class RedirectLateEvent {

    // Side output receiving every late element unchanged, so it can be processed
    // further or written to a sink (same behaviour as the Scala LateReadingsFilter).
    private static OutputTag<Tuple2<String, Long>> output = new OutputTag<Tuple2<String, Long>>("late-readings"){};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        SingleOutputStreamOperator<Tuple2<String, Long>> stream = env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String s) throws Exception {
                        String[] arr = s.split(" ");
                        return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Long> value, long l) {
                                        return value.f1;
                                    }
                                })
                )
                .process(new ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
                    @Override
                    public void processElement(Tuple2<String, Long> value, Context context, Collector<Tuple2<String, Long>> collector) throws Exception {
                        // An element is late when its timestamp is behind the current watermark.
                        if (value.f1 < context.timerService().currentWatermark()) {
                            context.output(output, value);
                        } else {
                            collector.collect(value);
                        }
                    }
                });

        stream.print();
        stream.getSideOutput(output).print();

        env.execute();
    }
}

2.3 使用迟到元素更新窗口计算结果
  window operator API提供了方法来明确声明我们要等待迟到元素。当使用事件时间窗口(event-time window)时,我们可以指定一个称为allowed lateness(允许迟到时间)的时间段。如果window operator设置了allowed lateness,那么当水位线超过窗口结束时间时,这个window operator并不会立即删除窗口和窗口中的状态,而是会在allowed lateness指定的时间段内继续保留窗口中的所有元素。
  当迟到元素在allowed lateness时间内到达时,这个迟到元素会被实时处理并发送到触发器(trigger)。对于默认的事件时间触发器,每到达一个这样的迟到元素,窗口都会被再次触发计算,并发出更新后的结果。当水位线超过了窗口结束时间+allowed lateness时间时,窗口会被删除,之后到达的所有迟到元素都会被丢弃。
  Allowed lateness可以使用allowedLateness()方法来指定,如下所示:
val readings: DataStream[SensorReading] = ...

val countPer10Secs: DataStream[(String, Long, Int, String)] = readings
  .keyBy(_.id)
  .timeWindow(Time.seconds(10))
  // process late readings for 5 additional seconds
  .allowedLateness(Time.seconds(5))
  // count readings and update results if late readings arrive
  .process(new UpdatingWindowCountFunction)

/** A counting ProcessWindowFunction that distinguishes between
  * first results and updates.
  *
  * Every evaluation emits (key, window end, number of readings, tag), where tag is
  * "first" for the window's first evaluation and "update" for every later
  * re-evaluation caused by late readings. */
class UpdatingWindowCountFunction
    extends ProcessWindowFunction[SensorReading,
      (String, Long, Int, String), String, TimeWindow] {

  // Per-window flag: has this window already emitted its first result?
  private val isUpdateDescriptor =
    new ValueStateDescriptor[Boolean]("isUpdate", Types.of[Boolean])

  override def process(
      id: String,
      ctx: Context,
      elements: Iterable[SensorReading],
      out: Collector[(String, Long, Int, String)]): Unit = {

    // count the number of readings
    val cnt = elements.size

    // window-scoped state: visible only to the current key and the current window
    val isUpdate = ctx.windowState.getState(isUpdateDescriptor)

    // An unset Boolean state returns null, which Scala unboxes to false.
    if (!isUpdate.value()) {
      // first evaluation, emit first result
      out.collect((id, ctx.window.getEnd, cnt, "first"))
      isUpdate.update(true)
    } else {
      // not the first evaluation, emit an update
      out.collect((id, ctx.window.getEnd, cnt, "update"))
    }
  }

  /** Called when the window is purged. The Flink docs require per-window state
    * to be cleaned up here explicitly; otherwise the flag outlives its window. */
  override def clear(ctx: Context): Unit =
    ctx.windowState.getState(isUpdateDescriptor).clear()
}
  java version public class UpdateWindowResultWithLateEvent {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1);         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);          DataStreamSource stream = env.socketTextStream("localhost", 9999);          stream                 .map(new MapFunction>() {                     @Override                     public Tuple2 map(String s) throws Exception {                         String[] arr = s.split(" ");                         return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);                     }                 })                 .assignTimestampsAndWatermarks(                         WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(5))                         .withTimestampAssigner(new SerializableTimestampAssigner>() {                             @Override                             public long extractTimestamp(Tuple2 stringLongTuple2, long l) {                                 return stringLongTuple2.f1;                             }                         })                 )                 .keyBy(r -> r.f0)                 .timeWindow(Time.seconds(5))                 .allowedLateness(Time.seconds(5))                 .process(new UpdateWindowResult())                 .print();          env.execute();     }      public static class UpdateWindowResult extends ProcessWindowFunction, String, String, TimeWindow> {         @Override         public void process(String s, Context context, Iterable> iterable, Collector collector) throws Exception {             long count = 0L;             for (Tuple2 i : iterable) {                 count += 1;             }              // 可见范围比getRuntimeContext.getState更小,只对当前key、当前window可见             // 基于窗口的状态变量,只能当前key和当前窗口访问             ValueState isUpdate = context.windowState().getState(                     
new ValueStateDescriptor("isUpdate", Types.BOOLEAN)             );              // 当水位线超过窗口结束时间时,触发窗口的第一次计算!             if (isUpdate.value() == null) {                 collector.collect("窗口第一次触发计算!一共有 " + count + " 条数据!");                 isUpdate.update(true);             } else {                 collector.collect("窗口更新了!一共有 " + count + " 条数据!");             }         }     } }
