专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

Tomcat集群高可用

  Tomcat集群高可用集群架构总览DNSRoundRobinLoadBalancerCluster1Cluster2Tomcat1Tomcat2Tomcat3Tomcat4组件ServerServiceEngineClusterHostClusterContext(1N)ManagerDeltaManagerBackupManagerChannelInterceptor1。。InterceptorNReceiverSenderMembershipValveReplicationValveJvmRouteBinderValveLifecycleListenerClusterListenerClusterSessionListenerDeployerFarmWarDeployer工作原理1。TomcatA启动TomcatA使用标准的启动顺序启动,当Host被创建时,会有一个cluster对象跟其关联。当contexts在解析过程中发现web。xml中含有distributable标签,Tomcat会让Cluster(SimpleTcpCluster)创建一个Manager(默认DeltaManager)用于备份。如果是集群模式,会启动一个membershipservice(multicast)和一个replicationservice(tcpunicast)2。TomcatB启动(等待TomcatA启动完成)TomcatB跟TomcatA以相同的顺序启动,但会跟TomcatA互相建个一个关系(TomcatA,TomcatB),在启动前TomcatB会向TomcatA请求Session,如果60秒未响应已超时处理,记录一条异常日志并继续启动。推荐TomcatA和TomcatB以相同的配置启动3。TomcatA收到一个请求,创建SessionS1TomcatA收到一个请求,请求处理方式跟非集群方式一样。区别点是处理完后在响应给客户端之前ReplicationValve会通过TCP方式将Session同步给TomcatB。为了性能考虑,会有一个useDirtyFlag参数用于备份Session,只有达到指定次数才会备份。4。TomcatA崩溃如果TomcatA崩溃了,TomcatB会将其从成员列表中移除。TomcatA不会在引起TomcatB的任何变化,负载均衡会将所有到TomcatA的请求重定向到TomcatB5。TomcatB收到TomcatA同步过来的SessionS1TomcatB跟处理其他请求一样6。TomcatA恢复启动跟步骤1,2一样,完全启动前会加入Cluster,向TomcatB请求同步所有Session7。TomcatA收到一个请求,来自S1的客户端的无效(Session过期)调用请求完成后,TomcatA向TomcatB发送过期事件,TomcatB将S1移除8。TomcatB收到一个请求,创建SessionS2跟第3步一样9。TomcatA中SessionS2超时由于一直没有收到任何关于S2的请求如果TomcatA没有收到任何请求时,不会将失效的S2备份给其他成员集群配置Tomcat官方默认配置ClusterclassNameorg。apache。catalina。ha。tcp。SimpleTcpClusterchannelSendOptions8ManagerclassNameorg。apache。catalina。ha。session。DeltaManagerexpireSessionsOnShutdownfalsenotifyListenersOnReplicationtrueChannelclassNameorg。apache。catalina。tribes。group。GroupChannelMembershipclassNameorg。apache。catalina。tribes。membership。McastServiceaddress228。0。0。4port45564frequency500dropTime3000ReceiverclassNameorg。apache。catalina。tribes。transport。nio。NioReceiveraddressautoport4000autoBind100selectorTimeout5000maxThreads6SenderclassNameorg。apache。catalina。tribes。transport。ReplicationTransmitterTransportclassNameorg。apache。catalina。tribes。transport。nio。PooledParallelSenderSenderInterceptorclassNameorg。apache。catalina。tribes。group。interceptors。TcpFailureDetectorInterceptorclassNameorg。apache。catalina。tribes。group。interceptors。MessageDispatchInterceptorInterceptorclassNameorg。apache。catalina。tribes。group。interceptors。TcpPingInterceptorChannelValveclassNameorg。apache。catalina。ha。tcp。ReplicationValvefilterValveclassNameorg。apache。catalina。ha。session。JvmRouteBinderValveDeployerclassNameorg。apache。catalina。ha。deploy。FarmWarDeployertempDirtmpwartempdeployDirtmpwardeploywatchDirtmpwarlistenwatchEnabledfalseClusterListenerclassNameorg。apache。catalina。ha。session。ClusterSessionListenerCluster启用组播(Windows默认支持,Linux系统需手动开启)sudorouteaddnet224。0。0。0netmask240。0。0。0devwlp1s0sudorouteaddnet224。0。0。0netmask240。0。0。0devlosudorouteaddnet228。0。0。4netmask255。255。255。255devwlp1s0本地环回地址必须添加,不然没有效果sudorouteaddnet228。0。0。4netmask255。255。255。255devlo集群模型ClusterpublicStringgetClusterName();publicvoidsetClusterName(StringclusterName);Session管理publicManagercreateManager(Stringname);publicvoidregisterManager(Managermanager);publicvoidremoveManager(Managermanager);后台进程,用于热部署监控和心跳检测publicvoidbackgroundProcess();Channel添加拦截器publicvoidaddInterceptor(ChannelInterceptorinterceptor);发送消息publicUniqueIdsend(Member〔〕destination,Serializablemsg,intoptions)throwsChannelException;publicUniqueIdsend(Member〔〕destination,Serializablemsg,intoptions,ErrorHandlerhandler)throwsChannelException;心跳检测publicvoidheartbeat();publicvoidsetHeartbeat(booleanenable);成员回调publicvoidaddMembershipListener(MembershipListenerlistener);publicvoidaddChannelListener(ChannelListenerlistener);publicvoidremoveMembershipListener(MembershipListenerlistener);publicvoidremoveChannelListener(ChannelListenerlistener);publicbooleanhasMembers();publicMember〔〕getMembers();publicMembergetLocalMember(booleanincAlive);ChannelReceiverpublicvoidstart()throwsIOException;publicvoidstop();publicStringgetHost();publicintgetPort();publicintgetSecurePort();publicintgetUdpPort();publicvoidheartbeat();publicvoidsetMessageListener(MessageListenerlistener);publicMessageListenergetMessageListener();publicChannelgetChannel();publicvoidsetChannel(Channelchannel);ChannelSenderpublicvoidadd(Membermember);publicvoidremove(Membermember);publicvoidstart()throwsIOException;publicvoidstop();publicvoidheartbeat();publicvoidsendMessage(ChannelMessagemessage,Member〔〕destination)throwsChannelException;publicChannelgetChannel();publicvoidsetChannel(Channelchannel);MemberpublicStringgetName();publicbyte〔〕getHost();publicintgetPort();publicintgetSecurePort();publicintgetUdpPort();publiclonggetMemberAliveTime();publicvoidsetMemberAliveTime(longmemberAliveTime);publicbooleanisReady();publicbooleanisSuspect();publicbooleanisFailing();publicbyte〔〕getUniqueId();publicbyte〔〕getPayload();publicvoidsetPayload(byte〔〕payload);publicbyte〔〕getCommand();publicvoidsetCommand(byte〔〕command);publicbyte〔〕getDomain();publicbyte〔〕getData(booleangetalive);publicbyte〔〕getData(booleangetalive,booleanreset);publicintgetDataLength();当前启动的TomcatpublicbooleanisLocal();publicvoidsetLocal(booleanlocal);ChannelInterceptor
  发送消息从前往后,接收消息回调从后往前publicintgetOptionFlag();publicvoidsetOptionFlag(intflag);publicvoidsetNext(ChannelInterceptornext);publicChannelInterceptorgetNext();publicvoidsetPrevious(ChannelInterceptorprevious);publicChannelInterceptorgetPrevious();publicvoidsendMessage(Member〔〕destination,ChannelMessagemsg,InterceptorPayloadpayload)throwsChannelException;publicvoidmessageReceived(ChannelMessagedata);publicvoidheartbeat();publicbooleanhasMembers();publicMember〔〕getMembers();publicMembergetLocalMember(booleanincAliveTime);publicMembergetMember(Membermbr);publicvoidstart(intsvc)throwsChannelException;publicvoidstop(intsvc)throwsChannelException;publicvoidfireInterceptorEvent(InterceptorEventevent);publicChannelgetChannel();publicvoidsetChannel(Channelchannel);MembershipServicepublicvoidsetProperties(java。util。Propertiesproperties);publicjava。util。PropertiesgetProperties();publicvoidstart()throwsjava。lang。Exception;publicvoidstart(intlevel)throwsjava。lang。Exception;publicvoidstop(intlevel);publicbooleanhasMembers();publicMembergetMember(Membermbr);publicMember〔〕getMembers();publicMembergetLocalMember(booleanincAliveTime);publicString〔〕getMembersByName();publicMemberfindMemberByName(Stringname);publicvoidsetLocalMemberProperties(StringlistenHost,intlistenPort,intsecurePort,intudpPort);publicvoidsetMembershipListener(MembershipListenerlistener);publicvoidremoveMembershipListener();publicvoidsetPayload(byte〔〕payload);publicvoidsetDomain(byte〔〕domain);publicvoidbroadcast(ChannelMessagemessage)throwsChannelException;publicChannelgetChannel();publicvoidsetChannel(Channelchannel);publicMembershipProvidergetMembershipProvider();实现原理Tomcat集群启动入口
  tomcat各功能模块统一实现接口Lifecycle,模块的生命周期由顶层统一控制,各模块的启动从上到下启动各服务。ContainerBasestartInternalOverrideprotectedsynchronizedvoidstartInternal()throwsLifecycleException{。。。。。。cluster为server。xml中配置的SimpleTcpClusterClusterclassNameorg。apache。catalina。ha。tcp。SimpleTcpClusterchannelSendOptions8。。。ClusterClusterclustergetClusterInternal();if(clusterinstanceofLifecycle){((Lifecycle)cluster)。start();}。。。。。。}Session同步入口ReplicationValvepublicvoidinvoke(Requestrequest,Responseresponse)throwsIOException,ServletException{。。。。。。。处理请求getNext()。invoke(request,response);处理完成后响应用户前同步Sessionif(context!nullcluster!nullcontext。getManager()instanceofClusterManager){ClusterManagerclusterManager(ClusterManager)context。getManager();valveclustercanaccessmanagerotherclusterhandlereplicationathostlevelhopefully!if(cluster。getManager(clusterManager。getName())null){return;}if(cluster。hasMembers()){sendReplicationMessage(request,totalstart,isCrossContext,clusterManager);}else{resetReplicationRequest(request,isCrossContext);}}}finally{。。。。。}}protectedvoidsendReplicationMessage(Requestrequest,longtotalstart,booleanisCrossContext,ClusterManagerclusterManager){try{sendinvalidsessionsDeltaManagerreturnsString〔0〕if(!(clusterManagerinstanceofDeltaManager)){sendInvalidSessions(clusterManager);}sendreplicationsendSessionReplicationMessage(request,clusterManager);if(isCrossContext){sendCrossContextSession();}}catch(Exceptionx){FIXMEwehavealotofsends,butthetroublewithonenodestopsthecorrectreplicationtoothernodes!log。error(sm。getString(ReplicationValve。send。failure),x);}finally{}}通信协议心跳检测
  集群间会话的复制采用类似RPC的方式,直接序列化整个类发送给集群成员TRIBESB10bytespackagelength4bytesalive8bytesport4bytessecureport4bytesudpport4byteshostlength1bytehost1bytescommandlength4bytescommandcommandlengthbytesdlen4bytesdomaindlenbytesuniqueId16bytespayloadlength4bytespayloadplenbytesTRIBESE10bytes组件介绍SimpleTcpCluster启用集群protectedvoidstartInternal()throwsLifecycleException{try{checkDefaults();registerClusterValve();channel。addMembershipListener(this);channel。addChannelListener(this);channel。setName(getClusterName()Channel);channelStartOptions15,即默认启用四个组件channel。start(channelStartOptions);if(clusterDeployer!null){clusterDeployer。start();}registerMember(channel。getLocalMember(false));}catch(Exceptionx){log。error(sm。getString(simpleTcpCluster。startUnable),x);thrownewLifecycleException(x);}}心跳检测publicvoidbackgroundProcess(){if(clusterDeployer!null){clusterDeployer。backgroundProcess();}sendaheartbeatthroughthechannelif(isHeartbeatBackgroundEnabled()channel!null){channel。heartbeat();}periodiceventfireLifecycleEvent(Lifecycle。PERIODICEVENT,null);}消息回调publicvoidmessageReceived(ClusterMessagemessage){invokeallthelistenersbooleanacceptedfalse;if(message!null){for(ClusterListenerlistener:clusterListeners){if(listener。accept(message)){acceptedtrue;listener。messageReceived(message);}}。。。。。}ChannelCoordinator
  位于Interceptor最后一个,消息发送和消息接收由他进行协调处理ChannelListener1。。ChannelListenerNMembershipListener1。。MembershipListenerN〔ApplicationLayer〕ChannelChannelInterceptor1〔Channelstack〕ChannelInterceptorNCoordinator(implementsMessageListener,MembershipListener,ChannelInterceptor)MembershipServiceChannelSenderChannelReceiver〔IOlayer〕svc默认15protectedsynchronizedvoidinternalStart(intsvc)throwsChannelException{try{booleanvalidfalse;makesurewedontpassdownanyflagsthatareunrelatedtothebottomlayersvcsvcChannel。DEFAULT;。。。。。。muststartthereceiverfirstsothatwecancoordinatetheportitlistenstowiththelocalmembershipsettingsNioReceiver,通信协议为TCP,用于同步Sessionif(Channel。SNDRXSEQ(svcChannel。SNDRXSEQ)){clusterReceiver。setMessageListener(this);clusterReceiver。setChannel(getChannel());clusterReceiver。start();synchronize,bigtimeFIXMEMemberlocalMembergetChannel()。getLocalMember(false);if(localMemberinstanceofStaticMember){staticmemberStaticMemberstaticMember(StaticMember)localMember;staticMember。setHost(getClusterReceiver()。getHost());staticMember。setPort(getClusterReceiver()。getPort());staticMember。setSecurePort(getClusterReceiver()。getSecurePort());}else{multicastmembermembershipService。setLocalMemberProperties(getClusterReceiver()。getHost(),getClusterReceiver()。getPort(),getClusterReceiver()。getSecurePort(),getClusterReceiver()。getUdpPort());}validtrue;}NioSender,通信协议为TCP,用于发送Session信息if(Channel。SNDTXSEQ(svcChannel。SNDTXSEQ)){clusterSender。setChannel(getChannel());clusterSender。start();validtrue;}集群成员间的消息接收入口,采用组播方式通信协议为UDP接收心跳检测信息,Ping信息if(Channel。MBRRXSEQ(svcChannel。MBRRXSEQ)){membershipService。setMembershipListener(this);membershipService。setChannel(getChannel());if(membershipServiceinstanceofMcastService){((McastService)membershipService)。setMessageListener(this);}membershipService。start(MembershipService。MBRRX);validtrue;}集群成员间的消息发送入口,采用组播方式通信协议为UDPif(Channel。MBRTXSEQ(svcChannel。MBRTXSEQ)){membershipService。setChannel(getChannel());membershipService。start(MembershipService。MBRTX);validtrue;}。。。。。。}catch(ChannelExceptioncx){throwcx;}catch(Exceptionx){thrownewChannelException(x);}}McastServicepublicvoidstart(intlevel)throwsjava。lang。Exception{。。。。。if(impl!null){impl。start(level);return;}StringhostgetProperties()。getProperty(tcpListenHost);intportInteger。parseInt(getProperties()。getProperty(tcpListenPort));intsecurePortInteger。parseInt(getProperties()。getProperty(tcpSecurePort));intudpPortInteger。parseInt(getProperties()。getProperty(udpListenPort));if(localMembernull){localMembernewMemberImpl(host,port,100);localMember。setUniqueId(UUIDGenerator。randomUUID(true));localMember。setLocal(true);}else{localMember。setHostname(host);localMember。setPort(port);localMember。setMemberAliveTime(100);}localMember。setSecurePort(securePort);localMember。setUdpPort(udpPort);。。。。。。implnewMcastServiceImpl(localMember,Long。parseLong(properties。getProperty(mcastFrequency)),Long。parseLong(properties。getProperty(memberDropTime)),Integer。parseInt(properties。getProperty(mcastPort)),bind,java。net。InetAddress。getByName(properties。getProperty(mcastAddress)),ttl,soTimeout,this,this,Boolean。parseBoolean(properties。getProperty(localLoopbackDisabled)));。。。。。。impl。start(level);。。。。。。}McastServiceImpl实例化时初始化动作publicvoidinit()throwsIOException{setupSocket();发送包sendPacketnewDatagramPacket(newbyte〔MAXPACKETSIZE〕,MAXPACKETSIZE);sendPacket。setAddress(address);sendPacket。setPort(port);接收包receivePacketnewDatagramPacket(newbyte〔MAXPACKETSIZE〕,MAXPACKETSIZE);receivePacket。setAddress(address);receivePacket。setPort(port);member。setCommand(newbyte〔0〕);if(membershipnull){membershipnewMembership(member);}}protectedvoidsetupSocket()throwsIOException{if(mcastBindAddress!null){try{socketnewMulticastSocket(newInetSocketAddress(address,port));}catch(BindExceptione){socketnewMulticastSocket(port);}}else{socketnewMulticastSocket(port);}。。。。。。}启动入口publicsynchronizedvoidstart(intlevel)throwsIOException{booleanvalidfalse;if((levelChannel。MBRRXSEQ)Channel。MBRRXSEQ){if(receiver!null){thrownewIllegalStateException(sm。getString(mcastServiceImpl。receive。running));}try{if(sendernull){将地址加入组播组广播与组播的区别是:广播向组里面的每个成员发送消息,组播同时向每个成员发送消息socket。joinGroup(address);}}catch(IOExceptioniox){log。error(sm。getString(mcastServiceImpl。unable。join));throwiox;}doRunReceivertrue;接收线程receivernewReceiverThread();receiver。setDaemon(true);receiver。start();validtrue;}if((levelChannel。MBRTXSEQ)Channel。MBRTXSEQ){if(sender!null){thrownewIllegalStateException(sm。getString(mcastServiceImpl。send。running));}if(receivernull){socket。joinGroup(address);}makesureatleastonepacketgetsoutthere手动发送一个send(false);doRunSendertrue;发送线程sendernewSenderThread(sendFrequency);sender。setDaemon(true);sender。start();validtrue;}pause,onceortwice线程休眠一个周期,该周期可配置waitForMembers(level);}GroupChannelprotectedfinalChannelCoordinatorcoordinatornewChannelCoordinator();消息回调处理publicvoidmessageReceived(ChannelMessagemsg){if(msgnull){return;}try{Serializablefwdnull;if((msg。getOptions()SENDOPTIONSBYTEMESSAGE)SENDOPTIONSBYTEMESSAGE){fwdnewByteMessage(msg。getMessage()。getBytes());}else{try{反序列化,主要内容为(地址,消息内容,消息唯一ID,时间戳)fwdXByteBuffer。deserialize(msg。getMessage()。getBytesDirect(),0,msg。getMessage()。getLength());}catch(Exceptionsx){log。error(sm。getString(groupChannel。unable。deserialize,msg),sx);return;}}gettheactualmemberwiththecorrectalivetimeMembersourcemsg。getAddress();booleanrxfalse;booleandeliveredfalse;for(ChannelListenerchannelListener:channelListeners){if(channelListener!nullchannelListener。accept(fwd,source)){最终由SimpleTcpCluster处理channelListener。messageReceived(fwd,source);deliveredtrue;ifthemessagewasacceptedbyanRPCchannel,thatchannelisresponsibleforreturningthereply,otherwisewesendanabsencereplyif(channelListenerinstanceofRpcChannel){rxtrue;}}}}catch(Exceptionx){thiscouldbethechannellistenerthrowinganexception,weshouldlogitasawarning。if(log。isWarnEnabled()){log。warn(sm。getString(groupChannel。receiving。error),x);}thrownewRemoteProcessException(sm。getString(groupChannel。receiving。error),x);}}NioReceiver
  Tcp协议接收成员消息绑定地址与端口protectedvoidbind()throwsIOException{serverChannelServerSocketChannel。open();ServerSocketserverSocketserverChannel。socket();this。selector。set(Selector。open());根据server。xml中Receiver标签中配置的4000端口开始检测可用的数据,最大到4100bind(serverSocket,getPort(),getAutoBind());serverChannel。configureBlocking(false);serverChannel。register(this。selector。get(),SelectionKey。OPACCEPT);if(this。getUdpPort()0){datagramChannelDatagramChannel。open();configureDatagraChannel();bindUdp(datagramChannel。socket(),getUdpPort(),getAutoBind());}}监听端口protectedvoidlisten()throwsException{setListen(true);Selectorselectorthis。selector。get();if(selector!nulldatagramChannel!null){ObjectReaderoreadernewObjectReader(MAXUDPSIZE);maxsizeforadatagrampacketregisterChannel(selector,datagramChannel,SelectionKey。OPREAD,oreader);}while(doListen()selector!null){thismayblockforalongtime,uponreturntheselectedsetcontainskeysofthereadychannelstry{events();超时检测socketTimeouts();intnselector。select(getSelectorTimeout());if(n0){continue;nothingtodo}getaniteratoroverthesetofselectedkeysIteratorSelectionKeyitselector。selectedKeys()。iterator();lookateachkeyintheselectedsetwhile(it!nullit。hasNext()){SelectionKeykeyit。next();Isanewconnectioncomingin?if(key。isAcceptable()){ServerSocketChannelserver(ServerSocketChannel)key。channel();SocketChannelchannelserver。accept();接收缓冲区大小channel。socket()。setReceiveBufferSize(getRxBufSize());发送缓冲区大小channel。socket()。setSendBufferSize(getTxBufSize());是否启用Nagle算法Nagle算法:解决网络拥塞,如果搞频率只发送一个字节的数据,实际上发送的是41个字节的数据包,其中包含40个字节的Tcp协议头Nagle算法原理是在未接收到ACK之前将要发送的数据存入缓冲区,知道收到ACK或者缓冲的数据到达一定大小的数据再发送将小块的数据收集成大块数据后在发送channel。socket()。setTcpNoDelay(getTcpNoDelay());长时间未收到ACK由操作系统发送一个心跳检测包channel。socket()。setKeepAlive(getSoKeepAlive());套接字上接收的所有TCP紧急数据(OutOfBound,带外数据)都将通过套接字输入流接收,默认丢弃紧急数据channel。socket()。setOOBInline(getOoBInline());地址重用channel。socket()。setReuseAddress(getSoReuseAddress());用来控制Socket关闭Close()方法的行为,执行Close()方位会立即返回,但是底层的Socket会延迟一段时间,将缓冲区中的数据发送给对方channel。socket()。setSoLinger(getSoLingerOn(),getSoLingerTime());读超时channel。socket()。setSoTimeout(getTimeout());ObjectattachnewObjectReader(channel);registerChannel(selector,channel,SelectionKey。OPREAD,attach);}istheredatatoreadonthischannel?if(key。isReadable()){readDataFromSocket(key);}else{key。interestOps(key。interestOps()(SelectionKey。OPWRITE));}it。remove();}}catch(java。nio。channels。ClosedSelectorExceptioncse){ignoreisnormalatshutdownorstoplistensocket}catch(java。nio。channels。CancelledKeyExceptionnx){log。warn(sm。getString(nioReceiver。clientDisconnect));}catch(Throwablet){ExceptionUtils。handleThrowable(t);log。error(sm。getString(nioReceiver。requestError),t);}}serverChannel。close();if(datagramChannel!null){try{datagramChannel。close();}catch(Exceptioniox){if(log。isDebugEnabled()){log。debug(Unabletoclosedatagramchannel。,iox);}}datagramChannelnull;}closeSelector();}读取数据,将其放入任务池中处理protectedvoidreadDataFromSocket(SelectionKeykey)throwsException{NioReplicationTasktask(NioReplicationTask)getTaskPool()。getRxTask();if(tasknull){Nothreadstasksavailable,donothing,theselectionloopwillkeepcallingthismethoduntilathreadbecomesavailable,thethreadpoolitselfhasawaitingmechanismsowewillnotwaithere。if(log。isDebugEnabled()){log。debug(NoTcpReplicationThreadavailable);}}else{invokingthiswakesuptheworkerthreadthenreturnsaddtasktothreadpooltask。serviceChannel(key);getExecutor()。execute(task);}}protectedvoiddrainChannel(finalSelectionKeykey,ObjectReaderreader)throwsException{reader。access();ReadableByteChannelchannel(ReadableByteChannel)key。channel();intcount1;buffer。clear();makebufferemptySocketAddresssaddrnull;if(channelinstanceofSocketChannel){loopwhiledataavailable,channelisnonblockingwhile((countchannel。read(buffer))0){buffer。flip();makebufferreadableif(buffer。hasArray()){reader。append(buffer。array(),0,count,false);}else{reader。append(buffer,count,false);}buffer。clear();makebufferemptydowehaveatleastonepackage?if(reader。hasPackage()){break;}}}elseif(channelinstanceofDatagramChannel){DatagramChanneldchannel(DatagramChannel)channel;saddrdchannel。receive(buffer);buffer。flip();makebufferreadableif(buffer。hasArray()){reader。append(buffer。array(),0,buffer。limit()buffer。position(),false);}else{reader。append(buffer,buffer。limit()buffer。position(),false);}buffer。clear();makebufferemptydidwegetapackagecountreader。hasPackage()?1:1;}intpkgcntreader。count();if(count0pkgcnt0){endofstream,andnomorepackagestoprocessremoteEof(key);return;}ChannelMessage〔〕msgspkgcnt0?ChannelData。EMPTYDATAARRAY:reader。execute();registerForRead(key,reader);registertoreadnewdata,beforewesenditofftoavoiddeadlocksfor(ChannelMessagemsg:msgs){UsesendackhereifyouwanttoacktherequesttotheremoteserverbeforecompletingtherequestThisisconsideredanasynchronousrequestif(ChannelData。sendAckAsync(msg。getOptions())){sendAck(key,(WritableByteChannel)channel,Constants。ACKCOMMAND,saddr);}try{if(Logs。MESSAGES。isTraceEnabled()){try{Logs。MESSAGES。trace(NioReplicationThreadReceivedmsg:newUniqueId(msg。getUniqueId())atnewjava。sql。Timestamp(System。currentTimeMillis()));}catch(Throwablet){}}核心逻辑,消息回调处理。由ChannelCoordinator协调处理getCallback()。messageDataReceived(msg);UsesendackhereifyouwanttherequesttocompleteonthisserverbeforesendingtheacktotheremoteserverThisisconsideredasynchronizedrequestif(ChannelData。sendAckSync(msg。getOptions())){sendAck(key,(WritableByteChannel)channel,Constants。ACKCOMMAND,saddr);}}catch(RemoteProcessExceptione){if(log。isDebugEnabled()){log。error(sm。getString(nioReplicationTask。process。clusterMsg。failed),e);}if(ChannelData。sendAckSync(msg。getOptions())){sendAck(key,(WritableByteChannel)channel,Constants。FAILACKCOMMAND,saddr);}}catch(Exceptione){log。error(sm。getString(nioReplicationTask。process。clusterMsg。failed),e);if(ChannelData。sendAckSync(msg。getOptions())){sendAck(key,(WritableByteChannel)channel,Constants。FAILACKCOMMAND,saddr);}}if(getUseBufferPool()){BufferPool。getBufferPool()。returnBuffer(msg。getMessage());msg。setMessage(null);}}if(count0){remoteEof(key);}}ChannelCoordinator由后向前处理,即Server。xml中配置的Interceptor标签,最前一个为GroupChannelOverridepublicvoidmessageReceived(ChannelMessagemsg){if(getPrevious()!null){getPrevious()。messageReceived(msg);}}NioSender
  Tcp协议,集群间的消息发送接口publicbooleanprocess(SelectionKeykey,booleanwaitForAck)throwsIOException{intopskey。readyOps();key。interestOps(key。interestOps()ops);incasedisconnecthasbeencalledif((!isConnected())(!connecting)){thrownewIOException(sm。getString(nioSender。sender。disconnected));}if(!key。isValid()){thrownewIOException(sm。getString(nioSender。key。inValid));}if(key。isConnectable()){if(socketChannel。finishConnect()){completeConnect();if(current!null){key。interestOps(key。interestOps()SelectionKey。OPWRITE);}returnfalse;}else{waitfortheconnectiontofinishkey。interestOps(key。interestOps()SelectionKey。OPCONNECT);returnfalse;}endif}elseif(key。isWritable()){booleanwritecompletewrite();if(writecomplete){wearecompleted,shouldwereadanack?if(waitForAck){registertoreadtheackkey。interestOps(key。interestOps()SelectionKey。OPREAD);}else{ifnot,weareready,setMessagewillreregisterusforanotherwriteinterestdoahealthcheck,wehavenowayofverifyadisconnectedsocketsincewedontregisterforOPREADonwaitForAckfalseread();thiscausesoverheadsetRequestCount(getRequestCount()1);returntrue;}}else{wearenotcomplete,letswritesomemorekey。interestOps(key。interestOps()SelectionKey。OPWRITE);}endif}elseif(key。isReadable()){booleanreadcompleteread();if(readcomplete){setRequestCount(getRequestCount()1);returntrue;}else{key。interestOps(key。interestOps()SelectionKey。OPREAD);}endif}else{unknownstate,shouldneverhappenlog。warn(sm。getString(nioSender。unknown。state,Integer。toString(ops)));thrownewIOException(sm。getString(nioSender。unknown。state,Integer。toString(ops)));}endifreturnfalse;}消息发送protectedbooleanwrite()throwsIOException{if((!isConnected())(this。socketChannelnullthis。dataChannelnull)){thrownewIOException(sm。getString(nioSender。not。connected));}if(current!null){if(remaining0){wehavewritteneverything,orwearestartinganewpackageprotectagainstbufferoverwriteintbyteswrittenisUdpBased()?dataChannel。write(writebuf):socketChannel。write(writebuf);if(byteswritten1){thrownewEOFException();}remainingbyteswritten;iftheentiremessagewaswrittenfromthebufferresetthepositioncounterif(remaining0){remaining0;}}return(remaining0);}nomessagetosend,wecanconsiderthatcompletereturntrue;}ClusterManager
  ClusterManager主要分为DeltaManager和BackupManager。
  DeltaManager:会向所有活动的节点同步Session数据,如果节点过多(超过4个官方不推荐使用该模式)不推荐使用
  BackupManager:向一个备份节点同步Session数据,默认取成员列表中的第一个DeltaManagerprotectedvoidmessageReceived(SessionMessagemsg,Membersender){ClassLoadercontextLoaderThread。currentThread()。getContextClassLoader();try{ClassLoader〔〕loadersgetClassLoaders();Thread。currentThread()。setContextClassLoader(loaders〔0〕);if(log。isDebugEnabled()){log。debug(sm。getString(deltaManager。receiveMessage。eventType,getName(),msg。getEventTypeString(),sender));}switch(msg。getEventType()){caseSessionMessage。EVTGETALLSESSIONS:handleGETALLSESSIONS(msg,sender);break;caseSessionMessage。EVTALLSESSIONDATA:handleALLSESSIONDATA(msg,sender);break;caseSessionMessage。EVTALLSESSIONTRANSFERCOMPLETE:handleALLSESSIONTRANSFERCOMPLETE(msg,sender);break;caseSessionMessage。EVTSESSIONCREATED:handleSESSIONCREATED(msg,sender);break;caseSessionMessage。EVTSESSIONEXPIRED:handleSESSIONEXPIRED(msg,sender);break;caseSessionMessage。EVTSESSIONACCESSED:handleSESSIONACCESSED(msg,sender);break;caseSessionMessage。EVTSESSIONDELTA:handleSESSIONDELTA(msg,sender);break;caseSessionMessage。EVTCHANGESESSIONID:handleCHANGESESSIONID(msg,sender);break;caseSessionMessage。EVTALLSESSIONNOCONTEXTMANAGER:handleALLSESSIONNOCONTEXTMANAGER(msg,sender);break;default:wedidntrecognizethemessagetype,donothingbreak;}switch}catch(Exceptionx){log。error(sm。getString(deltaManager。receiveMessage。error,getName()),x);}finally{Thread。currentThread()。setContextClassLoader(contextLoader);}}主要看下创建Session事件protectedvoidhandleSESSIONCREATED(SessionMessagemsg,Membersender){counterReceiveEVTSESSIONCREATED;if(log。isDebugEnabled()){log。debug(sm。getString(deltaManager。receiveMessage。createNewSession,getName(),msg。getSessionID()));}DeltaSessionsession(DeltaSession)createEmptySession();session。setValid(true);session。setPrimarySession(false);session。setCreationTime(msg。getTimestamp());usecontainermaxInactiveIntervalsothatsessionwillexpirecorrectlyincaseofprimarytransfersession。setMaxInactiveInterval(getContext()。getSessionTimeout()60,false);session。access();最终执行sessions。put(session。getIdInternal(),session);将其放入内存中统一管理session。setId(msg。getSessionID(),notifySessionListenersOnReplication);session。endAccess();}BackupManager
  备份节点默认取所有节点中的第一个节点,采用Rpc协议进行通信protectedvoidsend(ClusterManagermanager,StringsessionId){BackupManager返回nullClusterMessagemsgmanager。requestCompleted(sessionId);if(msg!nullcluster!null){cluster。send(msg);if(doStatistics()){nrOfSendRequests;}}}key为sessionId,complete默认falsepublicvoidreplicate(Objectkey,booleancomplete){MapEntryK,VentryinnerMap。get(key);if(entrynull){return;}if(!entry。isSerializable()){return;}if(entry。isPrimary()entry。getBackupNodes()!nullentry。getBackupNodes()。length0){checktoseeifweneedtoreplicatethisobjectisDirty()completeisAccessReplicate()ReplicatedMapEntryrentrynull;if(entry。getValue()instanceofReplicatedMapEntry){rentry(ReplicatedMapEntry)entry。getValue();}控制备份频率booleanisDirtyrentry!nullrentry。isDirty();booleanisAccessrentry!nullrentry。isAccessReplicate();booleanreplcompleteisDirtyisAccess;if(!repl){if(log。isTraceEnabled()){log。trace(Notreplicating:key,nochangemade);}return;}checktoseeifthemessageisdiffableMapMessagemsgnull;if(rentry!nullrentry。isDiffable()(isDirtycomplete)){rentry。lock();try{constructadiffmessagemsgnewMapMessage(mapContextName,getReplicateMessageType(),true,(Serializable)entry。getKey(),null,rentry。getDiff(),entry。getPrimary(),entry。getBackupNodes());rentry。resetDiff();}catch(IOExceptionx){log。error(sm。getString(abstractReplicatedMap。unable。diffObject),x);}finally{rentry。unlock();}}if(msgnullcomplete){constructacompletemsgnewMapMessage(mapContextName,getReplicateMessageType(),false,(Serializable)entry。getKey(),(Serializable)entry。getValue(),null,entry。getPrimary(),entry。getBackupNodes());}if(msgnull){constructaaccessmessagemsgnewMapMessage(mapContextName,MapMessage。MSGACCESS,false,(Serializable)entry。getKey(),null,null,entry。getPrimary(),entry。getBackupNodes());}try{if(channel!nullentry。getBackupNodes()!nullentry。getBackupNodes()。length0){if(rentry!null){rentry。setLastTimeReplicated(System。currentTimeMillis());}向备份节点发送当前Session信息,备份节点默认所有节点中的第一个节点channel。send(entry。getBackupNodes(),msg,channelSendOptions);}}catch(ChannelExceptionx){log。error(sm。getString(abstractReplicatedMap。unable。replicate),x);}}endif}热部署
  支持上传War包部署FarmWarDeployerpublicvoidbackgroundProcess(){if(started){if(watchEnabled){count(count1)processDeployFrequency;if(count0){定时校验watcher。check();}}removeInvalidFileFactories();}}WarWatcherpublicvoidcheck(){所有以。war结尾的文件File〔〕listwatchDir。listFiles(newWarFilter());if(listnull){log。warn(sm。getString(warWatcher。cantListWatchDir,watchDir));listnewFile〔0〕;}firstmakesureallthefilesarelistedinourcurrentstatusfor(Filefile:list){if(!file。exists()){log。warn(sm。getString(warWatcher。listedFileDoesNotExist,file,watchDir));}如果上传新war包就会在这儿加入管理addWarInfo(file);}CheckallthestatuscodesandupdatetheFarmDeployerfor(IteratorMap。EntryString,WarInfoicurrentStatus。entrySet()。iterator();i。hasNext();){Map。EntryString,WarInfoentryi。next();WarInfoinfoentry。getValue();war包如果被变更过返回1war包如果被移除返回1war包如果新增则返回1intcheckinfo。check();if(check1){部署服务,通过JMX调用服务tryAddServiced进行部署listener。fileModified(info。getWar());}elseif(check1){listener。fileRemoved(info。getWar());noneedtokeepinmemoryi。remove();}}}部署服务publicvoidfileModified(FilenewWar){try{FiledeployWarnewFile(getDeployDirFile(),newWar。getName());ContextNamecnnewContextName(deployWar。getName(),true);if(deployWar。exists()deployWar。lastModified()newWar。lastModified()){return;}installlocalif(tryAddServiced(cn。getName())){try{copy(newWar,deployWar);}finally{removeServiced(cn。getName());}check(cn。getName());}通知集群中的其他成员部署新增的服务install(cn。getName(),deployWar);}catch(Exceptionx){log。error(sm。getString(farmWarDeployer。modInstallFail),x);}}

曝香港资深主持人郭利民逝世!享年98岁,生前是吉尼斯纪录保持者饿了吗?戳右边关注我们,每天给您送上最新出炉的娱乐硬核大餐!1月14日,据港媒报道称,有港乐教父之称的香港资深主持人郭利民去世,享年98岁,引起了网友的关注。据悉,郭利民在香港中文看雍正王朝,析康熙为啥两次废掉太子胤礽?头号有新人雍正王朝太子胤礽与康熙妃子郑春华电视剧雍正王朝中,太子胤礽与老爹康熙的妃子郑春华暗通款曲。偷偷摸摸,但无巧不成书,被老爹撞破了。康熙很生气,后果很严重,成为胤礽被废掉的导东宫迎驾事件中朱棣为何因为太子的小疏忽对东宫大开杀戒?永乐十二年,由汉王朱高煦策划并实施的东宫迎驾事件被搬上历史舞台。这一事件,可以说给了当时太子一党沉重的打击。所谓东宫迎驾事件,就是朱棣北征瓦剌回来,原本以为太子朱高炽会率领百官前来百富联盟区块链搭建金融数字化转型的可信桥梁,主动战略前瞻加快赋能未来发展目前新一轮的科技变革正在加速进化,以区块链技术为代表的创新技术正在蓬勃兴起,成为促进社会数字化转型发展的主要动力。随着区块链在全球范围内快速落地应用,金融行业也迎来了前所未有的机遇战士跌下神坛?LOL设计师宣布削弱五大战士神装的回复能力英雄联盟现在战士英雄有多超模相信大家懂的都得,打赢一波团凭借强大的吸血回复能力经常还能是满血,坦度完胜坦克英雄。不过统治英雄联盟许久的战士类英雄终于要被设计师削弱了。拳头设计师Ti李泽民曾任浙江省委书记,曾参加朝鲜战争,一生为民,今年88岁引言令人向往的浙江省,地理位置优渥,经济富庶,自古以来就是比较发达的城市,因为境内河流众多,曲折蜿蜒,才有了浙江这个称呼,改革开放后,土地肥沃,河流众多的浙江依靠着自身的优势一跃成隐藏在小尾巴里面的秘密脊柱裂!说来讽刺,人竟然能长出尾巴来!这到底是怎么回事呢?其实,孩子后背长尾巴是一种先天畸形,大约在一万人中就有一例,且男孩发病率明显高于女孩。长尾巴其实是中枢神经系统的一个提示,这是身体一次有意义的讲座(2006)2006年4月16日上午,我来到广州日报社的工会礼堂,参加中国青少年研究中心特约研究员毛湘玲阿姨的讲座孩子的成功从培养良好习惯开始。毛阿姨在一开始就在屏幕上打出了几行字,这是美国的一起读绘本TheLittleRedHen引言此文旨在协助家长在家陪孩子一起读英文绘本。会陆续上传50本初级绘本的图文讲解,收集在一起读绘本的合集里,便于查阅。TheLittleRedHenOnceuponatimeaca孙红雷金屋藏娇6年的娇妻,我们居然都认识,穿礼服裙好惊艳对于大多数的女性来说,穿衣打扮可以说是非常重要的事情。对于我们普通人来说,在参加一些重要场合的时候,也会打扮的非常精致得体,更何况是经常出现在大众面前的明星了。可是,想要营造出高级上市公司重要公告集锦上港集团2022年净利润173。6亿元,同比增18。3中国核电下属公司收到可再生能源补贴资金49。39亿元上港集团2022年净利润173。6亿元,同比增18。3金钼股份2022年净利润13。9亿元,同比增180。89安科生物2022年
WebRTC音频引擎实现分析WebRTC的音频引擎作为两大基础多媒体引擎之一,实现了音频数据的采集前处理编码发送接收解码混音后处理播放等一系列处理流程。本文在深入分析WebRTC源代码的基础上,学习并总结其音感染之后咳嗽不止,这些止咳小妙招总有适合你的一食疗法1梨子汤,梨子红枣冰糖加枸杞(可不加)加适量水煮沸。2盐蒸橙子,将橙子切开口,用牙签戳数洞,撒上少许盐,上锅蒸熟。二耳穴压豆取穴鼻咽喉气管肺肾脾肾上腺皮质下咳喘点。每次取一男子在梵净山金顶摩崖刻4字被判赔12万,二审维持原判新时代推动法治进程,2022年度十大案件候选案例中,包括一起针对生态破坏的公益诉讼案。在这起案件中,当事人在梵净山景区文物上刻字,被当地检察机关提起公益诉讼。一段拍摄于2021年7绿色林海的别样风景闪耀在庐山之巅的火焰蓝今年的庐山有一道别致的风景,在茫茫的绿色林海中,一群阳光帅气的蓝朋友在庐山周边执行森林防火执勤任务。2022年12月10日,内蒙古森林消防总队派出了内蒙古大兴安岭森林消防支队30015条精品线路发布!你去过哪些?为贯彻落实大运河文化带建设工作部署,积极促进水利风景资源保护利用,助力全省幸福河湖建设,近日,江苏省水利厅遴选推出故道千里远,故事千年传等15条水利风景区精品线路,串联全省76家特临汾黄河一号旅游公路主线实现贯通1月3日,山西晚报记者从临汾市交通运输局了解到随着黄河一号旅游公路主线吉县境内大宁界至姚家畔段冯家庄大桥近日完成架设,该市黄河一号旅游公路主线实现全线贯通。临汾市黄河一号旅游公路主江苏出土汉代双龙墓,开棺后竟冒出一条大白腿,专家看后毛骨悚然江苏连云港惊现千年不腐女尸,专家挖到一半时仓皇而逃一座古墓诉说千年传说,一段传说传承古今历史。2002年的炎炎盛夏,江苏连云港一村庄正在修建高速公路,工人们挖到了一块木板,细细一看元旦经济堂食小高峰出现,部分餐厅客流同比政策优化前增长1300随着全国防疫政策的优化以及稳增长促消费等政策信号释放,2023年元旦假期内,不少餐饮品牌的客流增长超预期,全国多店恢复排长队的热闹场景。不断刷新的叫号提示是餐饮消费加速回暖的缩影。又要囤药?这种药品家里有孩子的禁用,小心吃了之后不长个冬日生活打卡季随着疫情的精准调控实施,国人们已经感受到了它的威力。尤其是在彻底放开之后,更是让越来越多的人都确诊了阳性。感染后,会出现发烧咳嗽等症状,而随着病毒的变异,新的症状也陆揭秘!人工授精实验室如何挑选精兵强将人工授精是一种人工辅助生殖技术,是将优化后的精子注射入女性的生殖道内或宫腔内,达到受孕的目的。人工授精主要适用于男方勃起性功能障碍严重早泄轻中度的少弱精子症免疫性不育女方宫颈因素及怀孕5周的媳妇阳了后胎停流产了中心妇产医院,张明宇陪着媳妇王敏来到计划生育科做孕检,看看孩子的发育状态怎么样。两人高高兴兴地一边办理手续,一边打趣孩子会像谁。随后王敏进入候诊区等待,张明宇则满怀希望的在外面等候
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网