(1) Writing data to MySQL with Flink SQL: a demo
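Flink SQL has greatly lowered the barrier to programming with Flink, making it easier to understand and use. Today I am sharing my notes in the hope that they help anyone who needs to do this.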
(1) First, add the POM dependencies.

Properties:
    flink.version = 1.13.1
    scala.binary.version = 2.12
    1.7.30 (a third version property)

Dependencies (groupId : artifactId : version):
    org.apache.flink : flink-java : ${flink.version}
    org.apache.flink : flink-streaming-java_${scala.binary.version} : ${flink.version}
    org.apache.flink : flink-clients_${scala.binary.version} : ${flink.version}
    org.apache.flink : flink-table-api-java-bridge_${scala.binary.version} : ${flink.version}
    org.apache.flink : flink-connector-jdbc_${scala.binary.version} : ${flink.version}
    org.apache.flink : flink-table-planner-blink_${scala.binary.version} : ${flink.version}
    org.apache.flink : flink-streaming-scala_${scala.binary.version} : ${flink.version}
    org.apache.flink : flink-table-common : ${flink.version}
    org.apache.flink : flink-json : ${flink.version}
    com.fasterxml.jackson.core : jackson-databind : 2.12.0
    mysql : mysql-connector-java : 8.0.16
    com.alibaba : fastjson : 1.2.66
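(2) Write the code. Flink SQL string literals use single quotes, so the connector options and the inserted value are single-quoted. The target table flinksink must already exist in the MySQL database testdb.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    // The class name is illustrative; the original notes only show the main method.
    public class FlinkSqlMysqlDemo {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .inStreamingMode()
                    //.useOldPlanner()  // legacy Flink planner
                    .useBlinkPlanner()  // Blink planner
                    .build();
            StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings);

            // Flink-side table definition, mapped to the MySQL table "flinksink" over JDBC.
            String ddl = "CREATE TABLE flinksinksds(\r\n"
                    + "componentname STRING,\r\n"
                    + "componentcount INT,\r\n"
                    + "componentsum INT\r\n"
                    + ") WITH(\r\n"
                    + "'connector.type'='jdbc',\r\n"
                    + "'connector.driver'='com.mysql.cj.jdbc.Driver',\r\n"
                    + "'connector.url'='jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',\r\n"
                    + "'connector.table'='flinksink',\r\n"
                    + "'connector.username'='root',\r\n"
                    + "'connector.password'='root',\r\n"
                    + "'connector.write.flush.max-rows'='1'\r\n"  // flush after every row
                    + ")";
            System.err.println(ddl);
            ste.executeSql(ddl);

            String insert = "insert into flinksinksds(componentname,componentcount,componentsum) "
                    + "values('1024', 1, 2)";
            // executeSql() submits the INSERT job by itself, so env.execute() is not called:
            // with no DataStream operators defined it would fail with
            // "No operators defined in streaming topology".
            // await() blocks until the insert job has finished.
            ste.executeSql(insert).await();
        }
    }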
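The DDL above uses the older "connector.*" option keys. As a rough sketch, the same sink can also be described with the option keys documented for the JDBC connector since Flink 1.11 ('connector', 'url', 'table-name', 'sink.buffer-flush.max-rows'). The class JdbcSinkDdl and the method buildDdl below are hypothetical helpers written for illustration only, not part of Flink; the method just assembles the DDL string, which would then be passed to ste.executeSql(...).

```java
// Hypothetical helper (not part of Flink): builds a CREATE TABLE statement
// for a JDBC sink using the newer-style connector option keys.
public class JdbcSinkDdl {

    public static String buildDdl(String flinkTable, String jdbcUrl,
                                  String mysqlTable, String user, String password) {
        // Flink SQL string literals are single-quoted.
        return String.join("\n",
                "CREATE TABLE " + flinkTable + " (",
                "  componentname STRING,",
                "  componentcount INT,",
                "  componentsum INT",
                ") WITH (",
                "  'connector' = 'jdbc',",
                "  'driver' = 'com.mysql.cj.jdbc.Driver',",
                "  'url' = '" + jdbcUrl + "',",
                "  'table-name' = '" + mysqlTable + "',",
                "  'username' = '" + user + "',",
                "  'password' = '" + password + "',",
                "  'sink.buffer-flush.max-rows' = '1'", // flush after every row
                ")");
    }

    public static void main(String[] args) {
        // Connection values mirror the demo above; adjust them for your environment.
        String ddl = buildDdl("flinksinksds",
                "jdbc:mysql://localhost:3306/testdb?useSSL=false&serverTimezone=Asia/Shanghai",
                "flinksink", "root", "root");
        System.out.println(ddl);
    }
}
```

Keeping the DDL in a small builder like this makes it easy to print and inspect the statement before handing it to the table environment, which helps when a quoting mistake causes a parse error.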
(3) Execution result: