专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

RocketMQ源码分析之主从同步服务组件HAService

  头条创作挑战赛
  上一篇:RocketMQ源码分析之核心磁盘数据结构CommitLog一、前言
  前面介绍了RocketMQ的CommitLog文件相关的类分析CommitLog物理日志相关的CommitLog类。其中有介绍到消息刷盘时高可用对应的submitReplicaRequest方法,submitReplicaRequest方法中如果配置的服务器的角色为SYNCMASTER(从master同步),就会等待主从之间消息同步的进度达到设定的值之后才正常返回,如果超时则返回同步超时;提交复制请求publicCompletableFuturePutMessageStatussubmitReplicaRequest(AppendMessageResultresult,MessageExtmessageExt){syncmaster的话,此时跟我们的dleger关系不大,主从同步,如果说主节点挂了以后,还得靠从节点手工运维切换成主节点if(BrokerRole。SYNCMASTERthis。defaultMessageStore。getMessageStoreConfig()。getBrokerRole()){HAServiceservicethis。defaultMessageStore。getHaService();if(messageExt。isWaitStoreMsgOK()){通过HAService判断一下从节点是否ok检查slave同步的位置是否小于最大容忍的同步落后偏移量,如果是的则进行刷盘if(service。isSlaveOK(result。getWroteBytes()result。getWroteOffset())){GroupCommitRequestrequestnewGroupCommitRequest(result。getWroteOffset()result。getWroteBytes(),this。defaultMessageStore。getMessageStoreConfig()。getSlaveTimeout()主从同步超时时间默认是3s);service。putRequest(request);service。getWaitNotifyObject()。wakeupAll();returnrequest。future();可以通过future来等待主从同步完成}else{此时可能是从节点不可用returnCompletableFuture。completedFuture(PutMessageStatus。SLAVENOTAVAILABLE);}}}returnCompletableFuture。completedFuture(PutMessageStatus。PUTOK);}publicbooleanisWaitStoreMsgOK(){Stringresultthis。getProperty(MessageConst。PROPERTYWAITSTOREMSGOK);if(nullresult){returntrue;}returnBoolean。parseBoolean(result);}
  这段代码的主要逻辑如下:如果服务器的角色设置为SYNCMASTER,则进行下一步,否则直接跳过主从同步;获取HAService对象,检查消息是否本地存储完毕,如果没有则结束,否则进入下一步;检查slave同步的位置是否小于最大容忍的同步落后偏移量参数haSlaveFallbehindMax,如果是的则进行主从同步刷盘。如果没有则返回slave不可用的状态;将消息落盘的最大物理偏移量也就是CommitLog上的偏移量作为参数构建一个GroupCommitRequest对象,然后提交到HAService;最多等待syncFlushTimeout长的时间,默认为5秒。在5秒内获取结果,然后根据结果判断是否返回超时;二、同步流程
  上面那段代码比较简单,因为主从的逻辑全部交给了HAService和HAConnection两个类处理了。这里先简单介绍一下整个同步的流程(同步模式)
  三、高可用服务HAService
  HAService是在RocketMQ的Broker启动的时候就会创建的,而创建的点在DefaultMessageStore这个消息存储相关的综合类中,在这个类的构造器中会创建HAService无论当前的Broker是什么角色。
  这里需要说明的是Broker中的Master和Slaver两个角色,代码都是一样的,只不过是在实际执行的时候,走的分支不一样。四、源码分析内部属性;构造函数;启动内部类;接受Slave连接处理;检查同步进度和唤醒CommitLog刷盘线程;主从同步客户端组件;Master同步日志(监听slave日志同步进度和同步日志、根据同步进度来唤醒刷盘CommitLog线程);1、内部属性
  在HAService中有几个比较重要的属性,这里需要简单的介绍一下:
  参数
  说明
  connectionList
  连接到master的slave连接列表,用于管理连接
  acceptSocketService
  用于接收连接用的服务,只监听OPACCEPT事件,监听到连接事件时候,创建HAConnection来处理读写请求事件
  waitNotifyObject
  一个消费等待模型类,用于处理高可用线程和CommitLog的刷盘线程交互
  push2SlaveMaxOffset
  master同步到slave的偏移量
  groupTransferService
  主从同步的检测服务,用于检查是否同步完成
  haClient
  高可用的服务,slave用来跟master建立连接,像master汇报偏移量和拉取消息
  主从同步服务publicclassHAService{privatestaticfinalInternalLoggerlogInternalLoggerFactory。getLogger(LoggerName。STORELOGGERNAME);连接数量privatefinalAtomicIntegerconnectionCountnewAtomicInteger(0);主从建立的网络连接,因为一个master可能有多个slaveprivatefinalListHAConnectionconnectionListnewLinkedList();接收slave的socket服务privatefinalAcceptSocketServiceacceptSocketService;所属的消息存储组件privatefinalDefaultMessageStoredefaultMessageStore;线程阻塞与唤醒同步对象privatefinalWaitNotifyObjectwaitNotifyObjectnewWaitNotifyObject();推送到slave最大偏移量privatefinalAtomicLongpush2SlaveMaxOffsetnewAtomicLong(0);组传输服务privatefinalGroupTransferServicegroupTransferService;主从同步客户端组件privatefinalHAClienthaClient;}2、构造函数
  HAService只有一个构造器。逻辑也比较简单,创建一个AcceptSocketService开放一个端口为10912的端口用于slave来简历连接,同时启动主从信息同步的任务groupTransferService用于接收CommitLog在高可用刷盘时提交任务。publicHAService(finalDefaultMessageStoredefaultMessageStore)throwsIOException{this。defaultMessageStoredefaultMessageStore;创建,接受连接的服务,开放的端口号为10912this。acceptSocketServicenewAcceptSocketService(defaultMessageStore。getMessageStoreConfig()。getHaListenPort());创建主从信息同步的线程this。groupTransferServicenewGroupTransferService();this。haClientnewHAClient();}3、启动内部类
  HAService在创建之后,会在DefaultMessageStore中调用其start方法,这个方法会启动其内部的几个内部类,用来主从同步;publicvoidstart()throwsException{接受连接的服务,开启端口,设置监听的事件this。acceptSocketService。beginAccept();开启服务不断检查是否有连接this。acceptSocketService。start();开启groupTransferService,接受CommitLog的主从同步请求this。groupTransferService。start();开启haClient,用于slave来建立与Master连接和同步this。haClient。start();}4、接受Slave连接
  AcceptSocketService这个类在Broker的Master和Slaver两个角色启动时都会创建,只不过区别是Slaver开启端口之后,并不会有别的Broker与其建立连接。因为只有在Broker的角色是Slave的时候才会指定要连接的Master地址。这个逻辑,在Broker启动的时候BrokerController类中运行的。主要是基于nio来实现的classAcceptSocketServiceextendsServiceThread{监听端口地址privatefinalSocketAddresssocketAddressListen;nio里面的网络监听服务端privateServerSocketChannelserverSocketChannel;多路复用监听组件privateSelectorselector;给他传入一个监听端口号,构建好监听地址publicAcceptSocketService(finalintport){this。socketAddressListennewInetSocketAddress(port);}Startslisteningtoslaveconnections。throwsExceptionIffails。publicvoidbeginAccept()throwsException{打开nio网络监听服务端this。serverSocketChannelServerSocketChannel。open();打开selector多路复用组件this。selectorRemotingUtil。openSelector();设置我们的socket重用地址是truethis。serverSocketChannel。socket()。setReuseAddress(true);设置监听我们指定的端口号this。serverSocketChannel。socket()。bind(this。socketAddressListen);配置是否nio阻塞模式是falsethis。serverSocketChannel。configureBlocking(false);把nio网络监听服务器注册到selector多路复用组件里去this。serverSocketChannel。register(this。selector,SelectionKey。OPACCEPT);}{inheritDoc}Overridepublicvoidshutdown(finalbooleaninterrupt){super。shutdown(interrupt);try{this。serverSocketChannel。close();this。selector。close();}catch(IOExceptione){log。error(AcceptSocketServiceshutdownexception,e);}}{inheritDoc}Overridepublicvoidrun(){log。info(this。getServiceName()servicestarted);while(!this。isStopped()){try{通过selector多路复用组件监听我们的nio网络服务器是否有连接事件到达this。selector。select(1000);如果说确实是有从节点来连接我们,此时就会拿到一批selectedKeysSetSelectionKeyselectedthis。selector。selectedKeys();if(selected!null){每一个新建立的连接都对应了一个selectionKey,就是一个连接的key句柄for(SelectionKeyk:selected){如果说过来的网络事件就是opaccept连接事件if((k。readyOps()SelectionKey。OPACCEPT)!0){通过调用nio网络监听服务器的accept函数,就可以完成tcp连接,获取到一个新的连接SocketChannelsc((ServerSocketChannel)k。channel())。accept();if(sc!null){HAService。log。info(HAServicereceivenewconnection,sc。socket()。getRemoteSocketAddress());try{把获取到的从节点连接封装成一个HAConnectionHAConnectionconnnewHAConnection(HAService。this,sc);同时启动这个从节点连接组件conn。start();把从节点连接加入到自己的连接列表里去HAService。this。addConnection(conn);}catch(Exceptione){log。error(newHAConnectionexception,e);sc。close();}}}else{log。warn(Unexpectedopsinselectk。readyOps());}}一次selectedKeys处理完毕了,就必须做一个clearselected。clear();}}catch(Exceptione){log。error(this。getServiceName()servicehasexception。,e);}}log。info(this。getServiceName()serviceend);}{inheritDoc}OverridepublicStringgetServiceName(){returnAcceptSocketService。class。getSimpleName();}}
  beginAccept方法就是开启Socket,绑定10912端口,然后注册selector和指定监听的事件为OPACCEPT也就是建立连接事件。对应的IO模式为NIO模式。主要看其run方法,这个方法是Master用来接受Slave连接的核心。
  每过一秒检查一次是否有连接事件,如果有则建立连接,并把建立起来的连接加入到连接列表中进行保存。一直循环这个逻辑。5、检查同步进度和唤醒CommitLog刷盘线程
  GroupTransferService是CommitLog消息刷盘的类CommitLog与HAService打交道的一个中间类。在CommitLog中进行主从刷盘的时候,会创建一个CommitLog。GroupCommitRequest的内部类,这个类包含了当前Broker最新的消息的物理偏移量信息。然后把这个类丢给GroupTransferService处理,然后唤醒GroupTransferService。起始这个逻辑跟CommitLog内部的GroupCommitService逻辑一样。只不过对于同步部分的逻辑不一样,这里可以参考前面的文章存储部分(3)CommitLog物理日志相关的CommitLog类。
  先看run方法在run方法中会将传入的CommitLog。GroupCommitRequest从requestsWrite转换到requestsRead中然后进行处理检查对应的同步请求的进度。检查的逻辑在doWaitTransfer中publicvoidrun(){log。info(this。getServiceName()servicestarted);while(!this。isStopped()){try{这里进入等待,等待被唤醒,进入等待之前会调用onWaitEnd方法,然后调用swapRequests方法把requestsWrite转换为requestsReadthis。waitForRunning(10);进行请求处理this。doWaitTransfer();}catch(Exceptione){log。warn(this。getServiceName()servicehasexception。,e);}}log。info(this。getServiceName()serviceend);}
  再看doWaitTransfer方法1、比较Master推送到Slave的偏移量push2SlaveMaxOffset是不是大于传进来的CommitLog。GroupCommitRequest中的偏移量2、计算本次同步超时的时间节点,时间为当前时间加上参数系统配置参数syncFlushTimeout默认为5秒3、如果第一步结果为true,则返回结果为PUTOK。如果第一步为false,则每过一秒检查一次结果,如果超过5次了还没同步完成,则表示超时了那么返回结果为FLUSHSLAVETIMEOUT。同时会唤醒CommitLog的刷盘线程。privatevoiddoWaitTransfer(){如果读请求不为空if(!this。requestsRead。isEmpty()){for(CommitLog。GroupCommitRequestreq:this。requestsRead){如果push到slave的偏移量大于等于请求中的消息的最大偏移量表示slave同步完成booleantransferOKHAService。this。push2SlaveMaxOffset。get()req。getNextOffset();计算这次同步超时的时间点同步的超时时间段为5slongdeadLinereq。getDeadLine();如果没有同步完毕,并且还没达到超时时间,则等待1秒之后检查同步的进度while(!transferOKdeadLineSystem。nanoTime()0){this。notifyTransferObject。waitForRunning(1000);transferOKHAService。this。push2SlaveMaxOffset。get()req。getNextOffset();}超时或者同步成功的时候唤醒主线程req。wakeupCustomer(transferOK?PutMessageStatus。PUTOK:PutMessageStatus。FLUSHSLAVETIMEOUT);}this。requestsReadnewLinkedList();}}
  主要逻辑如下:比较Master推送到Slave的偏移量push2SlaveMaxOffset是不是大于传进来的CommitLog。GroupCommitRequest中的偏移量。计算本次同步超时的时间节点,时间为当前时间加上参数系统配置参数syncFlushTimeout默认为5秒。如果第一步结果为true,则返回结果为PUTOK。如果第一步为false,则每过一秒检查一次结果,如果超过5次了还没同步完成,则表示超时了那么返回结果为FLUSHSLAVETIMEOUT。同时会唤醒CommitLog的刷盘线程。6、主从同步客户端组件
  前面我们说到了只有是Salve角色的Broker才会真正的配置Master的地址,而HAClient是需要Master地址的,因此这个类真正在运行的时候只有Slave才会真正的使用到。
  先看看核心的参数信息从节点那边会用这个线程跟我们主节点建立连接,执行数据读写classHAClientextendsServiceThread{读数据缓冲区大小,4mbprivatestaticfinalintREADMAXBUFFERSIZE102410244;master地址privatefinalAtomicReferenceStringmasterAddressnewAtomicReference();从节点收到数据以后会返回一个8个字节的ack偏移量,固定8个字节privatefinalByteBufferreportOffsetByteBuffer。allocate(8);nio网络连接privateSocketChannelsocketChannel;nio多路复用组件privateSelectorselector;最近一次写数据时间戳privatelonglastWriteTimestampSystem。currentTimeMillis();当前上报过的偏移量privatelongcurrentReportedOffset0;分发位置privateintdispatchPosition0;读数据缓冲区privateByteBufferbyteBufferReadByteBuffer。allocate(READMAXBUFFERSIZE);备份数据缓冲区privateByteBufferbyteBufferBackupByteBuffer。allocate(READMAXBUFFERSIZE);}
  基本上都是缓冲相关的配置。这里主要分析的是run方法中的逻辑publicvoidrun(){log。info(this。getServiceName()servicestarted);while(!this。isStopped()){try{尝试去连接我们的master节点if(this。connectMaster()){是否要上报一下ack偏移量,间隔需要大于心跳的时间(5s)if(this。isTimeToReportOffset()){向master汇报当前salve的CommitLog的最大偏移量,并记录这次的同步时间booleanresultthis。reportSlaveMaxOffset(this。currentReportedOffset);如果汇报完了就关闭连接if(!result){this。closeMaster();}}如果说人家给你传输过来了数据this。selector。select(1000);向master拉取的信息booleanokthis。processReadEvent();if(!ok){this。closeMaster();}再次同步slave的偏移量如果,最新的偏移量大于已经汇报的情况下if(!reportSlaveMaxOffsetPlus()){continue;}检查时间距离上次同步进度的时间间隔longintervalHAService。this。getDefaultMessageStore()。getSystemClock()。now()this。lastWriteTimestamp;如果间隔大于心跳的时间,那么就关闭if(intervalHAService。this。getDefaultMessageStore()。getMessageStoreConfig()。getHaHousekeepingInterval()){log。warn(HAClient,housekeeping,foundthisconnection〔this。masterAddress〕expired,interval);this。closeMaster();log。warn(HAClient,masternotresponsesometime,socloseconnection);}}else{等待this。waitForRunning(10005);}}catch(Exceptione){log。warn(this。getServiceName()servicehasexception。,e);this。waitForRunning(10005);}}log。info(this。getServiceName()serviceend);}
  主要的逻辑如下:连接master,如果当前的broker角色是master,那么对应的masterAddress是空的,不会有后续逻辑。如果是slave,并且配置了master地址,则会进行连接进行后续逻辑处理检查是否需要向master汇报当前的同步进度,如果两次同步的时间小于5s,则不进行同步。每次同步之间间隔在5s以上,这个5s是心跳连接的间隔参数为haSendHeartbeatInterval向master汇报当前salve的CommitLog的最大偏移量,并记录这次的同步时间从master拉取日志信息,主要就是进行消息的同步,同步出问题则关闭连接再次同步slave的偏移量,如果最新的偏移量大于已经汇报的情况下则从步骤1重头开始
  这里分析完了run方法,然后就要分析主要的日志同步的逻辑了,这个逻辑在processReadEvent方法中privatebooleanprocessReadEvent(){intreadSizeZeroTimes0;如果读取缓存还有没读取完,则一直读取while(this。byteBufferRead。hasRemaining()){try{可以把主从同步过来的数据读取到缓冲区里去intreadSizethis。socketChannel。read(this。byteBufferRead);if(readSize0){readSizeZeroTimes0;执行一次分发读请求booleanresultthis。dispatchReadRequest();if(!result){log。error(HAClient,dispatchReadRequesterror);returnfalse;}}elseif(readSize0){if(readSizeZeroTimes3){break;}}else{log。info(HAClient,processReadEventreadsocket0);returnfalse;}}catch(IOExceptione){log。info(HAClient,processReadEventreadsocketexception,e);returnfalse;}}returntrue;}privatebooleandispatchReadRequest(){请求的头信息finalintmsgHeaderSize84;phyoffsetsizewhile(true){获取分发的偏移差intdiffthis。byteBufferRead。position()this。dispatchPosition;如果偏移差大于头大小,说明存在请求体if(diffmsgHeaderSize){获取主master的最大偏移量longmasterPhyOffsetthis。byteBufferRead。getLong(this。dispatchPosition);获取消息体intbodySizethis。byteBufferRead。getInt(this。dispatchPosition8);获取salve的最大偏移量longslavePhyOffsetHAService。this。defaultMessageStore。getMaxPhyOffset();if(slavePhyOffset!0){if(slavePhyOffset!masterPhyOffset){log。error(masterpushedoffsetnotequalthemaxphyoffsetinslave,SLAVE:slavePhyOffsetMASTER:masterPhyOffset);returnfalse;}}如果偏移差大于消息头和消息体大小。则读取消息体if(diff(msgHeaderSizebodySize)){byte〔〕bodyDatabyteBufferRead。array();intdataStartthis。dispatchPositionmsgHeaderSize;把你读取到的数据追加到commitlog里面去HAService。this。defaultMessageStore。appendToCommitLog(masterPhyOffset,bodyData,dataStart,bodySize);记录分发的位置this。dispatchPositionmsgHeaderSizebodySize;if(!reportSlaveMaxOffsetPlus()){returnfalse;}continue;}}if(!this。byteBufferRead。hasRemaining()){this。reallocateByteBuffer();}break;}returntrue;}7、Master同步日志
  前面说过,在HAService的AcceptSocketService内部类中,Master会在建立连接的时候创建HAConnection用来处理读写事件。这里主要介绍构造函数和内部类就能了解原理了。
  成员变量publicclassHAConnection{privatestaticfinalInternalLoggerlogInternalLoggerFactory。getLogger(LoggerName。STORELOGGERNAME);所属的HA高可用服务组件privatefinalHAServicehaService;nio网络连接privatefinalSocketChannelsocketChannel;跟我们建立连接的HA客户端组件的地址,从节点地址privatefinalStringclientAddr;网络连接写数据服务线程privateWriteSocketServicewriteSocketService;网络连接读数据服务线程privateReadSocketServicereadSocketService;从节点请求获取的偏移量privatevolatilelongslaveRequestOffset1;从节点同步数据后ack的偏移量privatevolatilelongslaveAckOffset1;}
  构造函数publicHAConnection(finalHAServicehaService,finalSocketChannelsocketChannel)throwsIOException{指定所属的HAServicethis。haServicehaService;指定的NIO的socketChannelthis。socketChannelsocketChannel;客户端的地址this。clientAddrthis。socketChannel。socket()。getRemoteSocketAddress()。toString();这是为非阻塞this。socketChannel。configureBlocking(false);是否启动SOLINGERSOLINGER作用:设置函数close()关闭TCP连接时的行为。缺省close()的行为是如果有数据残留在socket发送缓冲区中则系统将继续发送这些数据给对方,等待被确认,然后返回。this。socketChannel。socket()。setSoLinger(false,1);是否开启TCPNODELAYthis。socketChannel。socket()。setTcpNoDelay(true);if(NettySystemConfig。socketSndbufSize0){接收缓冲的大小this。socketChannel。socket()。setReceiveBufferSize(NettySystemConfig。socketSndbufSize);}if(NettySystemConfig。socketRcvbufSize0){发送缓冲的大小this。socketChannel。socket()。setSendBufferSize(NettySystemConfig。socketRcvbufSize);}把网络连接写数据服务线程和读数据服务线程都构建好端口写服务this。writeSocketServicenewWriteSocketService(this。socketChannel);端口读服务this。readSocketServicenewReadSocketService(this。socketChannel);增加haService中的连接数字段this。haService。getConnectionCount()。incrementAndGet();}
  监听slave日志同步进度和同步日志
  WriteSocketService监听的是OPWRITE事件,注册的端口就是在HAService中开启的端口。classWriteSocketServiceextendsServiceThread{privatefinalSelectorselector;privatefinalSocketChannelsocketChannel;写数据头大小,12个字节privatefinalintheaderSize84;写数据头缓冲区privatefinalByteBufferbyteBufferHeaderByteBuffer。allocate(headerSize);从哪个位置开始传输privatelongnextTransferFromWhere1;对内存映射区域查询数据结果privateSelectMappedBufferResultselectMappedBufferResult;最后一次写数据是否完成了,默认trueprivatebooleanlastWriteOvertrue;最后一次写数据时间戳privatelonglastWriteTimestampSystem。currentTimeMillis();publicWriteSocketService(finalSocketChannelsocketChannel)throwsIOException{this。selectorRemotingUtil。openSelector();搞一个selector多路复用组件this。socketChannelsocketChannel;this。socketChannel。register(this。selector,SelectionKey。OPWRITE);把这个网络连接注册到selector多路复用组件里去就可以了this。setDaemon(true);}Overridepublicvoidrun(){HAConnection。log。info(this。getServiceName()servicestarted);while(!this。isStopped()){try{如果说针对你的从节点此时可以执行写数据动作this。selector。select(1000);如果slave的读请求为1表示没有slave发出写请求,不需要处理if(1HAConnection。this。slaveRequestOffset){Thread。sleep(10);continue;}nextTransferFromWhere为1表示初始第一次同步,需要进行计算if(1this。nextTransferFromWhere){如果slave同步完成则下次同步从CommitLog的最大偏移量开始同步if(0HAConnection。this。slaveRequestOffset){获取到commitlog里面最大的偏移量longmasterOffsetHAConnection。this。haService。getDefaultMessageStore()。getCommitLog()。getMaxOffset();masterOffsetmasterOffset(masterOffsetHAConnection。this。haService。getDefaultMessageStore()。getMessageStoreConfig()。getMappedFileSizeCommitLog());if(masterOffset0){masterOffset0;}可以去设置一下当前需要从哪个位置开始来传输数据this。nextTransferFromWheremasterOffset;}else{设置下次同步的位置,为salve读请求的位置this。nextTransferFromWhereHAConnection。this。slaveRequestOffset;}log。info(mastertransferdatafromthis。nextTransferFromWheretoslave〔HAConnection。this。clientAddr〕,andslaverequestHAConnection。this。slaveRequestOffset);}上次同步是否完成if(this。lastWriteOver){上一次写数据时间戳到现在截止的差值longintervalHAConnection。this。haService。getDefaultMessageStore()。getSystemClock()。now()this。lastWriteTimestamp;把这个时间差值跟ha发送心跳间隔做一个比对,如果超过了那个间隔,心跳间隔为5000毫秒if(intervalHAConnection。this。haService。getDefaultMessageStore()。getMessageStoreConfig()。getHaSendHeartbeatInterval()){BuildHeader开始去构建请求头,先设置我们要从哪个位置开始传输数据,心跳请求大小为12字节this。byteBufferHeader。position(0);this。byteBufferHeader。limit(headerSize);this。byteBufferHeader。putLong(this。nextTransferFromWhere);this。byteBufferHeader。putInt(0);this。byteBufferHeader。flip();进行消息同步this。lastWriteOverthis。transferData();if(!this。lastWriteOver)continue;}}如果说是上一次传输还没完毕此时就传输数据就可以了else{this。lastWriteOverthis。transferData();如果还没同步完成则继续if(!this。lastWriteOver)continue;}构建完了header以后,此时就需要查询commitlog里面指定位置开始的一个数据片段SelectMappedBufferResultselectResultHAConnection。this。haService。getDefaultMessageStore()。getCommitLogData(this。nextTransferFromWhere);if(selectResult!null){intsizeselectResult。getSize();检查要同步消息的长度,是不是大于单次同步的最大限制默认为32kbif(sizeHAConnection。this。haService。getDefaultMessageStore()。getMessageStoreConfig()。getHaTransferBatchSize()){sizeHAConnection。this。haService。getDefaultMessageStore()。getMessageStoreConfig()。getHaTransferBatchSize();}longthisOffsetthis。nextTransferFromWhere;this。nextTransferFromWheresize;selectResult。getByteBuffer()。limit(size);this。selectMappedBufferResultselectResult;BuildHeaderthis。byteBufferHeader。position(0);this。byteBufferHeader。limit(headerSize);this。byteBufferHeader。putLong(thisOffset);this。byteBufferHeader。putInt(size);this。byteBufferHeader。flip();this。lastWriteOverthis。transferData();}else{没有数据要传输呢?此时就是可以让我们的组件等待一会儿,等待100毫秒HAConnection。this。haService。getWaitNotifyObject()。allWaitForRunning(100);}}catch(Exceptione){HAConnection。log。error(this。getServiceName()servicehasexception。,e);break;}}对等待通知组件做一个处理HAConnection。this。haService。getWaitNotifyObject()。removeFromWaitingThreadTable();if(this。selectMappedBufferResult!null){this。selectMappedBufferResult。release();}this。makeStop();readSocketService。makeStop();haService。removeConnection(HAConnection。this);SelectionKeyskthis。socketChannel。keyFor(this。selector);if(sk!null){sk。cancel();}try{this。selector。close();this。socketChannel。close();}catch(IOExceptione){HAConnection。log。error(,e);}HAConnection。log。info(this。getServiceName()serviceend);}privatebooleantransferData()throwsException{intwriteSizeZeroTimes0;WriteHeader心跳的头没写满,先写头while(this。byteBufferHeader。hasRemaining()){通过nio网络连接可以把请求头先发送出去intwriteSizethis。socketChannel。write(this。byteBufferHeader);if(writeSize0){writeSizeZeroTimes0;记录上次写的时间this。lastWriteTimestampHAConnection。this。haService。getDefaultMessageStore()。getSystemClock()。now();}elseif(writeSize0){重试3次则不再重试if(writeSizeZeroTimes3){break;}}else{thrownewException(hamasterwriteheadererror0);}}如果要同步的日志为null,则直接返回这次同步的结果是否同步完成if(nullthis。selectMappedBufferResult){return!this。byteBufferHeader。hasRemaining();}writeSizeZeroTimes0;WriteBody填充请求体if(!this。byteBufferHeader。hasRemaining()){他会有一个要传输的一个commitlog里面的内存区域查询出来的数据片段while(this。selectMappedBufferResult。getByteBuffer()。hasRemaining()){把commitlog里面要传输的数据片段就写入到nio网络连接里去intwriteSizethis。socketChannel。write(this。selectMappedBufferResult。getByteBuffer());if(writeSize0){writeSizeZeroTimes0;this。lastWriteTimestampHAConnection。this。haService。getDefaultMessageStore()。getSystemClock()。now();}elseif(writeSize0){重试3次if(writeSizeZeroTimes3){break;}}else{thrownewException(hamasterwritebodyerror0);}}}booleanresult!this。byteBufferHeader。hasRemaining()!this。selectMappedBufferResult。getByteBuffer()。hasRemaining();释放缓存if(!this。selectMappedBufferResult。getByteBuffer()。hasRemaining()){this。selectMappedBufferResult。release();this。selectMappedBufferResultnull;}returnresult;}OverridepublicStringgetServiceName(){returnWriteSocketService。class。getSimpleName();}Overridepublicvoidshutdown(){super。shutdown();}}
  主要的逻辑如下:如果slave进行了日志偏移量的汇报,判断是不是第一次的进行同步以及对应的同步进度。设置下一次的同步位置检查上次同步是不是已经完成了,检查两次同步的周期是不是超过心跳间隔,如果是的则需要把心跳信息放到返回的头里面,然后进行消息同步。如果上次同步还没完成,则等待上次同步完成之后再继续从Master本地读取CommitLog的最大偏移量,根据上次同步的位置开始从CommitLog获取日志信息,然后放到缓存中如果缓存的大小大于单次同步的最大大小haTransferBatchSize默认是32kb,那么只同步32kb大小的日志。如果缓存为null,则等待100毫秒
  根据同步进度来唤醒刷盘CommitLog线程
  ReadSocketService的作用主要是:根据Slave推送的日志同步进度,来唤醒HAService的GroupTransferService然后进一步唤醒CommitLog的日志刷盘线程。这里主要看run方法和processReadEvent方法。classReadSocketServiceextendsServiceThread{读数据最大缓冲区大小,默认是1mbprivatestaticfinalintREADMAXBUFFERSIZE10241024;多路复用监听组件privatefinalSelectorselector;nio网络连接privatefinalSocketChannelsocketChannel;读数据缓冲区privatefinalByteBufferbyteBufferReadByteBuffer。allocate(READMAXBUFFERSIZE);处理消息位置privateintprocessPosition0;最近一次读取到数据的时间戳privatevolatilelonglastReadTimestampSystem。currentTimeMillis();publicReadSocketService(finalSocketChannelsocketChannel)throwsIOException{this。selectorRemotingUtil。openSelector();this。socketChannelsocketChannel;this。socketChannel。register(this。selector,SelectionKey。OPREAD);this。setDaemon(true);}Overridepublicvoidrun(){HAConnection。log。info(this。getServiceName()servicestarted);任务是否结束while(!this。isStopped()){try{设置selector的阻塞时间,如果说从节点确实发送了请求过来this。selector。select(1000);处理salver读取消息的事件booleanokthis。processReadEvent();if(!ok){HAConnection。log。error(processReadEventerror);break;}检查此次处理时间是否超过心跳连接时间longintervalHAConnection。this。haService。getDefaultMessageStore()。getSystemClock()。now()this。lastReadTimestamp;if(intervalHAConnection。this。haService。getDefaultMessageStore()。getMessageStoreConfig()。getHaHousekeepingInterval()){log。warn(hahousekeeping,foundthisconnection〔HAConnection。this。clientAddr〕expired,interval);break;}}catch(Exceptione){HAConnection。log。error(this。getServiceName()servicehasexception。,e);break;}}this。makeStop();writeSocketService。makeStop();haService。removeConnection(HAConnection。this);HAConnection。this。haService。getConnectionCount()。decrementAndGet();SelectionKeyskthis。socketChannel。keyFor(this。selector);if(sk!null){sk。cancel();}try{this。selector。close();this。socketChannel。close();}catch(IOExceptione){HAConnection。log。error(,e);}HAConnection。log。info(this。getServiceName()serviceend);}OverridepublicStringgetServiceName(){returnReadSocketService。class。getSimpleName();}privatebooleanprocessReadEvent(){intreadSizeZeroTimes0;如果说读取数据缓冲区已经没有空间了,此时就做一个flip,处理位置复位为0if(!this。byteBufferRead。hasRemaining()){读请求缓冲转变为读取模式。this。byteBufferRead。flip();this。processPosition0;}但凡是读取数据缓冲区还有空间,就进入循环while(this。byteBufferRead。hasRemaining()){try{一次性从网络连接里读取最大可能是1mb的读取缓冲空间的内容intreadSizethis。socketChannel。read(this。byteBufferRead);if(readSize0){readSizeZeroTimes0;更新一下本次读取到数据的时间戳this。lastReadTimestampHAConnection。this。haService。getDefaultMessageStore()。getSystemClock()。now();读取缓冲区位置处理位置,是大于等于8,至少说是读到了8个字节为什么是8个字节,因为salver向master发去拉取请求时,偏移量固定为8if((this。byteBufferRead。position()this。processPosition)8){获取消息开始的位置,做一个运算,用读取缓冲区位置读取缓冲区位置8intposthis。byteBufferRead。position()(this。byteBufferRead。position()8);读取8个直接就是从节点返回的东西,就是从节点完成消息同步的最新的偏移量longreadOffsetthis。byteBufferRead。getLong(pos8);设置处理的位置this。processPositionpos;把我们的slave的ack偏移量去做一个设置HAConnection。this。slaveAckOffsetreadOffset;如果slave的读请求偏移量小于0表示同步完成了if(HAConnection。this。slaveRequestOffset0){重新设置slave的读请求的偏移量HAConnection。this。slaveRequestOffsetreadOffset;log。info(slave〔HAConnection。this。clientAddr〕requestoffsetreadOffset);}elseif(HAConnection。this。slaveAckOffsetHAConnection。this。haService。getDefaultMessageStore()。getMaxPhyOffset()){log。warn(slave〔{}〕requestoffset{}greaterthanlocalcommitLogoffset{}。,HAConnection。this。clientAddr,HAConnection。this。slaveAckOffset,HAConnection。this。haService。getDefaultMessageStore()。getMaxPhyOffset());returnfalse;}如果说从节点已经接收到了一些数据之后,唤醒阻塞的线程,我们就可以通知HAService去传输一些数据给从节点在消息的主从同步选择的模式是同步的时候,会唤醒被阻塞的消息写入的线程HAConnection。this。haService。notifyTransferSome(HAConnection。this。slaveAckOffset);}}elseif(readSize0){如果数据为0超过3次,表示同步完成,直接结束if(readSizeZeroTimes3){break;}}else{log。error(readsocket〔HAConnection。this。clientAddr〕0);returnfalse;}}catch(IOExceptione){log。error(processReadEventexception,e);returnfalse;}}returntrue;}}
  整体的逻辑如下:每1s执行一次事件就绪选择,然后调用processReadEvent方法处理读请求,读取从服务器的拉取请求获取slave已拉取偏移量,因为有新的从服务器反馈拉取进度,需要通知某些生产者以便返回,因为如果消息发送使用同步方式,需要等待将消息复制到从服务器,然后才返回,故这里需要唤醒相关线程去判断自己关注的消息是否已经传输完成。也就是HAService的GroupTransferService如果读取到的字节数等于0,则重复三次,否则结束本次读请求处理;如果读取到的字节数小于0,表示连接被断开,返回false,后续会断开该连接。五、总结
  RocketMQ的主从同步之间的核心类就是HAService和HAConnection和其中的几个子类。结合前面的那个图可以简单的理解一下。

张爱玲不生育也是一种善良如果孩子的出生,是为了继承自己的劳碌,恐慌,贫困,那么不生也是一种善良。这句话出自著名作家张爱玲之口,其意味深长,令人深思。孩子的出生,本应该是一个家庭最幸福的时刻,然而对于某些家打造共性产业园,这家老牌国企将实施工改丨对话中山工改一线当被问起企业想要自改的原因,广东颐丰食品股份有限公司(下称颐丰食品)董事长温振明说起了自己去年9月上任后,第一次进入厂区时的感受。说明厂区气味收集和处理能力有待提升。一走进去就闻到(经济)云南石屏小豆腐带动大产业近年来,云南省石屏县通过加快豆腐产业转型升级打造豆制品产业集群等,进一步擦亮石屏豆腐名片。据了解,目前石屏县每天生产豆制品150吨,有效带动群众就业增收,助力乡村振兴。3月22日,顺应前沿技术新趋势,打造产业发展新高地3月21日,市经济信息化委副主任汤文侃带队赴斯泰兰蒂斯亚太投资有限公司(以下简称斯泰兰蒂斯)中电科机器人有限公司(以下简称中电科)和上海交通大学人工智能研究院(以下简称研究院)开展建造国产大型邮轮让过路经济迈向产业链经济央视网消息大型邮轮被称为能在海上航行的星级酒店。目前,服役中的大型邮轮都是国外生产的,但很快这个情况就将改变。我国在上海建造的第一艘大型邮轮正如火如荼地开展内部装修和系统调试,预计曾在雷曼兄弟和安达信工作的这位硅谷银行高管,真的是金融界灭霸吗?网友对于詹代尔的评价只能当作段子来看。文海上客美国硅谷银行(SVB)破产之后,54岁的SVB高管约瑟夫詹代尔(JosephGentile)突然一夜爆红,被网友称为金融界灭霸。原来,华商记者帮约定45个工作日退完23万元房款8个多月了只退7万多购房者退房款,与开发商签协议约定45个工作日退完23万余元房款,但8个多月过去了只退了7万多元。2021年7月,陈女士认购了一套位于西咸新区橙天地文化中心的公寓,当时交了50房款2外贸订单暴跌,制造业失守,背后的真正原因到底是什么?今年大家讨论最多的一个词,估计就是外贸,因为今年大多数出口企业的订单情况不太好,甚至有的工厂订单不足去年的一半,可以说,2023年的外贸,开局就没有开好,甚至有人说今年的中国外贸进这几款滑梯放在移动水上乐园里效果很不错哦孩子是家庭的开心果,是家里的掌上明珠,是家里的调和剂。把孩子带到移动水上乐园来,那就是一个目的,就是要玩的开心玩的尽兴。纵观市场上那么多的充气嬉水主题乐园,哪几款比较深受孩子的喜爱V观财报ST榕泰原实控人拟被罚420万市场禁入,干啥了?中新经纬3月22日电22日晚间,广东榕泰实业股份有限公司(证券简称ST榕泰)公告,收到广东证监局行政处罚及市场禁入事先告知书(下称告知书),拟对原实际控制人杨宝生罚款420万元及市任泽平2023年的经济肯定是好的,房地产将首现分化式复苏3月21日至23日,以商业新变局,一起向未来为主题的中国商业地产行业发展论坛(第十八届)2023年会在深圳蛇口举行。著名经济学家任泽平指出,今年一定是复苏之年,经济肯定是好的,老天
图片征集乐享四季景,生态云龙湖主题创作活动向你发出邀请为更好地展示徐州云龙湖景区的四季美景,头条摄影头条图片联合中国风景名胜区协会江苏省徐州市云龙湖风景名胜区管理委员会中国摄影报等单位发起乐享四季景生态云龙湖主题创作活动。带话题云龙湖明朝李复亨年近八十才考中进士,还会有作为吗?明朝李复亨年近八十才考中进士,被任命为县主簿。一次他带人护送官马入府,夜晚投宿客栈时,有贼人将官马全部杀死了。对此李复亨认为不盗马获利,而将马杀死,肯定是与我有仇的仇家所为。于是他三型翅膀保障中国空间站用电无忧来源新华网新华社海南文昌10月31日电题三型翅膀保障中国空间站用电无忧新华社记者李国利赵叶苹米思源梦天实验舱发射圆满成功,中国空间站T字基本构型即将亮相。据中国航天科技集团相关专家建筑之美!游信创园,逛通明湖压了几十张照片,一起发出去闲来无事,游游逛逛楼是城市的山,路是城市的水,钢筋水泥塑造,背后是人类的审美大自然有鬼斧神工,人类同样有工匠之美信创园通明湖,走走拍拍,几十张图压在电脑里,不如一起放出去分享信创园今天对房地产没信心了现在的救市政策,基本上是治标不治本,无非是降低利息降低首付加大贷款额度或者取消限购等等市场已明确回应此次救市失败,大家缺的不是这一点儿利息也不缺这一点儿首付更不缺贷款额度,而是缺乏A股突然大涨,11月开门红行情会延续吗?周三,股市预测所有股民注意了!不管你现在看多还是看空,只要你没有离开市场,请你认真看完这篇文章,新进来的朋友加个关注,老朋友点赞,转发周二,三大股指高开高走,盘中4400个股上涨,涨停个股达89今日A股大涨75点,发生什么事?接下来的行情会怎么走今天是11月1日星期二,11月的第一天,中国股市突然走出了绝地反击的行情,拉出一根标准的大阳线,究竟是怎么回事?贵州茅台泸州老窖大涨8个点以上,五粮液大涨近7个点,白酒三大王牌集体俄欧能源博弈,欧洲宣告胜利?天然气价格跌到负数,不是什么好事欧洲天然气多到要倒贴钱往外卖了,这是什么情况?最近欧洲天然气期货价格暴跌,比八月底时下降了70,现货市场更是出现了诡异的一幕,价格在前几天居然跌破了0。也就是说,如果有人来买天然气美国经济并未远离衰退风险来源中国经济网经济日报出口拉动,特别是欧洲掏腰包大量进口能源和军品等给美国救了急,美国经济在三季度实现回升。但是,消费房地产市场制造业服务业等指标仍在走弱,美国经济并未远离衰退风险俄罗斯石油出口,绕开西方制裁,超六成俄油装船,顺利驶向东方因为受到西方制裁的限制,装载俄罗斯出口石油的大批油轮难以从北大西洋黑海地中海航线行驶,因此纷纷绕道驶向东方。据外电统计,目前俄罗斯出口的石油当中,有65经由上述方式顺利运抵新加坡,七部门新规定明星不得为药品保健品等广告代言,但为何屡禁不止?记者李科文编辑谢欣10月31日,市场监管总局等七部门联合印发关于进一步规范明星广告代言活动的指导意见(下简称指导意见),其中明确,医疗药品医疗器械保健食品和特殊医学用途配方食品是明
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网