RocketMQ 定时消息和重试消息

RocketMQ 定时消息和重试消息

讲述 RocketMQ 定时消息和重试消息

一、定时消息概述

RocketMQ 支持 Producer 端发送定时消息,即该消息被发送之后,到一段时间之后才能被 Consumer 消费者端消费。但是当前开源版本的 RocketMQ 所支持的定时时间是有限的、不同级别的精度的时间,并不是任意无限制的定时时间。因此在每条消息上设置定时时间的 API 叫做 setDelayTimeLevel,而非 setDelayTime 这样的命名:

Message msg =
    new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
msg.setDelayTimeLevel(i + 1);

默认 Broker 服务器端有 18 个定时级别:

public class MessageStoreConfig {

    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    
}

这 18 个定时级别在服务器端启动的时候,会被解析并放置到表 delayLevelTable 中。解析的过程就是上述字符串按照空格拆分开,然后根据时间单位的不同再进一步进行计算,得到最终的毫秒时间。级别就是根据这些毫秒时间的顺序而确定的,例如上述 1s 延迟就是级别 1, 5s 延迟就是级别 2,以此类推:

public class ScheduleMessageService extends ConfigManager {

    public boolean parseDelayLevel() {
        for (int i = 0; i < levelArray.length; i++) {
            // ...
                
            int level = i + 1;
            long delayTimeMillis = tu * num;

            // 级别:延迟时间
            this.delayLevelTable.put(level, delayTimeMillis);
        }
    }
    
}

二、定时消息预存储

客户端在为某条消息设置上定时级别的时候,实际上级别这个字段会被作为附属属性放到消息中:

public class Message implements Serializable {

    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }
    
}

我们先前的文章提到过,发送到 Broker 服务器的消息会被存储到 CommitLog 消息文件中。那么在此处即使是定时消息也不例外,将定时消息存储下来是为了保证消息最大程度地不丢失。然而毕竟和普通消息不同,在遇到定时消息后,CommitLog 会将这条消息的话题和队列 ID 替换成专门用于定时的话题和相应的级别对应的队列 ID。真实的话题和队列 ID 会作为属性放置到这条消息中。

public class CommitLog {

    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {

            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            // 替换 Topic 和 QueueID
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
        
    }
    
}

随后,这条消息会被存储在 CommitLog 消息文件中。而我们知道后台重放消息服务 ReputMessageService 会一直监督 CommitLog 文件是否添加了新的消息。当有了新的消息后,重放消息服务会取出消息并封装为 DispatchRequest 请求,然后将其分发给不同的三个分发服务,建立消费队列文件服务就是这其中之一。而此处当取消息封装为 DispatchRequest 的时候,当遇到定时消息时,又多做了一些额外的事情。

当遇见定时消息时,CommitLog 计算 tagsCode 标签码与普通消息不同。对于定时消息,tagsCode 值设置的是这条消息的投递时间,即建立消费队列文件的时候,文件中的 tagsCode 存储的是这条消息未来在什么时候被投递:

public class CommitLog {

    public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer,
                                                     final boolean checkCRC,
                                                     final boolean readBody) {
        // Timing message processing
        {
            String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
            if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
                int delayLevel = Integer.parseInt(t);

                if (delayLevel > 0) {
                    tagsCode = this.defaultMessageStore.getScheduleMessageService()
                        .computeDeliverTimestamp(delayLevel,storeTimestamp);
                }
            }
        }
    }
    
}

如下是,发送了 10 条定时级别分别为 1-10 的消息以后,$HOME/store/consumequeue 文件下的消费队列文件的分布情况:

消费队列的文件分布

不同的定时级别对应于不同的队列 ID,定时级别减 1 得到的就是队列 ID 的值。因此级别 1-10 对应的是 0-9 的队列 ID:

public class ScheduleMessageService extends ConfigManager {

    public static int delayLevel2QueueId(final int delayLevel) {
        return delayLevel - 1;
    }
    
}

三、定时消息再存储

Broker 启动的时候,会开启一个调度消息服务,此服务会监控所有定时消息队列,每一个消息队列会创建一个专门的延时消息投递任务用以到达规定时间后投递此消息:

public class ScheduleMessageService extends ConfigManager {

    public void start() {
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);

            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
    }
    
}

每个消息队里的消息投递任务,会检查自己跟踪的消息队列,并从此消息队列所对应的定时级别的偏移量中检查是否有新的定时消息到来。其中定时级别的偏移量是维护在内存中的偏移量表 offsetTable 中。每隔 10 秒钟,这个表会被持久化到磁盘上的 delayOffset.json 文件中一次:

public class ScheduleMessageService extends ConfigManager {

    private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
        new ConcurrentHashMap<Integer, Long>(32);

    public void start() {
        // 每隔 10 秒钟持久化一次
        this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    ScheduleMessageService.this.persist();
                }
            },
            10000,
            this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
    
}

delayOffset.json 文件中存储的示例信息如下所示:

定时偏移量维护文件

DeliverDelayedMessageTimerTask 任务会从消费任务队列文件中取出最新的定时消息的 tagsCode ,并计算出的当前是否已经到了这条消息投递的时间。如果到了,即 countdown < 0,那么便会从 CommitLog 文件中取出消息,修正消息的话题和队列 ID 等信息,然后重新存储此条消息。如果还没有到,那么便会重新执行一个定时时间设置为 countdown 毫秒的定时任务。在完成之后,会更新当前的偏移量表,为下一次做准备:

class DeliverDelayedMessageTimerTask extends TimerTask {

    public void executeOnTimeup() {
        // ...
        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
            // 是否到时间
            long countdown = deliverTimestamp - now;

            if (countdown <= 0) {
                // 取出消息
                MessageExt msgExt =
                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                // 修正消息,设置上正确的话题和队列 ID
                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                // 重新存储消息
                PutMessageResult putMessageResult =
                    ScheduleMessageService.this.defaultMessageStore
                    .putMessage(msgInner);
            } else {
                // countdown 后投递此消息
                ScheduleMessageService.this
                    .timer
                    .schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
                // 更新偏移量
            }
        } // end of for

        // 更新偏移量
    }
    
}

四、消息重试概述

消息重试分为消息发送重试消息接受重试,消息发送重试是指消息从 Producer 端发送到 Broker 服务器的失败以后的重试情况,消息接受重试是指 Consumer 在消费消息的时候出现异常或者失败的重试情况。

Producer 端通过配置如下这两个两个 API 可以分别配置在同步发送异步发送消息失败的时候的重试次数:

DefaultMQProducer producer =
    new DefaultMQProducer("please_rename_unique_group_name");
producer.setRetryTimesWhenSendAsyncFailed(3);
producer.setRetryTimesWhenSendFailed(3);

Consumer 端在消费的时候,如果接收消息的回调函数出现了以下几种情况:

  • 抛出异常
  • 返回 NULL 状态
  • 返回 RECONSUME_LATER 状态
  • 超时 15 分钟没有响应

那么 Consumer 便会将消费失败的消息重新调度直到成功消费:

consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // 抛出异常
            // 返回 NULL 或者 RECONSUME_LATER 状态
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });

五、Producer 消息发送重试

发送失败的重试方式,主要表现在发送消息的时候,会最多尝试 getRetryTimesWhenSendFailed() 次发送,当成功发送以后,会直接返回发送结果给调用者。当发送失败以后,会继续进行下一次发送尝试,核心代码如下所示:

public class DefaultMQProducerImpl implements MQProducerInner {

    private SendResult sendDefaultImpl(Message msg, /** 其他参数 **/) throws MQClientException,
                                                                             RemotingException,
                                                                             MQBrokerException,
                                                                             InterruptedException {
        int timesTotal = communicationMode ==
            CommunicationMode.SYNC ?
            1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() :
            1;
        int times = 0;

        for (; times < timesTotal; times++) {
            // 尝试发送消息,发送成功 return,发送失败 continue
        }
        
    }
    
}

六、Consumer 消息接受重试

(1) 订阅重试话题

Consumer 在启动的时候,会执行一个函数 copySubscription() ,当用户注册的消息模型为集群模式的时候,会根据用户指定的创建重试组话题并放入到注册信息中:

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
        case CREATE_JUST:
            // ...
            this.copySubscription();
            // ...
        
            this.serviceState = ServiceState.RUNNING;
            break;
        }
    }

    private void copySubscription() throws MQClientException {
        switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            break;
            
        case CLUSTERING:
            // 重试话题组
            final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                                                                                retryTopic, SubscriptionData.SUB_ALL);
            this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
            break;
            
        default:
            break;
        }
    }
    
}

假设用户指定的组为 “ORDER”,那么重试话题则为 “%RETRY%ORDER”,即前面加上了 “%RETRY%” 这个字符串。

Consumer 在一开始启动的时候,就为用户自动注册了订阅组的重试话题。即用户不单单只接受这个组的话题的消息,也接受这个组的重试话题的消息。这样一来,就为下文用户如何重试接受消息奠定了基础。

消费组重试信息

(2) 失败消息发往重试话题

当 Consumer 客户端在消费消息的时候,抛出了异常、返回了非正确消费的状态等错误的时候,这个时候 ConsumeMessageConcurrentlyService 会收集所有失败的消息,然后将每一条消息封装进 CONSUMER_SEND_MSG_BACK 的请求中,并将其发送到 Broker 服务器:

public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {

    public void processConsumeResult(final ConsumeConcurrentlyStatus status,
                                     /** 其他参数 **/) {
        switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            // ...
            break;
        case CLUSTERING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                // 重新将消息发往 Broker 服务器
                boolean result = this.sendMessageBack(msg, context);
            }
            // ...
            break;
        default:
            break;
        }
    }
    
}

当消费失败的消息重新发送到服务器后,Broker 会为其指定新的话题重试话题,并根据当前这条消息的已有的重试次数来选择定时级别,即将这条消息变成定时消息投放到重试话题消息队列中。可见消息消费失败后并不是立即进行新的投递,而是有一定的延迟时间的。延迟时间随着重试次数的增加而增加,也即投递的时间的间隔也越来越长:

public class SendMessageProcessor
    extends AbstractSendMessageProcessor
    implements NettyRequestProcessor {

    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx,
                                                final RemotingCommand request)
        throws RemotingCommandException {

        // 指定为重试话题
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

        // 指定为延时信息,设定延时级别
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);

        // 重试次数增加
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        // 重新存储
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

        // ...
    }
    
}

当然,消息如果一直消费不成功,那也不会一直无限次的尝试重新投递的。当重试次数大于最大重试次数 (默认为 16 次) 的时候,该消息将会被送往死信话题队列,认定这条话题投递无门:

public class SendMessageProcessor
    extends AbstractSendMessageProcessor
    implements NettyRequestProcessor {

    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx,
                                                final RemotingCommand request)
        throws RemotingCommandException {
        // 重试次数大于最大重试次数
        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
            || delayLevel < 0) {
            // 死信队列话题
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
        }
        // ...
    }
    
}

死信话题队列

上述客户端消费失败信息的流程图如下所示:

消费失败信息完整流程图

扫描下面二维码,在手机端阅读: