范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

RocketMQ源码分析之主从同步服务组件HAService

  #头条创作挑战赛#
  上一篇:RocketMQ源码分析之核心磁盘数据结构CommitLog一、前言
  前面介绍了RocketMQ的CommitLog文件相关的类分析CommitLog物理日志相关的CommitLog类。其中有介绍到消息刷盘时高可用对应的submitReplicaRequest 方法,submitReplicaRequest 方法中如果配置的服务器的角色为SYNC_MASTER(从master同步),就会等待主从之间消息同步的进度达到设定的值之后才正常返回,如果超时则返回同步超时;// 提交复制请求 public CompletableFuture submitReplicaRequest(         AppendMessageResult result, MessageExt messageExt) {     // sync master的话,此时跟我们的dleger关系不大,主从同步,如果说主节点挂了以后,还得靠从节点手工运维切换成主节点     if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {         HAService service = this.defaultMessageStore.getHaService();         if (messageExt.isWaitStoreMsgOK()) {             // 通过HAService判断一下从节点是否ok             // 检查slave同步的位置是否小于 最大容忍的同步落后偏移量,如果是的则进行刷盘             if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {                 GroupCommitRequest request = new GroupCommitRequest(                         result.getWroteOffset() + result.getWroteBytes(),                         this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout() // 主从同步超时时间默认是3s                 );                 service.putRequest(request);                 service.getWaitNotifyObject().wakeupAll();                 return request.future(); // 可以通过future来等待主从同步完成             }             else {                 // 此时可能是从节点不可用                 return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);             }         }     }      return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); }public boolean isWaitStoreMsgOK() {     String result = this.getProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);     if (null == result) {         return true;     }      return Boolean.parseBoolean(result); }
  这段代码的主要逻辑如下:如果服务器的角色设置为SYNC_MASTER,则进行下一步,否则直接跳过主从同步;获取HAService对象,检查消息是否本地存储完毕,如果没有则结束,否则进入下一步;检查slave同步的位置是否小于 最大容忍的同步落后偏移量参数haSlaveFallbehindMax,如果是的则进行主从同步刷盘。如果没有则返回slave不可用的状态;将消息落盘的最大物理偏移量也就是CommitLog上的偏移量作为参数构建一个GroupCommitRequest对象,然后提交到HAService;最多等待syncFlushTimeout长的时间,默认为5秒。在5秒内获取结果,然后根据结果判断是否返回超时;二、同步流程
  上面那段代码比较简单,因为主从的逻辑全部交给了 HAService  和 HAConnection  两个类处理了。这里先简单介绍一下整个同步的流程(同步模式)
  三、高可用服务HAService
  HAService是在RocketMQ的Broker启动的时候就会创建的,而创建的点在DefaultMessageStore这个消息存储相关的综合类中,在这个类的构造器中会创建HAService无论当前的Broker是什么角色。
  这里需要说明的是 Broker中的Master和Slaver两个角色,代码都是一样的,只不过是在实际执行的时候,走的分支不一样。 四、源码分析内部属性;构造函数;启动内部类;接受Slave连接处理; 检查同步进度和唤醒CommitLog刷盘线程; 主从同步客户端组件;Master同步日志(监听slave日志同步进度和同步日志、根据同步进度来唤醒刷盘CommitLog线程) ;1、内部属性
  在HAService中有几个比较重要的属性,这里需要简单的介绍一下:
  参数
  说明
  connectionList
  连接到master的slave连接列表,用于管理连接
  acceptSocketService
  用于接收连接用的服务,只监听OP_ACCEPT事件,监听到连接事件时候,创建HAConnection来处理读写请求事件
  waitNotifyObject
  一个消费等待模型类,用于处理高可用线程和CommitLog的刷盘线程交互
  push2SlaveMaxOffset
  master同步到slave的偏移量
  groupTransferService
  主从同步的检测服务,用于检查是否同步完成
  haClient
  高可用的服务,slave用来跟master建立连接,像master汇报偏移量和拉取消息
  // 主从同步服务 public class HAService {      private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);      // 连接数量     private final AtomicInteger connectionCount = new AtomicInteger(0);     // 主从建立的网络连接,因为一个master可能有多个slave     private final List connectionList = new LinkedList<>();     // 接收slave的socket服务     private final AcceptSocketService acceptSocketService;     // 所属的消息存储组件     private final DefaultMessageStore defaultMessageStore;     // 线程阻塞与唤醒同步对象     private final WaitNotifyObject waitNotifyObject = new WaitNotifyObject();     // 推送到slave最大偏移量     private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);     // 组传输服务     private final GroupTransferService groupTransferService;     // 主从同步客户端组件     private final HAClient haClient; }2、构造函数
  HAService只有一个构造器。逻辑也比较简单,创建一个AcceptSocketService开放一个端口为 10912的端口用于slave来简历连接,同时启动主从信息同步的任务groupTransferService用于接收CommitLog在高可用刷盘时提交任务。public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {     this.defaultMessageStore = defaultMessageStore;     // 创建,接受连接的服务, 开放的端口号为10912     this.acceptSocketService = new AcceptSocketService(             defaultMessageStore.getMessageStoreConfig().getHaListenPort()     );     // 创建主从信息同步的线程     this.groupTransferService = new GroupTransferService();     this.haClient = new HAClient(); }3、启动内部类
  HAService  在创建之后,会在 DefaultMessageStore  中调用其 start  方法,这个方法会启动其内部的几个内部类,用来主从同步; public void start() throws Exception {     // 接受连接的服务,开启端口,设置监听的事件     this.acceptSocketService.beginAccept();     // 开启服务不断检查是否有连接     this.acceptSocketService.start();     // 开启groupTransferService,接受CommitLog的主从同步请求     this.groupTransferService.start();     // 开启haClient,用于slave来建立与Master连接和同步     this.haClient.start(); }4、接受Slave连接
  AcceptSocketService这个类在Broker的Master和Slaver两个角色启动时都会创建,只不过区别是Slaver开启端口之后,并不会有别的Broker与其建立连接。因为只有在Broker的角色是Slave的时候才会指定要连接的Master地址。这个逻辑,在Broker启动的时候BrokerController类中运行的。// 主要是基于nio来实现的 class AcceptSocketService extends ServiceThread {      // 监听端口地址     private final SocketAddress socketAddressListen;     // nio里面的网络监听服务端     private ServerSocketChannel serverSocketChannel;     // 多路复用监听组件     private Selector selector;      // 给他传入一个监听端口号,构建好监听地址     public AcceptSocketService(final int port) {         this.socketAddressListen = new InetSocketAddress(port);     }      /**      * Starts listening to slave connections.      *      * @throws Exception If fails.      */     public void beginAccept() throws Exception {         // 打开nio网络监听服务端         this.serverSocketChannel = ServerSocketChannel.open();         // 打开selector多路复用组件         this.selector = RemotingUtil.openSelector();         // 设置我们的socket重用地址是true         this.serverSocketChannel.socket().setReuseAddress(true);         // 设置监听我们指定的端口号         this.serverSocketChannel.socket().bind(this.socketAddressListen);         // 配置是否nio阻塞模式是false         this.serverSocketChannel.configureBlocking(false);         // 把nio网络监听服务器注册到selector多路复用组件里去         this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);     }      /**      * {@inheritDoc}      */     @Override     public void shutdown(final boolean interrupt) {         super.shutdown(interrupt);         try {             this.serverSocketChannel.close();             this.selector.close();         } catch (IOException e) {             log.error("AcceptSocketService shutdown exception", e);         }     }      /**      * {@inheritDoc}      */     @Override     public void run() {         log.info(this.getServiceName() + " service started");          while (!this.isStopped()) {             try {                 // 通过selector多路复用组件监听我们的nio网络服务器是否有连接事件到达                 this.selector.select(1000);                 // 如果说确实是有从节点来连接我们,此时就会拿到一批selectedKeys                 Set selected = this.selector.selectedKeys();                  if (selected != null) {                     // 每一个新建立的连接都对应了一个selectionKey,就是一个连接的key句柄                     for (SelectionKey k : selected) {                         // 如果说过来的网络事件就是op_accept连接事件                         if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {                             // 通过调用nio网络监听服务器的accept函数,就可以完成tcp连接,获取到一个新的连接                             SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();                              if (sc != null) {                                 HAService.log.info("HAService receive new connection, "                                     + sc.socket().getRemoteSocketAddress());                                  try {                                     // 把获取到的从节点连接封装成一个HAConnection                                     HAConnection conn = new HAConnection(                                             HAService.this,                                             sc                                     );                                     // 同时启动这个从节点连接组件                                     conn.start();                                     // 把从节点连接加入到自己的连接列表里去                                     HAService.this.addConnection(conn);                                 } catch (Exception e) {                                     log.error("new HAConnection exception", e);                                     sc.close();                                 }                             }                         } else {                             log.warn("Unexpected ops in select " + k.readyOps());                         }                     }                      // 一次selectedKeys处理完毕了,就必须做一个clear                     selected.clear();                 }             } catch (Exception e) {                 log.error(this.getServiceName() + " service has exception.", e);             }         }          log.info(this.getServiceName() + " service end");     }      /**      * {@inheritDoc}      */     @Override     public String getServiceName() {         return AcceptSocketService.class.getSimpleName();     } }
  beginAccept 方法就是开启Socket,绑定10912端口,然后注册selector和指定监听的事件为OP_ACCEPT也就是建立连接事件。对应的IO模式为NIO模式。主要看其run方法,这个方法是Master用来接受Slave连接的核心。
  每过一秒检查一次是否有连接事件,如果有则建立连接,并把建立起来的连接加入到连接列表中进行保存。一直循环这个逻辑。5、检查同步进度和唤醒CommitLog刷盘线程
  GroupTransferService是CommitLog消息刷盘的类CommitLog与HAService打交道的一个中间类。在CommitLog中进行主从刷盘的时候,会创建一个CommitLog.GroupCommitRequest的内部类,这个类包含了当前Broker最新的消息的物理偏移量信息。然后把这个类丢给GroupTransferService处理,然后唤醒GroupTransferService。起始这个逻辑跟CommitLog内部的GroupCommitService逻辑一样。只不过对于同步部分的逻辑不一样,这里可以参考前面的文章存储部分(3)CommitLog物理日志相关的CommitLog类。
  先看 run  方法 /**  * 在run方法中会将传入的CommitLog.GroupCommitRequest从requestsWrite  * 转换到requestsRead中然后进行处理检查对应的同步请求的进度。检查的逻辑在doWaitTransfer中  */ public void run() {     log.info(this.getServiceName() + " service started");      while (!this.isStopped()) {         try {             // 这里进入等待,等待被唤醒,进入等待之前会调用onWaitEnd方法,然后调用swapRequests方法             // 把requestsWrite转换为requestsRead             this.waitForRunning(10);             // 进行请求处理             this.doWaitTransfer();         } catch (Exception e) {             log.warn(this.getServiceName() + " service has exception. ", e);         }     }      log.info(this.getServiceName() + " service end"); }
  再看doWaitTransfer  方法/**  * 1、比较Master推送到Slave的 偏移量push2SlaveMaxOffset是不是大于传进来的CommitLog.GroupCommitRequest中的偏移量  * 2、计算本次同步超时的时间节点,时间为当前时间加上参数系统配置参数syncFlushTimeout默认为5秒  * 3、如果第一步结果为true,则返回结果为PUT_OK。如果第一步为false,则每过一秒检查一次结果,如果超过5次了还没同步完成,则表示超时了那么返回结果为FLUSH_SLAVE_TIMEOUT。同时会唤醒CommitLog的刷盘线程。  */ private void doWaitTransfer() {     // 如果读请求不为空     if (!this.requestsRead.isEmpty()) {         for (CommitLog.GroupCommitRequest req : this.requestsRead) {             // 如果push到slave的偏移量 大于等于 请求中的消息的最大偏移量 表示slave同步完成             boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();             // 计算这次同步超时的时间点  同步的超时时间段为5s             long deadLine = req.getDeadLine();              // 如果没有同步完毕,并且还没达到超时时间,则等待1秒之后检查同步的进度             while (!transferOK && deadLine - System.nanoTime() > 0) {                 this.notifyTransferObject.waitForRunning(1000);                 transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();             }              // 超时或者同步成功的时候 唤醒主线程             req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);         }          this.requestsRead = new LinkedList<>();     } }
  主要逻辑如下:比较Master推送到Slave的 偏移量push2SlaveMaxOffset是不是大于传进来的CommitLog.GroupCommitRequest中的偏移量。计算本次同步超时的时间节点,时间为当前时间加上参数系统配置参数syncFlushTimeout默认为5秒。如果第一步结果为true,则返回结果为PUT_OK。如果第一步为false,则每过一秒检查一次结果,如果超过5次了还没同步完成,则表示超时了那么返回结果为FLUSH_SLAVE_TIMEOUT。同时会唤醒CommitLog的刷盘线程。6、主从同步客户端组件
  前面我们说到了只有是Salve角色的Broker才会真正的配置Master的地址,而HAClient是需要Master地址的,因此这个类真正在运行的时候只有Slave才会真正的使用到。
   先看看核心的参数信息// 从节点那边会用这个线程跟我们主节点建立连接,执行数据读写 class HAClient extends ServiceThread {      // 读数据缓冲区大小,4mb     private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;     // master地址     private final AtomicReference masterAddress = new AtomicReference<>();     // 从节点收到数据以后会返回一个8个字节的ack偏移量,固定8个字节     private final ByteBuffer reportOffset = ByteBuffer.allocate(8);     // nio网络连接     private SocketChannel socketChannel;     // nio多路复用组件     private Selector selector;     // 最近一次写数据时间戳     private long lastWriteTimestamp = System.currentTimeMillis();     // 当前上报过的偏移量     private long currentReportedOffset = 0;     // 分发位置     private int dispatchPosition = 0;     // 读数据缓冲区     private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);     // 备份数据缓冲区     private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE); }
  基本上都是缓冲相关的配置。这里主要分析的是 run  方法中的逻辑 public void run() {     log.info(this.getServiceName() + " service started");      while (!this.isStopped()) {         try {             // 尝试去连接我们的master节点             if (this.connectMaster()) {                  // 是否要上报一下ack偏移量,间隔需要大于心跳的时间(5s)                 if (this.isTimeToReportOffset()) {                     // 向master 汇报当前 salve 的CommitLog的最大偏移量,并记录这次的同步时间                     boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);                     // 如果汇报完了就关闭连接                     if (!result) {                         this.closeMaster();                     }                 }                  // 如果说人家给你传输过来了数据                 this.selector.select(1000);                  // 向master拉取的信息                 boolean ok = this.processReadEvent();                 if (!ok) {                     this.closeMaster();                 }                  // 再次同步slave的偏移量如果,最新的偏移量大于已经汇报的情况下                 if (!reportSlaveMaxOffsetPlus()) {                     continue;                 }                  // 检查时间距离上次同步进度的时间间隔                 long interval =                     HAService.this.getDefaultMessageStore().getSystemClock().now()                         - this.lastWriteTimestamp;                 // 如果间隔大于心跳的时间,那么就关闭                 if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()                     .getHaHousekeepingInterval()) {                     log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress                         + "] expired, " + interval);                     this.closeMaster();                     log.warn("HAClient, master not response some time, so close connection");                 }             } else {                 // 等待                 this.waitForRunning(1000 * 5);             }         } catch (Exception e) {             log.warn(this.getServiceName() + " service has exception. ", e);             this.waitForRunning(1000 * 5);         }     }      log.info(this.getServiceName() + " service end"); }
  主要的逻辑如下:连接master,如果当前的broker角色是master,那么对应的masterAddress是空的,不会有后续逻辑。如果是slave,并且配置了master地址,则会进行连接进行后续逻辑处理检查是否需要向master汇报当前的同步进度,如果两次同步的时间小于5s,则不进行同步。每次同步之间间隔在5s以上,这个5s是心跳连接的间隔参数为haSendHeartbeatInterval向master 汇报当前 salve 的CommitLog的最大偏移量,并记录这次的同步时间从master拉取日志信息,主要就是进行消息的同步,同步出问题则关闭连接再次同步slave的偏移量,如果最新的偏移量大于已经汇报的情况下则从步骤1重头开始
  这里分析完了 run  方法,然后就要分析主要的日志同步的逻辑了,这个逻辑在 processReadEvent  方法中 private boolean processReadEvent() {     int readSizeZeroTimes = 0;     // 如果读取缓存还有没读取完,则一直读取     while (this.byteBufferRead.hasRemaining()) {         try {             // 可以把主从同步过来的数据读取到缓冲区里去             int readSize = this.socketChannel.read(this.byteBufferRead);             if (readSize > 0) {                 readSizeZeroTimes = 0;                 // 执行一次分发读请求                 boolean result = this.dispatchReadRequest();                 if (!result) {                     log.error("HAClient, dispatchReadRequest error");                     return false;                 }             } else if (readSize == 0) {                 if (++readSizeZeroTimes >= 3) {                     break;                 }             } else {                 log.info("HAClient, processReadEvent read socket < 0");                 return false;             }         } catch (IOException e) {             log.info("HAClient, processReadEvent read socket exception", e);             return false;         }     }      return true; }  private boolean dispatchReadRequest() {     // 请求的头信息     final int msgHeaderSize = 8 + 4; // phyoffset + size      while (true) {         // 获取分发的偏移差         int diff = this.byteBufferRead.position() - this.dispatchPosition;         // 如果偏移差大于头大小,说明存在请求体         if (diff >= msgHeaderSize) {             // 获取主master的最大偏移量             long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);             // 获取消息体             int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);             // 获取salve的最大偏移量             long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();              if (slavePhyOffset != 0) {                 if (slavePhyOffset != masterPhyOffset) {                     log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "                         + slavePhyOffset + " MASTER: " + masterPhyOffset);                     return false;                 }             }              // 如果偏移差大于 消息头和 消息体大小。则读取消息体             if (diff >= (msgHeaderSize + bodySize)) {                 byte[] bodyData = byteBufferRead.array();                 int dataStart = this.dispatchPosition + msgHeaderSize;                  // 把你读取到的数据追加到commitlog里面去                 HAService.this.defaultMessageStore.appendToCommitLog(                         masterPhyOffset, bodyData, dataStart, bodySize);                  // 记录分发的位置                 this.dispatchPosition += msgHeaderSize + bodySize;                  if (!reportSlaveMaxOffsetPlus()) {                     return false;                 }                  continue;             }         }          if (!this.byteBufferRead.hasRemaining()) {             this.reallocateByteBuffer();         }          break;     }      return true; }7、Master同步日志
  前面说过,在 HAService  的 AcceptSocketService  内部类中,Master会在建立连接的时候创建 HAConnection  用来处理读写事件。这里主要介绍构造函数和内部类就能了解原理了。
  成员变量public class HAConnection {      private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);      // 所属的HA高可用服务组件     private final HAService haService;     // nio网络连接     private final SocketChannel socketChannel;     // 跟我们建立连接的HA客户端组件的地址,从节点地址     private final String clientAddr;     // 网络连接写数据服务线程     private WriteSocketService writeSocketService;     // 网络连接读数据服务线程     private ReadSocketService readSocketService;      // 从节点请求获取的偏移量     private volatile long slaveRequestOffset = -1;     // 从节点同步数据后ack的偏移量     private volatile long slaveAckOffset = -1; }
  构造函数public HAConnection(final HAService haService, final SocketChannel socketChannel) throws IOException {     // 指定所属的 HAService     this.haService = haService;     // 指定的NIO的socketChannel     this.socketChannel = socketChannel;     // 客户端的地址     this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();     // 这是为非阻塞     this.socketChannel.configureBlocking(false);     // 是否启动SO_LINGER     // SO_LINGER作用:设置函数close()关闭TCP连接时的行为。缺省close()的行为是     // 如果有数据残留在socket发送缓冲区中则系统将继续发送这些数据给对方,等待被确认,然后返回。     this.socketChannel.socket().setSoLinger(false, -1);     // 是否开启TCP_NODELAY     this.socketChannel.socket().setTcpNoDelay(true);     if (NettySystemConfig.socketSndbufSize > 0) {         // 接收缓冲的大小         this.socketChannel.socket().setReceiveBufferSize(NettySystemConfig.socketSndbufSize);     }     if (NettySystemConfig.socketRcvbufSize > 0) {         // 发送缓冲的大小         this.socketChannel.socket().setSendBufferSize(NettySystemConfig.socketRcvbufSize);     }     // 把网络连接写数据服务线程和读数据服务线程都构建好     // 端口写服务     this.writeSocketService = new WriteSocketService(this.socketChannel);     // 端口读服务     this.readSocketService = new ReadSocketService(this.socketChannel);     // 增加haService中的连接数字段     this.haService.getConnectionCount().incrementAndGet(); }
  监听slave日志同步进度和同步日志
  WriteSocketService  监听的是 OP_WRITE  事件,注册的端口就是在 HAService  中开启的端口。 class WriteSocketService extends ServiceThread {      private final Selector selector;     private final SocketChannel socketChannel;      // 写数据头大小,12个字节     private final int headerSize = 8 + 4;     // 写数据头缓冲区     private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(headerSize);     // 从哪个位置开始传输     private long nextTransferFromWhere = -1;     // 对内存映射区域查询数据结果     private SelectMappedBufferResult selectMappedBufferResult;     // 最后一次写数据是否完成了,默认true     private boolean lastWriteOver = true;     // 最后一次写数据时间戳     private long lastWriteTimestamp = System.currentTimeMillis();      public WriteSocketService(final SocketChannel socketChannel) throws IOException {         this.selector = RemotingUtil.openSelector(); // 搞一个selector多路复用组件         this.socketChannel = socketChannel;         this.socketChannel.register(this.selector, SelectionKey.OP_WRITE); // 把这个网络连接注册到selector多路复用组件里去就可以了         this.setDaemon(true);     }      @Override     public void run() {         HAConnection.log.info(this.getServiceName() + " service started");          while (!this.isStopped()) {             try {                 // 如果说针对你的从节点此时可以执行写数据动作                 this.selector.select(1000);                  // 如果slave的读请求为 -1 表示没有slave 发出写请求,不需要处理                 if (-1 == HAConnection.this.slaveRequestOffset) {                     Thread.sleep(10);                     continue;                 }                  // nextTransferFromWhere 为-1 表示初始第一次同步,需要进行计算                 if (-1 == this.nextTransferFromWhere) {                     // 如果slave 同步完成 则下次同步从CommitLog的最大偏移量开始同步                     if (0 == HAConnection.this.slaveRequestOffset) {                         // 获取到commitlog里面最大的偏移量                         long masterOffset = HAConnection.this.haService.getDefaultMessageStore()                                 .getCommitLog().getMaxOffset();                         masterOffset = masterOffset                                 - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getMappedFileSizeCommitLog());                          if (masterOffset < 0) {                             masterOffset = 0;                         }                          // 可以去设置一下当前需要从哪个位置开始来传输数据                         this.nextTransferFromWhere = masterOffset;                     } else {                         // 设置下次同步的位置,为 salve 读请求的位置                         this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;                     }                      log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr                         + "], and slave request " + HAConnection.this.slaveRequestOffset);                 }                  // 上次同步是否完成                 if (this.lastWriteOver) {                     // 上一次写数据时间戳到现在截止的差值                     long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;                      // 把这个时间差值跟ha发送心跳间隔做一个比对,如果超过了那个间隔,心跳间隔为 5000毫秒                     if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaSendHeartbeatInterval()) {                          // Build Header                         // 开始去构建请求头,先设置我们要从哪个位置开始传输数据,心跳请求大小为12 字节                         this.byteBufferHeader.position(0);                         this.byteBufferHeader.limit(headerSize);                         this.byteBufferHeader.putLong(this.nextTransferFromWhere);                         this.byteBufferHeader.putInt(0);                         this.byteBufferHeader.flip();                          // 进行消息同步                         this.lastWriteOver = this.transferData();                         if (!this.lastWriteOver)                             continue;                     }                 }                 // 如果说是上一次传输还没完毕此时就传输数据就可以了                 else {                     this.lastWriteOver = this.transferData();                     // 如果还没同步完成则继续                     if (!this.lastWriteOver)                         continue;                 }                  // 构建完了header以后,此时就需要查询commitlog里面指定位置开始的一个数据片段                 SelectMappedBufferResult selectResult = HAConnection.this.haService                         .getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);                 if (selectResult != null) {                     int size = selectResult.getSize();                     // 检查要同步消息的长度,是不是大于单次同步的最大限制 默认为 32kb                     if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {                         size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();                     }                      long thisOffset = this.nextTransferFromWhere;                     this.nextTransferFromWhere += size;                      selectResult.getByteBuffer().limit(size);                     this.selectMappedBufferResult = selectResult;                      // Build Header                     this.byteBufferHeader.position(0);                     this.byteBufferHeader.limit(headerSize);                     this.byteBufferHeader.putLong(thisOffset);                     this.byteBufferHeader.putInt(size);                     this.byteBufferHeader.flip();                      this.lastWriteOver = this.transferData();                 } else {                     // 没有数据要传输呢?此时就是可以让我们的组件等待一会儿,等待100毫秒                     HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);                 }             } catch (Exception e) {                  HAConnection.log.error(this.getServiceName() + " service has exception.", e);                 break;             }         }          // 对等待通知组件做一个处理         HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();          if (this.selectMappedBufferResult != null) {             this.selectMappedBufferResult.release();         }          this.makeStop();          readSocketService.makeStop();          haService.removeConnection(HAConnection.this);          SelectionKey sk = this.socketChannel.keyFor(this.selector);         if (sk != null) {             sk.cancel();         }          try {             this.selector.close();             this.socketChannel.close();         } catch (IOException e) {             HAConnection.log.error("", e);         }          HAConnection.log.info(this.getServiceName() + " service end");     }      private boolean transferData() throws Exception {         int writeSizeZeroTimes = 0;         // Write Header         // 心跳的头没写满,先写头         while (this.byteBufferHeader.hasRemaining()) {             // 通过nio网络连接可以把请求头先发送出去             int writeSize = this.socketChannel.write(this.byteBufferHeader);             if (writeSize > 0) {                 writeSizeZeroTimes = 0;                 // 记录上次写的时间                 this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();             } else if (writeSize == 0) {                 // 重试3次 则不再重试                 if (++writeSizeZeroTimes >= 3) {                     break;                 }             } else {                 throw new Exception("ha master write header error < 0");             }         }          // 如果要同步的日志为null,则直接返回这次同步的结果是否同步完成         if (null == this.selectMappedBufferResult) {             return !this.byteBufferHeader.hasRemaining();         }          writeSizeZeroTimes = 0;          // Write Body         // 填充请求体         if (!this.byteBufferHeader.hasRemaining()) {             // 他会有一个要传输的一个commitlog里面的内存区域查询出来的数据片段             while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {                 // 把commitlog里面要传输的数据片段就写入到nio网络连接里去                 int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());                 if (writeSize > 0) {                     writeSizeZeroTimes = 0;                     this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();                 } else if (writeSize == 0) {                     // 重试3次                     if (++writeSizeZeroTimes >= 3) {                         break;                     }                 } else {                     throw new Exception("ha master write body error < 0");                 }             }         }          boolean result = !this.byteBufferHeader.hasRemaining() && !this.selectMappedBufferResult.getByteBuffer().hasRemaining();          // 释放缓存         if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {             this.selectMappedBufferResult.release();             this.selectMappedBufferResult = null;         }          return result;     }      @Override     public String getServiceName() {         return WriteSocketService.class.getSimpleName();     }      @Override     public void shutdown() {         super.shutdown();     } }
   主要的逻辑如下:如果slave进行了日志偏移量的汇报,判断是不是第一次的进行同步以及对应的同步进度。设置下一次的同步位置检查上次同步是不是已经完成了,检查两次同步的周期是不是超过心跳间隔,如果是的则需要把心跳信息放到返回的头里面,然后进行消息同步。如果上次同步还没完成,则等待上次同步完成之后再继续从Master本地读取CommitLog的最大偏移量,根据上次同步的位置开始从CommitLog获取日志信息,然后放到缓存中如果缓存的大小大于单次同步的最大大小haTransferBatchSize默认是32kb,那么只同步32kb大小的日志。如果缓存为null,则等待100毫秒
  根据同步进度来唤醒刷盘CommitLog线程
  ReadSocketService 的作用主要是:根据Slave推送的日志同步进度,来唤醒HAService的GroupTransferService 然后进一步唤醒CommitLog的日志刷盘线程。这里主要看run 方法和processReadEvent 方法。class ReadSocketService extends ServiceThread {      // 读数据最大缓冲区大小,默认是1mb     private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;     // 多路复用监听组件     private final Selector selector;     // nio网络连接     private final SocketChannel socketChannel;     // 读数据缓冲区     private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);     // 处理消息位置     private int processPosition = 0;     // 最近一次读取到数据的时间戳     private volatile long lastReadTimestamp = System.currentTimeMillis();      public ReadSocketService(final SocketChannel socketChannel) throws IOException {         this.selector = RemotingUtil.openSelector();         this.socketChannel = socketChannel;         this.socketChannel.register(this.selector, SelectionKey.OP_READ);         this.setDaemon(true);     }      @Override     public void run() {         HAConnection.log.info(this.getServiceName() + " service started");          // 任务是否结束         while (!this.isStopped()) {             try {                 //  设置selector的阻塞时间,如果说从节点确实发送了请求过来                 this.selector.select(1000);                 // 处理salver读取消息的事件                 boolean ok = this.processReadEvent();                 if (!ok) {                     HAConnection.log.error("processReadEvent error");                     break;                 }                  // 检查此次处理时间是否超过心跳连接时间                 long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;                 if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {                     log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);                     break;                 }             } catch (Exception e) {                 HAConnection.log.error(this.getServiceName() + " service has exception.", e);                 break;             }         }          this.makeStop();          writeSocketService.makeStop();          haService.removeConnection(HAConnection.this);          HAConnection.this.haService.getConnectionCount().decrementAndGet();          SelectionKey sk = this.socketChannel.keyFor(this.selector);         if (sk != null) {             sk.cancel();         }          try {             this.selector.close();             this.socketChannel.close();         } catch (IOException e) {             HAConnection.log.error("", e);         }          HAConnection.log.info(this.getServiceName() + " service end");     }      @Override     public String getServiceName() {         return ReadSocketService.class.getSimpleName();     }      private boolean processReadEvent() {         int readSizeZeroTimes = 0;          // 如果说读取数据缓冲区已经没有空间了,此时就做一个flip,处理位置复位为0         if (!this.byteBufferRead.hasRemaining()) {             // 读请求缓冲转变为读取模式。             this.byteBufferRead.flip();             this.processPosition = 0;         }          // 但凡是读取数据缓冲区还有空间,就进入循环         while (this.byteBufferRead.hasRemaining()) {             try {                 // 一次性从网络连接里读取最大可能是1mb的读取缓冲空间的内容                 int readSize = this.socketChannel.read(this.byteBufferRead);                 if (readSize > 0) {                     readSizeZeroTimes = 0;                      // 更新一下本次读取到数据的时间戳                     this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore()                             .getSystemClock().now();                      // 读取缓冲区位置 - 处理位置,是大于等于8,至少说是读到了8个字节                     // 为什么是8个字节,因为salver向master发去拉取请求时,偏移量固定为8                     if ((this.byteBufferRead.position() - this.processPosition) >= 8) {                         // 获取消息开始的位置,做一个运算,用读取缓冲区位置 - 读取缓冲区位置 % 8                         int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);                         // 读取8个直接就是从节点返回的东西,就是从节点完成消息同步的最新的偏移量                         long readOffset = this.byteBufferRead.getLong(pos - 8);                         // 设置处理的位置                         this.processPosition = pos;                          // 把我们的slave的ack偏移量去做一个设置                         HAConnection.this.slaveAckOffset = readOffset;                         // 如果slave的 读请求 偏移量小于0 表示同步完成了                         if (HAConnection.this.slaveRequestOffset < 0) {                             // 重新设置slave的 读请求的 偏移量                             HAConnection.this.slaveRequestOffset = readOffset;                             log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);                         } else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {                             log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",                                     HAConnection.this.clientAddr,                                     HAConnection.this.slaveAckOffset,                                     HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());                             return false;                         }                          // 如果说从节点已经接收到了一些数据之后,唤醒阻塞的线程, 我们就可以通知HAService去传输一些数据给从节点                         // 在消息的主从同步选择的模式是同步的时候,会唤醒被阻塞的消息写入的线程                         HAConnection.this.haService.notifyTransferSome(                                 HAConnection.this.slaveAckOffset                         );                     }                 } else if (readSize == 0) {                     // 如果数据为0超过3次,表示同步完成,直接结束                     if (++readSizeZeroTimes >= 3) {                         break;                     }                 } else {                     log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");                     return false;                 }             } catch (IOException e) {                 log.error("processReadEvent exception", e);                 return false;             }         }          return true;     } }
  整体的逻辑如下:每1s执行一次事件就绪选择,然后调用processReadEvent方法处理读请求,读取从服务器的拉取请求获取slave已拉取偏移量,因为有新的从服务器反馈拉取进度,需要通知某些生产者以便返回,因为如果消息发送使用同步方式,需要等待将消息复制到从服务器,然后才返回,故这里需要唤醒相关线程去判断自己关注的消息是否已经传输完成。也就是HAService的GroupTransferService如果读取到的字节数等于0,则重复三次,否则结束本次读请求处理;如果读取到的字节数小于0,表示连接被断开,返回false,后续会断开该连接。五、总结
  RocketMQ的主从同步之间的核心类就是 HAService  和 HAConnection  和其中的几个子类。结合前面的那个图可以简单的理解一下。

当我老了学会老(本文字数1055,全文阅读约需2分钟)当我老了,我会与自己的身体握手言和。我会接纳渐渐老年的自己,从身体到心态。我会正视自己身体的退化,从头生白发到牙齿松动,从血压渐高到步履蹒跚35周6天,一大堆的孕检在向我招手终于来到36周!今天产科医生给我开了一大堆的单子!上次来医院是9月23日时隔一个月,孕检项目虽迟但到!第一关B超我改约了一早做B超的时间刚到医院还没缓过气就被大屏幕安排上了可是这次家里电费越交越多?教你5个省电小技巧,能省下不少钱,快学学不论是租房的还是买房党,每当要交电费的时候,总是会有一个疑惑,感觉自己使用的电器也不是很多,为什么会产生这么多电费?不论是冬天还是夏天,电费一直居高不下。这主要是因为,我们现在很多嫁人即翻身的童瑶比她大17岁的老公王冉,到底有多牛?2004年,童瑶和张默的故事闹得鸡飞狗跳,然而,一个看戏的男人却被牵扯其中。童瑶亲自走进派出所,指证自己的老师黄定宇对她有涉嫌强奸行为。黄定宇是谁?那可是中戏名气很大的一位老师,巩24分大胜!大外援177太香了,才仁脱胎换骨,解立彬变阵收获奇效北京时间10月20日,CBA常规赛继续开打,北京首钢10076广厦男篮,此役解立彬凭借内线的优势,再加上后场极为具有侵略性的防守,开局阶段就掌控比赛的主动权,而孙铭徽状态一般的局势尖头高跟鞋的几个特征及穿搭,快看你家有没有在今年的时装周上,尖头高跟鞋被越来越多的人穿上了脚。这种鞋子充满了女性的温柔和性感,但如果你在时尚中迷失了自我,就会在你的搭配中变得没有魅力。穿尖头高跟鞋的女人有几个特征她的脚很大捡到宝了!他是本赛季于根伟没看走眼的引援,如今已成津门虎核心日前,根据津门虎跟队记者顾颖透露,球队主力中卫安杜哈尔的伤情已经好转了很多。安杜哈尔代表津门虎出战与武汉三镇的比赛,受到了伤病困扰。所以,安杜哈尔这几天都处于休整状态。好在,安杜哈祝贺!再夺2金2银2铜国际射击运动联合会官网消息,在埃及开罗当地时间19日进行的射击(步手枪)世锦赛比赛中,中国队收获2金2银2铜。肖嘉芮萱夺得女子25米标准手枪冠军,中国队还包揽青年组男子10米气步枪鹈鹕vs网合理配置的胖虎压垮畸形配置的西蒙斯鹈鹕对网的大胜,算是合理配置对畸形配置的全面胜利。不是胖虎CJ英格拉姆比杜兰特和欧文强出多少,就是配置。当然,本场也可以看做本西蒙斯试验失败记录。鹈鹕的打法,从头到尾简单明了首发C剪短发呀太显嫩啦即使是秋冬天,短发依旧流行不减,好看的短发有很多,不挑人的短发一定是这一款。这款短发不挑脸型,显脸小又超级显嫩,搭配了修容空气刘海,显脸小,内包短发搭配黑茶色,显白显气质,这个长度想太多,心会疲倦人这一生,满打满算,三万来天,心就那么大,不好的装多了,就没地方搁置美好。人生就是一道减法题,过一天少一天,见一面少一面。既如此,何不开心一点,快乐一点。想太多,心会疲倦!遇事犹疑
什么面膜的口碑比较好?今天在这里给大家推荐我觉得好用到哭的睡眠面膜。睡眠面膜真的不需要不停换。有这其中两瓶就够了。推荐一怡丽丝尔ELIXIR弹力胶原亮白睡眠面膜这个面膜也是看冰冰种草才入的,用了一阵子果听说在新西兰居住超过10年,到65岁就可以领取退休金对吗?很多人说新西兰的养老体系好,我也仔细的研究了一下,现将有关知识跟大家分享一下。新西兰养老体系实际上包括了国家津贴制度和年金计划两部分,当然个人也可以购买商业养老保险。国家津贴制度(如果粮食涨到每斤6元,农村人就不会外出打工安心种地,可行吗?农业是国民经济的基础。若农副产品涨价了,其它行业会一烘而起,涨得更离谱。前几年,小麦1。2元一斤,一碗面10元钱,一个饼1元钱,今年小麦涨了几分钱,一碗面条就成了12元,一个饼就成大众速腾好还是朗逸好?很高兴回答你的问题。我是车海无涯,每天与大家一起交流,分享,学习有关汽车方面的知识。大众速腾和大众朗逸哪个更值得买?作为都是大众汽车在A级轿车里面的主力车型,两个车型都有很不错的销为什么许多家长可以接受自己的平庸却无法接受孩子的平庸?一朝权在手,就把令来行。我差你不能差,谁叫你是儿女,必须听我的。我在改天换地,能成多有面。面子平庸的自己在啃老平庸的父母,如果再被平庸的孩子啃?什么时候是个头,所以自已倾其所有望子教给孩子仇恨而不是爱合适吗?世界上没有无缘无故的爱,也没有无缘无故的恨。作为一个正常的人,应该是爱憎分明。对于一个孩子,从小就应该教他爱憎分明。例如,让孩子知道日本军国主义曾经对中国的侵略及其在中国犯下的罪行养了一个不知感恩的孩子,是不是我的错?首先我在企业管理当中就专门上了感恩的心,这一课我觉得感恩不是天生就会的,而是需要后天培养培训的。当然自己教育上面方式方法上不当,没有过多的去强调,这一点的话,小孩子在这一方面薄弱,哪些体制内岗位相对可以照顾家庭?哪些体制内岗位加班是常态?我认为,这是一个比较复杂的问题。建议从以下几个方面去看待和思考一据我所知,体制是国家机关企事业单位等组织制度。体制内的工作人员,职责法定,权力上授,权随责走,人随事动,费随事转。以孩子上课捣乱,老师无法上课,学校政教处让其停课写检查,家长到校大吵大闹该怎么办?这是以前一个学生,一个智商极高的男生。学校大扫除,这个男生偷懒躲着不干活,被老师发现,训了几句,然后让这个男生拿水桶打水。老师再转回来时,那个男生正用手往干活同学的身上泼水玩,看见孩子从小就是跟爷爷奶奶不亲,冷淡的像陌生人,这是为什么呢?就像是当年10岁的我,在下雨天的肯德利门口,苦等姥爷两个小时无果最终饿着肚子回去上课。从那时候起姥姥姥爷也就不是什么亲人了。我是一个女孩,从小我就知道,姥姥姥爷家有亲孙子,我舅舅家核桃仁咋样做好吃呢?我是二姐,我来回答下核桃仁咋样做好吃的问题。二姐觉得核桃仁好吃的做法如果是生吃的话可能会让人觉得没有什么味道,在之前也尝试了吃点剥开的核桃吃,因为核桃有含量很高的蛋白质,而且富含了