理解事务消息

举报
jiangyu666 发表于 2019/12/30 18:01:28 2019/12/30
【摘要】 事务消息是消息中间件的一个高级特性。Kafka和RocketMQ作为广泛使用的高性能消息中间件,都支持事务消息,你知道其中的差异吗? Kafka与RocketMQ的事务消息,其实是两码事,解决的问题不同。 Kafka事务消息主要解决的是消息之间的一致性问题,RocketMQ的事务消息主要解决的是消息发送与数据库本地事务之间的一致性问题。

事务消息是消息中间件的一个高级特性。Kafka和RocketMQ作为广泛使用的高性能消息中间件,都支持事务消息,你知道其中的差异吗?

Kafka与RocketMQ的事务消息,其实是两码事,解决的问题不同。

Kafka事务消息主要解决的是消息之间的一致性问题,RocketMQ的事务消息主要解决的是消息发送与数据库本地事务之间的一致性问题。

Kafka事务消息

Kafka事务消息指一系列的生产、消费操作可以要么都完成,要么都失败,类似数据库的事务。这个特性从0.11版本开始才支持。

事务场景

  1. 最简单的需求是producer发的多条消息组成一个事务,这些消息需要对consumer同时可见或者同时不可见 。

  2. Producer可能会给多个topic/partition发消息,这些消息也需要能放在一个事务里面。

  3. 先消费一个topic,然后做处理,再发到另一个topic,这个consume-transform-produce过程需要放到一个事务里面,比如在消息处理或者发送的过程中如果失败了,消费位点也不能提交。

  4. Producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务 。

  5. 流式处理的拓扑可能会比较深,如果下游只有等上游消息事务提交以后才能读到,可能会导致rt非常长吞吐量也随之下降很多,所以需要实现read committed和read uncommitted两种事务隔离级别。

应用示例

  1. public class KafkaTransactionsExample {
      
      public static void main(String args[]) {
        KafkaConsumer consumer = new KafkaConsumer<>(consumerConfig);
        KafkaProducer producer = new KafkaProducer<>(producerConfig);
    
        producer.initTransactions();
         
        while(true) {
          ConsumerRecords records = consumer.poll(CONSUMER_POLL_TIMEOUT);
          if (!records.isEmpty()) {
            producer.beginTransaction();
            List> outputRecords = processRecords(records);
            for (ProducerRecord outputRecord : outputRecords) {
              producer.send(outputRecord);
            }
            sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
            producer.endTransaction();
          }
        }
      }
    }

RocketMQ事务消息

说到分布式事务,就会谈到经典的转账问题:A和B两个账户,数据存储在两个不同数据库里,A要扣钱,B要加钱,如何保证原子性?

通过消息中间件来实现最终一致性的思路是:A系统扣钱,然后发条消息给消息中间件,B系统接收此消息,进行加钱操作。

这里面有个问题:A是先update DB,后发送消息呢,还是先发送消息,后update DB?

假设先update DB成功,发送消息失败,重试还是失败,怎么办? 
假设先发送消息成功,update DB失败,重试还是失败,怎么办?

因为数据库操作和消息发送不在一个本地事务,无法保证同时成功或同时失败,一个常用办法是在数据库中加一张本地消息表,实现最终一致性。该方法在实际业务中广泛使用,网上有很多介绍(比如,https://www.ucloud.cn/yun/47411.html),在此不做赘述。

这种方法的缺点是对业务侵入性太强,为此RocketMQ提出了“事务消息”的概念。

具体来说,就是把消息的发送分成了Prepare阶段和确认阶段,两个阶段之间更新数据库: 

1)发送Prepared消息 

2)Broker保存消息

3)返回发送结果

4)执行本地操作,update DB 

5)根据update DB结果成功或失败,Confirm或者取消Prepared消息。

如下图所示:

1577700337994160.jpg

前面成功了,Confirm/Prepared消息失败了怎么办?RocketMQ会定期扫描所有的Prepared消息,调用业务实现的回查接口,确定这条消息发出去还是取消。

回查接口要告诉消息中间件发送还是取消这条消息,实际实现中,通常还是需要借助一张本地消息表,查询该消息ID所对应的“Update DB”是否已经提交了。

这种方法其实还是要侵入业务,并没有真正把业务与事务分离开。

有没有更好的办法?

有的。需要借助于分布式事务中间件,把业务与事务分开,优雅地解决问题。

事务消息+分布式事务中间件

典型处理步骤如下:

1) 开启分布式事务

2)Update DB,失败则回滚分布式事务

3)发送消息(分布式事务中间件检查到处于事务上下文则发送Prepared消息,不在事务上下文则发送普通消息),失败则回滚分布式事务

4)提交分布式事务

5)分布式事务中间件的事务协调器推进二阶段操作,自动调用消息的Confirm方法

这种方式非侵入性地解决了数据库本地事务与消息发送的一致性问题,是作者看好的未来方向。


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。