Flink操练(三十一)之状态判断连续上升数秒
1、代码逻辑实现package day03; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; import java.util.Random; // 整数连续1s上升 public class OneSecondByContinuousUp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env .addSource(new SourceFunction() { private boolean running = true; private Random random = new Random(); @Override public void run(SourceContext ctx) throws Exception { while (running) { ctx.collect(random.nextInt()); Thread.sleep(300L); } } @Override public void cancel() { running = false; } }) .keyBy(r -> 1) .process(new IntIncreaseAlert()) .print(); env.execute(); } public static class IntIncreaseAlert extends KeyedProcessFunction { private ValueState lastInt; private ValueState timerTs; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); lastInt = getRuntimeContext().getState(new ValueStateDescriptor("last-integer", Types.INT)); timerTs = getRuntimeContext().getState(new ValueStateDescriptor("timer", Types.LONG)); } @Override public void processElement(Integer value, Context ctx, Collector out) throws Exception { Integer prevInt = null; if (lastInt.value() != null) { prevInt = lastInt.value(); } lastInt.update(value); Long ts = null; if (timerTs.value() != null) { ts = timerTs.value(); } if (prevInt == null || value < prevInt) { if (ts != null) { ctx.timerService().deleteProcessingTimeTimer(ts); timerTs.clear(); } } else if (value > prevInt && ts == null) { long oneSecLater = ctx.timerService().currentProcessingTime() + 1000L; ctx.timerService().registerProcessingTimeTimer(oneSecLater); timerTs.update(oneSecLater); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception { super.onTimer(timestamp, ctx, out); out.collect("整数连续1s上升了!"); timerTs.clear(); } } } 2、结果之展示"C:Program FilesJavajdk1.8.0_191binjava.exe" "-javaagent:F:appIntelliJ IDEA 2019.3.3libidea_rt.jar=57967:F:appIntelliJ IDEA 2019.3.3bin" -Dfile.encoding=UTF-8 -classpath "C:Program FilesJavajdk1.8.0_191jrelibcharsets.jar;C:Program FilesJavajdk1.8.0_191jrelibdeploy.jar;C:Program FilesJavajdk1.8.0_191jrelibextaccess-bridge-64.jar;C:Program FilesJavajdk1.8.0_191jrelibextcldrdata.jar;C:Program FilesJavajdk1.8.0_191jrelibextdnsns.jar;C:Program FilesJavajdk1.8.0_191jrelibextjaccess.jar;C:Program FilesJavajdk1.8.0_191jrelibextjfxrt.jar;C:Program FilesJavajdk1.8.0_191jrelibextlocaledata.jar;C:Program FilesJavajdk1.8.0_191jrelibext ashorn.jar;C:Program FilesJavajdk1.8.0_191jrelibextsunec.jar;C:Program FilesJavajdk1.8.0_191jrelibextsunjce_provider.jar;C:Program FilesJavajdk1.8.0_191jrelibextsunmscapi.jar;C:Program FilesJavajdk1.8.0_191jrelibextsunpkcs11.jar;C:Program FilesJavajdk1.8.0_191jrelibextzipfs.jar;C:Program FilesJavajdk1.8.0_191jrelibjavaws.jar;C:Program FilesJavajdk1.8.0_191jrelibjce.jar;C:Program FilesJavajdk1.8.0_191jrelibjfr.jar;C:Program FilesJavajdk1.8.0_191jrelibjfxswt.jar;C:Program FilesJavajdk1.8.0_191jrelibjsse.jar;C:Program FilesJavajdk1.8.0_191jrelibmanagement-agent.jar;C:Program FilesJavajdk1.8.0_191jrelibplugin.jar;C:Program FilesJavajdk1.8.0_191jrelibresources.jar;C:Program FilesJavajdk1.8.0_191jrelibrt.jar;D:bigDatabigData_learnFlink_learn argetclasses;C:UsersAdministrator.m2repositoryorgapacheflinkflink-java1.13.0flink-java-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-core1.13.0flink-core-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-annotations1.13.0flink-annotations-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-metrics-core1.13.0flink-metrics-core-1.13.0.jar;C:UsersAdministrator.m2repositorycomesotericsoftwarekryokryo2.24.0kryo-2.24.0.jar;C:UsersAdministrator.m2repositorycomesotericsoftwareminlogminlog1.2minlog-1.2.jar;C:UsersAdministrator.m2repositoryorgobjenesisobjenesis2.1objenesis-2.1.jar;C:UsersAdministrator.m2repositorycommons-collectionscommons-collections3.2.2commons-collections-3.2.2.jar;C:UsersAdministrator.m2repositoryorgapachecommonscommons-compress1.20commons-compress-1.20.jar;C:UsersAdministrator.m2repositoryorgapachecommonscommons-lang33.3.2commons-lang3-3.3.2.jar;C:UsersAdministrator.m2repositoryorgapachecommonscommons-math33.5commons-math3-3.5.jar;C:UsersAdministrator.m2repositoryorgslf4jslf4j-api1.7.15slf4j-api-1.7.15.jar;C:UsersAdministrator.m2repositorycomgooglecodefindbugsjsr3051.3.9jsr305-1.3.9.jar;C:UsersAdministrator.m2repositoryorgapacheflinkforce-shading1.13.0force-shading-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-streaming-java_2.121.13.0flink-streaming-java_2.12-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-file-sink-common1.13.0flink-file-sink-common-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-runtime_2.121.13.0flink-runtime_2.12-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-queryable-state-client-java1.13.0flink-queryable-state-client-java-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-hadoop-fs1.13.0flink-hadoop-fs-1.13.0.jar;C:UsersAdministrator.m2repositorycommons-iocommons-io2.7commons-io-2.7.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-shaded-netty4.1.49.Final-13.0flink-shaded-netty-4.1.49.Final-13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-shaded-jackson2.12.1-13.0flink-shaded-jackson-2.12.1-13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-shaded-zookeeper-33.4.14-13.0flink-shaded-zookeeper-3-3.4.14-13.0.jar;C:UsersAdministrator.m2repositoryorgjavassistjavassist3.24.0-GAjavassist-3.24.0-GA.jar;C:UsersAdministrator.m2repositorycom ypesafeakkaakka-actor_2.122.5.21akka-actor_2.12-2.5.21.jar;C:UsersAdministrator.m2repositorycom ypesafeconfig1.3.3config-1.3.3.jar;C:UsersAdministrator.m2repositoryorgscala-langmodulesscala-java8-compat_2.12 .8.0scala-java8-compat_2.12-0.8.0.jar;C:UsersAdministrator.m2repositorycom ypesafeakkaakka-stream_2.122.5.21akka-stream_2.12-2.5.21.jar;C:UsersAdministrator.m2repositoryorgreactivestreamsreactive-streams1.0.2reactive-streams-1.0.2.jar;C:UsersAdministrator.m2repositorycom ypesafessl-config-core_2.12 .3.7ssl-config-core_2.12-0.3.7.jar;C:UsersAdministrator.m2repositoryorgscala-langmodulesscala-parser-combinators_2.121.1.1scala-parser-combinators_2.12-1.1.1.jar;C:UsersAdministrator.m2repositorycom ypesafeakkaakka-protobuf_2.122.5.21akka-protobuf_2.12-2.5.21.jar;C:UsersAdministrator.m2repositorycom ypesafeakkaakka-slf4j_2.122.5.21akka-slf4j_2.12-2.5.21.jar;C:UsersAdministrator.m2repositoryorgclappergrizzled-slf4j_2.121.3.2grizzled-slf4j_2.12-1.3.2.jar;C:UsersAdministrator.m2repositorycomgithubscoptscopt_2.123.5.0scopt_2.12-3.5.0.jar;C:UsersAdministrator.m2repositoryorgxerialsnappysnappy-java1.1.8.3snappy-java-1.1.8.3.jar;C:UsersAdministrator.m2repositorycom witterchill_2.12 .7.6chill_2.12-0.7.6.jar;C:UsersAdministrator.m2repositorycom witterchill-java .7.6chill-java-0.7.6.jar;C:UsersAdministrator.m2repositoryorglz4lz4-java1.6.0lz4-java-1.6.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-shaded-guava18.0-13.0flink-shaded-guava-18.0-13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-clients_2.121.13.0flink-clients_2.12-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-optimizer_2.121.13.0flink-optimizer_2.12-1.13.0.jar;C:UsersAdministrator.m2repositorycommons-clicommons-cli1.3.1commons-cli-1.3.1.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-table-api-java-bridge_2.121.13.0flink-table-api-java-bridge_2.12-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-table-api-java1.13.0flink-table-api-java-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-table-planner-blink_2.121.13.0flink-table-planner-blink_2.12-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-table-api-scala_2.121.13.0flink-table-api-scala_2.12-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-table-api-scala-bridge_2.121.13.0flink-table-api-scala-bridge_2.12-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-table-runtime-blink_2.121.13.0flink-table-runtime-blink_2.12-1.13.0.jar;C:UsersAdministrator.m2repositoryorgcodehausjaninojanino3.0.11janino-3.0.11.jar;C:UsersAdministrator.m2repositoryorgcodehausjaninocommons-compiler3.0.11commons-compiler-3.0.11.jar;C:UsersAdministrator.m2repositoryorgapachecalciteavaticaavatica-core1.17.0avatica-core-1.17.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-streaming-scala_2.121.13.0flink-streaming-scala_2.12-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-scala_2.121.13.0flink-scala_2.12-1.13.0.jar;C:UsersAdministrator.m2repositoryorgscala-langscala-reflect2.12.7scala-reflect-2.12.7.jar;C:UsersAdministrator.m2repositoryorgscala-langscala-library2.12.7scala-library-2.12.7.jar;C:UsersAdministrator.m2repositoryorgscala-langscala-compiler2.12.7scala-compiler-2.12.7.jar;C:UsersAdministrator.m2repositoryorgscala-langmodulesscala-xml_2.121.0.6scala-xml_2.12-1.0.6.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-table-common1.13.0flink-table-common-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-connector-files1.13.0flink-connector-files-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-shaded-asm-77.1-13.0flink-shaded-asm-7-7.1-13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-cep_2.121.13.0flink-cep_2.12-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-csv1.13.0flink-csv-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-connector-kafka_2.121.13.0flink-connector-kafka_2.12-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapachekafkakafka-clients2.4.1kafka-clients-2.4.1.jar;C:UsersAdministrator.m2repositorycomgithublubenzstd-jni1.4.3-1zstd-jni-1.4.3-1.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-connector-base1.13.0flink-connector-base-1.13.0.jar;C:UsersAdministrator.m2repositoryorgapachebahirflink-connector-redis_2.111.0flink-connector-redis_2.11-1.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-streaming-java_2.111.2.0flink-streaming-java_2.11-1.2.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-runtime_2.111.2.0flink-runtime_2.11-1.2.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-shaded-hadoop21.2.0flink-shaded-hadoop2-1.2.0.jar;C:UsersAdministrator.m2repositoryorg ukaanixz1.0xz-1.0.jar;C:UsersAdministrator.m2repositoryxmlencxmlenc .52xmlenc-0.52.jar;C:UsersAdministrator.m2repositorycommons-codeccommons-codec1.4commons-codec-1.4.jar;C:UsersAdministrator.m2repositorycommons-netcommons-net3.1commons-net-3.1.jar;C:UsersAdministrator.m2repositoryjavaxservletservlet-api2.5servlet-api-2.5.jar;C:UsersAdministrator.m2repositoryorgmortbayjettyjetty-util6.1.26jetty-util-6.1.26.jar;C:UsersAdministrator.m2repositorycomsunjerseyjersey-core1.9jersey-core-1.9.jar;C:UsersAdministrator.m2repositorycommons-elcommons-el1.0commons-el-1.0.jar;C:UsersAdministrator.m2repositorycommons-loggingcommons-logging1.1.3commons-logging-1.1.3.jar;C:UsersAdministrator.m2repositorycomjamesmurtyutilsjava-xmlbuilder .4java-xmlbuilder-0.4.jar;C:UsersAdministrator.m2repositorycommons-langcommons-lang2.6commons-lang-2.6.jar;C:UsersAdministrator.m2repositorycommons-configurationcommons-configuration1.7commons-configuration-1.7.jar;C:UsersAdministrator.m2repositorycommons-digestercommons-digester1.8.1commons-digester-1.8.1.jar;C:UsersAdministrator.m2repositoryorgcodehausjacksonjackson-core-asl1.8.8jackson-core-asl-1.8.8.jar;C:UsersAdministrator.m2repositoryorgcodehausjacksonjackson-mapper-asl1.8.8jackson-mapper-asl-1.8.8.jar;C:UsersAdministrator.m2repositoryorgapacheavroavro1.7.7avro-1.7.7.jar;C:UsersAdministrator.m2repositorycom houghtworksparanamerparanamer2.3paranamer-2.3.jar;C:UsersAdministrator.m2repositorycomjcraftjsch .1.42jsch-0.1.42.jar;C:UsersAdministrator.m2repositorycommons-beanutilscommons-beanutils-bean-collections1.8.3commons-beanutils-bean-collections-1.8.3.jar;C:UsersAdministrator.m2repositorycommons-daemoncommons-daemon1.0.13commons-daemon-1.0.13.jar;C:UsersAdministrator.m2repositoryjavaxxmlbindjaxb-api2.2.2jaxb-api-2.2.2.jar;C:UsersAdministrator.m2repositoryjavaxxmlstreamstax-api1.0-2stax-api-1.0-2.jar;C:UsersAdministrator.m2repositoryjavaxactivationactivation1.1activation-1.1.jar;C:UsersAdministrator.m2repositoryio etty etty-all4.0.27.Final etty-all-4.0.27.Final.jar;C:UsersAdministrator.m2repositorycomdata-artisansflakka-actor_2.112.3-customflakka-actor_2.11-2.3-custom.jar;C:UsersAdministrator.m2repositorycomdata-artisansflakka-remote_2.112.3-customflakka-remote_2.11-2.3-custom.jar;C:UsersAdministrator.m2repositoryio etty etty3.8.0.Final etty-3.8.0.Final.jar;C:UsersAdministrator.m2repositoryorguncommonsmathsuncommons-maths1.2.2auncommons-maths-1.2.2a.jar;C:UsersAdministrator.m2repositorycomdata-artisansflakka-slf4j_2.112.3-customflakka-slf4j_2.11-2.3-custom.jar;C:UsersAdministrator.m2repositoryorgclappergrizzled-slf4j_2.111.0.2grizzled-slf4j_2.11-1.0.2.jar;C:UsersAdministrator.m2repositorycomgithubscoptscopt_2.113.2.0scopt_2.11-3.2.0.jar;C:UsersAdministrator.m2repositorycomfasterxmljacksoncorejackson-core2.7.4jackson-core-2.7.4.jar;C:UsersAdministrator.m2repositorycomfasterxmljacksoncorejackson-databind2.7.4jackson-databind-2.7.4.jar;C:UsersAdministrator.m2repositorycomfasterxmljacksoncorejackson-annotations2.7.0jackson-annotations-2.7.0.jar;C:UsersAdministrator.m2repositoryorgapachezookeeperzookeeper3.4.6zookeeper-3.4.6.jar;C:UsersAdministrator.m2repositoryjlinejline .9.94jline-0.9.94.jar;C:UsersAdministrator.m2repositoryjunitjunit3.8.1junit-3.8.1.jar;C:UsersAdministrator.m2repositorycom witterchill_2.11 .7.4chill_2.11-0.7.4.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-clients_2.111.2.0flink-clients_2.11-1.2.0.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-optimizer_2.111.2.0flink-optimizer_2.11-1.2.0.jar;C:UsersAdministrator.m2repositoryorgapacheslingorg.apache.sling.commons.json2.0.6org.apache.sling.commons.json-2.0.6.jar;C:UsersAdministrator.m2repositorymysqlmysql-connector-java8.0.21mysql-connector-java-8.0.21.jar;C:UsersAdministrator.m2repositorycomgoogleprotobufprotobuf-java3.11.4protobuf-java-3.11.4.jar;C:UsersAdministrator.m2repositoryorgapacheflinkflink-connector-jdbc_2.121.13.0flink-connector-jdbc_2.12-1.13.0.jar;C:UsersAdministrator.m2repositoryorgslf4jslf4j-log4j121.7.30slf4j-log4j12-1.7.30.jar;C:UsersAdministrator.m2repositorylog4jlog4j1.2.17log4j-1.2.17.jar;C:UsersAdministrator.m2repositoryorgapachelogginglog4jlog4j-to-slf4j2.14.0log4j-to-slf4j-2.14.0.jar;C:UsersAdministrator.m2repositoryorgapachelogginglog4jlog4j-api2.14.0log4j-api-2.14.0.jar;C:UsersAdministrator.m2repositoryredisclientsjedis2.9.0jedis-2.9.0.jar;C:UsersAdministrator.m2repositoryorgapachecommonscommons-pool22.4.2commons-pool2-2.4.2.jar;C:UsersAdministrator.m2repositorycomgooglecodegsongson2.8.5gson-2.8.5.jar" day03.OneSecondByContinuousUp log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.ClosureCleaner). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 整数连续1s上升了!
被中国干成白菜价的技术,曾遭美国垄断多年,分别是什么?最典型的,也是最操蛋的是挖地铁,隧道的盾构机。把个上亿美元的机器,变成了一堆垃圾。给我们国家的声誉造成重大损失,也加深了发达国家对我们在技术上的封锁。大型的技术咱也不知道啊,但有一
舞台上的机器人,能否帮观众看懂人类一百年前,捷克剧作家恰佩克异想天开,在舞台上造出一大群万能机器人,人类从此有了robot(机器人)这个词。后来生活模仿戏剧,各种各样的机器人真的来到了人的工作和生活中。而戏剧的脚步
iPhone13Pro使用体验,除了高刷,和12系列差别不大从iPhone12发布之后,就有13香这个说法,因为大家都觉得13系列会上高刷。而今年苹果新机发布后,人们傻眼了,只有13pro和max有高刷,13香不起来了,13pro才是真的香
民生银行获北京市科学技术进步二等奖一年一度的北京市科学技术奖日前揭晓,中国民生银行申报的银行分布式核心与直销银行云服务研发及应用项目荣获2020年度北京市科学技术进步奖二等奖,为获奖单位中唯一一家金融机构。北京市科
如果华为宣布推出自主的操作系统或剔除美国的国产替代,你是否欣然接受并购买?我当然欣然接受并购买了!作为一个有情怀的人,我在过去现在以及遥远的将来,都将是华为坚定而忠实的粉丝,这点从未动摇!华为推出自己的操作系统,应该不会遥远,据称该系统称为鸿蒙,系统的本
华为麒麟960怎么样?不知道题主听到了什么?怎么说的?把960说的有多好?但我分析您可能误会了,比如这么说吧,大家都说从iphone4S以后苹果的手机都不好,不再是那个带给世人惊喜的苹果了,不能再任媒体
华为手机的性价比真的不高吗?华为的性价比是最低的。就想着低配卖个比苹果手机还贵们高价,现在都是忽悠国人的。华为手机不做性价比,是一个牌子,中国的牌子而已。我不知道怎么回答。我觉得应该这样问,苹果和华为手机哪个
当你拥有华为Mate40Pro,还有必要换华为P50Pro吗?从P50Pro发布那一刻,就有不少网友在讨论到底是买最新的P50Pro还是买去年的Mate40Pro,手里有Mate40Pro也在想要不要卖了Mate40Pro去买P50Pro。其
清华首个AI学生走红!入职阿里华为,虚拟人时代来临?日前,清华大学首个AI虚拟学生华智冰首次露正脸唱歌,视频中的华智冰与真人无异,她面容姣好歌声甜美,动作真实,抱着一把吉他,自弹自唱了一首梁博的男孩。视频一经发出,引来众多网友围观,
微信国旗头像怎么弄的?看了看回答没有一个实操性强的不是链接不给力,要么就是模版少,没有想要的模板于是本人决定亲自上阵,花了一个小时的测试,终于找到了理想中,换国旗头像的操作方法。换头像时间,全程只用8秒
CPU和原子弹,哪个更难制造?一个朝鲜可以玩,你说呢提问有点问题,你应该问光刻机和原子弹哪个难造,我来告诉你,光刻机更难造CPU不难造,高性能,低能耗,小制程的CPU是真难造!比核弹还难!核弹也难,主要受世界五