专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

高并发异步解耦利器RocketMQ究竟强在哪里?

  本文带大家从以下几个方面详细了解RocketMQ:RocketMQ如何保证消息存储的可靠性?RocketMQ如何保证消息队列服务的高可用?如何构建一个高可用的RocketMQ双主双从最小集群?RocketMQ消息是如何存储的?RocketMQ是如何保证存取消息的效率的?如何实现基于MessageKey的高效查询?如何实现基于MessageId的高效查询?RocketMQ的Topic在集群中是如何存储的?Broker自动创建Topic会有什么问题?RocketMQ如何保证消息投递的顺序性?RocketMQ如何保证消息消费的顺序性?实现分布式事务的手段有哪些?RocketMQ如何实现事务消息?RocketMQ事务消息是如何存储的?1。RocketMQ技术架构
  RocketMQ的架构主要分为四部分,如下图所示:
  Producer:消息生产者,支持集群方式部署;Consumer:消息消费者,支持集群方式部署,支持pull,push模式获取消息进行消费,支持集群和广播方式消费;NameServer:Topic路由注册中心,类似于Dubbo中的zookeeper,支持Broker的动态注册与发现;提供心跳检测机制,检查Broker是否存活;接收Broker集群的注册信息,作为路由信息的基本数据;NameServier各个实例不相互进行通信,每个NameServer都保存了一份完整的路由信息,这与zookeeper有所区别,不用作复杂的节点数据同步与选主过程;BrokerServer:主要负责消息的存储、投递和查询,以及服务高可用保证。BrokerServer包含以下几个重要的子模块:RemotingModule:整个Broker的实体,负责处理来自clients端的请求;ClientManager:负责管理客户端(ProducerConsumer)和维护Consumer的Topic订阅信息;StoreService:提供方便简单的API接口处理消息存储到物理硬盘和查询功能;HAService:高可用服务,提供MasterBroker和SlaveBroker之间的数据同步功能;IndexService:根据特定的Messagekey对投递到Broker的消息进行索引服务,以提供消息的快速查询。
  2。RocketMQ执行原理
  RocketMQ执行原理如下图所示:
  首先,启动每个NameServer节点,共同构成一个NameServerCluster。NameServer启动后,监听端口,等待Broker、Producer、Consumer的连接;然后启动Broker的主从节点,这个时候Broker会与所有的NameServer建立并保持长连接,定时发送心跳包,把自己的信息(IP端口号)以及存储的所有Topic信息注册到每个NameServer中。这样NameServer集群中就有Topic和Broker的映射关系了;收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic,每个Topic默认会分配4个Queue;启动生产者,这个时候生产者会把信息注册到NameServer中,并且从NameServer获取Broker服务器,Queue等信息;启动消费者,这个时候消费者会把信息注册到NameServer中,并且从NameServer获取Broker服务器,Queue等信息;生产者发送消息到Broker集群中的时候,会从所有的Master节点的对应Topic中选择一个Queue,然后与Queue所在的Broker建立长连接从而向Broker投递消息。消息实际上是存储在了CommitLog文件中,而Queue文件里面存储的实际是消息在CommitLog中的存储位置信息;消费者从Broker集群中消费消息的时候,会通过特定的负载均衡算法,绑定一个消息队列进行消费;消费者会定时(或者kill阶段)把Queue的消费进度offset提交到Broker的consumerOffset。json文件中记录起来;主节点和从节点之间可以是同步或者异步的进行数据复制,相关配置参数:brokerRole,可选值:ASYNCMASTER:异步复制方式(异步双写),生产者写入消息到Master之后,无需等到消息复制到Slave即可返回,消息的复制由旁路线程进行异步复制;SYNCMASTER:同步复制方式(同步双写),生产者写入消息到Master之后,需要等到Slave复制成功才可以返回。如果有多个Slave,只需要有一个Slave复制成功,并成功应答,就算复制成功了。这里是否持久化到磁盘依赖于另一个参数:flushDiskType;SLAVE:从节点3。RocketMQ集群
  本节我们来看看一个双主双从的RocketMQ是如何搭建的。集群配置参数说明:
  在讨论集群前,我们需要了解两个关键的集群配置参数:brokerRole,flushDiskType。brokerRole在前一节已经介绍了,而flushDiskType则是刷盘方式的配置,主要有:
  ASYNCFLUSH:异步刷盘SYNCFLUSH:同步刷盘3。1如何保证消息存储的可靠性?
  brokerRole确定了主从同步是异步的还是同步的,flushDiskType确定了数据刷盘的方式是同步的还是异步的。
  如果业务场景对消息丢失容忍度很低,可以采用SYNCMASTERASYNCFLUSH的方式,这样只有master和slave在刷盘前同时挂掉,消息才会丢失,也就是说即使有一台机器出故障,仍然能保证数据不丢;
  如果业务场景对消息丢失容忍度比较高,则可以采用ASYNCMASTERASYNCFLUSH的方式,这样可以尽可能的提高消息的吞吐量。3。2如何保证消息队列服务的高可用?消费端的高可用
  MasterBroker支持读和写,SlaveBroker只支持读。
  当Master不可用的时候,Consumer会自动切换到Slave进行读,也就是说,当Master节点的机器出现故障后,Consumer仍然可以从Slave节点读取消息,不影响消费端的消费程序。生产端的高可用集群配置参数说明:
  brokerName:broker的名称,需要把Master和Slave节点配置成相同的名称,表示他们的主从关系,相同的brokerName的一组broker,组成一个broker组;brokerId:broker的id,0表示Master节点的id,大于0表示Slave节点的id。
  在RocketMQ中,机器的主从节点关系是提前配置好的,没有类似Kafka的Master动态选主功能。
  如果一个Master宕机了,要让生产端程序继续可以生产消息,您需要部署多个Master节点,组成多个broker组。这样在创建Topic的时候,就可以把Topic的不同消息队列分布在多个broker组中,即使某一个broker组的Master节点不可用了,其他组的Master节点仍然可用,保证了Producer可以继续发送消息。3。3如何构建一个高可用的RocketMQ双主双从最小集群?
  为了尽可能的保证消息不丢失,并且保证生产者和消费者的可用性,我们可以构建一个双主双从的集群,搭建的架构图如下所示:
  部署架构说明:两个Broker组,保证了其中一个Broker组的Master节点挂掉之后,另一个Master节点仍然可以接受某一个Topic的消息投递;主从同步采用SYNCMASTER,保证了生产者写入消息到Master之后,需要等到Slave也复制成功,才返回消息投递成功。这样即使主节点或者从节点挂掉了,也不会导致丢数据;由于主节点有了从节点做备份,所以,落盘策略可以使用ASYNCFLUSH,从而尽可能的提高消息的吞吐量;如果只提供两台服务器,要部署这个集群的情况下,可以把BrokerMaster1和BrokerSlave2部署在一台机器,BrokerMaster2和BrokerSlave1部署在一台机器。关键配置参数
  以下是关键的配置参数:BrokerMaster1NameServer地址namesrvAddr192。168。1。100:9876;192。168。1。101:9876集群名称brokerClusterNameitzhaicomclusterbrokerIP地址brokerIP1192。168。1。100broker通信端口listenPort10911broker名称brokerNamebroker10表示主节点brokerId02点进行消息删除deleteWhen02消息在磁盘上保留48小时fileReservedTime48主从同步复制brokerRoleSYNCMASTER异步刷盘flushDiskTypeASYNCFLUSH自动创建TopicautoCreateTopicEnabletrue消息存储根目录storePathRootDirdatarocketmqstoremBrokerSlave1NameServer地址namesrvAddr192。168。1。100:9876;192。168。1。101:9876集群名称brokerClusterNameitzhaicomclusterbrokerIP地址brokerIP1192。168。1。101broker通信端口listenPort10911broker名称brokerNamebroker1非0表示从节点brokerId12点进行消息删除deleteWhen02消息在磁盘上保留48小时fileReservedTime48从节点brokerRoleSLAVE异步刷盘flushDiskTypeASYNCFLUSH自动创建TopicautoCreateTopicEnabletrue消息存储根目录storePathRootDirdatarocketmqstoresBrokerMaster2NameServer地址namesrvAddr192。168。1。100:9876;192。168。1。101:9876集群名称brokerClusterNameitzhaicomclusterbrokerIP地址brokerIP1192。168。1。102broker通信端口listenPort10911broker名称brokerNamebroker20表示主节点brokerId02点进行消息删除deleteWhen02消息在磁盘上保留48小时fileReservedTime48主从同步复制brokerRoleSYNCMASTER异步刷盘flushDiskTypeASYNCFLUSH自动创建TopicautoCreateTopicEnabletrue消息存储根目录storePathRootDirdatarocketmqstoremBrokerSlave2NameServer地址namesrvAddr192。168。1。100:9876;192。168。1。101:9876集群名称brokerClusterNameitzhaicomclusterbrokerIP地址brokerIP1192。168。1。103broker通信端口listenPort10911broker名称brokerNamebroker2非0表示从节点brokerId12点进行消息删除deleteWhen02消息在磁盘上保留48小时fileReservedTime48从节点brokerRoleSLAVE异步刷盘flushDiskTypeASYNCFLUSH自动创建TopicautoCreateTopicEnabletrue消息存储根目录storePathRootDirdatarocketmqstores
  写了那么多顶层架构图,不写写底层内幕,就不是IT宅(itzhai。com)的文章风格,接下来,我们就来看看底层存储架构。4。RocketMQ存储架构
  我们在broker。conf文件中配置了消息存储的根目录:消息存储根目录storePathRootDirdatarocketmqstorem
  进入这个目录,我们可以发现如下的目录结构:
  其中:abort:该文件在broker启动时创建,关闭时删除,如果broker异常退出,则文件会存在,在下次启动时会走修复流程;checkpoint:检查点,主要存放以下内容:physicMsgTimestamp:commitlog文件最后一次落盘时间;logicsMsgTimestamp:consumequeue最后一次落盘时间;indexMsgTimestamp:索引文件最后一次落盘时间;commitlog:存放消息的完整内容,所有的topic消息都会通过文件追加的形式写入到该文件中;config:消息队列的配置文件,包括了topic配置,消费的偏移量等信息。其中consumerOffset。json文件存放消息队列消费的进度;consumequeue:topic的逻辑队列,在消息存放到commitlog之后,会把消息的存放位置记录到这里,只有记录到这里的消息,才能被消费者消费;index:消息索引文件,通过MessageKey查询消息时,是通过该文件进行检索查询的。4。1RocketMQ消息是如何存储的
  下面我们来看看关键的commitlog以及consumequeue:
  消息投递到Broker之后,是先把实际的消息内容存放到CommitLog中的,然后再把消息写入到对应主题的ConsumeQueue中。其中:
  CommitLog:消息的物理存储文件,存储实际的消息内容。每个Broker上面的CommitLog被该Broker上所有的ConsumeQueue共享。
  单个文件大小默认为1G,文件名长度为20位,左边补零,剩余为起始偏移量。预分配好空间,消息顺序写入日志文件。当文件满了,则写入下一个文件,下一个文件的文件名基于文件第一条消息的偏移量进行命名;
  ConsumeQueue:消息的逻辑队列,相当于CommitLog的索引文件。RocketMQ是基于Topic主题订阅模式实现的,每个Topic下会创建若干个逻辑上的消息队列ConsumeQueue,在消息写入到CommitLog之后,通过Broker的后台服务线程(ReputMessageService)不停地分发请求并异步构建ConsumeQueue和IndexFile(索引文件,后面介绍),然后把每个ConsumeQueue需要的消息记录到各个ConsumeQueue中。
  ConsumeQueue主要记录8个字节的commitLogOffset(消息在CommitLog中的物理偏移量),4个字节的msgSize(消息大小),8个字节的TagHashcode,每个元素固定20个字节。
  ConsumeQueue相当于CommitLog文件的索引,可以通过ConsumeQueue快速从很大的CommitLog文件中快速定位到需要的消息。CONSUMEQUEUE的存储结构
  主题消息队列:在consumequeue目录下,按照topic的维度存储消息队列。
  重试消息队列:如果topic中的消息消费失败,则会把消息发到重试队列,重新队列按照消费端的GroupName来分组,命名规则:RETRYConsumerGroupName
  死信消息队列:如果topic中的消息消费失败,并且超过了指定重试次数之后,则会把消息发到死信队列,死信队列按照消费端的GroupName来分组,命名规则:DLQConsumerGroupName
  假设我们现在有一个topic:itzhaitest,消费分组:itzhaiconsumergroup,当消息消费失败之后,我们查看consumequeue目录,会发现多处了一个重试队列:
  我们可以在RocketMQ的控制台看到这个重试消息队列的主题和消息:
  如果一直重试失败,达到一定次数之后(默认是16次,重试时间:1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h),就会把消息投递到死信队列:
  4。2RocketMQ是如何保证存取消息的效率的4。2。1如何保证高效写
  每条消息的长度是不固定的,为了提高写入的效率,RocketMQ预先分配好1G空间的CommitLog文件,采用顺序写的方式写入消息,大大的提高写入的速度。
  RocketMQ中消息刷盘主要可以分为同步刷盘和异步刷盘两种,通过flushDiskType参数进行配置。如果需要提高写消息的效率,降低延迟,提高MQ的性能和吞吐量,并且不要求消息数据存储的高可靠性,可以把刷盘策略设置为异步刷盘。4。2。2如何保证高效读
  为了提高读取的效率,RocketMQ使用ConsumeQueue作为消费消息的索引,使用IndexFile作为基于消息key的查询的索引。下面来详细介绍下。4。2。2。1ConsumeQueue
  读取消息是随机读的,为此,RocketMQ专门建立了ConsumeQueue索引文件,每次先从ConsumeQueue中获取需要的消息的地址,消息大小,然后从CommitLog文件中根据地址直接读取消息内容。在读取消息内容的过程中,也尽量利用到了操作系统的页缓存机制,进一步加速读取速度。
  ConsumeQueue由于每个元素大小是固定的,因此可以像访问数组一样访问每个消息元素。并且占用空间很小,大部分的ConsumeQueue能够被全部载入内存,所以这个索引查找的速度很快。每个ConsumeQueue文件由30w个元素组成,占用空间在6M以内。每个文件默认大小为600万个字节,当一个ConsumeQueue类型的文件写满之后,则写入下一个文件。4。2。2。2IndexFile为什么按照MessageKey查询效率高?
  我们在RocketMQ的store目录中可以发现有一个index目录,这个是一个用于辅助提高查询消息效率的索引文件。通过该索引文件实现基于消息key来查询消息的功能。物理存储结构
  IndexFile索引文件物理存储结构如下图所示:
  Header:索引头文件,40bytes,包含以下信息:beginTimestamp:索引文件中第一个索引消息存入Broker的时间戳;endTimestamp:索引文件中最后一个索引消息存入Broker的时间戳beginPHYOffset:索引文件中第一个索引消息在CommitLog中的偏移量;endPhyOffset:索引文件中最后一个索引消息在CommitLog中的偏移量;hashSlotCount:构建索引使用的slot数量;indexCount:索引的总数;SlotTable:槽位表,类似于Redis的Slot,或者哈希表的key,使用消息的key的hashcode与slotNum取模可以得到具体的槽的位置。每个槽位占4bytes,一个IndexFile可以存储500w个slot;IndexLinkedList:消息的索引内容,如果哈希取模后发生槽位碰撞,则构建成链表,一个IndexFile可以存储2000w个索引:KeyHash:消息的哈希值;CommitLogOffset:消息在CommitLog中的偏移量;Timestamp:消息存储的时间戳;NextIndexOffset:下一个索引的位置,如果消息取模后发生槽位槽位碰撞,则通过此字段把碰撞的消息构成链表。
  每个IndexFile文件的大小:40b4b500000020b20000000420000040b,约为400M。逻辑存储结构
  IndexFile索引文件的逻辑存储结构如下图所示:
  IndexFile逻辑上是基于哈希表来实现的,SlotTable为哈希键,IndexLinkedList中存储的为哈希值。4。2。2。3为什么按照MessageId查询效率高?
  RocketMQ中的MessageId的长度总共有16字节,其中包含了:消息存储主机地址(IP地址和端口),消息CommitLogoffset。
  按照MessageId查询消息的流程:Client端从MessageId中解析出Broker的地址(IP地址和端口)和CommitLog的偏移地址后封装成一个RPC请求后通过Remoting通信层发送(业务请求码:VIEWMESSAGEBYID)。Broker端走的是QueryMessageProcessor,读取消息的过程用其中的commitLogoffset和size去commitLog中找到真正的记录并解析成一个完整的消息返回。4。3RocketMQ集群是如何做数据分区的?
  我们继续看看在集群模式下,RocketMQ的Topic数据是如何做分区的。IT宅(itzhai。com)提醒大家,实践出真知。这里我们部署两个Master节点:
  4。3。1ROCKETMQ的TOPIC在集群中是如何存储的
  我们通过手动配置每个Broker中的Topic,以及ConsumeQueue数量,来实现Topic的数据分片,如,我们到集群中手动配置这样的Topic:brokera创建itzhaicomtest1,4个队列;brokerb创建itzhaicomtest1,2个队列。
  创建完成之后,Topic分片集群分布如下:
  即:
  可以发现,RocketMQ是把Topic分片存储到各个Broker节点中,然后在把Broker节点中的Topic继续分片为若干等分的ConsumeQueue,从而提高消息的吞吐量。ConsumeQueue是作为负载均衡资源分配的基本单元。
  这样把Topic的消息分区到了不同的Broker上,从而增加了消息队列的数量,从而能够支持更块的并发消费速度(只要有足够的消费者)。4。3。2BROKER自动创建TOPIC会有什么问题?
  假设设置为通过Broker自动创建Topic(autoCreateTopicEnabletrue),并且Producer端设置Topic消息队列数量设置为4,也就是默认值:producer。setDefaultTopicQueueNums(4);
  尝试往一个新的topicitzhaitestqueue1连续发送10条消息,发送完毕之后,查看Topic状态:
  我们可以发现,在两个broker上面都创建了itzhaitestqueuea,并且每个broker上的消息队列数量都为4。怎么回事,我配置的明明是期望创建4个队列,为什么加起来会变成了8个?如下图所示:
  由于时间关系,本文我们不会带大家从源码方面去解读为啥会出现这种情况,接下来我们通过一种更加直观的方式来验证下这个问题:继续做实验。
  我们继续尝试往一个新的topicitzhaitestqueue10发送1条消息,注意,这一次不做并发发送了,只发送一条,发送完毕之后,查看Topic状态:
  可以发现,这次创建的消息队列数量又是对的了,并且都是在brokera上面创建的。接下来,无论怎么并发发送消息,消息队列的数量都不会继续增加了。
  其实这也是并发请求Broker,触发自动创建Topic的bug。
  为了更加严格的管理Topic的创建和分片配置,一般在生产环境都是配置为手动创建Topic,通过提交运维工单申请创建Topic以及Topic的数据分配。
  接下来我们来看看RocketMQ的特性。更多其他技术的底层架构内幕分析,请访问我的博客IT宅(itzhai。com)或者关注Java架构杂谈公众号。5。RocketMQ特性5。1生产端5。1。1消息发布
  RocketMQ中定义了如下三种消息通信的方式:publicenumCommunicationMode{SYNC,ASYNC,ONEWAY,}SYNC:同步发送,生产端会阻塞等待发送结果;应用场景:这种方式应用场景非常广泛,如重要业务事件通知。ASYNC:异步发送,生产端调用发送API之后,立刻返回,在拿到Broker的响应结果后,触发对应的SendCallback回调;应用场景:一般用于链路耗时较长,对RT较为敏感的业务场景;ONEWAY:单向发送,发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别;应用场景:适用于耗时非常短,对可靠性要求不高的场景,如日志收集。
  SYNC和ASYNC关注发送结果,ONEWAY不关注发送结果。发送结果如下:publicenumSendStatus{SENDOK,FLUSHDISKTIMEOUT,FLUSHSLAVETIMEOUT,SLAVENOTAVAILABLE,}SENDOK:消息发送成功。SENDOK并不意味着投递是可靠的,要确保消息不丢失,需要开启SYNCMASTER同步或者SYNCFLUSH同步写;FLUSHDISKTIMEOUT:消息发送成功,但是刷盘超时。如果Broker的flushDiskTypeSYNCFLUSH,并且5秒内没有完成消息的刷盘,则会返回这个状态;FLUSHSLAVETIMEOUT:消息发送成功,但是服务器同步到Slave时超时。如果Broker的brokerRoleSYNCMASTER,并且5秒内没有完成同步,则会返回这个状态;SLAVENOTAVAILABLE:消息发送成功,但是无可用的Slave节点。如果Broker的brokerRoleSYNCMASTER,但是没有发现SLAVE节点或者SLAVE节点挂掉了,那么会返回这个状态。源码内容更精彩,欢迎大家进一步阅读源码详细了解消息发送的内幕:
  同步发送:org。apache。rocketmq。client。producer。DefaultMQProducersend(org。apache。rocketmq。common。message。Message)异步发送:org。apache。rocketmq。client。producer。DefaultMQProducersend(org。apache。rocketmq。common。message。Message,org。apache。rocketmq。client。producer。SendCallback)单向发送:org。apache。rocketmq。client。producer。DefaultMQProducersendOneway(org。apache。rocketmq。common。message。Message)5。1。2顺序消费
  消息的有序性指的是一类消息消费的时候,可以按照发送顺序来消费,比如:在Java架构杂谈茶餐厅吃饭产生的消息:进入餐厅、点餐、下单、上菜、付款,消息要按照这个顺序消费才有意义,但是多个顾客产生的消息是可以并行消费的。顺序消费又分为全局顺序消费和分区顺序消费:全局顺序:同一个Topic下的消息,所有消息按照严格的FIFO顺序进行发布和消费。适用于:性能要求不高,所有消息严格按照FIFO进行发布和消费的场景;分区顺序:同一个Topic下,根据消息的特定业务ID进行shardingkey分区,同一个分区内的消息按照严格的FIFO顺序进行发布和消费。适用于:性能要求高,在同一个分区中严格按照FIFO进行发布和消费的场景。
  一般情况下,生产者是会以轮训的方式把消息发送到Topic的消息队列中的:
  在同一个Queue里面,消息的顺序性是可以得到保证的,但是如果一个Topic有多个Queue,以轮训的方式投递消息,那么就会导致消息乱序了。
  为了保证消息的顺序性,需要把保持顺序性的消息投递到同一个Queue中。5。1。2。1如何保证消息投递的顺序性
  RocketMQ提供了MessageQueueSelector接口,可以用来实现自定义的选择投递的消息队列的算法:for(inti0;iorderList。size();i){StringcontentHelloitzhai。com。Java架构杂谈,newDate();MessagemsgnewMessage(topicitzhaicom,tags〔itags。length〕,KEYi,content。getBytes(RemotingHelper。DEFAULTCHARSET));SendResultsendResultproducer。send(msg,newMessageQueueSelector(){OverridepublicMessageQueueselect(ListMessageQueuemqs,Messagemsg,Objectarg){LongorderId(Long)arg;订单号与消息队列个数取模,保证让同一个订单号的消息落入同一个消息队列longindexorderIdmqs。size();returnmqs。get((int)index);}},orderList。get(i)。getOrderId());System。out。printf(content:s,sendResult:sn,content,sendResult);}
  如上图,我们实现了MessageQueueSelector接口,并在实现的select方法里面,指定了选择消息队列的算法:订单号与消息队列个数取模,保证让同一个订单号的消息落入同一个消息队列:
  有个异常场景需要考虑:假设某一个Master节点挂掉了,导致Topic的消息队列数量发生了变化,那么继续使用以上的选择算法,就会导致在这个过程中同一个订单的消息会分散到不同的消息队列里面,最终导致消息不能顺序消费。
  为了避免这种情况,只能选择牺牲failover特性了。
  现在投递到消息队列中的消息保证了顺序,那如何保证消费也是顺序的呢?5。1。2。2如何保证消息消费的顺序性?
  RocketMQ中提供了MessageListenerOrderly,该对象用于有顺序收异步传递的消息,一个队列对应一个消费线程,使用方法如下:consumer。registerMessageListener(newMessageListenerOrderly(){消费次数,用于辅助模拟各种消费结果AtomicLongconsumeTimesnewAtomicLong(0);OverridepublicConsumeOrderlyStatusconsumeMessage(ListMessageExtmsgs,ConsumeOrderlyContextcontext){context。setAutoCommit(true);System。out。printf(sReceiveNewMessages:sn,Thread。currentThread()。getName(),msgs);this。consumeTimes。incrementAndGet();if((this。consumeTimes。get()2)0){returnConsumeOrderlyStatus。SUCCESS;}elseif((this。consumeTimes。get()3)0){returnConsumeOrderlyStatus。ROLLBACK;}elseif((this。consumeTimes。get()4)0){returnConsumeOrderlyStatus。COMMIT;}elseif((this。consumeTimes。get()5)0){context。setSuspendCurrentQueueTimeMillis(3000);returnConsumeOrderlyStatus。SUSPENDCURRENTQUEUEAMOMENT;}returnConsumeOrderlyStatus。SUCCESS;}});
  如果您使用的是MessageListenerConcurrently,表示并发消费,为了保证消息消费的顺序性,需要设置为单线程模式。
  使用MessageListenerOrderly的问题:如果遇到某条消息消费失败,并且无法跳过,那么消息队列的消费进度就会停滞。5。1。3延迟队列(定时消息)
  定时消费是指消息发送到Broker之后不会立即被消费,而是等待特定的时间之后才投递到Topic中。定时消息会暂存在名为SCHEDULETOPICXXXX的topic中,并根据delayTimeLevel存入特定的queue,queueIddelayTimeLevel1,一个queue只存相同延迟的消息,保证具有相同延迟的消息能够顺序消费。比如,我们设置1秒后把消息投递到topicitzhaicomtopic,则存储的文件目录如下所示:
  Broker会调度地消费SCHEDULETOPICXXXX,将消息写入真实的topic。
  定时消息的副作用:定时消息会在第一次写入Topic和调度写入实际的topic都会进行计数,因此发送数量,tps都会变高。
  使用延迟队列的场景:提交了订单之后,如果等待超过约定的时间还未支付,则把订单设置为超时状态。
  RocketMQ提供了以下几个固定的延迟级别:publicclassMessageStoreConfig{。。。10个level,level:118privateStringmessageDelayLevel1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h;。。。}
  level0表示不使用延迟消息。
  另外,消息消费失败也会进入延迟队列,消息发送时间与设置的延迟级别和重试次数有关。
  以下是发送延迟消息的代码:publicclassScheduledMessageProducer{publicstaticvoidmain(String〔〕args)throwsException{DefaultMQProducerproducernewDefaultMQProducer(TestProducerGroup);producer。start();inttotalMessagesToSend100;for(inti0;itotalMessagesToSend;i){MessagemessagenewMessage(TestTopic,(Helloscheduledmessagei)。getBytes());指定该消息在10秒后被消费者消费message。setDelayTimeLevel(3);producer。send(message);}producer。shutdown();}}5。1。4数据完整性与事务消息
  通过消息对系统进行解耦之后,势必会遇到分布式系统数据完整性的问题。5。1。4。1实现分布式事务的手段有哪些?
  我们可以通过以下手段解决分布式系统数据最终一致性问题:数据库层面的2PC(Twophasecommitprotocol),二阶段提交,同步阻塞,效率低下,存在协调者单点故障问题,极端情况下存在数据不一致的风险。对应技术上的XA、JTAJTS。这是分布式环境下事务处理的典型模式;数据库层面的3PC,三阶段提交,引入了参与者超时机制,增加了预提交阶段,使得故障恢复之后协调者的决策复杂度降低,但整体的交互过程变得更长了,性能有所下降,仍旧会存在数据不一致的问题;业务层面的TCC,TryConfirmCancel。对业务的侵入较大,和业务紧耦合,对于每一个操作都需要定义三个动作分别对应:TryConfirmCancel,将资源层的两阶段提交协议转换到业务层,成为业务模型中的一部分;本地消息表;事务消息;
  RocketMQ事务消息(TransactionalMessage)则是通过事务消息来实现分布式事务的最终一致性。下面看看RocketMQ是如何实现事务消息的。5。1。4。2RocketMQ如何实现事务消息?
  如下图:
  事务消息有两个流程:事务消息发送及提交:发送half消息;服务端响应half消息写入结果;根据half消息的发送结果执行本地事务。如果发送失败,此时half消息对业务不可见,本地事务不执行;根据本地事务状态执行Commit或者Rollback。Commit操作会触发生成ConsumeQueue索引,此时消息对消费者可见;补偿流程:
  5。对于没有CommitRollback的事务消息,会处于pending状态,这对这些消息,MQServer发起一次回查;
  6。Producer收到回查消息,检查回查消息对应的本地事务的转塔体;
  7。根据本地事务状态,重新执行Commit或者Rollback。
  补偿阶段主要用于解决消息的Commit或者Rollback发生超时或者失败的情况。
  half消息:并不是发送了一半的消息,而是指消息已经发送到了MQServer,但是该消息未收到生产者的二次确认,此时该消息暂时不能投递到具体的ConsumeQueue中,这种状态的消息称为half消息。5。1。4。3RocketMQ事务消息是如何存储的?
  发送到MQServer的half消息对消费者是不可见的,为此,RocketMQ会先把half消息的Topic和Queue信息存储到消息的属性中,然后把该half消息投递到一个专门的处理事务消息的队列中:RMQSYSTRANSHALFTOPIC,由于消费者没有订阅该Topic,所以无法消息half类型的消息。
  生产者执行Commithalf消息的时候,会存储一条专门的Op消息,用于标识事务消息已确定的状态,如果一条事务消息还没有对应的Op消息,说明这个事务的状态还无法确定。RocketMQ会开启一个定时任务,对于pending状态的消息,会先向生产者发送回查事务状态请求,根据事务状态来决定是否提交或者回滚消息。
  当消息被标记为Commit状态之后,会把half消息的Topic和Queue相关属性还原为原来的值,最终构建实际的消费索引(ConsumeQueue)。RocketMQ并不会无休止的尝试消息事务状态回查,默认查找15次,超过了15次还是无法获取事务状态,RocketMQ默认回滚该消息。并打印错误日志,可以通过重写AbstractTransactionalMessageCheckListener类修改这个行为。
  可以通过Broker的配置参数:transactionCheckMax来修改此值。5。1。5消息重投
  如果消息发布方式是同步发送会重投,如果是异步发送会重试。
  消息重投可以尽可能保证消息投递成功,但是可能会造成消息重复。
  什么情况会造成重复消费消息?出现消息量大,网络抖动的时候;生产者主动重发;消费负载发生变化。
  可以使用的消息重试策略:retryTimesWhenSendFailed:设置同步发送失败的重投次数,默认为2。所以生产者最多会尝试发送retryTimesWhenSendFailed1次。为了最大程度保证消息不丢失,重投的时候会尝试向其他broker发送消息;超过重投次数,抛出异常,让客户端自行处理;触发重投的异常:RemotingException、MQClientException和部分MQBrokerException;retryTimesWhenSendAsyncFailed:设置异步发送失败重试次数,异步重试不会选择其他Broker,不保证消息不丢失;retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SENDOK),是否尝试发送到其他broker,默认false。重要的消息可以开启此选项。
  oneway发布方式不支持重投。5。1。6批量消息
  为了提高系统的吞吐量,提高发送效率,可以使用批量发送消息。
  批量发送消息的限制:同一批批量消息的topic,waitStoreMsgOK属性必须保持一致;批量消息不支持延迟队列;批量消息一次课发送的上限是4MB。
  发送批量消息的例子:Stringtopicitzhaitesttopic;ListMessagemessagesnewArrayList();messages。add(newMessage(topic,TagA,OrderID001,Helloworlditzhai。com0。getBytes()));messages。add(newMessage(topic,TagA,OrderID002,Helloworlditzhai。com1。getBytes()));messages。add(newMessage(topic,TagA,OrderID003,Helloworlditzhai。com2。getBytes()));producer。send(messages);
  如果发送的消息比较多,会增加复杂性,为此,可以对大消息进行拆分。以下是拆分的例子:publicclassListSplitterimplementsIteratorListMessage{限制最大大小privatefinalintSIZELIMIT102410244;privatefinalListMessagemessages;privateintcurrIndex;publicListSplitter(ListMessagemessages){this。messagesmessages;}OverridepublicbooleanhasNext(){returncurrIndexmessages。size();}OverridepublicListMessagenext(){intstartIndexgetStartIndex();intnextIndexstartIndex;inttotalSize0;for(;nextIndexmessages。size();nextIndex){Messagemessagemessages。get(nextIndex);inttmpSizecalcMessageSize(message);if(tmpSizetotalSizeSIZELIMIT){break;}else{totalSizetmpSize;}}ListMessagesubListmessages。subList(startIndex,nextIndex);currIndexnextIndex;returnsubList;}privateintgetStartIndex(){MessagecurrMessagemessages。get(currIndex);inttmpSizecalcMessageSize(currMessage);while(tmpSizeSIZELIMIT){currIndex1;Messagemessagemessages。get(curIndex);tmpSizecalcMessageSize(message);}returncurrIndex;}privateintcalcMessageSize(Messagemessage){inttmpSizemessage。getTopic()。length()message。getBody()。length();MapString,Stringpropertiesmessage。getProperties();for(Map。EntryString,Stringentry:properties。entrySet()){tmpSizeentry。getKey()。length()entry。getValue()。length();}tmpSizetmpSize20;Increasethelogoverheadby20bytesreturntmpSize;}}thenyoucouldsplitthelargelistintosmallones:ListSplittersplitternewListSplitter(messages);while(splitter。hasNext()){try{ListMessagelistItemsplitter。next();producer。send(listItem);}catch(Exceptione){e。printStackTrace();handletheerror}}5。1。7消息过滤
  RocketMQ的消费者可以根据Tag进行消息过滤来获取自己感兴趣的消息,也支持自定义属性过滤。
  Tags是Topic下的次级消息类型二级类型(注:Tags也支持TagATagB这样的表达式),可以在同一个Topic下基于Tags进行消息过滤。
  消息过滤是在Broker端实现的,减少了对Consumer无用消息的网络传输,缺点是增加了Broker负担,实现相对复杂。5。2消费端5。2。1消费模型
  消费端有两周消费模型:集群消费和广播消费。集群消费
  集群消费模式下,相同ConsumerGroup的每个Consumer实例平均分摊消息。广播消费
  广播消费模式下,相同ConsumerGroup的每个Consumer实例都接收全量的消息。5。2。2消息重试
  RocketMQ会为每个消费组都设置一个Topic名称为RETRYconsumerGroupName的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。
  考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。
  RocketMQ对于重试消息的处理是先保存至Topic名称为SCHEDULETOPICXXXX的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至RETRYconsumerGroupName的重试队列中。
  比如,我们设置1秒后把消息投递到topicitzhaicomtopic,则存储的文件目录如下所示:
  5。2。3死信队列
  当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
  RocketMQ将这种正常情况下无法被消费的消息称为死信消息(DeadLetterMessage),将存储死信消息的特殊队列称为死信队列(DeadLetterQueue)。
  在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
  由于RocketMQ是使用Java写的,所以它的代码特别适合拿来阅读消遣,我们继续来看看RocketMQ的源码结构。。。
  不不,还是算了,一下子又到周末晚上了,时间差不多了,今天就写到这里了。有空再聊。
  我精心整理了一份Redis宝典给大家,涵盖了Redis的方方面面,面试官懂的里面有,面试官不懂的里面也有,有了它,不怕面试官连环问,就怕面试官一上来就问你Redis的RedoLog是干啥的?毕竟这种问题我也不会。
  在Java架构杂谈公众号发送Redis关键字获取pdf文件:

奋斗时光记忆跑腿1年助村民解决证明困境开栏语随着粤港澳大湾区战略的深入实施以及市直管镇体制改革的不断深化,水乡迎来了极为难得的发展机遇。市第十五次党代会提出,要把水乡功能区建设成富有水乡特色的高质量统筹发展示范区。水乡心想事成比褚晓羽进监狱还可怕的事情,终究还是来了心想事成比褚晓羽进监狱还可怕的事情,终究还是来了本文由娱乐乐可不可原创,如果你们喜欢的话,欢迎点赞和关注哦!电视剧心想事成是由毛晓彤张俪李泽锋主演的,讲述在一个普通的家庭中,两个性全国雪车及钢架雪车锦标赛,河北运动员李纯键夺得两金河北新闻网讯(河北日报记者王伟宏)从河北省体育局冬季运动中心获悉,近日在国家雪车雪橇中心雪游龙进行的20222023赛季全国雪车及钢架雪车锦标赛上,河北运动员李纯键搭档国家队队友,中国拳手吕斌夺得WBA超蝇量级国际金腰带3月16日,吕斌(中)在获胜后庆祝。新华社记者侯昭康摄当日,WBA(世界拳击协会)职业拳王争霸赛在浙江永康体育中心体育馆举行,中国拳手吕斌首回合击倒泰国人甘萨,夺得WBA超蝇量级国兰州中川国际机场夏秋换季焕新启航中国甘肃网3月16日讯(本网记者宋芳科通讯员杜鹏)2023年3月26日起,兰州中川国际机场将正式执行夏秋航班计划。为应对航班换季,兰州中川国际机场以东航春秋航青岛航国航南航等公司为巴赫穆特可能是个巨大的陷阱!真正目的在于榨干乌克兰军队主力文西風图片源自网络自去年年中瓦格纳雇佣军接战巴赫穆特到现在七八个月时间,每周都能听到各种瓦格纳和俄罗斯正规军不和的传闻,瓦格纳快顶不住了,俄罗斯要溃败的消息,但事实却是略显笨重的俄微信键盘iOS内测版1。0。5(5)更新支持中译英日韩IT之家3月25日消息,据IT之家网友反馈,微信键盘iOS测试版近期迎来了1。0。5(5)更新,本次更新后,支持3种语言的边写边译中译英日韩,支持切换繁体输入,体验优化和问题修复。微博之夜尴尬同框迪丽热巴杨紫全程无交流,杨紫被嘲像小跟班微博之夜贡献了太多精彩的瞬间,譬如胡歌刘亦菲的组合仙剑3的世纪同框等,可也有一些尴尬的同框,比如杨紫和迪丽热巴。杨紫和迪丽热巴同为90花top,双方粉丝本就不睦,这次两人罕见同框领黄轩资源翻了一倍,原来是答应做蒋雯丽情人后,两人各取所需蒋雯丽不仅要求加100多场和黄轩的姐弟恋戏,还让经纪人签了黄轩,二人在白相中再次饰演情侣,剧里两人的激情戏也很多,再次来了一场轰轰烈烈的姐弟恋。彼时蒋雯丽54岁黄轩27岁,两人看似从一杯咖啡中品味非长气质红网时刻新闻记者胡芳长沙报道当非洲咖啡遇上网红长沙,会碰撞出怎样的火花?2021年诞生于湖南长沙,以非洲咖啡为特色的咖啡连锁品牌小咖主,美式咖啡仅8元一杯。3月20日,2023中非姚晨亮相微博之夜黑纱鱼尾裙展露低调大气女王范儿今日,演员姚晨出席2022微博之夜,一袭黑纱深v鱼尾裙将她大方沉稳的气质很好的衬托开来,黑发红唇的利落标配也让大姚整个人都气场全开。没有过多繁杂的设计和点缀,简约大气的黑纱长裙造型
旅游地理旅游地理1。旅游资源及其特征2019年全国卷,421。英国康沃尔郡在一个废弃的矿山上开发了伊甸园项目。该项目主体是温宣,由8个充满未来主义艺术风格的巨大蜂巢式穹顶建筑构成。穹顶建筑腾讯市值凭什么重返世界前10?在今天的时候数据显示,周三的股价大涨让腾讯控股公司超越了石油巨头埃克森美孚,在6个月后重新进入了世界前十大最具价值的公司行列,位居第十位。此时的腾讯市值3。34万亿港币,约合美元4单小二上线IM在线客服,让服务连接更高效客户在售后报修时遇到困难不能第一时间联系到客服,客户售后体验差怎么办?客户无法即时联系到客服导致商机流失怎么办?对此,单小二全新上线IM在线客服功能,让客户第一时间联系客服,在帮助百度李彦宏我们的研发强度比腾讯高占总收入的15CNMO新闻目前,百度掌握着国内最大的搜索引擎,在很多领域都有非常强大的影响力。与此同时,百度在研发投入方面也一直都没有松懈,甚至还在人工智能自动驾驶等领域走到了行业前列。根据相关二硒化钼复合材料弥补锂硫电池不足近期,扬州大学研究者设计出了一种包埋于多孔氮掺杂碳(NC)纳米骨架中的多界面二硒化钼复合材料二硒化钼二硒化钴异质结构(MoSe2CoSe2NC),并以其作为锂硫电池(LSBs)的多高能技术护航,坦克500PHEV亮相广州车展2022年12月30日,以新科技,新生活为主题的2022年广州国际车展,在中国进出口商品交易会琶洲展馆正式开幕,受到了行业和广大消费者的广泛关注。其中,坦克品牌携坦克500PHEV下好全省一盘棋,助力营商环境百花齐放文高维日前,一份关于2022年广东21地市的营商环境评价报告权威出炉,引发各界关注。报告显示,整体上看,市场主体的满意度持续增强,城市间的差距在不断缩小。一枝独秀不是春,百花齐放春SciHortic壳聚糖接枝咖啡酸聚乳酸共聚膜延缓双孢蘑菇品质劣变近日,ScientiaHorticulturae在线发表了南京财经大学食品科学与工程学院裴斐副教授团队题为Transcriptomeanalysisrevealsthemechan大厂品质!英特尔NUC开箱简单装机体验,迷你主机装机记录!开篇碎碎念记一次英特尔装机过程,大概是21年左右,朋友找我帮搞一台电脑,需求是小巧不占地方,主要用途是办公和LOL,做为垃圾佬,自然追求低价。所以自然而然去某鱼淘,之前帮他搞过几台注重品质内外兼修极狐阿尔法S的实力不容小觑如今的我们,正处在能源时代的变革期,对于车企而言,纷纷都在向该领域转型,并且在经历了几轮的淘汰浪潮后,能存活下来的,都属脚踏实地的品牌,不少产品更是展现了出众的品质与性能,本期要说每次来都给套房!这家君悦酒店品质与实力兼具独立寒秋,湘江北去,橘子洲头看万山红遍,层林尽染漫江碧透,百舸争流沁园春长沙GrandHyattChangsha长沙君悦是我在长沙最喜欢的一家酒店了,地理位置很棒,酒店直面湘江,位
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网