创建基本环境创建环境StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();env。enableCheckpointing(1000);。。。创建配置信息EnvironmentSettingssettingsEnvironmentSettings。newInstance()。useBlinkPlanner()。inStreamingMode()。build();StreamTableEnvironmentstreamTableStreamTableEnvironment。create(env,settings);对时间有要求的可以设置时区streamTable。getConfig()。setLocalTimeZone(ZoneId。of(AsiaShanghai));编写DDL根据官网创建简单的DDL(参考官网)表1StringuserSqlcreatetableuserinfo(uidBIGINT,moneyBIGINT)WITH(connectordatagen,rowspersecond1,fields。uid。min12,fields。uid。max15,fields。money。min1,fields。money。max10);表2StringorderSqlcreatetableorderinfo(uidBIGINT,oidBIGINT,codeBIGINT)WITH(connectordatagen,rowspersecond20,fields。uid。min12,fields。uid。max15,fields。oid。min1,fields。oid。max10,fields。code。kindsequence,fields。code。start1,fields。code。end1000);添加状态配置信息左表状态streamTable。getConfig()。setIdleLeftStateRetention(Duration。ofMillis(10000));右表状态streamTable。getConfig()。setIdleRightStateRetention(Duration。ofMillis(5000));执行streamTable。executeSql(userSql);streamTable。executeSql(orderSql);TabletablestreamTable。sqlQuery(selectu。uid,o。uid,o。code,CURRENTTIMESTAMPfromuserinfouleftjoinorderinfooonu。uido。uid);转换Retract输出DataStreamTuple2Boolean,Rowtuple2DataStreamstreamTable。toRetractStream(table,Row。class);tuple2DataStream。writeAsText(e:log。txt,FileSystem。WriteMode。OVERWRITE);env。execute();验证查看 定位到参数值 已经获取到left,right对应的ttl状态时间