My First Flink Program
A note before we start: Flink supports both Java and Scala. Since Java is what I use most in day-to-day development, the examples I have collected while learning are all Java versions. Scala has its own strengths, though, so pick whichever language suits your situation.

Now let's get started with our first Flink program.
My environment is JDK 8, Maven 3.8.3, and Flink 1.13 (the current Flink release is 1.16, but it requires JDK 11).

Straight to the code.
File pom.xml:

```xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Logging dependencies -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>
```
File words.txt:

```
he s java sss
hello flink ass
hello flink ass1
hello bigdata
```
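If you would rather not create the input file by hand, a small helper like the one below can generate it. This is my own sketch, not part of the original post; the class name is hypothetical, and the path `input/words.txt` simply matches what `BatchWordCount` reads, assuming you run from the project root.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not from the original post): writes the sample
// words.txt under input/ so BatchWordCount can find it when run from the IDE.
public class CreateWordsFile {
    public static void main(String[] args) throws Exception {
        Path dir = Paths.get("input");
        Files.createDirectories(dir);
        String content = String.join(System.lineSeparator(),
                "he s java sss",
                "hello flink ass",
                "hello flink ass1",
                "hello bigdata");
        Files.write(dir.resolve("words.txt"), content.getBytes(StandardCharsets.UTF_8));
    }
}
```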
File BatchWordCount.java:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author songshiming
 * @date 2022/11/10
 */
public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create an execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the data from the file
        DataSource<String> lineDataSource = env.readTextFile("input/words.txt");

        // 3. Split each line into words and convert them into 2-tuples
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = lineDataSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    // Split one line of text into words
                    String[] words = line.split(" ");
                    // Emit each word as a (word, 1L) tuple
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                // When a lambda uses Java generics, type erasure means the
                // type information has to be declared explicitly
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. Group by word
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);

        // 5. Aggregate within each group
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);

        // 6. Print the result
        sum.print();
    }
}
```
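The program above uses the older DataSet API, which has been soft-deprecated in favor of the unified DataStream API. As a rough sketch of my own (not part of the original post), the same bounded word count could also be written against the DataStream API, using the flink-streaming-java dependency that is already in the pom:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// Sketch only: the same word count expressed with the DataStream API.
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Create a streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the same file; a file source is bounded, so the job finishes on its own
        DataStreamSource<String> lineStream = env.readTextFile("input/words.txt");

        // 3. Split lines into (word, 1L) tuples; declare the tuple type because of erasure
        lineStream
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                // 4. Key by the word (field 0) and sum the counts (field 1)
                .keyBy(t -> t.f0)
                .sum(1)
                // 5. Print incremental results to stdout
                .print();

        // 6. Trigger execution
        env.execute("StreamWordCount");
    }
}
```

Note that unlike the DataSet version, the streaming job prints a running total for each key as records arrive, so the same word may appear several times with an increasing count.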
Program run result: