ShardingSphere与Seata分布式事务

举报
周杰伦本人 发表于 2022/11/29 17:38:31 2022/11/29
【摘要】 ShardingSphere与Seata分布式事务本篇文章源码基于4.0.1版本ShardingSphere除了支持本地事务,还支持XA事务和BASE 事务 Seata流程Base事务是最终一致性事务,Seata就是基于Base理论设计的,它的角色有事务管理器TransactionManager(TM)、资源管理器ResourceManager(RM)和事务协调者TransactionCo...

ShardingSphere与Seata分布式事务

本篇文章源码基于4.0.1版本

ShardingSphere除了支持本地事务,还支持XA事务和BASE 事务

Seata流程

Base事务是最终一致性事务,Seata就是基于Base理论设计的,它的角色有事务管理器TransactionManager(TM)、资源管理器ResourceManager(RM)和事务协调者TransactionCoordinator(TC)。他们各司其职,TM负责发起事务和结束事务,TC负责维护分布式事务,RM负责运行本地事务

具体步骤:

  1. TM向TC申请开启全局事务,生成全局的唯一ID
  2. RM向TC注册分支事务,执行分支事务并提交
  3. TM根据TC中所有分支事务的执行情况,发起全局提交或回滚
  4. TC对这个全局ID对应的所有事务进行提交或回滚

在使用Seata的时候,需要在resources文件夹下增加 seata.conf 配置文件

文件内容:

client {
    application.id = xpp-base
    transaction.service.group = xpp-base-group
}

这个配置文件中定义的是事务管理器客户端和资源管理器客户端

分片上下文运行时实例初始化的时候会调用ShardingTransactionManager接口的init()方法,实现类SeataATShardingTransactionManager实现这个方法

初始化事务管理器和资源管理器

SeataATShardingTransactionManager的init()方法:

    public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) {
        initSeataRPCClient();
        for (ResourceDataSource each : resourceDataSources) {
            dataSourceMap.put(each.getOriginalName(), new DataSourceProxy(each.getDataSource()));
        }
    }
  1. 调用自身的initSeataRPCClient()方法,初始化Seata使用的事务管理器和资源管理器客户端
  2. 创建数据源代理对象,放入map集合中

我们具体看一下initSeataRPCClient()方法:

    private final FileConfiguration configuration = new FileConfiguration("seata.conf");

    private void initSeataRPCClient() {
        String applicationId = configuration.getConfig("client.application.id");
        Preconditions.checkNotNull(applicationId, "please config application id within seata.conf file");
        String transactionServiceGroup = configuration.getConfig("client.transaction.service.group", "default");
        TMClient.init(applicationId, transactionServiceGroup);
        RMClient.init(applicationId, transactionServiceGroup);
    }

从这一段代码我们就可以看出来我们为什么要定义seata.conf文件了,根据配置的应用id和事务服务组来初始化事务管理器和资源管理器

事务操作

开启事务:

SeataATShardingTransactionManager的begin()方法:

    public void begin() {
        SeataTransactionHolder.set(GlobalTransactionContext.getCurrentOrCreate());
        SeataTransactionHolder.get().begin();
        SeataTransactionBroadcaster.collectGlobalTxId();
    }
  1. 通过GlobalTransactionContext创建GlobalTransaction,并放入SeataTransactionHolder类中,使用ThreadLocal存储
  2. 获取SeataTransactionHolder中的GlobalTransaction对象,调用它的begin()方法开启事务
  3. 调用Seata事务广播器SeataTransactionBroadcaster的collectGlobalTxId()方法,判断是否在全局事务中,如果在的话把全局事务ID放入缓存中。

关闭事务,提交事务,回滚事务的逻辑也是基于GlobalTransaction,很好理解。

总结

这篇文章我们讲了ShardingSphere与Seata的集成,在初始化分片上下文运行时实例化的时候会根据配置文件seata.conf初始化Seata中的事务管理器和资源管理器,SeataATShardingTransactionManager在进行事务处理的主要依赖全局事务类GlobalTransaction,全局Id的生成保存在RootContext中进行传播

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。