湖仓一体电商项目(十)业务实现之编写写入DWD层业务代码
头条创作挑战赛业务实现之编写写入DWD层业务代码
DWD层数据主要存储干净的明细数据,这里针对ODS层KAFKAODSTOPIC数据编写代码进行清洗写入对应的Kafkatopic和IcebergDWD层中。代码功能中有以下几点重要方面:针对KafkaODS层中的数据进行清洗,写入IcebergDWD层中。将数据除了写入IcebergDWD层中之外,还要写入Kafka中方便后续处理得到DWS层数据。一、代码编写
编写处理KafkaODS层数据写入IcebergDWD层数据时,由于在KafkaKAFKAODSTOPICtopic中每条数据都已经有对应写入kafka的topic信息,所以这里我们只需要读取KAFKAODSTOPICtopic中的数据写入到IcebergDWD层中,另外动态获取每条数据写入Kafkatopic信息将每条数据写入到对应的topic即可。
具体代码参照ProduceODSDataToDWD。scala,大体代码逻辑如下:caseclassDwdInfo(icebergodstblname:String,kafkadwdtopic:String,browseproductcode:String,browseproducttpcode:String,userip:String,obtainpoints:String,userid1:String,userid2:String,frontproducturl:String,logtime:String,browseproducturl:String,id:String,ip:String,logintm:String,logouttm:String)objectProduceODSDataToDWD{privatevalkafkaBrokers:StringConfigUtil。KAFKABROKERSdefmain(args:Array〔String〕):Unit{1。准备环境valenv:StreamExecutionEnvironmentStreamExecutionEnvironment。getExecutionEnvironmentvaltblEnv:StreamTableEnvironmentStreamTableEnvironment。create(env)env。enableCheckpointing(5000)importorg。apache。flink。streaming。api。scala。2。需要预先创建Catalog创建Catalog,创建表需要在Hive中提前创建好,不在代码中创建,因为在Flink中创建iceberg表不支持createtableifnotexists。。。语法tblEnv。executeSql(createcataloghadoopicebergwith(typeiceberg,catalogtypehadoop,warehousehdfs:myclusterlakehousedata)。stripMargin)2。创建KafkaConnector,连接消费Kafkaods中数据tblEnv。executeSql(createtablekafkaodstbl(icebergodstblnamestring,kafkadwdtopicstring,datastring)with(connectorkafka,topicKAFKAODSTOPIC,properties。bootstrap。serversnode1:9092,node2:9092,node3:9092,scan。startup。modelatestoffset,也可以指定earliestoffset、latestoffsetproperties。group。idmygroupid,formatjson)。stripMargin)valodsTbl:TabletblEnv。sqlQuery(selecticebergodstblname,data,kafkadwdtopicfromkafkaodstbl。stripMargin)valodsDS:DataStream〔Row〕tblEnv。toAppendStream〔Row〕(odsTbl)3。设置Sink到Kafka数据输出到侧输出流标记valkafkaDataTagnewOutputTag〔JSONObject〕(kafkadata)4。表准换成对应的DataStream数据处理,清洗ODS中的数据,存入Iceberg{icebergodstblname:ODSBROWSELOG,data:{browseProductCode:yyRAteviDb,browseProductTpCode:120,userIp:117。233。5。190,obtainPoints:24,userId:uid464936,frontProductUrl:https:1P2RQbHFS2,logTime:1647065858856,browseProductUrl:https:RXmiOUxRTliu9TE0},kafkadwdtopic:KAFKADWDBROWSELOGTOPIC}{icebergodstblname:ODSUSERLOGIN,data:{database:lakehousedb,xid:14942,userid:uid283876,ip:215。148。233。254,commit:true,id:10052,type:insert,logouttm:1647066506140,table:mcuserlogin,ts:1647066504,logintm:1647051931534},kafkadwdtopic:KAFKADWDUSERLOGINTOPIC}这里将数据转换成DataStream后再转换成表写入Iceberg对数据只是时间进行清洗,转换成DwdInfo类型DataStream返回,先过滤一些数据为null的valdwdDS:DataStream〔DwdInfo〕odsDS。filter(row{row。getField(0)!nullrow。getField(1)!nullrow。getField(2)!null})。process(newProcessFunction〔Row,DwdInfo〕(){overridedefprocessElement(row:Row,context:ProcessFunction〔Row,DwdInfo〕Context,collector:Collector〔DwdInfo〕):Unit{valicebergodstblname:Stringrow。getField(0)。toStringvaldata:Stringrow。getField(1)。toStringvalkafkadwdtopic:Stringrow。getField(2)。toStringvaljsonObj:JSONObjectJSON。parseObject(data)清洗日期数据jsonObj。put(logTime,DateUtil。getDateYYYYMMDDHHMMSS(jsonObj。getString(logTime)))jsonObj。put(logintm,DateUtil。getDateYYYYMMDDHHMMSS(jsonObj。getString(logintm)))jsonObj。put(logouttm,DateUtil。getDateYYYYMMDDHHMMSS(jsonObj。getString(logouttm)))解析json嵌套数据valbrowseproductcode:StringjsonObj。getString(browseProductCode)valbrowseproducttpcode:StringjsonObj。getString(browseProductTpCode)valuserip:StringjsonObj。getString(userIp)valobtainpoints:StringjsonObj。getString(obtainPoints)valuserid1:StringjsonObj。getString(userid)valuserid2:StringjsonObj。getString(userId)valfrontproducturl:StringjsonObj。getString(frontProductUrl)vallogtime:StringjsonObj。getString(logTime)valbrowseproducturl:StringjsonObj。getString(browseProductUrl)valid:StringjsonObj。getString(id)valip:StringjsonObj。getString(ip)vallogintm:StringjsonObj。getString(logintm)vallogouttm:StringjsonObj。getString(logouttm)往各类数据datajson对象中加入sinkdwdtopic的信息jsonObj。put(kafkadwdtopic,kafkadwdtopic)context。output(kafkaDataTag,jsonObj)collector。collect(DwdInfo(icebergodstblname,kafkadwdtopic,browseproductcode,browseproducttpcode,userip,obtainpoints,userid1,userid2,frontproducturl,logtime,browseproducturl,id,ip,logintm,logouttm))}})valpropsnewProperties()props。setProperty(bootstrap。servers,kafkaBrokers)6。将以上数据写入到Kafka各自DWD层topic中,这里不再使用SQL方式,而是直接使用DataStream代码方式Sink到各自的DWD层代码中dwdDS。getSideOutput(kafkaDataTag)。addSink(newFlinkKafkaProducer〔JSONObject〕(KAFKADWDDEFAULTTOPIC,newKafkaSerializationSchema〔JSONObject〕{overridedefserialize(jsonObj:JSONObject,aLong:lang。Long):ProducerRecord〔Array〔Byte〕,Array〔Byte〕〕{valsinkDwdTopic:StringjsonObj。getString(kafkadwdtopic)newProducerRecord〔Array〔Byte〕,Array〔Byte〕〕(sinkDwdTopic,null,jsonObj。toString。getBytes())}},props,FlinkKafkaProducer。Semantic。ATLEASTONCE))env。execute()}}
二、创建IcebergDWD层表
代码在执行之前需要在Hive中预先创建对应的Iceberg表,创建Icebreg表方式如下:1、在Hive中添加Iceberg表格式需要的包
启动HDFS集群,node1启动Hivemetastore服务,在Hive客户端启动Hive添加Iceberg依赖包:node1节点启动Hivemetastore服务〔rootnode1〕hiveservicemetastore在hive客户端node3节点加载两个jar包addjarsoftwarehive3。1。2libiceberghiveruntime0。12。1。jar;addjarsoftwarehive3。1。2liblibfb3030。9。3。jar;
2、创建Iceberg表
这里创建IcebergDWD表有DWDUSERLOGIN,创建语句如下:CREATETABLEDWDUSERLOGIN(idstring,useridstring,ipstring,logintmstring,logouttmstring)STOREDBYorg。apache。iceberg。mr。hive。HiveIcebergStorageHandlerLOCATIONhdfs:myclusterlakehousedataicebergdbDWDUSERLOGINTBLPROPERTIES(iceberg。cataloglocationbasedtable,write。metadata。deleteaftercommit。enabledtrue,write。metadata。previousversionsmax3);
三、代码测试
以上代码编写完成后,代码执行测试步骤如下:1、在Kafka中创建对应的topic在Kafka中创建KAFKADWDUSERLOGINTOPICtopic。kafkatopics。shzookeepernode3:2181,node4:2181,node5:2181createtopicKAFKADWDUSERLOGINTOPICpartitions3replicationfactor3监控以上topic数据〔rootnode1bin〕。kafkaconsoleconsumer。shbootstrapservernode1:9092,node2:9092,node3:9092topicKAFKADWDUSERLOGINTOPIC
2、将代码中消费Kafka数据改成从头开始消费
代码中KafkaConnector中属性scan。startup。mode设置为earliestoffset,从头开始消费数据。
这里也可以不设置从头开始消费Kafka数据,而是直接启动实时向MySQL表中写入数据代码RTMockDBData。java代码,实时向MySQL对应的表中写入数据,这里需要启动maxwell监控数据,代码才能实时监控到写入MySQL的业务数据。3、执行代码,查看对应结果
以上代码执行后在,在对应的KafkaKAFKADWDUSERLOGINTOPICtopic中都有对应的数据。在IcebergDWD层中对应的表中也有数据。
Kafka中结果如下:
IcebergDWD层表DWDUSERLOGIN中的数据如下:
四、架构图
卡塔尔世界杯修改涉台表述台当局不高兴了据台湾中时新闻网6月20日报道,卡塔尔世界杯球迷证注册系统已将台湾选项改成中华台北,对此,中方赞赏卡塔尔政府恪守一个中国原则。而台当局则表示高度遗憾。报道称,卡塔尔世界杯相关注册平
唐山案被打女子官方通报来了四人轻伤或轻微伤警方通报唐山打人案4人轻伤或轻微伤关于陈某志等涉嫌寻衅滋事暴力殴打他人等案件侦办进展情况的通报2022年6月10日凌晨,唐山市路北区某烧烤店发生起寻衅滋事暴力殴打他人案件,造成恶劣
唐山事件最新消息被打女孩的律师首次发声,透露重要信息唐山事件最新消息被打女孩的律师首次发声,透露重要信息最近一段时间,不知是巧合还是政府的雷霆出击,唐山烤肉店打人事件,终于等来了一个好消息。最高人民检察院表示,将依法严厉打击严重危害
唐山市公安局路北分局局长马爱军等人接受审查调查河北省纪检监察机关对唐山市某烧烤店寻衅滋事暴力殴打他人案件及其他相关案件涉及的公职人员涉嫌严重违纪违法问题,依纪依法开展审查调查。目前,经河北省纪委监委指定管辖,唐山市路北区政府党
这些姿势,男女都喜欢做,看看你有没有关注我,每天分享有趣漫画漫画来源于匡北北文百芜baiwu为什么别人越变越好看,而你却越来越不上镜?明明已经花了很多钱去做SPA报健身班,脸看起来不错,但是一照全身就不忍直视,究其原
为何取名福建号?邱毅给出三点解释,称统一行动福建作用重要003型航母命名为福建号,有何深意?台湾著名政论家邱毅给出三点解释。福建号战力强悍,美台将感到绝望。近期003型航母正式下水的新闻赚足了舆论眼球,像CNN纽约时报BBC这些平时阴阳
坚持吃护肝片,真能保护肝脏不受损吗?本文告诉你答案护肝片是一种中药的复方制剂,里边的主要成分包含了柴胡,五味子,板蓝根,绿豆,猪胆粉等成分,进入人体之后,可以达到健脾消食,疏肝理气的功效。生活当中很多人会适当地吃一些护肝片,其主要
同样是父亲节晚餐,先看潮汕,再看东北,网友我更喜欢东北的俗话说柴米油盐酱醋茶,人间烟火也有趣,品味各地美食,食尽人间烟火,是人生至幸之事。大家好,我是小菲,一枚热爱美食的80后,如果恰好您也喜欢美食,那就关注我吧!现在人们喜欢过节,无论
最好的报答不是请帮你忙的人吃饭,而是用这四种方式我其实今天都还挺开心的,一直说到表演节目,我真的很不开心!王传君坦诚地说。宋丹丹挥手笑道哈,你可以不演!王传君附和对对对,我一定不会演的!昨晚播出的五十公里桃花坞2中,所有人都无奈
为什么男生接吻时喜欢摸胸?今天后台有个妹子突然问性情哥一个问题,她说为什么亲吻的时候男人一定要摸胸?我说,大概是因为男人在亲吻的时候不知道把手放哪,于是就自然而然的摸胸了。不知道大家有没有经历过此类的事情呢
宝庆银楼屡次被市民投诉一口价黄金猫腻多自古以来,黄金在人们的交换贸易过程中,一直承担着货币的角色。提起黄金饰品,就想起保值,在某种程度上,已经成为了社会面上的某种价值共识。然而,您是否有想过,如今的金店套路重重,买黄金