Flink操练(二十)之并行度使用讲解
1、代码实现逻辑package one; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * @program: Flink_learn * @description: 并行度的设置 * 针对每个算子设置的并行度的优先级高于全局并行度 * 本程序需要两个任务插槽 * @author: Mr.逗 * @create: 2021-09-14 15:40 **/ public class Example3 { public static void main(String[] args) { // 获取流处理的运行时环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行任务的数量为1 // 需要1个任务插槽 env.setParallelism(1); //读取数据源 // 并行度设置为1 DataStreamSource stream = env.fromElements("hello world", "hello world").setParallelism(1); // map操作 // 这里使用的flatMap方法 // map: 针对流中的每一个元素,输出一个元素 // flatMap:针对流中的每一个元素,输出0个,1个或者多个元素 // 并行度设置为2 SingleOutputStreamOperator mappedStream = stream // 输入泛型:String; 输出泛型:WordWithCount .flatMap(new FlatMapFunction() { @Override public void flatMap(String v, Collector out) throws Exception { String[] words = v.split(" "); for (String w : words) { // 使用collect方法向下游发送数据 out.collect(new WordWithCount(w, 1L)); } } }).setParallelism(2); //分组shuffle // 第一个泛型:流中元素的泛型 // 第二个泛型:key的泛型 KeyedStream keyedStream = mappedStream.keyBy(new KeySelector() { @Override public String getKey(WordWithCount v) throws Exception { return v.word; } }); // reduce操作 // reduce会维护一个累加器 // 第一条数据到来,作为累加器输出 // 第二条数据到来,和累加器进行聚合操作,然后输出累加器 // 累加器和流中元素的类型是一样的 SingleOutputStreamOperator reduce = keyedStream.reduce(new ReduceFunction() { @Override public WordWithCount reduce(WordWithCount v1, WordWithCount v2) throws Exception { return new WordWithCount(v1.word, v1.count + v2.count); } }); //输出 reduce.print(); String name = Example3.class.getName(); try { env.execute(name); }catch (Exception e) { e.printStackTrace(); } } // POJO类 // 1. 必须是公有类 // 2. 所有字段必须是public // 3. 必须有空构造器 // 模拟了case class public static class WordWithCount { public String word; public Long count; public WordWithCount() { } public WordWithCount(String word, Long count) { this.word = word; this.count = count; } @Override public String toString() { return "WordWithCount{" + "word="" + word + """ + ", count=" + count + "}"; } } } 2、结果展示WordWithCount{word="hello", count=1} WordWithCount{word="world", count=1} WordWithCount{word="hello", count=2} WordWithCount{word="world", count=2}
航天女英雄王亚平!被隔离14天突如其来的新冠肺炎疫情,打乱了我们原本的生活节奏,从疫情爆发两年多来,我们一直在与他做抗争,也探索出了很多防治方法,其中隔离14天就是最重要的,最关键的一种手段。那么,对于地面上的
为何国产机越来越贵了?一台华为手机价格直逼八千,到底值不值?这种带节奏的话题,平台就不应该提出来!国产手机好几个品牌,为什么单独点名华为?难道苹果就应该高高在上,国产手机永远中低端?只要是物有所值,只要是有人认可,就不是问题!你不能想要高端
你还记得没有手机的日子里你是怎么过的吗?没有手机的时候,那时候我还小,每天都出去玩儿了。上小河里儿捉鱼,上山上采野菜,跳皮筋儿,满院子里跑,每天都很开心。你好!很高兴我回答说还记不记得没有手机的日子?当然记得,没有手机的
华为余承东官宣华为手机供应极大改善,能买到了4月26日消息,今日在社交平台上,华为终端官方发布了4月28日19点华为折叠旗舰及全场景新品发布会的预热内容在预热内容上,华为常务董事终端BGCEO智能汽车解决方案BUCEO余承东
余承东供应链恢复,华为手机能买到了,MateXs2轻薄耐摔华为定档28号召开新品发布会,正式带来新一代MateXs2折叠屏手机。在官方账号预热视频中,余承东透露供应链得到改善,未来想买华为手机就能买到,MateXs2是一款划时代意义的大屏
华为鸿蒙系统最全关闭广告方法,大幅提升手机使用体验感因为本人用的就是鸿蒙系统,所以详细分享一下关闭系统广告。1进入设置华为账号隐私中心订阅信息关闭2文件管理点击我的点击设置个性化广告关闭3进入设置智慧助手点击右上角四个点个性化设置关
五一前想买手机,不妨看看这四款,最低一款仅1278元马上就五一假期了,要换手机的同学不妨看看这四款手机,性能体验都属于同价位顶尖水平,覆盖低中高三档,最低一款仅1278元。第一款iPhone134899起虽然官网价格依然5999元,
拍照靓丽,游戏党最爱,vivoX80经得起你的使用vivoX80预售即将开始,4月29日0点各大电商开启。这一次vivoX80带来了多个系列可选,满足不同人群对于存储的需求,从8GB128GB版8GB256GB版12GB256GB
一加Ace游戏手机机型一加Ace操作系统coloros12。1(安卓12)处理器天玑8100max独立游戏芯片制程工艺5nmCPU主频A7842。8GHzA5542GHzGPUMaliG610内存L
自然论文目前跳最高的机器人能跳30米超过自身高度百倍来源中国新闻网中新网北京4月28日电(记者孙自法)国际著名学术期刊自然最新发表一篇工程学论文,研究人员报告了一个机器人能够跳跃30米高,超过自身高度的100倍,它超越了此前报告的设
你是否了解你的大脑(上篇)科学家们一个接一个的科研成果让我们对记忆有了越来越多的了解,但直到今天,科学家所发现的所谓大脑的秘密也仅仅只是冰山一角,在很大程度上,大脑和记忆仍是神秘的。记忆是一个过程,并且当你