专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

湖仓一体电商项目(十)业务实现之编写写入DWD层业务代码

  头条创作挑战赛业务实现之编写写入DWD层业务代码
  DWD层数据主要存储干净的明细数据,这里针对ODS层KAFKAODSTOPIC数据编写代码进行清洗写入对应的Kafkatopic和IcebergDWD层中。代码功能中有以下几点重要方面:针对KafkaODS层中的数据进行清洗,写入IcebergDWD层中。将数据除了写入IcebergDWD层中之外,还要写入Kafka中方便后续处理得到DWS层数据。一、代码编写
  编写处理KafkaODS层数据写入IcebergDWD层数据时,由于在KafkaKAFKAODSTOPICtopic中每条数据都已经有对应写入kafka的topic信息,所以这里我们只需要读取KAFKAODSTOPICtopic中的数据写入到IcebergDWD层中,另外动态获取每条数据写入Kafkatopic信息将每条数据写入到对应的topic即可。
  具体代码参照ProduceODSDataToDWD。scala,大体代码逻辑如下:caseclassDwdInfo(icebergodstblname:String,kafkadwdtopic:String,browseproductcode:String,browseproducttpcode:String,userip:String,obtainpoints:String,userid1:String,userid2:String,frontproducturl:String,logtime:String,browseproducturl:String,id:String,ip:String,logintm:String,logouttm:String)objectProduceODSDataToDWD{privatevalkafkaBrokers:StringConfigUtil。KAFKABROKERSdefmain(args:Array〔String〕):Unit{1。准备环境valenv:StreamExecutionEnvironmentStreamExecutionEnvironment。getExecutionEnvironmentvaltblEnv:StreamTableEnvironmentStreamTableEnvironment。create(env)env。enableCheckpointing(5000)importorg。apache。flink。streaming。api。scala。2。需要预先创建Catalog创建Catalog,创建表需要在Hive中提前创建好,不在代码中创建,因为在Flink中创建iceberg表不支持createtableifnotexists。。。语法tblEnv。executeSql(createcataloghadoopicebergwith(typeiceberg,catalogtypehadoop,warehousehdfs:myclusterlakehousedata)。stripMargin)2。创建KafkaConnector,连接消费Kafkaods中数据tblEnv。executeSql(createtablekafkaodstbl(icebergodstblnamestring,kafkadwdtopicstring,datastring)with(connectorkafka,topicKAFKAODSTOPIC,properties。bootstrap。serversnode1:9092,node2:9092,node3:9092,scan。startup。modelatestoffset,也可以指定earliestoffset、latestoffsetproperties。group。idmygroupid,formatjson)。stripMargin)valodsTbl:TabletblEnv。sqlQuery(selecticebergodstblname,data,kafkadwdtopicfromkafkaodstbl。stripMargin)valodsDS:DataStream〔Row〕tblEnv。toAppendStream〔Row〕(odsTbl)3。设置Sink到Kafka数据输出到侧输出流标记valkafkaDataTagnewOutputTag〔JSONObject〕(kafkadata)4。表准换成对应的DataStream数据处理,清洗ODS中的数据,存入Iceberg{icebergodstblname:ODSBROWSELOG,data:{browseProductCode:yyRAteviDb,browseProductTpCode:120,userIp:117。233。5。190,obtainPoints:24,userId:uid464936,frontProductUrl:https:1P2RQbHFS2,logTime:1647065858856,browseProductUrl:https:RXmiOUxRTliu9TE0},kafkadwdtopic:KAFKADWDBROWSELOGTOPIC}{icebergodstblname:ODSUSERLOGIN,data:{database:lakehousedb,xid:14942,userid:uid283876,ip:215。148。233。254,commit:true,id:10052,type:insert,logouttm:1647066506140,table:mcuserlogin,ts:1647066504,logintm:1647051931534},kafkadwdtopic:KAFKADWDUSERLOGINTOPIC}这里将数据转换成DataStream后再转换成表写入Iceberg对数据只是时间进行清洗,转换成DwdInfo类型DataStream返回,先过滤一些数据为null的valdwdDS:DataStream〔DwdInfo〕odsDS。filter(row{row。getField(0)!nullrow。getField(1)!nullrow。getField(2)!null})。process(newProcessFunction〔Row,DwdInfo〕(){overridedefprocessElement(row:Row,context:ProcessFunction〔Row,DwdInfo〕Context,collector:Collector〔DwdInfo〕):Unit{valicebergodstblname:Stringrow。getField(0)。toStringvaldata:Stringrow。getField(1)。toStringvalkafkadwdtopic:Stringrow。getField(2)。toStringvaljsonObj:JSONObjectJSON。parseObject(data)清洗日期数据jsonObj。put(logTime,DateUtil。getDateYYYYMMDDHHMMSS(jsonObj。getString(logTime)))jsonObj。put(logintm,DateUtil。getDateYYYYMMDDHHMMSS(jsonObj。getString(logintm)))jsonObj。put(logouttm,DateUtil。getDateYYYYMMDDHHMMSS(jsonObj。getString(logouttm)))解析json嵌套数据valbrowseproductcode:StringjsonObj。getString(browseProductCode)valbrowseproducttpcode:StringjsonObj。getString(browseProductTpCode)valuserip:StringjsonObj。getString(userIp)valobtainpoints:StringjsonObj。getString(obtainPoints)valuserid1:StringjsonObj。getString(userid)valuserid2:StringjsonObj。getString(userId)valfrontproducturl:StringjsonObj。getString(frontProductUrl)vallogtime:StringjsonObj。getString(logTime)valbrowseproducturl:StringjsonObj。getString(browseProductUrl)valid:StringjsonObj。getString(id)valip:StringjsonObj。getString(ip)vallogintm:StringjsonObj。getString(logintm)vallogouttm:StringjsonObj。getString(logouttm)往各类数据datajson对象中加入sinkdwdtopic的信息jsonObj。put(kafkadwdtopic,kafkadwdtopic)context。output(kafkaDataTag,jsonObj)collector。collect(DwdInfo(icebergodstblname,kafkadwdtopic,browseproductcode,browseproducttpcode,userip,obtainpoints,userid1,userid2,frontproducturl,logtime,browseproducturl,id,ip,logintm,logouttm))}})valpropsnewProperties()props。setProperty(bootstrap。servers,kafkaBrokers)6。将以上数据写入到Kafka各自DWD层topic中,这里不再使用SQL方式,而是直接使用DataStream代码方式Sink到各自的DWD层代码中dwdDS。getSideOutput(kafkaDataTag)。addSink(newFlinkKafkaProducer〔JSONObject〕(KAFKADWDDEFAULTTOPIC,newKafkaSerializationSchema〔JSONObject〕{overridedefserialize(jsonObj:JSONObject,aLong:lang。Long):ProducerRecord〔Array〔Byte〕,Array〔Byte〕〕{valsinkDwdTopic:StringjsonObj。getString(kafkadwdtopic)newProducerRecord〔Array〔Byte〕,Array〔Byte〕〕(sinkDwdTopic,null,jsonObj。toString。getBytes())}},props,FlinkKafkaProducer。Semantic。ATLEASTONCE))env。execute()}}
  二、创建IcebergDWD层表
  代码在执行之前需要在Hive中预先创建对应的Iceberg表,创建Icebreg表方式如下:1、在Hive中添加Iceberg表格式需要的包
  启动HDFS集群,node1启动Hivemetastore服务,在Hive客户端启动Hive添加Iceberg依赖包:node1节点启动Hivemetastore服务〔rootnode1〕hiveservicemetastore在hive客户端node3节点加载两个jar包addjarsoftwarehive3。1。2libiceberghiveruntime0。12。1。jar;addjarsoftwarehive3。1。2liblibfb3030。9。3。jar;
  2、创建Iceberg表
  这里创建IcebergDWD表有DWDUSERLOGIN,创建语句如下:CREATETABLEDWDUSERLOGIN(idstring,useridstring,ipstring,logintmstring,logouttmstring)STOREDBYorg。apache。iceberg。mr。hive。HiveIcebergStorageHandlerLOCATIONhdfs:myclusterlakehousedataicebergdbDWDUSERLOGINTBLPROPERTIES(iceberg。cataloglocationbasedtable,write。metadata。deleteaftercommit。enabledtrue,write。metadata。previousversionsmax3);
  三、代码测试
  以上代码编写完成后,代码执行测试步骤如下:1、在Kafka中创建对应的topic在Kafka中创建KAFKADWDUSERLOGINTOPICtopic。kafkatopics。shzookeepernode3:2181,node4:2181,node5:2181createtopicKAFKADWDUSERLOGINTOPICpartitions3replicationfactor3监控以上topic数据〔rootnode1bin〕。kafkaconsoleconsumer。shbootstrapservernode1:9092,node2:9092,node3:9092topicKAFKADWDUSERLOGINTOPIC
  2、将代码中消费Kafka数据改成从头开始消费
  代码中KafkaConnector中属性scan。startup。mode设置为earliestoffset,从头开始消费数据。
  这里也可以不设置从头开始消费Kafka数据,而是直接启动实时向MySQL表中写入数据代码RTMockDBData。java代码,实时向MySQL对应的表中写入数据,这里需要启动maxwell监控数据,代码才能实时监控到写入MySQL的业务数据。3、执行代码,查看对应结果
  以上代码执行后在,在对应的KafkaKAFKADWDUSERLOGINTOPICtopic中都有对应的数据。在IcebergDWD层中对应的表中也有数据。
  Kafka中结果如下:
  IcebergDWD层表DWDUSERLOGIN中的数据如下:
  四、架构图

15年后回看贪官炸死情妇案,这场人为的悲剧,比想象得更恶劣2007年7月9日,在济南闹市街头,一辆悬挂着内部牌照的豪车,轰然爆炸。与车辆自燃不同,豪车爆炸的威力过大,女驾驶员直接被炸得粉身碎骨。有目击群众称,飞溅的鲜血让现场仿佛下了一场血叶问时代变了,年轻人不讲武德年幼的你第一次去表哥家,看到表哥在玩游戏机。猫叫大哥!我差后面那点流量吗?钓吧看个看到这个想摔手机!我为了用这个,把家里的裤子都开了两个洞了。痛击我方队友!道具赛这手机多少钱,哪里萌娃猜猜猜猜孩子,你应该快毕业咯,新东方少儿一班的吧?下面这位小朋友,是奶奶带呢?还是妈妈带的?可以猜歌咯,我猜是小苹果宝宝开始正儿八经的哭了,谁学我??信不信我打你?来了一个小小领舞,这舞那中山大学法学院杨教授骗走我的初夜,如果我不说还会有下一个导读7月2日晚,一位网友发表了一篇长文,称在中山大学学习期间,中山大学法学院的一位姓杨的教授利用自己作为教师的身份光环,诱导自己一步步陷入他的情感陷阱,毕业后的某个晚上骗走了她个人陈冲定居美国太惬意,戴黄金现身饭店显富贵,戏骨尤勇智罕见露面饿了吗?戳右边关注我们,每天给您送上最新出炉的娱乐硬核大餐!7月3日,某网友在社交平台上为自家餐饮店做宣传,并晒出与圈内一众大咖的合影,引发外界热议。照片中,陈冲身穿碎花裙搭配一件男朋友突然变得不愿意和你交流,怎么办?今天的文章先从一个来访者的案例讲起她和男朋友是从刚上大学时开始谈恋爱的。毕业后两个人并没有着急结婚,而是先以工作为主,因为两人都觉得自己现在正处于事业上升期,等经济基础扎实了才会考05年,山东男生因强奸罪被判无期,15年后无罪释放并获332万赔偿2005年2月11日下午,山东省临沭二中分校一名教师报警称,学校的宿管李贞梅在打扫卫生时,在一废弃的洗涮间内发现一具女尸,经警方调查,认定该具尸体就是本校失踪一个月的高一女生高妍。女生发文8000字揭露法学院教授玩弄感情,夺走初夜中山大学回应7月2日晚,有网友发表长文,揭露中山大学法学院一位杨姓教授玩弄感情,而且夺走了她的初夜,还玩暴力和消失。文章发出后,就得到了大家的关注,中山大学官微在7月3日上午11点发微博称已启中大法学院教授与多名女生保持不正当关系?女大学生发文举报文江湖独白专栏如何成为一名法学教授?身为从事法律行业的一名学者,他需要拥有博士学位,在国外有至少有一年的工作经验,再经过学校的选拔任用,是一位对中国法律知之甚详的人士,能够传道解惑法学院教授开房诱骗女生初夜,大尺度聊天内容曝光,中大展开调查7月2日晚,有网友发表长文,称自己于中山大学在读期间,中山大学法学院一位杨姓教授利用自己身为教师的身份光环,一步步诱导自己落入其情感圈套,并在自己毕业后骗走自己初夜据该网友的梳理!中山大学教授诱骗女学生开房?有多少女生,毁在了教授的魔爪下?7月2日晚,有一个网友深夜发文,痛斥自己的第一次被自己学院的教授一步步诱骗,落入感情圈套,导致最宝贵的东西被骗走。该教授还洋洋得意,说自己此生无悔了。网友说这个教授已经离婚十年,女
卡塔尔世界杯修改涉台表述台当局不高兴了据台湾中时新闻网6月20日报道,卡塔尔世界杯球迷证注册系统已将台湾选项改成中华台北,对此,中方赞赏卡塔尔政府恪守一个中国原则。而台当局则表示高度遗憾。报道称,卡塔尔世界杯相关注册平唐山案被打女子官方通报来了四人轻伤或轻微伤警方通报唐山打人案4人轻伤或轻微伤关于陈某志等涉嫌寻衅滋事暴力殴打他人等案件侦办进展情况的通报2022年6月10日凌晨,唐山市路北区某烧烤店发生起寻衅滋事暴力殴打他人案件,造成恶劣唐山事件最新消息被打女孩的律师首次发声,透露重要信息唐山事件最新消息被打女孩的律师首次发声,透露重要信息最近一段时间,不知是巧合还是政府的雷霆出击,唐山烤肉店打人事件,终于等来了一个好消息。最高人民检察院表示,将依法严厉打击严重危害唐山市公安局路北分局局长马爱军等人接受审查调查河北省纪检监察机关对唐山市某烧烤店寻衅滋事暴力殴打他人案件及其他相关案件涉及的公职人员涉嫌严重违纪违法问题,依纪依法开展审查调查。目前,经河北省纪委监委指定管辖,唐山市路北区政府党这些姿势,男女都喜欢做,看看你有没有关注我,每天分享有趣漫画漫画来源于匡北北文百芜baiwu为什么别人越变越好看,而你却越来越不上镜?明明已经花了很多钱去做SPA报健身班,脸看起来不错,但是一照全身就不忍直视,究其原为何取名福建号?邱毅给出三点解释,称统一行动福建作用重要003型航母命名为福建号,有何深意?台湾著名政论家邱毅给出三点解释。福建号战力强悍,美台将感到绝望。近期003型航母正式下水的新闻赚足了舆论眼球,像CNN纽约时报BBC这些平时阴阳坚持吃护肝片,真能保护肝脏不受损吗?本文告诉你答案护肝片是一种中药的复方制剂,里边的主要成分包含了柴胡,五味子,板蓝根,绿豆,猪胆粉等成分,进入人体之后,可以达到健脾消食,疏肝理气的功效。生活当中很多人会适当地吃一些护肝片,其主要同样是父亲节晚餐,先看潮汕,再看东北,网友我更喜欢东北的俗话说柴米油盐酱醋茶,人间烟火也有趣,品味各地美食,食尽人间烟火,是人生至幸之事。大家好,我是小菲,一枚热爱美食的80后,如果恰好您也喜欢美食,那就关注我吧!现在人们喜欢过节,无论最好的报答不是请帮你忙的人吃饭,而是用这四种方式我其实今天都还挺开心的,一直说到表演节目,我真的很不开心!王传君坦诚地说。宋丹丹挥手笑道哈,你可以不演!王传君附和对对对,我一定不会演的!昨晚播出的五十公里桃花坞2中,所有人都无奈为什么男生接吻时喜欢摸胸?今天后台有个妹子突然问性情哥一个问题,她说为什么亲吻的时候男人一定要摸胸?我说,大概是因为男人在亲吻的时候不知道把手放哪,于是就自然而然的摸胸了。不知道大家有没有经历过此类的事情呢宝庆银楼屡次被市民投诉一口价黄金猫腻多自古以来,黄金在人们的交换贸易过程中,一直承担着货币的角色。提起黄金饰品,就想起保值,在某种程度上,已经成为了社会面上的某种价值共识。然而,您是否有想过,如今的金店套路重重,买黄金
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网