How Sentinel's Distributed Rate Limiting Is Implemented
This article walks through how Sentinel's distributed (cluster) rate limiting works.
Module Structure
Sentinel's distributed rate limiting consists of the following modules:
- sentinel-cluster-common-default: common classes, constants, and other shared pieces used for cluster communication
- sentinel-cluster-client-default: the default cluster client, which uses Netty as the underlying transport
- sentinel-cluster-server-default: the default cluster server
The common-default module contains no real logic, only shared classes, so it is not covered further.
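Before diving into each side, it helps to see how an application chooses its role. A node is switched into token-client or token-server mode through ClusterStateManager from sentinel-core; the minimal sketch below only shows the state switch and omits transport and rule configuration:
// Switch this node to cluster token client mode;
// ClusterStateManager.CLUSTER_SERVER would turn it into an (embedded) token server instead.
ClusterStateManager.applyState(ClusterStateManager.CLUSTER_CLIENT);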
The Client Side
Initializing the Bootstrap
The client initializes a Netty Bootstrap and tries to connect to the server:
// NettyTransportClient.java
private Bootstrap initClientBootstrap() {
    Bootstrap b = new Bootstrap();
    eventLoopGroup = new NioEventLoopGroup();
    b.group(eventLoopGroup)
        .channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, ClusterClientConfigManager.getConnectTimeout())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                clientHandler = new TokenClientHandler(currentState, disconnectCallback);

                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
                pipeline.addLast(new NettyResponseDecoder());
                pipeline.addLast(new LengthFieldPrepender(2));
                pipeline.addLast(new NettyRequestEncoder());
                pipeline.addLast(clientHandler);
            }
        });

    return b;
}
b.connect(host, port)
    .addListener(new GenericFutureListener<ChannelFuture>() {
        @Override
        public void operationComplete(ChannelFuture future) {
            // Log whether the connection attempt failed.
            if (future.cause() != null) {
                // (logging elided)
            }
        }
    });
The client provides several safeguards around establishing the socket connection and keeping it usable:
- An atomic state field with a PENDING state prevents start or stop from being invoked twice in a row while the client is still starting up or shutting down. The state transitions are:
OFF -> PENDING -> STARTED
STARTED -> PENDING -> OFF
private final AtomicInteger currentState = new AtomicInteger(ClientConstants.CLIENT_STATUS_OFF);

private void connect(Bootstrap b) {
    if (currentState.compareAndSet(ClientConstants.CLIENT_STATUS_OFF, ClientConstants.CLIENT_STATUS_PENDING)) {
        // Establish the connection.
    }
}

@Override
public void stop() throws Exception {
    while (currentState.get() == ClientConstants.CLIENT_STATUS_PENDING) {
        try {
            Thread.sleep(200);
        } catch (Exception ex) {
            // Ignore.
        }
    }
    // Close the connection.
}
TokenClientHandler listens to channel state changes and switches the state to STARTED or OFF accordingly:
public class TokenClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        currentState.set(ClientConstants.CLIENT_STATUS_STARTED);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        currentState.set(ClientConstants.CLIENT_STATUS_OFF);
        disconnectCallback.run();
    }
}
- When the connection drops, the client automatically schedules reconnect attempts; the reconnect work runs on a separate scheduler thread pool. Combined with the shouldRetry flag, no reconnect is attempted when stop is called deliberately, so reconnection only happens when the connection is lost unexpectedly.
private final AtomicBoolean shouldRetry = new AtomicBoolean(true);

private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
    new NamedThreadFactory("sentinel-cluster-transport-client-scheduler"));

SCHEDULER.schedule(new Runnable() {
    @Override
    public void run() {
        if (shouldRetry.get()) {
            try {
                startInternal();
            } catch (Exception e) {
                RecordLog.warn("[NettyTransportClient] Failed to reconnect to server", e);
            }
        }
    }
}, RECONNECT_DELAY_MS * (failConnectedTime.get() + 1), TimeUnit.MILLISECONDS);
Message Types
The client sends three types of messages: PING, FLOW, and PARAM_FLOW. For each type, a request data Writer and a response data Decoder are registered.
- Writer: defines how a message is written to the stream (the field order, encoding, and so on).
- Decoder: reads the response for that message from the stream and parses it into the appropriate response object.
When the client starts, it registers the Writer and Decoder for each message type with the RequestDataWriterRegistry and ResponseDataDecodeRegistry:
@InitOrder(0)
public class DefaultClusterClientInitFunc implements InitFunc {

    @Override
    public void init() throws Exception {
        initDefaultEntityWriters();
        initDefaultEntityDecoders();
    }

    private void initDefaultEntityWriters() {
        RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PING, new PingRequestDataWriter());
        RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_FLOW, new FlowRequestDataWriter());

        Integer maxParamByteSize = ClusterClientStartUpConfig.getMaxParamByteSize();
        if (maxParamByteSize == null) {
            RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter());
        } else {
            RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter(maxParamByteSize));
        }
    }

    private void initDefaultEntityDecoders() {
        ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_PING, new PingResponseDataDecoder());
        ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_FLOW, new FlowResponseDataDecoder());
        ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_PARAM_FLOW, new FlowResponseDataDecoder());
    }
}
In NettyResponseDecoder, the default decoder implementation (DefaultResponseEntityDecoder) is obtained via ClientEntityCodecProvider. Internally, that decoder looks up the EntityDecoder registered for the message type in the ResponseDataDecodeRegistry and uses it to decode the payload:
public class NettyResponseDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        ResponseEntityDecoder<ByteBuf, Response> responseDecoder = ClientEntityCodecProvider.getResponseEntityDecoder();

        Response response = responseDecoder.decode(in);
        // The decoder returns null when the buffer does not yet contain a full response.
        if (response != null) {
            out.add(response);
        }
    }
}
The default decoder parses responses in the following format:
+--------+---------+-----------+---------+
| xid(4) | type(1) | status(1) | data... |
+--------+---------+-----------+---------+
After parsing, the decoded data and the other response fields are wrapped in a ClusterResponse:
public class DefaultResponseEntityDecoder implements ResponseEntityDecoder<ByteBuf, ClusterResponse> {

    @Override
    public ClusterResponse decode(ByteBuf source) {
        if (source.readableBytes() >= 6) {
            int xid = source.readInt();
            int type = source.readByte();
            int status = source.readByte();

            EntityDecoder<ByteBuf, ?> decoder = ResponseDataDecodeRegistry.getDecoder(type);

            Object data;
            if (source.readableBytes() == 0) {
                data = null;
            } else {
                data = decoder.decode(source);
            }

            return new ClusterResponse<>(xid, type, status, data);
        }
        return null;
    }
}
The Writer side follows the same pattern:
public class DefaultRequestEntityWriter implements RequestEntityWriter<ClusterRequest, ByteBuf> {

    @Override
    public void writeTo(ClusterRequest request, ByteBuf target) {
        int type = request.getType();

        EntityWriter<Object, ByteBuf> requestDataWriter = RequestDataWriterRegistry.getWriter(type);

        // Write head part of request.
        writeHead(request, target);
        // Write data part.
        requestDataWriter.writeTo(request.getData(), target);
    }

    private void writeHead(Request request, ByteBuf out) {
        out.writeInt(request.getId());
        out.writeByte(request.getType());
    }
}
Flow Control Request Format
Combining the 4-byte request id and 1-byte type written by writeHead with the flow-specific data, a flow request looks like this on the wire:
+-------------------+--------------+----------------+---------------+------------------+
| RequestID(4 byte) | Type(1 byte) | FlowID(8 byte) | Count(4 byte) | PriorityFlag (1) |
+-------------------+--------------+----------------+---------------+------------------+
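The data part (FlowID, Count, PriorityFlag) is produced by the FlowRequestDataWriter registered earlier. Below is a minimal sketch of what such a writer looks like; the getter names on FlowRequestData mirror the setters used in DefaultClusterTokenClient later in this article, and the exact generic parameters are an assumption:
// Sketch: writes the data part of a flow request in the order shown above.
public class FlowRequestDataWriter implements EntityWriter<FlowRequestData, ByteBuf> {
    @Override
    public void writeTo(FlowRequestData entity, ByteBuf target) {
        target.writeLong(entity.getFlowId());     // FlowID (8 bytes)
        target.writeInt(entity.getCount());       // Count (4 bytes)
        target.writeBoolean(entity.isPriority()); // PriorityFlag (1 byte)
    }
}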
Requesting a Flow Token
When the Sentinel client runs the flow control check, it determines whether the rule uses cluster (distributed) flow control or local flow control. If a rule is in cluster mode, the client first obtains a TokenService, requests a TokenResult from it, and then decides based on that TokenResult whether to block the request. If anything goes wrong along the way, for example no TokenService is configured or an exception is thrown, the check degrades to the local flow control strategy.
// FlowRuleChecker.java
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    // If the rule is in cluster mode, delegate the check to the token server.
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}

private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
    try {
        TokenService clusterService = pickClusterService();
        if (clusterService == null) {
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        }
        long flowId = rule.getClusterConfig().getFlowId();
        TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
        return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
        // If client is absent, then fallback to local mode.
    } catch (Throwable ex) {
        RecordLog.warn("[FlowRuleChecker] Request cluster token unexpected failed", ex);
    }
    // Fallback to local flow control when token client or server for this rule is not available.
    // If fallback is not enabled, then directly pass.
    return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
pickClusterService is implemented as follows:
private static TokenService pickClusterService() {
    if (ClusterStateManager.isClient()) {
        return TokenClientProvider.getClient();
    }
    if (ClusterStateManager.isServer()) {
        return EmbeddedClusterTokenServerProvider.getServer();
    }
    return null;
}
When the node runs in cluster client mode, TokenClientProvider.getClient() is returned. Inside this client, requestToken packages the flow request parameters, sends the request, and obtains the flow control response:
// DefaultClusterTokenClient.java
@Override
public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) {
    if (notValidRequest(flowId, acquireCount)) {
        return badRequest();
    }
    FlowRequestData data = new FlowRequestData().setCount(acquireCount)
        .setFlowId(flowId).setPriority(prioritized);
    ClusterRequest<FlowRequestData> request = new ClusterRequest<>(ClusterConstants.MSG_TYPE_FLOW, data);
    try {
        TokenResult result = sendTokenRequest(request);
        logForResult(result);
        return result;
    } catch (Exception ex) {
        ClusterClientStatLogUtil.log(ex.getMessage());
        return new TokenResult(TokenResultStatus.FAIL);
    }
}
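sendTokenRequest is not shown above; conceptually it writes the ClusterRequest to the Netty channel, waits (with a timeout) for the response carrying the same request id (xid), and converts the response payload into a TokenResult. A rough sketch of that final conversion, where FlowTokenResponseData and its getters are assumptions based on the response format described earlier:
// Sketch: turning a ClusterResponse into a TokenResult (field names are assumptions).
private TokenResult toTokenResult(ClusterResponse<FlowTokenResponseData> response) {
    TokenResult result = new TokenResult(response.getStatus());
    if (response.getData() != null) {
        result.setRemaining(response.getData().getRemainingCount())
              .setWaitInMs(response.getData().getWaitInMs());
    }
    return result;
}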
Next, let's look at the possible outcomes carried by the returned TokenResult:
// FlowRuleChecker.java
private static boolean applyTokenResult(/*@NonNull*/ TokenResult result, FlowRule rule, Context context,
                                        DefaultNode node,
                                        int acquireCount, boolean prioritized) {
    switch (result.getStatus()) {
        case TokenResultStatus.OK:
            return true;
        case TokenResultStatus.SHOULD_WAIT:
            // Wait for next tick.
            try {
                Thread.sleep(result.getWaitInMs());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return true;
        case TokenResultStatus.NO_RULE_EXISTS:
        case TokenResultStatus.BAD_REQUEST:
        case TokenResultStatus.FAIL:
        case TokenResultStatus.TOO_MANY_REQUEST:
            return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
        case TokenResultStatus.BLOCKED:
        default:
            return false;
    }
}
As the snippet above shows, the request is allowed through only when the TokenResult is explicitly OK or SHOULD_WAIT. A BLOCKED result (or any unknown status) rejects the request, while the remaining cases, such as a missing rule, an invalid request, or a server failure, fall back to local flow control (or simply pass if fallback is disabled).
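That fallback behavior is driven by the rule's cluster config. A minimal sketch of what fallbackToLocalOrPass does, assuming the fallbackToLocalWhenFail flag exposed on the rule's ClusterFlowConfig:
// Sketch: fall back to the local check, or simply pass, depending on the rule config.
private static boolean fallbackToLocalOrPass(FlowRule rule, Context context, DefaultNode node,
                                             int acquireCount, boolean prioritized) {
    if (rule.getClusterConfig().isFallbackToLocalWhenFail()) {
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }
    // Fallback disabled: let the request through without any check.
    return true;
}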
The Server Side
Initializing Netty
// NettyTransportServer.java

// Default worker thread count: 2 * number of available CPU cores.
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
    SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

@Override
public void start() {
    ServerBootstrap b = new ServerBootstrap();
    this.bossGroup = new NioEventLoopGroup(1);
    this.workerGroup = new NioEventLoopGroup(DEFAULT_EVENT_LOOP_THREADS);
    b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 128)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline p = ch.pipeline();
                p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
                p.addLast(new NettyRequestDecoder());
                p.addLast(new LengthFieldPrepender(2));
                p.addLast(new NettyResponseEncoder());
                p.addLast(new TokenServerHandler(connectionPool));
            }
        })
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
        .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
        .childOption(ChannelOption.SO_TIMEOUT, 10)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
    b.bind(port).addListener(new GenericFutureListener<ChannelFuture>() {
        @Override
        public void operationComplete(ChannelFuture future) {
            if (future.cause() != null) {
                // Log the bind failure (logging elided).
            }
        }
    });
}
The server's Netty startup likewise relies on a PENDING state to prevent the server from being started or stopped multiple times. In addition, the server maintains a connection pool:
public class ConnectionPool {

    /**
     * Format: ("ip:port", connection)
     */
    private final Map<String, Connection> CONNECTION_MAP = new ConcurrentHashMap<String, Connection>();
}
TokenServerHandler keeps this pool up to date whenever a new connection is established, an existing connection drops, or a new message arrives:
public class TokenServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Add the new connection to the pool.
        globalConnectionPool.createConnection(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // Remove the connection from the pool.
        String remoteAddress = getRemoteAddress(ctx);
        globalConnectionPool.remove(ctx.channel());
    }

    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        globalConnectionPool.refreshLastReadTime(ctx.channel());
    }
}
The connection pool also runs an internal timer that periodically closes idle connections.
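The sketch below illustrates what such an idle sweep conceptually looks like; the names (getLastReadTime, idleTimeoutMillis, the sweep interval) are hypothetical and not the actual Sentinel implementation:
// Hypothetical sketch of an idle-connection sweep over the pool above.
ScheduledExecutorService sweeper = Executors.newSingleThreadScheduledExecutor();
sweeper.scheduleAtFixedRate(() -> {
    long now = System.currentTimeMillis();
    for (Connection conn : CONNECTION_MAP.values()) {
        // Close connections that have not read anything for longer than the idle timeout.
        if (now - conn.getLastReadTime() > idleTimeoutMillis) {
            conn.close();
        }
    }
}, 10, 10, TimeUnit.SECONDS);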
When a flow control request reaches the Netty server, the handler looks up the RequestProcessor matching the request type and hands the request to that processor:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ClusterRequest) {
        ClusterRequest request = (ClusterRequest) msg;

        // Pick request processor for request type.
        RequestProcessor<?, ?> processor = RequestProcessorProvider.getProcessor(request.getType());
        ClusterResponse<?> response = processor.processRequest(request);
        writeResponse(ctx, response);
    }
}
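For the flow message type, the processor essentially bridges to the TokenService described next. A rough sketch of what such a processor looks like; the class name, the generic parameters, and the toResponse helper are assumptions:
// Sketch of a processor for MSG_TYPE_FLOW requests (names are assumptions).
public class FlowRequestProcessor implements RequestProcessor<FlowRequestData, FlowTokenResponseData> {
    @Override
    public ClusterResponse<FlowTokenResponseData> processRequest(ClusterRequest<FlowRequestData> request) {
        TokenService tokenService = TokenServiceProvider.getService();
        FlowRequestData data = request.getData();
        // Ask the token service for tokens, then wrap the result into a response
        // carrying the same request id (xid) so the client can correlate it.
        TokenResult result = tokenService.requestToken(data.getFlowId(), data.getCount(), data.isPriority());
        return toResponse(result, request);
    }
}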
TokenService
Inside the processor, a TokenService decides whether the request should be rate limited. Which TokenService implementation is used is determined by TokenServiceProvider, which is backed by the SPI mechanism:
// TokenServiceProvider.java
SpiLoader.loadFirstInstanceOrDefault(TokenService.class, DefaultTokenService.class);
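Because the lookup goes through SpiLoader, a custom TokenService can be plugged in by registering it through the standard Java SPI mechanism (a provider-configuration file named after the TokenService interface under META-INF/services). A minimal sketch, where the two method signatures are assumptions based on how they are invoked in this article:
// Sketch of a custom TokenService registered via SPI.
public class MyTokenService implements TokenService {
    @Override
    public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
        // Custom token-granting logic goes here.
        return new TokenResult(TokenResultStatus.OK);
    }

    @Override
    public TokenResult requestParamToken(Long ruleId, int acquireCount, Collection<Object> params) {
        // Hot-parameter variant, not covered in this article.
        return new TokenResult(TokenResultStatus.OK);
    }
}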
Inside the TokenService, the flow rule is looked up by its flowId, and ClusterFlowChecker is then asked to acquire tokens for the cluster:
// DefaultTokenService.java
@Override
public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
    // The rule should be valid.
    FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
    if (rule == null) {
        return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
    }
    return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
}
ClusterFlowRuleManager keeps a <flowId, clusterRule> mapping in a ConcurrentHashMap; the mapping is populated when the user loads flow rules into memory via loadRules.
// ClusterFlowRuleManager.java
public static void loadRules(String namespace, List<FlowRule> rules) {
    NamespaceFlowProperty<FlowRule> property = PROPERTY_MAP.get(namespace);
    if (property != null) {
        property.getProperty().updateValue(rules);
    }
}
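As a usage sketch, a cluster-mode rule might be built and loaded on the token server roughly as follows; the resource name, namespace, flow id, and threshold here are arbitrary example values:
// Sketch: build a cluster-mode flow rule and load it for a namespace.
FlowRule rule = new FlowRule("someResource");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule.setCount(100);              // threshold, interpreted according to thresholdType below
rule.setClusterMode(true);

ClusterFlowConfig clusterConfig = new ClusterFlowConfig();
clusterConfig.setFlowId(10001L); // globally unique flow id
clusterConfig.setThresholdType(ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL);
clusterConfig.setFallbackToLocalWhenFail(true);
rule.setClusterConfig(clusterConfig);

ClusterFlowRuleManager.loadRules("app-namespace", Collections.singletonList(rule));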
The core algorithm of ClusterFlowChecker is shown below; it simply checks whether granting the requested tokens would exceed the cluster-wide threshold:
// ClusterFlowChecker.java
static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) {
    Long id = rule.getClusterConfig().getFlowId();

    ClusterMetric metric = ClusterMetricStatistics.getMetric(id);

    // Average number of tokens passed per second so far (the current cluster-wide QPS).
    double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
    // Total number of tokens allowed per second for the whole cluster.
    double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
    // Tokens that would remain after granting this request.
    double nextRemaining = globalThreshold - latestQps - acquireCount;

    if (nextRemaining >= 0) {
        metric.add(ClusterFlowEvent.PASS, acquireCount);
        metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
        // Remaining count is cut down to a smaller integer.
        return new TokenResult(TokenResultStatus.OK)
            .setRemaining((int) nextRemaining)
            .setWaitInMs(0);
    } else {
        metric.add(ClusterFlowEvent.BLOCK, acquireCount);
        metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
        // Blocked: not enough tokens left for this request.
        return new TokenResult(TokenResultStatus.BLOCKED)
            .setRemaining(0)
            .setWaitInMs(0);
    }
}
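calcGlobalThreshold depends on the rule's cluster threshold type: with the GLOBAL type the configured count is already the cluster-wide limit, while with the local-average type it is the per-client count multiplied by the number of connected clients. A sketch of that calculation, where the connected-count helper name is an assumption:
// Sketch: derive the cluster-wide threshold from the rule.
private static double calcGlobalThreshold(FlowRule rule) {
    double count = rule.getCount();
    switch (rule.getClusterConfig().getThresholdType()) {
        case ClusterRuleConstant.FLOW_THRESHOLD_GLOBAL:
            // The configured count is already the total for the whole cluster.
            return count;
        case ClusterRuleConstant.FLOW_THRESHOLD_AVG_LOCAL:
        default:
            // The configured count is per client; scale it by the number of connected clients.
            int connectedCount = ClusterFlowRuleManager.getConnectedCount(rule.getClusterConfig().getFlowId());
            return count * connectedCount;
    }
}
As a quick worked example: with a GLOBAL threshold of 100, an exceedCount factor of 1.0, a current cluster-wide pass rate of 98 QPS, and a request for 3 tokens, nextRemaining = 100 - 98 - 3 = -1 < 0, so the request is BLOCKED.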