范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

RocketMQ事务的实现原理

  from:cnblogs.com/shanml/p/16584514.htm事务的使用RocketMQ事务的使用场景单体架构下的事务
  在单体系统的开发过程中,假如某个场景下需要对数据库的多张表进行操作,为了保证数据的一致性,一般会使用事务,将所有的操作全部提交或者在出错的时候全部回滚。以创建订单为例,假设下单后需要做两个操作: 在订单表生成订单 在积分表增加本次订单增加的积分记录
  在单体架构下只需使用@Transactional开启事务,就可以保证数据的一致性:     @Transactional     public void order() {         String orderId = UUID.randomUUID().toString();         // 生成订单         orderService.createOrder(orderId);         // 增加积分         creditService.addCredits(orderId);     }
  然而现在越来越多系统开始使用分布式架构,在分布式架构下,订单系统和积分系统可能是两个独立的服务,此时就不能使用上述的方法开启事务了,因为它们不处于同一个事务中,在出错的情况下,无法进行全部回滚,只能对当前服务的事务进行回滚,所以就有可能出现订单生成成功但是积分服务增加积分失败的情况(也可能相反),此时数据处于不一致的状态。 分布式架构下的事务
  分布式架构下如果需要保证事务的一致性,需要使用分布式事务,分布式事务的实现方式有多种,这里我们先看通过RocketMQ事务的实现方式。
  同样以下单流程为例,在分布式架构下的处理流程如下: 订单服务生成订单 发送订单生成的MQ消息,积分服务订阅消息,有新的订单生成之后消费消息,增加对应的积分记录
  普通MQ消息存在的问题
  如果使用@Transactional + 发送普通MQ的方式,看下存在的问题: 假如订单创建成功,MQ消息发送成功,但是 order  方法在返回的前一刻,服务突然宕机,由于开启了事务,事务还未提交(方法结束后才会正常提交),所以订单表并未生成记录,但是MQ却已经发送成功并且被积分服务消费,此时就会存在订单未创建但是积分记录增加的情况假如先发送MQ消息再创建订单呢,此时问题就更明显了,如果MQ消息发送成功,创建订单失败,那么同样处于不一致的状态     @Transactional     public void order() {         String orderId = UUID.randomUUID().toString();         // 创建订单         Order order = orderService.createOrder(orderDTO.getOrderId());         // 发送订单创建的MQ消息         sendOrderMessge(order);         return;     }
  解决上述问题的方式就是使用RocketMQ事务消息。
  RocketMQ事务消息的使用
  使用事务消息需要实现自定义的事务监听器, TransactionListener  提供了本地事务执行和状态回查的接口,executeLocalTransaction  方法用于执行我们的本地事务,checkLocalTransaction  是一种补偿机制,在异常情况下如果未收到事务的提交请求,会调用此方法进行事务状态查询,以此决定是否将事务进行提交/回滚:public interface TransactionListener {     /**      * 执行本地事务      *      * @param msg Half(prepare) message half消息      * @param arg Custom business parameter      * @return Transaction state      */     LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);      /**      * 本地事务状态回查      *      * @param msg Check message      * @return Transaction state      */     LocalTransactionState checkLocalTransaction(final MessageExt msg); }
  这里我们实现自定义的事务监听器 OrderTransactionListenerImpl  :executeLocalTransaction  方法中创建订单,如果创建成功返回COMMIT_MESSAGE  ,如果出现异常返回ROLLBACK_MESSAGE  。checkLocalTransaction  方法中回查事务状态,根据消息体中的订单ID查询订单是否已经创建,如果创建成功提交事务,如果未获取到认为失败,此时回滚事务。public class OrderTransactionListenerImpl implements TransactionListener {      @Autowired     private OrderService orderService;      @Override     public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {         try {             String body = new String(msg.getBody(), Charset.forName("UTF-8"));             OrderDTO orderDTO = JSON.parseObject(body, OrderDTO.class);             // 模拟生成订单             orderService.createOrder(orderDTO.getOrderId());         } catch (Exception e) {             // 出现异常,返回回滚状态             return LocalTransactionState.ROLLBACK_MESSAGE;         }         // 创建成功,返回提交状态         return LocalTransactionState.COMMIT_MESSAGE;     }      @Override     public LocalTransactionState checkLocalTransaction(MessageExt msg) {         String body = new String(msg.getBody(), Charset.forName("UTF-8"));         OrderDTO orderDTO = JSON.parseObject(body, OrderDTO.class);         try {             // 根据订单ID查询订单是否存在             Order order = orderService.getOrderByOrderId(orderDTO.getOrderId());             if (null != order) {                 return LocalTransactionState.COMMIT_MESSAGE;             }         } catch (Exception e) {             return LocalTransactionState.ROLLBACK_MESSAGE;         }         return LocalTransactionState.ROLLBACK_MESSAGE;     } }
  接下来看如何发送事务消息,事务消息对应的生产者为 TransactionMQProducer  ,创建TransactionMQProducer  之后,设置上一步自定义的事务监听器OrderTransactionListenerImpl  ,然后将订单ID放入消息体中, 调用sendMessageInTransaction  发送事务消息:public class TransactionProducer {     public static void main(String[] args) throws MQClientException, InterruptedException {         // 创建下单事务监听器         TransactionListener transactionListener = new OrderTransactionListenerImpl();         // 创建生产者         TransactionMQProducer producer = new TransactionMQProducer("order_group");         // 事务状态回查线程池         ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() {             @Override             public Thread newThread(Runnable r) {                 Thread thread = new Thread(r);                 thread.setName("client-transaction-msg-check-thread");                 return thread;             }         });         // 设置线程池         producer.setExecutorService(executorService);         // 设置事务监听器         producer.setTransactionListener(transactionListener);         // 启动生产者         producer.start();         try {             // 创建订单消息             OrderDTO orderDTO = new OrderDTO();             // 模拟生成订单唯一标识             orderDTO.setOrderId(UUID.randomUUID().toString());             // 转为字节数组             byte[] msgBody = JSON.toJSONString(orderDTO).getBytes(RemotingHelper.DEFAULT_CHARSET);             // 构建消息             Message msg = new Message("ORDER_TOPIC", msgBody);             // 调用sendMessageInTransaction发送事务消息             SendResult sendResult = producer.sendMessageInTransaction(msg, null);             System.out.printf(sendResult.toString());             Thread.sleep(10);         } catch (MQClientException | UnsupportedEncodingException e) {             e.printStackTrace();         }         for (int i = 0; i < 100000; i++) {             Thread.sleep(1000);         }         producer.shutdown();     } }
  事务的执行流程: 在订单服务下单后,向Borker发送生成订单的事务消息,投递到ORDER_TOPIC主题中 Broker收到事务消息之后,不会直接投递到ORDER_TOPIC主题中,而是先放在另外一个主题中,也叫half主题,half主题对消费者不可见 half主题加入消息成功之后,会回调事务监听器的的 executeLocalTransaction  方法,执行本地事务,也就是订单创建,如果创建成功返回COMMIT状态,如果出现异常返回ROLLBACK状态根据上一步的返回状态,进行结束事务的处理提交:从half主题中删除消息,然后将消息投送到ORDER_TOPIC主题中,积分服务订阅ORDER_TOPIC主题进行消费,生成积分记录回滚:从half主题中删除消息即可 如果本地事务返回的执行结果状态由于网络原因或者其他原因未能成功的发送给Broker,Broker未收到事务的执行结果,在补偿机制定时检查half主题中消息的事务执行状态时,会回调事务监听器 checkLocalTransaction  的接口,进行状态回查,判断订单是否创建成功,然后进行结束事务的处理
  使用事务消息不会存在订单创建失败但是消息发送成功的情况,不过你可能还有一个疑问,假如订单创建成功了,消息已经投送到队列中,但是积分服务在消费的时候失败了,这样数据还是处于不一致的状态,个人感觉,积分服务可以在失败的时候进行重试或者进行一些其他的补偿机制来保证积分记录成功的生成,在极端情况下积分记录依旧没有生成,此时可能就要人工接入处理了。 RocketMQ事务实现原理
  RocketMQ在4.3.0版中开始支持事务消息,它使用两阶段提交协议实现事务消息,同时增加补偿机制定时对事务的状态进行回查,来处理未提交/回滚的事务。
  两阶段提交
  发送事务消息分为两个阶段:
  第一阶段:生产者向Broker发送half(prepare)消息 ,生产者发送事务消息的时候,消息不会直接存入对应的主题中,而是先将消息存入RMQ_SYS_TRANS_HALF_TOPIC主题中,此时消息对消费者不可见,不能被消费者消费,称为half消息, half消息发送成功之后,开始执行本地事务 。
  第二阶段:提交阶段,根据第一阶段的本地事务执行结果来决定是提交事务还是回滚事务, 提交或者回滚的事务会从RMQ_SYS_TRANS_HALF_TOPIC中删除,对于提交的事务消息,会将消息投送到实际的主题队列中,之后消费者可以从队列中拉取到消息进行消费,对于回滚的事务消息,直接从RMQ_SYS_TRANS_HALF_TOPIC主题中删除即可。
  注意:由于RocketMQ追加写的性能并不会直接从RMQ_SYS_TRANS_HALF_TOPIC队列中删除消息,而是使用了另外一个队列,将已提交或者回滚的事务放入到OP队列中,在补偿机制对half消息进行检查的时候会从OP中判断是消息是否已经提交或者回滚。
  补偿机制
  两阶段提交事务的过程中,任一阶段出现异常都有可能导致事务未能成功的进行提交/回滚,所以需要增加一种补偿机制,定时对RMQ_SYS_TRANS_HALF_TOPIC主题中的half消息进行处理。
  RocketMQ使用了一种回查机制,在处理half消息时,对该消息的本地事务执行状态进行回查,根据回查结果决定是否需要提交/回滚,或者是等待下一次回查。
  接下来就从源码的角度研究一下事务的实现原理。
  上面可知,发送事务消息调用的是 TransactionMQProducer  的sendMessageInTransaction  方法:public class TransactionMQProducer extends DefaultMQProducer {     @Override     public TransactionSendResult sendMessageInTransaction(final Message msg,         final Object arg) throws MQClientException {         if (null == this.transactionListener) {             throw new MQClientException("TransactionListener is null", null);         }         // 设置主题         msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic()));         // 发送事务消息         return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);     } }
  sendMessageInTransaction  在DefaultMQProducerImpl  中实现,主要有以下几个步骤:获取事务监听器 TransactionListener  ,如果获取为空或者本地事务执行器LocalTransactionExecuter  为空将抛出异常,因为需要通过TransactionListener  或者LocalTransactionExecuter  来执行本地事务,所以不能为空在消息中 设置prepared属性,此时与普通消息(非事务消息)相比多了PROPERTY_TRANSACTION_PREPARED属性 调用send方法发送prepared消息也就是half消息,发送消息的流程与普通消息一致 根据消息的发送结果判断:如果发送成功 执行本地事务,并返回本地事务执行结果状态 ,如果返回的执行状态结果为空,将本地事务状态设置为UNKNOW发送成功之外的其他情况,包括 FLUSH_DISK_TIMEOUT  刷盘超时、FLUSH_SLAVE_TIMEOUT  和SLAVE_NOT_AVAILABLE  从节点不可用三种情况,此时意味着half消息发送失败,本地事务状态置为ROLLBACK_MESSAGE  回滚消息调用 endTransaction  方法结束事务public class DefaultMQProducerImpl implements MQProducerInner {     // 发送事务消息     public TransactionSendResult sendMessageInTransaction(final Message msg,         final LocalTransactionExecuter localTransactionExecuter, final Object arg)         throws MQClientException {         // 获取事务监听器         TransactionListener transactionListener = getCheckListener();         // 如果本地事务执行器或者监听为空         if (null == localTransactionExecuter && null == transactionListener) {             throw new MQClientException("tranExecutor is null", null);         }         // ...         SendResult sendResult = null;         // 设置prepared属性         MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");         // 设置生产者组         MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());         try {             // 发送消息             sendResult = this.send(msg);         } catch (Exception e) {             throw new MQClientException("send message Exception", e);         }         // 本地事务状态         LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;         Throwable localException = null;         switch (sendResult.getSendStatus()) { // 判断消息发送状态             case SEND_OK: { // 如果发送成功                 try {                     // ...                     if (null != localTransactionExecuter) { // 如果本地事务执行器不为空                         // 执行本地事务                         localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);                     } else if (transactionListener != null) { // 如果事务监听器不为空                         log.debug("Used new transaction API");                         // 执行本地事务                         localTransactionState = transactionListener.executeLocalTransaction(msg, arg);                     }                     if (null == localTransactionState) {                         // 如果本地事务状态为空,设置为UNKNOW                         localTransactionState = LocalTransactionState.UNKNOW;                     }                     // ...                 } catch (Throwable e) {                     log.info("executeLocalTransactionBranch exception", e);                     log.info(msg.toString());                     localException = e;                 }             }             break;             case FLUSH_DISK_TIMEOUT:             case FLUSH_SLAVE_TIMEOUT:             case SLAVE_NOT_AVAILABLE:                 localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; // 本地事务状态设置为回滚                 break;             default:                 break;         }         try {             // 结束事务             this.endTransaction(msg, sendResult, localTransactionState, localException);         } catch (Exception e) {             log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);         }         // ...         return transactionSendResult;     } } half消息处理
  Broker对消息发送请求的处理在 SendMessageProcessor  中,当Broker收到消息后,判断消息是否含有PROPERTY_TRANSACTION_PREPARED  属性,如果含有prepared属性,会获取TransactionalMessageService  ,然后调用asyncPrepareMessage  对消息进行处理:public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {      private CompletableFuture asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,                                                                 SendMessageContext mqtraceContext,                                                                 SendMessageRequestHeader requestHeader) {         final RemotingCommand response = preSend(ctx, request, requestHeader);         final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();          // ...         CompletableFuture putMessageResult = null;         // 获取prepared属性标记         String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);         // 如果事务标记不为空         if (transFlag != null && Boolean.parseBoolean(transFlag)) {             if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {                 response.setCode(ResponseCode.NO_PERMISSION);                 response.setRemark(                         "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()                                 + "] sending transaction message is forbidden");                 return CompletableFuture.completedFuture(response);             }             // 事务消息持久化             putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);         } else {             // 普通消息持久化             putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);         }         return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);     } }
  TransactionalMessageServiceImpl  的asyncPrepareMessage  方法中,又调用了TransactionalMessageBridge  的asyncPutHalfMessage  方法,添加half消息:public class TransactionalMessageServiceImpl implements TransactionalMessageService {     @Override     public CompletableFuture asyncPrepareMessage(MessageExtBrokerInner messageInner) {         // 添加half消息         return transactionalMessageBridge.asyncPutHalfMessage(messageInner);     } }
  在 TransactionalMessageBridge  的asyncPutHalfMessage  方法中,调用了parseHalfMessageInner  方法设置half消息的相关属性。
  因为是half消息,此时还不能直接加入到实际的消息队列中,否则一旦加入就会被消费者消费,所以需要先对half消息暂存,等收到消息提交请求时才可以添加到实际的消息队列中,RocketMQ设置了一个 RMQ_SYS_TRANS_HALF_TOPIC  主题来暂存half消息。
  在 parseHalfMessageInner  方法中,会对消息进行如下处理:设置消息实际的主题和队列ID,待收到事务提交请求后恢复实际的主题和队列ID,向实际的队列中添加消息 更改消息的主题为half消息主题RMQ_SYS_TRANS_HALF_TOPIC,先将消息投送到half消息队列中 half主题对应的消息队列ID为0 ,所以更改消息的队列ID为0
  之后调用 asyncPutMessage  添加消息,接下来的流程就和普通消息的添加一致了,具体可参考【RocketMQ】消息的存储 :public class TransactionalMessageBridge {     public CompletableFuture asyncPutHalfMessage(MessageExtBrokerInner messageInner) {         // 添加消息         return store.asyncPutMessage(parseHalfMessageInner(messageInner));     }          private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {         // 设置实际的主题         MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());         // 设置实际的队列ID         MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,             String.valueOf(msgInner.getQueueId()));         msgInner.setSysFlag(             MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));         // 设置事务主题RMQ_SYS_TRANS_HALF_TOPIC         msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());         // 设置事务队列ID         msgInner.setQueueId(0);         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));         return msgInner;     } }  public class TransactionalMessageUtil {     public static String buildHalfTopic() {         // half消息主题         return TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;     } } 结束事务
  在进行了half消息发送和执行本地事务的操作后,消息暂存在Broker的half主题中,接下来生产者需要根据本地事务的执行结果,向Broker发送结束事务的请求,结束事务的方法 endTransaction  在DefaultMQProducerImpl  中实现:构建结束事务的请求头 EndTransactionRequestHeader  判断本地事务执行状态: COMMIT_MESSAGE:表示提交事务 ,结束事务的请求头中设置 TRANSACTION_COMMIT_TYPE 标识进行事务提交 ROLLBACK_MESSAGE:表示回滚事务 ,请求头中设置 TRANSACTION_ROLLBACK_TYPE 标识进行事务回滚 UNKNOW:事务执行结果未知状态 ,请求头中设置 TRANSACTION_NOT_TYPE 标识未知状态的事务 调用 endTransactionOneway  向Broker发送结束事务的请求public class DefaultMQProducerImpl implements MQProducerInner {     public void endTransaction(         final Message msg,         final SendResult sendResult,         final LocalTransactionState localTransactionState,         final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {         // 消息         final MessageId id;         if (sendResult.getOffsetMsgId() != null) {             id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());         } else {             id = MessageDecoder.decodeMessageId(sendResult.getMsgId());         }         // 获取事务ID         String transactionId = sendResult.getTransactionId();         // 获取Broker地址         final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());         // 结束事务请求头         EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();         // 设置事务ID         requestHeader.setTransactionId(transactionId);         requestHeader.setCommitLogOffset(id.getOffset());         // 判断本地事务状态         switch (localTransactionState) {             case COMMIT_MESSAGE: // 如果提交                 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);                 break;             case ROLLBACK_MESSAGE: // 如果是回滚                 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);                 break;             case UNKNOW: // 未知                 requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);                 break;             default:                 break;         }         doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);         requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());         requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());         requestHeader.setMsgId(sendResult.getMsgId());         String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;         // 发送结束事务的请求         this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,             this.defaultMQProducer.getSendMsgTimeout());     } } Broker事务结束请求处理
  Broker对事务结束的请求处理在 EndTransactionProcessor  中:判断是否是从节点,从节点没有结束事务的权限,如果是从节点返回SLAVE_NOT_AVAILABLE 从请求头中获取事务的提交类型: TRANSACTION_COMMIT_TYPE :表示提交事务,会调用 commitMessage  方法提交消息,如果提交成功调用endMessageTransaction  结束事务,恢复消息的原始主题和队列并调用deletePrepareMessage  方法删掉half消息TRANSACTION_ROLLBACK_TYPE:表示回滚事务,会调用rollbackMessage  方法回滚事务,然后删掉half消息public class EndTransactionProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {     @Override     public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws         RemotingCommandException {         // 创建响应         final RemotingCommand response = RemotingCommand.createResponseCommand(null);         final EndTransactionRequestHeader requestHeader =             (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);         LOGGER.debug("Transaction request:{}", requestHeader);         // 如果是从节点,从节点没有结束事务的权限,返回SLAVE_NOT_AVAILABLE         if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {             response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);             LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");             return response;         }         // ...         OperationResult result = new OperationResult();         // 判断事务提交类型,如果是提交事务         if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {             // 提交消息             result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);             if (result.getResponseCode() == ResponseCode.SUCCESS) {                 // 校验Prepare消息                 RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);                 if (res.getCode() == ResponseCode.SUCCESS) {                     // 结束事务,恢复消息的原始主题和队列                     MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());                     msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));                     msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());                     msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());                     msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());                     MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);                     RemotingCommand sendResult = sendFinalMessage(msgInner);                     if (sendResult.getCode() == ResponseCode.SUCCESS) {                         // 删除half消息                         this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());                     }                     return sendResult;                 }                 return res;             }         } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { // 如果是回滚             // 回滚消息             result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);             if (result.getResponseCode() == ResponseCode.SUCCESS) {                 RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);                 if (res.getCode() == ResponseCode.SUCCESS) {                      // 删除half消息                   this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());                 }                 return res;             }         }         response.setCode(result.getResponseCode());         response.setRemark(result.getResponseRemark());         return response;     } } 删除half消息
  由于CommitLog追加写的性质,RocketMQ并不会直接将half消息从CommitLog中删除,而是使用了另外一个OP主题RMQ_SYS_TRANS_OP_HALF_TOPIC(以下简称OP主题/队列),将已经提交/回滚的消息记录在OP主题队列中 : public class TransactionalMessageServiceImpl implements TransactionalMessageService {     @Override     public boolean deletePrepareMessage(MessageExt msgExt) {         // 添加到OP消息队列         if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {             log.debug("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);             return true;         } else {             log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());             return false;         }     } }
  putOpMessage  方法在TransactionalMessageBridge  中实现,它又调用了addRemoveTagInTransactionOp  方法向OP队列中添加消息:构建OP消息,主要是创建Message对象,然后设置主题为 RMQ_SYS_TRANS_OP_HALF_TOPIC  ,设置half消息在队列的偏移量调用  writeOp  方法将消息写入OP队列,makeOpMessageInner  方法用于构建消息体,然后调用putMessage  放将消息写入CommitLogpublic class TransactionalMessageBridge {     private final ConcurrentHashMap opQueueMap = new ConcurrentHashMap<>();     public boolean putOpMessage(MessageExt messageExt, String opType) {         // 构建消息队列,设置消息所属主题、Broker名称、队列ID信息         MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),             this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());         if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {             // 添加OP消息             return addRemoveTagInTransactionOp(messageExt, messageQueue);         }         return true;     }          /**      * 当事务消息进行提交或者回滚时,记录在operation队列中(OP队列)      */     private boolean addRemoveTagInTransactionOp(MessageExt prepareMessage, MessageQueue messageQueue) {         // 构建OP消息,主题为RMQ_SYS_TRANS_OP_HALF_TOPIC         Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,             String.valueOf(prepareMessage.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));         // 将消息写入OP队列         writeOp(message, messageQueue);         return true;     }      private void writeOp(Message message, MessageQueue mq) {         MessageQueue opQueue;         // 如果已经添加过         if (opQueueMap.containsKey(mq)) {             opQueue = opQueueMap.get(mq);         } else {             opQueue = getOpQueueByHalf(mq);             MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);             if (oldQueue != null) {                 opQueue = oldQueue;             }         }         // 如果为空         if (opQueue == null) {             // 创建             opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());         }         // 构建OP消息添加到OP队列中         putMessage(makeOpMessageInner(message, opQueue));     } } 事务状态检查
  由于各种原因有可能未成功收到提交/回滚事务的请求,所以RocketMQ需要定期检查half消息,检查事务的执行结果, TransactionalMessageCheckService  用于half消息状态的检查,它实现了ServiceThread  ,默认可以看到在onWaitEnd  方法中调用了check  方法进行状态检查:public class TransactionalMessageCheckService extends ServiceThread {          @Override     protected void onWaitEnd() {         long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();         int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();         long begin = System.currentTimeMillis();         log.info("Begin to check prepare message, begin time:{}", begin);         // 状态检查         this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());         log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);     }  }
  check  方法在TransactionalMessageServiceImpl  中实现:public class TransactionalMessageServiceImpl implements TransactionalMessageService {     @Override     public void check(long transactionTimeout, int transactionCheckMax,         AbstractTransactionalMessageCheckListener listener) {         try {             String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;             // 根据主题获取消息队列             Set msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);             if (msgQueues == null || msgQueues.size() == 0) {                 log.warn("The queue of topic is empty :" + topic);                 return;             }             log.debug("Check topic={}, queues={}", topic, msgQueues);             // 遍历所有的消息队列             for (MessageQueue messageQueue : msgQueues) {                 // 获取当前时间做为开始时间                 long startTime = System.currentTimeMillis();                 // 获取对应的OP消息队列                 MessageQueue opQueue = getOpQueue(messageQueue);                 // 获取half消息队列的消费进度                 long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);                 // 获取op消息队列的消费进度                 long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);                 log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);                 // 如果消费进度小于0表示不合法                 if (halfOffset < 0 || opOffset < 0) {                     log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,                         halfOffset, opOffset);                     continue;                 }                 // 存储已处理的消息                 List doneOpOffset = new ArrayList<>();                 HashMap removeMap = new HashMap<>();                 // 根据当前的消费进度从已处理队列中拉取消息                 PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);                 // 如果拉取消息为空,打印错误继续处理下一个消息队列                 if (null == pullResult) {                     log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",                         messageQueue, halfOffset, opOffset);                     continue;                 }                 // 获取消息为空的数量默认为1                 int getMessageNullCount = 1;                 // 新的进度                 long newOffset = halfOffset;                 // 获取half队列的消费进度,赋值给i                 long i = halfOffset;                 while (true) {                     // 如果当前时间减去开始时间大于最大处理时间限制,终止循环                     if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {                         log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);                         break;                     }                     // 如果OP队列中包含当前偏移量,表示消息已经被处理,加入到已处理集合中                     if (removeMap.containsKey(i)) {                         log.debug("Half offset {} has been committed/rolled back", i);                         Long removedOpOffset = removeMap.remove(i);                         // 加入到doneOpOffset集合中                         doneOpOffset.add(removedOpOffset);                     } else { // 如果已处理队列中不包含当前消息                         // 根据偏移量从half队列获取half消息                         GetResult getResult = getHalfMsg(messageQueue, i);                         // 获取消息对象                         MessageExt msgExt = getResult.getMsg();                         // 如果获取消息为空                         if (msgExt == null) {                             // 判断获取空消息的次数是否大于MAX_RETRY_COUNT_WHEN_HALF_NULL                             if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {                                 break;                             }                             // 判断从half队列获取消息的结果是NO_NEW_MSG,表示没有消息,此时终止循环等待下一次进行检查                             if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {                                 log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,                                     messageQueue, getMessageNullCount, getResult.getPullResult());                                 break;                             } else {                                                                 log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",                                     i, messageQueue, getMessageNullCount, getResult.getPullResult());                                  // 走到这里说明消息的偏移量不合法,继续获取下一条消息进行处理                                 i = getResult.getPullResult().getNextBeginOffset();                                 newOffset = i;                                 continue;                             }                         }                         // 是否需要丢弃消息或者需要跳过消息                         if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {                             listener.resolveDiscardMsg(msgExt);                             // 继续处理下一条消息                             newOffset = i + 1;                             i++;                             continue;                         }                         // 如果消息的添加时间是否大于等于本次检查的开始时间,说明是在检查开始之后加入的消息,暂不进行处理                         if (msgExt.getStoreTimestamp() >= startTime) {                             log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,                                 new Date(msgExt.getStoreTimestamp()));                             break;                         }                         // 计算half消息在队列中的保留时间:当前时间减去消息加入的时间                         long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();                         // 事务超时时间                         long checkImmunityTime = transactionTimeout;                         // 获取PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS属性,表示事务回查最晚的时间                         String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);                         // 如果PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS属性不为空                         if (null != checkImmunityTimeStr) {                             // 获取事务回查最晚检查时间,如果checkImmunityTimeStr为-1则返回事务超时时间,否则返回checkImmunityTimeStr转为long后乘以1000得到的值                             checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);                             // 如果消息的保留时间小于事务回查最晚检查时间                             if (valueOfCurrentMinusBorn < checkImmunityTime) {                                 // 检查half消息在队列中的偏移量,如果返回true跳过本条消息                                 if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {                                     // 处理下一个消息                                     newOffset = i + 1;                                     i++;                                     continue;                                 }                             }                         } else {                             // 如果valueOfCurrentMinusBorn小于checkImmunityTime                             if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {                                 log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,                                     checkImmunityTime, new Date(msgExt.getBornTimestamp()));                                 break;                             }                         }                         // 获取OP消息                         List opMsg = pullResult.getMsgFoundList();                         // 判断是否需要检查,满足检查的条件为以下三种情况之一:                         // 1.拉取消息为空并且消息的保留时间已经大于事务设置的最晚回查时间                         // 2.拉取消息不为空并且拉取到的最后一条消息的存入时间减去当前时间超过了事务的超时时间                         // 3.half消息存留时间为负数                         boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)                             || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))                             || (valueOfCurrentMinusBorn <= -1);                         // 如果需要进行回查                         if (isNeedCheck) {                             // 将half消息重新加入到队列中                             if (!putBackHalfMsgQueue(msgExt, i)) {                                 continue;                             }                             // 发送回查请求                             listener.resolveHalfMsg(msgExt);                         } else {                             // 继续从OP队列中拉取消息                             pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);                             log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,                                 messageQueue, pullResult);                             continue;                         }                     }                     // 加1继续处理下一条消息                     newOffset = i + 1;                     i++;                 }                 if (newOffset != halfOffset) {                     // 更新消费进度                     transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);                 }                 long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);                 if (newOpOffset != opOffset) {                     // 更新处理进度                     transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);                 }             }         } catch (Throwable e) {             log.error("Check error", e);         }     } }
  在 check  方法中会获取half主题(RMQ_SYS_TRANS_HALF_TOPIC)下的所有消息队列,遍历所有的half消息队列,对队列中的half消息进行处理,主要步骤如下。
  一、 构建OP队列的消息队列对象MessageQueue
  调用 getOpQueue  获取当前half消息队列对应的OP队列的MessageQueue对象,实际上是创建了一个MessageQueue  对象,设置为OP队列的主题、以及Broker名称和队列的ID,在后面获取消费进度时使用:   private MessageQueue getOpQueue(MessageQueue messageQueue) {         // 获取OP消息队列         MessageQueue opQueue = opQueueMap.get(messageQueue);         if (opQueue == null) {             // 如果获取为空,则创建MessageQueue,主题设置为OP TOPIC,设置Broker名称和队列ID             opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), messageQueue.getBrokerName(),                 messageQueue.getQueueId());             // 加入到opQueueMap中             opQueueMap.put(messageQueue, opQueue);         }         return opQueue;     }
  二、获取half队列的消费进度和OP消费队列的消费进度
  消费进度的获取是通过调用 transactionalMessageBridge  的fetchConsumeOffset  方法进行查询的,可以看到方法的入参是MessageQueue  类型的,所以第一步需要构造OP队列的MessageQueue  对象,在这一步查询消费进度使用:   public long fetchConsumeOffset(MessageQueue mq) {         long offset = brokerController.getConsumerOffsetManager().queryOffset(TransactionalMessageUtil.buildConsumerGroup(),             mq.getTopic(), mq.getQueueId());         if (offset == -1) {             offset = store.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId());         }         return offset;     }
  三、从OP队列中拉取消息
  调用 fillOpRemoveMap  方法根据消费进度信息从OP队列中拉取消息,将拉取的消费放入removeMap中,用于判断half消息是否已经处理:   private PullResult fillOpRemoveMap(HashMap removeMap,         MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List doneOpOffset) {         // 从OP队列中拉取消息,每次拉取32条         PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);         // 如果拉取为空返回null         if (null == pullResult) {             return null;         }         // 如果拉取状态为消费进度不合法或者没有匹配的消息         if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL             || pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {             log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue,                 pullResult);             // 从拉取结果中获取消费进度并更新             transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());             return pullResult;         } else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) { // 如果没有消息             log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue,                 pullResult);             return pullResult;         }         // 获取拉取到的消息         List opMsg = pullResult.getMsgFoundList();         if (opMsg == null) { // 如果为空打印日志             log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);             return pullResult;         }         // 遍历拉取的消息         for (MessageExt opMessageExt : opMsg) {             // 获取队列中的偏移量             Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));             log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),                 opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);             if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {                 // 如果偏移量小于最小的偏移量                 if (queueOffset < miniOffset) {                     // 加入到doneOpOffset中                     doneOpOffset.add(opMessageExt.getQueueOffset());                 } else {                     // 加入到已处理消息的集合removeMap中                     removeMap.put(queueOffset, opMessageExt.getQueueOffset());                 }             } else {                 log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);             }         }         log.debug("Remove map: {}", removeMap);         log.debug("Done op list: {}", doneOpOffset);         return pullResult;     }
  四、处理每一个half消息
  开启while循环,从half队列的消费进度处开始,处理每一个half消息: 如果当前时间减去检查开始时间大于最大处理时间,此时终止循环 如果removeMap中包含当前half消息,表示消息已经被处理,放入到已处理消息集合中doneOpOffset 如果removeMap不包含当前half消息, 调用 getHalfMsg  方法根据偏移量从half队列获取half消息,如果消息获取不为空继续下一步,否则进行如下处理判断获取空消息的个数是否大于 MAX_RETRY_COUNT_WHEN_HALF_NULL  ,如果大于将终止本次循环,处理下一个half消息队列判断拉取消息的状态是否为NO_NEW_MSG,如果是表示队列中没有消息,先终止循环 如果拉取消息的状态是不是NO_NEW_MSG,表示消费进度不合法,获取half消息队列中下一条消息进行处理 调用needDiscard判断是否需要丢弃half消息,或者调用needSkip判断是否需要跳过当前half消息: needDiscard是根据half消息的检查次数是否超过最大限制来决定是否丢弃half消息 private  boolean  needDiscard (MessageExt msgExt, int  transactionCheckMax) { // 从属性中获取检查次数  String  checkTimes  =  msgExt.getProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES); int  checkTime  =  1 ; // 如果不为空  if  (null  != checkTimes) { checkTime = getInt(checkTimes); // 如果检查次数大于事务最大的检查次数,表示需要丢弃  if  (checkTime >= transactionCheckMax) { return  true ; } else  { // 检查次数加一  checkTime++; } } // 更新检查次数  msgExt.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(checkTime)); return  false ; }needSkip是根据half消息在队列中的存留时间是否超过了最大的保留时间限制来决定是否跳过private  boolean  needSkip (MessageExt msgExt) { // 计算half消息在队列中的保留时间  long  valueOfCurrentMinusBorn  =  System.currentTimeMillis() - msgExt.getBornTimestamp(); // 如果大于Broker中设置的最大保留时间,表示需要跳过  if  (valueOfCurrentMinusBorn > transactionalMessageBridge.getBrokerController().getMessageStoreConfig().getFileReservedTime() * 3600L  * 1000 ) { log.info("Half message exceed file reserved time ,so skip it.messageId {},bornTime {}" , msgExt.getMsgId(), msgExt.getBornTimestamp()); return  true ; } return  false ; }判断消息的的存入时间是否大于本次开始检查的时间,如果大于说明是新加入的消息,由于事务消息发送后不会立刻提交,所以此时暂不需要进行检查,中断循环即可计算half消息在队列中的存留时间valueOfCurrentMinusBorn  :当前时间 - 消息存入的时间设置立刻回查事务状态的时间checkImmunityTime  :事务的超时时间从消息属性中获取PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS  属性的值放在checkImmunityTimeStr  变量中,表示事务的最晚回查时间:(1)如果checkImmunityTimeStr  获取不为空,调用getImmunityTime  方法计算事务立刻回查时间,并赋值给checkImmunityTime  ,从代码中可以看出如果checkImmunityTimeStr  为-1则返回事务的超时时间,否则返回checkImmunityTimeStr  的值并乘以1000转为秒:private  long  getImmunityTime (String checkImmunityTimeStr, long  transactionTimeout) { long  checkImmunityTime; // 转为long  checkImmunityTime = getLong(checkImmunityTimeStr); if  (-1  == checkImmunityTime) { // 如果为-1,使用事务的超时时间  checkImmunityTime = transactionTimeout; } else  { checkImmunityTime *= 1000 ; // 使用checkImmunityTime,乘以1000转为秒  } return  checkImmunityTime; }计算完checkImmunityTime  的值后,判断valueOfCurrentMinusBorn  是否小于checkImmunityTime  ,如果是表明还未到事务的超时时间,此时调用checkPrepareQueueOffset  检查half消息在队列中的偏移量,根据检查结果判断是否需要跳过当前消息:如果PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET属性获取为空,调用putImmunityMsgBackToHalfQueue  将消息重新加入half队列,如果返回true表示加入成功,此时向前推荐消费进度,处理下一条消息,如果加入失败会继续循环处理本条消息(因为进度未向前推进)如果PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET属性获取不为空,转为long型,判断OP队列中是否已经包含当前消息的偏移量,如果包含加入到doneOpOffset中并返回true,此时向前推进消费进度,处理下一条消息,否则同样调用putImmunityMsgBackToHalfQueue  将消息重新加入half队列,并根据加入成功与否判断是否继续处理下一条消息总结如果事务设置了PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS  属性,并且half消息的存留时间小于立刻检查事务的时间,说明还未到时间不需要进行状态检查,此时获取消息在half队列的偏移量,如果获取为空,将消息重新加入到half队列中,如果获取不为空判断是否已经在OP处理队列中,如果返回true处理下一个消息即可,否则同样将消息重新加入half队列中。RocketMQ在事务未到最晚回查时间时将消息重新加入了half消息队列,因为加入之后half队列的消费进度会往前推进并在回查结束时更新进度,所以下次检查时并不会检查到旧的half消息。 private  boolean  checkPrepareQueueOffset (HashMap removeMap, List doneOpOffset, MessageExt msgExt) { // 从属性中获取消息在half队列的偏移量  String  prepareQueueOffsetStr  =  msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET); // 如果为空,调用putImmunityMsgBackToHalfQueue将消息重新加入half队列  if  (null  == prepareQueueOffsetStr) { return  putImmunityMsgBackToHalfQueue(msgExt); } else  { // 转为long  long  prepareQueueOffset  =  getLong(prepareQueueOffsetStr); // 如果为-1,返回false,等待下次循环进行处理  if  (-1  == prepareQueueOffset) { return  false ; } else  { // 如果OP队列中已经包含当前消息的偏移量  if  (removeMap.containsKey(prepareQueueOffset)) { long  tmpOpOffset  =  removeMap.remove(prepareQueueOffset); // 加入到已完成的消息集合中  doneOpOffset.add(tmpOpOffset); return  true ; } else  { // 将消息重新加入half队列  return  putImmunityMsgBackToHalfQueue(msgExt); } } } }(2)如果checkImmunityTimeStr  获取为空,判断valueOfCurrentMinusBorn  (消息存留时间)是否大于等于0并且小于checkImmunityTime  (事务超时时间),如果满足条件表示新加入的消息并且还未过事务的超时时间,此时终止循环暂不进行回查,否则进入下一步判断是否需要进行状态回查isNeedCheck,满足检查的条件为以下三种情况之一:(1)从OP队列中拉取消息为空并且当前half消息的存留时间已经大于事务设置的最晚回查时间opMsg ==  null  && valueOfCurrentMinusBorn > checkImmunityTime(2)从OP队列中拉取的消息不为空,并且拉取的最后一条消息的存入时间减去本次开始检查时间大于事务的超时时间opMsg !=  null  && (opMsg.get(opMsg.size() - 1 ).getBornTimestamp() - startTime > transactionTimeout)(3)half消息在队列中的保留时间小于等于1,说明加入half消息的时间大于本次开始检查的时间valueOfCurrentMinusBorn <= - 1 根据isNeedCheck判断是否需要回查(1)需要回查:调用putBackHalfMsgQueue  将half消息重新加入到队列中,如果加入失败继续循环再次处理,如果加入成功调用resolveHalfMsg  发送回查请求(2)不需要回查:调用fillOpRemoveMap继续从OP队列中拉取消息判断更新i的值,继续处理下一个half消息
  五、更新消费进度
  主要是更half队列和OP队列的消费进度。 重新添加half消息
  从 putBackHalfMsgQueue  方法中可以看出将消息重新加入到了half队列:  private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {         // 重新将消息入到half消息队列中         PutMessageResult putMessageResult = putBackToHalfQueueReturnResult(msgExt);         // 如果加入成功         if (putMessageResult != null             && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {             // 设置消息的逻辑偏移量             msgExt.setQueueOffset(                 putMessageResult.getAppendMessageResult().getLogicsOffset());             // 设置消息在CommitLog的偏移量             msgExt.setCommitLogOffset(                 putMessageResult.getAppendMessageResult().getWroteOffset());             // 设消息ID             msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());             log.debug(                 "Send check message, the offset={} restored in queueOffset={} "                     + "commitLogOffset={} "                     + "newMsgId={} realMsgId={} topic={}",                 offset, msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getMsgId(),                 msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),                 msgExt.getTopic());             return true;         } else {             // 加入失败             log.error(                 "PutBackToHalfQueueReturnResult write failed, topic: {}, queueId: {}, "                     + "msgId: {}",                 msgExt.getTopic(), msgExt.getQueueId(), msgExt.getMsgId());             return false;         }     } 状态回查请求发送
  resolveHalfMsg  方法中向客户端发送事务状态回查的请求,可以看到是通过线程池异步实现的:public abstract class AbstractTransactionalMessageCheckListener {    public void resolveHalfMsg(final MessageExt msgExt) {         executorService.execute(new Runnable() {             @Override             public void run() {                 try {                     // 发送状态回查请求                     sendCheckMessage(msgExt);                 } catch (Exception e) {                     LOGGER.error("Send check message error!", e);                 }             }         });     } }
  sendCheckMessage  方法在AbstractTransactionalMessageCheckListener  中实现,主要是构建请求信息,然后向消息的生产者发送事务状态回查的请求:public abstract class AbstractTransactionalMessageCheckListener {     public void sendCheckMessage(MessageExt msgExt) throws Exception {         // 构建回查请求头         CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();         checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset()); // 设置Commitlog偏移量         checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());         checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));         checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());         checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());         // 设置消息实际的TOPIC         msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));         // 设置消息实际的队列ID         msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));         msgExt.setStoreSize(0);         String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);         // 获取channel         Channel channel = brokerController.getProducerManager().getAvailableChannel(groupId);         if (channel != null) {             // 发送回查请求             brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);         } else {             LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);         }     } } 事务状态回查请求处理
  事务状态回查请求的处理在 ClientRemotingProcessor  中,如果请求类型是CHECK_TRANSACTION_STATE  表示是事务状态回查请求,调用checkTransactionState  方法进行事务状态检查:从请求中获取消息,判断消息是否为空,不为空进入下一步 从消息属性中获取生产者组名称,如果不为空进入下一步 根据生产者组名称获取 MQProducerInner  对象,然后调用checkTransactionState  进行状态检查public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {          @Override     public RemotingCommand processRequest(ChannelHandlerContext ctx,         RemotingCommand request) throws RemotingCommandException {         switch (request.getCode()) {             case RequestCode.CHECK_TRANSACTION_STATE:                 // 检查事务状态                 return this.checkTransactionState(ctx, request);             // ...             default:                 break;         }         return null;     }          public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,         RemotingCommand request) throws RemotingCommandException {         final CheckTransactionStateRequestHeader requestHeader =             (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);         final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());         // 获取消息         final MessageExt messageExt = MessageDecoder.decode(byteBuffer);         // 如果消息不为空         if (messageExt != null) {             // ...             // 获取事务ID             String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);             if (null != transactionId && !"".equals(transactionId)) {                 messageExt.setTransactionId(transactionId);             }             // 获取生产者组             final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);             if (group != null) {                 // 获取MQProducerInner                 MQProducerInner producer = this.mqClientFactory.selectProducer(group);                 if (producer != null) {                     final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());                     // 调用checkTransactionState进行状态检查                     producer.checkTransactionState(addr, messageExt, requestHeader);                 } else {                     log.debug("checkTransactionState, pick producer by group[{}] failed", group);                 }             } else {                 log.warn("checkTransactionState, pick producer group failed");             }         } else {             log.warn("checkTransactionState, decode message failed");         }          return null;     } }
  checkTransactionState  方法在DefaultMQProducerImpl  中实现,可以看到它创建了Runnable对象,然后提交到线程池中异步执行事务的状态检查,检查的主要逻辑如下:获取 TransactionCheckListener  (已废弃)类型的事务监听器获取 TransactionListener  类型的事务监听器如果 TransactionCheckListener  和TransactionListener  其中之一不为空,调用checkLocalTransaction进行状态检查调用 processTransactionState  处理事务查询结果,这一步主要是根据事务的查询结果构建请求信息,然后调用endTransactionOneway  方法向Broker发送结束事务的请求public class DefaultMQProducerImpl implements MQProducerInner {     @Override     public void checkTransactionState(final String addr, final MessageExt msg,         final CheckTransactionStateRequestHeader header) {         Runnable request = new Runnable() {             // ...                          @Override             public void run() {                 // 获取TransactionCheckListener监听器(已不推荐使用)                 TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();                 // 获取事务监听器                 TransactionListener transactionListener = getCheckListener();                 // 如果其中之一不为空                 if (transactionCheckListener != null || transactionListener != null) {                     // 初始化为UNKNOW状态                     LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;                     Throwable exception = null;                     try {                         if (transactionCheckListener != null) {                             localTransactionState = transactionCheckListener.checkLocalTransactionState(message);                         } else if (transactionListener != null) {                             log.debug("Used new check API in transaction message");                             // 调用checkLocalTransaction回查状态                             localTransactionState = transactionListener.checkLocalTransaction(message);                         } else {                             log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);                         }                     } catch (Throwable e) {                         log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);                         exception = e;                     }                     // 处理事务状态                     this.processTransactionState(                         localTransactionState,                         group,                         exception);                 } else {                     log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);                 }             }                          // 处理事务状态             private void processTransactionState(                 final LocalTransactionState localTransactionState,                 final String producerGroup,                 final Throwable exception) {                 // 构建结束事务的请求头                 final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();                 // 设置tCommitLog的偏移量                 thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());                 thisHeader.setProducerGroup(producerGroup);// 设置生产者组                 thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());                 thisHeader.setFromTransactionCheck(true); // 设置状态检查为true                 // ...                 thisHeader.setTransactionId(checkRequestHeader.getTransactionId()); // 设置事务ID                 switch (localTransactionState) {                     case COMMIT_MESSAGE:                         thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); // 设置为提交                         break;                     case ROLLBACK_MESSAGE:                         thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); // 设置为回滚                         log.warn("when broker check, client rollback this transaction, {}", thisHeader);                         break;                     case UNKNOW:                         thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); // 设置为未知                         log.warn("when broker check, client does not know this transaction state, {}", thisHeader);                         break;                     default:                         break;                 }                  // ...                 // 执行结束事务钩子函数                 doExecuteEndTransactionHook(msg, uniqueKey, brokerAddr, localTransactionState, true);                  try {                     // 向Broker发送消息的回查结果                     DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,                         3000);                 } catch (Exception e) {                     log.error("endTransactionOneway exception", e);                 }             }         };         // 提交到线程池中执行任务         this.checkExecutor.submit(request);     } }
  总结

2023年新疆锂电池产业布局分析巴州产业基地最多(图)中商情报网讯新疆依托丰富的锂矿资源,布局锂电池产业,为中国锂电池市场新增新疆力量。据不完全统计,新疆锂电池产业基地具有14个。新疆锂矿资源带动锂电产业发展新疆拥有丰富的锂矿资源。新快手美妆60个品牌年成交破亿快手美妆正在重视货盘。2月16日,在快手电商2023年38节商家大会上,快手美妆新负责人九安首次公开露面,并谈到2023年美妆板块最大的变化将从运营人的纬度转变为运营货的纬度。九安围绕四大领域培育美豫名品3年内打造一批国内外知名河南品牌大河网讯如何推进我省品牌建设工作,推动经济社会高质量发展?河南省政府近日印发关于实施品牌发展战略推进美豫名品公共品牌建设的实施意见(以下简称实施意见),我省将瞄准四大领域发力,3年终于知道西双版纳的化妆师为什么贵了,这就不是化妆,是魔法吧!出去旅游的时候,女孩子们都喜欢拍美美的照片,为自己的旅途留下美好的纪念。所以,一般在一些特色景区的时候,多数女生都会选择当地的化妆师,来给自己打造一个富有当地特色的妆造。就比如说在基督教可以纹身吗?基督教是可以纹身的,圣经并没有涉及纹身的话题,因此没有明确的经文禁止纹身。不过,值得注意的是,旧约中有几节经文提到割伤或标记身体,利未记19章28节说不可为死人用身体搞破坏,也不可睡前这三个好习惯,可以让皮肤越来越好!白嫩的皮肤可以让我们更加自信。据说睡前是护肤的黄金时期。除了清洁面部和保湿面部,还有三个不起眼的习惯。如果你能坚持下去,你的皮肤可能会越来越好!1使用精华液尤其是25岁以后,皮肤在JJ雷迪克对拉塞尔威斯布鲁克和快船说了一些残酷的话据NBA有关消息报道,JJ雷迪克认为拉塞尔威斯布鲁克是一名伟大的球员,但前NBA神枪手JJ雷迪克认为他不适合洛杉矶快船队。洛杉矶快船队刚刚通过拉塞尔威斯布鲁克升级了他们的控卫阵容。TA卡里乌斯基本确定联赛杯决赛首发,纽卡希望能救赎彼此TheAthleticUK纽卡斯尔方面记者ChrisWaugh撰写专栏文章,标题为HowNewcastleendedupwithLorisKariusingoalfortheirb2023南京溧水半程马拉松开启报名,将于3月26日鸣枪起跑新民晚报讯(记者唐闻宜)记者今天从溧水区获悉,2023南京溧水半程马拉松赛(溧马)将于3月26日鸣枪开跑,赛事报名工作已于今日12时启动,跑者可通过赛事官网或官微等方式进行线上报名4战2胜1平1负!中国女足爆冷,唐佳丽王晓雪受伤,世界杯难出线最近中国女足正在西班牙参加集训,为世界杯做好冲刺准备。最近为了能够更好训练中国女足的状态,让他们见识到欧美这些强敌的状态和表现,最近在教练组的安排之下,中国女足和爱尔兰女足,瑞典女女足世界杯32强全部产生巴拿马锁定最后席位新华社新西兰汉密尔顿2月23日电(记者郭磊卢怀谦)2023年女足世界杯预选赛附加赛23日在新西兰汉密尔顿举行最后一场比赛,C组的巴拿马队以一粒头球破门10战胜巴拉圭队,锁定最后一个
发展装配式民宿产品,打造乡村旅游新经济原力舱篇民宿,之所以越来越热,是因为都市人们宿于乡村隐于田园归于慢生活的诉求和情怀越来越浓。暂别都市的喧嚣,走进乡村的宁静远离世间的纷争,融入和谐的生态放松疲惫的身心,享受自在的时光怀揣梦体质不同,症状不同,干预方式也不同,听听中医怎么说气虚质容易疲乏,声音低弱,喜欢安静,容易感冒。1气虚体质的人应多食具有益气健脾作用的食物,如黄豆大枣桂圆粳米蜂蜜鸡肉牛肉桂鱼花生等水果可以多吃樱桃葡萄少食耗气的食物,如空心菜生萝卜太稳了!第六金,恭喜中国队北京时间6月30日,2022年国际泳联世锦赛在匈牙利布达佩斯继续进行,这也是跳水项目的第五个比赛日。跳水项目将产生第六枚金牌,来自于男女混双3米板跳水决赛,这个项目是非奥项目,参赛美国最高法院驳回苹果上诉图源路透社集微网消息,此前苹果向美国联邦巡回法院提出无效高通两项智能手机专利的请求,该法院裁决苹果因与高通和解而缺乏起诉资格,随后苹果向美国最高法院提起上诉。现据路透社最新报道,美智能交互硬件行业专题由特斯拉机器人回溯智能交互硬件发展史(报告出品方作者安信证券,焦娟冯静静)1。人机硬件的工具化时代学术的角度来看,硬件是计算机硬件的简称,是指计算机系统中由电子机械及光电元件等组成的各种物理装臵的总称,是人类处理运算理想汽车宣布在纳斯达克发售最多20亿美元ADSTech星球6月28日消息,理想汽车宣布,已提交招股说明书补充文件,拟在纳斯达克全球精选市场(NGSM)通过场内股票发售计划(ATM发售),发售总额最多高达2,000,000,00刷屏AirPodsPro2来了2019年发布首款产品的AirPodsPro产品线,至今已有将近3年未有新品迭代。AirPodsProMagSafe版不能算!机哥等到脖子都长了,却只能等来AirPodsAirPo百检干货还以为空调开26度最省电?错错错错上海已经连续发布了好几个高温预警了,高温凶猛来袭,我们只能用空调保命。但小检之前一直都听有些专业人士养生专家说,空调要开26度最省电。为什么会有26度这么个说法?因为根据科学测验,电脑投屏快捷键怎么设置?电脑投影到投影仪有两种方式,一种是传统的有线方式,一种是当下比较流行的无线投屏方式。有线投影连接1用一根HDMI或者VGA线,将笔记本与投影仪对接。2投影仪设置投影仪上选择信号源为拉满生产力,三星GalaxyS22Ultra手机带来高级办公体验现在的办公场景,随着大环境的变化也越来越多样,一款好的手机可以让您无论在什么样的场景下,都可以完美的完成工作任务。今天就为大家介绍一款可以作为您移动办公好帮手的优质机型,它就是三星小米12S系列新品发布会7月4日举行,与徕卡联合研发DoNews6月28日消息(刘文轩)小米手机在官方微博宣布了小米影像战略升级暨小米新品发布会定档7月4日晚七点,届时将推出全新的小米12S系列新机。官方海报上可以看到,新机的后置镜