范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

Flink操练(三十二)之自定义键控状态(二)ListState

  0 简介
  ListState[T]保存一个列表,列表里的元素的数据类型为T。基本操作如下: ListState.add(value: T) ListState.addAll(values: java.util.List[T]) ListState.get()返回Iterable[T] ListState.update(values: java.util.List[T])
  ListState需要将某些值存到一个List中(Iterable),意味着缓存的数据不只是一个而是多个值。很多情况下都可以使用,例如计算的数值要包含全天的每一个记录,此时只有将每个记录的值存成一个列表才可以计算。 1.实例
  1.1 实例一
  首先需要先定义一个ListState,然后再重写KeyedProcessFunction中的open方法:     private var itemState : ListState[ItemViewCount] = _      override def open(parameters: Configuration): Unit = {        //命名状态变量的名字和类型       val itemStateDescription: ListStateDescriptor[ItemViewCount] = new ListStateDescriptor[ItemViewCount]("itemState", classOf[ItemViewCount])       itemState = getRuntimeContext.getListState(itemStateDescription)     }
  ListStateDescriptor提供了几种不同的定义方式:
  两个参数分别是ListStateDescriptor的名字和typeClass
  1.2 实例二 package qiuhua; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; import org.apache.flink.util.Collector; import java.util.Collections; import java.util.List;  /**  * @program: bigdata_learn  * @description: 通过ListState求key 出现了 3 次,则需要计算平均值  * @author: Mr.逗  * @create: 2021-09-08 16:18  **/ public class CountAverageWithListState extends RichFlatMapFunction, Tuple2> {     /**      * ValueState : 里面只能存一条元素      * ListState : 里面可以存很多数据      */     private ListState> elementsByKey;      @Override     public void open(Configuration parameters) throws Exception {         super.open(parameters);         //注册状态         ListStateDescriptor> descriptor = new ListStateDescriptor<>                 ("list_state"//状态名字                         , Types.TUPLE(Types.LONG, Types.LONG)//状态存储的数据类型                 );         elementsByKey=getRuntimeContext().getListState(descriptor);     }      @Override     public void flatMap(Tuple2 value, Collector> out) throws Exception {         Iterable> currentState  = elementsByKey.get();//拿到当前key的状态值         //如果状态值没有初始化,则初始化         if(currentState==null)         {             elementsByKey.addAll(Collections.emptyList());         }         //更新状态         elementsByKey.update((List>) value);         //判断,如果当前key出现了3次,则需要计算平均值,并且输出         List> allElements = Lists.newArrayList(currentState);         if (allElements.size()==3)         {             long count=0;             long sum=0;             for(Tuple2 ele:allElements)             {                 count++;                 sum+=ele.f1;             }             double avg=(double)sum/count;             out.collect(Tuple2.of(value.f0,avg));             //清除状态             elementsByKey.clear();         }     } }
  总结
  Flink提供了三种基于key/value的state接口,ListState接口适用于缓存多个值的计算。具体实现之前,因为state必须是基于key,且必须获取getRuntimeContext,state必须同时满足两个条件: 直接基于keyedStream或者由keyedStream转换来的windowedStream 必须继承RichFunction
  实际实现时候,因为windowedStream在scala中不能实现RichWindowFunction,因此在main中使用flatmap间接实现了windowFunction中的功能: val fromTransactionDataStream = watermarkTransaction       .keyBy(_.code)       .window(TumblingEventTimeWindows.of(Time.seconds(10)))        val transaction = fromTransactionDataStream       .apply(new StockTransactionApply)       .keyBy(_._3)       .flatMap(new TransactionStateFlatMapFunction)

互联网摸鱼大赏图片来源视觉中国文吴怼怼,作者麦可可当一款名为摸鱼的软件光明正大地出现在苹果APPStore上时,你会猛然发觉,摸鱼已经成为全世界打工人的共鸣了。老实说,电子厂的流水线和CBD的互MySQL的varchar水真的太深了,你真的会用吗?1。InnoDB是干嘛的?InnoDB是一个将表中的数据存储到磁盘上的存储引擎。2。InnoDB是如何读写数据的?InnoDB处理数据的过程是发生在内存中的,需要把磁盘中的数据加载苹果在全球有超过18亿台活跃设备创下新纪录苹果公司CEO库克在2022年第一财季的财报电话会议上表示,苹果公司目前在全球拥有超过18亿台活跃设备,创下新纪录。2020年同期,苹果有15亿台活跃设备2021年同期有16。5亿微信个性铃声怎么设置?小技巧解决大问题随着微信版本的不断更新提高,现在微信也可以设置个性铃声了。微信默认的铃声虽然好听,但是手机微信来电一样很容易和别人混在一起,设置个性的铃声就可以分开了,再也不用和其他人一起看手机了国家网信办拟规定人脸替换等深度合成内容应显著标识为规范互联网信息服务深度合成活动,国家互联网信息办公室起草的互联网信息服务深度合成管理规定(征求意见稿)近日向社会公开征求意见。征求意见稿明确了深度合成信息内容标识管理制度,要求深实地探访数字人民币现状商家不解,银行观望,距离普及还有多远?图片来源视觉中国文偲睿洞察,作者沈松霖2022年的第一个工作日,数字人民币App试点版正式上线。只要身处11个试点地区,包括深圳苏州雄安成都上海海南长沙西安青岛大连及冬奥会场景(北夜深人静的时候,你是怎么排解孤独的?(原创)对退休老人来说,入睡前清零,即把一切喜乐或烦恼都忘掉,不带任何负担进入梦乡,清晨起来以最佳心态开始新的一天生活,何来孤独?!有生之年,边拥有,边清零。享受忙碌,适应孤独。普中国察打一体无人机处于什么水平,比阿塞拜疆土耳其美国的如何?我是萨沙,我来回答。既然谈军事,就要实事求是。不可否认我国察打一体无人机,是比较先进的,在世界军火市场也有很好的销路。但是,同美国MQ9无人机相比,差距还是显而易见的。我们首先看看擅自投放快递柜将被罚民生周刊全媒体记者于海军对于快递业而言,随着消费需求旺盛,快递业末端趋于多元化,快递配送激励机制也应该随之转变,这也将成为快递业高质量发展的一个重要方向。您的快递已放置快递柜,请凭编程语言将走入怎样的2022本文是2021InfoQ年度技术盘点与展望系列文章之一,由直播内容整理而成,重点聚焦编程语言领域在2022年的核心趋势展望,希望能帮助你准确把握2022年编程语言领域的核心发展脉络2月1日起生效比亚迪将上调新能源车型价格受原材料价格大幅上涨新能源购车补贴退坡等因素影响,不少车企均官宣涨价。日前,网通社从比亚迪官方获悉,其发布了关于车型价格调整的说明,宣布自2月1日起,调整旗下新能源车型的价格,上调
世界上最短的电梯,只有四五层台阶的高度,并获吉尼斯世界纪录世界上最短的电梯在哪?呶!位于日本川崎市商场的这个号称全世界最短的电梯,整个电梯五个台阶,全长只有83cm,呆萌的外表,一度也被称为卖萌电梯。可这么多年它也没停止运营,网友们可坐不华山东峰上风光与赌棋亭华山东峰位于陕西省渭南市华阴市,又名朝阳峰,海拔2090米,是华山主峰之一,因位置居东得名。峰顶有一平台,居高临险,视野开阔,是著名的观日出的地方,人称朝阳台,东峰也因之被称为朝阳华山东峰朝阳峰风光华山东峰位于陕西省渭南市华阴市,又名朝阳峰,海拔2090米,是华山主峰之一,因位置居东得名。峰顶有一平台,居高临险,视野开阔,是著名的观日出的地方,人称朝阳台,东峰也因之被称为朝阳科普电动汽车终局之战固态电池最近大众汽车集团召开了隆重的线上电池日PowerDay,其中公布了包括固态电池在内的很多电动汽车电池技术细节。很多朋友关心什么是电动汽车固态电池,为什么固态电池被称为电动汽车的终局WiFi相机FCC认证办理WiFi相机FCC认证办理步骤介绍。Wifi数码相机即相机带wifi功能,可以与wifi连接,让能通过网络更加便捷的上传或分享照片。但一般这类产品要出口美国则必须要办理FCC认证。做什么生意最赚钱?做什么生意最赚钱?有这样一个故事从前有一个小镇,每个人都喜欢借钱,平时就靠信用卡来过日子。有一天来了一位有钱的外地旅客,他来旅店住宿,拿出1000块钱放在了柜台上,说想选一间合适的神来之笔9毛9的传奇故事做面条起家,年入20亿,上市之后股价暴涨百分之44。这简直就是神来之笔。在港股上市的9毛9集团,人称一块不到,目前他们集团旗下的9毛9在中国西北菜排名里面排老二,而泰二这个品牌在酸赚钱那些事儿1(小说连载)赚钱的秘密是迷恋一个人1hr有些人很自恋,其实长丑点没关系,但长的丑还一天到晚的自恋那就一定是到达了至高的境界。大家仔细观察一下,这个世界上只要长的丑的人,同时又位高权重的,这样的怎么赚到第一个1000万?怎么赚到第一个1000万?现在到处自媒体平台在宣传1000万有多好挣,多么容易,导致现在月入5000的人都很自卑,不敢说话。1000万到底有多少?1个月挣10万,减去别的开销存8万海外捞金日入300刀你绝对想知道的秘密海外捞金日入300刀你绝对想知道的秘密如果你是个小白,一点互联网项目经验都没有,那就不要做一些收益高又轻松的项目,因为你经不起那些诱惑,下场就是被割割割,还是做一些脚踏实地的项目。赚钱背后的真相赚钱背后的真相全世界的富人占总数的3,中产阶级占10,87(含87)以下的人是穷人,比例为19,所有的穷人都在想尽办法变成富人。韩国电影寄生虫讲述两种穷人一种甘做富人的寄生虫,另一