范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

Flink操练(十)之DS简介(10)基于时间和窗口的操作符(二)

  1 窗口操作符
  窗口操作是流处理程序中很常见的操作。窗口操作允许我们在无限流上的一段有界区间上面做聚合之类的操作。而我们使用基于时间的逻辑来定义区间。窗口操作符提供了一种将数据放进一个桶,并根据桶中的数据做计算的方法。例如,我们可以将事件放进5分钟的滚动窗口中,然后计数。
  无限流转化成有限数据的方法:使用窗口。 1.1 定义窗口操作符
  Window算子可以在keyed stream或者nokeyed stream上面使用。
  创建一个Window算子,需要指定两个部分: window assigner  定义了流的元素如何分配到window中。window assigner将会产生一条WindowedStream(或者AllWindowedStream,如果是nonkeyed DataStream的话)window function用来处理WindowedStream(AllWindowedStream)中的元素。
  下面的代码说明了如何使用窗口操作符。 stream   .keyBy(...)   .window(...)  // 指定window assigner   .reduce/aggregate/process(...) // 指定window function  stream   .windowAll(...) // 指定window assigner   .reduce/aggregate/process(...) // 指定window function
  我们的学习重点是Keyed WindowedStream。 1.2 内置的窗口分配器
  窗口分配器将会根据事件的事件时间或者处理时间来将事件分配到对应的窗口中去。窗口包含开始时间和结束时间这两个时间戳。
  所有的窗口分配器都包含一个默认的触发器: 对于事件时间:当 水位线 超过窗口结束时间,触发窗口的求值操作。对于处理时间:当机器时间超过窗口结束时间,触发窗口的求值操作。
  需要注意的是:当处于某个窗口的第一个事件到达的时候,这个窗口才会被创建。Flink不会对空窗口求值。
  Flink创建的窗口类型是 TimeWindow  ,包含开始时间和结束时间,区间是左闭右开的,也就是说包含开始时间戳,不包含结束时间戳。
  滚动窗口(tumbling windows)
  DataStream sensorData = ...  DataStream avgTemp = sensorData   .keyBy(r -> r.id)   // group readings in 1s event-time windows   .window(TumblingEventTimeWindows.of(Time.seconds(1)))   .process(new TemperatureAverager);  DataStream avgTemp = sensorData   .keyBy(r -> r.id)   // group readings in 1s processing-time windows   .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))   .process(new TemperatureAverager);  // 其实就是之前的 // shortcut for window.(TumblingEventTimeWindows.of(size)) DataStream avgTemp = sensorData   .keyBy(r -> r.id)   .timeWindow(Time.seconds(1))   .process(new TemperatureAverager);
  默认情况下,滚动窗口会和 1970-01-01-00:00:00.000  对齐,例如一个1小时的滚动窗口将会定义以下开始时间的窗口:00:00:00,01:00:00,02:00:00,等等。
  滑动窗口(sliding window)
  对于滑动窗口,我们需要指定窗口的大小和滑动的步长。当滑动步长小于窗口大小时,窗口将会出现重叠,而元素会被分配到不止一个窗口中去。当滑动步长大于窗口大小时,一些元素可能不会被分配到任何窗口中去,会被直接丢弃。
  下面的代码定义了窗口大小为1小时,滑动步长为15分钟的窗口。每一个元素将被分配到4个窗口中去。
  DataStream slidingAvgTemp = sensorData   .keyBy(r -> r.id)   .window(     SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(15))   )   .process(new TemperatureAverager);  DataStream slidingAvgTemp = sensorData   .keyBy(r -> r.id)   .window(     SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(15))   )   .process(new TemperatureAverager);  DataStream slidingAvgTemp = sensorData   .keyBy(r -> r.id)   .timeWindow(Time.hours(1), Time.minutes(15))   .process(new TemperatureAverager);
  会话窗口(session windows)
  会话窗口不可能重叠,并且会话窗口的大小也不是固定的。不活跃的时间长度定义了会话窗口的界限。不活跃的时间是指这段时间没有元素到达。下图展示了元素如何被分配到会话窗口。
  DataStream sessionWindows = sensorData   .keyBy(r -> r.id)   .window(EventTimeSessionWindows.withGap(Time.minutes(15)))   .process(...);  DataStream sessionWindows = sensorData   .keyBy(r -> r.id)   .window(ProcessingTimeSessionWindows.withGap(Time.minutes(15)))   .process(...);
  由于会话窗口的开始时间和结束时间取决于接收到的元素,所以窗口分配器无法立即将所有的元素分配到正确的窗口中去。相反,会话窗口分配器最开始时先将每一个元素分配到它自己独有的窗口中去,窗口开始时间是这个元素的时间戳,窗口大小是session gap的大小。接下来,会话窗口分配器会将出现重叠的窗口合并成一个窗口。 1.3 调用窗口计算函数
  window functions定义了窗口中数据的计算逻辑。有两种计算逻辑: 增量聚合函数(Incremental aggregation functions) :当一个事件被添加到窗口时,触发函数计算,并且更新window的状态(单个值)。最终聚合的结果将作为输出。ReduceFunction和AggregateFunction是增量聚合函数。全窗口函数(Full window functions) :这个函数将会收集窗口中所有的元素,可以做一些复杂计算。ProcessWindowFunction是window function。
  ReduceFunction
  例子: 计算每个传感器15s窗口中的温度最小值
  scala version val minTempPerWindow = sensorData   .map(r => (r.id, r.temperature))   .keyBy(_._1)   .timeWindow(Time.seconds(15))   .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
  java version DataStream> minTempPerwindow = sensorData     .map(new MapFunction>() {         @Override         public Tuple2 map(SensorReading value) throws Exception {             return Tuple2.of(value.id, value.temperature);         }     })     .keyBy(r -> r.f0)     .timeWindow(Time.seconds(5))     .reduce(new ReduceFunction>() {         @Override         public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {             if (value1.f1 < value2.f1) {                 return value1;             } else {                 return value2;             }         }     })
  AggregateFunction
  先来看接口定义 public interface AggregateFunction   extends Function, Serializable {    // create a new accumulator to start a new aggregate   ACC createAccumulator();    // add an input element to the accumulator and return the accumulator   ACC add(IN value, ACC accumulator);    // compute the result from the accumulator and return it.   OUT getResult(ACC accumulator);    // merge two accumulators and return the result.   ACC merge(ACC a, ACC b); }
  IN是输入元素的类型,ACC是累加器的类型,OUT是输出元素的类型。
  例子 val avgTempPerWindow: DataStream[(String, Double)] = sensorData   .map(r => (r.id, r.temperature))   .keyBy(_._1)   .timeWindow(Time.seconds(15))   .aggregate(new AvgTempFunction)  // An AggregateFunction to compute the average temperature per sensor. // The accumulator holds the sum of temperatures and an event count. class AvgTempFunction   extends AggregateFunction[(String, Double),     (String, Double, Int), (String, Double)] {    override def createAccumulator() = {     ("", 0.0, 0)   }    override def add(in: (String, Double), acc: (String, Double, Int)) = {     (in._1, in._2 + acc._2, 1 + acc._3)   }    override def getResult(acc: (String, Double, Int)) = {     (acc._1, acc._2 / acc._3)   }    override def merge(acc1: (String, Double, Int),     acc2: (String, Double, Int)) = {     (acc1._1, acc1._2 + acc2._2, acc1._3 + acc2._3)   } }
  ProcessWindowFunction
  一些业务场景,我们需要收集窗口内所有的数据进行计算,例如计算窗口数据的中位数,或者计算窗口数据中出现频率最高的值。这样的需求,使用ReduceFunction和AggregateFunction就无法实现了。这个时候就需要ProcessWindowFunction了。
  先来看接口定义 public abstract class ProcessWindowFunction   extends AbstractRichFunction {    // Evaluates the window   void process(KEY key, Context ctx, Iterable vals, Collector out)     throws Exception;    // Deletes any custom per-window state when the window is purged   public void clear(Context ctx) throws Exception {}    // The context holding window metadata   public abstract class Context implements Serializable {     // Returns the metadata of the window     public abstract W window();      // Returns the current processing time     public abstract long currentProcessingTime();      // Returns the current event-time watermark     public abstract long currentWatermark();      // State accessor for per-window state     public abstract KeyedStateStore windowState();      // State accessor for per-key global state     public abstract KeyedStateStore globalState();      // Emits a record to the side output identified by the OutputTag.     public abstract  void output(OutputTag outputTag, X value);   } }
  process()方法接受的参数为:
  window的key,
  Iterable迭代器包含窗口的所有元素,
  Collector用于输出结果流。
  Context参数和别的process方法一样。而ProcessWindowFunction的Context对象还可以访问window的元数据(窗口开始和结束时间),当前处理时间和水位线,per-window state和per-key global state,side outputs。 per-window state: 用于保存一些信息,这些信息可以被process()访问,只要process所处理的元素属于这个窗口。 per-key global state: 同一个key,也就是在一条KeyedStream上,不同的window可以访问per-key global state保存的值。
  例子:计算5s滚动窗口中的最低和最高的温度。输出的元素包含了(流的Key, 最低温度, 最高温度, 窗口结束时间)。 val minMaxTempPerWindow: DataStream[MinMaxTemp] = sensorData   .keyBy(_.id)   .timeWindow(Time.seconds(5))   .process(new HighAndLowTempProcessFunction)  case class MinMaxTemp(id: String, min: Double, max: Double, endTs: Long)  class HighAndLowTempProcessFunction   extends ProcessWindowFunction[SensorReading,     MinMaxTemp, String, TimeWindow] {   override def process(key: String,                        ctx: Context,                        vals: Iterable[SensorReading],                        out: Collector[MinMaxTemp]): Unit = {     val temps = vals.map(_.temperature)     val windowEnd = ctx.window.getEnd      out.collect(MinMaxTemp(key, temps.min, temps.max, windowEnd))   } }
  我们还可以将ReduceFunction/AggregateFunction和ProcessWindowFunction结合起来使用。ReduceFunction/AggregateFunction做增量聚合,ProcessWindowFunction提供更多的对数据流的访问权限。如果只使用ProcessWindowFunction(底层的实现为将事件都保存在ListState中),将会非常占用空间。分配到某个窗口的元素将被提前聚合,而当窗口的trigger触发时,也就是窗口收集完数据关闭时,将会把聚合结果发送到ProcessWindowFunction中,这时Iterable参数将会只有一个值,就是前面聚合的值。
  例子 input   .keyBy(...)   .timeWindow(...)   .reduce(     incrAggregator: ReduceFunction[IN],     function: ProcessWindowFunction[IN, OUT, K, W])  input   .keyBy(...)   .timeWindow(...)   .aggregate(     incrAggregator: AggregateFunction[IN, ACC, V],     windowFunction: ProcessWindowFunction[V, OUT, K, W])
  我们把之前的需求重新使用以上两种方法实现一下。 case class MinMaxTemp(id: String, min: Double, max: Double, endTs: Long)  val minMaxTempPerWindow2: DataStream[MinMaxTemp] = sensorData   .map(r => (r.id, r.temperature, r.temperature))   .keyBy(_._1)   .timeWindow(Time.seconds(5))   .reduce(     (r1: (String, Double, Double), r2: (String, Double, Double)) => {       (r1._1, r1._2.min(r2._2), r1._3.max(r2._3))     },     new AssignWindowEndProcessFunction   )  class AssignWindowEndProcessFunction   extends ProcessWindowFunction[(String, Double, Double),     MinMaxTemp, String, TimeWindow] {     override def process(key: String,                        ctx: Context,                        minMaxIt: Iterable[(String, Double, Double)],                        out: Collector[MinMaxTemp]): Unit = {     val minMax = minMaxIt.head     val windowEnd = ctx.window.getEnd     out.collect(MinMaxTemp(key, minMax._2, minMax._3, windowEnd))   } }1.4 自定义窗口操作符
  Flink内置的window operators分配器已经已经足够应付大多数应用场景。尽管如此,如果我们需要实现一些复杂的窗口逻辑,例如:可以发射早到的事件或者碰到迟到的事件就更新窗口的结果,或者窗口的开始和结束决定于特定事件的接收。
  DataStream API暴露了接口和方法来自定义窗口操作符。 自定义窗口分配器 自定义窗口计算触发器(trigger) 自定义窗口数据清理功能(evictor)
  当一个事件来到窗口操作符,首先将会传给WindowAssigner来处理。WindowAssigner决定了事件将被分配到哪些窗口。如果窗口不存在,WindowAssigner将会创建一个新的窗口。
  如果一个window operator接受了一个增量聚合函数作为参数,例如ReduceFunction或者AggregateFunction,新到的元素将会立即被聚合,而聚合结果result将存储在window中。如果window operator没有使用增量聚合函数,那么新元素将被添加到ListState中,ListState中保存了所有分配给窗口的元素。
  新元素被添加到窗口时,这个新元素同时也被传给了window的trigger。trigger定义了window何时准备好求值,何时window被清空。trigger可以基于window被分配的元素和注册的定时器来对窗口的所有元素求值或者在特定事件清空window中所有的元素。
  当window operator只接收一个增量聚合函数作为参数时:
  当window operator只接收一个全窗口函数作为参数时:
  当window operator接收一个增量聚合函数和一个全窗口函数作为参数时:
  evictor是一个可选的组件,可以被注入到ProcessWindowFunction之前或者之后调用。evictor可以清除掉window中收集的元素。由于evictor需要迭代所有的元素,所以evictor只能使用在没有增量聚合函数作为参数的情况下。
  下面的代码说明了如果使用自定义的trigger和evictor定义一个window operator: stream   .keyBy(...)   .window(...)  [.trigger(...)]  [.evictor(...)]   .reduce/aggregate/process(...)
  注意:每个WindowAssigner都有一个默认的trigger。
  窗口生命周期
  当WindowAssigner分配某个窗口的第一个元素时,这个窗口才会被创建。所以不存在没有元素的窗口。
  一个窗口包含了如下状态: Window content 分配到这个窗口的元素 增量聚合的结果(如果window operator接收了ReduceFunction或者AggregateFunction作为参数)。 Window object WindowAssigner返回0个,1个或者多个window object。 window operator根据返回的window object来聚合元素。 每一个window object包含一个windowEnd时间戳,来区别于其他窗口。 触发器的定时器:一个触发器可以注册定时事件,到了定时的时间可以执行相应的回调函数,例如:对窗口进行求值或者清空窗口。 触发器中的自定义状态:触发器可以定义和使用自定义的、per-window或者per-key状态。这个状态完全被触发器所控制。而不是被window operator控制。
  当窗口结束时间来到,window operator将删掉这个窗口。窗口结束时间是由window object的end timestamp所定义的。无论是使用processing time还是event time,窗口结束时间是什么类型可以调用WindowAssigner.isEventTime()方法获得。
  窗口分配器(window assigners)
  WindowAssigner将会把元素分配到0个,1个或者多个窗口中去。我们看一下WindowAssigner接口: public abstract class WindowAssigner     implements Serializable {    public abstract Collection assignWindows(     T element,     long timestamp,     WindowAssignerContext context);    public abstract Trigger getDefaultTriger(     StreamExecutionEnvironment env);    public abstract TypeSerializer getWindowSerializer(     ExecutionConfig executionConfig);    public abstract boolean isEventTime();    public abstract static class WindowAssignerContext {     public abstract long getCurrentProcessingTime();   } }
  WindowAssigner有两个泛型参数: T: 事件的数据类型 W: 窗口的类型
  下面的代码创建了一个自定义窗口分配器,是一个30秒的滚动事件时间窗口。 class ThirtySecondsWindows     extends WindowAssigner[Object, TimeWindow] {    val windowSize: Long = 30 * 1000L    override def assignWindows(     o: Object,     ts: Long,     ctx: WindowAssigner.WindowAssignerContext   ): java.util.List[TimeWindow] = {      val startTime = ts - (ts % windowSize)     val endTime = startTime + windowSize     Collections.singletonList(new TimeWindow(startTime, endTime))   }    override def getDefaultTrigger(     env: environment.StreamExecutionEnvironment   ): Trigger[Object, TimeWindow] = {       EventTimeTrigger.create()   }    override def getWindowSerializer(     executionConfig: ExecutionConfig   ): TypeSerializer[TimeWindow] = {     new TimeWindow.Serializer   }    override def isEventTime = true }
  增量聚合示意图
  全窗口聚合示意图
  增量聚合和全窗口聚合结合使用的示意图
  触发器(Triggers)
  触发器定义了window何时会被求值以及何时发送求值结果。 触发器可以到了特定的时间触发也可以碰到特定的事件触发。例如:观察到事件数量符合一定条件或者观察到了特定的事件。
  默认的触发器将会在两种情况下触发 处理时间:机器时间到达处理时间 事件时间:水位线超过了窗口的结束时间
  触发器可以访问流的时间属性以及定时器,还可以对state状态编程。所以触发器和process function一样强大。
  例如我们可以实现一个触发逻辑:当窗口接收到一定数量的元素时,触发器触发。再比如当窗口接收到一个特定元素时,触发器触发。还有就是当窗口接收到的元素里面包含特定模式(5秒钟内接收到了两个同样类型的事件),触发器也可以触发。在一个事件时间的窗口中,一个自定义的触发器可以提前(在水位线没过窗口结束时间之前)计算和发射计算结果。这是一个常见的低延迟计算策略,尽管计算不完全,但不像默认的那样需要等待水位线没过窗口结束时间。
  每次调用触发器都会产生一个 TriggerResult来决定窗口接下来发生什么 。TriggerResult可以取以下结果:CONTINUE:什么都不做 FIRE:如果window operator有ProcessWindowFunction这个参数,将会调用这个ProcessWindowFunction。如果窗口仅有增量聚合函数(ReduceFunction或者AggregateFunction)作为参数,那么当前的聚合结果将会被发送。窗口的state不变。 PURGE:窗口所有内容包括窗口的元数据都将被丢弃。 FIRE_AND_PURGE:先对窗口进行求值,再将窗口中的内容丢弃。
  TriggerResult可能的取值使得我们可以实现很复杂的窗口逻辑。一个自定义触发器可以触发多次,可以计算或者更新结果,可以在发送结果之前清空窗口。
  接下来我们看一下Trigger API: public abstract class Trigger     implements Serializable {    TriggerResult onElement(     long timestamp,     W window,     TriggerContext ctx);    public abstract TriggerResult onProcessingTime(     long timestamp,     W window,     TriggerContext ctx);    public abstract TriggerResult onEventTime(     long timestamp,     W window,     TriggerContext ctx);    public boolean canMerge();    public void onMerge(W window, OnMergeContext ctx);    public abstract void clear(W window, TriggerContext ctx); }  public interface TriggerContext {    long getCurrentProcessingTime();    long getCurrentWatermark();    void registerProcessingTimeTimer(long time);    void registerEventTimeTimer(long time);    void deleteProcessingTimeTimer(long time);    void deleteEventTimeTimer(long time);     S getPartitionedState(     StateDescriptor stateDescriptor); }  public interface OnMergeContext extends TriggerContext {    void mergePartitionedState(     StateDescriptor stateDescriptor   ); }
  这里要注意两个地方:清空state和merging合并触发器。
  当在触发器中使用per-window state时,这里我们需要保证当窗口被删除时state也要被删除,否则随着时间的推移,window operator将会积累越来越多的数据,最终可能使应用崩溃。
  当窗口被删除时,为了清空所有状态,触发器的clear()方法需要需要删掉所有的自定义per-window state,以及使用TriggerContext对象将处理时间和事件时间的定时器都删除。
  下面的例子展示了一个触发器在窗口结束时间之前触发。当第一个事件被分配到窗口时,这个触发器注册了一个定时器,定时时间为水位线之前一秒钟。当定时事件执行,将会注册一个新的定时事件,这样,这个触发器每秒钟最多触发一次。
  scala version class OneSecondIntervalTrigger     extends Trigger[SensorReading, TimeWindow] {    override def onElement(     SensorReading r,     timestamp: Long,     window: TimeWindow,     ctx: Trigger.TriggerContext   ): TriggerResult = {     val firstSeen: ValueState[Boolean] = ctx       .getPartitionedState(         new ValueStateDescriptor[Boolean](           "firstSeen", classOf[Boolean]         )       )      if (!firstSeen.value()) {       val t = ctx.getCurrentWatermark        + (1000 - (ctx.getCurrentWatermark % 1000))       ctx.registerEventTimeTimer(t)       ctx.registerEventTimeTimer(window.getEnd)       firstSeen.update(true)     }      TriggerResult.CONTINUE   }    override def onEventTime(     timestamp: Long,     window: TimeWindow,     ctx: Trigger.TriggerContext   ): TriggerResult = {     if (timestamp == window.getEnd) {       TriggerResult.FIRE_AND_PURGE     } else {       val t = ctx.getCurrentWatermark        + (1000 - (ctx.getCurrentWatermark % 1000))       if (t < window.getEnd) {         ctx.registerEventTimeTimer(t)       }       TriggerResult.FIRE     }   }    override def onProcessingTime(     timestamp: Long,     window: TimeWindow,     ctx: Trigger.TriggerContext   ): TriggerResult = {     TriggerResult.CONTINUE   }    override def clear(     window: TimeWindow,     ctx: Trigger.TriggerContext   ): Unit = {     val firstSeen: ValueState[Boolean] = ctx       .getPartitionedState(         new ValueStateDescriptor[Boolean](           "firstSeen", classOf[Boolean]         )       )     firstSeen.clear()   } }
  java version public class TriggerExample {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);         env.setParallelism(1);          env                 .socketTextStream("localhost", 9999)                 .map(new MapFunction>() {                     @Override                     public Tuple2 map(String s) throws Exception {                         String[] arr = s.split(" ");                         return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);                     }                 })                 .assignTimestampsAndWatermarks(                         WatermarkStrategy.>forMonotonousTimestamps()                         .withTimestampAssigner(new SerializableTimestampAssigner>() {                             @Override                             public long extractTimestamp(Tuple2 stringLongTuple2, long l) {                                 return stringLongTuple2.f1;                             }                         })                 )                 .keyBy(r -> r.f0)                 .timeWindow(Time.seconds(5))                 .trigger(new OneSecondIntervalTrigger())                 .process(new ProcessWindowFunction, String, String, TimeWindow>() {                     @Override                     public void process(String s, Context context, Iterable> iterable, Collector collector) throws Exception {                         long count = 0L;                         for (Tuple2 i : iterable) count += 1;                         collector.collect("窗口中有 " + count + " 条元素");                     }                 })                 .print();          env.execute();     }      public static class OneSecondIntervalTrigger extends Trigger, TimeWindow> {         // 来一条调用一次         @Override         public TriggerResult onElement(Tuple2 r, long l, TimeWindow window, TriggerContext ctx) throws Exception {             ValueState firstSeen = ctx.getPartitionedState(                     new ValueStateDescriptor("first-seen", Types.BOOLEAN)             );              if (firstSeen.value() == null) {                 // 4999 + (1000 - 4999 % 1000) = 5000                 System.out.println("第一条数据来的时候 ctx.getCurrentWatermark() 的值是 " + ctx.getCurrentWatermark());                 long t = ctx.getCurrentWatermark() + (1000L - ctx.getCurrentWatermark() % 1000L);                 ctx.registerEventTimeTimer(t);                 ctx.registerEventTimeTimer(window.getEnd());                 firstSeen.update(true);             }             return TriggerResult.CONTINUE;         }          // 定时器逻辑         @Override         public TriggerResult onEventTime(long ts, TimeWindow window, TriggerContext ctx) throws Exception {             if (ts == window.getEnd()) {                 return TriggerResult.FIRE_AND_PURGE;             } else {                 System.out.println("当前水位线是:" + ctx.getCurrentWatermark());                 long t = ctx.getCurrentWatermark() + (1000L - ctx.getCurrentWatermark() % 1000L);                 if (t < window.getEnd()) {                     ctx.registerEventTimeTimer(t);                 }                 return TriggerResult.FIRE;             }         }          @Override         public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {             return TriggerResult.CONTINUE;         }          @Override         public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception {             ValueState firstSeen = ctx.getPartitionedState(                     new ValueStateDescriptor("first-seen", Types.BOOLEAN)             );             firstSeen.clear();         }     } }
  清理器(EVICTORS)
  evictor可以在window function求值之前或者之后移除窗口中的元素。
  我们看一下Evictor的接口定义: public interface Evictor     extends Serializable {   void evictBefore(     Iterable> elements,     int size,     W window,     EvictorContext evictorContext);    void evictAfter(     Iterable> elements,     int size,     W window,     EvictorContext evictorContext);    interface EvictorContext {      long getCurrentProcessingTime();      long getCurrentWatermark();   } }
  evictBefore()和evictAfter()分别在window function计算之前或者之后调用。Iterable迭代器包含了窗口所有的元素,size为窗口中元素的数量,window object和EvictorContext可以访问当前处理时间和水位线。可以对Iterator调用remove()方法来移除窗口中的元素。
  evictor也经常被用在GlobalWindow上,用来清除部分元素,而不是将窗口中的元素全部清空。

来做操吧!深入TypeScript高级类型和类型体操TypeScript给JavaScript扩展了类型的语法,我们可以给变量加上类型,在编译期间会做类型检查,配合编辑器还能做更准确的智能提示。此外,TypeScript还支持了高级中国13月智能手机出货量下降14。1美国调查公司IDC发布的数据显示,中国2022年13月智能手机出货量比上年同期下降14。1,降至7420万部。连续4个季度下滑,自2021年46月以来再次出现2位数降幅。据悉新冠疫微星海皇戟S,游戏电脑中最小,没有更小微星Tridents5M,2。6L超小巧体积的游戏电脑,上最小的台式游戏电脑。扩展所有的可能性大量的接口各种端口允许您轻松部署您的所有设备。低调而不失风格的设计语言融合古典建筑设计国汽智控发布国内首款面向量产的车规级数据安全产品21世纪经济报道记者左茂轩北京报道5月13日,国汽智控发布了面向量产的车规级智能汽车数据安全产品ICVSEC2。5。该版本涵盖数据防护和隐私保护两项核心功能,包括硬件软件容器等三种国产工业软件如何支持工业互联网?美的产品化改造需可推广澎湃新闻记者周玲中国制造业转型升级正在提速,工业软件作为工业互联网的重要支撑是一个突破口。近日,美的集团旗下的工业互联网平台美云智数总裁金江在接受澎湃新闻记者采访时表示,工业软件难在定制开发程序中接入腾讯文档介绍腾讯文档是腾讯于2018年4月推出的一款可多人在线协作的免费文档产品。目前支持在线文档表格收集表PPTPDF思维导图流程图七种文档品类,同时为合作伙伴提供丰富的API接口,只需南开学子在第四届incoPat新科技检索大赛中获佳绩南开新闻网讯(通讯员朱佳寅吴春)日前,第四届incoPat新科技检索大赛未来IP专家高校精英赛正式收官。经过激烈角逐,南开大学4名学子从11542名选手中脱颖而出,分别获得一二三等华为产品管理失败7把对手想的太好了1997年,中国最大的运营商中国移动采用的GSM设备在中国通信市场取得了巨大成功,GSM用户占到国内移动用户总数的42。5,到2000年中国移动用户总数达到3500万,其中新增用户4个月时间,中企进口芯片减少240亿片!外媒已对美芯产生影响随着我国科技的发展,我们在芯片方面的需要也在陆续增加,信息显示,我国每年在半导体领域,仅进口费用就高达3000亿美元,像我们现在所使用的系统手机芯片电脑芯片大部分都来自美国。此外,以求函数运行时间为例解释python3装饰器用法不适用装饰器,求函数运行时间代码importtimedefprintnum(num)foriinrange(2,100num)print(i)deftest()t1time。timQiushiJournalExcerpt集中力量推进关键核心技术攻关,加快实现高水平自立自强(ExcerptsfromPromotingtheHealthyDevelopmentoftheDigitalEconomyinChinabyNationalDevelopment
iPhone14的颜值高于iPhone13,消费者该不该等?在iPhone13正式来临之前,知名爆料人悄悄带来了iPhone14的消息,同时放出了一组iPhone14系列的外观设计渲染图。至于FaceID,目前尚不清楚苹果会采用屏下方案,还谷歌第一次用上12GB内存?Pixel6Pro这款产品的消息很多的小伙伴都是比较的漠然的吧,不少的人都是比较的好奇这款产品的消息的吧,那么我们一起来看看最新的一个信息吧!说不定还有惊喜呢!据悉,Geekben华为鸿蒙电池健康系统,正分批推送上线华为华为鸿蒙华为鸿蒙系统华为鸿蒙系统最近在HarmonyOS2。0。0。168版本,推送电池健康系统,正在分批推送上线,在电池更多设置选项中出现了一项最大容量,系统可评估设备的当前ColorOS12官宣本月16日晚发布,OPPO诸多新品一同亮相今天,ColorOS官方微博宣布,9月16日晚上1800正式发布全新ColorOS12。据目前所了解的信息来看,这次新UI主要在设计美学,系统流畅性,协同交互上有诸多亮点。从官宣海超越时代,九项全能南卡N2s耳机体验现代社会生活不易,芸芸众生皆为生活奔波,不知道有多少人和我一样,每天只有在上下班的路上,才有真正只属于自己的时间。这时候,不论听听音乐看看电视剧,都是非常惬意的事情。感谢ZAKER平时只用流量,有什么便宜的流量套餐吗?移动9元月,45G流量,1300分钟通话。联通18元月,90G流量,1500分钟通话。移动套餐我这个套餐是天津的,今年年初办的,原来用的联通冰淇淋套餐,不过今年感觉流量不够用,才换除了直通车流量,淘宝店铺没有一个自然流量怎么做?十余年电商从业者,分享我的见解经验技巧。我用心写,你用心学!除了直通车流量,淘宝店铺没有一个自然流量?这就是老花为什么一直跟大家讲,不建议盲目开车!你的店铺没有没有数据,即使是用了比较靠谱好用的无限流量卡有哪些,可以是纯流量卡?谢邀!现在已经没有无限流量卡了,从10月份开始,三大运营商的新套餐都是达量不限速套餐,也就是在用完套餐内流量之后,如果继续使用流量则不会再限速,但是会额外收费,通常是1元3GB的价快手电商想要二次生长,底气何在?丨氪金lite从4月开始品牌自播,到8月O粉节超品日活动总裁直播单场GMV超1000万总GMV超8000万,OPPO在快手电商生态的发展速度超出许多人的想象。这4个月,我们在快手从10万粉丝的O三星s22系列配置曝光,看起来散热和拍照都有明显升级?三星s22消息曝光了!这次看起来应该会有新的升级,不过对于这款产品,这次可能有一些升级值得关注!现在曝光的是三星s22plus和ultra!这两款应该都是旗舰产品!当然使用的芯片会放开微信外链,腾讯无法管理?多虑了本报讯(记者田冲)关于互联网互联互通的话题,最近引发了不少关注。从监管动作来看,互联互通势在必行,微信淘宝抖音不能再互相屏蔽外链。据媒体报道称,9月9日,工信部有关业务部门召开了屏