本文作者:孙晓健,ApacheRocketMQCommitter 01RocketMQConnect RocketMQConnect是一款可扩展的在RocketMQ与其他系统之间做流式数据传输的工具,能够轻松将RocketMQ与其他存储技术进行集成。RocketMQConnect使用特定的Source插件类型,将数据发送到RocketMQTopics中,并通过Sink监听Topics将数据写到下游指定数据存储中。使用过程中Connector可以通过JSON方式进行配置,无需编码。数据流转过程从源到目的,通过RocketMQ进行桥接。 RocketMQConnect具有以下特性: 通用性:Connect制定了标准API,包括Connector、Task、Converter、Transform,开发者可以通过标准API扩展自己插件,达到自己需求。 Offset自动管理(断点续传):Source方面用户在开发Connect时,可以通过Offset进行增量数据拉取。系统内部会自动对Offset做管理,会将上次拉取Offset信息进行持久化。下次任务重启时,可以通过上次提交的Offset继续进行数据增量拉取,无需从头进行数据同步;Sink方面基于RocketMQ自身的Offset提交策略,在内部实现了自动提交方式,任务运行时会自动处理,允许用户配置Offset提交间隔;如果系统自带offset已经可以满足需求,则无须另外维护Offset;如果系统自带Offset无法满足需求,则可以通过TaskAPI进行维护。TaskAPI中自带Offset维护能力,可以在Connect中自行决定Offset持久化逻辑,比如持久化到MySQL、Redis中。下次任务启动时,可以自动从Offset存储位点获取下一次执行Offset,继续做增量拉取。 分布式、可扩展、容错:可以分布式的方式进行部署,自带容错能力。Worker宕机或添加Worker时,任务会自动做重新分配、运行,在各集群Worker之间做平衡。任务失败后,也会自动重试。重试完可自动Rebalance到不同Worker机器上。 运维和监控:Connect提供了标准的集群管理功能,包括Connect管理功能以及插件管理功能。可以通过API方式对任务做启停操作,也可以查看任务在运行过程中的运行状态以及异常状态。并且可以进行指标上报,任务在数据拉取与数据写入后,数据总量、数据速率等都可以通过Metrics方式做数据上报。此外,Metrics也提供了标准的上报API,可以基于标准API做指标扩展和上报方式的扩展,比如上报到RocketMQtopic中、Prometheus等。 批流一体:Source在做数据拉取时,可以通过JDBC或指定插件sdk的方式,做批量数据拉取,转换为流方式,也可以使用CDC方式,通过增量快照或类Mysqlbinlog监听方式获取源端全量与增量变更数据,推给RocketMQ,下游可以通过Flink或RocketMQStream进行流式处理做状态计算,也可直接落到数据存储引擎中,如Hudi、Elasticsearch、Mysql等。 Standalone、Distributed模式:Standalone模式主要用于测试环境,Distributed模式主要用于生产环境。在试用过程中可以用Standalone方式做部署,得益于其不会做Config存储,每次启动时都可以带独立任务,帮助调试。 Connect组件包含以下几类:Connector:作为任务协调的高级抽象,描述了Task运行方式以及如何做Task拆分。Task:负责实际数据拉取操作,并负责offset的维护和TaskMetrics数据的收集。Worker:执行Task任务的进程。RecordConverter:在Source与Sink之间做数据转换,Record通过Schema制定数据契约,Schema可以随数据传输,也可以通过RocketMQSchemaRegistry进行远程存储,目前支持了Avro和JSON两种类型的Converter。Transform:数据传输过程中做数据转换。如进行字段变更、类型变更、做空值或已知错误值过滤等;还可以通过扩展groovytransform、pythontransform等脚本对数据进行复杂的转换,亦可做远程调用来进行静态数据的补全或做函数计算。DeadLetterQueue:在数据从Source端到Sink端的过程中,数据Convert转化错误、网络超时、逻辑错误造成写入失败等情况,可以根据自己编写的插件逻辑来决定是将数据写入到错误队列中、或忽略错误继续进行、或出现错误后停止任务等。写入错误队列中的数据,在不计较数据有序的情况下可自助进行异步修复后再写入。Metrics:提高任务运行过程中的可观测性,任务在数据拉取与数据写入时,需要监测任务拉取的数据量、写入数据量、拉取速率、写入速率、差值、内存占用等,都可以通过Metrics进行指标上报,供系统运营和运维使用。 上图为数据在Connect中的流转过程。 分布式部署下,Source与Sink可以在不同Worker中,不相互依赖,一个Connector下可包含Task、Transform、Converter顺序执行。Task负责从源端拉取数据,Task并发数量由自定义插件的分片方式决定。拉取到数据后,若中间配置了数据处理Transform,数据会依次经过配置的一个或者多个Transform后,再将数据传送给Converter,Converter会将数据进行重新组织成可传输的方式,若使用了RocketMQSchemaRegistry,则会进行Schema的校验、注册或升级,经过转换后的数据,最终写入至中间Topic中供下游Sink使用。下游Sink可以选择性的监听一个或者多个Topic,Topic中传输来的数据可以是相同存储引擎中的,也可以是异构存储引擎中的数据,数据在Sink转换后,最终传给流计算引擎或者直接写入到目的存储中。 在转换过程中,SourceConverter与SinkConverter要保持一致。不同的Converter解析的Schema格式会有差异,若Converter不一致,会造成Sink解析数据的失败。不同组件之间的差异化,可以通过自定义Transform来进行兼容。 以上架构具有如下几点优势: 松散架构:Source与Sink之间通过Topic进行解耦,E、T、L不再是一个整体。一般相同存储引擎的数据的读取和写入QPS差距很大,所以一体化的ETL在数据的读取时会受到目标库写入性能的制约。 而RocketMQConnect中的Source和Sink解耦后,可以做Source和Sink两端独立扩缩容,实现数据读取和写入的动态平衡,互不影响。 标准API:降低使用难度,扩展简便,在API中抽象了编写并发的具体方式,插件开发者可自定义拆分。 规范的数据抽象:使用Topic做解耦后,需要在Source和Sink之间建立数据契约。Connect主要通过Schema进行数据约束。以此来支持异构数据源之间的数据集成。 专注数据拷贝:Connect主要专注于与异构数据源之间的数据集成,不做流计算,支持数据拷贝到流(Flink、RocketMQStream)系统中,再做流计算。 轻量:依赖少。如果集群中已有RocketMQ集群,可以直接部署RocketMQConnect做数据同步工作,部署非常简单,无需额外部署调度组件。RocketMQConnect自带任务分配组件,无需额外关注。 另外,依托RocketMQ强大的性能,可以在不同系统之间做大规模数据的迁移。Source主要依赖于RocketMQ的写入能力,无需等待事务尾端数据写入。Sink依托于Topic的扩展能力,可以根据中间Topic的分区数量来决定下游Sink并发度,自动做扩展。任务做完扩展后,系统会对Connector进行重新分配,保证负载均衡,Offset不会丢,可以基于上次运行状态继续向下运行,无需人工干预。也可以依赖RocketMQ的有序策略来做顺序数据的同步。 02RocketMQConnect原理 管理区主要做任务配置变更或查询的接收,包括创建、删除、更新、启停和查看Connector等操作。变更任务后,管理端会将任务提交到RocketMQ共享配置的Topic中。因为每一个Worker都监听了相同Topic,所以每个Worker都能获取Config信息,然后触发集群Rebalance再重新做任务分配,最终达到全局任务平衡。 运行时区主要为已经被分配到当前Worker的Task提供运行空间。包括任务的初始化、数据拉取、Offset维护、任务启停状态上报、Metrics指标上报等。 调度区Connect自带任务分配调度工具,通过hash或一致性hash在Worker间进行任务平衡,主要监听Worker和Connector的变更。比如Worker添加或删除、Connector配置变更、任务启停等。获取状态变更用来更新本地任务状态,并决定是否进行下一轮Rebalance操作,以达到整个集群的负载均衡。 管理端、运行时区与调度区存在每个集群的每个Worker中,集群Worker间通信主要通过共享Topic来进行通知,Worker之间无主节、备节点之分,这让集群运维起来非常的方便,只需要在Broker中建对应共享Topic即可,但由于Task状态变化的动作只会发生在一个Worker中,集群之间共享会存在短暂延迟,所以通过RestApi查询Connector状态时可能会出现短暂不一致的现象。 服务发现过程。有变更时,每一个Worker都可以发现节点变更,实现服务自动发现的效果。 启动新的Worker时,Worker会向依赖的RocketMQTopic注册客户端变更监听。相同的ConsumerGroup,当有新客户端添加时,注册了该事件的客户端会收到变更通知,Worker收到变更事件后,会主动更新当前集群的Worker列表。 当Worker宕机或者缩容时也会产生相同的效果。 RocketMQConnect任务分配流程如下: 通过调用RestAPI方式创建Connector。如果Connector不存在,则自动进行创建,若存在则更新。创建后,会向ConfigTopic发送通知,通知Worker有任务变更。Worker获取任务变更后,再进行重新分配,以达到负载均衡的效果。停止任务也会产生相同的效果,目前每个Worker都会存储全量的任务及状态,但只运行分配给当前Worker的Task。 目前系统默认提供了简单hash或一致性hash两种任务分配模式,建议选择一致性hash模式。因为在一致性hash情况下,做Rebalance时变更比普通hash变更范围小,部分已经被分配好的任务不会再进行负载。 Connector扩展要素分为自定义配置、并发和Task信息。 自定义配置包含连接信息(核心配置项)、Convertor信息、Transform信息等。Connector仅作为任务全局概要和协调器,实际产生效果的依然是分配后的Task。比如1亿数据分为多个任务拉取,分别放在不同Task中执行,因此需要通过Connector去按照合理的逻辑做Task的拆分,这些拆分的操作需要在声明Connector时制定。Connecor将配置拆分后,将实际数据拉取逻辑配置告知Task,Task决定数据拉取的具体方式。 Task扩展要素包括配置初始化、连接开启与关闭、拉取频率、错误处理、实际数据拉取逻辑以及Offset维护。 整个系统中全局Converter转换都使用同一套API,分为两种模式: 本地模式:从SourceConnect拉取到数据后,由Converter做数据转换。转换过程中,本地操作会将Schema与value值合并为Connectrecord向下游传递。下游通过相同Converter再将其转换为Record,推给Sinktask做数据写入。中间通过ConvertSchema做了数据契约,可以在Source与Sink之间转换。本地模式下,Schema与Value作为一个整体传输,数据Body非常臃肿,每一条数据都带有Schema信息。但其优点为不存在版本兼容问题。 远程模式:在数据转换时,会将Schema存到远程RocketMQSchemaRegistry系统中,在数据传输过程中只带Value值,不带Schema约束信息。当Sink订阅Topic时,通过信息头带有的RecordID获取Schema信息、进行Schema校验,校验后再做数据转换。 Schema维护在RocketMQSchemaRegistry系统中。因此在转换过程中可以在系统中手工更新Schema,然后用指定的SchemaID做转换,但是需要在Converter插件中做数据兼容。 ConnectConverter内置了扩展,有本地的JSON、普通数据类型Converter等。如果内置扩展无法满足需求,可以通过RecordConverterAPI自己进行扩展。扩展后,将Converter包置于Worker运行插件目录下,系统即可自动加载。 配置方式分为Key和Value两种。其中Key标注数据的唯一,也可以是Struct结构化数据;Value是真实传输的数据。 Transform是在Connector与Convertor之间做数据映射转换与简单计算的辅助工具。当SourceConverter与SinkConnector在使用过程中达不到业务需求时,可以通过编写Transform插件的方式做数据适配。比如不同业务、不同数据源插件之间的数据转换,如字段映射、字段派生、类型转换、字段补全、复杂函数计算等。 系统中内置的Transform模式有比如字段扩展、替换等。如果不满足需求,可以通过API自行扩展Transform。部署时,只需将编写后的扩展打好包放置对应插件目录下,即可自动加载。 具体配置方式如上图左下方所示,Transform的运行为串行,可以对一个值做多个转换,可以配置多个Transform。需要配置多个Transform的情况下,通过逗号进行分隔,名称不能重复。 SourceTask做数据拉取或变更监听时,例如,通过JDBCMysql方式做数据增量拉取时,需要指定Offset增量拉取的方式,可以通过自增ID或Modifytime的方式。每次数据拉取完成发送成功后,会向Offsetwriter中提交增量信息(id或者modifytime),系统会异步进行持久化。任务下次启动时,会自动获取Offset,从上次位点开始处理数据,达到断点续传的效果。 封装Offset时没有固定模式,可以通过自己的方式拼接Offsetkey或value值,唯一依赖的是RocketMQ中的Connectoffsettopic信息,主要为推送给其他worker做本地Offset更新。如果使用系统的Offset维护,则用户只需要决定维护上报逻辑,无需关注如何保证Offset提交、Offset回滚模式等,一切都由系统保证。 运行过程中,若开启了死信队列,正确的数据会输送到目的端,错误数据会输送到错误队列中。业务方可以通过异步方式做数据处理,但是该种情况下无法保证有序。如果要保证数据有序,需要在触发报错的情况下将Task停止,先进行数据修复,修复后再启动Task。 如果单个Task处理数据报错,只需停止出错的Task,其他Task不受影响。因为每个Task在处理数据时消费的Query不一样,如果指定了Key,会按照Key做数据分区,然后保证分区内每个Query有序,因此单个Task停止不会影响全局有序性。 03RocketMQConnect使用场景 RocketMQConnect能够适用于大部分传统ETL适用的场景。另外,传统ETL无法实现的比如实时流传输、流批一体、快照功能等,RocketMQConnect亦能够实现。 新旧系统迁移场景:业务部升级变更过程中出现了类型变更、表拆分或扩容操作、添加索引的情况下可能导致停机耗时非常久,可以通过RocketMQConnect做数据重新搬迁。 分库分表场景:当前市面上有很多分库分表插件,可以通过Connect适配开源分库分表客户端做分库分表工作,也可以基于RocketMQ自己做分库分表逻辑,源端与目的端不变。数据从单表中取出后,可以在Transform中做分库分表逻辑。可以通过Transform做路由。路由到不同Topic中,在下游可以通过监听不同Topic落到已经分好的库表中。 多活:RocketMQConnect支持集群间Topic及元数据的拷贝,可保证多中心的Offset一致。 数据订阅场景:通过CDC模式做数据监听,向下游做数据通知。供下游做数据订阅以及即时数据更新。同时也可以将数据拉取后通过HTTP的方式直接推送到下游业务系统中,类似于Webhook的方式,但是需要对请求做验权、限流等。 其次,还有数据入仓入湖、冷数据备份、异构数据源数据集成等业务场景都可以通过RocketMQConnect作为数据处理方案 从整体使用场景来看,大致可以分为两部分,数据集成和流式处理。数据集成主要为将数据从一个系统搬到另一个系统,可以在异构数据源中进行数据同步。流式处理主要为将批处理信息通过批量数据拉取,或CDC模式将增量数据同步到对应流处理系统中,做数据聚合、窗口计算等操作,最终再通过Sink写入到存储引擎中。 04RocketMQConnect生态 RocketMQConnect目前对上图中产品均能够提供支持,平台也提供了KafkaConnect插件的适配。