范文健康探索娱乐情感热点
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文

Redisson延迟队列是怎么搞的?

  昨天,记录了Spring Boot基于Redisson实现订单状态延迟处理的思路后,就想着,还是要去搞清楚 RedissonDelayedQueue 的实现思路,所以,今天就忙里偷闲,去Github下载Redisson 源码来大概查略一番。如何创建RedissonDelayedQueue队列
  在 Test 中,可以看到这样一段代码1 2 RBlockingQueue queue1 = redisson.getBlockingQueue("test");   //按名称获取一个阻塞队列实例 RDelayedQueue dealyedQueue = redisson.getDelayedQueue(queue1);  //按名称获取一个延迟队列实例。参数是`RQueue`
  在获取 DelayedQueue 队列时,会初始化两个队列名称redisson_delay_queue_{队列名} 和redisson_delay_queue_timeout_{队列名} ,还会创建一个QueueTransferTask 队列中转的定时任务,添加队列1 dealyedQueue.offer(3, 5, TimeUnit.SECONDS);  //第一次参数是要发送给队列的数据,第二个参数是要延迟的时间,第三个参数是延迟的时间单位
  这里,我们直接来到 offerAsync 方法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public RFuture offerAsync(V e, long delay, TimeUnit timeUnit) {         if (delay < 0) {             throw new IllegalArgumentException("Delay can"t be negative");         }                  long delayInMs = timeUnit.toMillis(delay);         long timeout = System.currentTimeMillis() + delayInMs;               long randomId = ThreadLocalRandom.current().nextLong();         return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,                 "local value = struct.pack("dLc0", tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"                + "redis.call("zadd", KEYS[2], ARGV[1], value);"               + "redis.call("rpush", KEYS[3], value);"               // if new object added to queue head when publish its startTime                // to all scheduler workers                + "local v = redis.call("zrange", KEYS[2], 0, 0); "               + "if v[1] == value then "                  + "redis.call("publish", KEYS[4], ARGV[1]); "               + "end;",               Arrays.asList(getRawName(), timeoutSetName, queueName, channelName),               timeout, randomId, encode(e));     }
  在代码中,我们可以看到,最终执行的 Lua 脚本,其他的代码基本是一目了然,我们主要来分析一下这段Lua 脚本,1 2 3 4 5 6 7 local value = struct.pack("dLc0", tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);   //将超时时间、随机Id和消息内容序列化为二进制 redis.call("zadd", KEYS[2], ARGV[1], value);   //将序列化后的二进制内容按超时时间作为`score`存放到`redisson_delay_queue_timeout_{队列名}`这个有序集合(sorted set)中 redis.call("rpush", KEYS[3], value);   //将序列化后的二进制内容添加到`redisson_delay_queue_{队列名}`列表(List)中 local v = redis.call("zrange", KEYS[2], 0, 0);   //取出有序集合中的第一个元素 if v[1] == value then    redis.call("publish", KEYS[4], ARGV[1]);   //如果取到第一个元素,则`publish`到`channel`中 end;
  当 publish 到 channel 中,此时会触发 onSubscribe 然后执行 pushTask 方法
  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 RFuture startTimeFuture = pushTaskAsync();   //执行`pushTaskAsync`方法,并返回到期时间       startTimeFuture.onComplete((res, e) -> {           if (e != null) {               if (e instanceof RedissonShutdownException) {                   return;               }               log.error(e.getMessage(), e);               scheduleTask(System.currentTimeMillis() + 5 * 1000L);               return;           }                      if (res != null) {   //取到延迟时间,设置执行时间,到期时便去执行`pushTaskAsync`方法               scheduleTask(res);           }       });
  我们继续来看 pushTaskAsync 方法
  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 protected RFuture pushTaskAsync() {               return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,                       "local expiredValues = redis.call("zrangebyscore", KEYS[2], 0, ARGV[1], "limit", 0, ARGV[2]); "                     + "if #expiredValues > 0 then "                         + "for i, v in ipairs(expiredValues) do "                             + "local randomId, value = struct.unpack("dLc0", v);"                             + "redis.call("rpush", KEYS[1], value);"                             + "redis.call("lrem", KEYS[3], 1, v);"                         + "end; "                         + "redis.call("zrem", KEYS[2], unpack(expiredValues));"                     + "end; "                       // get startTime from scheduler queue head task                     + "local v = redis.call("zrange", KEYS[2], 0, 0, "WITHSCORES"); "                     + "if v[1] ~= nil then "                        + "return v[2]; "                     + "end "                     + "return nil;",                     Arrays.asList(getRawName(), timeoutSetName, queueName),                     System.currentTimeMillis(), 100);           }
  这里,我们依然只看 Lua 脚本这部分
  1 2 3 4 5 6 7 8 9 10 11 12 13 14  local expiredValues = redis.call("zrangebyscore", KEYS[2], 0, ARGV[1], "limit", 0, ARGV[2]);   //取出`redisson_delay_queue_timeout_{队列名}`中,分数小于当前时间戳的100条数据,意思就是取出到达延迟时间的数据  if #expiredValues > 0 then  //如果有到期数据     for i, v in ipairs(expiredValues) do          local randomId, value = struct.unpack("dLc0", v);  //将二进制反序列化         redis.call("rpush", KEYS[1], value);    //将反序列化后的数据放入到`队列名`中的集合(List)中         redis.call("lrem", KEYS[3], 1, v);   //将数据从`redisson_delay_queue_{队列名}`中移除掉     end;      redis.call("zrem", KEYS[2], unpack(expiredValues));  //批量删除`redisson_delay_queue_timeout_{队列名}`有序集合中的数据 end; local v = redis.call("zrange", KEYS[2], 0, 0, "WITHSCORES");  //取出`redisson_delay_queue_timeout_{队列名}`中最小分数即到期时间,作为定时任务参数,以便下次执行 if v[1] ~= nil then     return v[2]; end return nil; 队列取出数据
  这时候,我们取数据,就需要通过 RBlockingQueue 实例来取1 queue1.poll()
  我们来看看 RBlockingQueue 中的pollAsync 方法1 2 3 public RFuture pollAsync(long timeout, TimeUnit unit) {         return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), toSeconds(timeout, unit));   //RedisCommands.BLPOP_VALUE="BLPOP",这就在List中去取数据,     } 假巴意思总结一下
  这里,我们大概来总结一下,大致的流程是消息生产端给Redis添加延迟消息时,会生成一个有序集合和列表,此时会触发 QueueTransferTask 这个定时任务,该类会执行pushTask 方法设置延迟时间,到达延迟时间后,会再次执行pushTaskAsync 方法,把临时队列中的数据添加到最终集合中去。生产端到最终集合队列中去取消息。
  就先看这么多吧,其他细节以后再抽时间来学习。大概意思就是这么个意思,我也不知道对不对,反正先记下来。
iPhone13小米12和荣耀Magic4,哪款手机更值得买?更建议选择iPhone13,最大的优势就在于性能上搭载的苹果A15芯片不仅性能上领先安卓一个身位,在能耗控制上也比全新骁龙8高两个档次,在加上iOS系统的优化,iPhone13不仅年度折叠预测三星依然是冠军,荣耀令人刮目相看无论折叠屏是否是未来智能手机的终极形态,反正现在除了苹果之外安卓阵营所有品牌都在杀入这个领域。除了极少数的小品牌还没有进入之外,顶部的安卓品牌已经全部进入。甚至有品牌已经商用好几款特斯拉车顶维权当事人发声特斯拉女车主车顶维权案当事人张女士在其个人微博发布视频发布与特斯拉方面民事诉讼案件的相关进展。视频中,张女士表示,这个视频的内容揭露了名誉权案件的事实真相,并指出特斯拉全球副总裁陶有排面!国产机发布会登上美国华尔街日报,高管直言有点傻小米公司成立至今整整12年,为了庆祝这个特殊的日子,雷军晒出最难忘的几张照片,其中一张还登上美国华尔街日报。不少网友评论,果然雷老板有排面,一场发布会都惊动大洋彼岸的报纸,同时也怀百度因擅自转播春晚被央视起诉,判赔超50万Tech星球4月6日消息,近日,央视国际网络有限公司与北京百度网讯科技有限公司著作权权属侵权纠纷民事一审判决书公开。文书显示,原告央视公司诉称,百度公司未经授权许可擅自通过好看视频Java中的多态在Java中实现多态有3个必要条件1,满足继承关系2,要有重写3,父类引用指向子类对象,编译看左边,运行看右边对象的多态性只适用于方法,不适用于属性,属性的编译和运行都看左边多态是OPPOvivo小米宣布新举措不再允许32位应用单独上架IT之家4月6日消息,据移动智能终端生态联盟消息,为了进一步推进国内Android应用生态过渡至64位架构,提升用户体验,该联盟成员OPPOvivo小米共同宣布,自2022年4月起中国真正的高端科技一览!太牛了AI在今天已经是一个非常热门的话题了,但是很少有人从社会学的角度关注它人们要么神圣化它的力量,要么为它给我们生活带来的便利欢呼但具有讽刺意味的是,很多人即便被AI夺走了工作,也依然人性论之宇宙本原(2)大爆炸产生物质与反物质,物质由于引力作用形成星球或星系。在这种理论基础上将有很多问题需要解决星球的死亡时间星系结构空间质量力奇点。空间是什么?在西方科学定义中,空间是三维度的标量,大气科学可助推太阳能工程发展专家顾问哈尔滨工业大学教授杨大智中国科学院大气物理研究所研究员夏祥鳌中国气象局风能太阳能中心科学主任气象服务首席申彦波哈尔滨工业大学博士研究生王文婷在双碳目标背景下,加快清洁能源开技术分享MySQL命令行一则诡异问题分享作者杨涛涛资深数据库专家,专研MySQL十余年。擅长MySQLPostgreSQLMongoDB等开源数据库相关的备份恢复SQL调优监控运维高可用架构设计等。目前任职于爱可生,为各
微信在香港的普及率怎么样?据统计,目前香港即时通讯软件市场,Whatsapp,FaceBookMessenger和Wechat位于前三位。据互联网新思维于2015年3月份发表的2015年互联网最新统计中关于家里的有线电视停播了,怎么样才能看上免费电视?看DTMB地面数字电视吧。准备购买之前,先在网上查询一下你所在的地区信号覆盖情况,如在覆盖范围内,也可考虑。如果你的电视支持收看DTMB,就买一个天线,如不支持就得买机顶盒加天线。如何把家里两个宽带并成一个?各大运营商办理手机套餐,送宽带,有些家庭可能有两个宽带,如何有效利用两个宽带呢。可以使用企业的多WAN端口路由器实现,也可以使用家用无线路由器实现。很多时候企业路由器并不适合家庭使想买一套中央空调,什么牌子的好一点?这个问题问得好!问对人了。国内家用中央空调市场上分三个级别!第一,日系品牌,五大代表品牌有三菱电机,大金,日立,东芝,三菱重工!第二,美系四大品牌,约克,特灵,开利,麦克维尔!第三为什么以前的手机是可拆卸电池,而现在的大多数手机却是不可拆卸电池?现在的手机电池基本都是一体化设计了,用户不能自由拆卸,电池坏掉也只能找维修人员进行更换,当年那种换块电池就能满血复活的情况已经不复存在,这在很多人看来并不方便,那么厂家为什么要设计为什么国内现在有些人越来越喜欢华为笔记本电脑了呢?谢邀。题主这个问题,我感觉叫华为笔记本电脑为什么更多地出现在公众视野中比较好一些。回到题主的问题。华为笔记本电脑在今年年中之后,关注度就在一直上升。我认为他的成功,和他的产品线密不我有一个数码管怎么可以做成电压表?想制作一个数码管显示的电压表,一般有两种方法一是选用内部带有ADC(模数转换器)的单片机,利用其内置的ADC将被测电压转换成数字信号,然后经单片机处理后驱动数码管显示出电压二是采用你认为苹果史上最失败的一款手机是哪部?我认为最失败的还是刚刚上市的iPhoneXS,从最近一个月的用户反馈来看,购买iPhoneXS的用户真的不算很多,可以说比较失败了,原因有以下几点1iPhoneXS的外观和iPho全球大部分文字都被字母化,为何中国可以成功的保住独创的汉字?1995年,马云第一次在西雅图教师朋友家接触到电脑和互联网,他输入了beer,有美国的,有日本的,有德国的,但是没有中国的东西。马云好奇敲了个Chinabeer(中国啤酒),没有,微信办理的ETC怎么样?首先微信小程序搜索ETC助手,点击进入ETC助手小程序进入ETC助手后,小程序会自动定位你所在的省份,点击新办新卡就可以开始线上办理了,广东江苏山东北京四个省份已经可以线上办理,E苹果手机怎么清理王者荣耀的缓存?关键时刻,宇宙一声巨响,小帽子闪亮登场!针对楼主提出的问题本喵子认为楼主应该是玩游戏卡!所以应该这样解决。其一,打开设置通用iphone储存空间选择到王者荣耀,我们发现这个已经占用