逐步分解事务以使用saga模式

举报
码乐 发表于 2025/02/22 09:49:25 2025/02/22
【摘要】 1 简介本文介绍在 web 服务中实现 Saga 模式,实现一个在商品管理系统中的CRUD操作想要在 web 服务中实现 Saga 模式,可以通过以下步骤实现一个商品管理系统的 CRUD 操作,其中 Saga 模式的事务链包含以下基本元素:事务分解:将每个操作拆分为单独的小事务。每个小事务对应其业务逻辑(如创建、更新、删除)。为每个小事务实现补偿操作(如回滚)。协调器设计:负责管理 Sag...

1 简介

本文介绍在 web 服务中实现 Saga 模式,实现一个在商品管理系统中的CRUD操作

想要在 web 服务中实现 Saga 模式,可以通过以下步骤实现一个商品管理系统的 CRUD 操作,其中 Saga 模式的事务链包含以下基本元素:

  • 事务分解:

将每个操作拆分为单独的小事务。
每个小事务对应其业务逻辑(如创建、更新、删除)。
为每个小事务实现补偿操作(如回滚)。
协调器设计:

负责管理 Saga 流程,控制事务链的执行顺序和状态。

  • 补偿机制:

当某个事务失败时,通过补偿操作撤销已完成的事务。
以下是一个完整的示例,展示如何在商品管理系统中实现 Saga 模式的 CRUD 操作。

2 系统设计

场景描述

服务管理一个商品表,包含商品信息。

事务链包括:

创建商品(主数据库)。
添加库存记录(库存数据库)。
更新日志记录(日志数据库)。

数据库表结构
商品表 (主数据库)

      CREATE TABLE products (
          id INT AUTO_INCREMENT PRIMARY KEY,
          name VARCHAR(255) NOT NULL,
          price DECIMAL(10, 2) NOT NULL,
          created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      );

库存表 (库存数据库)

      CREATE TABLE inventory (
          id INT AUTO_INCREMENT PRIMARY KEY,
          product_id INT NOT NULL,
          quantity INT NOT NULL,
          created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      );

日志表 (日志数据库)

      CREATE TABLE saga_logs (
          id INT AUTO_INCREMENT PRIMARY KEY,
          saga_id VARCHAR(64) NOT NULL,
          step VARCHAR(255) NOT NULL,
          state ENUM('STARTED', 'COMPLETED', 'FAILED') NOT NULL,
          created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      );

3 服务实现

依赖安装

      go get -u github.com/gin-gonic/gin
      go get -u github.com/go-sql-driver/mysql
      go get -u github.com/sirupsen/logrus

代码实现

        type DB struct {
            conn *sql.DB
        }

        var dbProducts, dbInventory, dbLogs *DB

        func initDB(dsn string) *DB {
            conn, err := sql.Open("mysql", dsn)
            if err != nil {
                log.Fatalf("Failed to connect to database: %v", err)
            }
            return &DB{conn: conn}
        }

        // Saga Step: Create Product
        func createProduct(txID, name string, price float64) (int64, error) {
            result, err := dbProducts.conn.Exec("INSERT INTO products (name, price) VALUES (?, ?)", name, price)
            if err != nil {
                return 0, err
            }
            productID, _ := result.LastInsertId()

            // Log Saga step
            _, _ = dbLogs.conn.Exec("INSERT INTO saga_logs (saga_id, step, state) VALUES (?, ?, 'STARTED')", txID, "create_product")
            return productID, nil
        }

        func rollbackCreateProduct(txID string, productID int64) error {
            _, err := dbProducts.conn.Exec("DELETE FROM products WHERE id = ?", productID)

            // Update Saga log
            _, _ = dbLogs.conn.Exec("UPDATE saga_logs SET state = 'FAILED' WHERE saga_id = ? AND step = ?", txID, "create_product")
            return err
        }

        // Saga Step: Add Inventory
        func addInventory(txID string, productID int64, quantity int) error {
            _, err := dbInventory.conn.Exec("INSERT INTO inventory (product_id, quantity) VALUES (?, ?)", productID, quantity)

            // Log Saga step
            _, _ = dbLogs.conn.Exec("INSERT INTO saga_logs (saga_id, step, state) VALUES (?, ?, 'STARTED')", txID, "add_inventory")
            return err
        }

        func rollbackAddInventory(txID string, productID int64) error {
            _, err := dbInventory.conn.Exec("DELETE FROM inventory WHERE product_id = ?", productID)

            // Update Saga log
            _, _ = dbLogs.conn.Exec("UPDATE saga_logs SET state = 'FAILED' WHERE saga_id = ? AND step = ?", txID, "add_inventory")
            return err
        }

        // Saga Step: Log Transaction
        func logTransaction(txID, description string) error {
            _, err := dbLogs.conn.Exec("INSERT INTO saga_logs (saga_id, step, state, created_at) VALUES (?, ?, 'COMPLETED', NOW())", txID, description)
            return err
        }

        func main() {
            // Initialize databases
            dbProducts = initDB("user:password@tcp(127.0.0.1:3306)/products_db")
            dbInventory = initDB("user:password@tcp(127.0.0.1:3306)/inventory_db")
            dbLogs = initDB("user:password@tcp(127.0.0.1:3306)/logs_db")
            defer dbProducts.conn.Close()
            defer dbInventory.conn.Close()
            defer dbLogs.conn.Close()

            r := gin.Default()

            r.POST("/saga/create_product", func(c *gin.Context) {
                var req struct {
                    Name     string  `json:"name" binding:"required"`
                    Price    float64 `json:"price" binding:"required"`
                    Quantity int     `json:"quantity" binding:"required"`
                }

                if err := c.ShouldBindJSON(&req); err != nil {
                    c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
                    return
                }

                txID := fmt.Sprintf("saga_%d", time.Now().UnixNano())
                log.Println("Starting Saga:", txID)

                // Step 1: Create Product
                productID, err := createProduct(txID, req.Name, req.Price)
                if err != nil {
                    c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create product"})
                    return
                }

                // Step 2: Add Inventory
                err = addInventory(txID, productID, req.Quantity)
                if err != nil {
                    rollbackCreateProduct(txID, productID) // Rollback product creation
                    c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to add inventory"})
                    return
                }

                // Step 3: Log Transaction
                err = logTransaction(txID, "Product creation completed")
                if err != nil {
                    rollbackAddInventory(txID, productID) // Rollback inventory
                    rollbackCreateProduct(txID, productID)
                    c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to log transaction"})
                    return
                }

                c.JSON(http.StatusOK, gin.H{"message": "Saga completed successfully", "product_id": productID})
            })

            r.Run(":8080")
        }

执行过程,创建商品:

请求:

      POST http://localhost:8080/saga/create_product
      {
        "name": "Laptop",
        "price": 1200.50,
        "quantity": 10
      }

服务会依次执行以下步骤:

创建商品。
增加库存。
记录日志。

事务回滚,如果任意步骤失败,将执行补偿逻辑:

回滚商品创建。
删除库存记录。

4 小结

Saga 模式的优势与劣势

优势:

非阻塞,适合分布式服务。
更灵活,可扩展。
支持最终一致性。

劣势:

编程复杂性较高。
数据不保证强一致性。

以上实现展示了如何使用web服务搭建一个基于 Saga 模式 的商品管理系统,适合在分布式微服务中应用。

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。