RocketMQ 事务消息
基于 RocketMQ 4.8.0 版本进行的源码分析。
讲述 RocketMQ 是如何支持事务消息的。
源码调试过程
mkdir .rocketmq/conf
cp distribution/conf/logback_namesrv.xml .rocketmq/conf
cp distribution/conf/logback_broker.xml .rocketmq/conf
NamesrvStartup.java
加上如下代码:
static {
System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, ".rocketmq");
}
BrokerStartup.java
加上如下代码:
static {
System.setProperty(MixAll.ROCKETMQ_HOME_PROPERTY, ".rocketmq");
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "localhost:9876");
}
TransactionProducer.java
加上如下代码:
static {
System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "localhost:9876");
}
运行顺序:
NamesrvStartup
BrokerStartup
TransactionProducer
发送事务消息
我们先来看客户端调用 Producer 发送事务消息的过程:
初始化事务环境
初始化事务环境是为了构建 checkExecutor 线程池:
public class TransactionMQProducer extends DefaultMQProducer {
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.initTransactionEnv();
super.start();
}
}
发送消息并执行本地事务
public class TransactionMQProducer extends DefaultMQProducer {
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
}
下面来看 DefaultMQProducerImpl
类的发送事务消息的具体实现:
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException {
// 标识这条消息的属性:TRANSACTION_PREPARED
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
// 发送消息
sendResult = this.send(msg);
// 根据消息的发送状态决定是否执行本地事务
switch (sendResult.getSendStatus()) {
// 发送消息成功
case SEND_OK: {
// 获取事务 ID
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
// 执行本地事务
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
}
break;
// 发送消息失败
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
}
// 事务结束
this.endTransaction(sendResult, localTransactionState, localException);
}
从上述代码可以看出,RocketMQ 是先发送消息,然后再执行本地事务。
结束事务
在 endTransaction
内部,需要根据本地事务执行的状态,来决定是 COMMIT
还是 ROLLBACK
已经发送到服务器的消息:
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
// 调用 API: 只调用一次,不重试
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
其中 endTransactionOneway()
发送的是到 Server 的 END_TRANSACTION
命令:
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
接受事务消息
上面讲的是 Client 端发送事务消息的流程,接下来看 Server 端是如何接受事务消息的。
接受事务准备消息
Broker 检查收到的消息是否是 PROPERTY_TRANSACTION_PREPARED
消息:
// SendMessageProcessor
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
// 没有发送事务的权限
response.setCode(ResponseCode.NO_PERMISSION);
return CompletableFuture.completedFuture(response);
}
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
}
异步准备消息内部,其实就是存储 half
消息的过程:
// org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl
@Override
public CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner) {
return transactionalMessageBridge.asyncPutHalfMessage(messageInner);
}
将消息的话题替换为了 RMQ_SYS_TRANS_HALF_TOPIC
:
// 备份原来真正的 Topic
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
public static String buildHalfTopic() {
return TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
}
下图是,刚发送完 PREPARED
消息后,consumequeue
文件夹中存放的文件:
PREPARED 消息不会被消费吗 ?
PREPARED
消息在存储到磁盘之前,会将话题改为 RMQ_SYS_TRANS_HALF_TOPIC
,因此通过订阅该消息关联的原来的话题,是消费不到该消息的。
另外就是在 Broker 分发消息的时候,正常情况下,当收到了一条消息,后台会根据消息,构建 consume
文件 (下面代码中的 putMessagePositionInfo()
方法就是为了构建消费文件),以供消费者消费。但是在遇到 PREPARED
消息的时候,就不再构建 consome
文件了,即消费者根本看不到这条消息。
// DefaultMessageStore.java
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}
接收事务结束的消息
对于 END_TRANSACTION
的请求,BrokerController
注册了单独的处理器来处理事务结束的命令:
// BrokerController
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
EndTransactionProcessor
内部,处理命令的逻辑如下:
// SLAVE 不支持接受 END_TRANSACTION 命令
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
return response;
}
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// COMMIT 消息
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// ROLLBACK 消息
}
(1) COMMIT 提交消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
上述代码第一行 commitMessage
内部是根据 commitLogOffset
这个偏移量,从 commitLog
的 MappedFile
文件中查找消息的过程。
第二行的 endMessageTransaction
内部根据 preparedMessage
构建了新的 Message,恢复了话题、拷贝了这个消息上其它的属性等信息,最重要的是清除了 MessageConst.PROPERTY_TRANSACTION_PREPARED
属性,以便这个消息可以构建消费队列文件,从而让消费者能够消费。
最后 sendMessage
内部就是将构建好的新的消息体,重新调用 Broker
端的发送消息的流程,来发送消息。所以消费端消费的已经不是之前发送的 PREPARED 消息,而是根据 PREPARED 消息重新克隆出来的新的消息体,当然内容、属性等肯定给都是一样的。
最后 deletePrepareMessage
的过程,内部其实只是给这条消息打上了一个 TransactionalMessageUtil.REMOVETAG
标签,然后重新 putMessage()
。
下图展示的是,执行 COMMIT
之后的,consumequeue
存放文件的情况:
(2) ROLLBACK 回滚消息
回滚消息第一步也是根据 commitOffset
查找消息,然后再给这条消息打上 TransactionalMessageUtil.REMOVETAG
的过程。
下图是消息执行 ROLLBACK
之后的 consumequeue
所存储的文件的状态:
扫描事务状态
假如 Client 执行本地事务,运行时间过长,或者发送了 COMMIT
消息或者 ROLLBACK
消息,但是这条消息由于网络原因等没有到达 Server 端,那么可能会导致出于 PREPARED
的消息越来越多。因此 Broker 会在后台定期给 Client 发送检查事务状态的消息。
Server 定时扫描
public class TransactionalMessageCheckService extends ServiceThread {
@Override
public void run() {
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
}
@Override
protected void onWaitEnd() {
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
// 检查事务状态
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
}
}
transactionCheckInterval = 60 * 1000
:每隔 60s 执行一次check
方法。transactionTimeOut = 6 * 1000
:第一次检查事务消息的时间,一条消息只有大于这个时间还没有收到COMMIT
或者ROLLBACK
,那么就执行检查。transactionCheckMax = 15
:最多执行多少次检查后,如果依然还没有收到这条消息是提交还是回滚,那么这条消息将被丢弃。
Server 检查事务状态
在 check
方法内部,Server 端需要扫描是否有消息需要去检查事务的状态,如果需要,则会给 Client 发送 CHECK_TRANSACTION_STATE
命令。
首先,Broker 将自己作为一个客户端来去订阅消费 RMQ_SYS_TRANS_OP_HALF_TOPIC
话题中的消息。
// TransactionalMessageBridge.java
public PullResult getOpMessage(int queueId, long offset, int nums) {
String group = TransactionalMessageUtil.buildConsumerGroup();
String topic = TransactionalMessageUtil.buildOpTopic();
SubscriptionData sub = new SubscriptionData(topic, "*");
return getMessage(group, topic, queueId, offset, nums, sub);
}
那么每一次消费,我怎么知道上一次消费到哪里了呢?实际上,最新的消息偏移量存储在了 offsetTable
中:
// ConsumerOffsetManager.java
public long queryOffset(final String group, final String topic, final int queueId) {
// [email protected]
String key = topic + TOPIC_GROUP_SEPARATOR + group;
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null != map) {
Long offset = map.get(queueId);
if (offset != null)
return offset;
}
return -1;
}
offsetTable
在后台也会定时地将里面的信息保存到磁盘上的 config/consumerOffset.json
文件中 (如下图所示)。0:9
的 0
代表 queueId
,9
代表最新的 offset
。
在获取到上一轮 offset
到最新的 offset
之间的消息列表后,那么就需要逐一检查这些消息的事务状态了。
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
long i = halfOffset;
while (true) {
// ...
GetResult getResult = getHalfMsg(messageQueue, i);
// ...
if (isNeedCheck) {
// ...
listener.resolveHalfMsg(msgExt);
}
}
msgExt
的内部状态:
那么在每一轮循环中,即每一条消息内部,逻辑又是怎样执行的呢?
// TransactionalMessageServiceImpl
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
break;
}
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
continue;
}
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
long checkImmunityTime = transactionTimeout;
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime);
if (isNeedCheck) {
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
listener.resolveHalfMsg(msgExt);
}
我们看到:
- 首先对于
while (true)
的时间设定了限制,不能超过MAX_PROCESS_TIME_LIMIT
这个值。 - 其次,
needDiscard()
这个方法检查的就是从消息的MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES
属性中,获取到这个消息已经检查了多少次,如果超过transactionCheckMax
,那么就需要丢弃。 needSkip()
函数判断的是这条消息自诞生以来,在 Broker 端放置的时间是否超过了 3 天,如果超过 3 天,这条消息也没有必要检查了,因为 RocketMQ 默认存储消息的最长时间就是 3 天。isNeedCheck
看的主要就是消息诞生的时间是否超过了transactionTimeout
。putBackHalfMsgQueue
主要就是将当前的消息,最新修改的属性等,重新拷贝一份,然后将新的消息追加到MappedFile
的末尾。resolveHalfMsg
就是在线程池中执行发送检查事务状态的任务:
public void resolveHalfMsg(final MessageExt msgExt) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
sendCheckMessage(msgExt);
} catch (Exception e) {
LOGGER.error("Send check message error!", e);
}
}
});
}
sendCheckMessage
的内部实现:
// AbstractTransactionalMessageCheckListener.java
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
if (channel != null) {
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
}
checkProducerTransactionState
的内部实现,就是发送了 CHECK_TRANSACTION_STATE
报文给 Client:
// Broker2Client.java
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
request.setBody(MessageDecoder.encode(messageExt, false));
try {
this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
} catch (Exception e) {
log.error("Check transaction failed because invoke producer exception. group={}, msgId={}, error={}",
group, messageExt.getMsgId(), e.toString());
}
客户端检查事务状态
Producer
也通过 Netty
监听在了一个端口上,这样也能接受来自外接的命令了:
public class ClientRemotingProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.CHECK_TRANSACTION_STATE:
return this.checkTransactionState(ctx, request);
}
}
}
当收到 CHECK_TRANSACTION_STATE
命令后,Client 会解析出消息的事务 ID、存放消息的 Broker 地址等:
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
// Broker 地址
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
producer.checkTransactionState(addr, messageExt, requestHeader);
}
}
然后 checkTransactionState
内部则通过线程池提交了一个新的任务,检查事务的状态,并反馈给 Broker。
localTransactionState = transactionListener.checkLocalTransaction(message);
// COMMIT_TYPE 或者 ROLLBACK_TYPE 等
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, 3000);