范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

Flink操练(八)之DS简介(8)FlinkDataStreamAPI(三)

  1 设置并行度
  Flink应用程序在一个像集群这样的分布式环境中并行执行。当一个数据流程序提交到作业管理器执行时,系统将会创建一个数据流图,然后准备执行需要的操作符。每一个操作符将会并行化到一个或者多个任务中去。每个算子的并行任务都会处理这个算子的输入流中的一份子集。一个算子并行任务的个数叫做算子的并行度。它决定了算子执行的并行化程度,以及这个算子能处理多少数据量。
  算子的并行度可以在执行环境这个层级来控制,也可以针对每个不同的算子设置不同的并行度。默认情况下,应用程序中所有算子的并行度都将设置为执行环境的并行度。执行环境的并行度(也就是所有算子的默认并行度)将在程序开始运行时自动初始化。如果应用程序在本地执行环境中运行,并行度将被设置为CPU的核数。当我们把应用程序提交到一个处于运行中的Flink集群时,执行环境的并行度将被设置为集群默认的并行度,除非我们在客户端提交应用程序时显式的设置好并行度。
  通常情况下,将算子的并行度定义为和执行环境并行度相关的数值会是个好主意。这允许我们通过在客户端调整应用程序的并行度就可以将程序水平扩展了。我们可以使用以下代码来访问执行环境的默认并行度。
  我们还可以重写执行环境的默认并行度,但这样的话我们将再也不能通过客户端来控制应用程序的并行度了。
  算子默认的并行度也可以通过重写来明确指定。在下面的例子里面,数据源的操作符将会按照环境默认的并行度来并行执行,map操作符的并行度将会是默认并行度的2倍,sink操作符的并行度为2。 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment; int defaultP = env.getParallelism; env   .addSource(new CustomSource)   .map(new MyMapper)   .setParallelism(defaultP * 2)   .print()   .setParallelism(2);
  当我们通过客户端将应用程序的并行度设置为16并提交执行时,source操作符的并行度为16,mapper并行度为32,sink并行度为2。如果我们在本地环境运行应用程序的话,例如在IDE中运行,机器是8核,那么source任务将会并行执行在8个任务上面,mapper运行在16个任务上面,sink运行在2个任务上面。
  并行度是动态概念,任务槽数量是静态概念。并行度<=任务槽数量。一个任务槽最多运行一个并行度。 2 类型
  Flink程序所处理的流中的事件一般是对象类型。操作符接收对象输出对象。所以Flink的内部机制需要能够处理事件的类型。在网络中传输数据,或者将数据写入到状态后端、检查点和保存点中,都需要我们对数据进行序列化和反序列化。为了高效的进行此类操作,Flink需要流中事件类型的详细信息。Flink使用了 Type Information  的概念来表达数据类型,这样就能针对不同的数据类型产生特定的序列化器,反序列化器和比较操作符。
  Flink也能够通过分析输入数据和输出数据来自动获取数据的类型信息以及序列化器和反序列化器。尽管如此,在一些特定的情况下,例如匿名函数或者使用泛型的情况下,我们需要明确的提供数据的类型信息,来提高我们程序的性能。
  在这一节中,我们将讨论Flink支持的类型,以及如何为数据类型创建相应的类型信息,还有就是在Flink无法推断函数返回类型的情况下,如何帮助Flink的类型系统去做类型推断。 2.1 支持的数据类型
  Flink支持Java和Scala提供的所有普通数据类型。最常用的数据类型可以做以下分类: Primitives(原始数据类型) Java和Scala的Tuples(元组) Scala的样例类 POJO类型 一些特殊的类型
  接下来让我们一探究竟。
  Primitives
  Java和Scala提供的所有原始数据类型都支持,例如 Int  (Java的Integer  ),String,Double等等。下面举一个例子:DataStream[Long] numbers = env.fromElements(1L, 2L, 3L, 4L); numbers.map(n -> n + 1);
  Tuples
  元组是一种组合数据类型,由固定数量的元素组成。
  Flink为Java的Tuple提供了高效的实现。Flink实现的Java Tuple最多可以有25个元素,根据元素数量的不同,Tuple都被实现成了不同的类:Tuple1,Tuple2,一直到Tuple25。Tuple类是强类型。 DataStream> persons = env   .fromElements(     Tuple2.of("Adam", 17),     Tuple2.of("Sarah", 23)   );  persons.filter(p -> p.f1 > 18);
  Tuple的元素可以通过它们的public属性访问——f0,f1,f2等等。或者使用 getField(int pos)  方法来访问,元素下标从0开始:import org.apache.flink.api.java.tuple.Tuple2  Tuple2 personTuple = Tuple2.of("Alex", 42); Integer age = personTuple.getField(1); // age = 42
  不同于Scala的Tuple,Java的Tuple是可变数据结构,所以Tuple中的元素可以重新进行赋值。重复利用Java的Tuple可以减轻垃圾收集的压力。举个例子: personTuple.f1 = 42; // set the 2nd field to 42 personTuple.setField(43, 1); // set the 2nd field to 43
  POJO
  POJO类的定义: 公有类 无参数的公有构造器 所有的字段都是公有的,可以通过getters和setters访问。 所有字段的数据类型都必须是Flink支持的数据类型。
  举个例子: public class Person {   public String name;   public int age;    public Person() {}    public Person(String name, int age) {     this.name = name;     this.age = age;   } }  DataStream persons = env.fromElements(   new Person("Alex", 42),   new Person("Wendy", 23) );
  其他数据类型 Array, ArrayList, HashMap, Enum Hadoop Writable types
  2.2 为数据类型创建类型信息
  Flink类型系统的核心类是 TypeInformation  。它为系统在产生序列化器和比较操作符时,提供了必要的类型信息。例如,如果我们想使用某个key来做连接查询或者分组操作,TypeInformation  可以让Flink做更严格的类型检查。
  Flink针对Java和Scala分别提供了类来产生类型信息。在Java中,类是 org.apache.flink.api.common.typeinfo.Types
  举个例子: TypeInformation intType = Types.INT;  TypeInformation> tupleType = Types   .TUPLE(Types.LONG, Types.STRING);  TypeInformation personType = Types   .POJO(Person.class);3 定义Key以及引用字段
  在Flink中,我们必须明确指定输入流中的元素中的哪一个字段是key。 3.1 使用字段位置进行keyByDataStream> input = ... KeyedStream, String> keyed = input.keyBy(1);
  如果我们想要用元组的第2个字段和第3个字段做keyBy,可以看下面的例子。 input.keyBy(1, 2);3.2 使用字段表达式来进行keyBy
  对于样例类: DataStream sensorStream = ... sensorStream.keyBy("id");
  对于元组: DataStream> javaInput = ... javaInput.keyBy("f2") // key Java tuple by 3rd field3.3 Key选择器
  方法类型 KeySelector[IN, KEY]   > getKey(IN): KEY
  两个例子
  scala version val sensorData = ... val byId = sensorData.keyBy(r => r.id)val input = ... input.keyBy(value => math.max(value._1, value._2))
  java version DataStream sensorData = ... KeyedStream byId = sensorData.keyBy(r -> r.id);DataStream> input = ... input.keyBy(value -> Math.max(value.f0, value.f1));4 实现UDF函数,更细粒度的控制流4.1 函数类
  Flink暴露了所有udf函数的接口(实现方式为接口或者抽象类)。例如MapFunction, FilterFunction, ProcessFunction等等。
  例子实现了FilterFunction接口 class FilterFilter extends FilterFunction {   @Override   public Boolean filter(String value) {     return value.contains("flink");   } }  DataStream flinkTweets = tweets.filter(new FlinkFilter);
  还可以将函数实现成匿名类 DataStream flinkTweets = tweets.filter(   new RichFilterFunction {     @Override     public Boolean filter(String value) {       return value.contains("flink");     }   } )
  我们filter的字符串"flink"还可以当作参数传进去。 DataStream tweets = ... DataStream flinkTweets = tweets.filter(new KeywordFilter("flink"));  class KeywordFilter(keyWord: String) extends FilterFunction {   @Override   public Boolean filter(String value) = {     return value.contains(keyWord);   } }4.2 匿名函数
  匿名函数可以实现一些简单的逻辑,但无法实现一些高级功能,例如访问状态等等。 DataStream tweets = ... DataStream flinkTweets = tweets.filter(r -> r.contains("flink"));4.3 富函数
  我们经常会有这样的需求:在函数处理数据之前,需要做一些初始化的工作;或者需要在处理数据时可以获得函数执行上下文的一些信息;以及在处理完数据时做一些清理工作。而DataStream API就提供了这样的机制。
  DataStream API提供的所有转换操作函数,都拥有它们的"富"版本,并且我们在使用常规函数或者匿名函数的地方来使用富函数。例如下面就是富函数的一些例子,可以看出,只需要在常规函数的前面加上 Rich  前缀就是富函数了。RichMapFunction RichFlatMapFunction RichFilterFunction ...
  当我们使用富函数时,我们可以实现两个额外的方法: open()方法是rich function的初始化方法,当一个算子例如map或者filter被调用之前open()会被调用。open()函数通常用来做一些只需要做一次即可的初始化工作。 close()方法是生命周期中的最后一个调用的方法,通常用来做一些清理工作。
  另外,getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,当前子任务的索引,当前子任务的名字。同时还它还包含了访问 分区状态 的方法。下面看一个例子: public static class MyFlatMap extends RichFlatMapFunction> {   private int subTaskIndex = 0;    @Override   public void open(Configuration configuration) {     int subTaskIndex = getRuntimeContext.getIndexOfThisSubtask;     // 做一些初始化工作     // 例如建立一个和HDFS的连接   }    @Override   public void flatMap(Integer in, Collector> out) {     if (in % 2 == subTaskIndex) {       out.collect((subTaskIndex, in));     }   }    @Override   public void close() {     // 清理工作,断开和HDFS的连接。   } }5 Sink
  Flink没有类似于spark中foreach方法,让用户进行迭代的操作。所有对外的输出操作都要利用Sink完成。最后通过类似如下方式完成整个任务最终输出操作。 stream.addSink(new MySink(xxxx));
  官方提供了一部分的框架的sink。除此以外,需要用户自定义实现sink。 5.1 Kafka
  Kafka版本为0.11    org.apache.flink   flink-connector-kafka-0.11_2.11   ${flink.version} 
  Kafka版本为2.0以上    org.apache.flink   flink-connector-kafka_2.11   ${flink.version} 
  主函数中添加sink: DataStream union = high   .union(low)   .map(r -> r.temperature.toString);  union.addSink(   new FlinkKafkaProducer011(     "localhost:9092",     "test",     new SimpleStringSchema()   ) );5.2 Redis   org.apache.bahir   flink-connector-redis_2.11   1.0 
  定义一个redis的mapper类,用于定义保存到redis时调用的命令:
  scala version object SinkToRedisExample {   def main(args: Array[String]): Unit = {     val env = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1)      val stream = env.addSource(new SensorSource)      val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build()      stream.addSink(new RedisSink[SensorReading](conf, new MyRedisSink))      env.execute()   }    class MyRedisSink extends RedisMapper[SensorReading] {     override def getKeyFromData(t: SensorReading): String = t.id      override def getValueFromData(t: SensorReading): String = t.temperature.toString      override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "sensor")   } }
  java version public class WriteToRedisExample {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1);          DataStream stream = env.addSource(new SensorSource());           FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build();          stream.addSink(new RedisSink(conf, new MyRedisSink()));          env.execute();     }      public static class MyRedisSink implements RedisMapper {         @Override         public String getKeyFromData(SensorReading sensorReading) {             return sensorReading.id;         }          @Override         public String getValueFromData(SensorReading sensorReading) {             return sensorReading.temperature + "";         }          @Override         public RedisCommandDescription getCommandDescription() {             return new RedisCommandDescription(RedisCommand.HSET, "sensor");         }     } }5.3 ElasticSearch
  在主函数中调用:    org.apache.flink   flink-connector-elasticsearch6_2.11   ${flink.version} 
  可选依赖:      org.elasticsearch.client     elasticsearch-rest-high-level-client     7.9.1        org.elasticsearch     elasticsearch     7.9.1 
  示例代码:
  scala version object SinkToES {   def main(args: Array[String]): Unit = {     val env = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1)      val httpHosts = new util.ArrayList[HttpHost]()     httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))     val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](       httpHosts,       new ElasticsearchSinkFunction[SensorReading] {         override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {           val hashMap = new util.HashMap[String, String]()           hashMap.put("data", t.toString)            val indexRequest = Requests             .indexRequest()             .index("sensor") // 索引是sensor,相当于数据库             .source(hashMap)            requestIndexer.add(indexRequest)         }       }      )     // 设置每一批写入es多少数据     esSinkBuilder.setBulkFlushMaxActions(1)      val stream = env.addSource(new SensorSource)      stream.addSink(esSinkBuilder.build())      env.execute()   } }
  java version public class SinkToES {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1);         ArrayList httpHosts = new ArrayList<>();         httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));         ElasticsearchSink.Builder sensorReadingBuilder = new ElasticsearchSink.Builder<>(                 httpHosts,                 new ElasticsearchSinkFunction() {                     @Override                     public void process(SensorReading sensorReading, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {                         HashMap map = new HashMap<>();                         map.put("data", sensorReading.toString());                          IndexRequest indexRequest = Requests                                 .indexRequest()                                 .index("sensor") // 索引是sensor,相当于数据库                                 .source(map);                          requestIndexer.add(indexRequest);                     }                 }         );          sensorReadingBuilder.setBulkFlushMaxActions(1);          DataStream stream = env.addSource(new SensorSource());          stream.addSink(sensorReadingBuilder.build());          env.execute();     } }5.4 JDBC自定义sink   mysql   mysql-connector-java   8.0.21 
  示例代码:
  scala version object SinkToMySQL {   def main(args: Array[String]): Unit = {     val env = StreamExecutionEnvironment.getExecutionEnvironment     env.setParallelism(1)      val stream = env.addSource(new SensorSource)      stream.addSink(new MyJDBCSink)      env.execute()   }    class MyJDBCSink extends RichSinkFunction[SensorReading] {     var conn: Connection = _     var insertStmt: PreparedStatement = _     var updateStmt: PreparedStatement = _      override def open(parameters: Configuration): Unit = {       conn = DriverManager.getConnection(         "jdbc:mysql://localhost:3306/sensor",         "zuoyuan",         "zuoyuan"       )       insertStmt = conn.prepareStatement("INSERT INTO temps (id, temp) VALUES (?, ?)")       updateStmt = conn.prepareStatement("UPDATE temps SET temp = ? WHERE id = ?")     }      override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {       updateStmt.setDouble(1, value.temperature)       updateStmt.setString(2, value.id)       updateStmt.execute()        if (updateStmt.getUpdateCount == 0) {         insertStmt.setString(1, value.id)         insertStmt.setDouble(2, value.temperature)         insertStmt.execute()       }     }      override def close(): Unit = {       insertStmt.close()       updateStmt.close()       conn.close()     }   } }
  java version public class SinkToMySQL {     public static void main(String[] args) throws Exception {         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1);          DataStream stream = env.addSource(new SensorSource());          stream.addSink(new MyJDBCSink());          env.execute();     }      public static class MyJDBCSink extends RichSinkFunction {         private Connection conn;         private PreparedStatement insertStmt;         private PreparedStatement updateStmt;          @Override         public void open(Configuration parameters) throws Exception {             super.open(parameters);             conn = DriverManager.getConnection(                     "jdbc:mysql://localhost:3306/sensor",                     "zuoyuan",                     "zuoyuan"             );             insertStmt = conn.prepareStatement("INSERT INTO temps (id, temp) VALUES (?, ?)");             updateStmt = conn.prepareStatement("UPDATE temps SET temp = ? WHERE id = ?");         }          @Override         public void invoke(SensorReading value, Context context) throws Exception {             updateStmt.setDouble(1, value.temperature);             updateStmt.setString(2, value.id);             updateStmt.execute();              if (updateStmt.getUpdateCount() == 0) {                 insertStmt.setString(1, value.id);                 insertStmt.setDouble(2, value.temperature);                 insertStmt.execute();             }         }          @Override         public void close() throws Exception {             super.close();             insertStmt.close();             updateStmt.close();             conn.close();         }     } }

海柔创新机器人实现功能设计双重突破宝安讯(宝安日报记者何祖兰)海柔创新自主研发的HAIPICKA42FW动态调宽箱式机器人凭借创新的设计理念卓越的产品性能,从全球57个国家提交的11000多件作品中脱颖而出,斩获2机器人社区配送,离我们还有多远其实,全球科研界和企业早已开始探索自动配送机器人。这个新生事物能否破解此题,并在更长远的未来,为城市生活带来更大便利?大学校园理想中试场景研究显示,未来几年网上购物将呈现倍数增长的机器人减速机访谈20220416搜索更多机构调研纪要电话会纪要请访问文八股调研电脑端来源网络(真伪自辨)全文共计12406字减速机访谈减速机行业发展市场竞争格局及未来趋势核心观点1。全球RV减速机市场竞争格局1)马化腾很无奈为什么你们都将QQ号卖了?网友可以换套房在微信还未诞生前,QQ一直是国内最大的社交软件。在以前网络不如今天发达的时候,很多人都使用QQ在网上认识其它的朋友。并且随着QQ不断更新迭代,它所给用户带来的体验越来越多,所以说Q浙江一项调查显示过半快递外卖人员每月平均只休息一两天澎湃新闻首席记者葛熔金通讯员丛杨共青团浙江省委日前对全省快递外卖业工作人员进行的问卷调查显示,快递外卖人员中2335岁的年轻人占68。2,56。09的工作人员每月平均休息12天。据金融市场大统一丨地方数据交易所三大痛点确权难技术体系分散监管尺度不一21世纪经济报道记者陈植上海报道4月10日,中共中央国务院关于加快建设全国统一大市场的意见(下称意见)正式发布。意见提出要加快培育统一的技术和数据市场,加快培育数据要素市场,建立健声称要正式对标苹果的小米是真有实力,还是飘了?目前,在手机领域,有着两个当之无愧的霸主。一个是已经基本退出中国市场的三星,另一个则是中国用户非常熟悉的苹果。因为三星在国内的销量基本接近0。因此目前大多数国产手机厂商的目标就是超英飞凌的IGBT和MCU在普遍缺料的汽车供应链,对于芯片的供给缺失,也成了制约产能提升和销售目标实现的关键,从已知的信息来看,重点影响还是微处理器以及功率半导体的供给饱和。从半导体产业的快速发展来说,是应公务员考试申论热点手机App自动续费背景链接近年来,手机应用付费市场火爆,但随之而来的,手机APP会员包月包年等服务套餐自动续费现象已经成为人们的困扰。例如使用某APP,只想开通一个月的会员服务,但是在第二个月却被自AndroidFrameworkAndroidFramework包含三个内容服务端客户端linux驱动服务端AndroidFramework服务端包括两个很重要的类WindowManagerService(WMS中科院李家洋院士团队创制世界首例重新设计与快速驯化的四倍体水稻材料开辟水稻育种新途径今年5月,我们的水稻材料就要在北京实验田里试种了,未来有望推广到更多地方。说起手头的工作,余泓眼睛里都是喜悦。余泓是中科院遗传与发育生物学研究所李家洋院士团队的一员。他说的水稻材料
国产全画幅摄影机崛起旗舰标杆专业拍摄必备KINEFINITYMAVOEdge6K摄影机国产之光卓曜科技,3月30日正式推出了KINEFINITYMAVOEdge6K摄影机。配备了全画幅规格的32CMOS传感器,有效像素为iGameGeForceRTX3090Ti水神显卡首发体验暴打TITANRTX七彩虹iGameNeptune水神系列显卡,一直以来是仅次于九段的王牌产品。全新的GeForceRTX3090Ti显卡作为英伟达RTX30系列显卡的巅峰之作,与七彩虹iGame水神蓝厂7英寸大屏新机曝光素皮后盖滑动静音按键,定位商务旗舰?目前国内几家头部手机厂商的迭代新品都发布得差不多了,蓝厂虽说来得慢,但是一下子会推出几款重磅新品。vivo新品发布会已经定档4月11日,除了X80系列,XNoteXFold以及平板市值蒸发超96,裁员千人,微信电商第一股怎么了?微信自己做了,有赞只能死只要腾讯还在一天,那么国内的所有互联网公司结局只有倒闭!2010年7月,著名互联网行业期刊计算机世界发布了一篇名为的腾讯的文章,文章犀利的指出腾讯靠抄袭发家搬进新宿舍没有网咋办?蒲公英X4C无线4G路由器居然可以这样救急前言当你住在员工宿舍,没有网络接口,别家的WiFi信号又不好应该咋办?这次我就遇到了类似的问题一个读完研的发小被安排在医院的员工宿舍。作为初来乍到的实习生,配搭的房间自然也不是太好运动时佩戴什么耳机比较合适骨传导运动耳机推荐运动和音乐都是现代生活中必不可少的生活元素,每天进行适当的运动,听一些音乐都可以缓解生活上的劳累。运动时佩戴有线耳机不仅不方便,还会对耳朵造成伤害,无线蓝牙耳机已经成为了运动耳机的刚聊什么,手机就会给推什么,真的吗?01调用手机麦克风权限如今,我们在下载使用手机APP时,经常会按照要求习惯性授予其手机权限,有时就包括手机麦克风。这就让一些APP有机可乘,当我们使用这些APP时,就有被通过麦克风投影仪伤不伤眼?了解一下伤眼的判断标准是什么?大家下班后,主要的放松是什么?想必,除了刷手机打游戏,绝大多数的人就是看电视吧!可对于租房党和学生族而言,如今动辄六七十寸的大电视放在所处的空间里,真的是个大累赘,手机或是电脑又没一探究竟手机充电器能不能混用?现在小伙伴们经常会遇到这么一种情况在手机快没电时,经常随便拿起一部充电器来充电。对此,有的小伙伴认为这样会很危险,有的认为只要能充电则万事大吉,那究竟这样做合不合理,安不安全?带着坚持两个聚焦战略普联软件上市首年净利同比增长66。18中证网讯(王珞)普联软件3月30日晚间发布上市后的首份年度报告,2021年公司坚持聚焦大型集团客户聚焦优势业务领域的发展战略,持续扩大战略客户服务领域,不断拓展细分领域的客户市场,新能源你想知道的事光伏基本面硅料价格走势,能否抄底以下内容来自泰达宏利基金直播间,嘉宾泰达宏利基金经理孙硕,在管产品泰达宏利绩优混合。更多精彩内容,请订阅泰达宏利头条号。Q1近期新能源的四个主要行业基本面如何?首先来看光伏,光伏的