redisson分布式限流RRateLimiter源码分析
分布式限流-单位时间多实例多线程访问次数限制
接前面聊一聊redisson及优雅实现 和 说一说spring boot优雅集成redisson,简单以源码的方式给大家介绍了redisson的:可重入性、阻塞、续约、红锁、联锁、加锁解锁流程和集成spring boot注意点和优雅实现方式。
接下来在讲一讲平时用的比较多的限流模块--RRateLimiter1.简单使用 public static void main(String[] args) throws InterruptedException { RRateLimiter rateLimiter = createLimiter(); int allThreadNum = 20; CountDownLatch latch = new CountDownLatch(allThreadNum); long startTime = System.currentTimeMillis(); for (int i = 0; i < allThreadNum; i++) { // new Thread(() -> { if(i % 3 == 0) Thread.sleep(1000); boolean pass = rateLimiter.tryAcquire(); if(pass) { log.info("get "); } else { log.info("no"); } // latch.countDown(); // }).start(); } // latch.await(); System.out.println("Elapsed " + (System.currentTimeMillis() - startTime)); } public static RRateLimiter createLimiter() { Config config = new Config(); config.useSingleServer() .setTimeout(1000000) .setPassword("123456") .setAddress("redis://xxxx:6379"); RedissonClient redisson = Redisson.create(config); RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter3"); // 初始化:PER_CLIENT 单实例执行,OVERALL 全实例执行 // 最大流速 = 每10秒钟产生3个令牌 rateLimiter.trySetRate(RateType.OVERALL, 3, 10, RateIntervalUnit.SECONDS); return rateLimiter; } 复制代码
实际结果:[2022-10-29 14:32:46.261][INFO ][main][][] RedisTest - get [2022-10-29 14:32:46.312][INFO ][main][][] RedisTest - get [2022-10-29 14:32:46.358][INFO ][main][][] RedisTest - get [2022-10-29 14:32:47.416][INFO ][main][][] RedisTest - no [2022-10-29 14:32:47.469][INFO ][main][][] RedisTest - no [2022-10-29 14:32:47.517][INFO ][main][][] RedisTest - no [2022-10-29 14:32:48.577][INFO ][main][][] RedisTest - no [2022-10-29 14:32:48.623][INFO ][main][][] RedisTest - no 复制代码2. 实现限流redisson使用了哪些redis数据结构Hash结构 -- 限流器结构:参数rate代表速率参数interval代表多少时间内产生的令牌参数type代表单机还是集群ZSET结构 -- 记录获取令牌的时间戳,用于时间对比。1667025166312 --> 2022-10-29 14:32:461667025166262 --> 2022-10-29 14:32:461667025166215 --> 2022-10-29 14:32:46
3. String结构 --记录的是当前令牌桶中的令牌数【很明显被我用完了现在是0】
3. 超过10s,我再次获取一个令牌,数据结构发生的变化ZSET结构。-- 新生成一个ZSET结构,存放获取令牌的时间戳
String 结构 --当前令牌桶还有2个令牌
4. 源码浅析 RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter3"); // 初始化 // 最大流速 = 每10秒钟产生3个令牌 rateLimiter.trySetRate(RateType.PER_CLIENT, 3, 10, RateIntervalUnit.SECONDS); 复制代码
初始化定义没有什么好讲的,就是创建HASH结构
主要还是讲讲: rateLimiter.tryAcquire() private RFuture tryAcquireAsync(RedisCommand command, Long value) { return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "local rate = redis.call("hget", KEYS[1], "rate");local interval = redis.call("hget", KEYS[1], "interval");local type = redis.call("hget", KEYS[1], "type");assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")local valueName = KEYS[2];local permitsName = KEYS[4];if type == "1" then valueName = KEYS[3];permitsName = KEYS[5];end;local currentValue = redis.call("get", valueName); if currentValue ~= false then local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval); local released = 0; for i, v in ipairs(expiredValues) do local random, permits = struct.unpack("fI", v);released = released + permits;end; if released > 0 then redis.call("zrem", permitsName, unpack(expiredValues)); currentValue = tonumber(currentValue) + released; redis.call("set", valueName, currentValue);end;if tonumber(currentValue) < tonumber(ARGV[1]) then local nearest = redis.call("zrangebyscore", permitsName, "(" .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), "withscores", "limit", 0, 1); local random, permits = struct.unpack("fI", nearest[1]);return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval);else redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1])); redis.call("decrby", valueName, ARGV[1]); return nil; end; else assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate"); redis.call("set", valueName, rate); redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1])); redis.call("decrby", valueName, ARGV[1]); return nil; end;", Arrays.asList(this.getName(), this.getValueName(), this.getClientValueName(), this.getPermitsName(), this.getClientPermitsName()), new Object[]{value, System.currentTimeMillis(), ThreadLocalRandom.current().nextLong()}); } 复制代码
主要就是这段lua代码,下面我详细过一下
作者目前用的3.16.3版本,刚好遇见redisson的bug,见3197,请大家用最新版本,以下为修复后解析。 -- 获取hash结构的速率 local rate = redis.call("hget", KEYS[1], "rate") -- 获取hash结构的时间区间(ms) local interval = redis.call("hget", KEYS[1], "interval") -- 获取hash结构的时间类型 local type = redis.call("hget", KEYS[1], "type") -- 判断是否初始化限流结构 assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized") -- {name}:value string结构,这个key记录的是当前令牌桶中的令牌数 local valueName = KEYS[2] -- {name}:permits zset结构,记录了请求的令牌数,score则为请求的时间戳 local permitsName = KEYS[4] -- 单机限流才会用到,集群模式不用关注 if type == "1" then valueName = KEYS[3] permitsName = KEYS[5] end -- 生产速率rate必须比请求的令牌数大 assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate") -- 初始化RateLimiter并不会初始化stirng结构,因此第一次获取这里currentValue是null local currentValue = redis.call("get", valueName) if currentValue ~= false then -- 第二次获取令牌执行 -------------------------- 获取zset结构:统计之前的请求令牌数 -- 范围是0 ~ (第二次请求时间戳 - 令牌生产的时间) local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval) local released = 0 -- lua迭代器,遍历expiredValues,如果有值,那么released等于之前所有请求的令牌数之和,表示应该释放多少令牌 for i, v in ipairs(expiredValues) do -- 获取请求数permits local random, permits = struct.unpack("fI", v) released = released + permits end -- 之前的请求令牌数 > 0, 例如10s产生3个令牌,现在超过10s了,重置周期并计算剩余令牌数 if released > 0 then -- 移除zset中所有元素【要求是同一个限流器permitsName,不然就移除不了,尴尬】 redis.call("zrem", permitsName, unpack(expiredValues)) currentValue = tonumber(currentValue) + released ------------------------- 更新string结构:=剩下令牌数+释放令牌数 redis.call("set", valueName, currentValue) end -- 如果当前令牌数 请求的令牌数 if tonumber(currentValue) < tonumber(ARGV[1]) then -- 从zset中找到距离当前时间最近的那个请求,也就是上一次放进去的请求信息 local nearest = redis.call("zrangebyscore", permitsName, "(" .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), "withscores", "limit", 0, 1); local random, permits = struct.unpack("fI", nearest[1]) -- 返回 上一次请求的时间戳 - (当前时间戳 - 令牌生成的时间间隔) 这个值表示还需要多久才能生产出足够的令牌 return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval) else -- 如果当前令牌数 请求的令牌数,表示令牌够多,更新zset ------------------------- 更新zset结构 redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1])) ------------------------- 更新Stringt结构,减少一个剩下的令牌数 redis.call("decrby", valueName, ARGV[1]) return nil end else --------汀雨笔记----------------- 初始化Stringt结构,当前限流器的令牌数 redis.call("set", valueName, rate) --------汀雨笔记----------------- 初始化zset结构 redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1])) -- struct.pack第一个参数表示格式字符串,f是浮点数、I是长整数。所以这个格式字符串表示的是把一个浮点数和长整数拼起来的结构体, -- ARGV[2]就是请求时间戳,ARGV[1]是请求的令牌数,统计会用到,ARGV[3]是当前时间戳为种子的随机数,具体用处还不知道,知道的网友可以留言 ------------------------- 更新Stringt结构,因为这是获取令牌操作,减掉一个令牌 -------------------------【本文作者认为,这里可以直接初始化string结构,值为rate - 1】 redis.call("decrby", valueName, ARGV[1]) return nil end 复制代码
这段lua代码也并不复杂,令牌桶的数量主要是通过时间窗口来控制,判断上一个请求是否超过了令牌生产周期。
留下一个疑问? -- 移除zset中所有元素【要求是同一个限流器permitsName,不然就移除不了,尴尬】 redis.call("zrem", permitsName, unpack(expiredValues)) 复制代码
我自己在本地测试,只要超过10s,permitsName就不一样,这就导致了这部分数据是不能移除的,就产生了冗余数据,从前面的截图也可以看出,是新生成了一个zset数据结构。
相当于直接走到了这一步:------------------------- 更新zset结构 redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1])) 复制代码
至于为什么会产生这样的结果,会的小伙伴可以留言,或者过段时间我提个issue。
及时当勉励 岁月不待人
能看到这里的人呀,都是菁英。
非常感谢
iPhone15ProMax再确认苹果A17圆润机身高速传输,价格不便宜作为全球智能机行业的领军者之一,苹果每年的新iPhone都会吸引众多消费者的关注,无独有偶,在近期多方媒体的不断披露之下,关于苹果明年的顶级旗舰iPhone15ProMax也是传来
刘强东重启京东注本文由新响原创,未经许可,禁止转载。最近,京东和刘强东本人动作不断,无论是痛批管理层,还是发内部信息对员工和高管的待遇一升一降,引起热议的理由除了企业经营行为的本身,最为引人关注
是时候考虑禁售苹果特斯拉了文科技君是时候考虑禁售苹果特斯拉了!美国的野心,已经毫不掩饰了。过去五年间,中兴华为都曾遭到美国不同程度的打压,而在当时,有不少人认为这是因为中兴华为违反了美国的相关条例,所以被针
3大因素,中国芯片迎来追赶期今天没什么大事,就扯一点闲篇。作者认为中国芯片其实迎来一个追赶期,理由有如下3点1,国际芯片发展已见顶以前中国跑,国外也跑,芯片追赶比较难。但如今,国际最先进芯片都已经干到3nm了
08年中国科学家打败日本,创造铁基超导世界纪录,美国博士都酸了上个世纪八十年代末,美国一个挤满人的物理学会议上,在此领域内德高望重的物理学家们纷纷上台演讲。他们大部分都是外国人。然而,在一众高鼻深目的外国面孔中,有一位长相平凡但充满温润气质的
通信历史连载229联邦快递FedEx劫持华为快递包裹事件通信历史连载229联邦快递FedEx劫持华为快递包裹事件联邦快递成立于1971年,最早提出隔夜递送理念的物流公司。1984年,联邦快递是全球范围内第一个进入中国的外资物流企业。19
为什么近些年中国乒乓球员出成绩普遍比日本球员晚?2022年第149篇作者杨磊伴随着19岁的张本智和亚洲杯登顶,唱衰中国乒乓未来的声音再次响起。当然,我知道绝大多数球友出发点是爱之深责之切,未雨绸缪。但是,日本乒乓球员早熟的现象,
成就数智企业打造大国品牌用友作为全球领先的企业云服务与软件提供商,是中国企业数智化服务和软件国产化自主创新的领导品牌,代表着世界舞台上的中国力量。用友品牌和产品在行业内受到广泛认可,连续多年入选中国互联网
那些神仙句子,建议收藏(情本无解,不必自困)1。我也许微不足道,但我相信我注定为人所爱。2。我这辈子都在推迟我以为我之后会有时间再做的事,但敞开的门不会一直敞开。3。当我们还是孩子的时候,我们曾以为,等我们长大,我们就会不再
建议小个子穿大衣时尽量别配平底鞋,换成这些鞋子,显瘦又显高大衣只有身高一六五以上的高个子才能驾驭吗?当然不是了,小个子也能Hold大衣。只不过,小个子穿大衣时,不能像高个子那样随心所欲,从款式挑选到内搭搭配再到鞋子搭配,各方面都要下足了功
米切尔罗宾逊上次战雄鹿我差点犯满离场从那之后便吸取了教训直播吧12月1日讯今日NBA常规赛,尼克斯103109不敌雄鹿。此役,尼克斯球员米切尔罗宾逊出战33分钟,投篮9中7,罚球2中1,得到15分20篮板1助攻1封盖。赛后,他在接受采访