1。背景 数据增量同步是ETL关键功能,在全量同步后,持续增量同步,保证数据的完整,正确和时效,通常有两种方式实现,双写和CDC 双写优点,实现简单,写入源库同时写入目标库;缺点,代码侵入,影响正常业务 CDC优点,无侵入,读取数据库log,获取数据变更;缺点,复杂,需要引入CDC组件,从数据变更(表行字段变更)到目标增量变更(通常是DTO)需要复杂的映射 Cdc组件本身通用设计,支持扩展redis,elasticsearch等数据库同步 本文包括两部分,cdc同步框架和基于cdc同步框架的关系图增量同步设计2。参考和术语 CDCchangedatacapture数据变更抓获 RBT基于规则的转换组件 《分布式datax架构设计》 《分布式datax详细(落地)设计》 《分布式时间槽设计》3。SETL介绍 下图介绍SETL逻辑架构和规划 setlrbt全量同步组件,datax组件,接入分布式调度,实现高性能的全量同步 setlcdccdc增量同步datax组件,接入分布式时间槽实现高可靠增量,后续规划接入kafkaconnect setlstream规划中,流式etl,引入kafkaconnect,实现高吞吐低延时的增量同步 configcenter配置中心,datax原生使用本地文件配置,配置中心摆脱本地文件限制,实现分布式系统的必要基础设施 rbtransformer基于规则引擎的转换器组件,针对dataxrecord seltdata相当于springdata,数据读写4。DebeziumCDC原理 Debeziumcdc组件,支持多种关系数据库,如sqlserver,db2,oracle,mysql等,也支持newsqlnosql,如Cassandra,mangodb,抽取并解释数据库日志获取数据变更,支持以事件监听模式消费事件,debezium支持ack机制,实现事件消费的可靠性 下图是dbz主要模块 Apidbz引擎定义,目前只有一个实现,EmbeddedEngine Embeddeddbz引擎的实现,顾名思义,可以嵌入到用户的代码,轻量级的,称为api方式 Serverdbz预置的事件分发服务,开箱即用,实现分发变更事件到redis,plusar和其他云消息服务,称为dbzengineserver方式,但其依赖Quarkus,一个对标springcloud框架,如果你已使用springcloud构建你的服务,可能对该方式不是很感兴趣 connector官方推荐的deploy方式,如下图,接入kafkaconnect,connector作为kafkaconnect的source,捕获源数据库变更,以事件方式推动到kafka,订阅事件的sink,写入到目标库,该方式充分利用kafka特性 前2种方式没有接入kafka,使用的是相同的connector,但也依赖kafka类,如事件,消费接口等 5。setldatacdc开发框架 上图是setlcdcsdk,虚线框是dbz的类,其他属于estldatacdc组件 DbzEngineClientConfiguration构建和初始化dbz引擎客户端,支持读取本地配置文件,配置中心 DebeziumEngineClientdbz引擎客户端,负责启动停止dbz引擎 DispatchChangeEventConsumerImplcdc事件消费实现,接收原生事件,负责批量ack DispatchChangeEventListenerImplChangEventListenersetlcdc组件 1)接收原生变更事件,读取和解释到RowDataBean,使用内部模型,如,操作类型,数据的before和after,解耦对dbz的依赖,也更方便后续使用 2)分类变更事件,目前只有行数据(row)事件,过滤掉query操作,分发给ChangeEventRowListener BinlogSyncChangeEventRowListener实现,实现订阅服务,依据ChangeEventHandler设置感兴趣的db。table和增删改类型调用处理器 开发人员实现自己的同步业务只需实现ChangeEventHandler即可,sdk隔离dbzcdc的内部机制6。关系图(neo4j)增量同步设计 增量同步是cdc开发框架应用,cdc告诉我们哪个库哪个表哪行哪些字段变更,但增量同步的目标库,如,neo4j,elasticsearch存储相当于dto,因此需要映射计算框架,从表行字段的变更映射为变更的dto 配置和执行 上图是CDC模型,包括CDC定义,映射Action设计,属于rbtransformer模块 CDCcdc配置模型,同时负责选择和执行适用cdc动作 CdcActioncdc动作,数据变更映射到目标库变更的逻辑实现 CdcRulecdc动作是否适用数据变更,CdcRule返回布尔值 CDCRunningContextcdc运行上下文 dataxreader Cdc动作和变更事件处理器datax实现,该实现接入到dataxreader,使用datax的readertransformwriter机制,cdc发出数据变更事件,cdc动作负责映射为目标变更,transform规则转换,最后到writer,目标变更写入目标库 同步实现时序 同步实现时序展示数据变更事件分发,CDCAction执行 关键步骤: 1。21。21ChangeConsumer负责批量ack 1。31。4解释变更事件,并转化为内部RowDataBean对象,解耦dbz 1。5BinlogSync是ChangeEventRowListener实现,从这开始进入同步业务 1。61。7查找对表变更感兴趣的cdc,表粒度较大,初步筛选,1。12精确筛选 1。121。16onConditional通过规则返回action是否适用(RowDataBean),若适用执行 1。15CdcReaderAction依据RowDataBean,源数据变更映射为目标变更,同步到目标库 关系图CDC配置示例 1)场景 上图cdc示例schema,覆盖3类场景 1。连接表关系变更filmfilmcategorycategory 2。外键关系变更customerstore 3。主从表多对一变更storeaddress对应图库的store顶点 2)配置示例解释 Cdc挂在rbt转换下,两者并无直接关系,这样设计主要方便cdc编写时参考 Tables标签该cdc相关的表,可设置多个表 Insertupdatedelete标签分别对应增删改分类的cdc动作,实际上,insert的事件可以产生update的action,该分类只是管理维度 Action标签数据变更对应的动作,映射源变更为目标变更 上图展示源customer表insert引起目标图库两个动作 1)新增的customer顶点 2)如果新增customer带有store外键,新增customerstore关系,适用规则 r。afterField(storeid)!null,插入数据storeid不为空 上图源customer变更引起目标变更, 规则!(r。updates()。size()2r。isUpdate(storeid)) 判断数据变更除了storeid还有其他字段变更才出发,storeid变更,引起后面两个动作,删除旧关系,新增新关系,关系rule是一样的,参数来源不一样,删除的storeid来源于before,新增来源于after 数据行lastUpdate必然变更,所以r。updates()。size()2 上图删除变更映射,customer删除,首先删除关系,再删除节点 退出策略(TBD) Cdc是事件流,事件源源不断产生,认为是永续执行,目前cdc接入datax,定时调度执行,因此需要一个机制退出,待下次定时调度再执行,例如,执行时长,未接收到事件时长等7。数据可靠性可靠channel 可靠channel,可确认的分布持久的channel,Channel不可靠对于CDC是致命的,丢失数据;但对于全量同步可以接受,全量同步故障转移后,整个分片重新同步即可。可靠channel对于数据量比较大,没有分片的情况也非常有价值,相当于断点续传的能力,但对性能有一定影响 CDC原生Dataxchannel分析 整个数据链路包括2部分, 第一段,CDC变更事件推送到reader,reader写到Exchanger(Channel)成功后ackCDC 第二段,writer从Exchanger拉取数据变更,同步到目标存储 另外,Channel承担流量统计和流控的职责 可以看到,第二段是不可靠的,MemoryChannel底层使用内存ArrayBlockingQueue存放数据,datax节点崩溃,故障转移后,原节点Channel的数据将丢失 Buffered类型Exchanger缓存Record,批量提交,存在丢失可能,reader需要非bufferedExchange配合,writer可以适用buffered或非buffered 可靠channel设计原理和实现 1)实现方案推模式 数据链路同样的两个阶段,不同的是第二阶段,channel引入mq作为持久存储,提供可确认,方案改变原数据链路,数据从mq获取,writer依赖mq,从而也改变了writer开发模型,6。16。2只是激活pull统计,获取的数据并不使用。6。16。2放在57之间,是为了pull统计更准确 2)实现方案拉模式 同方案1,引入mq,不同的是,mq作为本地queue持久存储,Channel封装起来,writer不需要依赖mq,数据链路与原生一样,主动获取mq消息。本方案保持数据链路形态,即writer通过RecordReceiver获取Record。缺点,ExchangerChannel增加ack方法,主动消费,涉及消费异步ack问题 推模式下channel统计 推模式下,旁路读取record,读取record通过消息引擎,需要通知channel读取了record,channel计算record的大小,发起统计 RecordReceiver接口增加byPassReader方法 publicvoidbyPassReader(Recordrecord);7分布式dataXCDC 分布式dataXCDC有两种可选方式,分布式作业和分布式时间槽 分布式作业在《分布式dataX详细(落地)设计》介绍过,dataXCDC单分片,使用分布式作业,只有一个worker作业工作,其他worker作业备用状态,资源利用率不高,因此,分布式时间槽比较合适 技术架构 下面介绍分布式dataXCDC的技术架构,下图是分布式datax和分布式dataxCDC技术架构对比,前者使用分布式作业,后者分布式时间槽,通过对比更好了解分布式dataXCDC 上为分布式dataXCDC,使用分布式时间槽模式;下为分布式dataX,使用分布式作业模式 1。作业节点 dataXCDC:分布式作业分片对应dataX作业,是standalone模式的dataxengine dataX:作业(worker)节点分片是dataX作业分片,是任务组模式的dataxengine 2。Client dataXCDC:管理台api写入分片,没有专用client dataX:专用的client,作业模式下DataxEngine,使用分布式调度器,负责分片和分配分片 3。配置中心 dataXCDC:配置放入配置中心,避免每个节点存放,影响动态伸缩和维护,当然也支持本地配置文件,每个worker节点配置所有的dataXCDC作业,用于failover dataX:作业配置直接写入分片的config节点,其他配置也可使用配置中心 4。作业任务租统计 dataXCDC:任务组的统计也是作业统计,因此不需要聚合为作业统计,直接拉取复制到prefix,latestprefix按设定的时间段计算速率;cdc没有最终summary,持久库保存多个summary,设定保存时限自动删除 dataX:设计比较复杂,参考《分布式datax详细(落地)设计》 5。分片策略 dataXCDC:eager模式,一次分配完分片,尽可能早地执行所有分片,达成用户触发时间要求 dataX:ondemand模式,按需分配工作量,获得更小的总体执行时间 6。故障转移 两者故障转移机制一致,节点下线,其他节点接替分片,不同的有两点, 6。1CDC处理事件流,事件流经channel,需要可靠channel,节点切换后继续执行未ack的channel内的事件,作业整个分片重新处理; 6。2cdc持续运行,需要保证一定的空闲节点数,需监控告警空闲节点数 动态分片新增和撤销 分布式时间槽支持cancel分片,设置cancel标志,待下次重启删除cancel分片,但cdc一直运行,该功能不能生效,因此目前未有动态分片新增和撤销 znode结构 setlcdc根节点,可以看成一个用户域,用户定义 cdcwatcher观测节点,一个域只需一个观测者,其分片与域内所有作业的所有分片一一对应,用户定义 cdc1001逻辑作业,其分片是dataxCDC作业,job模式的dataxEngine,这里有两个作业概念,分布式作业和dataX作业,域内可有多个作业,每个不同作业类型,上图两个分片对应两个dataXCDC作业,分片名称userId作业名称; 作业分片的配置 观测分片配置是任务组统计的reidskey前缀 dataXCDC作业配置,dataX作业jobId,作业配置,下图配置是本地文件,若使用配置中心,url以cc:开始 8架构质量设计 可靠性数据变更事件不丢失,数据变更事件至少消费一次,但允许重复处理 依赖可靠channel,只有事件处理完毕ack,源头事件偏移更新,否则,事件重发,事件处理器需保证幂等性 高可用变更事件处理是按顺序,只能单个线程按顺序处理,高可用是主备架构,处理节点失效,备用节点激活无缝接上失效节点 这里涉及两个关键点,处理状态恢复,包括debezium位点存储文件和数据库schema文件;处理节点失效发现,备用节点激活 dataxwriter数据变更事件允许重复处理,writer需要考虑幂等性附录Dbz引擎配置示例 几个重要的配置 connector。class数据源,mysql,oracle等 offset。storage。file。filename偏移存储文件位置,通常多个CDC作业,需启动多个debezium引擎,文件路径要区分,同样,databases。history。file。filaname数据库schema文件位置 database。server。name每个引擎配置不同的名称,否则出现异常: javax。management。InstanceAlreadyExistsException:debezium。mysql:typeconnectormetrics,contextschemahistory,serverlocalhost