逐步分解事务以使用saga模式
【摘要】 1 简介本文介绍在 web 服务中实现 Saga 模式,实现一个在商品管理系统中的CRUD操作想要在 web 服务中实现 Saga 模式,可以通过以下步骤实现一个商品管理系统的 CRUD 操作,其中 Saga 模式的事务链包含以下基本元素:事务分解:将每个操作拆分为单独的小事务。每个小事务对应其业务逻辑(如创建、更新、删除)。为每个小事务实现补偿操作(如回滚)。协调器设计:负责管理 Sag...
1 简介
本文介绍在 web 服务中实现 Saga 模式,实现一个在商品管理系统中的CRUD操作
想要在 web 服务中实现 Saga 模式,可以通过以下步骤实现一个商品管理系统的 CRUD 操作,其中 Saga 模式的事务链包含以下基本元素:
- 事务分解:
将每个操作拆分为单独的小事务。
每个小事务对应其业务逻辑(如创建、更新、删除)。
为每个小事务实现补偿操作(如回滚)。
协调器设计:
负责管理 Saga 流程,控制事务链的执行顺序和状态。
- 补偿机制:
当某个事务失败时,通过补偿操作撤销已完成的事务。
以下是一个完整的示例,展示如何在商品管理系统中实现 Saga 模式的 CRUD 操作。
2 系统设计
场景描述
服务管理一个商品表,包含商品信息。
事务链包括:
创建商品(主数据库)。
添加库存记录(库存数据库)。
更新日志记录(日志数据库)。
数据库表结构
商品表 (主数据库)
CREATE TABLE products (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
price DECIMAL(10, 2) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
库存表 (库存数据库)
CREATE TABLE inventory (
id INT AUTO_INCREMENT PRIMARY KEY,
product_id INT NOT NULL,
quantity INT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
日志表 (日志数据库)
CREATE TABLE saga_logs (
id INT AUTO_INCREMENT PRIMARY KEY,
saga_id VARCHAR(64) NOT NULL,
step VARCHAR(255) NOT NULL,
state ENUM('STARTED', 'COMPLETED', 'FAILED') NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
3 服务实现
依赖安装
go get -u github.com/gin-gonic/gin
go get -u github.com/go-sql-driver/mysql
go get -u github.com/sirupsen/logrus
代码实现
type DB struct {
conn *sql.DB
}
var dbProducts, dbInventory, dbLogs *DB
func initDB(dsn string) *DB {
conn, err := sql.Open("mysql", dsn)
if err != nil {
log.Fatalf("Failed to connect to database: %v", err)
}
return &DB{conn: conn}
}
// Saga Step: Create Product
func createProduct(txID, name string, price float64) (int64, error) {
result, err := dbProducts.conn.Exec("INSERT INTO products (name, price) VALUES (?, ?)", name, price)
if err != nil {
return 0, err
}
productID, _ := result.LastInsertId()
// Log Saga step
_, _ = dbLogs.conn.Exec("INSERT INTO saga_logs (saga_id, step, state) VALUES (?, ?, 'STARTED')", txID, "create_product")
return productID, nil
}
func rollbackCreateProduct(txID string, productID int64) error {
_, err := dbProducts.conn.Exec("DELETE FROM products WHERE id = ?", productID)
// Update Saga log
_, _ = dbLogs.conn.Exec("UPDATE saga_logs SET state = 'FAILED' WHERE saga_id = ? AND step = ?", txID, "create_product")
return err
}
// Saga Step: Add Inventory
func addInventory(txID string, productID int64, quantity int) error {
_, err := dbInventory.conn.Exec("INSERT INTO inventory (product_id, quantity) VALUES (?, ?)", productID, quantity)
// Log Saga step
_, _ = dbLogs.conn.Exec("INSERT INTO saga_logs (saga_id, step, state) VALUES (?, ?, 'STARTED')", txID, "add_inventory")
return err
}
func rollbackAddInventory(txID string, productID int64) error {
_, err := dbInventory.conn.Exec("DELETE FROM inventory WHERE product_id = ?", productID)
// Update Saga log
_, _ = dbLogs.conn.Exec("UPDATE saga_logs SET state = 'FAILED' WHERE saga_id = ? AND step = ?", txID, "add_inventory")
return err
}
// Saga Step: Log Transaction
func logTransaction(txID, description string) error {
_, err := dbLogs.conn.Exec("INSERT INTO saga_logs (saga_id, step, state, created_at) VALUES (?, ?, 'COMPLETED', NOW())", txID, description)
return err
}
func main() {
// Initialize databases
dbProducts = initDB("user:password@tcp(127.0.0.1:3306)/products_db")
dbInventory = initDB("user:password@tcp(127.0.0.1:3306)/inventory_db")
dbLogs = initDB("user:password@tcp(127.0.0.1:3306)/logs_db")
defer dbProducts.conn.Close()
defer dbInventory.conn.Close()
defer dbLogs.conn.Close()
r := gin.Default()
r.POST("/saga/create_product", func(c *gin.Context) {
var req struct {
Name string `json:"name" binding:"required"`
Price float64 `json:"price" binding:"required"`
Quantity int `json:"quantity" binding:"required"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
txID := fmt.Sprintf("saga_%d", time.Now().UnixNano())
log.Println("Starting Saga:", txID)
// Step 1: Create Product
productID, err := createProduct(txID, req.Name, req.Price)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create product"})
return
}
// Step 2: Add Inventory
err = addInventory(txID, productID, req.Quantity)
if err != nil {
rollbackCreateProduct(txID, productID) // Rollback product creation
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to add inventory"})
return
}
// Step 3: Log Transaction
err = logTransaction(txID, "Product creation completed")
if err != nil {
rollbackAddInventory(txID, productID) // Rollback inventory
rollbackCreateProduct(txID, productID)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to log transaction"})
return
}
c.JSON(http.StatusOK, gin.H{"message": "Saga completed successfully", "product_id": productID})
})
r.Run(":8080")
}
执行过程,创建商品:
请求:
POST http://localhost:8080/saga/create_product
{
"name": "Laptop",
"price": 1200.50,
"quantity": 10
}
服务会依次执行以下步骤:
创建商品。
增加库存。
记录日志。
事务回滚,如果任意步骤失败,将执行补偿逻辑:
回滚商品创建。
删除库存记录。
4 小结
Saga 模式的优势与劣势
优势:
非阻塞,适合分布式服务。
更灵活,可扩展。
支持最终一致性。
劣势:
编程复杂性较高。
数据不保证强一致性。
以上实现展示了如何使用web服务搭建一个基于 Saga 模式 的商品管理系统,适合在分布式微服务中应用。
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)