范文健康探索娱乐情感热点
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

SparkFlinkIceberg打造湖仓一体架构实践探索

  数据湖-大数据生态杀青数据仓库的痛点只能存储结构化数据,无法采集存储非结构化数据无法存储原始数据,所有的数据须经过ETL清洗过程离线数仓的数据表牵一发而动全身,数据调整工程量大实时数仓存储空间有限,无法采集和存储海量实时数据回溯效率低下,实时数据和离线数据计算接口难以统一Kafka 做实时数仓,以及日志传输。Kafka 本身存储成本很高,且数据保留时间有时效性,一旦消费积压,数据达到过期时间后,就会造成数据丢失且没有消费到将实时要求不高的业务数据入湖、比如说能接受 1-10 分钟的延迟。因为 Iceberg 0.11 也支持 SQL 实时读取,而且还能保存历史数据。这样既可以减轻线上 Kafka 的压力,还能确保数据不丢失的同时也能实时读取数据湖三剑客对比HudiHudi:Hadoop Upserts Deletes and Incrementals(原为 Hadoop Upserts anD Incrementals),强调了其主要支持 Upserts、Deletes 和 Incremental 数据处理,其主要提供的写入工具是 Spark HudiDataSource API 和自身提供的 HoodieDeltaStreamer在查询方面,Hudi 支持 Hive、Spark、Presto。在性能方面,Hudi 设计了 HoodieKey ,一个类似于主键的东西。对于查询性能,一般需求是根据查询谓词生成过滤条件下推至 datasource。Hudi 这方面没怎么做工作,其性能完全基于引擎自带的谓词下推和 partition prune 功能。DeltaDelta定位是流批一体的 Data Lake 存储层,支持 update/delete/merge。不强调主键,因此其 update/delete/merge 的实现均是基于 spark 的 join 功能。在数据写入方面,Delta 与 Spark 是强绑定的,这一点 Hudi 是不同的:Hudi 的数据写入不绑定 Spark(可以用 Spark,也可以使用 Hudi 自己的写入工具写入)在查询方面,开源 Delta 目前支持 Spark 与 Presto,但是,Spark 是不可或缺的,因为 delta log 的处理需要用到 Spark。这意味着如果要用 Presto 查询 Delta,查询时还要跑一个 Spark 作业IcebergIceberg一个通用化设计的Table Format,高性能的分析与可靠的数据管理,Iceberg 没有类似的 HoodieKey 设计,其不强调主键。上文已经说到,没有主键,做 update/delete/merge 等操作就要通过 Join 来实现,而 Join 需要有一个 类似 SQL 的执行引擎。Iceberg 在查询性能方面做了大量的工作。值得一提的是它的 hidden partition 功能。Hidden partition 意思是说,对于用户输入的数据,用户可以选取其中某些列做适当的变换(Transform)形成一个新的列作为 partition 列。这个 partition 列仅仅为了将数据进行分区,并不直接体现在表的 schema 中。总结Delta、Hudi、Iceberg三个开源项目中,Delta和Hudi跟Spark的代码深度绑定,尤其是写入路径。这两个项目设计之初,都基本上把Spark作为他们的默认计算引擎了。而Apache Iceberg的方向非常坚定,宗旨就是要做一个通用化设计的Table Format。它完美的解耦了计算引擎和底下的存储系统,便于多样化计算引擎和文件格式,很好的完成了数据湖架构中的Table Format这一层的实现,因此也更容易 成为Table Format层的开源事实标准Apache Iceberg也在朝着流批一体的数据存储层发展,manifest和snapshot的设计,有效地隔离不同transaction的变更 ,非常方便批处理和增量计算。并且,Apache Flink已经是一个流批一体的计算引擎,二都可以完美匹配,合力打造流批一体的数据湖架构。Iceberg术语数据文件 ( data files )
  Iceberg 表真实存储数据的文件,一般存储在data目录下,以".parquet"结尾。清单文件 ( Manifest file )
  每行都是每个数据文件的详细描述,包括数据文件的状态、文件路径、分区信息、列级别的统计信息(比如每列的最大最小值、空值数等)、通过该文件、可过滤掉无关数据、提高检索速度。快照( Snapshot )
  快照代表一张表在某个时刻的状态。每个快照版本包含某个时刻的所有数据文件列表。Data files 是存储在不同的 manifest files 里面, manifest files 是存储在一个 Manifest list 文件里面,而一个 Manifest list 文件代表一个快照。spark + Iceberg离线数仓前期准备
  spark 3.0.0_scala_2.12
  Iceberg 0.13.1
  编译好的iceberg-spark3-runtime-0.13.1.jar拷贝到spark/jarsDWD加载ods原始数据>controller val sparkConf = new SparkConf()       .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")       .set("spark.sql.catalog.hadoop_prod.type", "hadoop")       .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://hadoop01:9820/spark/warehouse")       .set("spark.sql.catalog.catalog-name.type", "hadoop")       .set("spark.sql.catalog.catalog-name.default-namespace", "db")       .set("spark.sql.sources.partitionOverwriteMode", "dynamic")       .set("spark.sql.session.timeZone", "GMT+8")       .setMaster("local[*]")       .setAppName("dwd_app")     val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()     DwdIcebergService.readOdsData(sparkSession) > service    // 加载member 到dwd   def loadMember(sparkSession: SparkSession): Unit ={     sparkSession.read.json("/datasource/iceberg/member.log").drop("dn")       .withColumn("uid", col("uid").cast("int"))       .withColumn("ad_id", col("ad_id").cast("int"))       .writeTo("hadoop_prod.db.dwd_member").overwritePartitions()   }DWS数据宽表  def getDwsMemberData(sparkSession: SparkSession, dt: String) = {     import sparkSession.implicits._     ....     val result = dwdMember.join(dwdMemberRegtype.drop("dt"), Seq("uid"), "left")       .join(dwdPcentermempaymoney.drop("dt"), Seq("uid"), "left")       .join(dwdBaseAd, Seq("ad_id", "dn"), "left")       .join(dwdBaseWebsite, Seq("siteid", "dn"), "left")       .join(dwdVipLevel, Seq("vip_id", "dn"), "left_outer")       .select("...").as[DwsMemberResult]      val resultData = result.groupByKey(item => item.uid + "_" + item.dn)       .mapGroups { case (key, iters) =>         val keys = key.split("_")         val uid = Integer.parseInt(keys(0))         val dn = keys(1)         val dwsMembers = iters.toList         val paymoney = dwsMembers.filter(_.paymoney != null)           .map(item => BigDecimal.apply(item.paymoney))           .reduceOption(_ + _)           .getOrElse(BigDecimal.apply(0.00)).toString    ....     // 分区列不能为null,spark-sql内存表null为字符串     resultData.where(#34;dn" =!= "null").show()     resultData.where(#34;dn" =!= "null")       .write.format("iceberg")       .mode("overwrite").save("hadoop_prod.db.dws_member")   }ADS统计分析 def queryDetails(sparkSession: SparkSession, dt: String) = {     import sparkSession.implicits._     val result = DwsIcebergDao.queryDwsMemberData(sparkSession).as[QueryResult].where(s"dt="${dt}"")     result.cache()      //统计根据url统计人数  wordcount     result.mapPartitions(partition => {       partition.map(item => (item.appregurl + "_" + item.dn + "_" + item.dt, 1))     }).groupByKey(_._1)       .mapValues(item => item._2).reduceGroups(_ + _)       .map(item => {         val keys = item._1.split("_")         val appregurl = keys(0)         val dn = keys(1)         val dt = keys(2)         (appregurl, item._2, dt, dn)       }).toDF("appregurl", "num", "dt", "dn")       .writeTo("hadoop_prod.db.ads_register_appregurlnum").overwritePartitions()      // 统计各memberlevel等级 支付金额前三的用户: mysql、oracle、hive、phoenix、iceberg对where里都不支持开窗函数,spark内存函数强大     result.withColumn("rownum", row_number().over(Window.partitionBy("memberlevel").orderBy(desc("paymoney"))))       .where("rownum<4")       .orderBy("memberlevel", "rownum")       .select("...")       .writeTo("hadoop_prod.db.ads_register_top3memberpay").overwritePartitions()   }yarn 上测试
  最后是花了 18 分钟跑完 1000 万条数据,查询表数据观察是否有数据丢失。数据没有丢失Flink+Iceberg 流批一体架构前期准备
  flink 1.13.0_scala_2.12
  iceberg 0.13.1
  拷贝编译好的iceberg-flink-runtime-1.13-0.13.1.jar到flink/lib
  启动flink集群,运行flink sql:bin/sql-client.sh embedded shellflink cdc采集数据到kafka,流模式写入iceberg        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.enableCheckpointing(6000);         ...         kafakSource.setStartFromLatest();         DataStream result = env.addSource(kafakSource).map(item -> {           ....             rowData.setField(0, uid);             rowData.setField(1, courseid);             rowData.setField(2, deviceid);             rowData.setField(3, StringData.fromString(array[3].trim()));             return rowData;         });          result.print(">>>处理完数据:");         TableLoader testtopicTable = TableLoader.fromHadoopTable("hdfs://hadoop01:9820/flink/warehouse/iceberg_db/dwd_view_log");         FlinkSink.forRowData(result).tableLoader(testtopicTable).build();          env.execute();批模式初始化加载数据   DataStream batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();流模式增量处理数据DataStream stream = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(true).build();DataStream与Table转换写入icebergTable table = dwsIcbergDao.queryDwsMemberData(env, tableEnv).where($("dt").isEqual(dt));         DataStream queryResultDataStream = tableEnv.toAppendStream(table, QueryResult.class);          tableEnv.createTemporaryView("tmpA", queryResultDataStream);         String sql = "select *from(select uid,memberlevel,register,appregurl" +                 ",regsourcename,adname,sitename,vip_level,cast(paymoney as decimal(10,4)),row_number() over" +                 " (partition by memberlevel order by cast(paymoney as decimal(10,4)) desc) as rownum,dn,dt from tmpA where dt="" + dt + "") " +                 " where rownum<4";         Table table1 = tableEnv.sqlQuery(sql);         DataStream top3DS = tableEnv.toRetractStream(table1, RowData.class).filter(item -> item.f0).map(item -> item.f1);          String sql2 = "select appregurl,count(uid),dn,dt from tmpA where dt="" + dt + "" group by appregurl,dn,dt";         Table table2 = tableEnv.sqlQuery(sql2);         DataStream appregurlnumDS = tableEnv.toRetractStream(table2, RowData.class).filter(item -> item.f0).map(item -> item.f1);          TableLoader top3Table = TableLoader.fromHadoopTable(warehouseDir + "/ads_register_top3memberpay");         TableLoader appregurlnumTable = TableLoader.fromHadoopTable(warehouseDir + "/ads_register_appregurlnum");      FlinkSink.forRowData(top3DS).tableLoader(top3Table).overwrite(true).build();     FlinkSink.forRowData(appregurlnumDS).tableLoader(appregurlnumTable).overwrite(true).build();优化实践1 小文件处理Iceberg 0.11 以前,通过定时触发 batch api 进行小文件合并,这样虽然能合并,但是需要维护一套 Actions 代码,而且也不是实时合并的。Table table = findTable(options, conf); Actions.forTable(table).rewriteDataFiles()         .targetSizeInBytes(10 * 1024) // 10KB         .execute();
  Iceberg 0.11 新特性,支持了流式小文件合并。通过分区/存储桶键使用哈希混洗方式写数据、从源头直接合并文件,这样的好处在于,一个 task 会处理某个分区的数据,提交自己的 Datafile 文件,比如一个 task 只处理对应分区的数据。这样避免了多个 task 处理提交很多小文件的问题,且不需要额外的维护代码,只需在建表的时候指定属性 write.distribution-mode,该参数与其它引擎是通用的,比如 Spark 等。CREATE TABLE city_table (       province BIGINT,      city STRING ) PARTITIONED BY (province, city) WITH (     "write.distribution-mode"="hash"  );2 排序功能在 Iceberg 0.11 之前,Flink 是不支持 Iceberg 排序功能的,所以之前只能结合 Spark 以批模式来支持排序功能,0.11 新增了排序特性的支持,Iceberg也支持flink的排序insert into Iceberg_table select days from Kafka_tbl order by days, province_id;利用 Iceberg 的排序特性,将天作为分区。按天、小时、分钟进行排序,那么 manifest 文件就会记录这个排序规则,从而在检索数据的时候,提高查询效率,既能实现 Hive 分区的检索优点,还能避免 Hive metadata 元数据过多带来的压力。总结flink不支持隐藏分区,不支持创建带水位线的表与 hudi 相比,缺少行级更新,只能对表的数据按分区进行 overwrite 全量覆盖flink近实时入湖
  ① Iceberg 提交 Transaction 时是以文件粒度来提交。这就没法以秒为单位提交 Transaction,否则会造成文件数量膨胀;
  ② 没有在线服务节点。对于实时的高吞吐低延迟写入,无法得到纯实时的响应;
  ③ Flink 写入以 checkpoint 为单位,物理数据写入 Iceberg 后并不能直接查询,当触发了 checkpoint 才会写 metadata 文件,这时数据由不可见变为可见。checkpoint 每次执行都会有一定时间。

杨元庆年薪1。7亿是什么概念坚决支持司马先生!在联想负债率高达90的现状下,柳杨作为总裁决策者,不是千方百计的去改善经营状况,改善负债率太高的财务状况,增大研发和创新投入,促进企业良性发展,而是二十几人高管把火大教育于佳宁链上代码将是元宇宙时代新的商业语言元宇宙席卷一切,科技巨头纷纷表态,资本闻风而动,元宇宙正在涌入大众视野,这个1992年就出现在科幻小说中的名词在近30年后令无数行业风云迭起,颠覆着人们的认知,开始改变商业的底层逻当算法全面入侵生活网生一代尝试对抗算法驯化来源半月谈年轻人谋划逃离算法半月谈记者梁姊我时常为自己感到悲哀,一边鄙夷着这些东西耽误我们这一代人的生命,一边又无法控制地掉入陷阱,有时候很想把手机扔了再玩手机就废掉了当算法全面入自由币一种新型货币中文名自由币外文名FreeCoin货币代码ZYB在区块链行业应用落实不断推进的今天,对于区块链在金融行业的应用不少人有着疑问,区块链金融与传统金融有什么区别?区别很大!金融一词,从古至今一直有着清晰的概念,而如今,我们通常把金融荣耀手环6全新升级,提供全天候血氧和95种运动模式除此之外,荣耀60发布会上还发布了全新升级的荣耀手环6,升级后的荣耀手环6为消费者提供全天候血氧监测和95种运动模式。去年发布的荣耀手环6开启了荣耀手环全面屏的时代,同时具有心率监谁会是骁龙8处理器真正首发?谁又会在新芯片大战中脱颖而出?随着骁龙8处理器正式发布,各个手机厂家也开始忙活了起来,争先恐后的抢占首发位置。在小米和摩托罗拉分别宣布首发后,realme也发声,称要做第二个发布搭载最新骁龙8处理器的手机。近年这样清理C盘,一下子多出几十个G今天打工妹打开电脑又看到了令人窒息的画面虽然D盘也够呛了,不过D盘清理起来相对简单(文末打工妹也会分享哦)C盘就没那么容易了!!!清理C盘并不像清理其它盘那么简单一不小心错删了文件不要无端联想,苹果AR应该和元宇宙没什么关系说到苹果最受期待的新产品,其中必然少不了一直在曝光,但是却不见其身影的苹果AR设备。直到前段时间为止,我们所知道的只有苹果确实在开发一套AR设备,至于这套AR设备的外观和性能则是以苹果通知iPhone供应商需求已经放缓彭博12月2日报道,苹果据悉将告诉iPhone供应商需求已经放缓。知情人士透露,苹果已通知零部件供应商,iPhone13系列的市场需求正在减弱,消费者的抢购意愿开始降低。据报道,由诺基亚全球第一!老厂牌提前抢苹果风头,网友你大爷还是你大爷随着电视剧两个人的世界的播出,男女主演火不火不知道,但作为道具之一的诺基亚着实火了一把。对于80后90后而言,诺基亚是很多人的青春记忆。随着智能手机新风口到来,诺基亚已经沉寂多时,168项升级加糖上市元气生活甜一点小蚂蚁甜粉款6。69万起168项升级加糖上市11月13日,奇瑞新能源AnteePark蚁粉狂欢节于魅力鹏城再度盛情开启。共创潮流生活,开启奇趣体验,在这场精彩纷呈快乐
从小米11换到三星GalaxyS21,明白一个道理高配低价未必好用年初的时候买了小米11,被它的配置和价格深深吸引,3999元的价格就能买到2K屏幕1亿像素全场景快充以及骁龙888旗舰芯片,同价位的友商机型少说也要4500元以上了,但是小米11使小米也要造翻盖折叠手机?相关翻盖折叠专利曝光最新消息,小米公司在中国国家知识产权协会申请了一项设备专利,专利中手机的设计与上周四发布的华为P50Pocket以及三星GalaxyZFlip3折叠手机类似,采用翻盖式折叠设计。据区块链默克尔树Merkle树是一种组织和构造大量数据以使其更易于处理的方法。在加密货币和区块链的情况下,Merkle树用于以对资源要求较低的方式构建交易数据。当在Merkle树结构中进行加密货币你被App窥视跟踪了吗?来源人民政协报记者李木元周佳佳黄喆经常听到周围的人跟我说,和朋友聚会时刚谈到一个商品,不久后在手机中就出现了该商品的广告还有人跟我反映,在用平台类App如打车软件电商购物App酒店教程19国产编程语言CbrotherExcel操作CBrother提供了一个Excel扩展来操作excle文件,目前支持。xlsx和。csv两种格式。Excel类Excel类表示一个Excel文档importlibexcelvar跑美团众包是一种什么体验?我用亲身经历告诉你!文章很长由于家庭原因,从0755辞职,回到了028地区,一时没有找到合适的工作,倒是很多公司打着招聘的旗号,不停面试求职者,从求职者那里套取经验和解决方案,最大限度的压缩成本,白piao经一文读懂物联网一什么是万物互联近年以来,万物互联一词持续都是热点,人人均在提万物互联,但绝大多数的人并没能实际上搞明白万物互联的含义。万物互联代表什么?按照度娘的话来说万物互联(InternetJZ从上往下打印二叉树从上往下打印二叉树题目描述从上往下打印出二叉树的每个节点,同层节点从左至右打印。题目链接从上往下打印二叉树(httpswww。nowcoder。compractice7fe2212税务再次出手!电商行业中招!论税收的重要性现如今,是不用出门就可以在网络上买到所需要的商品,这一切都要归功于电商。电商行业的发展越来越好,林子大了呢,就什么鸟都有了。税务再次出手!电商行业中招!论税收的重要性!电商行业呢,Java并发编程synchronized虽然多线程编程极大地提高了效率,但是也会带来一定的隐患。比如说两个线程同时往一个数据库表中插入不重复的数据,就可能会导致数据库中插入了相同的数据。今天我们就来一起讨论下线程安全问题微信版花呗终于要来了?随借随还按日计息,支付宝地位不保?自从移动支付发明以来,国人的生活发生了极大的变化,现在,只需一部手机就可以轻松解决很多生活的需求,只需要带着一部手机,就可以走遍全国。在日常消费时,我们再也不用手忙脚乱地掏出钱包,