限流实现原理

限流实现原理

本文以最常见的限流场景来讲述 Sentinel 基础的工作原理。

限流架构图

限流原理的总的架构图如下:

FlowSlotFlowRuleManager 中根据用当前资源名称作为 Key,然后读取出来这个资源绑定的 FlowRule 规则。然后将这些规则作为方法的参数,来调用 FlowRuleCheckercheckFlow 方法。

checkFlow 内部,遍历这每一个 FlowRule,逐次判断是否可以通过限流检查:

// FlowRuleChecker.java
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                        Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

每一个 FlowRule 内部都可以关联不同的 TrafficShapingController,关联不同的流量整形器的代码如下:

private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
    if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
        switch (rule.getControlBehavior()) {
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                        rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
            default:
                // Default mode or unknown mode: default traffic shaping controller (fast-reject).
        }
    }
    return new DefaultController(rule.getCount(), rule.getGrade());
}

所以最终是否可以通过限流检查,是由不同的 TrafficShapingController 所决定的。

构造 Slot 链

第一次调用 entry 方法的时候,会根据 resourceWrapper 寻找或者新建属于这个资源的 ProcessorSlot 链,每个资源的 ProcessorSlotChain 会放到一个 chainMap 中:

// CtSph.java
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // Entry size limit.
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }

                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}

新建 SlotChain 的过程,会首先通过 SpiLoader 寻找用户提供的自定义 SlotChainBuilder 实现类,如果没有,那么就使用默认的 DefaultSlotChainBuilder。这个 Builder 会使用 SpiLoader 查找所有实现了 ProcessorSlot 的类,并且按照这些 Slot 类上面的 SpiOrder 注解所声明的顺序,来对所有的 Slot 进行排序。

Sentinel 框架默认提供的 Builder 的实现类如下 (文件 com.alibaba.csp.sentinel.slotchain.SlotChainBuilder 的内容):

# Default slot chain builder
com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder

默认提供的 ProcessorSlot 的实现类如下 (文件 com.alibaba.csp.sentinel.slotchain.ProcessorSlot 的内容):

# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

默认构造出来的 Slot 链如下所示:

NodeSelectorSlot

NodeSelectorSlot 用来构建调用树 (invocation tree)。每一个 Context 都有一颗调用树:

// NodeSelectorSlot.java

DefaultNode node = map.get(context.getName());
if (node == null) {
    synchronized (this) {
        node = map.get(context.getName());
        if (node == null) {
            node = new DefaultNode(resourceWrapper, null);
            HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
            cacheMap.putAll(map);
            cacheMap.put(context.getName(), node);
            map = cacheMap;
            // Build invocation tree
            ((DefaultNode) context.getLastNode()).addChild(node);
        }

    }
}

context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);

ClusterBuilderSlot

// ClusterBuilderSlot.java
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                    boolean prioritized, Object... args)
    throws Throwable {
    if (clusterNode == null) {
        synchronized (lock) {
            if (clusterNode == null) {
                // Create the cluster node.
                clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                newMap.putAll(clusterNodeMap);
                newMap.put(node.getId(), clusterNode);

                clusterNodeMap = newMap;
            }
        }
    }
    node.setClusterNode(clusterNode);
}

StatisticSlot

StatisticSlot 作用主要就是在 entry 方法中增加线程的数量,记录通过请求的数量;并在方法退出的时候,递减线程的数量,记录响应时间,标识这是一条成功的请求:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
    // Request passed, add thread count and pass count.
    node.increaseThreadNum();
    node.addPassRequest(count);
}

@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
    node.addRtAndSuccess(rt, batchCount);
    node.decreaseThreadNum();
}

其它 Slot

  • LogSlot:负责捕获 BlockException 并记录日志
  • AuthoritySlot:负责进行权限校验
  • SystemSlot:负责确保 QPS、线程总数量、响应时间、系统负载、CPU 使用率不过载
  • FlowSlot:负责限流
  • DegradeSlot:熔断器

Node 树

Constants 中默认新建了资源名称为 machine-rootROOT 和名为 __total_inbound_traffic__ENTRY_NODE 这两个节点,其中 ENTRY_NODE 是用于全局的 SystemRule 规则的:

/**
 * Global ROOT statistic node that represents the universal parent node.
 */
public final static DefaultNode ROOT = new EntranceNode(new StringResourceWrapper(ROOT_ID, EntryType.IN),
    new ClusterNode(ROOT_ID, ResourceTypeConstants.COMMON));

/**
 * Global statistic node for inbound traffic. Usually used for {@code SystemRule} checking.
 */
public final static ClusterNode ENTRY_NODE = new ClusterNode(TOTAL_IN_RESOURCE_NAME, ResourceTypeConstants.COMMON);

ContextUtil 中默认给 ROOT 节点添加了资源名称为 sentinel_default_contextEntranceNode

static {
    // Cache the entrance node for default context.
    initDefaultContext();
}

private static void initDefaultContext() {
    String defaultContextName = Constants.CONTEXT_DEFAULT_NAME;
    EntranceNode node = new EntranceNode(new StringResourceWrapper(defaultContextName, EntryType.IN), null);
    Constants.ROOT.addChild(node);
    contextNameNodeMap.put(defaultContextName, node);
}

在进入 NodeSelectorSlot 的时候,这个 Slot 会添加一个 DefaultNode 到树上:

node = new DefaultNode(resourceWrapper, null);
((DefaultNode) context.getLastNode()).addChild(node);

Context

可以手动指定资源所处的 Context

public static void main(String[] args) {
    try {
        Context context=ContextUtil.enter("context1");
        Entry entry=SphU.entry("HelloWorld");
        entry.exit();
        ContextUtil.exit();
    } catch (BlockException ex) {
        // 处理被流控的逻辑
        System.out.println("blocked!");
    }catch (Exception e){
        e.printStackTrace();
    }
}

每当我们调用 SphU.entry() 或者 SphO.entry() 获取访问资源许可的时候都需要当前线程处在某个context中,如果我们没有显式调用 ContextUtil.enter(),默认会使用 Default context。一个线程对应一个 Context,一个 ContextName 对应多个 Context,一个 ContextName 共享一个 EntranceNode。同一个资源在不同的Context下都有调用,它们使用的也会是同一个处理链条。

参考