范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

Flink1。13架构全集一文带你由浅入深精通Flink方方面面SQL篇D

  七、函数
  在SQL中,我们可以把一些数据的转换操作包装起来,嵌入到SQL查询中统一调用,这就是"函数"(functions)。Flink的Table API和SQL同样提供了函数的功能。两者在调用时略有不同:Table API中的函数是通过数据对象的方法调用来实现的;而SQL则是直接引用函数名称,传入数据作为参数。例如,要把一个字符串str转换成全大写的形式,Table API的写法是调用str这个String对象的upperCase()方法:
  而SQL中的写法就是直接引用UPPER()函数,将str作为参数传入:
  由于Table API是内嵌在Java语言中的,很多方法需要在类中额外添加,因此扩展功能比较麻烦,目前支持的函数比较少;而且Table API也不如SQL的通用性强,所以一般情况下较少使用。下面我们主要介绍Flink SQL中函数的使用。
  Flink SQL中的函数可以分为两类:一类是SQL中内置的系统函数,直接通过函数名调用就可以,能够实现一些常用的转换操作,比如之前我们用到的COUNT()、CHAR_LENGTH()、UPPER()等等;而另一类函数则是用户自定义的函数(UDF),需要在表环境中注册才能使用。7.1 系统函数
  系统函数(System Functions)也叫内置函数(Built-in Functions),是在系统中预先实现好的功能模块。我们可以通过固定的函数名直接调用,实现想要的转换操作。Flink SQL提供了大量的系统函数,几乎支持所有的标准SQL中的操作,这为我们使用SQL编写流处理程序提供了极大的方便。
  Flink SQL中的系统函数又主要可以分为两大类:标量函数(Scalar Functions)和聚合函数(Aggregate Functions)。
  1. 标量函数(Scalar Functions)
  标量函数指的就是只对输入数据做转换操作、返回一个值的函数。标量函数是最常见、也最简单的一类系统函数,数量非常庞大,很多在标准SQL中也有定义。所以我们这里只对一些常见类型列举部分函数,做一个简单概述,具体应用可以查看官网的完整函数列表。
  比较函数(Comparison Functions)
  比较函数其实就是一个比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。这个比较表达式可以是用 <、>、= 等符号连接两个值,也可以是用关键字定义的某种判断。例如:(1)value1 = value2  判断两个值相等; (2)value1 <> value2  判断两个值不相等 (3)value IS NOT NULL 判断value不为空
  逻辑函数(Logical Functions)
  逻辑函数就是一个逻辑表达式,也就是用与(AND)、或(OR)、非(NOT)将布尔类型的值连接起来,也可以用判断语句(IS、IS NOT)进行真值判断;返回的还是一个布尔类型的值。例如:(1)boolean1 OR boolean2  布尔值boolean1与布尔值boolean2取逻辑或 (2)boolean IS FALSE  判断布尔值boolean是否为false (3)NOT boolean  布尔值boolean取逻辑非
  算术函数(Arithmetic Functions)
  进行算术计算的函数,包括用算术符号连接的运算,和复杂的数学运算。例如:(1)numeric1 + numeric2  两数相加 (2)POWER(numeric1, numeric2)  幂运算,取数numeric1的numeric2次方 (3)RAND()  返回(0.0, 1.0)区间内的一个double类型的伪随机数
  字符串函数(String Functions)
  进行字符串处理的函数。例如:(1)string1 || string2  两个字符串的连接 (2)UPPER(string)  将字符串string转为全部大写 (3)CHAR_LENGTH(string)  计算字符串string的长度
  时间函数(Temporal Functions)
  进行与时间相关操作的函数。例如:(1)DATE string  按格式"yyyy-MM-dd"解析字符串string,返回类型为SQL Date (2)TIMESTAMP string  按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回类型为SQL timestamp (3)CURRENT_TIME  返回本地时区的当前时间,类型为SQL time(与LOCALTIME等价) (4)INTERVAL string range  返回一个时间间隔。
  2. 聚合函数(Aggregate Functions)
  聚合函数是以表中多个行作为输入,提取字段进行聚合操作的函数,会将唯一的聚合值作为结果返回。聚合函数应用非常广泛,不论分组聚合、窗口聚合还是开窗(Over)聚合,对数据的聚合操作都可以用相同的函数来定义。标准SQL中常见的聚合函数Flink SQL都是支持的,目前也在不断扩展,为流处理应用提供更强大的功能。例如:(1)COUNT(*)  返回所有行的数量,统计个数。 (2)SUM([ ALL | DISTINCT ] expression)  对某个字段进行求和操作。默认情况下省略了关键字ALL,表示对所有行求和;如果指定DISTINCT,则会对数据进行去重,每个值只叠加一次。 (3)RANK()   返回当前值在一组值中的排名。 (4)ROW_NUMBER()    对一组值排序后,返回当前值的行号。
  其中,RANK()和ROW_NUMBER()一般用在OVER窗口中。7.2 自定义函数(UDF)
  系统函数尽管庞大,也不可能涵盖所有的功能;如果有系统函数不支持的需求,我们就需要用自定义函数(User Defined Functions,UDF)来实现了。Flink的Table API和SQL提供了多种自定义函数的接口,以抽象类的形式定义。当前UDF主要有以下几类:标量函数(Scalar Functions):将输入的标量值转换成一个新的标量值; 表函数(Table Functions):将标量值转换成一个或多个新的行数据,也就是扩展成一个表; 聚合函数(Aggregate Functions):将多行数据里的标量值转换成一个新的标量值; 表聚合函数(Table Aggregate Functions):将多行数据里的标量值转换成一个或多个新的行数据。
  1. 整体调用流程要想在代码中使用自定义的函数,我们需要首先自定义对应UDF抽象类的实现,并在表环境中注册这个函数,然后就可以在Table API和SQL中调用了。
  (1)注册函数
  注册函数时需要调用表环境的createTemporarySystemFunction()方法,传入注册的函数名以及UDF类的Class对象:// 注册函数 tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);
  我们自定义的UDF类叫作MyFunction,它应该是上面四种UDF抽象类中某一个的具体实现;在环境中将它注册为名叫MyFunction的函数。
  (2)使用Table API调用函数
  在Table API中,需要使用call()方法来调用自定义函数:tableEnv.from("MyTable").select(call("MyFunction", $("myField")));
  这里call()方法有两个参数,一个是注册好的函数名MyFunction,另一个则是函数调用时本身的参数。这里我们定义MyFunction在调用时,需要传入的参数是myField字段。
  (3)在SQL中调用函数
  当我们将函数注册为系统函数之后,在SQL中的调用就与内置系统函数完全一样了:tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");
  可见,SQL的调用方式更加方便,我们后续依然会以SQL为例介绍UDF的用法。
  2. 标量函数(Scalar Functions)
  自定义标量函数可以把0个、 1个或多个标量值转换成一个标量值,它对应的输入是一行数据中的字段,输出则是唯一的值。所以从输入和输出表中行数据的对应关系看,标量函数是"一对一"的转换。想要实现自定义的标量函数,我们需要自定义一个类来继承抽象类ScalarFunction,并实现叫作eval() 的求值方法。标量函数的行为就取决于求值方法的定义,它必须是公有的(public),而且名字必须是eval。求值方法eval可以重载多次,任何数据类型都可作为求值方法的参数和返回值类型。
  这里需要特别说明的是,ScalarFunction抽象类中并没有定义eval()方法,所以我们不能直接在代码中重写(override);但Table API的框架底层又要求了求值方法必须名字为eval()。这是Table API和SQL目前还显得不够完善的地方,未来的版本应该会有所改进。下面我们来看一个具体的例子。我们实现一个自定义的哈希(hash)函数HashFunction,用来求传入对象的哈希值。public static class HashFunction extends ScalarFunction {   // 接受任意类型输入,返回 INT 型输出   public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {     return o.hashCode();   } } // 注册函数 tableEnv.createTemporarySystemFunction("HashFunction", HashFunction.class); // 在 SQL 里调用注册好的函数 tableEnv.sqlQuery("SELECT HashFunction(myField) FROM MyTable");
  这里我们自定义了一个ScalarFunction,实现了eval()求值方法,将任意类型的对象传入,得到一个Int类型的哈希值返回。当然,具体的求哈希操作就省略了,直接调用对象的hashCode()方法即可。
  另外注意,由于Table API在对函数进行解析时需要提取求值方法参数的类型引用,所以我们用DataTypeHint(inputGroup = InputGroup.ANY)对输入参数的类型做了标注,表示eval的参数可以是任意类型。
  3. 表函数(Table Functions)
  跟标量函数一样,表函数的输入参数也可以是 0个、1个或多个标量值;不同的是,它可以返回任意多行数据。"多行数据"事实上就构成了一个表,所以"表函数"可以认为就是返回一个表的函数,这是一个"一对多"的转换关系。之前我们介绍过的窗口TVF,本质上就是表函数。
  类似地,要实现自定义的表函数,需要自定义类来继承抽象类TableFunction,内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是,TableFunction类本身是有一个泛型参数T的,这就是表函数返回数据的类型;而eval()方法没有返回类型,内部也没有return语句,是通过调用collect()方法来发送想要输出的行数据的。
  在SQL中调用表函数,需要使用LATERAL TABLE()来生成扩展的"侧向表",然后与原始表进行联结(Join)。这里的Join操作可以是直接做交叉联结(cross join),在FROM后用逗号分隔两个表就可以;也可以是以ON TRUE为条件的左联结(LEFT JOIN)。
  下面是表函数的一个具体示例。我们实现了一个分隔字符串的函数SplitFunction,可以将一个字符串转换成(字符串,长度)的二元组。// 注意这里的类型标注,输出是Row类型,Row中包含两个字段:word和length。 @FunctionHint(output = @DataTypeHint("ROW")) public static class SplitFunction extends TableFunction {   public void eval(String str) {     for (String s : str.split(" ")) {       // 使用collect()方法发送一行数据       collect(Row.of(s, s.length()));     }   } } // 注册函数 tableEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class); // 在 SQL 里调用注册好的函数 // 1. 交叉联结 tableEnv.sqlQuery(   "SELECT myField, word, length " +   "FROM MyTable, LATERAL TABLE(SplitFunction(myField))"); // 2. 带ON TRUE条件的左联结 tableEnv.sqlQuery(   "SELECT myField, word, length " +   "FROM MyTable " +   "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE"); // 重命名侧向表中的字段 tableEnv.sqlQuery(   "SELECT myField, newWord, newLength " +   "FROM MyTable " +   "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE");
  这里我们直接将表函数的输出类型定义成了ROW,这就是得到的侧向表中的数据类型;每行数据转换后也只有一行。我们分别用交叉联结和左联结两种方式在SQL中进行了调用,还可以对侧向表的中字段进行重命名。
  4. 聚合函数(Aggregate Functions)
  用户自定义聚合函数(User Defined AGGregate function,UDAGG)会把一行或多行数据(也就是一个表)聚合成一个标量值。这是一个标准的"多对一"的转换。聚合函数的概念我们之前已经接触过多次,如SUM()、MAX()、MIN()、AVG()、COUNT()都是常见的系统内置聚合函数。而如果有些需求无法直接调用系统函数解决,我们就必须自定义聚合函数来实现功能了。自定义聚合函数需要继承抽象类AggregateFunction。AggregateFunction有两个泛型参数,T表示聚合输出的结果类型,ACC则表示聚合的中间状态类型。
  Flink SQL中的聚合函数的工作原理如下:(1)首先,它需要创建一个累加器(accumulator),用来存储聚合的中间结果。这与DataStream API中的AggregateFunction非常类似,累加器就可以看作是一个聚合状态。调用createAccumulator()方法可以创建一个空的累加器。 (2)对于输入的每一行数据,都会调用accumulate()方法来更新累加器,这是聚合的核心过程。 (3)当所有的数据都处理完之后,通过调用getValue()方法来计算并返回最终的结果。
  所以,每个 AggregateFunction 都必须实现以下几个方法:createAccumulator() 这是创建累加器的方法。没有输入参数,返回类型为累加器类型ACC。 accumulate() 这是进行聚合计算的核心方法,每来一行数据都会调用。它的第一个参数是确定的,就是当前的累加器,类型为ACC,表示当前聚合的中间状态;后面的参数则是聚合函数调用时传入的参数,可以有多个,类型也可以不同。这个方法主要是更新聚合状态,所以没有返回类型。需要注意的是,accumulate()与之前的求值方法eval()类似,也是底层架构要求的,必须为public,方法名必须为accumulate,且无法直接override、只能手动实现。 getValue() 这是得到最终返回结果的方法。输入参数是ACC类型的累加器,输出类型为T。
  在遇到复杂类型时,Flink 的类型推导可能会无法得到正确的结果。所以AggregateFunction也可以专门对累加器和返回结果的类型进行声明,这是通过 getAccumulatorType()和getResultType()两个方法来指定的。AggregateFunction 的所有方法都必须是 公有的(public),不能是静态的(static),而且名字必须跟上面写的完全一样。createAccumulator、getValue、getResultType 以及 getAccumulatorType 这几个方法是在抽象类 AggregateFunction 中定义的,可以override;而其他则都是底层架构约定的方法。
  下面举一个具体的示例,我们从学生的分数表ScoreTable中计算每个学生的加权平均分。// 累加器类型定义 public static class WeightedAvgAccumulator {     public long sum = 0;    // 加权和     public int count = 0;    // 数据个数 } // 自定义聚合函数,输出为长整型的平均值,累加器类型为 WeightedAvgAccumulator public static class WeightedAvg extends AggregateFunction {     @Override     public WeightedAvgAccumulator createAccumulator() {         return new WeightedAvgAccumulator();    // 创建累加器     }     @Override     public Long getValue(WeightedAvgAccumulator acc) {         if (acc.count == 0) {             return null;    // 防止除数为0         } else {             return acc.sum / acc.count;    // 计算平均值并返回         }     }     // 累加计算方法,每来一行数据都会调用     public void accumulate(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {         acc.sum += iValue * iWeight;         acc.count += iWeight;     } } // 注册自定义聚合函数 tableEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class); // 调用函数计算加权平均值 Table result = tableEnv.sqlQuery(         "SELECT student, WeightedAvg(score, weight) FROM ScoreTable GROUP BY student" );
  聚合函数的accumulate()方法有三个输入参数。第一个是WeightedAvgAccum类型的累加器;另外两个则是函数调用时输入的字段:要计算的值 ivalue 和 对应的权重 iweight。这里我们并不考虑其它方法的实现,只要有必须的三个方法就可以了。
  5. 表聚合函数(Table Aggregate Functions)
  用户自定义表聚合函数(UDTAGG)可以把一行或多行数据(也就是一个表)聚合成另一张表,结果表中可以有多行多列。很明显,这就像表函数和聚合函数的结合体,是一个"多对多"的转换。自定义表聚合函数需要继承抽象类TableAggregateFunction。TableAggregateFunction的结构和原理与AggregateFunction非常类似,同样有两个泛型参数,用一个ACC类型的累加器(accumulator)来存储聚合的中间结果。聚合函数中必须实现的三个方法,在TableAggregateFunction中也必须对应实现:createAccumulator() 创建累加器的方法,与AggregateFunction中用法相同。 accumulate() 聚合计算的核心方法,与AggregateFunction中用法相同。 emitValue() 所有输入行处理完成后,输出最终计算结果的方法。这个方法对应着AggregateFunction中的getValue()方法;区别在于emitValue没有输出类型,而输入参数有两个:第一个是ACC类型的累加器,第二个则是用于输出数据的"收集器"out,它的类型为Collect。另外,emitValue()在抽象类中也没有定义,无法override,必须手动实现。
  表聚合函数相对比较复杂,它的一个典型应用场景就是TOP-N查询。比如我们希望选出一组数据排序后的前两名,这就是最简单的TOP-2查询。没有现成的系统函数,那么我们就可以自定义一个表聚合函数来实现这个功能。在累加器中应该能够保存当前最大的两个值,每当来一条新数据就在accumulate()方法中进行比较更新,最终在emitValue()中调用两次out.collect()将前两名数据输出。具体代码如下:// 聚合累加器的类型定义,包含最大的第一和第二两个数据 public static class Top2Accumulator {     public Integer first;     public Integer second; } // 自定义表聚合函数,查询一组数中最大的两个,返回值为(数值,排名)的二元组 public static class Top2 extends TableAggregateFunction, Top2Accumulator> {     @Override     public Top2Accumulator createAccumulator() {         Top2Accumulator acc = new Top2Accumulator();         acc.first = Integer.MIN_VALUE;    // 为方便比较,初始值给最小值         acc.second = Integer.MIN_VALUE;         return acc;     }     // 每来一个数据调用一次,判断是否更新累加器     public void accumulate(Top2Accumulator acc, Integer value) {         if (value > acc.first) {             acc.second = acc.first;             acc.first = value;         } else if (value > acc.second) {             acc.second = value;         }     }     // 输出(数值,排名)的二元组,输出两行数据     public void emitValue(Top2Accumulator acc, Collector> out) {         if (acc.first != Integer.MIN_VALUE) {             out.collect(Tuple2.of(acc.first, 1));         }         if (acc.second != Integer.MIN_VALUE) {             out.collect(Tuple2.of(acc.second, 2));         }     } }
  目前SQL中没有直接使用表聚合函数的方式,所以需要使用Table API的方式来调用:// 注册表聚合函数函数 tableEnv.createTemporarySystemFunction("Top2", Top2.class); // 在Table API中调用函数 tableEnv.from("MyTable")   .groupBy($("myField"))   .flatAggregate(call("Top2", $("value")).as("value", "rank"))   .select($("myField"), $("value"), $("rank"));
  这里使用了flatAggregate()方法,它就是专门用来调用表聚合函数的接口。对MyTable中数据按myField字段进行分组聚合,统计value值最大的两个;并将聚合结果的两个字段重命名为value和rank,之后就可以使用select()将它们提取出来了。八、连接到外部系统
  在Table API和SQL编写的Flink程序中,可以在创建表的时候用WITH子句指定连接器(connector),这样就可以连接到外部系统进行数据交互了。Flink的Table API和SQL支持了各种不同的连接器。当然,最简单的其实就是连接到控制台打印输出:CREATE TABLE ResultTable ( user STRING, cnt BIGINT WITH ( "connector" = "print" );
  这里只需要在WITH中定义connector为print就可以了。而对于其它的外部系统,则需要增加一些配置项。
  8.1 Kafka
  Kafka的SQL连接器可以从Kafka的主题(topic)读取数据转换成表,也可以将表数据写入Kafka的主题。换句话说,创建表的时候指定连接器为Kafka,则这个表既可以作为输入表,也可以作为输出表。
  1. 引入依赖
  想要在Flink程序中使用Kafka连接器,需要引入如下依赖:   org.apache.flink   flink-connector-kafka_${scala.binary.version}   ${flink.version} 
  这里我们引入的Flink和Kafka的连接器,与之前DataStream API中引入的连接器是一样的。如果想在SQL客户端里使用Kafka连接器,还需要下载对应的jar包放到lib目录下。另外,Flink为各种连接器提供了一系列的"表格式"(table formats),比如CSV、JSON、Avro、Parquet等等。这些表格式定义了底层存储的二进制数据和表的列之间的转换方式,相当于表的序列化工具。对于Kafka而言,CSV、JSON、Avro等主要格式都是支持的, 根据Kafka连接器中配置的格式,我们可能需要引入对应的依赖支持。以CSV为例:   org.apache.flink   flink-csv   ${flink.version} 
  由于SQL客户端中已经内置了CSV、JSON的支持,因此使用时无需专门引入;而对于没有内置支持的格式(比如Avro),则仍然要下载相应的jar包。
  2. 创建连接到Kafka的表
  创建一个连接到Kafka表,需要在CREATE TABLE的DDL中在WITH子句里指定连接器为Kafka,并定义必要的配置参数。下面是一个具体示例:CREATE TABLE KafkaTable (   `user` STRING,   `url` STRING,   `ts` TIMESTAMP(3) METADATA FROM "timestamp" ) WITH (   "connector" = "kafka",   "topic" = "events",   "properties.bootstrap.servers" = "hadoop102:9092",   "properties.group.id" = "testGroup",   "scan.startup.mode" = "earliest-offset",   "format" = "csv" )
  3. Upsert Kafka
  正常情况下,Kafka作为保持数据顺序的消息队列,读取和写入都应该是流式的数据,对应在表中就是仅追加(append-only)模式。如果我们想要将有更新操作(比如分组聚合)的结果表写入Kafka,就会因为Kafka无法识别撤回(retract)或更新插入(upsert)消息而导致异常。为了解决这个问题,Flink专门增加了一个"更新插入Kafka"(Upsert Kafka)连接器。这个连接器支持以更新插入(UPSERT)的方式向Kafka的topic中读写数据。下面是一个创建和使用Upsert Kafka表的例子:CREATE TABLE pageviews_per_region (   user_region STRING,   pv BIGINT,   uv BIGINT,   PRIMARY KEY (user_region) NOT ENFORCED ) WITH (   "connector" = "upsert-kafka",   "topic" = "pageviews_per_region",   "properties.bootstrap.servers" = "...",   "key.format" = "avro",   "value.format" = "avro" ); CREATE TABLE pageviews (   user_id BIGINT,   page_id BIGINT,   viewtime TIMESTAMP,   user_region STRING,   WATERMARK FOR viewtime AS viewtime - INTERVAL "2" SECOND ) WITH (   "connector" = "kafka",   "topic" = "pageviews",   "properties.bootstrap.servers" = "...",   "format" = "json" ); -- 计算 pv、uv 并插入到 upsert-kafka表中 INSERT INTO pageviews_per_region SELECT   user_region,   COUNT(*),   COUNT(DISTINCT user_id) FROM pageviews GROUP BY user_region;8.2 文件系统
  另一类非常常见的外部系统就是文件系统(File System)了。Flink提供了文件系统的连接器,支持从本地或者分布式的文件系统中读写数据。这个连接器是内置在Flink中的,所以使用它并不需要额外引入依赖。下面是一个连接到文件系统的示例:CREATE TABLE MyTable (   column_name1 INT,   column_name2 STRING,   ...   part_name1 INT,   part_name2 STRING ) PARTITIONED BY (part_name1, part_name2) WITH (   "connector" = "filesystem",           -- 连接器类型   "path" = "...",  -- 文件路径   "format" = "..."                      -- 文件格式 )
  这里在WITH前使用了PARTITIONED BY对数据进行了分区操作。文件系统连接器支持对分区文件的访问。8.3 JDBC
  Flink提供的JDBC连接器可以通过JDBC驱动程序(driver)向任意的关系型数据库读写数据,比如MySQL、PostgreSQL、Derby等。作为TableSink向数据库写入数据时,运行的模式取决于创建表的DDL是否定义了主键(primary key)。如果有主键,那么JDBC连接器就将以更新插入(Upsert)模式运行,可以向外部数据库发送按照指定键(key)的更新(UPDATE)和删除(DELETE)操作;如果没有定义主键,那么就将在追加(Append)模式下运行,不支持更新和删除操作。
  1. 引入依赖
  想要在Flink程序中使用JDBC连接器,需要引入如下依赖:   org.apache.flink   flink-connector-jdbc_${scala.binary.version}   ${flink.version} 
  此外,为了连接到特定的数据库,我们还用引入相关的driver依赖,比如MySQL:     mysql     mysql-connector-java     5.1.38 
  2. 创建JDBC表
  创建JDBC表的方法与前面Upsert Kafka大同小异。下面是一个具体示例:-- 创建一张连接到 MySQL的表CREATE TABLE MyTable (   id BIGINT,   name STRING,   age INT,   status BOOLEAN,   PRIMARY KEY (id) NOT ENFORCED ) WITH (    "connector" = "jdbc",    "url" = "jdbc:mysql://hadoop102:3306/mydatabase",    "table-name" = "users" ); -- 将另一张表 T的数据写入到 MyTable 表中 INSERT INTO MyTable SELECT id, name, age, status FROM T;
  这里创建表的DDL中定义了主键,所以数据会以Upsert模式写入到MySQL表中;而到MySQL的连接,是通过WITH子句中的url定义的。8.4 Elasticsearch
  Elasticsearch作为分布式搜索分析引擎,在大数据应用中有非常多的场景。Flink提供的Elasticsearch的SQL连接器只能作为TableSink,可以将表数据写入Elasticsearch的索引(index)。Elasticsearch连接器的使用与JDBC连接器非常相似,写入数据的模式同样是由创建表的DDL中是否有主键定义决定的。
  1. 引入依赖
  想要在Flink程序中使用Elasticsearch连接器,需要引入对应的依赖。具体的依赖与Elasticsearch服务器的版本有关,对于6.x版本引入依赖如下:   org.apache.flink  flink-connector-elasticsearch6_${scala.binary.version} ${flink.version}  对于Elasticsearch 7以上的版本,引入的依赖则是:    org.apache.flink  flink-connector-elasticsearch7_${scala.binary.version} ${flink.version} 
  2. 创建连接到Elasticsearch的表
  创建Elasticsearch表的方法与JDBC表基本一致。下面是一个具体示例:-- 创建一张连接到 Elasticsearch的 表 CREATE TABLE MyTable (   user_id STRING,   user_name STRING   uv BIGINT,   pv BIGINT,   PRIMARY KEY (user_id) NOT ENFORCED ) WITH (   "connector" = "elasticsearch-7",   "hosts" = "http://hadoop102:9200",   "index" = "users" );
  这里定义了主键,所以会以更新插入(Upsert)模式向Elasticsearch写入数据。

周鹏首秀0分!CBA揭幕战深圳大胜,山东3外援哑火陶汉林独木难支北京时间10月10日,新赛季CBA常规赛打响,揭幕战在深圳和山东之间进行,这是一场强强对话。在季前赛中,双方都曾击败卫冕冠军辽宁,实力不容小视。经过4节鏖战,深圳队10290击败山日本女乒头戴大熊猫领奖,伊藤美诚回应贺劭清中国新闻网日本女团头戴大熊猫开心领奖。国际乒联供图其实这个大熊猫配饰是早田希娜和她的教练在酒店发现的,那是一个画自画像的活动,我们六个人都画了自画像然后获得了它。谈及和队友在悲痛!迪巴拉重伤告别2022,穆里尼奥哭了,国米和尤文笑了罗马队靠着斯莫林和迪巴拉的进球,主场21艰难战胜莱切,在意甲取得两连胜。不过,穆里尼奥和罗马球员以及罗马球迷却高兴不起来。面对弱旅莱切,在第5分钟就取得领先,第22分钟对手就少一人U17亚预赛8队全胜,国足创奇迹,韩国被爆冷,也门跻身亚洲强队备受关注的U17亚洲杯预选赛已经正式落下帷幕,在最后一轮小组赛中,国足憾负澳大利亚,不过依靠着多支亚洲球队的帮助,中国队压哨获得出线权,跻身明年的U17亚洲杯决赛圈。至此,本次U1CBA失业球员再添1人,2米11高塔失业,签广厦无望,李春江帮不上时间来到10月10日,CBA新赛季常规赛今天正式打响,经过一个夏天的调整,各支球队的新阵容重组结束,一般来说核心球员不会有太多变动,但是每年都有新人涌现,与此同时一些老将逐渐退出联CBA新赛季今晚开打揭幕战浙江东阳光迎战辽宁男篮今晚19点35分,CBA新赛季的大幕就将开启,第一阶段常规赛将在杭州赛区采用赛会制的方式,分别在黄龙体育中心体育馆和杭州体育馆举行。在今晚的揭幕战中,对阵双方是浙江东阳光男篮和辽宁谷爱凌有望嫁欧洲首富儿子?男方家族资产超万亿,两人相差8岁在2022年这一年,谷爱凌用一天时间成为年轻人的骄傲,也走到很多人一生都难以企及的高度。因此,人们称其为天才少女,以赞叹她的成绩。不过随着对谷爱凌的深入了解,这个我们自以为的体育天CBA新赛季战火重燃,广东三队各有哪些变化?文羊城晚报全媒体记者郝浩宇10月10日,20222023赛季CBA联赛第一阶段将在浙江杭州拉开大幕,本阶段比赛依旧采取赛会制形式进行。按照CBA官方的通告,首阶段比赛将不开放观众入拿下个人第27个世界冠军后,马龙谈未来的打算昨晚,队长马龙率领中国男乒击败德国队,夺得世乒赛男团冠军,这也是马龙的第8个团体赛世界冠军以及个人第27个世界冠军。自2004年首次代表中国乒乓球队参加世界大赛,马龙已是第8次出战积分榜日本队30升第3,赛后开心庆祝!中国女排跌至第4遇劲敌10月9日21点30分,女排世锦赛复赛关键之战,日本过招荷兰。第一局,日本队迫使荷兰队再次暂停应对。林琴奈面对对方双人拦网,被拦回界内。山田二千华进攻出界,失误丢分。主教练随即暂停一战封神!张本智和力压樊振东王楚钦,和马龙争世乒赛MVP万众瞩目的成都世乒赛落下帷幕,男女团的冠军被国乒收入囊中,在亿万球迷的见证下国乒守住了国球荣誉,女乒3比0横扫日本女乒实现世乒赛5连冠,男乒半决赛3比2险胜日本队,决赛3比0横扫德
情人节封面大片72岁王石老当益壮,42岁田朴珺凹凸有致,太配了近日,王石和田朴珺夫妇合体登上出色WSJ中文版二月刊,演绎情人节封面大片。或许在这组充满爱的张力的大片,可以瞥见王石和田朴珺婚姻关系中的较劲和默契。第一个封面,42岁的田朴珺穿着黑为什么头皮上,会长疙瘩脓包?别再用手随便抠了!比较精致的人会注意头皮护理,定期到美容院在专业人员帮助下进行头部按摩,保养,可以让头部保持轻松,舒适状态,当然,没有这方面需求也可以在头部清洁时下工夫,洗头发不要单纯清洁毛发,头皮日常妆步骤步骤1先清洁(水份吸收后)2再涂抹爽肤水(涂抹方向两边脸颊往外,下巴往上,额头往两边)3乳液(油性涂少量,干性可多涂)4防晒5隔离霜(先刷子上大面积部位脸颊额头下巴再用海绵扑拍开,带着哈苏游北京手机拍照也够用2023年的春节,报复性旅游真的来了我作为一个旅游领域的生活家,也是三年了没怎么往外跑,这次新年放假肯定要带孩子去远点的地方玩。因为孩子幼儿园经常有爱国教育,所以他对天安门有一种执用700元就能买到的手机!红米10A有哪些地方吸引人虽然说绝大部分人都会将手机价位投向千元机以上,但事实证明,百元机同样备受欢迎,从使用途径可以分为两类,第一类当作备用机,仅用作打电话,第二类当作老人机,送给家里老人。一谈及百元手机针对美国对华为的制裁,对chatGPT做了一个深度采访针对美国对中国的技术封锁的几个热门问题,对chatGPT做了访谈。全程问答下来,整体还是很客观,人工智能已经可以让人刮目相看,现在芯片竞争本来就已经落后,很可能在AI革命又要落后一科贝尔我自认本赛季表现不错能理解诺伊尔因塔帕被解雇的爆发直播吧2月14日讯多特门将科贝尔日前做客了踢球者与流媒体dazn合作的播客节目,他在其中谈到了自己成为门将的选择本赛季的表现对于诺伊尔因塔帕洛维奇被解雇而爆发的看法等话题。成为门将中国女排颜值担当!完美腰臀比羡煞旁人,退役后逆生长婚姻幸福中国作为一个体育大国,各项体育运动都取得了不错的成绩,但是令人遗憾的是,在世界公认的三大球运动中,我们却距离世界前列的水平还有很大差距,唯一能够让我们在三大球运动中感到一丝慰藉的也亚历山大很高兴拿到周最佳球员我有几次被侵犯ampampamp但未获判罚直播吧2月14日讯雷霆今日100103惜败于鹈鹕。赛后,雷霆球员亚历山大接受了媒体采访。谈及自己获得周最佳,亚历山大说道能够获得周最佳球员,这对我来说很棒。谈及本场比赛,亚历山大继探索哪种金属持有细胞功能入门证新模型阐明生命的可能起源科技日报记者张梦然数十亿年前的地球是什么样子?哪些特征有助于生命的形成?在科学杂志最近发表的一篇论文中,美国罗切斯特大学和科罗拉多大学博尔德分校研究人员揭示了寻找答案的关键信息。该俄方排查进步MS23飞船来源中国新闻网俄罗斯国家航天集团公司总经理鲍里索夫13日表示,该司成立的紧急委员会正在全面排查导致进步MS21货运飞船温控系统密封失效的可能原因。鲍里索夫说,过去几个月中,国际空间