基于 LeapArray 的统计

基于 LeapArray 的统计

Sentinel 底层采用高性能的滑动窗口数据结构 LeapArray 来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。

Metric 统计类

LeapArray 作为基础设施,其目的主要是为了在底层配合 Metric 类,以便对资源的各种信息做统计,我们来看 Metric 类都需要统计哪些信息:

public interface Metric extends DebugSupport {
    // 获取总的成功数量
    long success();
    // 获取最大的成功数量
    long maxSuccess();
    // 获取异常数量
    long exception();
    // 获取阻塞的数量
    long block();
    // 获取总的通过数量
    long pass();
    // 获取总响应时间
    long rt();
    // 获取最小的响应时间
    long minRt();
}

相应的,在 Metric 接口中,也有添加各种事件的方法:

public interface Metric extends DebugSupport {
    // 添加 n 个异常
    void addException(int n);
    // 添加 n 个阻塞
    void addBlock(int n);
    // 添加 n 个成功的响应
    void addSuccess(int n);
    // 添加 n 个通过
    void addPass(int n);
    // 在总响应时间上,添加 rt 时间
    void addRT(long rt);
}

ArrayMetricaddSuccess(int count)success() 为例,我们看下在内部是如何使用 LeapArray 提供资源统计服务的。

addSuccess 方法中,首先获取了当前时间戳的 Bucket (data 的类型是 LeapArray<MetricBucket>),然后在这个 Bucket 上进一步调用了内部 MetricBucketaddSuccess 方法。

@Override
public void addSuccess(int count) {
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    wrap.value().addSuccess(count);
}

在来看 success() 方法,其内部也首先获取了当前时间戳的 Bucket,然后使用 values() 方法获取了内部存储的所有的 Bucket,最后进行汇总统计 success 的个数。

@Override
public long success() {
    data.currentWindow();
    long success = 0;

    List<MetricBucket> list = data.values();
    for (MetricBucket window : list) {
        success += window.success();
    }
    return success;
}

所以,currentWindow() 方法可谓是重中之重,此方法的作用是获取当前时间戳对应的 Bucket。我们接下来就重点研究此方法。

LeapArray 实现原理

数据结构

首先来看 LeapArray 内部主要的使用的数据结构:

protected int windowLengthInMs;
protected int sampleCount;
protected int intervalInMs;

protected final AtomicReferenceArray<WindowWrap<T>> array;
  • intervalInMs 代表滑动窗口的总时间长度
  • sampleCount 表示要将 intervalInMs 切割成多少份 Bucket,也就是滑动窗口中有多少个 bucket
  • windowLengthInMs 表示每一份 Bucket 代表多长时间
  • array 表示的是底层的数组,即这么多份 bucket 底层使用这个数组进行存储

在创建 LeapArray 的时候,会保证每一个 Bucket 存储的时间长度都是相同的,不会出现最后一个 Bucket 存储的是半份或者一部分时间长度这种情况:

AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

new LeapArray(6, 1200) 为例,内部大概的数据结构图,如下图所示:

    B0      B1       B2      B3     B4      B5
||_______|_______|_______|_______|_______|_______||
0       200     400     600     800     1000    1200  timestamp

表示,将 intervalInMs = 1200 毫秒的总时间窗口,拆分为 sampleCount = 6 个 Bucket,每一个 Bucket 存储的时间长度为 windowLengthInMs = 200 毫秒,底层 array 的长度为 6

获取时间戳对应的 Bucket

currentWindow() 方法就是根据当前最新的时间戳,去获取或者更新数组中相应的索引处的值,更新值也就是达到了窗口滑动的效果。

(1)根据时间戳获取对应的索引

private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    // Calculate current index so we can map the timestamp to the leap array.
    return (int)(timeId % array.length());
}

比如数组长度为 6windowLengthInMs200,那么:

  • [1200, 1400) 的索引为 0
  • [1400, 1600) 的索引为 1
  • 以此类推…
    B0      B1       B2      B3     B4      B5
||_______|_______|_______|_______|_______|_______||
0       200     400     600     800     1000    1200  timestamp
1200   1400    1600    1800    2000     2200    2400
2400   2600    2800    3000    3200     3400    3600
...

(2)计算时间戳对应的 Bucket 开始的时间

protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
}

示例:

  • 1350 - 1350 % 200 = 1200
  • 1400 - 1400 % 200 = 1400
  • 1670 - 1670 % 200 = 1600

(3)根据时间戳找到数组中指定索引处的 Bucket

  • Bucket 不存在,创建一个新的 Bucket,然后 CAS 更新到这个索引处
     B0       B1      B2    NULL      B4
||_______|_______|_______|_______|_______||___
200     400     600     800     1000    1200  timestamp
                            ^
                        time=888
        bucket is empty, so create new and update
  • Bucket 是最新的,直接返回这个 Bucket
    B0       B1      B2     B3      B4
||_______|_______|_______|_______|_______||___
200     400     600     800     1000    1200  timestamp
                            ^
                         time=888
           startTime of Bucket 3: 800, so it's up-to-date
  • Bucket 已经过时了,重置当前的 Bucket,并且将所有过时的 Bucket 都给清理掉
  (old)
            B0       B1      B2    NULL      B4
|_______||_______|_______|_______|_______|_______||___
...    1200     1400    1600    1800    2000    2200  timestamp
                             ^
                          time=1676
         startTime of Bucket 2: 400, deprecated, should be reset

代码如下所示:

while (true) {
    WindowWrap<T> old = array.get(idx);
    if (old == null) {
        WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        if (array.compareAndSet(idx, null, window)) {
            // Successfully updated, return the created bucket.
            return window;
        } else {
            // Contention failed, the thread will yield its time slice to wait for bucket available.
            Thread.yield();
        }
    } else if (windowStart == old.windowStart()) {
        return old;
    } else if (windowStart > old.windowStart()) {
        if (updateLock.tryLock()) {
            try {
                // Successfully get the update lock, now we reset the bucket.
                return resetWindowTo(old, windowStart);
            } finally {
                updateLock.unlock();
            }
        } else {
            // Contention failed, the thread will yield its time slice to wait for bucket available.
            Thread.yield();
        }
    } else if (windowStart < old.windowStart()) {
        // Should not go through here, as the provided time is already behind.
        return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
    }
}

窗口 Wrapper

LeapArray 内部的数组中存储的类型是 WindowWrap<T> 类型,即在内部存储的 T 类型之外又包裹了一层,这个 WindowWrap 就是窗口,窗口内部的 T 才是真正的内部数据:

protected final AtomicReferenceArray<WindowWrap<T>> array;

WindowWrap 封装了和此窗口相关的一些属性:

public class WindowWrap<T> {

    // 一个窗口有多少毫秒
    private final long windowLengthInMs;

    // 窗口开始的时间-毫秒
    private long windowStart;

    // 静态数据
    private T value;

}

内部有一个比较重要的方法是 resetTo,可以将窗口的起始时间改为指定值,实现窗口滑动的效果:

public WindowWrap<T> resetTo(long startTime) {
    this.windowStart = startTime;
    return this;
}

LeapArray 底层存储

继承 LeapArray 的有如下子类

这些子类的定义如下:

public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {}
public class BucketLeapArray extends LeapArray<MetricBucket> {}
public class FutureBucketLeapArray extends LeapArray<MetricBucket> {}
public class SimpleErrorCounterLeapArray extends LeapArray<SimpleErrorCounter> {}
public SlowRequestLeapArray extends LeapArray<SlowRequestCounter> {}
public class UnaryLeapArray extends LeapArray<LongAdder> {}

其中 MetricBucketXXCounter 等在内部也是基于 LongAdder 实现的:

public class MetricBucket {
    private final LongAdder[] counters;
}

static class SlowRequestCounter {
    private LongAdder slowCount;
    private LongAdder totalCount;
}

static class SimpleErrorCounter {
    private LongAdder errorCount;
    private LongAdder totalCount;
}

即最终的统计数据是存储在 LongAdder 中的,而这个 LongAdder 文件的源代码在一开始就标注上了,这个类的作者是 Doug Lea

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 *
 * From http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/
 */

之所以没有用 Java 自身提供的 LongAdder 类,我猜测是因为当时 Java 类中还没有包含这个类。

在多线程环境中,LongAdder 相比 AtomicLong 性能要高出不少,特别是写多的场景。

LongAdder 的原理是,在最初无竞争时,只更新 base 的值,当有多线程竞争时通过分段的思想,让不同的线程 CAS 更新不同的段,最后把这些段相加就得到了完整的 LongAdder 存储的值。

参考