范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

Nacos和Apollo中的长轮询定时机制,太好用了

  今天这篇文章来介绍一下Nacos配置中心的原理之一:长轮询机制的应用
  为方便理解与表达,这里把 Nacos 控制台和 Nacos 注册中心称为 Nacos 服务器(就是 web 界面那个),我们编写的业务服务称为 Nacso 客户端;
  Nacos 动态监听的长轮询机制原理图,本篇将围绕这张图剖析长轮询定时机制的原理:
  ConfigService 是 Nacos 客户端提供的用于访问实现配置中心基本操作的类型,我们将从 ConfigService 的实例化开始长轮询定时机制的源码之旅;  1. 客户端的长轮询定时机制
  我们从NacosPropertySourceLocator.locate()开始【断点步入】:
  1.1 利用反射机制实例化 NacosConfigService 对象
  客户端的长轮询定时任务是在 NacosFactory.createConfigService() 方法中,构建 ConfigService 对象是实例时启动的,我们接着 1.1 处的源码;
  进入 NacosFactory.createConfigService():  public static ConfigService createConfigService(Properties properties) throws NacosException {     //【断点步入】创建 ConfigService     return ConfigFactory.createConfigService(properties); }
  进入 ConfigFactory.createConfigService(),发现其使用反射机制实例化 NacosConfigService 对象;
  1.2 NacosConfigService 的构造方法里启动长轮询定时任务
  进入 NacosConfigService.NacosConfigService() 构造方法,里面设置了一些跟远程任务相关的属性;
  1.2.1 初始化 HttpAgent
  MetricsHttpAgent 类的设计如下:
  ServerHttpAgent 类的设计如下:
  1.2.2 初始化 ClientWorker
  进入 ClientWorker.ClientWorker() 构造方法,主要是创建了两个定时调度的线程池,并启动一个定时任务;
  进入 ClientWorker.checkConfigInfo(),每隔 10s 检查一次配置是否发生变化;  cacheMap:是一个 AtomicReference> 对象,用来存储监听变更的缓存集合,key 是根据 datalD/group/tenant(租户)拼接的值。Value 是对应的存储在 Nacos 服务器上的配置文件的内容;  长轮询任务拆分:默认情况下,每个长轮询 LongPollingRunnable 任务处理3000个监听配置集。如果超过3000个,则需要启动多个 LongPollingRunnable 去执行;
  1.3 检查配置变更,读取变更配置 LongPollingRunnable.run()
  因为我们没有这么多配置项,debug 不进去,所以直接找到 LongPollingRunnable.run() 方法,该方法的主要逻辑是:
  根据 taskld 对 cacheMap 进行数据分割;
  再通过checkLocalConfig() 方法比较本地配置文件(在${user} acosconfig 里)的数据是否存在变更,如果有变更则直接触发通知;  public void run() {     List cacheDatas = new ArrayList();     ArrayList inInitializingCacheList = new ArrayList();     try {         //遍历 CacheData,检查本地配置         Iterator var3 = ((Map)ClientWorker.this.cacheMap.get()).values().iterator();         while(var3.hasNext()) {             CacheData cacheData = (CacheData)var3.next();             if (cacheData.getTaskId() == this.taskId) {                 cacheDatas.add(cacheData);                 try {                     //检查本地配置                     ClientWorker.this.checkLocalConfig(cacheData);                     if (cacheData.isUseLocalConfigInfo()) {                         cacheData.checkListenerMd5();                     }                 } catch (Exception var13) {                     ClientWorker.LOGGER.error("get local config info error", var13);                 }             }         }         //【断点步入 1.3.1】通过长轮询请求检查服务端对应的配置是否发生变更         List changedGroupKeys = ClientWorker.this.checkUpdateDataIds(cacheDatas, inInitializingCacheList);         //遍历存在变更的 groupKey,重新加载最新数据         Iterator var16 = changedGroupKeys.iterator();         while(var16.hasNext()) {             String groupKey = (String)var16.next();             String[] key = GroupKey.parseKey(groupKey);             String dataId = key[0];             String group = key[1];             String tenant = null;             if (key.length == 3) {                 tenant = key[2];             }             try {                 //【断点步入 1.3.2】读取变更配置,这里的 dataId、group 和 tenant 是【1.3.1】里获取的                 String content = ClientWorker.this.getServerConfig(dataId, group, tenant, 3000L);                 CacheData cache = (CacheData)((Map)ClientWorker.this.cacheMap.get()).get(GroupKey.getKeyTenant(dataId, group, tenant));                 cache.setContent(content);                 ClientWorker.LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", new Object[]{ClientWorker.this.agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)});             } catch (NacosException var12) {                 String message = String.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", ClientWorker.this.agent.getName(), dataId, group, tenant);                 ClientWorker.LOGGER.error(message, var12);             }         }         //触发事件通知         var16 = cacheDatas.iterator();         while(true) {             CacheData cacheDatax;             do {                 if (!var16.hasNext()) {                     inInitializingCacheList.clear();                     //继续定时执行当前线程                     ClientWorker.this.executorService.execute(this);                     return;                 }                 cacheDatax = (CacheData)var16.next();             } while(cacheDatax.isInitializing() && !inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheDatax.dataId, cacheDatax.group, cacheDatax.tenant)));             cacheDatax.checkListenerMd5();             cacheDatax.setInitializing(false);         }     } catch (Throwable var14) {         ClientWorker.LOGGER.error("longPolling error : ", var14);         ClientWorker.this.executorService.schedule(this, (long)ClientWorker.this.taskPenaltyTime, TimeUnit.MILLISECONDS);     } }
  注意:这里的断点需要注意 Nacos 服务器上修改配置(间隔大于 30s),进入后才好理解;  1.3.1 检查配置变更 ClientWorker.checkUpdateDataIds()
  我们点进 ClientWorker.checkUpdateDataIds()  方法,发现其最终调用的是 ClientWorker.checkUpdateConfigStr() 方法,其实现逻辑与源码如下:  通过MetricsHttpAgent.httpPost()  方法(上面 1.2.1 有提到)调用/v1/cs/configs/listener 接口实现长轮询请求;  轮询请求在实现层面只是设置了一个比较长的超时时间,默认是 30s;  如果服务端的数据发生了变更,客户端会收到一个 HttpResult ,服务端返回的是存在数据变更的 Data ID、Group、Tenant;  获得这些信息之后,在LongPollingRunnable.run() 方法中调用 getServerConfig() 去 Nacos 服务器上读取具体的配置内容;  List checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException {     List params = Arrays.asList("Listening-Configs", probeUpdateString);     List headers = new ArrayList(2);     headers.add("Long-Pulling-Timeout");     headers.add("" + this.timeout);     if (isInitializingCacheList) {         headers.add("Long-Pulling-Timeout-No-Hangup");         headers.add("true");     }     if (StringUtils.isBlank(probeUpdateString)) {         return Collections.emptyList();     } else {         try {             //调用 /v1/cs/configs/listener 接口实现长轮询请求,返回的 HttpResult 里包含存在数据变更的 Data ID、Group、Tenant             HttpResult result = this.agent.httpPost("/v1/cs/configs/listener", headers, params, this.agent.getEncode(), this.timeout);             if (200 == result.code) {                 this.setHealthServer(true);                 //                 return this.parseUpdateDataIdResponse(result.content);             }             this.setHealthServer(false);             LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", this.agent.getName(), result.code);         } catch (IOException var6) {             this.setHealthServer(false);             LOGGER.error("[" + this.agent.getName() + "] [check-update] get changed dataId exception", var6);             throw var6;         }         return Collections.emptyList();     } }1.3.2 读取变更配置 ClientWorker.getServerConfig()
  进入 ClientWorker.getServerConfig()  方法;读取服务器上的变更配置;最终调用的是 MetricsHttpAgent.httpGet()  方法(上面 1.2.1 有提到),调用 /v1/cs/configs  接口获取配置;然后通过调用 LocalConfigInfoProcessor.saveSnapshot() 将变更的配置保存到本地;
  2. 服务端的长轮询定时机制2.1 服务器接收请求 ConfigController.listener()
  Nacos客户端 通过 HTTP 协议与服务器通信,那么在服务器源码里必然有对应接口的实现;
  在 nacos-config 模块下的 controller 包,提供了个 ConfigController 类来处理请求,其中有个 /listener 接口,是客户端发起数据监听的接口,其主要逻辑和源码如下:  获取客户端需要监听的可能发生变化的配置,并计算 MD5 值;  ConfigServletInner.doPollingConfig() 开始执行长轮询请求;  2.2 执行长轮询请求 ConfigSer
  vletInner.doPollingConfig()
  进入 ConfigServletInner.doPollingConfig() 方法,该方法封装了长轮询的实现逻辑,同时兼容短轮询逻辑;
  进入 LongPollingService.addLongPollingClient() 方法,里面是长轮询的核心处理逻辑,主要作用是把客户端的长轮询请求封装成 ClientPolling 交给 scheduler 执行;
  2.3 创建线程执行定时任务 ClientLongPolling.run()
  我们找到 ClientLongPolling.run() 方法,这里可以体现长轮询定时机制的核心原理,通俗来说,就是:
  服务端收到请求之后,不立即返回,没有变更则在延后 (30-0.5)s 把请求结果返回给客户端;
  这就使得客户端和服务端之间在 30s 之内数据没有发生变化的情况下一直处于连接状态;
  2.4 监听配置变更事件2.4.1 监听 LocalDataChangeEvent 事件的实现
  当我们在 Nacos 服务器或通过 API 方式变更配置后,会发布一个 LocalDataChangeEvent 事件,该事件会被 LongPollingService 监听;
  这里 LongPollingService 为什么具有监听功能在 1.3.1 版本后有些变化:  1.3.1 前:LongPollingService.onEvent();  1.3.1 后:Subscriber.onEvent();
  在 Nacos 1.3.1 版本之前,通过 LongPollingService 继承 AbstractEventListener 实现监听,覆盖 onEvent() 方法;
  而在 1.3.2 版本之后,通过构造订阅者实现
  效果是一样的,实现了对 LocalDataChangeEvent 事件的监听,并通过通过线程池执行 DataChangeTask 任务;  2.4.2 监听事件后的处理逻辑 DataChangeTask.run()
  我们找到 DataChangeTask.run() 方法,这个线程任务实现了
  3. 源码结构图小结3.1 客户端的长轮询定时机制NacosPropertySourceLocator.locate() :初始化 ConfigService 对象,定位配置;  NacosConfigService.NacosConfigService():NacosConfigService 的构造方法;  Executors.newScheduledThreadPool():创建 executor 线程池;  Executors.newScheduledThreadPool():创建 executorService 线程池;  ClientWorker.checkConfigInfo():使用 executor 线程池检查配置是否发生变化;  ClientWorker.checkLocalConfig():检查本地配置;  ClientWorker.checkUpdateDataIds():检查服务端对应的配置是否发生变更;  ClientWorker.getServerConfig():读取变更配置  MetricsHttpAgent.httpPost():调用 /v1/cs/configs/listener 接口实现长轮询请求;  ClientWorker.checkUpdateConfigStr():检查服务端对应的配置是否发生变更;  MetricsHttpAgent.httpGet():调用 /v1/cs/configs 接口获取配置;  LongPollingRunnable.run():运行长轮询定时线程;  MetricsHttpAgent.MetricsHttpAgent():初始化 HttpAgent;  ClientWorker.ClientWorker():初始化 ClientWorker;  NacosFactory.createConfigService():创建配置服务器;  ConfigFactory.createConfigService():利用反射机制创建配置服务器;  3.2 服务端的长轮询定时机制ConfigController.listener() :服务器接收请求;  LongPollingService.addLongPollingClient():长轮询的核心处理逻辑,提前 500ms 返回响应;  ClientLongPolling.run():长轮询定时机制的实现逻辑;  Map.put():将 ClientLongPolling 实例本身添加到 allSubs 队列中;  Queue.remove():把 ClientLongPolling 实例本身从 allSubs 队列中移除;  MD5Util.compareMd5():比较数据的 MD5 值;  LongPollingService.sendResponse():将变更的结果通过 response 返回给客户端;  ConfigExecutor.scheduleLongPolling():启动定时任务,延时时间为 29.5s;  HttpServletRequest.getHeader():获取客户端设置的请求超时时间;  MD5Util.compareMd5():和服务端的数据进行 MD5 对比;  ConfigExecutor.executeLongPolling():创建 ClientLongPolling 线程执行定时任务;  MD5Util.getClientMd5Map():计算 MD5 值;  ConfigServletInner.doPollingConfig():执行长轮询请求;  3.3 Nacos 服务器配置变更的事件监听
  Nacos 服务器上的配置发生变更后,发布一个 LocalDataChangeEvent 事件;
  Subscriber.onEvent() :监听 LocalDataChangeEvent 事件(1.3.2 版本后);  DataChangeTask.run():根据 groupKey 返回配置;  ConfigExecutor.executeLongPolling():通过线程池执行 DataChangeTask 任务;

乐视发布入门iPhone13,只要499元在我印象中,乐视超级手机在当年还是很强的,采用了在当时来看非常先进的USBC接口,还有观感上几乎无边框的屏幕,整体没有明显的短板。可惜后来乐视资金链出现问题,贾老板也跑路去了美国,又翻脸!欧文再度发声想念詹姆斯这次不拿杜兰特当兄弟了?古语有云,树无皮,必死无疑人无脸,天下无敌。这句话用在篮网球星凯里欧文身上,或许再合适不过了自打提前开启休赛期后,欧文就几乎没有闲着,而在此过程中,他做得最多的一件事就是吃饭睡觉夸人类能否进入第二文明?如果进入不了是科学惹的祸吗?人类能否进入第二文明?这个问题一直是科学家们长期以来思索的问题。今天就来闲聊戏说人类能否进入第二文明?如果进入不了是科学惹的祸吗?自从地球上出现了人类那一刻起,地球就开始变得热闹起你知道阿努纳奇吗?传说中人类的真正起源目前发现最早的文明的历史充满了伟大的事件和神秘引起了人们更多想要了解它的好奇心与地球上已经存在的其他物种相比人类为什么进化得如此之快?古老的民族通过他们的宗教试图提供关于人类的起源几周后,人类或将第一次看到地狱的样子詹姆斯韦布空间望远镜将给地狱行星拍照。巨蟹座55e想像图。ESA哈勃M。Kornmesser几周后,人类可能会第一次(借助技术手段)了解地狱是什么样子的了。这个地狱其实在天上。詹姆它和地球太像了,有一大片海洋,科学家海洋之下或许存在生命人人能科普,处处有新知如今有越来越多的人开始意识到,地球的环境正在逐渐恶化,很多地区已经不适合人类生存,而且因为全球变暖的缘故,地球两极的冰雪开始融化,而且融化速度不断上升,以至于我们的祖先是一条鱼?科学家证实人类中耳由远古鱼的鳃进化而来人类的祖先是谁?有许多种说法,但目前能够得到大量化石和分子生物学证据支持的还是进化论。从进化论的观点来看,人是由古猿进化而来,但古猿又是从哪里来的呢?古猿到人类的进化过程如果从古猿感谢凯尔特人和勇士,让NBA季后赛封神,诠释竞技体育的涵义20212022赛季NBA季后赛,最终凯尔特人和勇士分别从东西部冲出重围,相聚总决赛。凯尔特人的斯玛特杰伦布朗塔图姆罗威格威普理查德都是自己选秀的,勇士的库里汤普森格林卢尼普尔穆迪CBA三热点赵睿周鹏回应李宁体育,李根复出,北控有意王非任主帅1李宁体育因在一条评论没伸脚不是正宗的广东宏远队,下面回复有道理,被网友推上了热点,随着事情的发展,球迷纷纷抵制李宁商品,广东队球员也做出了回应,广东老将周鹏说还真是准官方,真棒啊曝今年没有一加10Ultra一加10T是今年最后一款旗舰手机中国新闻虽然在国内发布的机型不算多,但放眼全球来看,一加今年是非常忙碌的,截止目前,已经推出了一加10Pro一加Ace一加Ace竞速版一加Nord2TNordCE2和NordC传统实体升级战实体店转型的4个现代化6年前,传统商业打了一场升级战,挣扎未果,一直在摸着石头过河。2022年,这场战役将成为生存战,战况会更加激烈!事实上,近几年传统商业一直挣扎在这样的态势中转型就像去送死,不转型就
天文现象月掩金星将于周五出现,下一次要等到2063年月掩金星示意图。香港于周五(24日)晚上会出现月掩金星,届时月球会运行到地球和金星之间,并将金星短暂遮掩。香港太空馆将于当天晚上7时30分至九时透过其YouTube频道及康文Fac只要799,刷新史低,最便宜的精粤Z790主板搭配i713700K装机实测按时下的行情,御三家Z790主板基本还在2000起。而精粤,直接刷新史低,将价格干到了799。也就是说,4090,你买不到RTX4090显卡,但是799(还能更低),你能买到一块Z联想ThinkBook1416独显版笔记本将在月底上市IT之家是科技媒体中的老牌一哥了,我不仅每天都看,还会转载他家的笔记本新闻。手机笔电等数码新品的资讯,去他家的App和网站上看准没错。今天就向大家强烈推荐IT之家公众号,另外也建议部分RTX4070RTX4060Ti可能不使用12VHPWR供电口,视TDP而定目前GeForceRTX4090以及RTX4080都使用的是12VHPWR供电接口,而目前对于不少用户来说,想要用上使用12VHPWR供电接口的显卡,还需要为电源准备38pin转1Windows11PhoneLink如何连接iPhone大有学问3月初,Windows1122H2正式版用户已开始迎来Windows1122H2Moment2更新(KB5022913)。其中有个很感兴趣的功能PhoneLink最新支持连上万元的大路灯是防近视智商税吗?鱼sir被狠狠打脸了!最初听到大路灯,鱼sir以为说的是下图这个后来知道,它是一种价格数千到上万元不等,贵到有点抠脚趾的护眼灯,据说能体验到格林威治7点的晨光,有人说是8点,还有说10点的,反正越说越玄机构2月全球智能手机出货量同比下降11,三星重回第一市场研究机构TechInsights报告显示,2023年2月全球智能手机出货量(批发)和销量(零售)分别同比下降11和5。尽管数字有所下降,但由于中国重新开放和经济前景改善,智能手创维推出新款G60显示器4K150Hz,2304分区MiniLEDIT之家3月21日消息,创维今日公布G60显示器,27英寸4K150Hz,2304分区MiniLED,售价4799元。官方表示,创维G60显示器是行业首款4K150HzMiniLE3月打野梯队新时代三大野王诞生,李白和赵云成落难兄弟!T1的意思是指版本第一梯队的英雄,这类英雄会比普通英雄较为强势,而T0的意思是指凌驾于第一梯队,属于版本非ban必选的英雄。本榜单是结合了官方所公布的排位巅峰赛数据和KPL职业赛英亡国少女谢莉尔关于少女复仇的二三事欢迎关注,获取更多游戏评测资讯,入手与否不再犹豫喜欢的不妨点个赞唷()文悗绫编辑Rin亡国少女谢莉尔是一款由TEKUNOSA开发,神乐代理发行的亡国复仇向题材的RPG。玩家需要扮演不知道怎么购买暗黑4?小锤来教你!关注小锤,畅玩好玩又有趣的游戏,保证兄弟们不会迷失游玩的方向!兄弟们!大家好!今天暗黑4火热开启测试,所有预购了暗黑4的玩家都可以开始试玩了!小锤发现有些朋友不太清楚如何购买这款游