sentinel基于滑动窗口实现实时指标统计原理
sentinel通过责任链模式,让每个slot来实现一种功能来实现流量控制、熔断降级等功能。其中,最重要的一个Slot非StatisticSlot莫属,它通过统计单位时间的调用数、成功数、失败数等,为流量控制、熔断降级等提供数据支撑,而StatisticSlot的底层是基于滑动窗口实现实时指标统计的,下面介绍一下StatisticSlot的工作过程一、StatisticSlot的入口
sentinel将多个slot串联起来,每个slot在处理完成后,将数据传递给下一个slot,这些slot都是通过函数entry作为处理数据的入口,StatisticSlot的入口如下: @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable { try { // 触发下一个Slot的entry方法 fireEntry(context, resourceWrapper, node, count, args); // 如果能通过SlotChain中后面的Slot的entry方法,说明没有被限流或降级 // 统计信息 node.increaseThreadNum(); node.addPassRequest(); // 省略部分代码 } catch (BlockException e) { context.getCurEntry().setError(e); // Add block count. node.increaseBlockedQps(); // 省略部分代码 throw e; } catch (Throwable e) { context.getCurEntry().setError(e); // Should not happen node.increaseExceptionQps(); // 省略部分代码 throw e; } }
StatisticSlot的功能主要有:
1、通过现有的统计数据进行规则校验,如果校验通过则可以对监控的接口进行访问。
2、校验通过则进行实时统计数据的更新。
3、如果被block或出现了异常了,则重新更新node中block的指标或异常指标。
通过代码可以发现,StatisticSlot主要是通过DefaultNode node来进行实时统计数据的更新,
下面来看一下DefaultNode的代码:public class DefaultNode extends StatisticNode { private ResourceWrapper id; private ClusterNode clusterNode; // 省略部分代码 //每通过一次,增加一次passRequest public void addPassRequest(int count) { super.addPassRequest(count); this.clusterNode.addPassRequest(count); } // 省略部分代码 }
通过代码可以看出,DefaultNode主要通过继承StatisticNode来实现统计功能。除此之外,DefaultNode还有一个成员变量ClusterNode,ClusterNode主要是记录所有的context中实时指标的总和。DefaultNode与ClusterNode的关系如下:
DefaultNode:保存着某个resource在某个context中的实时指标,每个DefaultNode都指向一个ClusterNode
ClusterNode:保存着某个resource在所有的context中实时指标的总和,同样的resource会共享同一个ClusterNode,不管他在哪个context中二、StatisticNode与ArrayMetric
从上面的分析中,我们知道实时指标的统计主要在StatisticNode中实现。下面介绍一下StatisticNodeprivate transient Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.sampleCount, IntervalProperty.INTERVAL); private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60); @Override public void addPassRequest() { rollingCounterInSecond.addPass(); rollingCounterInMinute.addPass(); }
两个变量rollingCounterInSecond和rollingCounterInMinute,分别统计一秒钟的实时指标和一分钟的实时指标,他们对应的类是ArrayMetric。 public class ArrayMetric implements Metric { private final LeapArray data; public ArrayMetric(int sampleCount, int intervalInMs) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) { if (enableOccupy) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } else { this.data = new BucketLeapArray(sampleCount, intervalInMs); } } @Override public void addSuccess(int count) { WindowWrap wrap = data.currentWindow(); wrap.value().addSuccess(count); } } }
在ArrayMetric中,定义了类似于数组的变量data,它里面的每一个元素类似于一个桶,每个桶对应一个时间段,存放这个时间段的统计数据。以BucketLeapArray 为例 public class BucketLeapArray extends LeapArray { public BucketLeapArray(int sampleCount, int intervalInMs) { super(sampleCount, intervalInMs); } }
BucketLeapArray 主要继承LeapArray,LeapArray的代码如下: public abstract class LeapArray { //单个窗口的长度,即每个窗口统计的时间段的大小 protected int windowLengthInMs; //窗口的个数,即array的大小 protected int sampleCount; //整个数组统计的时间段的大小,intervalInMs=windowLengthInMs*sampleCount protected int intervalInMs; //一个数组,每个元素用来记录对应时间的数据 protected final AtomicReferenceArray> array; }
由此可见,数组array中的WindowWrap具有统计数据指标的能力。
在ArrayMetric的函数addSuccess中,增加统计数据主要是通过MetricBucket中的addSuccess来实现,那我们就看一下MetricBucket。 public enum MetricEvent { PASS, BLOCK, EXCEPTION, SUCCESS, RT, OCCUPIED_PASS } public class MetricBucket { private final LongAdder[] counters; public void addSuccess(int n) { add(MetricEvent.SUCCESS, n); } public void addPass(int n) { add(MetricEvent.PASS, n); } public MetricBucket add(MetricEvent event, long n) { counters[event.ordinal()].add(n); return this; } }
在MetricBucket中,定义了一个数组counters,数组中的元素分别用来记录单位时间内的pass、block、exception、success、rt等。
由此可见,StatisticNode主要是通过ArrayMetric来确定好滑动窗口的大小( windowLengthInMs)
和个数(sampleCount),ArrayMetric中的数组data的size就是sampleCount,它的每一个元素就是MetricBucket ,即存放数据的桶。三、滑动窗口
在使用滑动窗口统计数据时,需要知道当前数据应该落到哪个桶里面,下面介绍一下sentinel中滑动窗口的数据存放原理。先看一下滑动窗口数组的定义 public abstract class LeapArray { // 时间窗口的长度 protected int windowLengthInMs; // 采样窗口的个数 protected int sampleCount; // 以毫秒为单位的时间间隔 protected int intervalInMs; // 采样的时间窗口数组 protected AtomicReferenceArray> array; /** * LeapArray对象 * @param windowLength 时间窗口的长度,单位:毫秒 * @param intervalInSec 统计的间隔,单位:秒 */ public LeapArray(int sampleCount, int intervalInMs) { this.windowLengthInMs = intervalInMs / sampleCount; this.intervalInMs = intervalInMs; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray<>(sampleCount); } }
在LeapArray中,定义数组array,默认窗口大小为500ms,窗口个数为2(intervalInSec 默认为1000,sampleCount默认为2)
现在继续回到 ArrayMetric.addPass() 方法中 @Override public void addPass() { WindowWrap wrap = data.currentWindow(); wrap.value().addPass(); }
主要是通过currentWindow()来获取当前窗口 public WindowWrap currentWindow() { return currentWindow(TimeUtil.currentTimeMillis()); } @Override public WindowWrap currentWindow(long time) { // time每增加一个windowLength的长度,timeId就会增加1,时间窗口就会往前滑动一个 long timeId = time / windowLength; // Calculate current index. // idx被分成[0,arrayLength-1]中的某一个数,作为array数组中的索引 int idx = (int)(timeId % array.length()); // Cut the time to current window start. long currentWindowStart = time - time % windowLength; while (true) { // 从采样数组中根据索引获取缓存的时间窗口 WindowWrap old = array.get(idx); // array数组长度不宜过大,否则old很多情况下都命中不了,就会创建很多个WindowWrap对象 if (old == null) { // 如果没有获取到,则创建一个新的 WindowWrap window = new WindowWrap(windowLength, currentWindowStart, new Window()); // 通过CAS将新窗口设置到数组中去 if (array.compareAndSet(idx, null, window)) { // 如果能设置成功,则将该窗口返回 return window; } else { // 否则当前线程让出时间片,等待 Thread.yield(); } // 如果当前窗口的开始时间与old的开始时间相等,则直接返回old窗口 } else if (currentWindowStart == old.windowStart()) { return old; // 如果当前时间窗口的开始时间已经超过了old窗口的开始时间,则放弃old窗口 // 并将time设置为新的时间窗口的开始时间,此时窗口向前滑动 } else if (currentWindowStart > old.windowStart()) { if (addLock.tryLock()) { try { // if (old is deprecated) then [LOCK] resetTo currentTime. return resetWindowTo(old, currentWindowStart); } finally { addLock.unlock(); } } else { Thread.yield(); } // 这个条件不可能存在 } else if (currentWindowStart < old.windowStart()) { // Cannot go through here. return new WindowWrap(windowLength, currentWindowStart, new Window()); } } }
这段代码的逻辑如下:
1、获取当前时间,用当前时间对窗口大小windowLength求差,得到时间的索引timeId,再用timeId对数组的长度取模,得到数组的下标idx
2、根据数组下标idx得到数组中的元素 WindowWrap old 。
2.1、如果old为空,说明没有初始化,整个时候就需要创建一个新的窗口,再将窗口放入到数组中。
2.2、如果old不为空,且当时时间段的开始时间和old的开始时间相同,则说明当前时间对应的窗口就是old,直接返回。
2.3、如果当前时间段的开始时间大于old的开始时间,说明old是属于上一轮数组的时间窗口,则需要执行函数resetWindowTo(old, currentWindowStart)。函数resetWindowTo就是将old的时间设置为当前时间窗口的时间,并且清理old中之前统计的数据,即将old清空,时间重置为当前时间窗口的开始时间。
这个滑动窗口的难点在于,时间timeId会不断增加,但是窗口数组data是固定大小(假设大小为2,数组下标为0、1)。所以刚开始时间窗口的索引idx=0,会落地array[0]中,然后时间增加,落到array[1]中,然后时间继续增加,idx=0,但是不能让这一次的数据落到上一次的时间窗口中,需要需要执行resetWindowTo,将上次统计到的数据情况。
另外,sentinel会运行一个线程,定时将滑动窗口中统计的数据写入到本地文件中,所以不用担心执行resetWindowTo会丢失掉之前统计的数据。