Flink操练(三十九)之水位线窗口
主要代码// 默认每隔200ms的机器时间,插入一次水位线 .assignTimestampsAndWatermarks( // 最大延迟时间设置为5秒 WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner>() { @Override public long extractTimestamp(Tuple2 element, long recordTimestamp) { return element.f1; // 告诉flink事件时间是哪一个字段 } }) ) .keyBy(r -> r.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5秒的事件时间滚动窗口 .process(new ProcessWindowFunction, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable> elements, Collector out) throws Exception { long windowStart = context.window().getStart(); long windowEnd = context.window().getEnd(); long count = elements.spliterator().getExactSizeIfKnown(); // 迭代器里面共多少条元素 out.collect("用户:" + key + " 在窗口" + "" + new Timestamp(windowStart) + "~" + new Timestamp(windowEnd) + "" + "中的pv次数是:" + count); } }) .print();完整代码package day04; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.sql.Timestamp; import java.time.Duration; /** * @program: bigData_learn * @description: 水位线测试 * @author: Mr.逗 * @create: 2021-09-23 18:01 **/ public class WaterTest { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // `a 1` DataStreamSource source = env.socketTextStream("172.17.0.50", 9999); // (a, 1000L) DataStreamSink map = source.map(new MapFunction>() { @Override public Tuple2 map(String value) throws Exception { String[] arr = value.split(" "); return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L); } }) // 默认每隔200ms的机器时间,插入一次水位线 .assignTimestampsAndWatermarks( // 最大延迟时间设置为5秒 WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner(new SerializableTimestampAssigner>() { @Override public long extractTimestamp(Tuple2 element, long recordTimestamp) { return element.f1; // 告诉flink事件时间是哪一个字段 } }) ) .keyBy(r -> r.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5秒的事件时间滚动窗口 .process(new ProcessWindowFunction, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable> elements, Collector out) throws Exception { long windowStart = context.window().getStart(); long windowEnd = context.window().getEnd(); long count = elements.spliterator().getExactSizeIfKnown(); // 迭代器里面共多少条元素 out.collect("用户:" + key + " 在窗口" + "" + new Timestamp(windowStart) + "~" + new Timestamp(windowEnd) + "" + "中的pv次数是:" + count); } }) .print(); String name = WaterTest.class.getName(); try { env.execute(name); }catch (Exception e) { e.printStackTrace(); } } }
卧室太小?6个小技巧,小卧室秒变大卧室,宽敞明亮又整洁卧室太小怎么办?有人说做榻榻米!这确实是个不错的办法,但不是所有人都喜欢榻榻米,比如说小居!小居一直觉得榻榻米非常笨重,毫无美感可言,再加上两边的柜子,得了,原本就不敞亮的卧室变得
总是辗转难入眠?卧室家居这样选,你也能够好梦一整晚总是辗转难入眠?卧室家居这样选,你也能够好梦一整晚睡觉看似简单的一件事情,对于部分人来说却是奢望。每每到本该进入梦乡的夜深人静的夜晚,却因为各种各样的原因导致自己在床上辗转反侧无法
稻盛和夫的个人顿悟,一生干出三家世界500强企业如果我问你,做企业最重要的事是什么,你会怎么回答?你可能回答,产品很关键,人才很重要,企业战略也必不可少不同的企业经营实践,必然会产生不同的想法。日本经营之圣稻盛和夫先生曾在一套书
客服沟通技巧客服沟通技巧使用不同的钩,搞定不同类型的客户淘宝客服沟通技巧你掌握了多少,很多客服抱怨现在的客户可真不好对付,那是因为你没有找准客户特性,没有投其所好,今天小编给你带来了客服沟通技
物流不是送快递,了解什么是物流成本物流成本认识物流成本是指物流活动中所消耗的物化劳动和活劳动的货币价值(中国术语)企业在计划实施控制内部和外部物流活动过程中所发生的费用,包括企业在采购运输仓储物料和库存管理订单管理
安心服务,让您更放心安心服务,让您更放心随着5G时代的来临,5G的超低延迟和近实时响应,将大幅提升游戏与互联家庭设备的互动视频通话和视频流等场景的用户体验,客户服务成为贯穿整个网络和移动终端应用的基础
第四届绽放杯倒计时10天!用创新讲好浙江故事,绽放无限精彩5G应用征集大赛浙江分赛火热报名中2021年是十四五开局之年。作为5G发展先行省,浙江省始终坚持创新型省份建设一张蓝图绘到底。自5G试商用以来,浙江省5G场景应用项目如春笋般涌现,
解读5G时代从1G到4G,中国开启了落后追赶持平的励志之旅到了5G时代,中国通讯产业终于迎来了真正意义上的爆发。通信业一直有一个说法4G改变生活,5G改变社会。5G不仅大幅提升网络速度和质量,
职场人挫折需要挫折冲突是人生不可逾越的企盼和境遇。就每单个个体而言,其需要层次挫折冲突产生的心理震荡又往往是与其文化素养社交层次行为能力心理修养密不可分的。在社会主义市场经济与现代化建设不断
职场人需要及其机制(心理调整)需要挫折冲突是人生不可逾越的企盼和境遇。就每单个个体而言,其需要层次挫折冲突产生的心理震荡又往往是与其文化素养社交层次行为能力心理修养密不可分的。在社会主义市场经济与现代化建设不断
毕业生初入职场五忌毕业生初入职场五忌当你扛着被子,拎着饭盒走出校门时,纷繁的社会迎面而来。当你衣着笔挺,半心憧憬半心忐忑地迈入公司大门时,复杂的情形也许让你穷于应对。如何去适应你的工作?如何去融入你