Flink操练(三十四)之自定义键控状态(四)ReducingState
一、ReducingState的方法
ReducingState是和ReduceFunction配合使用
get() 获取状态的值
add(IN value)方法添加一个元素,触发reduceFunction计算一次
二、ReducingState的描述器
ReducingState的描述器和之前ValueState、ListState不同,它得和一个ReduceFunction配合使用。三、统计单词
1、KeyedProcessFunction处理类package test; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; /** * @Description: 求和 * @Param: * @return: * @Author: Mr.逗 * @Date: 2021/9/9 */ public class CountSumWithReduceState extends RichFlatMapFunction, Tuple2> { private ReducingState reducingState; /***状态初始化*/ @Override public void open(Configuration parameters) throws Exception { ReducingStateDescriptor descriptor = new ReducingStateDescriptor("ReducingDescriptor", new ReduceFunction() { @Override public Long reduce(Long v1, Long v2) throws Exception { return v1 + v2; } },Long.class); reducingState = getRuntimeContext().getReducingState(descriptor); } @Override public void flatMap(Tuple2 element, Collector> collector) throws Exception { //将状态放入 reducingState.add(element.f1); collector.collect(Tuple2.of(element.f0,reducingState.get())); } }
2、主体类package test; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @program: bigdata_learn * @description: 测试reduceState * @author: Mr.逗 * @create: 2021-09-08 17:43 **/ public class TestKeyedReduceStateMain { public static void main(String[] args) throws Exception{ //获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度 env.setParallelism(16); //获取数据源 DataStreamSource> dataStreamSource = env.fromElements( Tuple2.of(1L, 3L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 5L), Tuple2.of(2L, 2L), Tuple2.of(2L, 6L)); // 输出: //(1,5.0) //(2,4.0) dataStreamSource .keyBy(0) .flatMap(new CountSumWithReduceState()) .print(); String name = TestKeyedReduceStateMain.class.getName(); env.execute(name); } }
3、结果展示
四、计算最高温度
1、处理类package test; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; /** * @Description: 求最大值 * @Param: * @return: * @Author: Mr.逗 * @Date: 2021/9/9 */ public class CountMaxWithReduceState extends RichFlatMapFunction, Tuple2> { private ReducingState reducingState; /***状态初始化*/ @Override public void open(Configuration parameters) throws Exception { ReducingStateDescriptor descriptor = new ReducingStateDescriptor("ReducingDescriptor", new ReduceFunction() { @Override public Long reduce(Long v1, Long v2) throws Exception { return v1>=v2?v1:v2; } },Long.class); reducingState = getRuntimeContext().getReducingState(descriptor); } @Override public void flatMap(Tuple2 element, Collector> collector) throws Exception { //将状态放入 reducingState.add(element.f1); collector.collect(Tuple2.of(element.f0,reducingState.get())); } }
2、主体类package test; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @program: bigdata_learn * @description: 测试reduceState * @author: Mr.逗 * @create: 2021-09-08 17:43 **/ public class TestKeyedReduceStateMain { public static void main(String[] args) throws Exception{ //获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度 env.setParallelism(16); //获取数据源 DataStreamSource> dataStreamSource = env.fromElements( Tuple2.of(1L, 3L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 5L), Tuple2.of(2L, 2L), Tuple2.of(2L, 6L)); // 输出: //(1,5.0) //(2,4.0) dataStreamSource .keyBy(0) .flatMap(new CountMaxWithReduceState()) .print(); String name = TestKeyedReduceStateMain.class.getName(); env.execute(name); } }
3、结果展示
硕美科MX503SP透明版无线蓝牙耳机测评爆红不止是因为颜值在这个颜值即正义的时代里边,外观设计早已和品质性能一样成为消费者选购时着重考虑的一大因素。这不,硕美科前些阵子发布的MX503SP透明版无线蓝牙耳机就因清凉感十足的外表而吸引了众多
百元耳机只能听个响?瓷音未来Mars耳机给你不一样的体验随着手机制造商们纷纷取消3。5mm耳机孔,蓝牙耳机的发展是未来的一大趋势,越来越多的厂商推出了自己的蓝牙耳机。那么优秀的蓝牙耳机不仅能带来属于音质上的享受,还要有降噪的功效,能找到
雷蛇魔音海妖迷你版麦克风测评小巧,好音质如果你是一名游戏老玩家,想必你已经拥有一套炫酷的键盘等外设,不过目前很多游戏耳机附带麦克风的实际游戏语言效果并不太好,尤其对于资深游戏玩家或者主播而言,一款独立麦克风或许更能符合他
人工智能,让一切照片都魔性地动起来近期,2006年火遍中国的神曲蚂蚁呀嘿再次火了一把。只需下载一个软件,上传一张图片,就可以让里面的人动起来,扭着脖子,翻着白眼儿,魔性地齐唱蚂蚁呀嘿。在好奇心的驱使下,蜉蝣君也玩了
取代锂电池?钠离子电池商业化进度中国进展可喜编者按最近很多朋友问起中国钠离子电池的进展。本文仅针对中国领先的的钠离子研发初创中科海钠HiNa,及英国初创企业Faradion的进展,介绍钠离子电池的商业化历史及进展。2021年
亚马逊ERP铺货采集货代软件定制贴牌独立部署无限授权店铺亚马逊FBM软件快速铺货ERP软件批量上传,订单同步发货!ERP介绍具备精准利润分析,产品表现,FBA进销存,广告管理,关键词跟踪,跟卖监控等功能。亚马逊FBM软件快速铺货ERP软
虾皮ERP仓储打包软件,独立部署,独立贴牌如果没有ERP系统的,一个人的跨境电商,那么你的所有时间都要被虾皮铺货以及货代仓储处理订单的时间填满,从选品采购运营优化客服所有的事情都要一个人包办,包括回复邮件订单处理产品发货等
英特尔独立显卡XeHPG发布在即关注点有哪些呢?英特尔今天神秘兮兮地发布了一段二进制码作为猜谜游戏,这么简单的游戏很快就被人破解,得出的是一段IP地址35。160。237。208,指向的是域名是xehpg。intel。com,原
抖音独立电商软件10月上线?无奈之举去年,抖音电商部门实现了超过5000亿的GMV,是2020全年目标的3倍。这样傲人的成绩,让字节跳动愈发重视抖音的商业价值。小店支付,物流一系列的业务线布局,使得抖音整个电商闭环场
长大后你想当什么?小孩小编从网上看到了以往的旧照片那个时候,笑容真的很灿烂其实儿童节是给大人过的小孩子那么开心,每天都是儿童节看看以前的老照片想要拍娃的用户们知道你们想要的还是参数所以先放参数快门125
大山大河,为什么除了广角,还要带一支长焦今天,摄影师朱皓宇为我们分享了他的镜头选择策略和使用体验,让我们跟随他的镜头,看看哪些镜头更适合你吧。朱皓宇四川省摄影家协会会员四川省青年摄影家协会会员视觉中国签约摄影师四月时,四