保健励志美文体育育儿作文
投稿投诉
作文动态
热点娱乐
育儿情感
教程科技
体育养生
教案探索
美文旅游
财经日志
励志范文
论文时尚
保健游戏
护肤业界

SparkFlinkIceberg打造湖仓一体架构实践探索

  数据湖大数据生态杀青数据仓库的痛点只能存储结构化数据,无法采集存储非结构化数据无法存储原始数据,所有的数据须经过ETL清洗过程离线数仓的数据表牵一发而动全身,数据调整工程量大实时数仓存储空间有限,无法采集和存储海量实时数据回溯效率低下,实时数据和离线数据计算接口难以统一Kafka做实时数仓,以及日志传输。Kafka本身存储成本很高,且数据保留时间有时效性,一旦消费积压,数据达到过期时间后,就会造成数据丢失且没有消费到将实时要求不高的业务数据入湖、比如说能接受110分钟的延迟。因为Iceberg0。11也支持SQL实时读取,而且还能保存历史数据。这样既可以减轻线上Kafka的压力,还能确保数据不丢失的同时也能实时读取数据湖三剑客对比HudiHudi:HadoopUpsertsDeletesandIncrementals(原为HadoopUpsertsanDIncrementals),强调了其主要支持Upserts、Deletes和Incremental数据处理,其主要提供的写入工具是SparkHudiDataSourceAPI和自身提供的HoodieDeltaStreamer在查询方面,Hudi支持Hive、Spark、Presto。在性能方面,Hudi设计了HoodieKey,一个类似于主键的东西。对于查询性能,一般需求是根据查询谓词生成过滤条件下推至datasource。Hudi这方面没怎么做工作,其性能完全基于引擎自带的谓词下推和partitionprune功能。DeltaDelta定位是流批一体的DataLake存储层,支持updatedeletemerge。不强调主键,因此其updatedeletemerge的实现均是基于spark的join功能。在数据写入方面,Delta与Spark是强绑定的,这一点Hudi是不同的:Hudi的数据写入不绑定Spark(可以用Spark,也可以使用Hudi自己的写入工具写入)在查询方面,开源Delta目前支持Spark与Presto,但是,Spark是不可或缺的,因为deltalog的处理需要用到Spark。这意味着如果要用Presto查询Delta,查询时还要跑一个Spark作业IcebergIceberg一个通用化设计的TableFormat,高性能的分析与可靠的数据管理,Iceberg没有类似的HoodieKey设计,其不强调主键。上文已经说到,没有主键,做updatedeletemerge等操作就要通过Join来实现,而Join需要有一个类似SQL的执行引擎。Iceberg在查询性能方面做了大量的工作。值得一提的是它的hiddenpartition功能。Hiddenpartition意思是说,对于用户输入的数据,用户可以选取其中某些列做适当的变换(Transform)形成一个新的列作为partition列。这个partition列仅仅为了将数据进行分区,并不直接体现在表的schema中。总结Delta、Hudi、Iceberg三个开源项目中,Delta和Hudi跟Spark的代码深度绑定,尤其是写入路径。这两个项目设计之初,都基本上把Spark作为他们的默认计算引擎了。而ApacheIceberg的方向非常坚定,宗旨就是要做一个通用化设计的TableFormat。它完美的解耦了计算引擎和底下的存储系统,便于多样化计算引擎和文件格式,很好的完成了数据湖架构中的TableFormat这一层的实现,因此也更容易成为TableFormat层的开源事实标准ApacheIceberg也在朝着流批一体的数据存储层发展,manifest和snapshot的设计,有效地隔离不同transaction的变更,非常方便批处理和增量计算。并且,ApacheFlink已经是一个流批一体的计算引擎,二都可以完美匹配,合力打造流批一体的数据湖架构。Iceberg术语数据文件(datafiles)
  Iceberg表真实存储数据的文件,一般存储在data目录下,以。parquet结尾。清单文件(Manifestfile)
  每行都是每个数据文件的详细描述,包括数据文件的状态、文件路径、分区信息、列级别的统计信息(比如每列的最大最小值、空值数等)、通过该文件、可过滤掉无关数据、提高检索速度。快照(Snapshot)
  快照代表一张表在某个时刻的状态。每个快照版本包含某个时刻的所有数据文件列表。Datafiles是存储在不同的manifestfiles里面,manifestfiles是存储在一个Manifestlist文件里面,而一个Manifestlist文件代表一个快照。sparkIceberg离线数仓前期准备
  spark3。0。0scala2。12
  Iceberg0。13。1
  编译好的icebergspark3runtime0。13。1。jar拷贝到sparkjarsDWD加载ods原始数据controllervalsparkConfnewSparkConf()。set(spark。sql。catalog。hadoopprod,org。apache。iceberg。spark。SparkCatalog)。set(spark。sql。catalog。hadoopprod。type,hadoop)。set(spark。sql。catalog。hadoopprod。warehouse,hdfs:hadoop01:9820sparkwarehouse)。set(spark。sql。catalog。catalogname。type,hadoop)。set(spark。sql。catalog。catalogname。defaultnamespace,db)。set(spark。sql。sources。partitionOverwriteMode,dynamic)。set(spark。sql。session。timeZone,GMT8)。setMaster(local〔〕)。setAppName(dwdapp)valsparkSessionSparkSession。builder()。config(sparkConf)。getOrCreate()DwdIcebergService。readOdsData(sparkSession)service加载member到dwddefloadMember(sparkSession:SparkSession):Unit{sparkSession。read。json(datasourceicebergmember。log)。drop(dn)。withColumn(uid,col(uid)。cast(int))。withColumn(adid,col(adid)。cast(int))。writeTo(hadoopprod。db。dwdmember)。overwritePartitions()}DWS数据宽表defgetDwsMemberData(sparkSession:SparkSession,dt:String){importsparkSession。implicits。。。。。valresultdwdMember。join(dwdMemberRegtype。drop(dt),Seq(uid),left)。join(dwdPcentermempaymoney。drop(dt),Seq(uid),left)。join(dwdBaseAd,Seq(adid,dn),left)。join(dwdBaseWebsite,Seq(siteid,dn),left)。join(dwdVipLevel,Seq(vipid,dn),leftouter)。select(。。。)。as〔DwsMemberResult〕valresultDataresult。groupByKey(itemitem。uiditem。dn)。mapGroups{case(key,iters)valkeyskey。split()valuidInteger。parseInt(keys(0))valdnkeys(1)valdwsMembersiters。toListvalpaymoneydwsMembers。filter(。paymoney!null)。map(itemBigDecimal。apply(item。paymoney))。reduceOption()。getOrElse(BigDecimal。apply(0。00))。toString。。。。分区列不能为null,sparksql内存表null为字符串resultData。where(34;dn!null)。show()resultData。where(34;dn!null)。write。format(iceberg)。mode(overwrite)。save(hadoopprod。db。dwsmember)}ADS统计分析defqueryDetails(sparkSession:SparkSession,dt:String){importsparkSession。implicits。valresultDwsIcebergDao。queryDwsMemberData(sparkSession)。as〔QueryResult〕。where(sdt{dt})result。cache()统计根据url统计人数wordcountresult。mapPartitions(partition{partition。map(item(item。appregurlitem。dnitem。dt,1))})。groupByKey(。1)。mapValues(itemitem。2)。reduceGroups()。map(item{valkeysitem。1。split()valappregurlkeys(0)valdnkeys(1)valdtkeys(2)(appregurl,item。2,dt,dn)})。toDF(appregurl,num,dt,dn)。writeTo(hadoopprod。db。adsregisterappregurlnum)。overwritePartitions()统计各memberlevel等级支付金额前三的用户:mysql、oracle、hive、phoenix、iceberg对where里都不支持开窗函数,spark内存函数强大result。withColumn(rownum,rownumber()。over(Window。partitionBy(memberlevel)。orderBy(desc(paymoney))))。where(rownum4)。orderBy(memberlevel,rownum)。select(。。。)。writeTo(hadoopprod。db。adsregistertop3memberpay)。overwritePartitions()}yarn上测试
  最后是花了18分钟跑完1000万条数据,查询表数据观察是否有数据丢失。数据没有丢失FlinkIceberg流批一体架构前期准备
  flink1。13。0scala2。12
  iceberg0。13。1
  拷贝编译好的icebergflinkruntime1。130。13。1。jar到flinklib
  启动flink集群,运行flinksql:binsqlclient。shembeddedshellflinkcdc采集数据到kafka,流模式写入icebergStreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();env。enableCheckpointing(6000);。。。kafakSource。setStartFromLatest();DataStreamRowDataresultenv。addSource(kafakSource)。map(item{。。。。rowData。setField(0,uid);rowData。setField(1,courseid);rowData。setField(2,deviceid);rowData。setField(3,StringData。fromString(array〔3〕。trim()));returnrowData;});result。print(处理完数据:);TableLoadertesttopicTableTableLoader。fromHadoopTable(hdfs:hadoop01:9820flinkwarehouseicebergdbdwdviewlog);FlinkSink。forRowData(result)。tableLoader(testtopicTable)。build();env。execute();批模式初始化加载数据DataStreamRowDatabatchFlinkSource。forRowData()。env(env)。tableLoader(tableLoader)。streaming(false)。build();流模式增量处理数据DataStreamRowDatastreamFlinkSource。forRowData()。env(env)。tableLoader(tableLoader)。streaming(true)。build();DataStream与Table转换写入icebergTabletabledwsIcbergDao。queryDwsMemberData(env,tableEnv)。where((dt)。isEqual(dt));DataStreamQueryResultqueryResultDataStreamtableEnv。toAppendStream(table,QueryResult。class);tableEnv。createTemporaryView(tmpA,queryResultDataStream);Stringsqlselectfrom(selectuid,memberlevel,register,appregurl,regsourcename,adname,sitename,viplevel,cast(paymoneyasdecimal(10,4)),rownumber()over(partitionbymemberlevelorderbycast(paymoneyasdecimal(10,4))desc)asrownum,dn,dtfromtmpAwheredtdt)whererownum4;Tabletable1tableEnv。sqlQuery(sql);DataStreamRowDatatop3DStableEnv。toRetractStream(table1,RowData。class)。filter(itemitem。f0)。map(itemitem。f1);Stringsql2selectappregurl,count(uid),dn,dtfromtmpAwheredtdtgroupbyappregurl,dn,dt;Tabletable2tableEnv。sqlQuery(sql2);DataStreamRowDataappregurlnumDStableEnv。toRetractStream(table2,RowData。class)。filter(itemitem。f0)。map(itemitem。f1);TableLoadertop3TableTableLoader。fromHadoopTable(warehouseDiradsregistertop3memberpay);TableLoaderappregurlnumTableTableLoader。fromHadoopTable(warehouseDiradsregisterappregurlnum);FlinkSink。forRowData(top3DS)。tableLoader(top3Table)。overwrite(true)。build();FlinkSink。forRowData(appregurlnumDS)。tableLoader(appregurlnumTable)。overwrite(true)。build();优化实践1小文件处理Iceberg0。11以前,通过定时触发batchapi进行小文件合并,这样虽然能合并,但是需要维护一套Actions代码,而且也不是实时合并的。TabletablefindTable(options,conf);Actions。forTable(table)。rewriteDataFiles()。targetSizeInBytes(101024)10KB。execute();
  Iceberg0。11新特性,支持了流式小文件合并。通过分区存储桶键使用哈希混洗方式写数据、从源头直接合并文件,这样的好处在于,一个task会处理某个分区的数据,提交自己的Datafile文件,比如一个task只处理对应分区的数据。这样避免了多个task处理提交很多小文件的问题,且不需要额外的维护代码,只需在建表的时候指定属性write。distributionmode,该参数与其它引擎是通用的,比如Spark等。CREATETABLEcitytable(provinceBIGINT,citySTRING)PARTITIONEDBY(province,city)WITH(write。distributionmodehash);2排序功能在Iceberg0。11之前,Flink是不支持Iceberg排序功能的,所以之前只能结合Spark以批模式来支持排序功能,0。11新增了排序特性的支持,Iceberg也支持flink的排序insertintoIcebergtableselectdaysfromKafkatblorderbydays,provinceid;利用Iceberg的排序特性,将天作为分区。按天、小时、分钟进行排序,那么manifest文件就会记录这个排序规则,从而在检索数据的时候,提高查询效率,既能实现Hive分区的检索优点,还能避免Hivemetadata元数据过多带来的压力。总结flink不支持隐藏分区,不支持创建带水位线的表与hudi相比,缺少行级更新,只能对表的数据按分区进行overwrite全量覆盖flink近实时入湖
  Iceberg提交Transaction时是以文件粒度来提交。这就没法以秒为单位提交Transaction,否则会造成文件数量膨胀;
  没有在线服务节点。对于实时的高吞吐低延迟写入,无法得到纯实时的响应;
  Flink写入以checkpoint为单位,物理数据写入Iceberg后并不能直接查询,当触发了checkpoint才会写metadata文件,这时数据由不可见变为可见。checkpoint每次执行都会有一定时间。

懒散的课堂作文自然课本来是我们了解认识大自然的窗口,可因为老师上课索然无味,所以向来不受我们的重视,与音乐等课被同学们列为鸡肋课。这不,又该上自然课了。老师带着课本走进教室,同学们立刻……童年趣事作文700字两篇篇一:童年趣事作文700字在我7岁的时候,我还是一个好奇又无知的小女孩。那个时候,我特别喜欢吃糖,把好几颗牙都吃黑了。我尤其喜欢和白开水兑白沙糖,喝起来总有一股沙甜味残留……好心办坏事五年级作文我是一个傻女孩,我总喜欢帮助别人。看到这儿,你肯定要问了:乐于助人不是好事吗?怎么能说是傻呢?唉,还不是因为我热心过头,好心办了坏事。那一次,表妺在做作业,我在一旁等她出……爸爸的爱作文600字(10篇)爸爸的爱往往比较深沉,很多时候是藏在了心底,要等我们仔细的去发现,今天,语文迷小编为您提供爸爸的爱作文600字,仅供参考!【1】爸爸的爱作文600字爸爸的爱,无处不……高考优秀作文为自己开扇窗有一棵大树,枝繁叶茂,浓荫匝地,是飞禽、走兽们喜爱的憩息场所。飞禽、走兽们经常讲它们旅行的见闻。大树听了,请飞禽带自己去旅行,飞禽说大树没有翅膀,拒绝了;请走兽帮助,走兽说大树……我眼中的心理学心理学是研究人心理现象的一门学科。认识了怎样去了解别人,对以后的工作、学习也有很大的作用。在我眼中,心里学就是生活。心理学和生活是息息相通的,心理学并不高深,它存在于生活的每一……老百姓心里都有一杆秤前几天,我父亲去医院看病返回,当走到了我们县城文化馆门前的时候,一群坐在哪里闲聊的老年人因其中有一个认识我父亲,所以就招呼我父亲过去坐一坐,我父亲因多病而腿脚不灵便,所以不能长……荣耀50基本确认,高通提供助力,新荣耀能否再创佳绩?荣耀50基本确认今年上半年国内各大厂商的旗舰机型基本上都陆续登场了,例如小米11、魅族18和OPPOFindX3系列,得益于出色的配置和表现,这些机型都受到了消费者的青睐……有什么办法能拦截1069开头的短信?我们可以选择使用手机自带拦截功能来屏蔽1069号段的短信。如果我们使用的是WindowsPhone的手机,我们可以打开手机自带的防火墙,开启过滤黑名单的功能,这样不仅可以……六年级作文秋天来了精选夏天悄悄地走了,秋天悄悄地来了。今天小编就来分享六年级作文:秋天来了,请各位读者好好欣赏和借鉴。六年级作文:秋天来了(一)秋天来了,那断断续续下了一个星期的雨,便是……小狗英语作文小狗是我们生活中最常见的动物,你会怎么写小狗呢?本文是小编为大家收集整理的小狗英语作文,欢迎参考借鉴。小狗英语作文一WhenIwasyoung,Ihadalovelydo……迎利好!工业互联网专项工作组2022年工作计划出炉4月14日,资本邦了解到,近日,工信部发布《工业互联网专项工作组2022年工作计划》,从夯实基础设施、深化融合应用、强化技术创新、培育产业生态、提升安全保障、完善要素保障等方面……
剪发小学优秀作文昨天晚上,妈妈说要带我去剪发。我的心情非常不好,心想:好好的干嘛要剪发呀!真是的!人家长发飘飘的多漂亮,又没招你,又没惹你的!我告诉妈妈我不舍得剪,可她还是坚持她的主意,哎!谁……我的热带鱼小学作文800字在我家的窗台上,放着一个长方形的大鱼缸,鱼缸的底部放着一个白色的珊瑚,它的四周摆着五颜六色、各式各样的鹅卵石。鱼缸中长着几根水草;鱼缸的四壁长了一层淡绿色的青苔。在这个优雅而舒……有关有关初中校园生活作文600字汇编九篇在学习、工作或生活中,许多人都有过写作文的经历,对作文都不陌生吧,写作文是培养人们的观察力、联想力、想象力、思考力和记忆力的重要手段。你知道作文怎样才能写的好吗?下面是小编帮大……夜行六年级作文和妈妈晚上锻炼回来,一路上灯火通明,好生郁闷!路灯给人们夜生活带来了方便,太阳落山了,忙碌了一天的人们可以在路灯的指引下,快乐的享受夜生活的美好。清明节马上就到了,……2020年大连中考满分作文心灵的憩所【作文真题】任选下面一题作文。(1)生活中,我们可以听到各种铃声:闹铃声、门铃声、电话铃声,校园里的铃声,车站码头的铃声、剧场影院的铃声hellip;hellip;……天上的母亲作文人间只不过是我们人类学习的地方,而天上是我们人类永远的归宿,上天给了我们几十年的生命,在这时间段里活出自己的精彩,最后以疾病宣告生命的终极,以最繁华的形式落幕。想到这些我……上网的利与弊E度网专稿未经允许不得转载如今,上网已经成为了人们十分热衷的一个话题。网上,有很多的东西,国内国外的都有。娱乐、新闻、电视剧、电影等等。所以,大部分的人都喜欢上网,特别是……机器人融资热背后发力仓储医疗等细分赛道,拓展新兴应用市场南方财经全媒体记者柳宁馨广州报道机器人热不仅体现为产量暴涨,融资增加,还体现为机器人在各行业的应用拓展。2021年全年,我国工业机器人产量36。6万台,同比增长44。9;我国服……幼儿教育名言名句天国来到人间也就越快1、只有受过教育的诚心诚意的人才是有趣味的人,也只有他们才是社会所需要的。这样的人越多,天国来到人间也就越快。契诃夫2、把子弟的幸福奠定在德行与良好的教养上面,那才是唯一……点亮我心中的灯作文傍晚时分,我走在回家的路上,路边的灯渐渐地亮了,这么明亮耀眼,看着那橙黄的灯光,我的思绪开始纷飞。那时我从体育中心回来的路上,我焦急的向四周张望,企盼能找到回家的公交车。……苹果6sp是不是目前苹果手机中续航最好的?感谢邀请。iPhone6sPlus不是苹果手机中续航最好的,正确答案是iPhone8Plus。数据如图以上是知名手机站点PhoneArena测评数据中库中的数据,续……助听器的耳印坏了,在哪里可以修呢?就看是那种的耳印模,要是软耳模就只有重新取耳印制作,要是硬耳模看具体坏的情况,有的是可以维修的,有的也只有重新取耳印制作。你好,耳印坏了建议重新做,可以到专业听力中心取耳……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网