笔记九Flink常用的sink方法
1。1DataSink数据输出
经过一系列Transformation转换操作后,最后一定要调用Sink操作,才会形成一个完整的DataFlow拓扑。只有调用了Sink操作,才会产生最终的计算结果,这些数据可以写入到的文件、输出到指定的网络端口、消息中间件、外部的文件系统或者是打印到控制台。
1。1。1print打印
打印是最简单的一个Sink,通常是用来做实验和测试时使用。如果想让一个DataStream输出打印的结果,直接可以在该DataStream调用print方法。另外,该方法还有一个重载的方法,可以传入一个字符,指定一个Sink的标识名称,如果有多个打印的Sink,用来区分到底是哪一个Sink的输出。importorg。apache。flink。api。common。functions。FlatMapFunction;importorg。apache。flink。api。common。functions。RuntimeContext;importorg。apache。flink。configuration。Configuration;importorg。apache。flink。streaming。api。datastream。DataStreamSource;importorg。apache。flink。streaming。api。datastream。SingleOutputStreamOperator;importorg。apache。flink。streaming。api。environment。StreamExecutionEnvironment;importorg。apache。flink。streaming。api。functions。sink。RichSinkFunction;importorg。apache。flink。util。Collector;publicclassPrintSinkDemo{publicstaticvoidmain(String〔〕args)throwsException{local模式默认的并行度是当前机器的逻辑核的数量ConfigurationconfigurationnewConfiguration();StreamExecutionEnvironmentenvStreamExecutionEnvironment。createLocalEnvironmentWithWebUI(configuration);intparallelism0env。getParallelism();System。out。println(执行环境默认的并行度:parallelism0);DataStreamSourceStringlinesenv。socketTextStream(cs2886,8888);获取DataStream的并行度intparallelismlines。getParallelism();System。out。println(SocketSource的并行度:parallelism);lines。print();lines。addSink(newMyPrintSink())。name(myprintsink);env。execute();}publicstaticclassMyPrintSinkextendsRichSinkFunctionString{privateintindexOfThisSubtask;Overridepublicvoidopen(Configurationparameters)throwsException{RuntimeContextruntimeContextgetRuntimeContext();indexOfThisSubtaskruntimeContext。getIndexOfThisSubtask();}Overridepublicvoidinvoke(Stringvalue,Contextcontext)throwsException{System。out。println(indexOfThisSubtask1value);}}}
下面的结果是WordCount例子中调用printSink输出在控制台的结果,细心的读者会发现,在输出的单词和次数之前,有一个数字前缀,我这里是14,这个数字是该Sink所在subtask的Index1。有的读者运行的结果数字前缀是18,该数字前缀其实是与任务的并行度相关的,由于该任务是以local模式运行,默认的并行度是所在机器可用的逻辑核数即线程数,我的电脑是2核4线程的,所以subtask的Index范围是03,将Index1,显示的数字前缀就是14了。这里在来仔细的观察一下运行的结果发现:相同的单词输出结果的数字前缀一定相同,即经过keyBy之后,相同的单词会被shuffle到同一个subtask中,并且在同一个subtask的同一个组内进行聚合。一个subtask中是可能有零到多个组的,如果是有多个组,每一个组是相互独立的,累加的结果不会相互干扰。
1。1。2writerAsText以文本格式输出
该方法是将数据以文本格式实时的写入到指定的目录中,本质上使用的是TextOutputFormat格式写入的。每输出一个元素,在该内容后面同时追加一个换行符,最终以字符的形式写入到文件中,目录中的文件名称是该Sink所在subtask的Index1。该方法还有一个重载的方法,可以额外指定一个枚举类型的参数writeMode,默认是WriteMode。NOOVERWRITE,如果指定相同输出目录下有相同的名称文件存在,就会出现异常。如果是WriteMode。OVERWRITE,会将以前的文件覆盖。importorg。apache。flink。api。common。functions。RuntimeContext;importorg。apache。flink。configuration。Configuration;importorg。apache。flink。streaming。api。datastream。DataStreamSource;importorg。apache。flink。streaming。api。environment。StreamExecutionEnvironment;importorg。apache。flink。streaming。api。functions。sink。RichSinkFunction;publicclassWriteSinkDemo{publicstaticvoidmain(String〔〕args)throwsException{local模式默认的并行度是当前机器的逻辑核的数量ConfigurationconfigurationnewConfiguration();StreamExecutionEnvironmentenvStreamExecutionEnvironment。createLocalEnvironmentWithWebUI(configuration);intparallelism0env。getParallelism();System。out。println(执行环境默认的并行度:parallelism0);DataStreamSourceStringlinesenv。socketTextStream(localhost,8888);获取DataStream的并行度intparallelismlines。getParallelism();System。out。println(SocketSource的并行度:parallelism);lines。writeAsText(file:UsersxingDesktopout);env。execute();}publicstaticclassMyPrintSinkextendsRichSinkFunctionString{privateintindexOfThisSubtask;Overridepublicvoidopen(Configurationparameters)throwsException{RuntimeContextruntimeContextgetRuntimeContext();indexOfThisSubtaskruntimeContext。getIndexOfThisSubtask();}Overridepublicvoidinvoke(Stringvalue,Contextcontext)throwsException{System。out。println(indexOfThisSubtask1value);}}}
1。1。3writeAsCsv以csv格式输出
该方法是将数据以csv格式写入到指定的目录中,本质上使用的是CsvOutputFormat格式写入的。每输出一个元素,在该内容后面同时追加一个换行符,最终以csv的形式(类似Excel的格式,字段和字段之间用逗号分隔)写入到文件中,目录中的文件名称是该Sink所在subtask的Index1。需要说明的是,该Sink并不是将数据实时的写入到文件中,而是有一个BufferedOutputStream,默认缓存的大小为4096个字节,只有达到这个大小,才会flush到磁盘。另外程序在正常退出,调用Sink的close方法也会flush到磁盘。DataStreamTuple2String,IntegerresultwordAndOne。keyBy(0)。sum(1);result。writeAsCsv(path);
1。1。4writeUsingOutputFormat以指定的格式输出
该方法是将数据已指定的格式写入到指定目录中,该方法要传入一个OutputFormat接口的实现类,该接口有很多已经实现好了的实现类,并且可以根据需求自己实现,所以该方法更加灵活。writeAsText和writeAsCsv方法底层都是调用了writeUsingOutputFormat方法。DataStreamTuple2String,IntegerresultwordAndOne。keyBy(0)。sum(1);result。writeUsingOutputFormat(newTextOutputFormat(newPath(path));
1。1。5writeToSocket输出到网络端口
该方法是将数据输出到指定的Socket网络地址端口。该方法需要传入三个参数:第一个为ip地址或主机名,第二个为端口号,第三个为数据输出的序列化格式SerializationSchema。输出之前,指定的网络端口服务必须已经启动。DataStreamSourceStringlinesenv。socketTextStream(localhost,8888);lines。writeToSocket(localhost,9999,newSimpleStringSchema());
1。1。6RedisSink
该方法是将数据输出到Redis数据库中,Redis是一个基于内存、性能极高的NoSQL数据库,数据还可以持久化到磁盘,读写速度快,适合存储keyvalue类型的数据。Redis不仅仅支持简单的keyvalue类型的数据,同时还提供list,set,zset,hash等数据结构的存储。Flink实时计算出的结果,需要快速的输出存储起来,要求写入的存储系统的速度要快,这个才不会造成数据积压。Redis就是一个非常不错的选择。
首先在maven项目中的pom。xml中添加RedisSink的依赖。!redis依赖dependencygroupIdorg。apache。bahirgroupIdflinkconnectorredis{scala。binary。version}artifactIdversion1。1SNAPSHOTversiondependency
接下来就是定义一个类(或者静态内部类)实现RedisMapper即可,需要指定一个泛型,这里是Tuple2String,Integer,即写入到Redis中的数据的类型,并实现三个方法。第一个方法是getCommandDescription方法,返回RedisCommandDescription实例,在该构造方法中可以指定写入到Redis的方法类型为HSET,和Redis的additionalKey即value为HASH类型外面key的值;第二个方法getKeyFromData是指定value为HASH类型对应key的值;第三个方法geVauleFromData是指定value为HASH类型对应value的值。
在使用之前,先newFlinkJedisPoolConfig,设置Redis的ip地址或主机名、端口号、密码等。然后newRedisSink将准备好的conf和RedisWordCountMapper实例传入到其构造方法中,最后调用DataStream的addSink方法,将new好的RedisSink作为参数传入。importorg。apache。flink。api。common。functions。FlatMapFunction;importorg。apache。flink。api。java。functions。KeySelector;importorg。apache。flink。api。java。tuple。Tuple2;importorg。apache。flink。streaming。api。datastream。DataStreamSource;importorg。apache。flink。streaming。api。datastream。KeyedStream;importorg。apache。flink。streaming。api。datastream。SingleOutputStreamOperator;importorg。apache。flink。streaming。api。environment。StreamExecutionEnvironment;importorg。apache。flink。streaming。connectors。redis。RedisSink;importorg。apache。flink。streaming。connectors。redis。common。config。FlinkJedisPoolConfig;importorg。apache。flink。streaming。connectors。redis。common。mapper。RedisCommand;importorg。apache。flink。streaming。connectors。redis。common。mapper。RedisCommandDescription;importorg。apache。flink。streaming。connectors。redis。common。mapper。RedisMapper;importorg。apache。flink。util。Collector;从指定的socket读取数据,对单词进行计算,将结果写入到Redis中publicclassRedisSinkDemo{publicstaticvoidmain(String〔〕args)throwsException{创建Flink流计算执行环境StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();创建DataStreamSourceDataStreamSourceStringlinesenv。socketTextStream(cs2886,8888);调用Transformation开始调用TransformationSingleOutputStreamOperatorTuple2String,IntegerwordAndOnelines。flatMap(newFlatMapFunctionString,Tuple2String,Integer(){OverridepublicvoidflatMap(Stringline,CollectorTuple2String,Integercollector)throwsException{String〔〕wordsline。split();for(Stringword:words){newTuple2String,Integer(word,1)collector。collect(Tuple2。of(word,1));}}});分组KeyedStreamTuple2String,Integer,StringkeyedwordAndOne。keyBy(newKeySelectorTuple2String,Integer,String(){OverridepublicStringgetKey(Tuple2String,Integertp)throwsException{returntp。f0;}});聚合SingleOutputStreamOperatorTuple2String,Integersummedkeyed。sum(1);Transformation结束调用Sinksummed。addSink()FlinkJedisPoolConfigconfnewFlinkJedisPoolConfig。Builder()。setHost(localhost)。setDatabase(0)。build();summed。addSink(newRedisSinkTuple2String,Integer(conf,newRedisWordCountMapper()));启动执行env。execute(StreamingWordCount);}publicstaticclassRedisWordCountMapperimplementsRedisMapperTuple2String,Integer{OverridepublicRedisCommandDescriptiongetCommandDescription(){returnnewRedisCommandDescription(RedisCommand。HSET,WORDCOUNT);}OverridepublicStringgetKeyFromData(Tuple2String,Integerdata){returndata。f0;}OverridepublicStringgetValueFromData(Tuple2String,Integerdata){returndata。f1。toString();}}}
1。1。7KafkaSink
在实际的生产环境中,经常会有一些场景,需要将Flink处理后的数据快速地写入到一个分布式、高吞吐、高可用、可用保证ExactlyOnce的消息中间件中,供其他的应用消费处理后的数据。Kafka就是Flink最好的黄金搭档,Flink不但可以从Kafka中消费数据,还可以将处理后的数据写入到Kafka,并且吞吐量高、数据安全、可以保证ExactlyOnce等。
Flink可以和Kafka多个版本整合,比如0。11。x、1。x、2。x等,从Flink1。9开始,使用的是kafka2。2的客户端,所以这里使用kafka的版本是2。2。2,并且使用最新的API。
下面的例子就是将数据写入到Kafka中,首先要定义一个类实现KafkaSerializationSchema接口,指定一个泛型,String代表要写入到Kafka的数据为String类型。该类的功能是指定写入到Kafka中数据的序列化Schema,需要重写serialize方法,将要写入的数据转成二进制数组,并封装到一个ProducerRecord中返回。importorg。apache。flink。api。common。functions。RuntimeContext;importorg。apache。flink。api。common。serialization。SimpleStringSchema;importorg。apache。flink。configuration。Configuration;importorg。apache。flink。streaming。api。datastream。DataStreamSource;importorg。apache。flink。streaming。api。environment。StreamExecutionEnvironment;importorg。apache。flink。streaming。api。functions。sink。RichSinkFunction;importorg。apache。flink。streaming。connectors。kafka。FlinkKafkaProducer;publicclassKafkaSinkDemo{publicstaticvoidmain(String〔〕args)throwsException{local模式默认的并行度是当前机器的逻辑核的数量ConfigurationconfigurationnewConfiguration();StreamExecutionEnvironmentenvStreamExecutionEnvironment。createLocalEnvironmentWithWebUI(configuration);intparallelism0env。getParallelism();System。out。println(执行环境默认的并行度:parallelism0);DataStreamSourceStringlinesenv。socketTextStream(cs2886,8888);获取DataStream的并行度intparallelismlines。getParallelism();System。out。println(SocketSource的并行度:parallelism);lines。writeAsText(file:UsersxingDesktopout);FlinkKafkaProducerStringkafkaProducernewFlinkKafkaProducer(cs2887:9092,cs2888:9092,cs2889:9092,wordcount18,newSimpleStringSchema());lines。addSink(kafkaProducer);env。execute();}}
启动nclk8888,然后启动上述代码程序;
在nc窗口中输入数据,使用kafka可以消费到;
kafka消费wordcount18的topic:
〔rootcs2888〕kafkaconsoleconsumerzookeepercs2888:2181topicwordcount18
然后将Kafka相关的参数设置到Properties中,再newFlinkKafkaProducer,将要写入的topic名称、Kafka序列化Schema、Properties和写入到Kafka的Semantic语义作为FlinkKafkaProducer构造方法参数传入。最好调用addSink方法将FlinkKafkaProducer的引用传入到该方法中。虽然下面的代码指定了EXACTLYONCE语义,但是没有开启Checkpointing,是没法实现的。具有怎样实现ExactlyOnce,会在后面原理深入的章节进行讲解。
1。1。8StreamFileDataSink
实时处理的数据,有一些场景要输出到其他分布式文件系统中,比如HadoopHDFS、AmazonS3(SimpleStorageService)、AliyunOSS(ObjectStorageService)等。因为这些分布式文件系统都具有高可用、可扩展、多副本、存储海量数据等特点。存储到分布式文件系统的数据,就可以做一些离线的数据分析,比如离线的数仓、数据挖掘、机器学习等。
从Flink1。9开始,原来的BucketingSink已经标记为过时,在未来的版本将会被移除。推荐使用StreamFileDataSink,该Sink不但可以将数据写入到各种文件系统中,可以保证ExaclyOnce语义,还支持以列式存储的格式写入,功能更强大。
下面的例子是将数据写入到HDFS中,首先在maven项目的pom。xml文件引入HDFS文件系统的依赖:dependencygroupIdorg。apache。flinkgroupIdflinkconnectorfilesystem2。12artifactIdversion1。12SNAPSHOTversiondependencydependencygroupIdorg。apache。hadoopgroupIdhadoopclientartifactIdversion2。6。0versiondependency
通过DefaultRollingPolicy这个工具类,指定文件滚动生成的策略。这里设置的文件滚动生成策略有两个,一个是距离上一次生成文件时间超过30秒,另一个是文件大小达到100mb。这两个条件只要满足其中一个即可滚动生成文件。然后StreamingFileSink。forRowFormat方法将文件输出目录、文件写入的编码传入,再调用withRollingPolicy关联上面的文件滚动生成策略,接着调用build方法构建好StreamingFileSink,最后将其作为参数传入到addSink方法中。
1。1。9JDBCSinkpackagecom。bigdata。sink;importcom。bigdata。utils。DateUtil;importorg。apache。flink。api。common。typeinfo。Types;importorg。apache。flink。connector。jdbc。JdbcConnectionOptions;importorg。apache。flink。connector。jdbc。JdbcSink;importorg。apache。flink。streaming。api。datastream。DataStreamSource;importorg。apache。flink。streaming。api。datastream。SingleOutputStreamOperator;importorg。apache。flink。streaming。api。environment。StreamExecutionEnvironment;importorg。apache。flink。util。Collector;importjava。util。Arrays;importjava。util。List;authorsongshimingdate20221210descpublicclassSinkToMySql2{publicstaticvoidmain(String〔〕args)throwsException{System。out。println(startDateUtil。getCurrentdatetime());StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();DataStreamSourceStringlineDataStreamSourceenv。readTextFile(input200。csv);DataStreamSourceStringlineDataStreamSourceenv。readTextFile(E:t1trxrecord20220821V2。csv,gb2312);SingleOutputStreamOperatorListStringwordListlineDataStreamSource。flatMap((Stringline,CollectorListStringout){linenewString(line。getBytes(utf8),gbk);System。out。println(lineline);String〔〕wordsline。split(,);out。collect(Arrays。asList(words));})。returns(Types。LIST(Types。STRING));StringsqlStrINSERTINTOdbtest。testt1(TRXID,PARENTTRXID,MERCHANTNO,BRANCHOFFICE,EXPANDORG,MAINTENANCEORG,STORECD,TERMINALNO,TRADETIME,SETTLEMENTDATE,PRODUCTNAME,TRADETP,TRADESTA,TRADECARDNO,TRADECARDTP,ISSUERCODE,TRADEINITAMT,TRADEAMT,BILLINGCYCLE,FEECOLLECTIONSTA,MERFEE,SYSTEMCOST,BRANDFEE,NETPROFIT,MCC18,MCC42,ACCOUNTID,BUSINESSTYPE,ORDERNO,OTHERMERNO,OTHERACCOUNTNO,SUBMITWAYS,TERMINALCODE,TERMINALBATCH,TERMINALTRACKNO,TRADEREFERENCENO,CHANNELNO,CHANNELMERNO,TRADEREMARKS,TRADEABSTRACT,TRADEIP,CHANNELRETCODE,SUBMITTIME,ERRCODE,ERRMSG,CHANNELTRADETP,INSTALLMENTSUBSIDYFEE,SUBSIDYINFO,DCCCURRENCY,DCCAMT,DCCEXCHANGERATE,PACKAGEFEE,APPID,PACKAGEID)VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);wordList。addSink(JdbcSink。sink(sqlStr,((statement,word){for(inti0;iword。size();i){Stringstrword。get(i)。trim();statement。setString(i1,str);}}),newJdbcConnectionOptions。JdbcConnectionOptionsBuilder()。withUrl(jdbc:mysql:192。168。0。11:3306dbtest?useUnicodetruecharacterEncodingutf8)。withDriverName(com。mysql。jdbc。Driver)。withUsername(root)。withPassword(123456)。build()));env。execute();System。out。println(startDateUtil。getCurrentdatetime());}}
蓝盈莹允许一切发生色钻饰连体上衣MichaelKorsCollection钻饰圈形耳环MidnightOperaHouse条纹西装半身裙均为Versace牛仔钻饰胸衣Pinko黑色水台凉鞋Sergi
有能力却有污点的人,该不该重用?用人的格局,决定了发展的上限我们常说,道不同,不相为谋。这话就是告诉我们,只要路的方向是一致的,我们是可以合作的,合作共赢嘛!但是,有的人,却有道德洁癖,对那些道德上有点不太完美的人,看不顺眼,不愿意跟这样的
虚实结合畅谈发展,前海这场元宇宙活动亮了智联万物发展计划之数字宇宙创新无界深国际科技嘉年华系列活动日前在深国际前海图钉Camp举行。活动以线上线下结合方式进行,通过元宇宙体验馆元宇宙主题演讲元宇宙三方汇谈等环节,推动人才
中年人最好不要想着重温旧梦!(切忌)头条创作挑战赛喜欢回头看,是一个人性的弱点。我有个朋友就犯过一个错误,她说自己就是放不下初恋,后来参加同学聚会,两人见面后又开始来往,关键是朋友都结婚好几年了。但没过多久,朋友就跟
曾仕强为啥有钱人经历磨难再多,大多还是有钱人?我们只要把所有人的钱全部归公,全部收回来。不出一段时间,有钱人还是有钱,没钱人还是没有钱。再来一次,所有收回来。再过几年,原先那些有钱人,他又开始有钱。这个你要好好去想想,为什么会
死亡不可怕,被人遗忘才是高三英语课上老师给我们看了寻梦环游记,相信大家也都看过,也是从那次开始我对死亡有了新的看法。说句实话,我对死亡其实是恐惧的,人无论一生过得多么平淡或是灿烂,好像都逃脱不了最后的结果
以情动人人是有感情的动物,而语言所负载的意义,除了理性信息之外,就是情感信息,这种情感信息的内涵十分丰富,不仅诉诸人的理性,而且是要打动人的情感。感人心者,莫先乎情,这就要。求我们在说话中
感悟摘抄我的微信中有一些晚安心语,和励志的朋友圈短语。摘录了几条,分享给你们。保持良好的心态,去面对生活中的得失。如果,你太在乎得失,你就会活得很累,活得很焦虑。不要太过于焦虑,先顾好眼下
其实每一个人都是自己的英雄作者莫奈匆匆流年,封印了(太多的)过往,琵琶声声却隐于山林。虽然它们都会消失不见,但谁也不能说它不曾存在过。同理,曾经的我们或许因为某剧某游戏的影响,都或多或少的有过英雄梦,但随着
梅子黄时致逝去的青春一hr那天下午,梅子破天荒叫了我。她是用写纸条的方式叫我的。那纸条是一绺两指宽的英语作业本纸,上面用彩笔描画的字迹,恍若隔世,让人触目惊心。放学后沟渠见。梅子我不知这样传唤意味着什
Qt多个信号关联同一个槽函数背景多个信号需要执行同一个函数或者一类函数的时候,可以选择每个信号创建一个槽函数去实现功能,如果直接关联到一个函数中,该函数只能执行一份功能,有时候并不能满足业务需求在多个信号绑定