Flink操练(十八)之流处理单词统计
1、pom.xml文件<?xml version="1.0" encoding="UTF-8"?> 4.0.0 org.example Flink_learn 1.0-SNAPSHOT 1.13.0 1.8 2.12 1.7.30 org.apache.flink flink-java ${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-table-api-java-bridge_${scala.binary.version} ${flink.version} org.apache.flink flink-table-planner-blink_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-table-common ${flink.version} org.apache.flink flink-cep_${scala.binary.version} ${flink.version} org.apache.flink flink-csv ${flink.version} org.apache.flink flink-connector-kafka_${scala.binary.version} ${flink.version} org.apache.bahir flink-connector-redis_2.11 1.0 mysql mysql-connector-java 8.0.21 org.apache.flink flink-connector-jdbc_${scala.binary.version} ${flink.version} org.slf4j slf4j-log4j12 ${slf4j.version} org.apache.logging.log4j log4j-to-slf4j 2.14.0 org.apache.maven.plugins maven-assembly-plugin 3.3.0 jar-with-dependencies make-assembly package single org.apache.maven.plugins maven-compiler-plugin 8 2、代码实现package one; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * @program: Flink_learn * @description: 从socket读取数据然后处理 * @author: Mr.逗 * @create: 2021-09-14 15:20 **/ public class Example1 { public static void main(String[] args) { // 获取流处理的运行时环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度为1 env.setParallelism(1); //读取数据源 //先在终端启动 `nc -lk 9999` DataStreamSource source = env.socketTextStream("172.17.0.50", 9999); // map操作 // 这里使用的flatMap方法 // map: 针对流中的每一个元素,输出一个元素 // flatMap:针对流中的每一个元素,输出0个,1个或者多个元素 DataStream mappedStream = source // 输入泛型:String; 输出泛型:WordWithCount .flatMap(new FlatMapFunction() { @Override public void flatMap(String v, Collector out) throws Exception { String[] words = v.split(","); //使用collect方法向下游发送数据 for (String word : words) { out.collect(new WordWithCount(word, 1L)); } } }); //分组shuffle KeyedStream keyedStream = mappedStream.keyBy(new KeySelector() { @Override public String getKey(WordWithCount v) throws Exception { return v.word; } }); // reduce操作 // reduce会维护一个累加器 // 第一条数据到来,作为累加器输出 // 第二条数据到来,和累加器进行聚合操作,然后输出累加器 // 累加器和流中元素的类型是一样的 SingleOutputStreamOperator reduce = keyedStream.reduce(new ReduceFunction() { @Override public WordWithCount reduce(WordWithCount v1, WordWithCount v2) throws Exception { return new WordWithCount(v1.word, v1.count + v2.count); } }); //输出 reduce.print(); String name = Example1.class.getName(); //执行程序 try { env.execute(name); }catch (Exception e) { e.printStackTrace(); } } // POJO类 // 1. 必须是公有类 // 2. 所有字段必须是public // 3. 必须有空构造器 // 模拟了case class public static class WordWithCount { public String word; public Long count; public WordWithCount() { } public WordWithCount(String word, Long count) { this.word = word; this.count = count; } @Override public String toString() { return "WordWithCount{" + "word="" + word + """ + ", count=" + count + "}"; } } } 3、终端输入nc -lk 9999[xgsUser@fraud50 ~]$ nc -lk 9999 a,b a,c a,d a,a4、结果展示WordWithCount{word="�a", count=1} WordWithCount{word="b", count=1} WordWithCount{word="a", count=1} WordWithCount{word="c", count=1} WordWithCount{word="a", count=2} WordWithCount{word="d", count=1} WordWithCount{word="a", count=3} WordWithCount{word="a", count=4}
全球首发GN2!小米向单反相机发起挑战,这次你看好吗?近年来手机领域似乎对手机拍照的领域发起了冲击,我们可以看出前有华为对手机相机领域发出的冲击,后有小米手机拍照方面的突飞猛进直指手机相机的极限。那么手机相机真的可以同单反进行媲美么?
工具界的火眼金睛!小米有品出毫米级测距仪,杜克博世咋办在精品电商平台界,小米有品一向是口碑不错的平台。这几年,笔者发现小米有品有进军工具界的准备,经常为我们上架一些智能工具,前不久就上架了一个小猴锂电胶枪,最近又力推一款小猴智能激光测
6年浮浮沉沉,小米国际部总裁周受资正式离职小米不知从何时开始,我们的国产手机就渐渐开始走向世界了。而在目前众多国产手机厂商中,在海外市场发展最好的,那无疑还是小米了。但就在昨天,也就是2021年的3月24日,小米国际部总裁周受
iPhone卖上万很正常,小米11超大杯卖8000却被喷?很多人提到小米手机,第一反应就是高性价比,甚至很多人看到小米两个字,就会联想到便宜,因此很多人对小米手机的印象还停留在2011年,1999元的阶段。但是随着科技的发展,小米已经不仅
看完这条,再也不敢这样玩手机在手机越来越强大的今天,有越来越多的人患上了手机依赖症,可那些公认舒服的姿势,其实会对人体各部位产生不同影响。1低头姿势网上流传有20岁的小姑娘小伙,有着50岁的腰60岁的颈椎,这
飒!vivo宁夏x北京汽车,带着X50穿越腾格如果你也喜欢冒险如果你也喜欢热血那么就跟我们一起来吧!跟着vivo宁夏跟着北京汽车一起穿越腾格里沙漠吧!这里有广阔浩瀚一望无际的沙漠也有冒险热血滚烫的梦想!6月20日,天气晴朗一大
整晚充电和没电了再充,哪个更伤手机?很多朋友都喜欢在睡前玩手机,睡觉时顺手把手机插上充电器,充上一整晚的电,第二天醒来再拔掉充电器。那么问题来了手机充电一夜,不拔插头会影响寿命吗?手机充一整夜,会伤到电池吗?咱们vi
手机自动扣费?三招教你关闭前几天,小宁君翻翻自己自动续费的扣款记录,突然发现我去年订视频网站VIP包月,费用一直在扣。刚开始都是首月只要几元,看着好便宜就开通了,结果自动续费就开始了,都忘记使用这个视频ap
跟极限挑战一起打卡vivo全球总部如果昨天晚上,你有准时收看火爆的国民综艺极限挑战你就会发现,这一整期节目都在vivo全球总部录制请注意,这次真的不是演习极限男团,真的来vivo啦!现在,让我们一起来康康他们在vi
4个习惯手机坏得快,别再做了最近身边一位朋友换了部vivoS7手机本来好手机值得换,这没什么但细想发现短短2年朋友已经换了4台手机如此频繁换机是人性的扭曲,还是他家里有矿?事实是新手机在他手上都活不过半年据观
让照片动起来,AI抠图居然可以这样玩?我们都知道vivo手机相册有一个非常好用的AI抠图功能,现在经过不断的升级,这个功能现在可以实现不少操作,今天就让我们一起来看看吧!首先我们要知道如何进入AI抠图功能,在相册中打开