RocketMQ 逻辑队列

RocketMQ 逻辑队列

当前,MessageQueue 和 Broker 耦合在一起,意味着 Broker 数量变化之后,消息队列的数量也会发生变化,这会造成所有的队列都需要一个重新平衡的过程,这个过程可能需要数分钟才能恢复。增加逻辑队列之后,Broker 数量的变化不会影响逻辑队列数量的变化,二者可以独立变化。

架构设计

brokerName MessageQueue LogicalQueue
broker1 0 0
broker1 1 1
broker2 0 2
broker2 1 3

假设当前一个 LogicalQueue 从 broker1 迁移到了 broker2,我们迁移的仅仅是映射关系,而非实际的数据,所以 broker1 依然能够正常消费 LogicalQueue-0 这个逻辑队列里面的数据,但是我们会将这个队列的状态置位只读,故这个队列不能再写入消息:

brokerName MessageQueue LogicalQueue QueueStatus
broker1 0 0(0-100) ReadOnly
broker1 1 1 Normal
broker2 0 2 Normal
broker2 1 3 Normal
broker2 2 0(101-) Normal

当 broker1 从 commit log 和 consume queue 中清除了所有数据后,QueueStatus 变为 Expired (不可读也不可写):

brokerName MessageQueue LogicalQueue QueueStatus
broker1 0 0(-) Expired
broker1 1 1 Normal
broker2 0 2 Normal
broker2 1 3 Normal
broker2 2 0(101-) Normal

如果这个 LogicQueue 再次迁移回 broker1,它会重用这个过期的 MessageQueue:

brokerName MessageQueue LogicalQueue QueueStatus
broker1 0 0(201-) Normal
broker1 1 1 Normal
broker2 0 2 Normal
broker2 1 3 Normal
broker2 2 0(101-200) ReadOnly

如果这个 LogicQueue 再次迁移回 broker1 的时候,当前没有过期的 MessageQueue,它会创建一个新的 MessageQueue:

brokerName MessageQueue LogicalQueue QueueStatus
broker1 0 0(0-100) ReadOnly
broker1 1 1 Normal
broker1 2 0(201-) Normal
broker2 0 2 Normal
broker2 1 3 Normal
broker2 2 0(101-200) ReadOnly

如果 broker2 下线了,那么上面的所有的 LogicQueue 都应该进行迁移:

brokerName MessageQueue LogicalQueue QueueStatus
broker1 0 0 Normal
broker1 1 1 Normal
broker1 2 2(101-) Normal
broker1 3 3(101-) Normal
broker2 0 2(0-100) ReadOnly
broker2 1 3(0-100) ReadOnly

当 broker2 上面的所有数据包括 commit log 和 consume queue 被消费完后,那么 broker2 可以被移除掉了:

brokerName MessageQueue LogicalQueue QueueStatus
broker1 0 0 Normal
broker1 1 1 Normal
broker1 2 2(101-) Normal
broker1 3 3(101-) Normal

当部署了新的 broker 后,我们可以使用命令来迁移一些 LogicQueue 到这个 broker 上,来分担一些流量:

brokerName MessageQueue LogicalQueue QueueStatus
broker1 0 0 Normal
broker1 1 1 Normal
broker1 2 2(101-200) ReadOnly
broker1 3 3(101-200) ReadOnly
broker3 0 2(201-) Normal
broker3 1 3(201-) Normal

实现

public class LogicalQueuesInfo extends HashMap<Integer, List<LogicalQueueRouteData>> {
}
/**
 * logical queue offset -> message queue offset mapping
 */
public class LogicalQueueRouteData implements Comparable<LogicalQueueRouteData> {
    
    private volatile int logicalQueueIndex = -1; /* -1 means not set */
    private volatile long logicalQueueDelta = -1; /* inclusive, -1 means not set, occurred in writeOnly state */

    private MessageQueue messageQueue;

    private volatile MessageQueueRouteState state = MessageQueueRouteState.Normal;

    private volatile long offsetDelta = 0; // valid when Normal/WriteOnly/ReadOnly
    private volatile long offsetMax = -1; // exclusive, valid when ReadOnly

}

话题路由信息 TopicRouteData 中增加了和逻辑队列相关的信息:

public class TopicRouteData extends RemotingSerializable {

	private LogicalQueuesInfo logicalQueuesInfo;

}

在构造器中,logicQueueIdx 封装为了一个 brokerName__logical_queue_broker__,同时 queueIdlogicQueueIdxMessageQueue:

public class SendResultForLogicalQueue extends SendResult {

	public SendResultForLogicalQueue(SendResult sendResult, int logicalQueueIdx) {
        super(sendResult.getSendStatus(), sendResult.getMsgId(), sendResult.getOffsetMsgId(),
            	new MessageQueue(sendResult.getMessageQueue().getTopic(), MixAll.LOGICAL_QUEUE_MOCK_BROKER_NAME, logicalQueueIdx),
            	sendResult.getQueueOffset());
        // ...
    }

}
public class PullResultWithLogicalQueues extends PullResultExt {
}

参考