范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

flink流式增量查询hudi表流程分析

  环境flink 1.13.6 hudi 0.11.0 merge on read 表
  代码示例tEnv.executeSql("CREATE  TABLE tb_person_hudi ( id BIGINT, age INT, name STRING,create_time TIMESTAMP ( 3 ), time_stamp TIMESTAMP(3),PRIMARY KEY ( id ) NOT ENFORCED ) WITH ( " +         "	"connector" = "hudi", " +         "	"table.type" = "MERGE_ON_READ", " +         "	"path" = "file:///D:/data/hadoop3.2.1/warehouse/tb_person_hudi", " +         "	"read.start-commit" = "20220722103000", " +       //  "	"read.end-commit" = "20220722104000", " +         "	"read.task" = "1", " +         "	"read.streaming.enabled" = "true", " +         "	"read.streaming.check-interval" = "30"  " +         ")"); Table table = tEnv.sqlQuery("select * from tb_person_hudi "); tEnv.toChangelogStream(table).print().setParallelism(1); env.execute("test");
  流程分析
  hudi源入口(HoodieTableSource)
  HoodieTableSource实现ScanTableSource,SupportsPartitionPushDown,SupportsProjectionPushDown,SupportsLimitPushDown,SupportsFilterPushDown接口,后4个接口主要是支持对查询计划的优化。ScanTableSource则提供了读取hudi表的具体实现,核心方法为org.apache.hudi.table.HoodieTableSource#getScanRuntimeProvider :if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {  //开启了流式读(read.streaming.enabled)   StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(       conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());   InputFormat inputFormat = getInputFormat(true);   OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);   SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))       .setParallelism(1)       .transform("split_reader", typeInfo, factory)       .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));    return new DataStreamSource<>(source); }
  上面代码在流环境中创建了一个SourceFunction(StreamReadMonitoringFunction)和一个自定义的转换(StreamReadOperator)StreamReadMonitoringFunction: 监控hudi表元数据目录(.hoodie)获取需要被读取的文件分片(MergeOnReadInputSplit,一个base parquet文件和一组log文件),然后把分片递给下游的转换算子StreamReadOperator进行文件读取;固定一个线程去监控,名称为split_monitorxxxxx. StreamReadOperator:将按timeline升序收到的MergeOnReadInputSplit一个一个地读取分片数据;算子名称为split_reader->xxxxx,可以通过设置read.tasks进行设置并行度
  定时监控元数据获得增量分片(StreamReadMonitoringFunction)
  StreamReadMonitoringFunction负责定时(read.streaming.check-interval)扫描hudi表的元数据目录.hoodie,如果发现在active timeline上有新增的instant[action=commit,deltacommit,compaction,replace && active=completed],从这些instant信息中可以知道数据变更写到了哪些文件(parquet,log),然后构建成分片对象(MergeOnReadInputSplit)。核心属性:issuedInstant,这个是增量查询的依据,记录着当前已经消费的数据的最新instant,类似于kafka的offset,但是hudi是基于timeline.该值是有状态的,维护在ListState中,所以flink job重启依然可以做到增量。 核心方法:StreamReadMonitoringFunction#monitorDirAndForwardSplits,很简单,就做了两件事,调用IncrementalInputSplits#inputSplits获取到增量分片(有序),然后传递给下游的算子(StreamReadOperator) public void monitorDirAndForwardSplits(SourceContext context) {   HoodieTableMetaClient metaClient = getOrCreateMetaClient();   IncrementalInputSplits.Result result =       incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);    for (MergeOnReadInputSplit split : result.getInputSplits()) {     context.collect(split);   } }
  获取增量分片(IncrementalInputSplits)
  主要逻辑在方法IncrementalInputSplits#inputSplits(metaClient, hadoopConf, issuedInstant),需要先了解hudi关于timeline和instant的一些基本概念,详细的流程如下图所示:
  如果flink job首次运行指定了read.start-commit和read.end-commit,但是该范围是比较久以前,instant已经被归档,那么流作业将永远不能消费到数据
  https://github.com/apache/hudi/issues/6167
  读取数据文件(StreamReadOperator)
  StreamReadOperator算子接收分片后会缓存在队列Queue splits,然后不停从队列中poll分片放到线程池中执行
  private void processSplits() throws IOException { format.open(split); consumeAsMiniBatch(split); enqueueProcessSplits(); }
  主要有三个步骤:从队列中peek分片,调用MergeOnReadInputFormat.open构建迭代器,迭代器是用来进行文件的数据读取,一个迭代器对应一个分片(多个物理文件,base+log),对应不同读取的场景,有几种迭代器:BaseFileOnlyFilteringIterator,BaseFileOnlyIterator,LogFileOnlyIterator,MergeIterator,SkipMergeIterator 微批量消费,每批只读2048记录,将把记录传递给下游的算子消费同时标记消费的总数,如果该分片读到了尾,则将该分片从队列中弹出,并关闭MergeOnReadInputFormat 继续处理队列中的分片,回到步骤1,如果上一次的分片没消费完,那么本次循环将继续消费,只不过是由另一个线程处理。

时光催人老,不知不觉,枷锁也到退役了时光催人老,不知不觉,枷锁也到了退役的年纪,只是身边的那个男人无法来到现场。我上初中时,那时候的科比和枷锁正是彼此完美配合的时候,感觉他就是个大男孩,有个大哥带着他。随着年龄的增长群晖SynologyPhotos电视版3月9日,群晖公众号推送宣布上架SynologyPhotos电视版,可以支持在电视上欣赏NAS中的照片和视频了。此前我在电视上看群晖照片主要使用PLEX,以及使用SMB共享直接查看NASA航天器在火星上发现奇怪的圆形沙丘NASA航天器在火星上发现奇怪的圆形沙丘,许多形状和大小的沙丘在火星上很常见。但在这个例子中,沙丘几乎是完美的圆形,这很不寻常,尽管它们仍然是稍微不对称的,南端有陡峭的滑坡面。这表美国小哥做出世界最大的DD照,能绕地球两圈半!吉尼斯却不愿承认话说目前世界上最大的数字图像(通过计算机处理的图像)是一张斑马鱼胚胎矢状切面的合成照片。这张照片是由荷兰莱顿大学医学中心分子细胞生物学系在2010年制作的,照片只有大约1。5毫米长隐藏在太阳强光下的巨大杀手,可能会冲向地球这种大小的小行星大到足以引起大规模灭绝事件。一颗隐藏在强光下的行星杀手小行星终于被探测到,这颗巨大的太空岩石有一天可能会撞向地球。宽0。9英里(1。5公里)被命名为2022AP7的突发小行星1600的几率撞地球,NASA紧急监测,人类如何应对?小行星2023DW有可能撞击地球吗?2023年3月11日,北京近日,美国国家航空航天局(NASA)宣布发现了一颗名为2023DW的小行星,并表示它有非常小的几率在2046年2月撞击今天你思考了吗?地球的危险来自太空也来自人类地球面临的来自太空的危险有以下几种陨石撞击如果一颗较大的陨石撞击地球,可能会对地球的生命和环境造成毁灭性的影响,从而引发生态危机甚至全球性的灾难。太阳耀斑和风暴太阳耀斑和风暴可以产好奇号首次在火星拍到云隙光2021年1月下旬,好奇号首次看到了火星云,并开始记录,这些图像不仅壮观,而且能帮助科学家们理解这些云是如何形成的,以及最近有什么不同。大多数的火星云距离地面不超过60公里,由水冰手机自动化工具,Macrodroid软件体验市面上的软件大多朝着智能与人性化的方向发展,随着ChatGPT的出现,如今还流行通过AI来协助人们完成一些操作。让软件自动化完成工作,是摸鱼人梦寐以求的操作。前景虽好,但是AI协助黄埔这条徒步线,带你发现春日小惊喜!春风十里,阳光明媚,是时候探寻一条适合春日轻徒步的打卡线路了!小编打卡长岭国家登山健步道,发现黄麻段登山道又添不少小惊喜!森林中的小精灵春日来长岭国家登山健步道黄麻路段徒步的朋友们新发现的酶可将空气转化为电能,提供新的清洁能源编辑萝卜皮多种好氧细菌利用大气中的H2作为生长和生存的能源。这一具有全球意义的过程调节大气成分,增强土壤生物多样性并推动极端环境中的初级生产。大气中的H2氧化归因于NiFe氢化酶超
十年果粉转入华为Mate50Pro,上手十天体验如何?华为太懂用户华为Mate50系列和iPhone14系列的接连发布,不仅引发了旗舰市场的抢购热潮,也让许多用户陷入了选择困难症,差不多的价格到底是选华为还是苹果呢?近期网上关于iPhone14系恭喜了!5队6人交易签约,快船最弱位置升级完毕,詹姆斯如愿以偿头条创作挑战赛北京时间10月16号,NBA季前赛正式落下帷幕,参赛各队都在这期间对阵容进行了一定程度的磨合,新赛季将检验阵容的磨合程度,倍受期待。与此同时,平静的交易市场再次热闹起国乒亚洲杯名单确定!女队3主力均落选,陈幸同王艺迪对战伊藤第56届成都世乒赛团体赛刚刚落下帷幕,另一项国际赛事乒乓球亚洲杯就将在下个月接踵而至。乒乓球亚洲杯是一项传统赛事,但是因为特殊原因停办了两届,今年得以恢复,将会在泰国首都曼谷举行。2022年女排世锦赛最终排名和最佳阵容2022年女排世锦赛最终排名第一名塞尔维亚第二名巴西第三名意大利第四名美国第五名日本第六名中国第七名波兰第八名土耳其第九名比利时第十名加拿大第十一名多米尼加第十二名荷兰第十三名泰国日本GDP将要倒退30年日本经济即将危在旦夕,那真是黑云压城,风暴来袭,日本这回可能真的大事不妙了,尽管大家可能还不知道,就在前几天,日本重量级的财经媒体日经新闻发出了严厉警告,表示2022年日本名义GD郭晶晶41岁生日!奥运会送祝福,生涯狂拦77冠,为霍家生下1儿2女头条创作挑战赛10月15日,中国体坛名将,素有跳水皇后之称的郭晶晶迎来了自己的41岁生日。作为豪门阔太,郭晶晶这边非常的低调,并没有在个人社交媒体上有任何的动态。但非常有牌面的是,顶级签约!曼联有望同时签下米兰双雄当家球星!主力攻击手或离队从本赛季迄今为止的表现来看,曼联队的发挥算是中规中矩的。在英超联赛当中,虽然球队看起来并没有争夺冠军的希望,但是在赛季结束之后拿到一个欧冠资格还是相当有希望的。在欧联杯赛场上,滕哈Springbootdockerjenkins持续集成实战操作1操作步骤1。1docker安装在虚拟机(vitualBox)下执行yum命令rootcentosvagrantyuminstalldocker安装完成后,使用下面的命令来启动do王楚钦后援会翻旧账!矛头直指国乒名记无视规则,的确该骂10月16日,成都世乒赛团体赛虽然已经结束,但关于比赛的讨论远远没有结束!赛后,有球迷甚至质疑国乒教练组的安排,认为教练组给了王楚钦太多的机会,但王楚钦一次又一次地让机会溜走。事情广东队惜败深圳队,广东队四大输球的原因,解决不了进季后赛都难以下均为个人观点,仅供参考1。球队进攻缺乏多样性,整场很少有精彩的配合进攻,威姆斯离开后,再也没有了快攻反击,开局就靠三分手感拉开比分,手感一差,比分就被对手追上来了。2。黑白矮三西甲皇家马德里VS巴塞罗那巴萨后防伤缺严重,伯纳乌或陷入苦战巴萨欧冠基本出局,全力征战联赛,客战皇马能否证明能力10月16日晚间2215,五大联赛本轮最受关注的一场联赛将在伯纳乌进行,而他们的对手则是球队几十年来的死敌巴萨。这两支球队的每一