Distributed Rate Limiting: Implementation Principles

This article walks through how Sentinel implements distributed (cluster) rate limiting.

Module Structure

Sentinel's distributed rate limiting consists of the following modules:

  • sentinel-cluster-common-default: shared classes, constants, and other common pieces for cluster communication
  • sentinel-cluster-client-default: the default cluster client, using Netty as the underlying transport
  • sentinel-cluster-server-default: the default cluster server

The common-default module has no real logic of its own, only shared classes and the like, so it is not covered further.

Client Side

Initializing the Bootstrap

The client initializes a Bootstrap and attempts to connect to the server:

// NettyTransportClient.java
private Bootstrap initClientBootstrap() {
    Bootstrap b = new Bootstrap();
    eventLoopGroup = new NioEventLoopGroup();
    b.group(eventLoopGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                clientHandler = new TokenClientHandler(currentState, disconnectCallback);

                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
                pipeline.addLast(new NettyResponseDecoder());
                pipeline.addLast(new LengthFieldPrepender(2));
                pipeline.addLast(new NettyRequestEncoder());
                pipeline.addLast(clientHandler);
            }
        });

    return b;
}

b.connect(host, port)
    .addListener(new GenericFutureListener<ChannelFuture>() {
    @Override
    public void operationComplete(ChannelFuture future) {
        // Log the failure if the connection attempt did not succeed
        if (future.cause() != null) {
        }
    }
});
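
The connect timeout above is read from ClusterClientConfigManager. For reference, the client transport is typically configured before it starts, roughly as in the sketch below; this is based on the API used in the Sentinel cluster demos, and the method names, host, port, and timeout values are assumptions that may differ between versions:

// Sketch (demo-style API; values are placeholders): point the client at a
// token server and set the request timeout before starting the transport.
ClusterClientConfig clientConfig = new ClusterClientConfig();
clientConfig.setRequestTimeout(200);
ClusterClientConfigManager.applyNewConfig(clientConfig);

ClusterClientAssignConfig assignConfig = new ClusterClientAssignConfig("127.0.0.1", 11111);
ClusterClientConfigManager.applyNewAssignConfig(assignConfig);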

The client provides some guarantees around establishing the socket connection and keeping it available:

  • An atomic state variable with a PENDING state prevents start or stop from being called twice in a row while the client is still starting up or shutting down (see the sketch after this list):
OFF -> PENDING -> STARTED
STARTED -> PENDING -> OFF
private final AtomicInteger currentState = new AtomicInteger(ClientConstants.CLIENT_STATUS_OFF);

private void connect(Bootstrap b) {
    if (currentState.compareAndSet(ClientConstants.CLIENT_STATUS_OFF, ClientConstants.CLIENT_STATUS_PENDING)) {
        // Establish the connection
    }
}

@Override
public void stop() throws Exception {
    while (currentState.get() == ClientConstants.CLIENT_STATUS_PENDING) {
        try {
            Thread.sleep(200);
        } catch (Exception ex) {
            // Ignore.
        }
    }

    // Tear down the connection
}

TokenClientHandler listens for channel state changes and switches the state to STARTED or OFF accordingly:

public class TokenClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        currentState.set(ClientConstants.CLIENT_STATUS_STARTED);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        currentState.set(ClientConstants.CLIENT_STATUS_OFF);

        disconnectCallback.run();
    }

}
  • When the connection drops, the client automatically schedules reconnect attempts; the connect itself runs in a separate scheduler thread pool. Combined with the shouldRetry flag, an explicit stop() suppresses further retries, so reconnection is only attempted when the connection is lost unexpectedly.
private final AtomicBoolean shouldRetry = new AtomicBoolean(true);
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
        new NamedThreadFactory("sentinel-cluster-transport-client-scheduler"));

SCHEDULER.schedule(new Runnable() {
    @Override
    public void run() {
        if (shouldRetry.get()) {
            try {
                startInternal();
            } catch (Exception e) {
                RecordLog.warn("[NettyTransportClient] Failed to reconnect to server", e);
            }
        }
    }
}, RECONNECT_DELAY_MS * (failConnectedTime.get() + 1), TimeUnit.MILLISECONDS);        
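
The guard described in the first bullet can be pictured with the standalone sketch below; it is not Sentinel source, and the constants and placeholder comments are simplified for illustration:

import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a CAS-guarded start/stop state machine mirroring the
// OFF -> PENDING -> STARTED transitions described above.
public class StartStopStateMachine {

    private static final int OFF = 0;
    private static final int PENDING = 1;
    private static final int STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(OFF);

    public void start() {
        // Only one caller wins the OFF -> PENDING transition; a concurrent
        // start() while starting or stopping is simply ignored.
        if (state.compareAndSet(OFF, PENDING)) {
            try {
                // ... establish the connection here ...
                state.set(STARTED);
            } catch (RuntimeException ex) {
                state.set(OFF);
                throw ex;
            }
        }
    }

    public void stop() throws InterruptedException {
        // Wait for any in-flight start() to settle before tearing down.
        while (state.get() == PENDING) {
            Thread.sleep(200);
        }
        if (state.compareAndSet(STARTED, OFF)) {
            // ... close the connection here ...
        }
    }
}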

Message Types

The client sends three types of messages: PING, FLOW, and PARAM_FLOW. Each message type registers a corresponding request data Writer and response data Decoder:

  • Writer: serializes the message into the stream, defining what gets written, in what order, and how.
  • Decoder: reads the response for that message from the stream and parses it into the appropriate response object.

When the client starts, these Writers and Decoders are registered into the RequestDataWriterRegistry and ResponseDataDecodeRegistry:

@InitOrder(0)
public class DefaultClusterClientInitFunc implements InitFunc {

    @Override
    public void init() throws Exception {
        initDefaultEntityWriters();
        initDefaultEntityDecoders();
    }

    private void initDefaultEntityWriters() {
        RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PING, new PingRequestDataWriter());
        RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_FLOW, new FlowRequestDataWriter());
        Integer maxParamByteSize = ClusterClientStartUpConfig.getMaxParamByteSize();
        if (maxParamByteSize == null) {
            RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter());
        } else {
            RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter(maxParamByteSize));
        }
    }

    private void initDefaultEntityDecoders() {
        ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_PING, new PingResponseDataDecoder());
        ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_FLOW, new FlowResponseDataDecoder());
        ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_PARAM_FLOW, new FlowResponseDataDecoder());
    }
}

In NettyResponseDecoder, ClientEntityCodecProvider is used to look up the default decoder implementation (DefaultResponseEntityDecoder). Internally, that decoder picks the matching EntityDecoder from the ResponseDataDecodeRegistry above based on the message type and performs the decode:

public class NettyResponseDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        ResponseEntityDecoder<ByteBuf, Response> responseDecoder = ClientEntityCodecProvider.getResponseEntityDecoder();
        Response response = responseDecoder.decode(in);
        out.add(response);
    }
}

The default decoder expects the following wire format:

+--------+---------+-----------+---------+
| xid(4) | type(1) | status(1) | data... |
+--------+---------+-----------+---------+

Once parsing is complete, the data and the other response fields are wrapped into a ClusterResponse:

public class DefaultResponseEntityDecoder implements ResponseEntityDecoder<ByteBuf, ClusterResponse> {

    @Override
    public ClusterResponse decode(ByteBuf source) {
        if (source.readableBytes() >= 6) {
            int xid = source.readInt();
            int type = source.readByte();
            int status = source.readByte();

            EntityDecoder<ByteBuf, ?> decoder = ResponseDataDecodeRegistry.getDecoder(type);

            Object data;
            if (source.readableBytes() == 0) {
                data = null;
            } else {
                data = decoder.decode(source);
            }

            return new ClusterResponse<>(xid, type, status, data);
        }
        return null;
    }
}

The Writer implementation follows the same pattern:

public class DefaultRequestEntityWriter implements RequestEntityWriter<ClusterRequest, ByteBuf> {

    @Override
    public void writeTo(ClusterRequest request, ByteBuf target) {
        int type = request.getType();
        EntityWriter<Object, ByteBuf> requestDataWriter = RequestDataWriterRegistry.getWriter(type);

        // Write head part of request.
        writeHead(request, target);
        // Write data part.
        requestDataWriter.writeTo(request.getData(), target);
    }

    private void writeHead(Request request, ByteBuf out) {
        out.writeInt(request.getId());
        out.writeByte(request.getType());
    }
}

Flow Control Message Format

+-------------------+--------------+----------------+---------------+------------------+
| RequestID(4 byte) | Type(1 byte) | FlowID(8 byte) | Count(4 byte) | PriorityFlag (1) |
+-------------------+--------------+----------------+---------------+------------------+
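
As an illustration, writing the data part of that message (FlowID, Count, PriorityFlag) onto a Netty ByteBuf could look like the sketch below; the head part (RequestID, Type) is written by DefaultRequestEntityWriter.writeHead shown earlier. This mirrors what the registered FlowRequestDataWriter is responsible for, but it is not the actual Sentinel source:

import io.netty.buffer.ByteBuf;

// Sketch only: serialize the data part of a flow request in the layout above.
public class FlowRequestDataWriterSketch {

    public void writeTo(long flowId, int count, boolean prioritized, ByteBuf target) {
        target.writeLong(flowId);          // FlowID: 8 bytes
        target.writeInt(count);            // Count: 4 bytes
        target.writeBoolean(prioritized);  // PriorityFlag: 1 byte
    }
}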

Requesting a Flow Token

When the Sentinel client runs the flow control check, it first determines whether the rule uses cluster (distributed) flow control or local flow control. For a rule in cluster mode, the client obtains a TokenService, requests a TokenResult from it, and decides whether to limit the request based on that result. If anything goes wrong along the way, for example no TokenService is configured or an exception is thrown, the check degrades to the local flow control strategy.

// FlowRuleChecker.java
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    // Cluster mode: delegate the check to the cluster token service
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }

    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}

private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    try {
        TokenService clusterService = pickClusterService();
        if (clusterService == null) {
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        }
        long flowId = rule.getClusterConfig().getFlowId();
        TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
        // If client is absent, then fallback to local mode.
    } catch (Throwable ex) {
        RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
    }
    // Fallback to local flow control when token client or server for this rule is not available.
    // If fallback is not enabled, then directly pass.
    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
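
Whether fallbackToLocalOrPass actually falls back to the local check or simply lets the request through is configured per rule. The sketch below shows one way of enabling local fallback; setFallbackToLocalWhenFail and the import paths are assumptions based on the fallback behavior described above, so verify them against the Sentinel version in use:

import com.alibaba.csp.sentinel.slots.block.flow.ClusterFlowConfig;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;

// Sketch: a cluster-mode rule that degrades to the local check when the token
// server is unavailable (the setter name is an assumption).
public class ClusterRuleFallbackExample {

    public static FlowRule clusterRuleWithFallback() {
        ClusterFlowConfig clusterConfig = new ClusterFlowConfig();
        clusterConfig.setFlowId(10001L);                 // the flowId used by requestToken
        clusterConfig.setFallbackToLocalWhenFail(true);  // degrade instead of passing directly

        FlowRule rule = new FlowRule("someResource");
        rule.setCount(100);
        rule.setClusterMode(true);
        rule.setClusterConfig(clusterConfig);
        return rule;
    }
}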

The pickClusterService method referenced above is implemented as follows:

private static TokenService pickClusterService() {
    if (ClusterStateManager.isClient()) {
        return TokenClientProvider.getClient();
    }

    if (ClusterStateManager.isServer()) {
        return EmbeddedClusterTokenServerProvider.getServer();
    }
    return null;
}
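
For completeness, the client/server role checked here is controlled by ClusterStateManager; a minimal sketch of selecting the role at runtime (based on the API used in the Sentinel demos) looks like this:

import com.alibaba.csp.sentinel.cluster.ClusterStateManager;

// Sketch: choose the cluster role for this node. With the client role,
// pickClusterService() returns TokenClientProvider.getClient(); with the
// server role (embedded mode), it returns the embedded token server.
public class ClusterRoleSetup {

    public static void main(String[] args) {
        ClusterStateManager.setToClient();
        // ClusterStateManager.setToServer(); // run as an embedded token server instead
    }
}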

When the node is started as a cluster client, pickClusterService returns TokenClientProvider.getClient(). Inside this client, requestToken packages the flow request parameters and obtains the flow response:

// DefaultClusterTokenClient.java
@Override
public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
    if (notValidRequest(flowId, acquireCount)) {
        return badRequest();
    }
    FlowRequestData data = new FlowRequestData().setCount(acquireCount)
        .setFlowId(flowId).setPriority(prioritized);
    ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
    try {
        TokenResult result = sendTokenRequest(request);
        logForResult(result);
        return result;
    } catch (Exception ex) {
        ClusterClientStatLogUtil.log(ex.getMessage());
        return new TokenResult(TokenResultStatus.FAIL);
    }
}

Next, let's look at the possible outcomes carried by the returned TokenResult:

// FlowRuleChecker.java
private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRule rule, Context context,
                                                        DefaultNode node,
                                                        int acquireCount, boolean prioritized) {
    switch (result.getStatus()) {
        case TokenResultStatus.OK:
            return true;
        case TokenResultStatus.SHOULD_WAIT:
            // Wait for next tick.
            try {
                Thread.sleep(result.getWaitInMs());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return true;
        case TokenResultStatus.NO_RULE_EXISTS:
        case TokenResultStatus.BAD_REQUEST:
        case TokenResultStatus.FAIL:
        case TokenResultStatus.TOO_MANY_REQUEST:
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        case TokenResultStatus.BLOCKED:
        default:
            return false;
    }
}

As the snippet shows, the request is allowed without limiting only when the TokenResult explicitly returns OK, or SHOULD_WAIT (after sleeping for the indicated wait time). When the rule does not exist, the request is malformed, the call fails, or the server reports too many requests, the check falls back to local flow control (or passes directly if fallback is disabled). Only BLOCKED, and any unrecognized status, rejects the request outright.

Server Side

Initializing Netty

// NettyTransportServer.java

// Default number of worker threads: 2 * number of CPU cores
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
        SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

@Override
public void start() {
    ServerBootstrap b = new ServerBootstrap();
    this.bossGroup = new NioEventLoopGroup(1);
    this.workerGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS);
    b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 128)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline p = ch.pipeline();
                p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
                p.addLast(new NettyRequestDecoder());
                p.addLast(new LengthFieldPrepender(2));
                p.addLast(new NettyResponseEncoder());
                p.addLast(new TokenServerHandler(connectionPool));
            }
        })
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
        .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
        .childOption(ChannelOption.SO_TIMEOUT, 10)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
    b.bind(port).addListener(new GenericFutureListener<ChannelFuture>() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.cause() != null) {
                // Log the startup failure
            }
        }
    });
}
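
For reference, a standalone token server built on this transport is typically started along the following lines; this sketch is based on the Sentinel cluster demo, and the port, idle timeout, namespace, and import paths are placeholders or assumptions:

import java.util.Collections;

import com.alibaba.csp.sentinel.cluster.server.ClusterTokenServer;
import com.alibaba.csp.sentinel.cluster.server.SentinelDefaultTokenServer;
import com.alibaba.csp.sentinel.cluster.server.config.ClusterServerConfigManager;
import com.alibaba.csp.sentinel.cluster.server.config.ServerTransportConfig;

// Sketch: start a standalone token server (values are placeholders).
public class TokenServerBootstrap {

    public static void main(String[] args) throws Exception {
        ClusterServerConfigManager.loadGlobalTransportConfig(
            new ServerTransportConfig().setPort(11111).setIdleSeconds(600));
        ClusterServerConfigManager.loadServerNamespaceSet(Collections.singleton("my-app"));

        ClusterTokenServer tokenServer = new SentinelDefaultTokenServer();
        tokenServer.start();
    }
}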

The server's Netty startup likewise relies on a PENDING state to avoid starting or stopping the server more than once. The server also maintains a connection pool:

public class ConnectionPool {

    /**
     * Format: ("ip:port", connection)
     */
    private final Map<String, Connection> CONNECTION_MAP = new ConcurrentHashMap<String, Connection>();

}

In TokenServerHandler, the connection pool is updated whenever a new connection is established, an existing connection drops, or a new message arrives:

public class TokenServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Add the new connection to the pool
        globalConnectionPool.createConnection(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Remove this connection from the pool
        String remoteAddress = getRemoteAddress(ctx);
        globalConnectionPool.remove(ctx.channel());
    }

    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        globalConnectionPool.refreshLastReadTime(ctx.channel());
    }

}

The connection pool runs an internal timer that periodically closes idle connections.
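
That cleanup can be pictured with the sketch below; it is not the actual Sentinel scan task, and the class name, field names, and timeout handling are illustrative only:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: periodically scan the pool and drop connections that have not read
// anything within the idle timeout.
public class IdleConnectionScanner {

    private final Map<String, Long> lastReadTimeMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final long idleTimeoutMs;

    public IdleConnectionScanner(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // Called from something like TokenServerHandler.channelRead.
    public void refreshLastReadTime(String address) {
        lastReadTimeMap.put(address, System.currentTimeMillis());
    }

    public void start() {
        scheduler.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            lastReadTimeMap.forEach((address, lastRead) -> {
                if (now - lastRead > idleTimeoutMs) {
                    // Close the underlying channel for this address here,
                    // then forget about it.
                    lastReadTimeMap.remove(address);
                }
            });
        }, idleTimeoutMs, idleTimeoutMs, TimeUnit.MILLISECONDS);
    }
}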

When the Netty server receives a flow control request, it looks up the appropriate RequestProcessor for the request type and lets that processor handle the request:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ClusterRequest) {
        ClusterRequest request = (ClusterRequest)msg;

        // Pick request processor for request type.
        RequestProcessor<?, ?> processor = RequestProcessorProvider.getProcessor(request.getType());
        ClusterResponse<?> response = processor.processRequest(request);
        writeResponse(ctx, response);
    }
}

TokenService

Inside the processor, a TokenService is used to decide whether the request should be limited. Which TokenService gets used is determined by TokenServiceProvider, which is built on the SPI mechanism.

// TokenServiceProvider.java
SpiLoader.loadFirstInstanceOrDefault(TokenService.class, DefaultTokenService.class);

Inside the TokenService, the flow rule is looked up by its flowId, and ClusterFlowChecker is then used to acquire a cluster token:

// DefaultTokenService.java
@Override
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
    // The rule should be valid.
    FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
    if (rule == null) {
        return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
    }

    return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
}
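
Because the lookup goes through SPI, the default implementation above can be replaced with a custom one. The sketch below extends DefaultTokenService just to add logging; the import paths and the SPI registration file name are assumptions, so check them against the actual Sentinel modules:

import com.alibaba.csp.sentinel.cluster.TokenResult;
import com.alibaba.csp.sentinel.cluster.flow.DefaultTokenService;

// Sketch: a custom TokenService discovered via SPI. Register the class in
// META-INF/services/com.alibaba.csp.sentinel.cluster.TokenService so that
// SpiLoader picks it up ahead of the default.
public class LoggingTokenService extends DefaultTokenService {

    @Override
    public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
        long start = System.currentTimeMillis();
        TokenResult result = super.requestToken(ruleId, acquireCount, prioritized);
        System.out.println("requestToken ruleId=" + ruleId
            + ", status=" + result.getStatus()
            + ", costMs=" + (System.currentTimeMillis() - start));
        return result;
    }
}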

ClusterFlowRuleManager maintains the <flowId, clusterRule> mapping in a ConcurrentHashMap; the mapping is populated when the user loads flow rules into memory via the loadRules method.

// ClusterFlowRuleManager.java
public static void loadRules(String namespace, List<FlowRule> rules) {
    NamespaceFlowProperty<FlowRule> property = PROPERTY_MAP.get(namespace);
    if (property != null) {
        property.getProperty().updateValue(rules);
    }
}
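
A typical call site simply loads the rules for a namespace once the cluster-mode FlowRule objects have been built (for example as in the earlier fallback sketch); a minimal usage sketch, with a placeholder namespace, is:

import java.util.Collections;
import java.util.List;

import com.alibaba.csp.sentinel.cluster.flow.rule.ClusterFlowRuleManager;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;

// Sketch: push one cluster-mode rule into the token server's rule manager.
public class LoadClusterRulesExample {

    public static void load(FlowRule clusterRule) {
        List<FlowRule> rules = Collections.singletonList(clusterRule);
        ClusterFlowRuleManager.loadRules("my-namespace", rules);
    }
}

Note that the loadRules excerpt above only updates rules when a NamespaceFlowProperty for the namespace already exists (property != null), so a property for that namespace must have been registered beforehand.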

The core algorithm of ClusterFlowChecker is shown below: it checks whether the tokens already passed in the current second, plus the tokens being requested, would exceed the global threshold:

// ClusterFlowChecker.java
static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) {
    Long id = rule.getClusterConfig().getFlowId();
    ClusterMetric metric = ClusterMetricStatistics.getMetric(id);

    // Average number of tokens passed per second (current cluster QPS)
    double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
    // Total number of tokens allowed per second (global threshold)
    double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
    // Tokens that would remain if this request were granted
    double nextRemaining = globalThreshold - latestQps - acquireCount;

    if (nextRemaining >= 0) {
        metric.add(ClusterFlowEvent.PASS, acquireCount);
        metric.add(ClusterFlowEvent.PASS_REQUEST, 1);

        // Remaining count is cut down to a smaller integer.
        return new TokenResult(TokenResultStatus.OK)
            .setRemaining((int) nextRemaining)
            .setWaitInMs(0);
    } else {
        metric.add(ClusterFlowEvent.BLOCK, acquireCount);
        metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);

        // blocked
        return new TokenResult(TokenResultStatus.BLOCKED)
            .setRemaining(0)
            .setWaitInMs(0);
    }
}
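
To make the arithmetic concrete: with a global threshold of 100 tokens per second, an exceed-count factor of 1.0, a current latestQps of 98, and an acquireCount of 5, nextRemaining = 100 - 98 - 5 = -3 < 0, so the request is answered with BLOCKED. With an acquireCount of 2 instead, nextRemaining = 0 and the request is granted with OK and a remaining count of 0.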