Flink操练(五)之DS简介(5)编写第一个Flink程序
Scala版Flink程序编写package learn import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.time.Time /** * @program: bigdata_learn * @description: ${description} * @author: Mr.逗 * @create: 2021-09-08 10:04 **/ object StreamingJob { def main(args: Array[String]): Unit = { //获取socket端口号 val port:Int = try{ ParameterTool.fromArgs(args).getInt("port") }catch { case e: Exception=>{ System.err.println("No port set. Use default port 9000!") } 9000 } //获取运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //链接socket获取输入数据 val text = env.socketTextStream("172.17.0.50",9000," ") import org.apache.flink.api.scala._ //解析数据(数据扁平化),分组,窗口计算,并且聚合求sum val wordWithCount = text.flatMap(line => line.split(",")).map(w =>WordWithCount(w,1)) .keyBy("word")//分组 .timeWindow(Time.seconds(2),Time.seconds(1))//指定窗口大小,间隔时间 //.sum("count");//sum 或者reduce 都行 .reduce((a,b)=>WordWithCount(a.word,a.count+b.count)); wordWithCount.print().setParallelism(1); //打印到控制台 env.execute("Socket Window Count!") } case class WordWithCount(word:String,count:Long) }
打开一个终端(Terminal),运行以下命令 $ nc -lk 9999
接下来使用 IDEA 运行就可以了。Java版Flink程序编写import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; public class WordCountFromSocket { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream stream = env.socketTextStream("localhost", 9999); stream.flatMap(new Tokenizer()).keyBy(r -> r.f0).sum(1).print(); env.execute("Flink Streaming Java API Skeleton"); } public static class Tokenizer implements FlatMapFunction> { @Override public void flatMap(String value, Collector> out) throws Exception { String[] stringList = value.split("s"); for (String s : stringList) { // 使用out.collect方法向下游发送数据 out.collect(new Tuple2(s, 1)); } } } }2 下载Flink运行时环境,提交Jar包的运行方式
下载链接:http://mirror.bit.edu.cn/apache/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz
然后解压 $ tar xvfz flink-1.11.1-bin-scala_2.11.tgz
启动Flink集群 $ cd flink-1.11.1 $ ./bin/start-cluster.sh
可以打开Flink WebUI查看集群状态:http://localhost:8081
在 IDEA 中使用maven package 打包。
提交打包好的 JAR 包$ cd flink-1.11.1 $ ./bin/flink run 打包好的JAR包的绝对路径
停止Flink集群 $ ./bin/stop-cluster.sh
查看标准输出日志的位置,在 log 文件夹中。$ cd flink-1.11.1/log
风冷无霜还能有智能体验来天猫看看云米iLive冰箱436L吧一到夏日,肯定有不少小伙伴都在为家里冰箱结霜而发愁,但冰箱内壁结满冰霜后,不仅影响冰箱的制冷能力保鲜能力,冰箱更加耗电的同时也侵占了内部的储物空间。虽说如今不少高端的风冷冰箱已经没
罗永浩彪悍的人生不需要解释从年薪百万到负债6亿从锤子科技创始人到抖音直播带货一哥,罗永浩的职业道路充满着传奇色彩。当初的锤子倒闭罗永浩负债欠债6亿的新闻登上了热搜,本可以按照破产清算不用承担债务,但是罗永浩
昨夜今晨元宇宙商标申请已超1。6万件快手签下欧冠联赛版权2022年2月22日驱动中国昨夜今晨国家新闻出版总署正常接收游戏公司的版号申请搜狐网报道,昨日下午,有媒体援引一位游戏公司负责人的信息称,今年国内不发版号才是游戏行业的最大利空。受
元宇宙看似从天而降其实蓄谋已久最近有一个概念特别火,就是元宇宙。特别是在Facebook改名为Meta后,平地一声雷,又掀起了一波浪潮。今天,我也特别高兴跟大家一起探讨这个新课题新趋势。除了元宇宙,前段时间还有
面试了一个博士生,这些面试题都不会?1HashMap和HashTable区别?HashMap线程不安全,效率高。HashTable线程安全,效率低。2InnoDB与MyISAM的区别?1InnoDB支持事务,MyIS
iPhone15Pro将搭载苹果自研5G基带芯片,量产台积电4nm工艺据MacRumors报道,根据DigiTimes的一份新报告,苹果公司正在与新的供应商进行初步谈判,以获得其用于iPhone手机的首款内置5G调制解调器芯片的后端订单。据报道,苹果
苹果新机最新消息,屏下指纹没了,折叠屏有一次延期这段时间各个手机品牌的消息都比较多,对于苹果来说,大家最期待的肯定是新的iPhone14系列以及苹果的折叠屏手机。这两款产品最近都有了新的爆料,而且感觉还挺靠谱,今天小编就给大家一
Z世代文化画像Z世代一词最早流行于西方,用以指称出生于19952009年的一代年轻人,又称网络世代互联网世代。尽管Z世代们年纪尚轻,但相比于其父兄一辈,他们在社交网络中更为活跃,群体身份特征也更
退场前大跳水,骁龙888P屏下相机120W快充,512G旗舰跌至3839元自从骁龙8旗舰不断登场后,很多骁龙888和骁龙888Plus都出现大幅度降价,基本都是降价千元之上了。尤其是春节过后,个别机型更是出现大跳水了,比如图中这款小米MIX4就是了,熟悉
聊聊NFT的版税据美国消费者新闻与商业频道CNBC报道,NFT市场OpenSea估值已达130亿美元OpenSea首席执行官DevinFinzer周六晚些时候表示,OpenSea正在调查一起黑客攻
使用nvm管理node。js版本简介在实际的前端开发过程中,可能会经常遇见node。js的版本问题,不同的项目需要使用不同的node。js版本。直接安装的话,只能安装和使用node。js的一个版本。可以使用nvm