SpringCloud微服务实战——搭建企业级开发框架(二十七):集成多数据源+Seata分布式事务+读写分离+分库分表【二】
【摘要】 接上文:SpringCloud微服务实战——搭建企业级开发框架(二十七):集成多数据源+Seata分布式事务+读写分离+分库分表【一】 3. 支付数据库表设计DROP TABLE IF EXISTS `t_mall_pay_record`;CREATE TABLE `t_mall_pay_record` ( `id` bigint(20) NOT NULL AUTO_INCREMENT...
接上文:SpringCloud微服务实战——搭建企业级开发框架(二十七):集成多数据源+Seata分布式事务+读写分离+分库分表【一】
3. 支付数据库表设计
DROP TABLE IF EXISTS `t_mall_pay_record`;
CREATE TABLE `t_mall_pay_record` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`tenant_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '租户id',
`user_id` bigint(20) NOT NULL COMMENT '用户id',
`order_no` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '0',
`amount` decimal(9, 2) NOT NULL,
`pay_status` tinyint(2) NOT NULL DEFAULT 0 COMMENT '支付状态:0=未支付,1=已支付, 2=已退款',
`pay_type` tinyint(2) NOT NULL DEFAULT 3 COMMENT '支付方式:1=微信支付,2=货到付款,3=余额支付,4=支付宝支付, 5=银行卡支付',
`title` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT '',
`refund` decimal(9, 2) NOT NULL DEFAULT 0.00 COMMENT '已退款金额',
`comments` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '备注',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
`creator` bigint(20) NULL DEFAULT NULL COMMENT '创建者',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
`operator` bigint(20) NULL DEFAULT NULL COMMENT '更新者',
`del_flag` tinyint(2) NULL DEFAULT 0 COMMENT '1:删除 0:不删除',
PRIMARY KEY (`id`) USING BTREE,
INDEX `INDEX_TENANT_ID`(`tenant_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 9 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
`xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',
`context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
4. 账户数据库表设计
-- ----------------------------
-- Table structure for t_mall_user_account
-- ----------------------------
DROP TABLE IF EXISTS `t_mall_user_account`;
CREATE TABLE `t_mall_user_account` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`tenant_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '租户id',
`user_id` bigint(20) NOT NULL COMMENT '用户id',
`integral` bigint(20) NOT NULL DEFAULT 0 COMMENT '积分',
`balance` decimal(10, 2) NOT NULL DEFAULT 0.00 COMMENT '余额',
`account_status` tinyint(2) NULL DEFAULT 1 COMMENT '账户状态 \'0\'禁用,\'1\' 启用',
`comments` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '备注',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
`creator` bigint(20) NULL DEFAULT NULL COMMENT '创建者',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
`operator` bigint(20) NULL DEFAULT NULL COMMENT '更新者',
`del_flag` tinyint(2) NULL DEFAULT 0 COMMENT '1:删除 0:不删除',
PRIMARY KEY (`id`) USING BTREE,
INDEX `INDEX_TENANT_ID`(`tenant_id`) USING BTREE,
INDEX `INDEX_USER_ID`(`user_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '用户账户表' ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Table structure for t_mall_user_balance_record
-- ----------------------------
DROP TABLE IF EXISTS `t_mall_user_balance_record`;
CREATE TABLE `t_mall_user_balance_record` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`tenant_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '租户id',
`user_id` bigint(20) NOT NULL COMMENT '用户id',
`type` tinyint(2) NOT NULL COMMENT '类型:1=收入,2=支出',
`amount` decimal(10, 2) NOT NULL COMMENT '变动金额',
`comments` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT '备注',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
`creator` bigint(20) NULL DEFAULT NULL COMMENT '创建者',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
`operator` bigint(20) NULL DEFAULT NULL COMMENT '更新者',
`del_flag` tinyint(2) NULL DEFAULT 0 COMMENT '1:删除 0:不删除',
PRIMARY KEY (`id`) USING BTREE,
INDEX `INDEX_TENANT_ID`(`tenant_id`) USING BTREE,
INDEX `INDEX_USER_ID`(`user_id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 17 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = DYNAMIC;
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
`xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',
`context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
5. 上面的脚本中,每个数据都需要刷入了Seata分布式事务回滚需要的表脚本,在下载Seata包的seata-1.4.1\seata-1.4.1\script\client\at\db路径下
-- ----------------------------
-- Table structure for undo_log
-- ----------------------------
DROP TABLE IF EXISTS `undo_log`;
CREATE TABLE `undo_log` (
`branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id',
`xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',
`context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` longblob NOT NULL COMMENT 'rollback info',
`log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` datetime(6) NOT NULL COMMENT 'create datetime',
`log_modified` datetime(6) NOT NULL COMMENT 'modify datetime',
UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Dynamic;
SET FOREIGN_KEY_CHECKS = 1;
三、测试代码
在GitEgg-Cloud工程下,新建gitegg-mall和gitegg-mall-client子工程,client子工程用于fegin调用
- 订单服务
@DS("mall_order")//每一层都需要使用多数据源注解切换所选择的数据库
@Transactional(propagation = Propagation.REQUIRES_NEW)
@GlobalTransactional //重点 第一个开启事务的需要添加seata全局事务注解
@Override
public void order(List<OrderSkuDTO> orderSkuList, Long userId) {
//获取商品的详细信息
Result<Object> goodsSkuResult = mallGoodsFeign.queryByIds(orderSkuList.stream()
.map(OrderSkuDTO::getGoodsSkuId)
.collect(Collectors.toList()));
List<Object> resultSkuList = (List<Object>) goodsSkuResult.getData();
List<GoodsSkuDTO> goodsSkuList = new ArrayList<>();
if(CollectionUtils.isEmpty(resultSkuList) || resultSkuList.size() != orderSkuList.size()) {
throw new BusinessException("商品不存在");
}
else {
resultSkuList.stream().forEach(goodsSku -> {
GoodsSkuDTO goodsSkuDTO = BeanUtil.fillBeanWithMap((Map<?, ?>) goodsSku, new GoodsSkuDTO(), false);
goodsSkuList.add(goodsSkuDTO);
});
}
//扣商品库存
List<ReduceStockDTO> reduceStockDtoList = orderSkuList.stream()
.map(t -> new ReduceStockDTO(t.getGoodsSkuId(),t.getGoodsSkuNumber()))
.collect(Collectors.toList());
mallGoodsFeign.reduceStock(reduceStockDtoList);
// //支付
BigDecimal totalMoney = new BigDecimal(0.0d);
for(OrderSkuDTO orderSkuDTO: orderSkuList) {
for(GoodsSkuDTO goodsSkuDTO: goodsSkuList) {
if(orderSkuDTO.getGoodsSkuId().equals(goodsSkuDTO.getId())) {
BigDecimal skuNumber = new BigDecimal(orderSkuDTO.getGoodsSkuNumber());
totalMoney = totalMoney.add(goodsSkuDTO.getPrice().multiply(skuNumber));
break;
}
}
}
mallPayFeign.pay(userId, totalMoney);
//主订单表插入数据
Order order = new Order();
order.setTotalPrice(totalMoney);
order.setTotalPayPrice(totalMoney);
order.setExpressOriginalPrice(totalMoney);
order.setStatus(1);
order.setUserId(userId);
this.save(order);
//子订单表插入数据
ArrayList<OrderSku> orderSkus = new ArrayList<>();
orderSkuList.forEach(payOrderReq -> {
OrderSku orderSku = new OrderSku();
orderSku.setOrderId(order.getId());
orderSku.setGoodsSkuNumber(payOrderReq.getGoodsSkuNumber());
orderSku.setGoodsSkuId(payOrderReq.getGoodsSkuId());
for(GoodsSkuDTO goodsSkuDTO : goodsSkuList) {
if(payOrderReq.getGoodsSkuId().equals(goodsSkuDTO.getId())) {
orderSku.setGoodsSkuPrice(goodsSkuDTO.getPrice());
break;
}
}
orderSkus.add(orderSku);
});
orderSkuService.saveBatch(orderSkus);
}
- 商品服务
@DS("mall_goods")
@Override
public List<GoodsSku> queryGoodsByIds(List<Long> idList) {
return goodsSkuMapper.queryGoodsByIds(idList);
}
/**
* 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@DS("mall_goods")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void reduceStock(List<ReduceStockDTO> reduceStockReqList) {
reduceStockReqList.forEach(sku -> {
Integer line = goodsSkuMapper.reduceStock(sku.getNumber(), sku.getSkuId());
if(line == null || line == 0) {
throw new BusinessException("商品不存在或库存不足");
}
});
}
- 支付服务
/**
* 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@DS("mall_pay")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public Long pay(Long userId, BigDecimal payMoney) {
//调用gitegg-mall-user的账户扣除余额接口
mallUserFeign.accountDeduction(userId, payMoney);
// 插入支付记录表
PayRecord payRecord = new PayRecord();
payRecord.setUserId(userId);
payRecord.setAmount(payMoney);
payRecord.setPayStatus(GitEggConstant.Number.ONE);
payRecord.setPayType(GitEggConstant.Number.FIVE);
payRecordService.save(payRecord);
return payRecord.getId();
}
- 账户服务
/**
* 事务传播特性设置为 REQUIRES_NEW 开启新的事务 重要!!!!一定要使用REQUIRES_NEW
*/
@DS("mall_user")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void deduction(Long userId, BigDecimal amountOfMoney) {
//查看账户余额是否满足扣款
QueryUserAccountDTO queryUserAccountDTO = new QueryUserAccountDTO();
queryUserAccountDTO.setUserId(userId);
UserAccountDTO userAccount = this.queryUserAccount(queryUserAccountDTO);
if(userAccount == null) {
throw new BusinessException("用户未开通个人账户");
}
if(amountOfMoney.compareTo(userAccount.getBalance()) > GitEggConstant.Number.ZERO) {
throw new BusinessException("账户余额不足");
}
//执行扣款
userAccountMapper.deductionById(userAccount.getId(), amountOfMoney);
//加入账户变动记录
UserBalanceRecord userBalanceRecord = new UserBalanceRecord();
userBalanceRecord.setUserId(userId);
userBalanceRecord.setAmount(amountOfMoney);
userBalanceRecord.setType(GitEggConstant.Number.TWO);
userBalanceRecordService.save(userBalanceRecord);
}
- 使用Postman测试,发送请求,然后查看数据库是否都增加了数据,正常情况下,几个数据库的表都有新增或更新
- 测试异常情况,在代码中抛出异常,然后进行debug,查看在异常之前数据库数据是否入库,异常之后,入库数据是否已回滚。同时可观察undo_log表的数据情况。
# 在订单服务中添加
throw new BusinessException("测试异常回滚");
四、整合数据库分库分表
首先在我们整合dynamic-datasource和shardingsphere-JDBC之前,需要了解它们的异同点:dynamic-datasource从字面意思可以看出,它是动态多数据源,其主要功能是支持多数据源及数据源动态切换不支持数据分片,shardingsphere-jdbc主要功能是数据分片、读写分离,当然也支持多数据源,但是到目前为止如果要支持多数据源动态切换的话,需要自己实现,所以,这里结合两者的优势,使用dynamic-datasource的动态多数据源切换+shardingsphere-jdbc的数据分片、读写分离。
- 在gitegg-platform-bom和gitegg-platform-db中引入shardingsphere-jdbc的依赖,重新install。(注意这里使用了5.0.0-alpha版本,正式环境请使用最新发布版。)
<!-- shardingsphere-jdbc -->
<sharding.jdbc.version>5.0.0-alpha</sharding.jdbc.version>
<!-- Shardingsphere-jdbc -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
<version>${shardingsphere.jdbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring-namespace</artifactId>
<version>${shardingsphere.jdbc.version}</version>
</dependency>
- 在gitegg-platform-db中,新建DynamicDataSourceProviderConfig类,自定义DynamicDataSourceProvider完成与shardingsphere的集成
/**
* @author GitEgg
* @date 2021-04-23 19:06:51
**/
@Configuration
@AutoConfigureBefore(DynamicDataSourceAutoConfiguration.class)
public class DynamicDataSourceProviderConfig {
@Resource
private DynamicDataSourceProperties properties;
/**
* shardingSphereDataSource
*/
@Lazy
@Resource(name = "shardingSphereDataSource")
private DataSource shardingSphereDataSource;
@Bean
public DynamicDataSourceProvider dynamicDataSourceProvider() {
Map<String, DataSourceProperty> datasourceMap = properties.getDatasource();
return new AbstractDataSourceProvider() {
@Override
public Map<String, DataSource> loadDataSources() {
Map<String, DataSource> dataSourceMap = createDataSourceMap(datasourceMap);
dataSourceMap.put("sharding", shardingSphereDataSource);
return dataSourceMap;
}
};
}
/**
* 将动态数据源设置为首选的
* 当spring存在多个数据源时, 自动注入的是首选的对象
* 设置为主要的数据源之后,就可以支持shardingsphere-jdbc原生的配置方式了
*/
@Primary
@Bean
public DataSource dataSource(DynamicDataSourceProvider dynamicDataSourceProvider) {
DynamicRoutingDataSource dataSource = new DynamicRoutingDataSource();
dataSource.setPrimary(properties.getPrimary());
dataSource.setStrict(properties.getStrict());
dataSource.setStrategy(properties.getStrategy());
dataSource.setProvider(dynamicDataSourceProvider);
dataSource.setP6spy(properties.getP6spy());
dataSource.setSeata(properties.getSeata());
return dataSource;
}
}
- 新建用来分库的数据库表gitegg_cloud_mall_order0和gitegg_cloud_mall_order1,复制gitegg_cloud_mall_order中的表结构。
- 在Nacos中分别配置shardingsphere-jdbc和多数据源
# shardingsphere 配置
shardingsphere:
props:
sql:
show: true
datasource:
common:
type: com.alibaba.druid.pool.DruidDataSource
validationQuery: SELECT 1 FROM DUAL
names: shardingorder0,shardingorder1
shardingorder0:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://127.0.0.1/gitegg_cloud_mall_order0?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf8&all owMultiQueries=true&serverTimezone=Asia/Shanghai
username: root
password: root
shardingorder1:
type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://127.0.0.1/gitegg_cloud_mall_order1?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf8&all owMultiQueries=true&serverTimezone=Asia/Shanghai
username: root
password: root
rules:
sharding:
tables:
t_mall_order:
actual-data-nodes: shardingorder$->{0..1}.t_mall_order$->{0..1}
# 配置分库策略
databaseStrategy:
standard:
shardingColumn: id
shardingAlgorithmName: database-inline
table-strategy:
standard:
sharding-column: id
sharding-algorithm-name: table-inline-order
key-generate-strategy:
column: id
key-generator-name: snowflake
t_mall_order_sku:
actual-data-nodes: shardingorder$->{0..1}.t_mall_order_sku$->{0..1}
# 配置分库策略
databaseStrategy:
standard:
shardingColumn: id
shardingAlgorithmName: database-inline
table-strategy:
standard:
sharding-column: id
sharding-algorithm-name: table-inline-order-sku
key-generate-strategy:
column: id
key-generator-name: snowflake
sharding-algorithms:
database-inline:
type: INLINE
props:
algorithm-expression: shardingorder$->{id % 2}
table-inline-order:
type: INLINE
props:
algorithm-expression: t_mall_order$->{id % 2}
table-inline-order-sku:
type: INLINE
props:
algorithm-expression: t_mall_order_sku$->{id % 2}
key-generators:
snowflake:
type: SNOWFLAKE
props:
worker-id: 123
- 修改OrderServiceImpl.java的下单方法order注解,数据源选择sharding
@DS("sharding")//每一层都需要使用多数据源注解切换所选择的数据库
@Transactional(propagation = Propagation.REQUIRES_NEW)
@GlobalTransactional //重点 第一个开启事务的需要添加seata全局事务注解
@Override
public void order(List<OrderSkuDTO> orderSkuList, Long userId) {
......
}
- postman模拟测试调用下单接口,观察数据库gitegg_cloud_mall_order0和gitegg_cloud_mall_order1里面的order表数据变化,我们发现,数据记录根据id取余存放到对应的库和表。这里的配置是使用order表的id,在实际生产环境中,需要根据实际情况来选择合适的分库分表策略。
- 测试引入shardingsphere-jdbc后分布式事务是否正常,在OrderServiceImpl.java的下单方法order中的最后主动抛出异常,saveBatch之后打断点,使用postman模拟测试调用下单接口,到达断点时,查看数据是否入库,放开断点,抛出异常,然后再查看数据是否被回滚。
orderSkuService.saveBatch(orderSkus);
throw new BusinessException("测试异常");
备注:
- sharding-jdbc启动时报错java.sql.SQLFeatureNotSupportedException: isValid
解决: 这个是4.x版本的问题,官方会在5.x结局这个问题,目前解决方案是关闭sql健康检查。
本文源码在 https://gitee.com/wmz1930/GitEgg 的 chapter-27(未使用shardingsphere-jdbc分库分表)和 chapter-27-shardingsphere-jdbc(使用shardingsphere-jdbc分库分表)分支。
GitEgg-Cloud是一款基于SpringCloud整合搭建的企业级微服务应用开发框架,开源项目地址:
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)