数据湖大数据生态杀青数据仓库的痛点只能存储结构化数据,无法采集存储非结构化数据无法存储原始数据,所有的数据须经过ETL清洗过程离线数仓的数据表牵一发而动全身,数据调整工程量大实时数仓存储空间有限,无法采集和存储海量实时数据回溯效率低下,实时数据和离线数据计算接口难以统一Kafka做实时数仓,以及日志传输。Kafka本身存储成本很高,且数据保留时间有时效性,一旦消费积压,数据达到过期时间后,就会造成数据丢失且没有消费到将实时要求不高的业务数据入湖、比如说能接受110分钟的延迟。因为Iceberg0。11也支持SQL实时读取,而且还能保存历史数据。这样既可以减轻线上Kafka的压力,还能确保数据不丢失的同时也能实时读取数据湖三剑客对比HudiHudi:HadoopUpsertsDeletesandIncrementals(原为HadoopUpsertsanDIncrementals),强调了其主要支持Upserts、Deletes和Incremental数据处理,其主要提供的写入工具是SparkHudiDataSourceAPI和自身提供的HoodieDeltaStreamer在查询方面,Hudi支持Hive、Spark、Presto。在性能方面,Hudi设计了HoodieKey,一个类似于主键的东西。对于查询性能,一般需求是根据查询谓词生成过滤条件下推至datasource。Hudi这方面没怎么做工作,其性能完全基于引擎自带的谓词下推和partitionprune功能。DeltaDelta定位是流批一体的DataLake存储层,支持updatedeletemerge。不强调主键,因此其updatedeletemerge的实现均是基于spark的join功能。在数据写入方面,Delta与Spark是强绑定的,这一点Hudi是不同的:Hudi的数据写入不绑定Spark(可以用Spark,也可以使用Hudi自己的写入工具写入)在查询方面,开源Delta目前支持Spark与Presto,但是,Spark是不可或缺的,因为deltalog的处理需要用到Spark。这意味着如果要用Presto查询Delta,查询时还要跑一个Spark作业IcebergIceberg一个通用化设计的TableFormat,高性能的分析与可靠的数据管理,Iceberg没有类似的HoodieKey设计,其不强调主键。上文已经说到,没有主键,做updatedeletemerge等操作就要通过Join来实现,而Join需要有一个类似SQL的执行引擎。Iceberg在查询性能方面做了大量的工作。值得一提的是它的hiddenpartition功能。Hiddenpartition意思是说,对于用户输入的数据,用户可以选取其中某些列做适当的变换(Transform)形成一个新的列作为partition列。这个partition列仅仅为了将数据进行分区,并不直接体现在表的schema中。总结Delta、Hudi、Iceberg三个开源项目中,Delta和Hudi跟Spark的代码深度绑定,尤其是写入路径。这两个项目设计之初,都基本上把Spark作为他们的默认计算引擎了。而ApacheIceberg的方向非常坚定,宗旨就是要做一个通用化设计的TableFormat。它完美的解耦了计算引擎和底下的存储系统,便于多样化计算引擎和文件格式,很好的完成了数据湖架构中的TableFormat这一层的实现,因此也更容易成为TableFormat层的开源事实标准ApacheIceberg也在朝着流批一体的数据存储层发展,manifest和snapshot的设计,有效地隔离不同transaction的变更,非常方便批处理和增量计算。并且,ApacheFlink已经是一个流批一体的计算引擎,二都可以完美匹配,合力打造流批一体的数据湖架构。Iceberg术语数据文件(datafiles) Iceberg表真实存储数据的文件,一般存储在data目录下,以。parquet结尾。清单文件(Manifestfile) 每行都是每个数据文件的详细描述,包括数据文件的状态、文件路径、分区信息、列级别的统计信息(比如每列的最大最小值、空值数等)、通过该文件、可过滤掉无关数据、提高检索速度。快照(Snapshot) 快照代表一张表在某个时刻的状态。每个快照版本包含某个时刻的所有数据文件列表。Datafiles是存储在不同的manifestfiles里面,manifestfiles是存储在一个Manifestlist文件里面,而一个Manifestlist文件代表一个快照。sparkIceberg离线数仓前期准备 spark3。0。0scala2。12 Iceberg0。13。1 编译好的icebergspark3runtime0。13。1。jar拷贝到sparkjarsDWD加载ods原始数据controllervalsparkConfnewSparkConf()。set(spark。sql。catalog。hadoopprod,org。apache。iceberg。spark。SparkCatalog)。set(spark。sql。catalog。hadoopprod。type,hadoop)。set(spark。sql。catalog。hadoopprod。warehouse,hdfs:hadoop01:9820sparkwarehouse)。set(spark。sql。catalog。catalogname。type,hadoop)。set(spark。sql。catalog。catalogname。defaultnamespace,db)。set(spark。sql。sources。partitionOverwriteMode,dynamic)。set(spark。sql。session。timeZone,GMT8)。setMaster(local〔〕)。setAppName(dwdapp)valsparkSessionSparkSession。builder()。config(sparkConf)。getOrCreate()DwdIcebergService。readOdsData(sparkSession)service加载member到dwddefloadMember(sparkSession:SparkSession):Unit{sparkSession。read。json(datasourceicebergmember。log)。drop(dn)。withColumn(uid,col(uid)。cast(int))。withColumn(adid,col(adid)。cast(int))。writeTo(hadoopprod。db。dwdmember)。overwritePartitions()}DWS数据宽表defgetDwsMemberData(sparkSession:SparkSession,dt:String){importsparkSession。implicits。。。。。valresultdwdMember。join(dwdMemberRegtype。drop(dt),Seq(uid),left)。join(dwdPcentermempaymoney。drop(dt),Seq(uid),left)。join(dwdBaseAd,Seq(adid,dn),left)。join(dwdBaseWebsite,Seq(siteid,dn),left)。join(dwdVipLevel,Seq(vipid,dn),leftouter)。select(。。。)。as〔DwsMemberResult〕valresultDataresult。groupByKey(itemitem。uiditem。dn)。mapGroups{case(key,iters)valkeyskey。split()valuidInteger。parseInt(keys(0))valdnkeys(1)valdwsMembersiters。toListvalpaymoneydwsMembers。filter(。paymoney!null)。map(itemBigDecimal。apply(item。paymoney))。reduceOption()。getOrElse(BigDecimal。apply(0。00))。toString。。。。分区列不能为null,sparksql内存表null为字符串resultData。where(34;dn!null)。show()resultData。where(34;dn!null)。write。format(iceberg)。mode(overwrite)。save(hadoopprod。db。dwsmember)}ADS统计分析defqueryDetails(sparkSession:SparkSession,dt:String){importsparkSession。implicits。valresultDwsIcebergDao。queryDwsMemberData(sparkSession)。as〔QueryResult〕。where(sdt{dt})result。cache()统计根据url统计人数wordcountresult。mapPartitions(partition{partition。map(item(item。appregurlitem。dnitem。dt,1))})。groupByKey(。1)。mapValues(itemitem。2)。reduceGroups()。map(item{valkeysitem。1。split()valappregurlkeys(0)valdnkeys(1)valdtkeys(2)(appregurl,item。2,dt,dn)})。toDF(appregurl,num,dt,dn)。writeTo(hadoopprod。db。adsregisterappregurlnum)。overwritePartitions()统计各memberlevel等级支付金额前三的用户:mysql、oracle、hive、phoenix、iceberg对where里都不支持开窗函数,spark内存函数强大result。withColumn(rownum,rownumber()。over(Window。partitionBy(memberlevel)。orderBy(desc(paymoney))))。where(rownum4)。orderBy(memberlevel,rownum)。select(。。。)。writeTo(hadoopprod。db。adsregistertop3memberpay)。overwritePartitions()}yarn上测试 最后是花了18分钟跑完1000万条数据,查询表数据观察是否有数据丢失。数据没有丢失FlinkIceberg流批一体架构前期准备 flink1。13。0scala2。12 iceberg0。13。1 拷贝编译好的icebergflinkruntime1。130。13。1。jar到flinklib 启动flink集群,运行flinksql:binsqlclient。shembeddedshellflinkcdc采集数据到kafka,流模式写入icebergStreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();env。enableCheckpointing(6000);。。。kafakSource。setStartFromLatest();DataStreamRowDataresultenv。addSource(kafakSource)。map(item{。。。。rowData。setField(0,uid);rowData。setField(1,courseid);rowData。setField(2,deviceid);rowData。setField(3,StringData。fromString(array〔3〕。trim()));returnrowData;});result。print(处理完数据:);TableLoadertesttopicTableTableLoader。fromHadoopTable(hdfs:hadoop01:9820flinkwarehouseicebergdbdwdviewlog);FlinkSink。forRowData(result)。tableLoader(testtopicTable)。build();env。execute();批模式初始化加载数据DataStreamRowDatabatchFlinkSource。forRowData()。env(env)。tableLoader(tableLoader)。streaming(false)。build();流模式增量处理数据DataStreamRowDatastreamFlinkSource。forRowData()。env(env)。tableLoader(tableLoader)。streaming(true)。build();DataStream与Table转换写入icebergTabletabledwsIcbergDao。queryDwsMemberData(env,tableEnv)。where((dt)。isEqual(dt));DataStreamQueryResultqueryResultDataStreamtableEnv。toAppendStream(table,QueryResult。class);tableEnv。createTemporaryView(tmpA,queryResultDataStream);Stringsqlselectfrom(selectuid,memberlevel,register,appregurl,regsourcename,adname,sitename,viplevel,cast(paymoneyasdecimal(10,4)),rownumber()over(partitionbymemberlevelorderbycast(paymoneyasdecimal(10,4))desc)asrownum,dn,dtfromtmpAwheredtdt)whererownum4;Tabletable1tableEnv。sqlQuery(sql);DataStreamRowDatatop3DStableEnv。toRetractStream(table1,RowData。class)。filter(itemitem。f0)。map(itemitem。f1);Stringsql2selectappregurl,count(uid),dn,dtfromtmpAwheredtdtgroupbyappregurl,dn,dt;Tabletable2tableEnv。sqlQuery(sql2);DataStreamRowDataappregurlnumDStableEnv。toRetractStream(table2,RowData。class)。filter(itemitem。f0)。map(itemitem。f1);TableLoadertop3TableTableLoader。fromHadoopTable(warehouseDiradsregistertop3memberpay);TableLoaderappregurlnumTableTableLoader。fromHadoopTable(warehouseDiradsregisterappregurlnum);FlinkSink。forRowData(top3DS)。tableLoader(top3Table)。overwrite(true)。build();FlinkSink。forRowData(appregurlnumDS)。tableLoader(appregurlnumTable)。overwrite(true)。build();优化实践1小文件处理Iceberg0。11以前,通过定时触发batchapi进行小文件合并,这样虽然能合并,但是需要维护一套Actions代码,而且也不是实时合并的。TabletablefindTable(options,conf);Actions。forTable(table)。rewriteDataFiles()。targetSizeInBytes(101024)10KB。execute(); Iceberg0。11新特性,支持了流式小文件合并。通过分区存储桶键使用哈希混洗方式写数据、从源头直接合并文件,这样的好处在于,一个task会处理某个分区的数据,提交自己的Datafile文件,比如一个task只处理对应分区的数据。这样避免了多个task处理提交很多小文件的问题,且不需要额外的维护代码,只需在建表的时候指定属性write。distributionmode,该参数与其它引擎是通用的,比如Spark等。CREATETABLEcitytable(provinceBIGINT,citySTRING)PARTITIONEDBY(province,city)WITH(write。distributionmodehash);2排序功能在Iceberg0。11之前,Flink是不支持Iceberg排序功能的,所以之前只能结合Spark以批模式来支持排序功能,0。11新增了排序特性的支持,Iceberg也支持flink的排序insertintoIcebergtableselectdaysfromKafkatblorderbydays,provinceid;利用Iceberg的排序特性,将天作为分区。按天、小时、分钟进行排序,那么manifest文件就会记录这个排序规则,从而在检索数据的时候,提高查询效率,既能实现Hive分区的检索优点,还能避免Hivemetadata元数据过多带来的压力。总结flink不支持隐藏分区,不支持创建带水位线的表与hudi相比,缺少行级更新,只能对表的数据按分区进行overwrite全量覆盖flink近实时入湖 Iceberg提交Transaction时是以文件粒度来提交。这就没法以秒为单位提交Transaction,否则会造成文件数量膨胀; 没有在线服务节点。对于实时的高吞吐低延迟写入,无法得到纯实时的响应; Flink写入以checkpoint为单位,物理数据写入Iceberg后并不能直接查询,当触发了checkpoint才会写metadata文件,这时数据由不可见变为可见。checkpoint每次执行都会有一定时间。