范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

Flink操练(三十九)之水位线窗口

  主要代码// 默认每隔200ms的机器时间,插入一次水位线                 .assignTimestampsAndWatermarks(                         // 最大延迟时间设置为5秒                         WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(5))                                 .withTimestampAssigner(new SerializableTimestampAssigner>() {                                     @Override                                     public long extractTimestamp(Tuple2 element, long recordTimestamp) {                                         return element.f1; // 告诉flink事件时间是哪一个字段                                     }                                 })                 )                 .keyBy(r -> r.f0)                 .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5秒的事件时间滚动窗口                 .process(new ProcessWindowFunction, String, String, TimeWindow>() {                     @Override                     public void process(String key, Context context, Iterable> elements, Collector out) throws Exception {                         long windowStart = context.window().getStart();                         long windowEnd   = context.window().getEnd();                         long count       = elements.spliterator().getExactSizeIfKnown(); // 迭代器里面共多少条元素                         out.collect("用户:" + key + " 在窗口" +                                 "" + new Timestamp(windowStart) + "~" + new Timestamp(windowEnd) + "" +                                 "中的pv次数是:" + count);                     }                 })                 .print();完整代码package day04;  import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.sql.Timestamp; import java.time.Duration;  /**  * @program: bigData_learn  * @description: 水位线测试  * @author: Mr.逗  * @create: 2021-09-23 18:01  **/ public class WaterTest {     public static void main(String[] args) {         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1);         // `a 1`         DataStreamSource source = env.socketTextStream("172.17.0.50", 9999);         // (a, 1000L)         DataStreamSink map = source.map(new MapFunction>() {             @Override             public Tuple2 map(String value) throws Exception {                 String[] arr = value.split(" ");                 return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);             }         })                 // 默认每隔200ms的机器时间,插入一次水位线                 .assignTimestampsAndWatermarks(                         // 最大延迟时间设置为5秒                         WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(5))                                 .withTimestampAssigner(new SerializableTimestampAssigner>() {                                     @Override                                     public long extractTimestamp(Tuple2 element, long recordTimestamp) {                                         return element.f1; // 告诉flink事件时间是哪一个字段                                     }                                 })                 )                 .keyBy(r -> r.f0)                 .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5秒的事件时间滚动窗口                 .process(new ProcessWindowFunction, String, String, TimeWindow>() {                     @Override                     public void process(String key, Context context, Iterable> elements, Collector out) throws Exception {                         long windowStart = context.window().getStart();                         long windowEnd   = context.window().getEnd();                         long count       = elements.spliterator().getExactSizeIfKnown(); // 迭代器里面共多少条元素                         out.collect("用户:" + key + " 在窗口" +                                 "" + new Timestamp(windowStart) + "~" + new Timestamp(windowEnd) + "" +                                 "中的pv次数是:" + count);                     }                 })                 .print();         String name = WaterTest.class.getName();         try {             env.execute(name);         }catch (Exception e)         {             e.printStackTrace();         }     } }

iphone6sp用了6年了,现在特别卡,有什么好的办法吗?iPhone6sp卡的原因,一般是一电池容量下降,若下降到百分之八十,手机cpu会自动降频,必然会卡。你的手机用了6年了,估计电池容量应该较低二软件装的过多,内存占的过满,手机运行为什么微信在国内这么火,国外却没人愿意用?这个问题问得有点不好回答了,不好回答的原因就是国外没人愿意用的原因!微信有追蹤功能和内容检查,洋人吓都吓死了,那敢用,国外把隐私权看的极重啊。国内火的为什么国外就得火呢?是不是你觉电脑不能升级w11,提示未检测到TPM,该怎么办?绕过TPM2。0装Win11最简单的方法10月5日微软正式发布了Win11,很多朋友也想尝鲜安装win11,相比Windows10,Windows11的安装限制较多。1硬盘必须为GACS辐射空调系统,加分选择如何选择辐射空调系统品牌?如该品牌拥有以下几点是为系统加分的,可以列入候选名单。系统加分题01技术层面在选择品牌时,一定全面结合其安装施工设计技术方面,因辐射空调系统对厂家有较高的最新007电影,邦德用的手机品牌为什么换成诺基亚不换成苹果?作为谍战片的经典007系列一直深受许多特工迷的喜爱,近日最新007电影无暇赴死已上线各大影院,那么这次詹姆斯邦德又会遇到什么样的任务和危机,又会使用什么样的装备化险为夷?这些都蛮让小米MIX4都准备出来了!为何我还会花3000多买一台小米10Pro呢?因为今年骁龙888普遍被评价发热严重,我就没考虑过要买搭载骁龙888的旗舰机,而小米10Pro的骁龙865就成为了我目前最好的备用机选择2021年Q2的二手国产手机销量第一名是小米新能源汽车炒作口号努力向美好事物靠拢,让中国重汽超跌反转中国重汽是我国最大的商用车制造企业之一,是新能源汽车概念股。中国重汽依托雄厚的技术优势洞察先机的前瞻把握多品类的新能源技术路线等优势,在新能源汽车领域,尤其是氢燃料汽车领域呈现出良OPPO哪款手机性价比高?目前OPPO一共有5个系列,分别是X系列,Reno系列,ACE系列,K系列和A系列,这些系列产品里面如果可以与性价比挂点钩的也就ACE系列了,不过这个系列今年还没有更新,属于就不考为什么苹果始终不做全面屏?是技术问题吗?苹果手机从iPhoneX发展到现在的iPhone12,手机正面一直保持着刘海设计的屏幕外观。它之所以到现在没有改成前卫的全面屏,并不是因为技术方面的限制,而是真的感觉没必要。一。什5g酵母怎么量?5g酵母怎么量?相信付出爱是让自己幸福的最好方式,深深地相信,美食是抚慰人心的不二法门,更多美食问答,请关注觅源良食,让我们给您答疑解惑!做面食,比如包子馒头花卷,都是需要用到酵母怎么说服父母让自己购买二手好用的产品,例如iPhone?大家好,我是玩机少年,如果有购机烦恼可以和我一起分享讨论哦。耐用,性价比,是父母比较看重的方面你可以和你父母说,跳广场舞的大妈都在用这个手机。1。耐用。你父母肯定不希望你经常换手机
卧室太小?6个小技巧,小卧室秒变大卧室,宽敞明亮又整洁卧室太小怎么办?有人说做榻榻米!这确实是个不错的办法,但不是所有人都喜欢榻榻米,比如说小居!小居一直觉得榻榻米非常笨重,毫无美感可言,再加上两边的柜子,得了,原本就不敞亮的卧室变得总是辗转难入眠?卧室家居这样选,你也能够好梦一整晚总是辗转难入眠?卧室家居这样选,你也能够好梦一整晚睡觉看似简单的一件事情,对于部分人来说却是奢望。每每到本该进入梦乡的夜深人静的夜晚,却因为各种各样的原因导致自己在床上辗转反侧无法稻盛和夫的个人顿悟,一生干出三家世界500强企业如果我问你,做企业最重要的事是什么,你会怎么回答?你可能回答,产品很关键,人才很重要,企业战略也必不可少不同的企业经营实践,必然会产生不同的想法。日本经营之圣稻盛和夫先生曾在一套书客服沟通技巧客服沟通技巧使用不同的钩,搞定不同类型的客户淘宝客服沟通技巧你掌握了多少,很多客服抱怨现在的客户可真不好对付,那是因为你没有找准客户特性,没有投其所好,今天小编给你带来了客服沟通技物流不是送快递,了解什么是物流成本物流成本认识物流成本是指物流活动中所消耗的物化劳动和活劳动的货币价值(中国术语)企业在计划实施控制内部和外部物流活动过程中所发生的费用,包括企业在采购运输仓储物料和库存管理订单管理安心服务,让您更放心安心服务,让您更放心随着5G时代的来临,5G的超低延迟和近实时响应,将大幅提升游戏与互联家庭设备的互动视频通话和视频流等场景的用户体验,客户服务成为贯穿整个网络和移动终端应用的基础第四届绽放杯倒计时10天!用创新讲好浙江故事,绽放无限精彩5G应用征集大赛浙江分赛火热报名中2021年是十四五开局之年。作为5G发展先行省,浙江省始终坚持创新型省份建设一张蓝图绘到底。自5G试商用以来,浙江省5G场景应用项目如春笋般涌现,解读5G时代从1G到4G,中国开启了落后追赶持平的励志之旅到了5G时代,中国通讯产业终于迎来了真正意义上的爆发。通信业一直有一个说法4G改变生活,5G改变社会。5G不仅大幅提升网络速度和质量,职场人挫折需要挫折冲突是人生不可逾越的企盼和境遇。就每单个个体而言,其需要层次挫折冲突产生的心理震荡又往往是与其文化素养社交层次行为能力心理修养密不可分的。在社会主义市场经济与现代化建设不断职场人需要及其机制(心理调整)需要挫折冲突是人生不可逾越的企盼和境遇。就每单个个体而言,其需要层次挫折冲突产生的心理震荡又往往是与其文化素养社交层次行为能力心理修养密不可分的。在社会主义市场经济与现代化建设不断毕业生初入职场五忌毕业生初入职场五忌当你扛着被子,拎着饭盒走出校门时,纷繁的社会迎面而来。当你衣着笔挺,半心憧憬半心忐忑地迈入公司大门时,复杂的情形也许让你穷于应对。如何去适应你的工作?如何去融入你