Flink操练(四十一)之周期水位线窗口
关键代码env.getConfig().setAutoWatermarkInterval(60* 1000L);完整代码package day04; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.sql.Timestamp; import java.time.Duration; /** * @program: bigData_learn * @description: 水位线测试 * @author: Mr.逗 * @create: 2021-09-24 09:30 **/ public class WaterTest2 { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 每隔1分钟插入一次水位线 env.getConfig().setAutoWatermarkInterval(60* 1000L); DataStreamSource source = env.socketTextStream("172.17.0.50", 9999); DataStream> map = source.map(new MapFunction>() { @Override public Tuple2 map(String v) throws Exception { return Tuple2.of(v.split(" ")[0], Long.parseLong(v.split(" ")[1]) * 1000L); } }); // 默认每隔200ms的机器时间,插入一次水位线 DataStream> watermarks = map.assignTimestampsAndWatermarks( WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(0)) .withTimestampAssigner(new SerializableTimestampAssigner>() { @Override public long extractTimestamp(Tuple2 v, long l) { return v.f1;// 告诉flink事件时间是哪一个字段 } }) ); DataStream process = watermarks.keyBy(v -> v.f0) //5秒的事件时间滚动窗口 .window(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction, String, String, TimeWindow>() { @Override public void process(String key, Context ctx, Iterable> it, Collector out) throws Exception { long start = ctx.window().getStart(); long end = ctx.window().getEnd(); long count = it.spliterator().getExactSizeIfKnown();//迭代器里面共有多少元素 out.collect("用户:" + key + "在窗口" + "" + new Timestamp(start) + "~" + new Timestamp(end) + "" + "中的pv次数是:" + count); } }); process.print(); String name = WaterTest2.class.getName(); try { env.execute(name); }catch (Exception e) { e.printStackTrace(); } } }
外网热帖引争议20年老程序员的20条心得,看你认同吗?近日,新闻网站HackerNews上一个帖子突然火爆,引起网友激烈讨论,多天来一直占据热榜第一。该贴内容讲的是,一位有20年软件经验的程序员学到的20件事。程序员JustinEth
华为WATCH3深度评测,这块表确实有点不一样在众多国产智能手表品牌中,华为手表绝对是最经典也是最成功的一个,而在华为WATCH的数字系列GT系列和FIT系列三大系列中,诞生最早的便是华为WATCH数字系列,并且凭借出道即巅峰
三角梅冬季在有暖气房间掉叶子的原因在有暖气的室内养护三角梅要注意远离供暖设备,如果是地暖的话,最好把它放在桌子窗台花架上面,不要让花盆直接接触地面,否则高温炙烤就会造成三角梅脱水落叶。一定要加强通风,否则室内闷热,
红掌和白掌拼盘养?为什么不能拼盘养?红掌的叶片形似爱心,表面蜡质,材质较硬,叶片茎杆细长如同花苞一样生于枝干顶端,层次感非常强。而白掌的叶片则为披针形,相对柔软而狭长,花茎从枝干底端抽出,形态优美。红掌花期不固定,虽
为什么三角梅的叶子一碰就掉?要注意气温变化如果所在地区气温长期低于五度或者忽然低于零度,三角梅的叶子就会脱落,但这两种情况的症状却有所不同。气温长期低于五度时,三角梅是逐渐停止生长,提供给叶子的养分逐日减少,叶子会先慢慢变
杜鹃花的浇水技巧养护杜鹃花的过程中,浇水的时候一定要保证良好的通风,适当的给它见点光照,这样浇上水以后,盆土的水分能够快速地挥发掉一部分,不让土壤处于淤积的状态,就不会出现烂根烂叶的情况。杜鹃花其
大厂面试百度C后台开发面试真题汇总面试公司百度面试岗位C后台开发一面介绍下自己的项目1。TCP四次挥手中timewait作用是什么?去掉这个过程会有哪些后果?2。虚函数的实现机制?3。vector,queue的底层
和电脑一样带风扇散热系统的游戏手机,专为游戏而生。大家好!我是小妹畅聊数码这款努比亚红魔3S游戏手机,有顶级ICE2。1多维散热系统,强悍的CPU专为游戏而生,下面我一起来了解一下。努比亚红魔3搭载6。65英寸AMOLED90Hz
百度Q2财报百度App月活用户达到5。8亿北京时间8月12日,百度(NASDAQBIDU)发布了截至2021年6月30日的第二季度未经审计的财务报告。第二季度,百度实现营收314亿元,归属百度的净利润(非美国通用会计准则)
助力美好生活国美折上折APP用户版今起试运营国美旗下折上折平台又有新动作!8月12日,七夕真爱票亮免单折上折APP用户版试运营暨折上折券放送会在鹏润大厦36层举行。在国家走过百年大变局,站在百年新征程的节点上,新的机遇不断到
你不是在点外卖,你是在点料理包作者小铁锹原创财经小锄头(chutou0325)上班搬砖,下班想瘫,年轻人的一日三餐都靠外卖来解决。黄焖鸡猪脚饭麻辣香锅也能吃碗饭。外卖总有各种满减券,一顿饭20来块吃着也不贵。从