大家好,我是杰哥 今天,继续来探索Flink,今天我们主要来看看Flink为我们所提供的分层API一Flink的分层API Flink不仅具有高吞吐、低延迟、高可用等优秀特性,而且还提供了易于使用的分层API,所以它也是一个易于开发的框架,它的API分层如图所示 Flink包含三层API:低级APIs有状态处理、核心APIsDataStreamAPI以及高级APIsTableAPISQL1、有状态处理 最底层级的抽象仅仅提供了有状态流,它将处理函数(ProcessFunction)嵌入到了DataStreamAPI中。通过实现ProcessFunction接口来进行操作。ProcessFunction可以处理一或两条输入数据流中的单个事件或者归入一个特定窗口内的多个事件。它提供了对于时间和状态的细粒度控制。开发者可以在其中任意地修改状态,也能够注册定时器用以在未来的某一时刻触发回调函数2、核心APIsDataStreamAPI DataStream?是不是感觉还有点儿熟悉呢? 是的,上一节的例子里面,它就出现过,我们使用DataStream类来存放Flink程序中的数据集合,供Flink进行处理2读取文本流(nclk7777)DataStreamSourceStringlineDataStreamenv。socketTextStream(localhost,7777); 其中DataStreamSource其实就是集成自DataStream而这里所提到的DataStreamAPI的名字就来自于DataStream类。这个类中,可以存放包含重复项的不可变数据集合。集合中的数据可以是有限的,也可以是无界的 DataStream在使用方面类似于常规的Java中的集合,但在一些关键方面有很大的不同,比如它们是不可变的,这意味着一旦创建了它们,就不能添加或删除元素。此外,我们也不能查看里面的元素,只能使用DataStreamAPI操作(也称为转换)对它们进行一些变换操作 可以通过在Flink程序中添加源来创建初始DataStream。然后,再可以从中派生新的流,并使用映射、过滤器等API方法将它们组合在一起3、TableAPISQLAPI TableAPI是一个针对Java、Scala和Python的语言集成查询API,它允许以非常直观的方式组合来自关系操作符(如选择、筛选和连接)的查询。Flink的SQLAPI基于ApacheCalcite,它实现了SQL标准的语法 Flink提供的最高层级的抽象是SQL。这一层抽象在语法与表达能力上与TableAPI类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与TableAPI交互密切,同时SQL查询可以直接在TableAPI定义的表上执行 Flink的分层API中,最主要、最常用的是DataStreamAPI和TableAPISQL。我们可以将两者单独使用,也可以混合使用,取决于我们具体的应用场景二DataStreamAPI认识 我想,从它的操作步骤来认识它,应该是比较直观的 类似于我们曾经了解到的JDBC会有固定的7个步骤,使用Flink的DataStreamAPI进行数据的处理,实际上也包含以下5个固定步骤,而DataStreamAPI也可以随之被分为5类:1、设置执行环境2、读取输入流3、转换操作4、输出到一个或多个数据汇中5、执行程序 我们来分别看一下1、设置执行环境 在Flink中,可以使用StreamExecutionEnvironment的下列三种方式进行执行环境的创建getExecutionEnvironment();createLocalEnvironment();createRemoteEnvironment(Stringhost,intport,String。。。jarFiles); 其中前两个表示创建的是本地环境,即表示Flink程序运行在本地机器。还可以通过指定远程机器的主机名、端口号以及程序本身所生成的jar包最终拷贝到的路径,创建一个远程执行环境,使得Flink程序运行在所指定的主机上 2、读取输入流 StreamExecutionEnvironment为我们提供了一系列创建流式数据源的方法,使得我们可以将数据流读取到应用中。这些数据流可以来自于文件、数据库、消息队列等读取的数据,就可以统一放入DataStream对象中 1)从集合中读取数据ArrayListUserusernewArrayList();user。add(newUser(Mary,。home,1000L));user。add(newUser(Bob,。cart,2000L));DataStreamUserstreamenv。fromCollection(user); 2)从文件中读取数据DataStreamStringstreamenv。readTextFile(words。txt); 3)从Socket读取数据DataStreamStringstreamenv。socketTextStream(localhost,7777); 4)从消息中心读取数据DataStreamSourceStringstreamenv。addSource(newFlinkKafkaConsumerString(clicks,newSimpleStringSchema(),properties)); 5)自定义Source如果我们需要读取数据的数据源,Flink没有为其提供读取数据源的方法,我们就可以通过Flink提供的自定义Source的方式进行了那就只好自定义实现SourceFunction了。接下来我们创建一个自定义的数据源,实现SourceFunction接口。主要重写两个关键方法:run()和cancel()run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。3、转换操作 当数据被存入DataStream对象中后,我们就可以对它进行转换操作了转换的类型有多种多样。有些会重新组织一下DataStream中的数据流,比如对这些数据进行一次分组或者分区;有些呢,就会生成一个新的DataStream(类型可能会发生变化) 比如,使用map()可以将一个输入流中的所有正方形全部转换为圆形 今天在这里就不细说了,下次文章会专门为大家介绍DataStreamAPI的转换操作4、输出结果 一般场景下,我们将数据处理的结果总是会发送到某些外部系统,比如文件、数据库,以及消息中心中等,当然测试时,可以直接输出至标准输出中 比如,输出到文件的方式如下:StreamingFileSinkStringfileSinkStreamingFileSink。StringforRowFormat(newPath(。output),newSimpleStringEncoder(UTF8))。withRollingPolicy(DefaultRollingPolicy。builder()。withRolloverInterval(TimeUnit。MINUTES。toMillis(15))。withInactivityInterval(TimeUnit。MINUTES。toMillis(5))。withMaxPartSize(102410241024)。build())。build();将Event转换成String写入文件stream。map(Event::toString)。addSink(fileSink); 此外,还可以输出到Kafka,mysql,pulsar等 最新版本的Flink所支持的所有输入输出如下: 5、执行程序 在应用定义完成之后,我们就可以通过StreamExecutionEnvironment类中的execute()方法进行执行 Flink被设计为了延迟计算的方式执行,在执行execute()方法之前,前面的所有程序,只是在执行环境中构建了一个执行计划。也就是说,只有执行了execute()方法之后,前面定义的Flink应用程序才会被真正执行 这5个步骤看完了,我们再将那个简单的实例拿过来看看,确认一下是否是这样publicclassStreamWordCount{publicstaticvoidmain(String〔〕args)throwsException{1创建执行环境StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();2读取文本流(nclk7777)DataStreamSourceStringlineDataStreamenv。socketTextStream(localhost,7777);3转换操作3。1收集各个单词,定义为二元组SingleOutputStreamOperatorTuple2String,LongstreamOperatorlineDataStream。flatMap((Stringline,CollectorTuple2String,Longout){Stringwords〔〕line。split();for(Stringword:words){out。collect(Tuple2。of(word,1L));}})。returns(Types。TUPLE(Types。STRING,Types。LONG));3。2分组KeyedStreamTuple2String,Long,StringkeyedStreamstreamOperator。keyBy(datadata。f0);3。3统计SingleOutputStreamOperatorTuple2String,LongsumkeyedStream。sum(1);4打印sum。print();5启动执行env。execute();}} 这个其实就是严格按照我们上述所提到的步骤来走的 其中第2步骤,是通过一行一行地读取socket文本流内容,进行数据源的输入的,将所输入的数据流存入lineDataStream中 而第3步骤,则是对lineDataStream中的数据进行转换处理操作。首先将各个数据通过flatMap()方法转换为二元组,然后使用keyBy()方法,对其元素进行分组,接着再使用sum()方法,对分组之后的元素进行求和操作 随着数据流的不断输入,所输入的每个单词出现的次数便会通过程序被源源不断地输出总结 Flink提供了分层API,便于我们进行开发。在实际开发中,我们使用最多的就是DataStreamAPI,而怎样认识DataStreamAPI呢?从它的操作步骤来看,可以分为5个步骤,也就是说DataStreamAPI可以分为5大类: 1、设置执行环境 2、读取输入流 3、转换操作 4、输出到一个或多个数据汇中 5、执行程序 嗯,就这样。每天学习一点,时间会见证你的强大 欢迎大家关注我们的公众号【青梅主码】,一起持续性学习吧 往期精彩回顾 总结复盘 架构设计读书笔记与感悟总结 带领新人团队的沉淀总结 复盘篇:问题解决经验总结复盘 网络篇 网络篇(四):《图解TCPIP》读书笔记 网络篇(一):《趣谈网络协议》读书笔记(一) 事务篇章 事务篇(四):Spring事务并发问题解决 事务篇(三):分享一个隐性事务失效场景 事务篇(一):毕业三年,你真的学会事务了吗? Docker篇章 Docker篇(六):DockerCompose如何管理多个容器? Docker篇(二):Docker实战,命令解析 Docker篇(一):为什么要用Docker? 。。。。。。。。。。 SpringCloud篇章 SpringCloud(十三):Feign居然这么强大? SpringCloud(十):消息中心篇Kafka经典面试题,你都会吗? SpringCloud(九):注册中心选型篇四种注册中心特点超全总结 SpringCloud(四):公司内部,关于Eureka和zookeeper的一场辩论赛 。。。。。。。。。。 SpringBoot篇章 SpringBoot(十二):陌生又熟悉的OAuth2。0协议,实际上每个人都在用 SpringBoot(七):你不能不知道的Mybatis缓存机制! SpringBoot(六):那些好用的数据库连接池们 SpringBoot(四):让人又爱又恨的JPA SpringBoot(一):特性概览 。。。。。。。。。。 翻译 〔译〕用Mint这门强大的语言来创建一个Web应用 【译】基于50万个浏览器指纹的新发现 使用CSS提升页面渲染速度 WebTransport会在不久的将来取代WebRTC吗? 。。。。。。。。。 职业、生活感悟 你有没有想过,旅行的意义是什么? 程序员的职业规划 灵魂拷问:人生最重要的是什么? 如何高效学习一个新技术? 如何让自己更坦然地度过一天? 。。。。。。。。。。