RocketMQ源码分析之主从同步服务组件HAService
头条创作挑战赛
上一篇:RocketMQ源码分析之核心磁盘数据结构CommitLog一、前言
前面介绍了RocketMQ的CommitLog文件相关的类分析CommitLog物理日志相关的CommitLog类。其中有介绍到消息刷盘时高可用对应的submitReplicaRequest方法,submitReplicaRequest方法中如果配置的服务器的角色为SYNCMASTER(从master同步),就会等待主从之间消息同步的进度达到设定的值之后才正常返回,如果超时则返回同步超时;提交复制请求publicCompletableFuturePutMessageStatussubmitReplicaRequest(AppendMessageResultresult,MessageExtmessageExt){syncmaster的话,此时跟我们的dleger关系不大,主从同步,如果说主节点挂了以后,还得靠从节点手工运维切换成主节点if(BrokerRole。SYNCMASTERthis。defaultMessageStore。getMessageStoreConfig()。getBrokerRole()){HAServiceservicethis。defaultMessageStore。getHaService();if(messageExt。isWaitStoreMsgOK()){通过HAService判断一下从节点是否ok检查slave同步的位置是否小于最大容忍的同步落后偏移量,如果是的则进行刷盘if(service。isSlaveOK(result。getWroteBytes()result。getWroteOffset())){GroupCommitRequestrequestnewGroupCommitRequest(result。getWroteOffset()result。getWroteBytes(),this。defaultMessageStore。getMessageStoreConfig()。getSlaveTimeout()主从同步超时时间默认是3s);service。putRequest(request);service。getWaitNotifyObject()。wakeupAll();returnrequest。future();可以通过future来等待主从同步完成}else{此时可能是从节点不可用returnCompletableFuture。completedFuture(PutMessageStatus。SLAVENOTAVAILABLE);}}}returnCompletableFuture。completedFuture(PutMessageStatus。PUTOK);}publicbooleanisWaitStoreMsgOK(){Stringresultthis。getProperty(MessageConst。PROPERTYWAITSTOREMSGOK);if(nullresult){returntrue;}returnBoolean。parseBoolean(result);}
这段代码的主要逻辑如下:如果服务器的角色设置为SYNCMASTER,则进行下一步,否则直接跳过主从同步;获取HAService对象,检查消息是否本地存储完毕,如果没有则结束,否则进入下一步;检查slave同步的位置是否小于最大容忍的同步落后偏移量参数haSlaveFallbehindMax,如果是的则进行主从同步刷盘。如果没有则返回slave不可用的状态;将消息落盘的最大物理偏移量也就是CommitLog上的偏移量作为参数构建一个GroupCommitRequest对象,然后提交到HAService;最多等待syncFlushTimeout长的时间,默认为5秒。在5秒内获取结果,然后根据结果判断是否返回超时;二、同步流程
上面那段代码比较简单,因为主从的逻辑全部交给了HAService和HAConnection两个类处理了。这里先简单介绍一下整个同步的流程(同步模式)
三、高可用服务HAService
HAService是在RocketMQ的Broker启动的时候就会创建的,而创建的点在DefaultMessageStore这个消息存储相关的综合类中,在这个类的构造器中会创建HAService无论当前的Broker是什么角色。
这里需要说明的是Broker中的Master和Slaver两个角色,代码都是一样的,只不过是在实际执行的时候,走的分支不一样。四、源码分析内部属性;构造函数;启动内部类;接受Slave连接处理;检查同步进度和唤醒CommitLog刷盘线程;主从同步客户端组件;Master同步日志(监听slave日志同步进度和同步日志、根据同步进度来唤醒刷盘CommitLog线程);1、内部属性
在HAService中有几个比较重要的属性,这里需要简单的介绍一下:
参数
说明
connectionList
连接到master的slave连接列表,用于管理连接
acceptSocketService
用于接收连接用的服务,只监听OPACCEPT事件,监听到连接事件时候,创建HAConnection来处理读写请求事件
waitNotifyObject
一个消费等待模型类,用于处理高可用线程和CommitLog的刷盘线程交互
push2SlaveMaxOffset
master同步到slave的偏移量
groupTransferService
主从同步的检测服务,用于检查是否同步完成
haClient
高可用的服务,slave用来跟master建立连接,像master汇报偏移量和拉取消息
主从同步服务publicclassHAService{privatestaticfinalInternalLoggerlogInternalLoggerFactory。getLogger(LoggerName。STORELOGGERNAME);连接数量privatefinalAtomicIntegerconnectionCountnewAtomicInteger(0);主从建立的网络连接,因为一个master可能有多个slaveprivatefinalListHAConnectionconnectionListnewLinkedList();接收slave的socket服务privatefinalAcceptSocketServiceacceptSocketService;所属的消息存储组件privatefinalDefaultMessageStoredefaultMessageStore;线程阻塞与唤醒同步对象privatefinalWaitNotifyObjectwaitNotifyObjectnewWaitNotifyObject();推送到slave最大偏移量privatefinalAtomicLongpush2SlaveMaxOffsetnewAtomicLong(0);组传输服务privatefinalGroupTransferServicegroupTransferService;主从同步客户端组件privatefinalHAClienthaClient;}2、构造函数
HAService只有一个构造器。逻辑也比较简单,创建一个AcceptSocketService开放一个端口为10912的端口用于slave来简历连接,同时启动主从信息同步的任务groupTransferService用于接收CommitLog在高可用刷盘时提交任务。publicHAService(finalDefaultMessageStoredefaultMessageStore)throwsIOException{this。defaultMessageStoredefaultMessageStore;创建,接受连接的服务,开放的端口号为10912this。acceptSocketServicenewAcceptSocketService(defaultMessageStore。getMessageStoreConfig()。getHaListenPort());创建主从信息同步的线程this。groupTransferServicenewGroupTransferService();this。haClientnewHAClient();}3、启动内部类
HAService在创建之后,会在DefaultMessageStore中调用其start方法,这个方法会启动其内部的几个内部类,用来主从同步;publicvoidstart()throwsException{接受连接的服务,开启端口,设置监听的事件this。acceptSocketService。beginAccept();开启服务不断检查是否有连接this。acceptSocketService。start();开启groupTransferService,接受CommitLog的主从同步请求this。groupTransferService。start();开启haClient,用于slave来建立与Master连接和同步this。haClient。start();}4、接受Slave连接
AcceptSocketService这个类在Broker的Master和Slaver两个角色启动时都会创建,只不过区别是Slaver开启端口之后,并不会有别的Broker与其建立连接。因为只有在Broker的角色是Slave的时候才会指定要连接的Master地址。这个逻辑,在Broker启动的时候BrokerController类中运行的。主要是基于nio来实现的classAcceptSocketServiceextendsServiceThread{监听端口地址privatefinalSocketAddresssocketAddressListen;nio里面的网络监听服务端privateServerSocketChannelserverSocketChannel;多路复用监听组件privateSelectorselector;给他传入一个监听端口号,构建好监听地址publicAcceptSocketService(finalintport){this。socketAddressListennewInetSocketAddress(port);}Startslisteningtoslaveconnections。throwsExceptionIffails。publicvoidbeginAccept()throwsException{打开nio网络监听服务端this。serverSocketChannelServerSocketChannel。open();打开selector多路复用组件this。selectorRemotingUtil。openSelector();设置我们的socket重用地址是truethis。serverSocketChannel。socket()。setReuseAddress(true);设置监听我们指定的端口号this。serverSocketChannel。socket()。bind(this。socketAddressListen);配置是否nio阻塞模式是falsethis。serverSocketChannel。configureBlocking(false);把nio网络监听服务器注册到selector多路复用组件里去this。serverSocketChannel。register(this。selector,SelectionKey。OPACCEPT);}{inheritDoc}Overridepublicvoidshutdown(finalbooleaninterrupt){super。shutdown(interrupt);try{this。serverSocketChannel。close();this。selector。close();}catch(IOExceptione){log。error(AcceptSocketServiceshutdownexception,e);}}{inheritDoc}Overridepublicvoidrun(){log。info(this。getServiceName()servicestarted);while(!this。isStopped()){try{通过selector多路复用组件监听我们的nio网络服务器是否有连接事件到达this。selector。select(1000);如果说确实是有从节点来连接我们,此时就会拿到一批selectedKeysSetSelectionKeyselectedthis。selector。selectedKeys();if(selected!null){每一个新建立的连接都对应了一个selectionKey,就是一个连接的key句柄for(SelectionKeyk:selected){如果说过来的网络事件就是opaccept连接事件if((k。readyOps()SelectionKey。OPACCEPT)!0){通过调用nio网络监听服务器的accept函数,就可以完成tcp连接,获取到一个新的连接SocketChannelsc((ServerSocketChannel)k。channel())。accept();if(sc!null){HAService。log。info(HAServicereceivenewconnection,sc。socket()。getRemoteSocketAddress());try{把获取到的从节点连接封装成一个HAConnectionHAConnectionconnnewHAConnection(HAService。this,sc);同时启动这个从节点连接组件conn。start();把从节点连接加入到自己的连接列表里去HAService。this。addConnection(conn);}catch(Exceptione){log。error(newHAConnectionexception,e);sc。close();}}}else{log。warn(Unexpectedopsinselectk。readyOps());}}一次selectedKeys处理完毕了,就必须做一个clearselected。clear();}}catch(Exceptione){log。error(this。getServiceName()servicehasexception。,e);}}log。info(this。getServiceName()serviceend);}{inheritDoc}OverridepublicStringgetServiceName(){returnAcceptSocketService。class。getSimpleName();}}
beginAccept方法就是开启Socket,绑定10912端口,然后注册selector和指定监听的事件为OPACCEPT也就是建立连接事件。对应的IO模式为NIO模式。主要看其run方法,这个方法是Master用来接受Slave连接的核心。
每过一秒检查一次是否有连接事件,如果有则建立连接,并把建立起来的连接加入到连接列表中进行保存。一直循环这个逻辑。5、检查同步进度和唤醒CommitLog刷盘线程
GroupTransferService是CommitLog消息刷盘的类CommitLog与HAService打交道的一个中间类。在CommitLog中进行主从刷盘的时候,会创建一个CommitLog。GroupCommitRequest的内部类,这个类包含了当前Broker最新的消息的物理偏移量信息。然后把这个类丢给GroupTransferService处理,然后唤醒GroupTransferService。起始这个逻辑跟CommitLog内部的GroupCommitService逻辑一样。只不过对于同步部分的逻辑不一样,这里可以参考前面的文章存储部分(3)CommitLog物理日志相关的CommitLog类。
先看run方法在run方法中会将传入的CommitLog。GroupCommitRequest从requestsWrite转换到requestsRead中然后进行处理检查对应的同步请求的进度。检查的逻辑在doWaitTransfer中publicvoidrun(){log。info(this。getServiceName()servicestarted);while(!this。isStopped()){try{这里进入等待,等待被唤醒,进入等待之前会调用onWaitEnd方法,然后调用swapRequests方法把requestsWrite转换为requestsReadthis。waitForRunning(10);进行请求处理this。doWaitTransfer();}catch(Exceptione){log。warn(this。getServiceName()servicehasexception。,e);}}log。info(this。getServiceName()serviceend);}
再看doWaitTransfer方法1、比较Master推送到Slave的偏移量push2SlaveMaxOffset是不是大于传进来的CommitLog。GroupCommitRequest中的偏移量2、计算本次同步超时的时间节点,时间为当前时间加上参数系统配置参数syncFlushTimeout默认为5秒3、如果第一步结果为true,则返回结果为PUTOK。如果第一步为false,则每过一秒检查一次结果,如果超过5次了还没同步完成,则表示超时了那么返回结果为FLUSHSLAVETIMEOUT。同时会唤醒CommitLog的刷盘线程。privatevoiddoWaitTransfer(){如果读请求不为空if(!this。requestsRead。isEmpty()){for(CommitLog。GroupCommitRequestreq:this。requestsRead){如果push到slave的偏移量大于等于请求中的消息的最大偏移量表示slave同步完成booleantransferOKHAService。this。push2SlaveMaxOffset。get()req。getNextOffset();计算这次同步超时的时间点同步的超时时间段为5slongdeadLinereq。getDeadLine();如果没有同步完毕,并且还没达到超时时间,则等待1秒之后检查同步的进度while(!transferOKdeadLineSystem。nanoTime()0){this。notifyTransferObject。waitForRunning(1000);transferOKHAService。this。push2SlaveMaxOffset。get()req。getNextOffset();}超时或者同步成功的时候唤醒主线程req。wakeupCustomer(transferOK?PutMessageStatus。PUTOK:PutMessageStatus。FLUSHSLAVETIMEOUT);}this。requestsReadnewLinkedList();}}
主要逻辑如下:比较Master推送到Slave的偏移量push2SlaveMaxOffset是不是大于传进来的CommitLog。GroupCommitRequest中的偏移量。计算本次同步超时的时间节点,时间为当前时间加上参数系统配置参数syncFlushTimeout默认为5秒。如果第一步结果为true,则返回结果为PUTOK。如果第一步为false,则每过一秒检查一次结果,如果超过5次了还没同步完成,则表示超时了那么返回结果为FLUSHSLAVETIMEOUT。同时会唤醒CommitLog的刷盘线程。6、主从同步客户端组件
前面我们说到了只有是Salve角色的Broker才会真正的配置Master的地址,而HAClient是需要Master地址的,因此这个类真正在运行的时候只有Slave才会真正的使用到。
先看看核心的参数信息从节点那边会用这个线程跟我们主节点建立连接,执行数据读写classHAClientextendsServiceThread{读数据缓冲区大小,4mbprivatestaticfinalintREADMAXBUFFERSIZE102410244;master地址privatefinalAtomicReferenceStringmasterAddressnewAtomicReference();从节点收到数据以后会返回一个8个字节的ack偏移量,固定8个字节privatefinalByteBufferreportOffsetByteBuffer。allocate(8);nio网络连接privateSocketChannelsocketChannel;nio多路复用组件privateSelectorselector;最近一次写数据时间戳privatelonglastWriteTimestampSystem。currentTimeMillis();当前上报过的偏移量privatelongcurrentReportedOffset0;分发位置privateintdispatchPosition0;读数据缓冲区privateByteBufferbyteBufferReadByteBuffer。allocate(READMAXBUFFERSIZE);备份数据缓冲区privateByteBufferbyteBufferBackupByteBuffer。allocate(READMAXBUFFERSIZE);}
基本上都是缓冲相关的配置。这里主要分析的是run方法中的逻辑publicvoidrun(){log。info(this。getServiceName()servicestarted);while(!this。isStopped()){try{尝试去连接我们的master节点if(this。connectMaster()){是否要上报一下ack偏移量,间隔需要大于心跳的时间(5s)if(this。isTimeToReportOffset()){向master汇报当前salve的CommitLog的最大偏移量,并记录这次的同步时间booleanresultthis。reportSlaveMaxOffset(this。currentReportedOffset);如果汇报完了就关闭连接if(!result){this。closeMaster();}}如果说人家给你传输过来了数据this。selector。select(1000);向master拉取的信息booleanokthis。processReadEvent();if(!ok){this。closeMaster();}再次同步slave的偏移量如果,最新的偏移量大于已经汇报的情况下if(!reportSlaveMaxOffsetPlus()){continue;}检查时间距离上次同步进度的时间间隔longintervalHAService。this。getDefaultMessageStore()。getSystemClock()。now()this。lastWriteTimestamp;如果间隔大于心跳的时间,那么就关闭if(intervalHAService。this。getDefaultMessageStore()。getMessageStoreConfig()。getHaHousekeepingInterval()){log。warn(HAClient,housekeeping,foundthisconnection〔this。masterAddress〕expired,interval);this。closeMaster();log。warn(HAClient,masternotresponsesometime,socloseconnection);}}else{等待this。waitForRunning(10005);}}catch(Exceptione){log。warn(this。getServiceName()servicehasexception。,e);this。waitForRunning(10005);}}log。info(this。getServiceName()serviceend);}
主要的逻辑如下:连接master,如果当前的broker角色是master,那么对应的masterAddress是空的,不会有后续逻辑。如果是slave,并且配置了master地址,则会进行连接进行后续逻辑处理检查是否需要向master汇报当前的同步进度,如果两次同步的时间小于5s,则不进行同步。每次同步之间间隔在5s以上,这个5s是心跳连接的间隔参数为haSendHeartbeatInterval向master汇报当前salve的CommitLog的最大偏移量,并记录这次的同步时间从master拉取日志信息,主要就是进行消息的同步,同步出问题则关闭连接再次同步slave的偏移量,如果最新的偏移量大于已经汇报的情况下则从步骤1重头开始
这里分析完了run方法,然后就要分析主要的日志同步的逻辑了,这个逻辑在processReadEvent方法中privatebooleanprocessReadEvent(){intreadSizeZeroTimes0;如果读取缓存还有没读取完,则一直读取while(this。byteBufferRead。hasRemaining()){try{可以把主从同步过来的数据读取到缓冲区里去intreadSizethis。socketChannel。read(this。byteBufferRead);if(readSize0){readSizeZeroTimes0;执行一次分发读请求booleanresultthis。dispatchReadRequest();if(!result){log。error(HAClient,dispatchReadRequesterror);returnfalse;}}elseif(readSize0){if(readSizeZeroTimes3){break;}}else{log。info(HAClient,processReadEventreadsocket0);returnfalse;}}catch(IOExceptione){log。info(HAClient,processReadEventreadsocketexception,e);returnfalse;}}returntrue;}privatebooleandispatchReadRequest(){请求的头信息finalintmsgHeaderSize84;phyoffsetsizewhile(true){获取分发的偏移差intdiffthis。byteBufferRead。position()this。dispatchPosition;如果偏移差大于头大小,说明存在请求体if(diffmsgHeaderSize){获取主master的最大偏移量longmasterPhyOffsetthis。byteBufferRead。getLong(this。dispatchPosition);获取消息体intbodySizethis。byteBufferRead。getInt(this。dispatchPosition8);获取salve的最大偏移量longslavePhyOffsetHAService。this。defaultMessageStore。getMaxPhyOffset();if(slavePhyOffset!0){if(slavePhyOffset!masterPhyOffset){log。error(masterpushedoffsetnotequalthemaxphyoffsetinslave,SLAVE:slavePhyOffsetMASTER:masterPhyOffset);returnfalse;}}如果偏移差大于消息头和消息体大小。则读取消息体if(diff(msgHeaderSizebodySize)){byte〔〕bodyDatabyteBufferRead。array();intdataStartthis。dispatchPositionmsgHeaderSize;把你读取到的数据追加到commitlog里面去HAService。this。defaultMessageStore。appendToCommitLog(masterPhyOffset,bodyData,dataStart,bodySize);记录分发的位置this。dispatchPositionmsgHeaderSizebodySize;if(!reportSlaveMaxOffsetPlus()){returnfalse;}continue;}}if(!this。byteBufferRead。hasRemaining()){this。reallocateByteBuffer();}break;}returntrue;}7、Master同步日志
前面说过,在HAService的AcceptSocketService内部类中,Master会在建立连接的时候创建HAConnection用来处理读写事件。这里主要介绍构造函数和内部类就能了解原理了。
成员变量publicclassHAConnection{privatestaticfinalInternalLoggerlogInternalLoggerFactory。getLogger(LoggerName。STORELOGGERNAME);所属的HA高可用服务组件privatefinalHAServicehaService;nio网络连接privatefinalSocketChannelsocketChannel;跟我们建立连接的HA客户端组件的地址,从节点地址privatefinalStringclientAddr;网络连接写数据服务线程privateWriteSocketServicewriteSocketService;网络连接读数据服务线程privateReadSocketServicereadSocketService;从节点请求获取的偏移量privatevolatilelongslaveRequestOffset1;从节点同步数据后ack的偏移量privatevolatilelongslaveAckOffset1;}
构造函数publicHAConnection(finalHAServicehaService,finalSocketChannelsocketChannel)throwsIOException{指定所属的HAServicethis。haServicehaService;指定的NIO的socketChannelthis。socketChannelsocketChannel;客户端的地址this。clientAddrthis。socketChannel。socket()。getRemoteSocketAddress()。toString();这是为非阻塞this。socketChannel。configureBlocking(false);是否启动SOLINGERSOLINGER作用:设置函数close()关闭TCP连接时的行为。缺省close()的行为是如果有数据残留在socket发送缓冲区中则系统将继续发送这些数据给对方,等待被确认,然后返回。this。socketChannel。socket()。setSoLinger(false,1);是否开启TCPNODELAYthis。socketChannel。socket()。setTcpNoDelay(true);if(NettySystemConfig。socketSndbufSize0){接收缓冲的大小this。socketChannel。socket()。setReceiveBufferSize(NettySystemConfig。socketSndbufSize);}if(NettySystemConfig。socketRcvbufSize0){发送缓冲的大小this。socketChannel。socket()。setSendBufferSize(NettySystemConfig。socketRcvbufSize);}把网络连接写数据服务线程和读数据服务线程都构建好端口写服务this。writeSocketServicenewWriteSocketService(this。socketChannel);端口读服务this。readSocketServicenewReadSocketService(this。socketChannel);增加haService中的连接数字段this。haService。getConnectionCount()。incrementAndGet();}
监听slave日志同步进度和同步日志
WriteSocketService监听的是OPWRITE事件,注册的端口就是在HAService中开启的端口。classWriteSocketServiceextendsServiceThread{privatefinalSelectorselector;privatefinalSocketChannelsocketChannel;写数据头大小,12个字节privatefinalintheaderSize84;写数据头缓冲区privatefinalByteBufferbyteBufferHeaderByteBuffer。allocate(headerSize);从哪个位置开始传输privatelongnextTransferFromWhere1;对内存映射区域查询数据结果privateSelectMappedBufferResultselectMappedBufferResult;最后一次写数据是否完成了,默认trueprivatebooleanlastWriteOvertrue;最后一次写数据时间戳privatelonglastWriteTimestampSystem。currentTimeMillis();publicWriteSocketService(finalSocketChannelsocketChannel)throwsIOException{this。selectorRemotingUtil。openSelector();搞一个selector多路复用组件this。socketChannelsocketChannel;this。socketChannel。register(this。selector,SelectionKey。OPWRITE);把这个网络连接注册到selector多路复用组件里去就可以了this。setDaemon(true);}Overridepublicvoidrun(){HAConnection。log。info(this。getServiceName()servicestarted);while(!this。isStopped()){try{如果说针对你的从节点此时可以执行写数据动作this。selector。select(1000);如果slave的读请求为1表示没有slave发出写请求,不需要处理if(1HAConnection。this。slaveRequestOffset){Thread。sleep(10);continue;}nextTransferFromWhere为1表示初始第一次同步,需要进行计算if(1this。nextTransferFromWhere){如果slave同步完成则下次同步从CommitLog的最大偏移量开始同步if(0HAConnection。this。slaveRequestOffset){获取到commitlog里面最大的偏移量longmasterOffsetHAConnection。this。haService。getDefaultMessageStore()。getCommitLog()。getMaxOffset();masterOffsetmasterOffset(masterOffsetHAConnection。this。haService。getDefaultMessageStore()。getMessageStoreConfig()。getMappedFileSizeCommitLog());if(masterOffset0){masterOffset0;}可以去设置一下当前需要从哪个位置开始来传输数据this。nextTransferFromWheremasterOffset;}else{设置下次同步的位置,为salve读请求的位置this。nextTransferFromWhereHAConnection。this。slaveRequestOffset;}log。info(mastertransferdatafromthis。nextTransferFromWheretoslave〔HAConnection。this。clientAddr〕,andslaverequestHAConnection。this。slaveRequestOffset);}上次同步是否完成if(this。lastWriteOver){上一次写数据时间戳到现在截止的差值longintervalHAConnection。this。haService。getDefaultMessageStore()。getSystemClock()。now()this。lastWriteTimestamp;把这个时间差值跟ha发送心跳间隔做一个比对,如果超过了那个间隔,心跳间隔为5000毫秒if(intervalHAConnection。this。haService。getDefaultMessageStore()。getMessageStoreConfig()。getHaSendHeartbeatInterval()){BuildHeader开始去构建请求头,先设置我们要从哪个位置开始传输数据,心跳请求大小为12字节this。byteBufferHeader。position(0);this。byteBufferHeader。limit(headerSize);this。byteBufferHeader。putLong(this。nextTransferFromWhere);this。byteBufferHeader。putInt(0);this。byteBufferHeader。flip();进行消息同步this。lastWriteOverthis。transferData();if(!this。lastWriteOver)continue;}}如果说是上一次传输还没完毕此时就传输数据就可以了else{this。lastWriteOverthis。transferData();如果还没同步完成则继续if(!this。lastWriteOver)continue;}构建完了header以后,此时就需要查询commitlog里面指定位置开始的一个数据片段SelectMappedBufferResultselectResultHAConnection。this。haService。getDefaultMessageStore()。getCommitLogData(this。nextTransferFromWhere);if(selectResult!null){intsizeselectResult。getSize();检查要同步消息的长度,是不是大于单次同步的最大限制默认为32kbif(sizeHAConnection。this。haService。getDefaultMessageStore()。getMessageStoreConfig()。getHaTransferBatchSize()){sizeHAConnection。this。haService。getDefaultMessageStore()。getMessageStoreConfig()。getHaTransferBatchSize();}longthisOffsetthis。nextTransferFromWhere;this。nextTransferFromWheresize;selectResult。getByteBuffer()。limit(size);this。selectMappedBufferResultselectResult;BuildHeaderthis。byteBufferHeader。position(0);this。byteBufferHeader。limit(headerSize);this。byteBufferHeader。putLong(thisOffset);this。byteBufferHeader。putInt(size);this。byteBufferHeader。flip();this。lastWriteOverthis。transferData();}else{没有数据要传输呢?此时就是可以让我们的组件等待一会儿,等待100毫秒HAConnection。this。haService。getWaitNotifyObject()。allWaitForRunning(100);}}catch(Exceptione){HAConnection。log。error(this。getServiceName()servicehasexception。,e);break;}}对等待通知组件做一个处理HAConnection。this。haService。getWaitNotifyObject()。removeFromWaitingThreadTable();if(this。selectMappedBufferResult!null){this。selectMappedBufferResult。release();}this。makeStop();readSocketService。makeStop();haService。removeConnection(HAConnection。this);SelectionKeyskthis。socketChannel。keyFor(this。selector);if(sk!null){sk。cancel();}try{this。selector。close();this。socketChannel。close();}catch(IOExceptione){HAConnection。log。error(,e);}HAConnection。log。info(this。getServiceName()serviceend);}privatebooleantransferData()throwsException{intwriteSizeZeroTimes0;WriteHeader心跳的头没写满,先写头while(this。byteBufferHeader。hasRemaining()){通过nio网络连接可以把请求头先发送出去intwriteSizethis。socketChannel。write(this。byteBufferHeader);if(writeSize0){writeSizeZeroTimes0;记录上次写的时间this。lastWriteTimestampHAConnection。this。haService。getDefaultMessageStore()。getSystemClock()。now();}elseif(writeSize0){重试3次则不再重试if(writeSizeZeroTimes3){break;}}else{thrownewException(hamasterwriteheadererror0);}}如果要同步的日志为null,则直接返回这次同步的结果是否同步完成if(nullthis。selectMappedBufferResult){return!this。byteBufferHeader。hasRemaining();}writeSizeZeroTimes0;WriteBody填充请求体if(!this。byteBufferHeader。hasRemaining()){他会有一个要传输的一个commitlog里面的内存区域查询出来的数据片段while(this。selectMappedBufferResult。getByteBuffer()。hasRemaining()){把commitlog里面要传输的数据片段就写入到nio网络连接里去intwriteSizethis。socketChannel。write(this。selectMappedBufferResult。getByteBuffer());if(writeSize0){writeSizeZeroTimes0;this。lastWriteTimestampHAConnection。this。haService。getDefaultMessageStore()。getSystemClock()。now();}elseif(writeSize0){重试3次if(writeSizeZeroTimes3){break;}}else{thrownewException(hamasterwritebodyerror0);}}}booleanresult!this。byteBufferHeader。hasRemaining()!this。selectMappedBufferResult。getByteBuffer()。hasRemaining();释放缓存if(!this。selectMappedBufferResult。getByteBuffer()。hasRemaining()){this。selectMappedBufferResult。release();this。selectMappedBufferResultnull;}returnresult;}OverridepublicStringgetServiceName(){returnWriteSocketService。class。getSimpleName();}Overridepublicvoidshutdown(){super。shutdown();}}
主要的逻辑如下:如果slave进行了日志偏移量的汇报,判断是不是第一次的进行同步以及对应的同步进度。设置下一次的同步位置检查上次同步是不是已经完成了,检查两次同步的周期是不是超过心跳间隔,如果是的则需要把心跳信息放到返回的头里面,然后进行消息同步。如果上次同步还没完成,则等待上次同步完成之后再继续从Master本地读取CommitLog的最大偏移量,根据上次同步的位置开始从CommitLog获取日志信息,然后放到缓存中如果缓存的大小大于单次同步的最大大小haTransferBatchSize默认是32kb,那么只同步32kb大小的日志。如果缓存为null,则等待100毫秒
根据同步进度来唤醒刷盘CommitLog线程
ReadSocketService的作用主要是:根据Slave推送的日志同步进度,来唤醒HAService的GroupTransferService然后进一步唤醒CommitLog的日志刷盘线程。这里主要看run方法和processReadEvent方法。classReadSocketServiceextendsServiceThread{读数据最大缓冲区大小,默认是1mbprivatestaticfinalintREADMAXBUFFERSIZE10241024;多路复用监听组件privatefinalSelectorselector;nio网络连接privatefinalSocketChannelsocketChannel;读数据缓冲区privatefinalByteBufferbyteBufferReadByteBuffer。allocate(READMAXBUFFERSIZE);处理消息位置privateintprocessPosition0;最近一次读取到数据的时间戳privatevolatilelonglastReadTimestampSystem。currentTimeMillis();publicReadSocketService(finalSocketChannelsocketChannel)throwsIOException{this。selectorRemotingUtil。openSelector();this。socketChannelsocketChannel;this。socketChannel。register(this。selector,SelectionKey。OPREAD);this。setDaemon(true);}Overridepublicvoidrun(){HAConnection。log。info(this。getServiceName()servicestarted);任务是否结束while(!this。isStopped()){try{设置selector的阻塞时间,如果说从节点确实发送了请求过来this。selector。select(1000);处理salver读取消息的事件booleanokthis。processReadEvent();if(!ok){HAConnection。log。error(processReadEventerror);break;}检查此次处理时间是否超过心跳连接时间longintervalHAConnection。this。haService。getDefaultMessageStore()。getSystemClock()。now()this。lastReadTimestamp;if(intervalHAConnection。this。haService。getDefaultMessageStore()。getMessageStoreConfig()。getHaHousekeepingInterval()){log。warn(hahousekeeping,foundthisconnection〔HAConnection。this。clientAddr〕expired,interval);break;}}catch(Exceptione){HAConnection。log。error(this。getServiceName()servicehasexception。,e);break;}}this。makeStop();writeSocketService。makeStop();haService。removeConnection(HAConnection。this);HAConnection。this。haService。getConnectionCount()。decrementAndGet();SelectionKeyskthis。socketChannel。keyFor(this。selector);if(sk!null){sk。cancel();}try{this。selector。close();this。socketChannel。close();}catch(IOExceptione){HAConnection。log。error(,e);}HAConnection。log。info(this。getServiceName()serviceend);}OverridepublicStringgetServiceName(){returnReadSocketService。class。getSimpleName();}privatebooleanprocessReadEvent(){intreadSizeZeroTimes0;如果说读取数据缓冲区已经没有空间了,此时就做一个flip,处理位置复位为0if(!this。byteBufferRead。hasRemaining()){读请求缓冲转变为读取模式。this。byteBufferRead。flip();this。processPosition0;}但凡是读取数据缓冲区还有空间,就进入循环while(this。byteBufferRead。hasRemaining()){try{一次性从网络连接里读取最大可能是1mb的读取缓冲空间的内容intreadSizethis。socketChannel。read(this。byteBufferRead);if(readSize0){readSizeZeroTimes0;更新一下本次读取到数据的时间戳this。lastReadTimestampHAConnection。this。haService。getDefaultMessageStore()。getSystemClock()。now();读取缓冲区位置处理位置,是大于等于8,至少说是读到了8个字节为什么是8个字节,因为salver向master发去拉取请求时,偏移量固定为8if((this。byteBufferRead。position()this。processPosition)8){获取消息开始的位置,做一个运算,用读取缓冲区位置读取缓冲区位置8intposthis。byteBufferRead。position()(this。byteBufferRead。position()8);读取8个直接就是从节点返回的东西,就是从节点完成消息同步的最新的偏移量longreadOffsetthis。byteBufferRead。getLong(pos8);设置处理的位置this。processPositionpos;把我们的slave的ack偏移量去做一个设置HAConnection。this。slaveAckOffsetreadOffset;如果slave的读请求偏移量小于0表示同步完成了if(HAConnection。this。slaveRequestOffset0){重新设置slave的读请求的偏移量HAConnection。this。slaveRequestOffsetreadOffset;log。info(slave〔HAConnection。this。clientAddr〕requestoffsetreadOffset);}elseif(HAConnection。this。slaveAckOffsetHAConnection。this。haService。getDefaultMessageStore()。getMaxPhyOffset()){log。warn(slave〔{}〕requestoffset{}greaterthanlocalcommitLogoffset{}。,HAConnection。this。clientAddr,HAConnection。this。slaveAckOffset,HAConnection。this。haService。getDefaultMessageStore()。getMaxPhyOffset());returnfalse;}如果说从节点已经接收到了一些数据之后,唤醒阻塞的线程,我们就可以通知HAService去传输一些数据给从节点在消息的主从同步选择的模式是同步的时候,会唤醒被阻塞的消息写入的线程HAConnection。this。haService。notifyTransferSome(HAConnection。this。slaveAckOffset);}}elseif(readSize0){如果数据为0超过3次,表示同步完成,直接结束if(readSizeZeroTimes3){break;}}else{log。error(readsocket〔HAConnection。this。clientAddr〕0);returnfalse;}}catch(IOExceptione){log。error(processReadEventexception,e);returnfalse;}}returntrue;}}
整体的逻辑如下:每1s执行一次事件就绪选择,然后调用processReadEvent方法处理读请求,读取从服务器的拉取请求获取slave已拉取偏移量,因为有新的从服务器反馈拉取进度,需要通知某些生产者以便返回,因为如果消息发送使用同步方式,需要等待将消息复制到从服务器,然后才返回,故这里需要唤醒相关线程去判断自己关注的消息是否已经传输完成。也就是HAService的GroupTransferService如果读取到的字节数等于0,则重复三次,否则结束本次读请求处理;如果读取到的字节数小于0,表示连接被断开,返回false,后续会断开该连接。五、总结
RocketMQ的主从同步之间的核心类就是HAService和HAConnection和其中的几个子类。结合前面的那个图可以简单的理解一下。