范文健康探索娱乐情感热点
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

Flink操练(三十三)之自定义键控状态(三)MapState

  0 简介
  MapState[K, V]保存Key-Value对。 MapState.get(key: K) MapState.put(key: K, value: V) MapState.contains(key: K) MapState.remove(key: K) 1 实例
  1.1 实例一:
  去重计算应该是数据分析业务里面常见的指标计算,例如网站一天的访问用户数、广告的点击用户数等等,离线计算是一个全量、一次性计算的过程通常可以通过distinct的方式得到去重结果,而实时计算是一种增量、长期计算过程,我们在面对不同的场景,例如数据量的大小、计算结果精准度要求等可以使用不同的方案。
  此篇介绍如何通过编码方式实现精确去重,以一个实际场景为例:计算每个广告每小时的点击用户数,广告点击日志包含:广告位ID、用户设备ID(idfa/imei/cookie)、点击时间。
  实现步骤分析: 为了当天的数据可重现,这里选择事件时间也就是广告点击时间作为每小时的窗口期划分 数据分组使用广告位ID+点击事件所属的小时 选择processFunction来实现,一个状态用来保存数据、另外一个状态用来保存对应的数据量 计算完成之后的数据清理,按照时间进度注册定时器清理
  广告数据 case class AdData(id:Int,devId:String,time:Long)
  分组数据 case class AdKey(id:Int,time:Long)
  主流程 val env=StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  val kafkaConfig=new Properties() kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092") kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")  val consumer=new FlinkKafkaConsumer[String]("topic1",new SimpleStringSchema,kafkaConfig) val ds=env.addSource(consumer)         .map(x=>{                 val s=x.split(",")                 AdData(s(0).toInt,s(1),s(2).toLong)                 }             ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[AdData](Time.minutes(1)){                 override def extractTimestamp(element: AdData): Long = element.time             })       .keyBy(x=>{             val endTime=              TimeWindow.getWindowStartWithOffset(x.time, 0, Time.hours(1).toMilliseconds)+Time.hours(1).toMilliseconds             AdKey(x.id,endTime)       })
  指定时间时间属性,这里设置允许1min的延时,可根据实际情况调整;
  时间的转换选择TimeWindow.getWindowStartWithOffset Flink在处理window中自带的方法,使用起来很方便,第一个参数 表示数据时间,第二个参数offset偏移量,默认为0,正常窗口划分都是整点方式,例如从0开始划分,这个offset就是相对于0的偏移量,第三个参数表示窗口大小,得到的结果是数据时间所属窗口的开始时间,这里加上了窗口大小,使用结束时间与广告位ID作为分组的Key。
  去重逻辑
  自定义Distinct1ProcessFunction 继承了KeyedProcessFunction, 方便起见使用输出类型使用Void,这里直接使用打印控制台方式查看结果,在实际中可输出到下游做一个批量的处理然后在输出;
  定义两个状态:MapState,key表示devId, value表示一个随意的值只是为了标识,该状态表示一个广告位在某个小时的设备数据,如果我们使用rocksdb作为statebackend, 那么会将mapstate中key作为rocksdb中key的一部分,mapstate中value作为rocksdb中的value, rocksdb中value 大小是有上限的,这种方式可以减少rocksdb value的大小;
  另外一个ValueState,存储当前MapState的数据量,是由于mapstate只能通过迭代方式获得数据量大小,每次获取都需要进行迭代,这种方式可以避免每次迭代。 class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey, AdData, Void] {    var devIdState: MapState[String, Int] = _   var devIdStateDesc: MapStateDescriptor[String, Int] = _   var countState: ValueState[Long] = _   var countStateDesc: ValueStateDescriptor[Long] = _      override def open(parameters: Configuration): Unit = {     devIdStateDesc = new MapStateDescriptor[String, Int]("devIdState", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Int]))     devIdState = getRuntimeContext.getMapState(devIdStateDesc)     countStateDesc = new ValueStateDescriptor[Long]("countState", TypeInformation.of(classOf[Long]))     countState = getRuntimeContext.getState(countStateDesc)   }    override def processElement(value: AdData, ctx: KeyedProcessFunction[AdKey, AdData, Void]#Context, out: Collector[Void]): Unit = {     val currW=ctx.timerService().currentWatermark()     if(ctx.getCurrentKey.time+1<=currW) {         println("late data:" + value)         return       }      val devId = value.devId     devIdState.get(devId) match {       case 1 => {         //表示已经存在       }        case _ => {         //表示不存在         devIdState.put(devId, 1)         val c = countState.value()         countState.update(c + 1)         //还需要注册一个定时器         ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)       }     }     println(countState.value())   }    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[AdKey, AdData, Void]#OnTimerContext, out: Collector[Void]): Unit = {     println(timestamp + " exec clean~~~")     println(countState.value())     devIdState.clear()     countState.clear()   } }
  数据清理通过注册定时器方式ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey.time + 1)表示当watermark大于该小时结束时间+1就会执行清理动作,调用onTimer方法。
  在处理逻辑里面加了 val currW=ctx.timerService().currentWatermark() if(ctx.getCurrentKey.time+1<=currW){         println("late data:" + value)         return   }
  主要考虑可能会存在滞后的数据比较严重,会影响之前的计算结果,做了一个类似window机制里面的一个延时判断,将延时的数据过滤掉,也可以使用OutputTag 单独处理。
  1.2 实例二:
  我们知道电商平台会将用户与商品的交互行为收集记录下来,行为数据主要包括几个字段:userId、itemId、categoryId、behavior和timestamp。其中userId和itemId分别代表用户和商品的唯一ID,categoryId为商品类目ID,behavior表示用户的行为类型,包括点击(pv)、购买(buy)、加购物车(cart)、喜欢(fav)等,timestamp记录行为发生时间。本文采用阿里巴巴提供的一个淘宝用户行为数据集,为了精简需要,只节选了部分数据。下面的代码使用 MapState[String, Int]  记录某个用户某种行为出现的次数。这里读取了数据集文件,模拟了一个淘宝用户行为数据流。/**   * 用户行为   * categoryId为商品类目ID   * behavior包括点击(pv)、购买(buy)、加购物车(cart)、喜欢(fav)   * */ case class UserBehavior(userId: Long,                           itemId: Long,                           categoryId: Int,                           behavior: String,                           timestamp: Long)  class MapStateFunction extends RichFlatMapFunction[UserBehavior, (Long, String, Int)] {    // 指向MapState的句柄   private var behaviorMapState: MapState[String, Int] = _    override def open(parameters: Configuration): Unit = {     // 创建StateDescriptor     val behaviorMapStateDescriptor = new MapStateDescriptor[String, Int]("behaviorMap", classOf[String], classOf[Int])     // 通过StateDescriptor获取运行时上下文中的状态     behaviorMapState = getRuntimeContext.getMapState(behaviorMapStateDescriptor)   }    override def flatMap(input: UserBehavior, collector: Collector[(Long, String, Int)]): Unit = {     var behaviorCnt = 1     // behavior有可能为pv、cart、fav、buy等     // 判断状态中是否有该behavior     if (behaviorMapState.contains(input.behavior)) {       behaviorCnt = behaviorMapState.get(input.behavior) + 1     }     // 更新状态     behaviorMapState.put(input.behavior, behaviorCnt)     collector.collect((input.userId, input.behavior, behaviorCnt))   } }  def main(args: Array[String]): Unit = {    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)   env.setParallelism(8)    // 获取数据源   val sourceStream: DataStream[UserBehavior] = env   .addSource(new UserBehaviorSource("state/UserBehavior-50.csv")).assignTimestampsAndWatermarks(new AscendingTimestampExtractor[UserBehavior]() {     override def extractAscendingTimestamp(userBehavior: UserBehavior): Long = {       // 原始数据单位为秒,乘以1000转换成毫秒       userBehavior.timestamp * 1000     }   }                                                                                            )    // 生成一个KeyedStream   val keyedStream =  sourceStream.keyBy(user => user.userId)    // 在KeyedStream上进行flatMap   val behaviorCountStream = keyedStream.flatMap(new MapStateFunction)    behaviorCountStream.print()    env.execute("state example") }  class UserBehaviorSource(path: String) extends RichSourceFunction[UserBehavior] {    var isRunning: Boolean = true   // 输入源   var streamSource: InputStream = _    override def run(sourceContext: SourceContext[UserBehavior]): Unit = {     // 从项目的resources目录获取输入     streamSource = MapStateExample.getClass.getClassLoader.getResourceAsStream(path)     val lines: Iterator[String] = scala.io.Source.fromInputStream(streamSource).getLines     while (isRunning && lines.hasNext) {       val line = lines.next()       val itemStrArr = line.split(",")       val userBehavior = UserBehavior(itemStrArr(0).toLong, itemStrArr(1).toLong, itemStrArr(2).toInt, itemStrArr(3), itemStrArr(4).toLong)       sourceContext.collect(userBehavior)     }   }    override def cancel(): Unit = {     streamSource.close()     isRunning = false   } }
  Keyed State是针对 KeyedStream  的状态,必须先对一个DataStream  进行keyBy  操作。在本例中,我们对用户ID进行了keyBy  ,那么用户ID为1的行为数据共享同一状态数据,以此类推,每个用户ID的行为数据共享自己的状态数据。
  之后,我们需要实现Rich类函数,比如 RichFlatMapFunction  ,或者KeyedProcessFunction  等函数类。这些算子函数类都是RichFunction  的一种实现,他们都有运行时上下文RuntimeContext  ,RuntimeContext  包含了状态数据。 在实现这些算子函数类时,一般是在open  方法中声明状态。open  是算子的初始化方法,它在实际处理函数之前调用。
  具体到状态的使用,我们首先要注册一个 StateDescriptor  。从名字中可以看出,StateDescriptor  是状态的一种描述,它描述了状态的名字和状态的数据结构。状态的名字可以用来区分不同的状态,一个算子内可以有多个不同的状态,每个状态的StateDescriptor  需要设置不同的名字。同时,我们也需要指定状态的具体数据结构,指定具体的数据结构非常重要,因为Flink要对其进行序列化和反序列化,以便进行Checkpoint和必要的恢复。数据结构的类型和序列化机制可以参考我之前的文章:Flink进阶教程:数据类型和序列化机制简介。
  在本例中,我们使用 val behaviorMapStateDescriptor = new MapStateDescriptor[String, Int]("behaviorMap", classOf[String], classOf[Int])  注册了一个MapStateStateDescriptor  ,key为某种行为,如pv、buy等,数据类型为String  ,value为该行为出现的次数,数据类型为Int  。此外,每种类型的状态都有对应的StateDescriptor  ,比如MapStateDescriptor  对应MapState  ,ValueStateDescriptor  对应ValueState  。
  接着我们通过 StateDescriptor  向RuntimeContext  中获取状态句柄。本例中对应的代码为:behaviorMapState = getRuntimeContext.getMapState(behaviorMapStateDescriptor)  。状态句柄并不存储状态,它只是Flink提供的一种访问状态的接口,状态数据实际存储在State Backend中。
  使用和更新状态发生在实际的处理函数上,比如 RichFlatMapFunction  中的flatMap  方法,在实现自己的业务逻辑时访问和修改状态,比如通过get  方法获取状态。
  1.3 实例三 package qiuhua; import com.google.common.collect.Lists; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.UUID; /**  * @program: bigdata_learn  * @description: 如果当前的 key 出现了 3 次,则需要计算平均值  * @author: Mr.逗  * @create: 2021-09-08 16:47  *  MapState :这个状态为每一个 key 保存一个 Map 集合  *      put() 将对应的 key 的键值对放到状态中  *      values() 拿到 MapState 中所有的 value  *      clear() 清除状态  **/ public class CountAverageWithMapState extends RichFlatMapFunction,Tuple2> {     //1. MapState :key 是一个唯一的值,value 是接收到的相同的 key 对应的 value 的值      @Override     public void open(Configuration parameters) throws Exception {         super.open(parameters);         //注册状态         MapStateDescriptor descriptor = new MapStateDescriptor<>("map_state", String.class, Long.class);         mapState=getRuntimeContext().getMapState(descriptor);     }      /**      * MapState:      *      Map集合的特点,相同key,会覆盖数据。      */      private MapState mapState;     @Override     public void flatMap(Tuple2 value, Collector> out) throws Exception {      mapState.put(UUID.randomUUID().toString(),value.f1);      //如果当前key出现了三次,就进行计算平均值         ArrayList allElements = Lists.newArrayList(mapState.values());         if (allElements.size()==3)         {             long count=0;             long sum=0;             for(Long ele:allElements)             {                 count++;                 sum+=ele;             }             double avg=(double)sum/count;             out.collect(Tuple2.of(value.f0,avg));             //清除状态             mapState.clear();         }     } }

网购垃圾网易云耳机买了个网易云耳机,140块,还不如30的,操作太复杂,没有实体按键,始终是单耳模式,只响一个,说明书也没有切换双耳模式的操作步骤,问了客服才复杂六大步的操作步骤,操作好几次都不成功双十一上线微信扫码付淘特执着喊话微信本报记者李立上海报道作为阿里核心电商业务之一,临近双十一,淘特(前身为淘宝特价版)日前宣布上线微信扫码付功能。表面上这很像是淘特和微信的一次直连,不过据中国经营报记者了解到,微信扫iPhone13ProMax对比荣耀Magic3至臻版,到底谁才是旗舰标杆?每年的这个时候,手机圈总会热闹非凡,因为苹果和安卓通常会在这个时间上演巅峰对接。而要说今年最能够对苹果产生威胁的,无疑是荣耀。一方面,荣耀Magic3系列继承了华为Mate系列的衣红米Note11Pro在路上,旗舰芯片120W快充,网友值得期待说到性价比,很多人想会到小米。其实现在的性价比完全是红米在扛大旗,也带动了整个手机市场。在这几年红米手机从低端的数字系列,到现在主打K系列高端机型,在国内口碑都非常不错。而在全球发今年最好的旗舰本之一荣耀MagicBookV14必买的七大理由荣耀笔记本在2021年有非常亮眼的销量表现,消费者认知和认可度都在攀升,最新的荣耀MagicBookV14更是把目标瞄准了中高端市场,成为继手机之后荣耀重磅打造的又一款具备枢纽级互转转Q3手机行情报告5G换机潮越演越热iPhone13刷爆热搜2021年三季度,国内手机市场群雄逐鹿各大品牌奋力冲高,素有晴雨表之称的二手市场又有哪些变化?10月8日,转转集团发布2021年三季度手机行情报告(以下简称Q3转转集团手机行情),什么是PHEV插电动力汽车什么是PHEV插电动力汽车?相信很多用户对嘉际的插电动力都存在疑问,它到底是怎么工作的呢?这里小小际就向大家大致介绍一下什么是插电混合动力汽车。PHEV,全称PluginHybri未来乘用车主流动力是纯电?混动?还是燃料电池?纯电汽车工业在人类历史上已经走过了百年之久,汽车的动力历经蒸汽机内燃机时代。内燃机时代,电动车短暂登上历史舞台,但最终由于技术当时条件等种种原因未能取代内燃机而走向落寞。随着技术的不断你初学编程的时候是怎么提高自己的Debug能力的?作为一名IT行业的从业人员,也是一名老程序员。现在主要在从事产品研发及项目管理工作,所以我来探讨一下这个问题。什么是BugBug一词的英文翻译为臭虫子或虫子。但是现在,我们认为是在比亚迪新能源的领导者比亚迪将于4月7日晚的新车发布会上,上市比亚迪秦PLUSEV秦EV宋PLUSEV唐EV和e2五款新车。其中,比亚迪秦PLUSEV宋PLUSEV两款为全新车型,设计上均采用了比亚迪家罗永浩,凤姐老网红活跃时间都很长,为什么现在网红周期都很短?这些初代网红都是借住互联网刚刚兴起时的东风出现的。最早的应该是芙蓉姐姐从水木清华论坛可谓是一夜爆红,各种奇葩造型照片疯传。罗永浩是讲课时各种经典语录音频,在互联网上疯传,甚至有人下
智慧全运都有哪些黑科技四年一度的国内最高水平综合性体育盛会全运会开幕在即,陕西省体育场长安常宁生态体育馆等智慧场馆已焕然一新。为了打造智慧全运,中国电信针对全运会打造一张精品5G网络,分布全省十地市的5新机小米11T发布华为nova9红米K50曝魅族小屏旗舰新机华为nova9系列真机配置曝光今天,据博主菊厂影业Fans称,全新nova9标准版采用居中挖孔设计,而nova9Pro与荣耀50Pro设计一致,共有四个配色,手机采用磨砂质感后壳。谁是郭台铭痛恨的人?比亚迪富士康恩怨录谁是郭台铭最痛恨的敌人?答案是比亚迪总裁王传福谁掳获了巴菲特的心,答案还是一样,王传福!无惧,是王传福单挑鸿海郭台铭的武器,也因此赢得巴菲特的青睐。王传福出身贫寒,一无所有,当一个网上看到一些几百块钱的笔记本电脑,真的假的?真的。我以前买过。还是好一些的。便宜点的五六百以内。办公啥的还能用一下。质量靠运气网上有很多几百块钱的笔记本电脑,真假难辨,道儿很多,水很深!不懂的朋友建议不要轻易购买。下面就把我你们还在用QQ吗?用的话一般用来干啥?斗地主!qq现在还用的,之前踩空间,留言,偷菜,分享心情,写日志,充会员,充红钻,绿钻,偷菜等等,很多人都在用QQ。那时候社交软件不多,QQ迅速发展起来,成为人们日常生活中经常使用为实现苹果的环保目标,此次的iPhone13系列,苹果又没了包装膜苹果为了环保也是拼了,自从去年以环保的名义去除了充电头,耳机之后,今年刚刚发布的iPhone13系列依旧与iPhone12一样取消了充电头,耳机,不仅如此,这次苹果又在包装上持续将我们常说的区块链POW和POS到底有何区别?我们在了解区块链后常常会听到一个词汇叫共识机制。以太坊计划从PoW过渡到PoS比特币因两种机制产生不同能耗时,这两家为彼此的优缺点各执一词,争论愈发激烈。那么工作证明机制(PoW)聚焦在鸿蒙生态,华为HDC开发者大会是一场属于开发者自己的盛宴华为开发者大会(HDC)是华为面向全球开发者的年度盛会,今年的华为开发者大会(HDC)将于10月22日10月24日在东莞松山湖举行,此次大会可以会有鸿蒙生态HMSCore6。0全屋博世打造电动自行车生态提供核心组件实现OTA无缝升级在探索自己的电动自行车概念之外,博世(Bosch)还为许多其他制造商提供动力系统和电池组。近日,该公司对现有生态系统进行了改进,将现代电动自行车的所有必要组件纳入到一个干净的包装中如果让你购买汽车润滑油自用,你会选择哪个品牌?其实我给大家录过专业的视频,其实选润滑油呢,我觉得主要从以下几点去考虑,首先呢第1点呢就是对磨损的降低,因为发动机就是汽车的心脏磨损降低是最大的问题,而车最大的问题就是85的问题都早报iPhone13首发直降500!4699元起GoPro10发布9月18日早报导读华为或已解决射频芯片问题5G功能将在不久的将来重新推出外媒曝光iPhone13Pro远峰蓝真机上手微信上线一证通查你有几个电话号码?还有关怀模式四年啦,终于想起换