在 MongoDB 中执行两阶段提交

举报
看山 发表于 2021/09/28 22:54:22 2021/09/28
【摘要】 简介 本篇文档提供了一个通过使用两阶段提交将数据写入多个文档的方法来处理多文档更新或“多文档事务”。此外,你可以扩展实现类似数据回滚的功能。 背景 在MongoDB中,操作单个文档(document...

简介

本篇文档提供了一个通过使用两阶段提交将数据写入多个文档的方法来处理多文档更新或“多文档事务”。此外,你可以扩展实现类似数据回滚的功能。

背景

在MongoDB中,操作单个文档(document)是原子性的;但是,涉及到多个文档的操作,也就是常说的“多文档事务”,是非原子性的。由于document可以设计的非常复杂,包含多个“内嵌的”文档,因此单个文档的原子性为很多实际场景提供了必要的支持。
尽管单文档原子操作很强大,但在很多场景下依然需要多文档事务。当执行一个由几个顺序操作组成的事务时,可能会出现某些问题,例如:

  • 原子性:如果某个操作失败了,在同一个事务中前面的操作回滚到最初的状态(即,要么全做,要么全部做)。
  • 一致性:如果发生了严重故障将事务中断(比如:网络、硬件故障),数据库必须能够恢复到一致的状态。

在需要多文档事务的场景中,你可以实现两阶段提交来完成场景需求。两阶段提交可以保证数据的一致性,如果发生错误,可以恢复到事务开始之前的状态。在事务执行过程中,无论发生什么情况都可以还原到数据和状态的准备阶段。

注:
因为在MongoDB中只有单文档操作是原子性的,两阶段提交只能提供类似事务的语义。在两阶段提交或回滚过程中,应用程序可以返回的任意步骤点的中间数据。

模式

概述

考虑这样一个场景,你想从账户A转账给账户B。在关系型数据库系统中,你可以在一个多语句事务中先减少账户A的资金然后增加账户B的资金。在MongoDB中,你可以模拟实现一个两阶段提交的得到同样的结果。
本节中的示例使用下面两个集合:

  1. 集合accounts保存账户信息。
  2. 集合transactions保存转账事务信息。

初始化源账户和目标账户

将账户A和账户B的信息写入到集合accounts。

db.accounts.insert(
   [
     { _id: "A", balance: 1000, pendingTransactions: [] },
     { _id: "B", balance: 1000, pendingTransactions: [] }
   ]
)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

上面的语句返回一个包含了本次操作的状态信息的BulkWriteResult() 对象。如果成功写入,BulkWriteResult()对象中的nInserted的值为2。

初始化转帐数据

将每笔转账信息写入到transactions表,转账数据包含以下字段:

  • source和destination字段,指向accounts集合中的_id
  • value字段,表示转账金额,影响源账户和目标账户的余额
  • state字段,表示转账操作当前状态,state字段可选值范围为initial, pending, applied, done, canceling和 canceled
  • lastModified字段,表示最后更新时间

将账户A向账户B转账100的操作信息初始化到transactions集合,事务状态(state字段)为"initial", lastModified字段值设为当前时间:

db.transactions.insert(
    { _id: 1, source: "A", destination: "B", value: 100, state: "initial", lastModified: new Date() }
)

  
 
  • 1
  • 2
  • 3

上面的语句返回一个包含了本次操作的状态信息的WriteResult() 对象。如果成功写入,WriteResult()对象中的nInserted的值为2。

使用两阶段提交进行转账

1. 检索交易开始

在transactions集合中查询一条state字段值为initial的数据。当前transactions集合中只有一条数据,也就是在初始化转账数据中添加的数据。如果集合中有另外的数据,查询将会返回所有state字段为initial的数据,除非你附加一些别的查询条件。

var t = db.transactions.findOne( { state: "initial" } )

  
 
  • 1

mongo shell中定义变量t来保存返回的内容。上边的语句会得到如下输出(lastModified字段应该是你插入数据的时间):

{ "_id" : 1, "source" : "A", "destination" : "B", "value" : 100, "state" : "initial", "lastModified" : ISODate("2014-07-11T20:39:26.345Z") }

  
 
  • 1

2. 将transaction数据的state字段设为pending

将transaction数据的state字段从initial改为pending,并用$currentDate操作将lastModified字段设为当前时间。

db.transactions.update(
    { _id: t._id, state: "initial" },
    {
      $set: { state: "pending" },
      $currentDate: { lastModified: true }
    }
)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

这个操作返回一个WriteResult()对象,包含本次更新操作的状态信息,如果更新成功,nMatchednModified值为1。
在修改块中,state: "initial"条件确保没有其它线程更新过本条数据。如果nMatchednModified值为0,回到第一步重新获取一条数据然后执行。

3. 账户间转账

如果账户不包含该事务t的信息,用update()方法更新帐户信息。在更新条件中,带有pendingTransactions: { $ne: t._id }是为了避免重复同一次转账。
同时更新balance字段和pendingTransactions字段来实现转账。
在事务中操作account集合,同事修改balance字段和pendingTransactions字段。
更新源账户信息,为balance字段减去transaction数据的value值,并将transaction的_id写入到pendingTransactions字段的数组中。

db.accounts.update(
   { _id: t.source, pendingTransactions: { $ne: t._id } },
   { $inc: { balance: -t.value }, $push: { pendingTransactions: t._id } }
)

  
 
  • 1
  • 2
  • 3
  • 4

操作成功后,方法会返回WriteResult()对象,nMatchednModified值为1。
更新目标账户信息,为balance字段加上transaction数据的value值,并将transaction的_id写入到pendingTransactions字段的数组中。

db.accounts.update(
   { _id: t.destination, pendingTransactions: { $ne: t._id } },
   { $inc: { balance: t.value }, $push: { pendingTransactions: t._id } }
)

  
 
  • 1
  • 2
  • 3
  • 4

操作成功后,方法会返回WriteResult()对象,nMatchednModified值为1。

4. 将transaction数据的state设为applied

用下面的update()操作将transaction数据的state值设为applied,并更新lastModified字段值为当前时间:

db.transactions.update(
   { _id: t._id, state: "pending" },
   {
     $set: { state: "applied" },
     $currentDate: { lastModified: true }
   }
)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

操作成功后,方法会返回WriteResult()对象,nMatchednModified值为1。

5. 修改两个账户的事务待定列表

将transaction数据的_id值从两个账户的pendingTransactions字段中移除。
修改源账户。

db.accounts.update(
   { _id: t.source, pendingTransactions: t._id },
   { $pull: { pendingTransactions: t._id } }
)

  
 
  • 1
  • 2
  • 3
  • 4

操作成功后,方法会返回WriteResult()对象,nMatchednModified值为1。
修改目标账户。

db.accounts.update(
   { _id: t.destination, pendingTransactions: t._id },
   { $pull: { pendingTransactions: t._id } }
)

  
 
  • 1
  • 2
  • 3
  • 4

操作成功后,方法会返回WriteResult()对象,nMatchednModified值为1。

6. 更新transaction数据的state为done.

将transaction数据的state设为done,更新lastModified为当前时间,这也标志着本次事务的结束:

db.transactions.update(
   { _id: t._id, state: "applied" },
   {
     $set: { state: "done" },
     $currentDate: { lastModified: true }
   }
)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

操作成功后,方法会返回WriteResult()对象,nMatchednModified值为1。

从失败场景恢复

其实事务最重要的部分不是上面示例中比较顺利的场景,而是当事务未成功完成时有没有可能从各种各样失败情况中恢复。本节会概括各种可能出现的失败场景,并教你一些步骤,如何从这些事件中恢复。

恢复操作

两阶段提交模式运行应用程勋运行一系列的操作来恢复事务到一致状态。在应用启动时或一个定时任务来运行恢复操作,可以用来捕捉未完成的事务。
在一致性问题上对于时间的需求取决于应用间隔多长时间来恢复每个事务。
接下来的恢复过程根据lastModified字段做为指标来决定pending状态的事务是否需要进行恢复;再具体点,如果pending或applied 状态的事务已经有30分钟没有更新过,恢复程序会认为这些事务需要进行恢复。你可以用不同的条件来决定事务是否需要恢复。

pending状态的事务

要恢复发生在上文举例的“将transaction数据的state字段设为pending” 步骤之后,但发生在“将transaction数据的state设为applied”步骤之前的错误,先从transactions集合中获取一条pending状态的数据:

var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
var t = db.transactions.findOne( { state: "pending", lastModified: { $lt: dateThreshold } } );

  
 
  • 1
  • 2
  • 3

然后从上文的 “账户间转账”步骤开始继续执行。

applied状态的事务

To recover from failures that occur after step “Update transaction state to applied.” but before “Update transaction state to done.” step, retrieve from the transactions collection an applied transaction for recovery:
要恢复发生在上文举例的“将transaction数据的state设为applied”步骤之后,但发生在“更新transaction数据的state为done”步骤之前的错误,先从transactions集合中获取一条applied状态的数据:

var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
var t = db.transactions.findOne( { state: "applied", lastModified: { $lt: dateThreshold } } );

  
 
  • 1
  • 2
  • 3

然后从上文的“修改两个账户的事务待定列表”步骤开始继续执行。

回滚操作

在一些情况下,你可能需要“回滚”或取消事务;举例来说,比如应用程序需要去“取消”事务,或者事务中的某个账户不存在,或者在事务运行阶段账户不存在了。

applied状态的事务

在“将transaction数据的state设为applied”步骤之后,你最好不要回滚事务了。而应该完成这个事务,然后启动一个补偿事务,将上个事务的源账户和目标账户调换一下,再做一次转账。

pending状态的事务

在“将transaction数据的state字段设为pending”步骤之后,在“将transaction数据的state设为applied”步骤之前,你可以根据下面的流程来回滚事务:

1. 将transaction数据state设为canceling

将transaction的state从pending设为canceling。

db.transactions.update(
   { _id: t._id, state: "pending" },
   {
     $set: { state: "canceling" },
     $currentDate: { lastModified: true }
   }
)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

操作成功后,方法会返回WriteResult()对象,nMatchednModified值为1。

2. 撤消两个账户的事务

在两个账户上撤销事务,如果事务已经是applied状态,执行反向操作。在update条件的中加上pendingTransactions:t._id来筛选状态是applied的数据。
更新目标账户信息,在balance字段上减去transaction数据的value值,并从pendingTransactions数组中移除transaction数据的_id

db.accounts.update(
   { _id: t.destination, pendingTransactions: t._id },
   {
     $inc: { balance: -t.value },
     $pull: { pendingTransactions: t._id }
   }
)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

操作成功后,方法会返回WriteResult()对象,nMatchednModified值为1。如果转账事务没有发生在该账户上,那么上面的更新操作匹配不到数据,nMatchednModified值会是0。
更新源账户信息,在balance字段上加上transaction数据的value值,并从pendingTransactions数组中移除transaction数据的_id

db.accounts.update(
   { _id: t.source, pendingTransactions: t._id },
   {
     $inc: { balance: t.value},
     $pull: { pendingTransactions: t._id }
   }
)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

操作成功后,方法会返回WriteResult()对象,nMatchednModified值为1。如果转账事务没有发生在该账户上,那么上面的更新操作匹配不到数据,nMatchednModified值会是0。

3. 将transaction数据state设为canceled

将transaction数据的state从canceling设为cancelled来完成回滚。

db.transactions.update(
   { _id: t._id, state: "canceling" },
   {
     $set: { state: "cancelled" },
     $currentDate: { lastModified: true }
   }
)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

操作成功后,方法会返回WriteResult()对象,nMatchednModified值为1。

多应用

从某种程度上说,事务的存在是为了便于多个应用并发创建和运行某些操作,而不会引发数据不一致和冲突。在我们的程序中,更新或获取transaction集合的数据时,更新条件中都会包含state字段条件,这能防止多应用冲突的请求transaction数据。
例如,App1和App2同时获取了某条相同的state为initial的transaction数据。在App2开工前,App1执行了完整的事务,当App2试图执行步骤“将transaction数据的state字段设为pending”时,由于更新条件中包含有state: "initial"语句,更新操作匹配不到数据,nMatchednModified的值会是0。这会让App2返回到第一步去获取另一条transaction数据重新开始事务流程。
当多个应用运行时,最关键的是在任意时刻只能有唯一一个应用能操作一条给定的transaction数据。同样的,除了在更新条件中包含预期的事务状态之外,你还可以为transaction数据创建一个标记来鉴别正在操作该transaction数据的应用。用findAndModify()方法来修改并返回transaction数据:

t = db.transactions.findAndModify(
       {
         query: { state: "initial", application: { $exists: false } },
         update:
           {
             $set: { state: "pending", application: "App1" },
             $currentDate: { lastModified: true }
           },
         new: true
       }
    )

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

修改事务操作,可以确保只有匹配上application字段标识的应用才能操作相应的transaction数据。
如果App1在事务执行过程中失败了,你可以用恢复程序进行恢复,但是在恢复之前,应用程序必须确定它们“拥有”相应的transaction数据。例如要找到并继续执行一个pending状态的事务,使用类似下面的查询:

var dateThreshold = new Date();
dateThreshold.setMinutes(dateThreshold.getMinutes() - 30);
db.transactions.find(
   {
     application: "App1",
     state: "pending",
     lastModified: { $lt: dateThreshold }
   }
)

  
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

在生产环境下使用两阶段提交

本文中的事务例子有意的设计简单写。比如,假设总能够对账户做回滚操作,并且账户余额是负数。
生产环境中的情况可能会更复杂一些。例如,真实场景下账户需要的信息还包括当前余额、待转出、待转入。
对于所有的事务来说,在你部署的时候需要保证使用合适等级的write concern

原文链接:Perform Two Phase Commits
翻译:沉潜飞动
译文链接:在 MongoDB 中执行两阶段提交

文章来源: kanshan.blog.csdn.net,作者:看山,版权归原作者所有,如需转载,请联系作者。

原文链接:kanshan.blog.csdn.net/article/details/50178045

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。