Flink操练(八)之DS简介(8)FlinkDataStreamAPI(三)
1 设置并行度
Flink应用程序在一个像集群这样的分布式环境中并行执行。当一个数据流程序提交到作业管理器执行时,系统将会创建一个数据流图,然后准备执行需要的操作符。每一个操作符将会并行化到一个或者多个任务中去。每个算子的并行任务都会处理这个算子的输入流中的一份子集。一个算子并行任务的个数叫做算子的并行度。它决定了算子执行的并行化程度,以及这个算子能处理多少数据量。
算子的并行度可以在执行环境这个层级来控制,也可以针对每个不同的算子设置不同的并行度。默认情况下,应用程序中所有算子的并行度都将设置为执行环境的并行度。执行环境的并行度(也就是所有算子的默认并行度)将在程序开始运行时自动初始化。如果应用程序在本地执行环境中运行,并行度将被设置为CPU的核数。当我们把应用程序提交到一个处于运行中的Flink集群时,执行环境的并行度将被设置为集群默认的并行度,除非我们在客户端提交应用程序时显式的设置好并行度。
通常情况下,将算子的并行度定义为和执行环境并行度相关的数值会是个好主意。这允许我们通过在客户端调整应用程序的并行度就可以将程序水平扩展了。我们可以使用以下代码来访问执行环境的默认并行度。
我们还可以重写执行环境的默认并行度,但这样的话我们将再也不能通过客户端来控制应用程序的并行度了。
算子默认的并行度也可以通过重写来明确指定。在下面的例子里面,数据源的操作符将会按照环境默认的并行度来并行执行,map操作符的并行度将会是默认并行度的2倍,sink操作符的并行度为2。 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment; int defaultP = env.getParallelism; env .addSource(new CustomSource) .map(new MyMapper) .setParallelism(defaultP * 2) .print() .setParallelism(2);
当我们通过客户端将应用程序的并行度设置为16并提交执行时,source操作符的并行度为16,mapper并行度为32,sink并行度为2。如果我们在本地环境运行应用程序的话,例如在IDE中运行,机器是8核,那么source任务将会并行执行在8个任务上面,mapper运行在16个任务上面,sink运行在2个任务上面。
并行度是动态概念,任务槽数量是静态概念。并行度<=任务槽数量。一个任务槽最多运行一个并行度。 2 类型
Flink程序所处理的流中的事件一般是对象类型。操作符接收对象输出对象。所以Flink的内部机制需要能够处理事件的类型。在网络中传输数据,或者将数据写入到状态后端、检查点和保存点中,都需要我们对数据进行序列化和反序列化。为了高效的进行此类操作,Flink需要流中事件类型的详细信息。Flink使用了 Type Information 的概念来表达数据类型,这样就能针对不同的数据类型产生特定的序列化器,反序列化器和比较操作符。
Flink也能够通过分析输入数据和输出数据来自动获取数据的类型信息以及序列化器和反序列化器。尽管如此,在一些特定的情况下,例如匿名函数或者使用泛型的情况下,我们需要明确的提供数据的类型信息,来提高我们程序的性能。
在这一节中,我们将讨论Flink支持的类型,以及如何为数据类型创建相应的类型信息,还有就是在Flink无法推断函数返回类型的情况下,如何帮助Flink的类型系统去做类型推断。 2.1 支持的数据类型
Flink支持Java和Scala提供的所有普通数据类型。最常用的数据类型可以做以下分类: Primitives(原始数据类型) Java和Scala的Tuples(元组) Scala的样例类 POJO类型 一些特殊的类型
接下来让我们一探究竟。
Primitives
Java和Scala提供的所有原始数据类型都支持,例如 Int (Java的Integer ),String,Double等等。下面举一个例子:DataStream[Long] numbers = env.fromElements(1L, 2L, 3L, 4L); numbers.map(n -> n + 1);
Tuples
元组是一种组合数据类型,由固定数量的元素组成。
Flink为Java的Tuple提供了高效的实现。Flink实现的Java Tuple最多可以有25个元素,根据元素数量的不同,Tuple都被实现成了不同的类:Tuple1,Tuple2,一直到Tuple25。Tuple类是强类型。 DataStream> persons = env .fromElements( Tuple2.of("Adam", 17), Tuple2.of("Sarah", 23) ); persons.filter(p -> p.f1 > 18);
Tuple的元素可以通过它们的public属性访问——f0,f1,f2等等。或者使用 getField(int pos) 方法来访问,元素下标从0开始:import org.apache.flink.api.java.tuple.Tuple2 Tuple2 personTuple = Tuple2.of("Alex", 42); Integer age = personTuple.getField(1); // age = 42
不同于Scala的Tuple,Java的Tuple是可变数据结构,所以Tuple中的元素可以重新进行赋值。重复利用Java的Tuple可以减轻垃圾收集的压力。举个例子: personTuple.f1 = 42; // set the 2nd field to 42 personTuple.setField(43, 1); // set the 2nd field to 43
POJO
POJO类的定义: 公有类 无参数的公有构造器 所有的字段都是公有的,可以通过getters和setters访问。 所有字段的数据类型都必须是Flink支持的数据类型。
举个例子: public class Person { public String name; public int age; public Person() {} public Person(String name, int age) { this.name = name; this.age = age; } } DataStream persons = env.fromElements( new Person("Alex", 42), new Person("Wendy", 23) );
其他数据类型 Array, ArrayList, HashMap, Enum Hadoop Writable types
2.2 为数据类型创建类型信息
Flink类型系统的核心类是 TypeInformation 。它为系统在产生序列化器和比较操作符时,提供了必要的类型信息。例如,如果我们想使用某个key来做连接查询或者分组操作,TypeInformation 可以让Flink做更严格的类型检查。
Flink针对Java和Scala分别提供了类来产生类型信息。在Java中,类是 org.apache.flink.api.common.typeinfo.Types
举个例子: TypeInformation intType = Types.INT; TypeInformation> tupleType = Types .TUPLE(Types.LONG, Types.STRING); TypeInformation personType = Types .POJO(Person.class);3 定义Key以及引用字段
在Flink中,我们必须明确指定输入流中的元素中的哪一个字段是key。 3.1 使用字段位置进行keyByDataStream> input = ... KeyedStream, String> keyed = input.keyBy(1);
如果我们想要用元组的第2个字段和第3个字段做keyBy,可以看下面的例子。 input.keyBy(1, 2);3.2 使用字段表达式来进行keyBy
对于样例类: DataStream sensorStream = ... sensorStream.keyBy("id");
对于元组: DataStream> javaInput = ... javaInput.keyBy("f2") // key Java tuple by 3rd field3.3 Key选择器
方法类型 KeySelector[IN, KEY] > getKey(IN): KEY
两个例子
scala version val sensorData = ... val byId = sensorData.keyBy(r => r.id)val input = ... input.keyBy(value => math.max(value._1, value._2))
java version DataStream sensorData = ... KeyedStream byId = sensorData.keyBy(r -> r.id);DataStream> input = ... input.keyBy(value -> Math.max(value.f0, value.f1));4 实现UDF函数,更细粒度的控制流4.1 函数类
Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。
例子实现了FilterFunction接口 class FilterFilter extends FilterFunction { @Override public Boolean filter(String value) { return value.contains("flink"); } } DataStream flinkTweets = tweets.filter(new FlinkFilter);
还可以将函数实现成匿名类 DataStream flinkTweets = tweets.filter( new RichFilterFunction { @Override public Boolean filter(String value) { return value.contains("flink"); } } )
我们filter的字符串"flink"还可以当作参数传进去。 DataStream tweets = ... DataStream flinkTweets = tweets.filter(new KeywordFilter("flink")); class KeywordFilter(keyWord: String) extends FilterFunction { @Override public Boolean filter(String value) = { return value.contains(keyWord); } }4.2 匿名函数
匿名函数可以实现一些简单的逻辑,但无法实现一些高级功能,例如访问状态等等。 DataStream tweets = ... DataStream flinkTweets = tweets.filter(r -> r.contains("flink"));4.3 富函数
我们经常会有这样的需求:在函数处理数据之前,需要做一些初始化的工作;或者需要在处理数据时可以获得函数执行上下文的一些信息;以及在处理完数据时做一些清理工作。而DataStream API就提供了这样的机制。
DataStream API提供的所有转换操作函数,都拥有它们的"富"版本,并且我们在使用常规函数或者匿名函数的地方来使用富函数。例如下面就是富函数的一些例子,可以看出,只需要在常规函数的前面加上 Rich 前缀就是富函数了。RichMapFunction RichFlatMapFunction RichFilterFunction ...
当我们使用富函数时,我们可以实现两个额外的方法: open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。open()函数通常用来做一些只需要做一次即可的初始化工作。 close()方法是生命周期中的最后一个调用的方法,通常用来做一些清理工作。
另外,getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,当前子任务的索引,当前子任务的名字。同时还它还包含了访问 分区状态 的方法。下面看一个例子: public static class MyFlatMap extends RichFlatMapFunction> { private int subTaskIndex = 0; @Override public void open(Configuration configuration) { int subTaskIndex = getRuntimeContext.getIndexOfThisSubtask; // 做一些初始化工作 // 例如建立一个和HDFS的连接 } @Override public void flatMap(Integer in, Collector> out) { if (in % 2 == subTaskIndex) { out.collect((subTaskIndex, in)); } } @Override public void close() { // 清理工作,断开和HDFS的连接。 } }5 Sink
Flink没有类似于spark中foreach方法,让用户进行迭代的操作。所有对外的输出操作都要利用Sink完成。最后通过类似如下方式完成整个任务最终输出操作。 stream.addSink(new MySink(xxxx));
官方提供了一部分的框架的sink。除此以外,需要用户自定义实现sink。 5.1 Kafka
Kafka版本为0.11 org.apache.flink flink-connector-kafka-0.11_2.11 ${flink.version}
Kafka版本为2.0以上 org.apache.flink flink-connector-kafka_2.11 ${flink.version}
主函数中添加sink: DataStream union = high .union(low) .map(r -> r.temperature.toString); union.addSink( new FlinkKafkaProducer011( "localhost:9092", "test", new SimpleStringSchema() ) );5.2 Redis org.apache.bahir flink-connector-redis_2.11 1.0
定义一个redis的mapper类,用于定义保存到redis时调用的命令:
scala version object SinkToRedisExample { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val stream = env.addSource(new SensorSource) val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build() stream.addSink(new RedisSink[SensorReading](conf, new MyRedisSink)) env.execute() } class MyRedisSink extends RedisMapper[SensorReading] { override def getKeyFromData(t: SensorReading): String = t.id override def getValueFromData(t: SensorReading): String = t.temperature.toString override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "sensor") } }
java version public class WriteToRedisExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream stream = env.addSource(new SensorSource()); FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build(); stream.addSink(new RedisSink(conf, new MyRedisSink())); env.execute(); } public static class MyRedisSink implements RedisMapper { @Override public String getKeyFromData(SensorReading sensorReading) { return sensorReading.id; } @Override public String getValueFromData(SensorReading sensorReading) { return sensorReading.temperature + ""; } @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HSET, "sensor"); } } }5.3 ElasticSearch
在主函数中调用: org.apache.flink flink-connector-elasticsearch6_2.11 ${flink.version}
可选依赖: org.elasticsearch.client elasticsearch-rest-high-level-client 7.9.1 org.elasticsearch elasticsearch 7.9.1
示例代码:
scala version object SinkToES { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val httpHosts = new util.ArrayList[HttpHost]() httpHosts.add(new HttpHost("127.0.0.1", 9200, "http")) val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading]( httpHosts, new ElasticsearchSinkFunction[SensorReading] { override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = { val hashMap = new util.HashMap[String, String]() hashMap.put("data", t.toString) val indexRequest = Requests .indexRequest() .index("sensor") // 索引是sensor,相当于数据库 .source(hashMap) requestIndexer.add(indexRequest) } } ) // 设置每一批写入es多少数据 esSinkBuilder.setBulkFlushMaxActions(1) val stream = env.addSource(new SensorSource) stream.addSink(esSinkBuilder.build()) env.execute() } }
java version public class SinkToES { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); ArrayList httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("127.0.0.1", 9200, "http")); ElasticsearchSink.Builder sensorReadingBuilder = new ElasticsearchSink.Builder<>( httpHosts, new ElasticsearchSinkFunction() { @Override public void process(SensorReading sensorReading, RuntimeContext runtimeContext, RequestIndexer requestIndexer) { HashMap map = new HashMap<>(); map.put("data", sensorReading.toString()); IndexRequest indexRequest = Requests .indexRequest() .index("sensor") // 索引是sensor,相当于数据库 .source(map); requestIndexer.add(indexRequest); } } ); sensorReadingBuilder.setBulkFlushMaxActions(1); DataStream stream = env.addSource(new SensorSource()); stream.addSink(sensorReadingBuilder.build()); env.execute(); } }5.4 JDBC自定义sink mysql mysql-connector-java 8.0.21
示例代码:
scala version object SinkToMySQL { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val stream = env.addSource(new SensorSource) stream.addSink(new MyJDBCSink) env.execute() } class MyJDBCSink extends RichSinkFunction[SensorReading] { var conn: Connection = _ var insertStmt: PreparedStatement = _ var updateStmt: PreparedStatement = _ override def open(parameters: Configuration): Unit = { conn = DriverManager.getConnection( "jdbc:mysql://localhost:3306/sensor", "zuoyuan", "zuoyuan" ) insertStmt = conn.prepareStatement("INSERT INTO temps (id, temp) VALUES (?, ?)") updateStmt = conn.prepareStatement("UPDATE temps SET temp = ? WHERE id = ?") } override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = { updateStmt.setDouble(1, value.temperature) updateStmt.setString(2, value.id) updateStmt.execute() if (updateStmt.getUpdateCount == 0) { insertStmt.setString(1, value.id) insertStmt.setDouble(2, value.temperature) insertStmt.execute() } } override def close(): Unit = { insertStmt.close() updateStmt.close() conn.close() } } }
java version public class SinkToMySQL { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream stream = env.addSource(new SensorSource()); stream.addSink(new MyJDBCSink()); env.execute(); } public static class MyJDBCSink extends RichSinkFunction { private Connection conn; private PreparedStatement insertStmt; private PreparedStatement updateStmt; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); conn = DriverManager.getConnection( "jdbc:mysql://localhost:3306/sensor", "zuoyuan", "zuoyuan" ); insertStmt = conn.prepareStatement("INSERT INTO temps (id, temp) VALUES (?, ?)"); updateStmt = conn.prepareStatement("UPDATE temps SET temp = ? WHERE id = ?"); } @Override public void invoke(SensorReading value, Context context) throws Exception { updateStmt.setDouble(1, value.temperature); updateStmt.setString(2, value.id); updateStmt.execute(); if (updateStmt.getUpdateCount() == 0) { insertStmt.setString(1, value.id); insertStmt.setDouble(2, value.temperature); insertStmt.execute(); } } @Override public void close() throws Exception { super.close(); insertStmt.close(); updateStmt.close(); conn.close(); } } }
海柔创新机器人实现功能设计双重突破宝安讯(宝安日报记者何祖兰)海柔创新自主研发的HAIPICKA42FW动态调宽箱式机器人凭借创新的设计理念卓越的产品性能,从全球57个国家提交的11000多件作品中脱颖而出,斩获2
机器人社区配送,离我们还有多远其实,全球科研界和企业早已开始探索自动配送机器人。这个新生事物能否破解此题,并在更长远的未来,为城市生活带来更大便利?大学校园理想中试场景研究显示,未来几年网上购物将呈现倍数增长的
机器人减速机访谈20220416搜索更多机构调研纪要电话会纪要请访问文八股调研电脑端来源网络(真伪自辨)全文共计12406字减速机访谈减速机行业发展市场竞争格局及未来趋势核心观点1。全球RV减速机市场竞争格局1)
马化腾很无奈为什么你们都将QQ号卖了?网友可以换套房在微信还未诞生前,QQ一直是国内最大的社交软件。在以前网络不如今天发达的时候,很多人都使用QQ在网上认识其它的朋友。并且随着QQ不断更新迭代,它所给用户带来的体验越来越多,所以说Q
浙江一项调查显示过半快递外卖人员每月平均只休息一两天澎湃新闻首席记者葛熔金通讯员丛杨共青团浙江省委日前对全省快递外卖业工作人员进行的问卷调查显示,快递外卖人员中2335岁的年轻人占68。2,56。09的工作人员每月平均休息12天。据
金融市场大统一丨地方数据交易所三大痛点确权难技术体系分散监管尺度不一21世纪经济报道记者陈植上海报道4月10日,中共中央国务院关于加快建设全国统一大市场的意见(下称意见)正式发布。意见提出要加快培育统一的技术和数据市场,加快培育数据要素市场,建立健
声称要正式对标苹果的小米是真有实力,还是飘了?目前,在手机领域,有着两个当之无愧的霸主。一个是已经基本退出中国市场的三星,另一个则是中国用户非常熟悉的苹果。因为三星在国内的销量基本接近0。因此目前大多数国产手机厂商的目标就是超
英飞凌的IGBT和MCU在普遍缺料的汽车供应链,对于芯片的供给缺失,也成了制约产能提升和销售目标实现的关键,从已知的信息来看,重点影响还是微处理器以及功率半导体的供给饱和。从半导体产业的快速发展来说,是应
公务员考试申论热点手机App自动续费背景链接近年来,手机应用付费市场火爆,但随之而来的,手机APP会员包月包年等服务套餐自动续费现象已经成为人们的困扰。例如使用某APP,只想开通一个月的会员服务,但是在第二个月却被自
AndroidFrameworkAndroidFramework包含三个内容服务端客户端linux驱动服务端AndroidFramework服务端包括两个很重要的类WindowManagerService(WMS
中科院李家洋院士团队创制世界首例重新设计与快速驯化的四倍体水稻材料开辟水稻育种新途径今年5月,我们的水稻材料就要在北京实验田里试种了,未来有望推广到更多地方。说起手头的工作,余泓眼睛里都是喜悦。余泓是中科院遗传与发育生物学研究所李家洋院士团队的一员。他说的水稻材料