ShardingSphere与Seata分布式事务
ShardingSphere与Seata分布式事务
本篇文章源码基于4.0.1版本
ShardingSphere除了支持本地事务,还支持XA事务和BASE 事务
Seata流程
Base事务是最终一致性事务,Seata就是基于Base理论设计的,它的角色有事务管理器TransactionManager(TM)、资源管理器ResourceManager(RM)和事务协调者TransactionCoordinator(TC)。他们各司其职,TM负责发起事务和结束事务,TC负责维护分布式事务,RM负责运行本地事务
具体步骤:
- TM向TC申请开启全局事务,生成全局的唯一ID
- RM向TC注册分支事务,执行分支事务并提交
- TM根据TC中所有分支事务的执行情况,发起全局提交或回滚
- TC对这个全局ID对应的所有事务进行提交或回滚
在使用Seata的时候,需要在resources文件夹下增加 seata.conf 配置文件
文件内容:
client {
application.id = xpp-base
transaction.service.group = xpp-base-group
}
这个配置文件中定义的是事务管理器客户端和资源管理器客户端
分片上下文运行时实例初始化的时候会调用ShardingTransactionManager接口的init()方法,实现类SeataATShardingTransactionManager实现这个方法
初始化事务管理器和资源管理器
SeataATShardingTransactionManager的init()方法:
public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources) {
initSeataRPCClient();
for (ResourceDataSource each : resourceDataSources) {
dataSourceMap.put(each.getOriginalName(), new DataSourceProxy(each.getDataSource()));
}
}
- 调用自身的initSeataRPCClient()方法,初始化Seata使用的事务管理器和资源管理器客户端
- 创建数据源代理对象,放入map集合中
我们具体看一下initSeataRPCClient()方法:
private final FileConfiguration configuration = new FileConfiguration("seata.conf");
private void initSeataRPCClient() {
String applicationId = configuration.getConfig("client.application.id");
Preconditions.checkNotNull(applicationId, "please config application id within seata.conf file");
String transactionServiceGroup = configuration.getConfig("client.transaction.service.group", "default");
TMClient.init(applicationId, transactionServiceGroup);
RMClient.init(applicationId, transactionServiceGroup);
}
从这一段代码我们就可以看出来我们为什么要定义seata.conf文件了,根据配置的应用id和事务服务组来初始化事务管理器和资源管理器
事务操作
开启事务:
SeataATShardingTransactionManager的begin()方法:
public void begin() {
SeataTransactionHolder.set(GlobalTransactionContext.getCurrentOrCreate());
SeataTransactionHolder.get().begin();
SeataTransactionBroadcaster.collectGlobalTxId();
}
- 通过GlobalTransactionContext创建GlobalTransaction,并放入SeataTransactionHolder类中,使用ThreadLocal存储
- 获取SeataTransactionHolder中的GlobalTransaction对象,调用它的begin()方法开启事务
- 调用Seata事务广播器SeataTransactionBroadcaster的collectGlobalTxId()方法,判断是否在全局事务中,如果在的话把全局事务ID放入缓存中。
关闭事务,提交事务,回滚事务的逻辑也是基于GlobalTransaction,很好理解。
总结
这篇文章我们讲了ShardingSphere与Seata的集成,在初始化分片上下文运行时实例化的时候会根据配置文件seata.conf初始化Seata中的事务管理器和资源管理器,SeataATShardingTransactionManager在进行事务处理的主要依赖全局事务类GlobalTransaction,全局Id的生成保存在RootContext中进行传播
- 点赞
- 收藏
- 关注作者
评论(0)