本文作者:李伟ApacheRocketMQCommitter,RocketMQPython客户端项目Owner,ApacheDorisContributor,腾讯云消息队列资深开发工程师,著有《RocketMQ分布式消息中间件(核心原理与最佳实践)》。 01RocketMqueue101 RocketMQ拥有诸多出色的特性: 比如多副本机制,RocketMQ支持存储层的多副本Dledger,它是基于Raft协议的一致性存储库,保证能够从存储层实现多副本; 比如ACL鉴权机制,用于确定哪些producer能生产、哪些消费者组能消费,以及服务端的消息过滤; 比如事务消息,它是RocketMQ实现的生产者事务,生产者向broker发送一条事务消息,由生产者执行本地事务。如果执行成功,则向broker端发送commit事件,消费者才能消费;如果本地事务处理失败,则发送rollback事件,使消费者无法消费该消息。 比如RequestReply,它是类似于同步RPC调用的过程,用户相同的逻辑用消息来实现,能够实现同步RPC调用的过程,可以将调用API和发送消息两套逻辑进行统一。 广播消息指消息发出后,订阅它的所有消费者都能消费到所有实例。负载均衡消费指默认策略下,同一个消费者组的消费者都能平均地消费消息,具体策略可自行调整。RocketMQ支持Pull、Push和Pop三种消费模式,支持java、go、cpp、python、c等多种语言。 搭建RocketMQ集群的流程如下: 第一步:安装NameServer集群。NameServer集群包含一个或多个NameServer节点。启动服务时,默认监听9876端口。NameServer集群搭建好之后,启动一套Broker集群。 第二步:搭建Broker集群,使用经典masterslave部署模式,master提供读写,同时会将数据存储和元数据同步一份到slave。通过10912的HA端口做数据同步。 第三步:写生产者代码生产。生产者集群包含多个生产者实例,通过broker的10911和10909端口向broker发送数据。 第四步:消费者通过10911或10909端口向broker拉取数据。 生产者或消费者实例启动时,会先配置NameServer地址,由生产者或消费者从NameServer集群上拉取topic、Queue和Broker等路由信息,然后根据路由信息发送或拉取消息。 生产者和消费者均与broker之间存在channel连接。如果生产者或消费者长时间没有与broker联系,则broker会将连接剔除。 以下为RocketMQ101相关名词解析: 生产者包含生产者组和生产者实例。生产者组是若干个生产者实例的组合,且RocketMQ希望同一个生产者组内的实例行为一致。消费者组和消费者实例也同理。行为一致指生产者实例都生产同一种类型的消息,比如都生产订单消息,包括创建订单、订单发货、订单删除等步骤。行为一致的好处在于消息的生产和消费比较规整,不会出现混乱。 Topic是消息的分类,为字符串形式,可以通过topic将某集群内的全部消息进行分类,所有topic的消息组成全量的消息。而Tag又属于topic的子分类。 消费者在订阅消息时,必须先指定topic再指定tag,这样的一条记录被称为订阅关系。如果订阅关系不一致,则会导致订阅混乱,发生重复消费或不消费、消息堆积等情况。 Queue类似于分区,但它是逻辑上的概念,并不是物理存储上的概念。Property类似于header,property包含除了主要信息以外的扩展信息,比如消息属于哪个业务ID、发送者IP等。向某个topic发送消息时,能够指定property。 NameServer中包含broker与cluster的关系、Queuetopic与broker的关系,即路由信息。 Broker中包含以下四部分: CommitLog常规的文件存储。RocketMQ发送的数据会append到CommitLog。 Consumerqueue消费者在消费topic时,topic中包含多个queue,每一个queue都被称为consumerqueue,每个消费者对于每个consumerqueue都存在消费进度。 index在dashboard上能够根据key来查询消息。 Dledgercommitlog由Dledger存储库来管理的CommitLog,能够实现多副本。 RocketMQ的生产消费模型十分简单。如上图,TopicA有四个queue,其中queue1、queue2在MasterBroker1上,queue3、queue4在MasterBroker2上。ProducerGroupA下有两个生产者实例,分别向两个broker的queue发送消息。ConsumerGroupA也有两个消费者consume1和consume2。 从四个queue里取消息时,每个消费者默认的策略是依次向queue1、queue2、queue3、queue4循环发消息,以此最大程度地保证消息分布均匀。 消费者的消费模式有负载均衡和广播消费消费两种。 负载均衡策略下,比如共有4条queue,则consumerinstant1和consumerinstant2会分别被分配到2个queue,具体分配到哪两条需由算法决定。 广播消费策略下,假设topic有100条消息,则consumerinstance1和consumerinstance2每一个消费者实例都会消费到100条消息,即同消费者组的每个消费者示例都会消费到全量的消息。 02RocketMQ生态项目 RocketMQ生态项目包含以下几个部分: 客户端:客户端主要分为Java客户端与非Java客户端,其中RocketMQJava客户端是最原生的客户端,与RocketMQ的编写语言一致,功能也最为齐全。 计算:RocketMQ支持轻量级的预计算,比如轻量级的ETL。RocketMQFlink能够直接对接Flink,方便将RocketMQ数据传输到Flink做计算,利用Flink强大的生态同步到下游多种类型的目的地。RocketMQConnect与RocketMQStreams是轻量级的计算框架,功能更简单、轻量,部署运维也更容易。 管控:RocketMQDashboard拥有简单稳定且功能强大的管控端,能够支持常用的运维操作比如修改配置、禁用消费者等。 云原生:RocketMQDocker支持打包RocketMQ源码成为Dockerimage项目,能够支持各种不同平台的打包。RocketMQOperator支持RocketMQ上K8s,能够支持比如重启进程、下发配置、拉起集群等操作。 监控:RocketMQExporter目前能够支持80指标,可直接导入到Prometheus做告警和监控。开源项目可通过Prometheus的数据配置Grafana做大盘,实现监控能力。此外,Prometheus能够支持Hook回调,方便公司用户将RocketMQ指标监控对接到自己的告警平台。 云原生是技术行业的趋势,能够减少成本、方便运维和管理。RocketMQ新版本实现了存储计算分离,支持更快速、更方便地上K8s。EDA事件驱动和无服务也是大势所趋,比如腾讯云的云函数、阿里云的eventbridge等产品都是Serverless、EDA场景,能够直接集成RocketMQ。微服务领域,RocketMQ也提供了诸多原生支持。 电商、金融等传统领域正在进行数字化转型,消息传递、指标、日志传递等需求都能够利用RocketMQ简单快速地实现。 总而言之,RocketMQ能够利用自己强大的生态项目,支持企业各种各样形态的数据传输和计算。 03RocketMQ数据流构建 RocketMQ的数据流构建主要包含消息、CDC数据流、监控数据流以及湖仓数据流。CDC数据主要负责记录记录数据变更,监控数据流包括业务监控和常规监控。 消息的构建如上图所示。 以订单服务为例,订单服务收到创建订单的请求,创建成功后会将订单的基本信息通过RocketMQ发送给B服务。假设B服务为短信服务消费,由B服务向客户发送短信通知,包含订单相关的详细信息。 RocketMQ发送消息至B服务时,通过重试和死信实现最终一致性,以保证消息能够成功发送给消费者。RocketMQ有16次重试机会,且为阶梯性重试,能够持续十几个小时。 RocketMQ支持通过CanalFlinkCDC、RocketMQcollect的方式,将Binlog等数据提供给计算平台,再由RocketMQFlink、RocketMQStreams等进行轻量级的计算。计算完成后,将结果转发给下游数据库比如MySQL、ES、Redis等,进行异构或同构的数据同步。 RocketMQ支持从flume读取日志文件发送至RocketMQ,再通过RocketMQCollect或RocketMQFlink等将日志数据进行消费、ETL转换或发送至ES。ES已与ELK产品打通,可以在Kabana上查看日志。 除了日志,RocketMQ能够在业务系统做后端监控埋点,通过RocketMQclient将监控埋点数据发到RocketMQ,再通过RocketMQFlink或RocketMQStreams消费数据并发送给业务监控平台或数据湖仓库等,生成在线报表或实时报表。 前端监控大部分通过HTTP请求发送至RocketMQ,再通过RocketMQ相关的轻量级计算框架,根据不同诉求将数据汇总至不同的后端,比如ES或自建平台。 所有数据都能入到湖仓,因为所有数据都会有数据分析、数据挖掘或出统计报表的诉求。比如前后端的监控、TP数据库里的业务数据、日志文件的指标数据或日志文件都能通过对应的工具发到RocketMQ,通过RocketMQ提供的轻量级计算工具进行计算,然后发送到下游的Hive、Doris、Clickhouse或Hudi等数据库或数据仓库,产出报表、实时大盘、实时数据表等。 RocketMQ能够采集各种数据,比如metrics、TP数据、log的数据,然后通过RocketMQ提供的轻量级计算工具进行计算,最终汇总到同构异构的数据库、数据仓库或数据湖等。 数据构建流程中,RocketMQ作为中间核心的传输链路,是否能够借助本身的特性避免偶然性的因素影响数据的传输? RocketMQ的架构十分简单,而简单也意味着稳定和可靠。因此,使用RocketMQ做核心数据链路时,其稳定性和可靠性能够避免很多意外,减少不可控因素。 网络抖动往往无法避免,它可能导致数据丢失,而RocketMQ能够通过重试机制保证数据的最终一致。比如消息只发一半时发生了网络抖动,网络恢复如何保证数据最终能够被消费者完整地消费? 默认的消费机制下,RocketMQ有16次重试机会,按阶梯重试,重试间隔逐渐增加,最大限度地让消费者能够消费到数据。如果16次重试后依然没有消费成功,则消息会进入死信队列,由人工介入处理。产生死信消息后,RocketMQ能够产生告警,以快速发现并处理问题。 针对数据丢失,RocketMQ提供了消息轨迹,帮助快速定位,找到问题所在。消费者消息是否成功发送、broker是否存储成功、消费者是否成功消费到等问题,都可以通过消息轨迹进行确认。 针对带宽打满的问题,RocketMQ提供了服务端过滤的功能。假设Topic内是访问日志,将tag设为域名,消费者组可以只订阅某个域名下的访问日志,RocketMQ能够在服务端对消息进行过滤,再发送给消费者组。Broker只会将属于消费者的域名消息发送给消费者,不会发送所有消息,因此能节约大量带宽,可高达8090。 04RocketMQ5。0 RocketMQ5。0架构有两个重大改变,实现了存储计算分离以及轻量级客户端。 存储和计算分离主要为数据层面,将做存储和计算的broker拆分成了存储的broker和计算的broker,两类broker各司其职,分别负责存储和计算。 此前,RocketMQ的客户端生态较为丰富,但各个客户端的功能差异较大,难以实现一致。RocketMQ5。0彻底地解了该问题,实现了轻量级的基于gRPC的多语言客户端。RocketMQ5。0将此前客户端的重逻辑比如rebalance等转移至由Cbroker负责处理,使客户端逻辑变得非常轻量,客户端只剩消息消费或发消息调用接口,各个语言的逻辑容易统一,兼容性更好,不会出现实现方式不同导致逻辑不一致。 RocketMQ5。0除了存储和计算分离以外,还实现了数据面和控制面的拆分。控制面主要负责接入,此前只能通过NameServer的方式接入,而现在除了NameServer以外还提供了一种新的通过LBGroup的方式接入,更简单易用。LBGroup能够方便大家用更简单的方式接入,逻辑集群的接入可以通过LBgroup来实现,比如哪些客户端应该连到哪些集群,这也是NameServer难以实现的能力。一组NameServer会管理一个物理集群,物理集群可拆分为多个逻辑集群,每个逻辑集群能够分给不同的租户使用。 RocketMQ可以看作是一个通道,通道有上游和下游,且不同行业的上下游不一样,通道中的数据也不一样。 互联网已经涉及到每一个领域,但是垂直领域的发展依然非常欠缺。比如配送互联网涉及到交通运输等,需要有交通运输方面的专家与互联网技术进行深度结合,才能对配送行业引起深远广泛的影响。 未来,我们需要技术人员深耕于某一行业,做出真正适用于行业的优秀的互联网产品。 当前,RocketMQ已经能够支持事件和流。工业互联网行业非常重要的一个元素是IoT事件,它可能来自于各种终端设备,每天都会产生大量的、持续的事件,这些大量的数据都需要队列进行传输,提供给下游做计算。 因此,RocketMQ未来的发展将着力于事件和流。 RocketMQ已经推出了RocketMQCollect、RocketMQFlink和RocketMQstreams,在流计算上逐渐发力,形成了一整套完善的生态,能够帮助用户快速构建流式应用。而消息更是RocketMQ的擅长之处,能够帮助用户在不同场景的消息下方便、快速地接入使用。RocketMQ已经开源了MQTT等协议,使接入设备更快速方便。 随着RocketMQ5。0的发布,RocketMQ在处理消息、事件和流上实现了统一,有了越来越强大的优势,存储和计算分离的特性也使其能提供更低的成本,使企业上云更省钱、更省力,也更省人力。