RocketMQ(十一):事务消息如何满足分布式一致性?

举报
菜菜的后端私房菜 发表于 2024/12/17 14:28:02 2024/12/17
573 0 0
【摘要】 RocketMQ(十一):事务消息如何满足分布式一致性? 前言在分布式系统中由于相关联的多个服务所在的数据库互相隔离,数据库无法使用本地事务来保证数据的一致性,因此需要使用分布式事务来保证数据的一致性比如用户支付订单后,需要更改订单状态,还需要涉及其他服务的其他操作如:物流出货、积分变更、清空购物车等由于它们数据所存储的数据库会互相隔离,当订单状态修改成功/失败时,其他服务对应的数据也需要...

RocketMQ(十一):事务消息如何满足分布式一致性?

前言

在分布式系统中由于相关联的多个服务所在的数据库互相隔离,数据库无法使用本地事务来保证数据的一致性,因此需要使用分布式事务来保证数据的一致性

比如用户支付订单后,需要更改订单状态,还需要涉及其他服务的其他操作如:物流出货、积分变更、清空购物车等

由于它们数据所存储的数据库会互相隔离,当订单状态修改成功/失败时,其他服务对应的数据也需要修改成功/失败,否则就会出现数据不一致的情况

解决分布式事务常用的一种方案是使用MQ做补偿以此来达到数据的最终一致性,而RocketMQ提供的事务消息能够简单、有效的解决分布式事务满足数据最终一致性

在上面支付订单的案例中,主分支只需要修改订单状态,其他分支(出货、积分变更、清空购物车)都可以发送事务消息来达到数据最终一致性

image.png

本篇文章通过分析源码来描述事务消息的原理以及使用方法,并总结使用时需要注意的地方,思维导图如下:

本文导图

往期回顾:

RocketMQ(十):如何保证消息严格有序?

RocketMQ(九):延迟消息是如何实现的?

RocketMQ(八):轻量级拉取消费原理

RocketMQ(七):消费者如何保证顺序消费?

RocketMQ(六):Consumer Rebalanc原理(运行流程、触发时机、导致的问题)

RocketMQ(五):揭秘高吞吐量并发消费原理

RocketMQ(四):消费前如何拉取消息?(长轮询机制)

RocketMQ(三):面对高并发请求,如何高效持久化消息?(核心存储文件、持久化核心原理、源码解析)

RocketMQ(二):揭秘发送消息核心原理(源码与设计思想解析)

RocketMQ(一):消息中间件缘起,一览整体架构及核心组件

使用事务消息

事务消息拥有“半事务”的状态,在这种状态下即时消息到达broker也不能进行消费,直到主分支本地事务提交,事务消息才能被下游服务进行消费

使用事务消息的流程如下:

  1. 生产者发送半事务消息(消息到达broker后处于半事务状态,下游服务暂时无法消费)
  2. 生产者执行本地事务,无论本地事务成功(commit)还是失败(rollback)都要通知broker,如果成功则事务消息允许被消费,如果失败则丢弃事务消息
  3. 在步骤2中,由于网络等缘故broker可能未接收到本地事务执行的结果,当broker等待一定时间未收到状态时会自动回查状态

image.png

发送事务消息的生产者为TransactionMQProducer,TransactionMQProducer的使用与默认类似,只不过需要设置事务监听器TransactionListener

事务监听器接口需要实现executeLocalTransaction用于执行本地事务和checkLocalTransaction用于broker回查本地事务状态

public interface TransactionListener {
    //执行本地事务
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
    //回查事务状态
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

它们的结果LocalTransactionState有三个状态:COMMIT_MESSAGE 成功、ROLLBACK_MESSAGE 失败、UNKNOW 未知

当为未知状态时,后续还会触发回查,直到超过次数或者返回成功/失败

调用 sendMessageInTransaction 发送事务消息,其中参数arg用于扩展,执行本地事务时会携带使用

public TransactionSendResult sendMessageInTransaction(final Message msg,final Object arg)

根据我们的情况写出TransactionListener的模拟代码

public class OrderPayTransactionListener implements TransactionListener {
    //执行本地事务 其中参数arg传递的为订单ID
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object orderId) {
        try {
            //修改订单状态为已支付
            if (updatePayStatus((Long) orderId)) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        } catch (Exception e) {
            //log
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
    //回查状态
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Long orderId = Long.valueOf(msg.getBuyerId());
        //查询订单状态是否为已支付
        try {
            if (isPayed(orderId)) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        } catch (Exception e) {
            //log
            return LocalTransactionState.UNKNOW;
        }

        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
}

执行本地事务时如果成功修改订单状态就返回commit,回查状态时判断订单状态是否为已支付

事务消息原理

发送事务消息

前文分析过通用的发送消息流程,而 sendMessageInTransaction 发送消息调用通用的发送消息流程外,还会在期间多做一些处理:

  1. 准备(检查事务监听器、消息、清理延迟级别、标记事务消息为半事务状态、存储数据)
  2. **通用同步发送消息流程 sendDefaultImpl **(校验参数、获取路由信息、选择队列、封装消息、netty rpc调用,期间检查超时、超时情况)
  3. **获取发送消息结果,如果成功使用事务监听器执行本地事务 executeLocalTransaction **
  4. **根据本地事务状态单向通知broker endTransactionOneway **(有回查机制无需考虑失败)
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    //检查事务监听器
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
    //清除延迟等级 使用事务消息就不能使用延迟消息
    // ignore DelayTimeLevel parameter
    if (msg.getDelayTimeLevel() != 0) {
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
    }
    //检查消息
    Validators.checkMessage(msg, this.defaultMQProducer);
    SendResult sendResult = null;
    //标记事务消息为半事务状态
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    //存储生产者组
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        //通用的发送消息流程
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
    
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    //成功执行本地事务
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            //刷盘超时 或 从节点不可用 相当于失败
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }
    
    try {
        //通知broker本地事务状态
        this.endTransaction(msg, sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }
    
    //返回
    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}

在发送的流程中主要会在发送前做一些准备如标记半事务状态,然后进行同步发送,如果发送成功则会执行本地事务,最后单向通知broker本地事务的状态

image.png

broker存储事务消息

之前的文章也说过消息到达后,broker存储消息的原理(先写CommitLog、再写其他文件)

事务消息在消息进行存储前,会使用桥接器TransactionalMessageBridge调用 parseHalfMessageInner ,将消息topic改为半事务topic并存储原始topic、队列ID(方便后续重新投入真正的topic)

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    //存储真正的topic和队列ID
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    //设置本次要投入的topic为半事务Topic RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

这样半事务状态的事务消息就会被投入半事务topic的队列中,这样就能达到消费者无法消费半事务消息(因为它们没被投入真实的队列中)

image.png

broker接收本地事务状态通知

生产者发送完消息,无论成功还是失败都会通知broker本地事务状态

broker使用EndTransactionProcessor处理END_TRANSACTION的请求,其核心逻辑就是根据本地事务状态进行处理:

  1. 如果成功根据CommitLog偏移量找到半事务消息,将其重投到真实的topic、队列中,最后再删除
  2. 如果失败根据CommitLog偏移量找到半事务消息进行删除
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
    RemotingCommandException {
    //构建通用响应
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    //解析
    final EndTransactionRequestHeader requestHeader =
        (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);

    //从节点直接响应失败
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
        return response;
    }
    //...
    OperationResult result = new OperationResult();
    //成功的情况
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        //调用 getHalfMessageByOffset 根据commitLog偏移量获取半事务消息
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
        //找到半事务消息
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            //检查数据
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                //检查成功 
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                //清理半事务标识
                MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                //重新将消息投入真实topic、队列中
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    //重投成功 删除事务消息
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        //失败情况 也是调用 getHalfMessageByOffset 根据commitLog偏移量获取半事务消息
        result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                //找到消息检查完就删除事务消息
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return res;
        }
    }
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}

成功或失败(commit/rollback)的情况都会删除半消息,成功的情况会将消息投入原始队列中,后续进行消费

image.png

而还要一种无法确定是成功还是失败的情况,需要broker进行回查

broker回查机制

负责回查的组件是TransactionalMessageCheckService:定期对半事务消息进行检查是否需要回查(在broker启动初始化时进行初始化)

其检查回查会调用this.brokerController.getTransactionalMessageService().check

它会遍历事务topic RMQ_SYS_TRANS_HALF_TOPIC 下的所有队列,循环取出半事务消息进行判断是否需要进行回查

由于代码较多,这里总结性贴出关键代码:

  1. 根据队列、偏移量取出半事务消息 getHalfMsg
  2. 超过检查次数(15)或最大存储时间(72h)就丢弃半事务消息 resolveDiscardMsg
  3. 将消息重投入半消息topic(避免消息丢失)putBackHalfMsgQueue
  4. 向生产者发送回查请求(请求码为CHECK_TRANSACTION_STATE)resolveHalfMsg
public void check(long transactionTimeout, int transactionCheckMax,AbstractTransactionalMessageCheckListener listener) {
    //遍历事务topic下的所有队列,循环取出半事务消息进行判断是否需要进行回查
    String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
	Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
    for (MessageQueue messageQueue : msgQueues) {
        while (true) {
            //超出边界会退出 代码略
            
            //获取半事务消息 这里的参数i是半事务消息偏移量
            GetResult getResult = getHalfMsg(messageQueue, i);
           	MessageExt msgExt = getResult.getMsg();
            
            //needDiscard 超过最大检查次数 15次
            //needSkip  超过最大存储时间 72h
            if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                //丢弃半事务消息
                listener.resolveDiscardMsg(msgExt);
                //..
                continue;
            }
            
            //...
            
            //超过6s
            if (isNeedCheck) {
                //将消息重投入半消息队列
            	if (!putBackHalfMsgQueue(msgExt, i)) {
                	continue;
            	}
            	//向生产者发送回查的请求 CHECK_TRANSACTION_STATE
            	listener.resolveHalfMsg(msgExt);
            }
            
        }
    }
}

请求回查并不会返回结果,生产者处理查询到事务状态后,再向broker发送单向的本地事务状态通知请求(endTransactionOneway)

image.png

生产者处理回查请求

ClientRemotingProcessor 处理broker发送的回查请求CHECK_TRANSACTION_STATE

ClientRemotingProcessor 调用 checkTransactionState 进行处理:

  1. 调用事务监听器回查本地事务的方法 transactionListener.checkLocalTransaction
  2. 调用endTransactionOneway 对broker进行通知本地事务状态结果

总结

涉及多服务的分布式事务,不追求强一致性的情况下,可考虑使用事务消息+重试的方式尽力达到最终一致性

使用时需要定义事务监听器执行本地事务和回查本地事务状态的方法,注意可能消费失败,重试多次后需要记录并特殊处理避免最终数据不一致

使用事务消息时无法设置延迟级别,发送前会将延迟级别清除

发送事务消息采用同步发送,在发送前会标记为半(事务)消息状态,在发送成功后会调用事务监听器执行本地事务,最后单向通知broker本地事务的状态

broker存储半(事务)消息前会更改它的topic、queueId,将其持久化到事务(半消息)topic中,以此来达到暂时不可以被消费的目的

broker接收本地事务状态通知时,如果是commit状态则将半(事务)消息重投入原始topic、队列中,以此来达到可以进行消费的目的,并且删除半(事务)消息,rollback状态也会删除半(事务)消息,只有未知状态的情况下不删除,等待后续触发回查机制

broker使用组件定期遍历事务(半消息)topic下的所有队列检查是否需要进行回查,遍历队列时循环取出半(事务)消息,如果超过检查最大次数(15)或超时(72h),则会丢弃消息;否则会将半(事务)消息放回队列,当事务消息超过6s时会触发回查机制,向produce发送检查事务状态的请求

produce收到回查请求后,调用事务监听器的检查事务状态方法,并又调用单向通知broker本地事务状态

最后(点赞、收藏、关注求求啦~)

我是菜菜,热爱技术交流、分享与写作,喜欢图文并茂、通俗易懂的输出知识

本篇文章被收入专栏 消息中间件,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以star持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。