Flinksql窗口函数功能验证
数据准备:向测试环境kafka TEST topic依次添加如下6条数据{ "bidtime" : "2020-04-15 08:05:00" , "price" : "4.00" , "item" : "C" }, { "bidtime" : "2020-04-15 08:07:00" , "price" : "2.00" , "item" : "A" }, { "bidtime" : "2020-04-15 08:09:00" , "price" : "5.00" , "item" : "D" }, { "bidtime" : "2020-04-15 08:11:00" , "price" : "3.00" , "item" : "B" }, { "bidtime" : "2020-04-15 08:13:00" , "price" : "1.00" , "item" : "E" }, { "bidtime" : "2020-04-15 08:17:00" , "price" : "6.00" , "item" : "F" } 打开flink sql client 建表:事件时间(Event Time)create table Bid( bidtime TIMESTAMP ( 3 ), price DECIMAL ( 10 , 2 ), item STRING, watermark for bidtime as bidtime - interval "1" second ) WITH ( "connector" = "kafka" , "topic" = "TEST.LIQIANG" , "properties.bootstrap.servers" = "172.22.17.26:9092,172.22.17.27:9092,172.22.17.28:9092" , "scan.startup.mode" = "earliest-offset" , "properties.group.id" = "flink-sql-client-local" , "properties.fetch.max.bytes" = "5242880" , "format" = "json" ); 处理时间(Processing Time)create table Bid( bidtime TIMESTAMP ( 3 ), price DECIMAL ( 10 , 2 ), item STRING, pt as PROCTIME() ) WITH ( "connector" = "kafka" , "topic" = "TEST.LIQIANG" , "properties.bootstrap.servers" = "172.22.17.26:9092,172.22.17.27:9092,172.22.17.28:9092" , "scan.startup.mode" = "earliest-offset" , "properties.group.id" = "flink-sql-client-local" , "properties.fetch.max.bytes" = "5242880" , "format" = "json" ); 做窗口查询:事件时间查询select tumble_rowtime(bidtime, interval "20" SECOND ) as window_time, sum (price) from Bid group by tumble(bidtime, interval "20" SECOND ); 处理时间查询select TUMBLE_PROCTIME(pt, interval "20" SECOND ) as window_time, TUMBLE_START(pt, interval "20" SECOND ) as window_start, TUMBLE_START(pt, interval "20" SECOND ) as window_end, sum (price) as sum_price from Bid group by tumble(pt, interval "20" SECOND ); 结果展示: