RPC基础功能数据传输序列化反序列化客户端代理类请求映射分发RPC产品功能 除了RPC基础功能为,RPC还具备八大功能模块 Consumer:连接管理、负载均衡、请求路由、超时处理 Provider:队列线程池、超时丢弃、优雅关闭、过载保护Consumer核心设计 完整的consumer功能模块图 请求路由通过一系列规则过滤出可以选择的provider节点列表,在应用隔离,读写分离灰度发布中发挥关键作用。匹配规则规则描述待比较属性运算符属性匹配值匹配节点数据结构设计链表 以IP分流规则举例 行为链表连接管理 保持与Provider的长连接,用于传输数据和返回结果,通常情况下基于tcp连接,udphttp也可以实现。初始化时机饿汉模式服务间连接、数据库连接懒汉模式网关连接连接数维护服务连接池数据库连接池心跳断线重连机制client线程模型负载均衡 确保多个Provider实例节点流量均匀合理(并非绝对平均);支持节点扩容缩容与灰度发布(调整流量权重比例)轮询按顺序轮流分发请求随机随机选择分发请求取模请求数实例数得到集群下标,将请求分发给指定下标的集群实例。权重根据实例的性能设置权重值进行请求分发一致性hash根据hash值将0232次方按照实例数划分为不同的hash范围,根据范围值分发请求超时处理对于长时间没有响应的请求,需要作异常处理,及时释放资源。工作线程阻塞位置等待响应通知超时逻辑工作线程等待通知数据返回终止等待超时抛出异常数据结构Mapkey:sessionID,value:WindowDataProvider核心设计队列线程池 将不同类型的请求,放入各自的队列,每个队列分配独立的线程池,进行资源隔离。这里的ThreadPool并非指java中的线程池对象,这里是真正的线程池。 Provider作为服务提供方,对调用方的信息是无感知的,随业务发展可能会有N个consumer对provider进行调用,工作线程有限,因而需要设计数据结构(队列)对请求进行存放。 数据结构设计分析:请求量多大?是否需要削峰填谷缓存请求?是否存在请求顺序性要求?请求之间相互隔离?不同的请求放不同的队列,不同队列使用不同线程池,资源隔离 线程与线程之间什么样的通信方式是最安全的? 通过通信方式来共享内存,不要通过共享内存来通信。线程与线程之间通过队列方式通信是最好的。补充知识:OS分配资源的基本单位是进程,cpu调度的基本单位是线程。 线程分配的合理数: 线程数CPU核数22;线程过多会造成线程等待和上下文切换的资源开销;线程太少又无法充分利用cpu资源。 这里就不提供压测截图了,关于压测有兴趣可以百度自行了解,或者找测试同学沟通 QPS队列线程2w5w8w11w单队列多线程1640。00s0。010。00s0。050。00s0。190。01s多队列单线程6410。00s0。010。00s0。020。000。070。01s 在QPS8w时单队列多线程模型的耗时明显比多队列单线程高超时丢弃 快速失败已超时的请求,缓解队列的压力 IO线程处理反序列化后将请求放入队列(reqQueue) 入队代码publickvoidrun(Groupgroup,IAsyncHandlerhandler){构造AsyncTask对象放入队列AsyncTasktasknewAsyncTask(taskTimeout,handler);balance(group,task);}异步任务构造方法publicAsyncTask(inttimeout,IAsyncHandlerhandler){super();if(timeout0){timeout1000;}this。timeouttimeout;超时时间this。handlerhandler;this。addTimeSystem。currentTimeMillis();入队时间}当线程池中工作线程从队列中取出请求进行处理时,判断当前请求是否超时。超时则丢弃,未超时则调用服务处理 出队代码publicvoidrun(){while(!isStop){AsyncTasktasknull;try{tasktaskQueue。poll(1500,TimeUnit。MILLISECONDS);取出队列任务if(null!task){execTimeoutTask(task);判断任务是否超时}}catch(InterruptedExceptione){logger。error(haserror!,e);}catch(Throwableex){if(task!null){task。getHandler()。exceptionCaught(ex);}}}}publicvoidexecTimeoutTask(AsyncTasktask)throw{当前时间减去入队时间是否大于超时时间超时回调超时if(System。currentTimeMillis()task。getAddTime()task。getTimeout()){task。getHandler()。exceptionCaught(newTimeoutException(threadFactoryNameasynctasktimeout!));return;}else{没有超时回调处理任务Objectobjtask。getHandler()。run();task。getHandler()。messageReceived();}} 对于consumer而言,一个请求调用的超时时间请求队列排队时间(主要耗时)程序处理时间优雅关闭 进程结束前应确保队列中的的请求全部处理完成,这路的处理完成是直接回复并非处理逻辑。 如何通知调用方?返回数据中带关闭信息建议专门关闭协议通知调用方不建议服务提供方对调用方是无感知的;即便是在知道调用方的情况下这样的设计也会造成循环依赖,且调用可能失败,系统复杂度过高。 返回数据中带关闭信息可以复用现有RPC通道优雅关闭Server端实现监听关闭信号kill12ComponentpublicclassSignalConfig{AutowiredprivateSignalRegistrysignalRegistrypublicvoidinit(){signalRegistry。register();}}监听关闭信号publicvoidregister(){try{if(StringUtils。isNotBlank(osName)(!isMac()!isWindows())){SignalsignewSignal(USR2);这里的USR2其实就是12代表的是用户自定义信号这表示当用户通过kill12来结束某个进程时回调operateSignalHandler函数进行处理Signal。handle(sig,operateSignalHandler);Signalsig2newSignal(STKFLT);Signal。handle(sig2,breakHeartbeatSignal);}}catch(Exceptione){logger。error(signalregisterfailed!,e)}}改变服务状态改变服务状态ComponentpublicclassOperateSignalimplementsSignalHandler{privatestaticLoggerloggerLoggerFactory。getLogger(OperateSignal。class);AutowiredprivateRpcContextrpcContetx;Overridepublicvoidhandle(SingalsingalName){logger。info(server:{}currentstateis:{},newObject〔〕{rpcContext。getServiceName(),rpcContext。getServerState()});设置当前服务状态为重启rpcContext。setServerState(ServerStateType。Reboot);logger。info(server:{}willreboot!,newObject〔〕{rpcContext。getServiceName()});}}通知客户端超时处理判断请求没有超时时调用service方法处理请求publicObjectinnerInvoke(RpcContextrpcContext,MethodSignaturemethodSignature)throwException{这里使用责任链模式维护了RPC上下文对象requestFilter(context);Objectresponsenull;RPCContext。setThreadLocal(context);。。。。。省略。。。。。context。getResponseProtocol()。setSdpEntity(response);responseFilter(context);RpcContext。clear();returncontext;}protectedvoidrequestFilter(RpcContextcontext)throwsExcpetion{logger。debug(beginrequestFilters);for(IFilterfilter:requestFilters){if(context。getExecFilter()ExecFilterType。ALLcontext。getExecFilter()ExecFilterType。RequestOnly){filter。filter(context);}logger。debug(endrequestFilters);}}publicvoidfilter(RpcContextcontext)throwsException{if(serviceContext。getServerState)ServerStateType。Reboot(protocol。getPlatformType()PlatformType。Javaprotocol。getPlatformType()PlatformType。PHP)){封装RPC响应RpcResponseresponsenewRpcResponse();ResetProtocolrpnewResetProtocol();rp。setMsg(Thisserverisreboot!);responseProtocol。setSdpEntity(rp);response。setResponseBuffer(responseProtocol。toBytes(false,null));context。setRpcResponse(response);context。setExecFilter(ExecFilterType。None);不再继续过滤context。setDoInvoke(false);}} 程序启动后初始化,初始化时初始化容器、初始化插件、初始化监听信号(重要)、初始化服务 解释:通常结束进程我们用kill9命令这里的9其实就是一种信号量,用户可以自定义信号量发送给操作系统,可以自定义实现优雅关闭client端实现根据返回改变节点状态publicProtocolrequest(ProtocolrequestProtocol)throwsException{判断服务节点状态if(ServerState。RebootstateServerState。Deadstate){thrownewRebootException();};。。。。注册窗口事件socket。registerRec(requestProtocol。getSessionID());异步发送请求socket。send(data);。。。。接收数据,等待数据到达事件byte〔〕buffersocket。receive(requestProtocol。getSessionID()。currUserCount);ProtocolreceiveProtocolProtocol。fromBytes(buffer,socket。isRights(),socket。getDESKey());接收回包首先判断类型是否是重启SDPTypesdpTypereceiveProtocol。getSDPType();if(sdpTypeSDPType。Reset){如果是重启服务节点置为不可用this。asReboot();logger。info(server:〔{}〕,address:〔{}〕wasrebooted,willchooseanotherone!,newObject〔〕{this。getName(),this。getAddress()});抛出服务重启异常thrownewRebootException();}} 设置服务器为重启状态publicvoidasReboot(){if(ServerState。Reboot!this。getState()){this。setState(ServerState。Reboot);this。setDeadTime(System。currentTimeMillis());this。setWeight(1);this。getSocketPool()。destroy();ServerStateDetector。instance()。add(this);logger。debug(thisserverisreboot!host:this。getAddress());}}节点探索过载保护 服务提供方为保证正常运行,主动丢弃超出处理能力的请求。 这里的主动丢弃与超时丢弃不同。每个请求入队时判断队列中的请求数是否达到阈值,超过则直接丢弃。 入队任务代码publickvoidrun(Groupgroup,IAsyncHandlerhandler){构造AsyncTask对象放入队列AsyncTasktasknewAsyncTask(taskTimeout,handler);balance(group,task);}publicvoidbalance(Groupgroup,AsyncTasktask){if(groupGroup。HIGH){balanceTask(highFactor。getAndIncrement(),groupHighWorkers,task);}elseif(groupGroup。DEFAULT){balanceTask(defaultFactor。getAndIncrement(),defaultWorkers,task);}}publicvoidbalanceTask(intfactor,AsyncWorker〔〕workers,AsyncTasktask){intidxfactorworkers。length;如果预设值大于0if(limitSize0){workers〔idx〕。addTask(task,limitSize,mode);}else{worker〔idx〕。addTask(task);}}voidaddTask(AsyncTasktask,intlimitSize,booleanabortNewTask){如果请求队列长度超出了预设值抛弃新来的任务if(this。taskQueue。size()limitSize){if(abortNewTask){task。getHandler()。exceptionCaught(newTimeoutException(threadFactoryNameabortthistask,becausethequeueisfull!));}else{elimintateOldTask(task);}}else{this。queue。offer(task);}}