图片来源:Jan Antonin Kolar,来自 Unsplash
Mysql、Redis 和 Mongo 都是非常受欢迎的存储引擎,每个引擎都有其优点。在实际应用中,通常会同时使用多个存储引擎,并确保跨多个存储引擎的数据一致性成为一个要求。
本文举例说明了如何在多个存储引擎(Mysql、Redis 和 Mongo)之间实现分布式事务。本示例基于分布式事务框架 https://github.com/dtm-labs/dtm,并希望能够帮助你解决微服务中的数据一致性问题。
DTM 首先提出了灵活组合多个存储引擎以形成分布式事务的能力,其他分布式事务框架都没有像 DTM 一样表述过这种能力。
我们要解决的问题是什么?
假设一个用户正在参与促销活动:他有一个余额,充值话费,促销活动将赠送商城积分。余额存储在 Mysql 中,话费存储在 Redis 中,商城积分存储在 Mongo 中。由于促销活动有时间限制,参与可能失败,因此需要支持回滚。
对于上述问题场景,你可以使用 DTM 的 Saga 事务,下面我们将详细解释解决方案。
准备数据
第一步是准备数据。为了方便用户快速开始使用示例,我们已经准备好了所有数据,包括 Mysql、Redis 和 Mongo,具体的连接用户名和密码可以在 dtm-labs/dtm-examples 中找到。
如果你想自己在本地准备数据环境,可以使用 dtm-labs/dtm/blob/main/helper/compose.store.yml 来启动 Mysql、Redis、Mongo,然后在 dtm-labs/dtm/tree/main/sqls 中执行脚本来为本示例准备数据,其中 busi.*
是业务数据,barrier.*
是 DTM 使用的辅助表。
编写业务代码
我们先从最熟悉的存储引擎 Mysql 的业务代码开始:
下面的代码使用 Golang 编写,其他语言如 C#、PHP、Java 可以在 DTM SDKs 中找到。
func SagaAdjustBalance(db dtmcli.DB, uid int, amount int) error {
_, err := dtmimp.DBExec(db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?" , amount, uid)
return err
}
该代码主要在数据库中执行用户余额的调整。在本示例中,这部分代码不仅用于 Saga 的正向操作,还用于补偿操作,其中补偿操作只需要传入负数金额即可。
对于 Redis 和 Mongo,业务代码的处理方式类似,只需递增或递减相应的余额。
如何确保幂等性
对于 Saga 事务模式,当子事务服务出现临时故障时,将重试失败的操作。此故障可能发生在子事务提交之前或之后,因此子事务操作需要具有幂等性。
DTM 提供了辅助表和辅助函数,以帮助用户快速实现幂等性。对于 Mysql,它将在业务数据库中创建一个辅助表 barrier
,当用户启动事务以调整余额时,它将首先在 barrier
表中插入 Gid
。如果有重复行,则插入将失败,然后跳过余额调整以确保幂等性。使用辅助函数的代码如下:
app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
})
}))
Mongo 处理幂等性的方式与 Mysql 类似,因此我不再详细介绍。
Redis 的幂等性处理方式与 Mysql 不同,主要是由于事务原理的差异。Redis 事务主要通过 Lua 的原子执行来保证。DTM 辅助函数将通过 Lua 脚本来调整余额。在调整余额之前,它将在 Redis 中查询 Gid
。如果 Gid
存在,则跳过余额调整;如果不存在,则记录 Gid
并执行余额调整。使用辅助函数的代码如下:
app.POST(BusiAPI+"/SagaRedisTransOut", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), -reqFrom(c).Amount, 7*86400)
}))
如何进行补偿
对于 Saga,我们还需要处理补偿操作,但补偿不仅仅是简单的反向调整,还有许多需要注意的问题。
一方面,补偿需要考虑幂等性,因为前面的子节中描述的失败和重试也存在于补偿中。
另一方面,补偿还需要考虑“空补偿”,因为 Saga 的正向操作可能会返回失败,这可能发生在数据调整之前或之后。对于已提交调整的失败,我们需要执行反向调整,但对于未提交调整的失败,我们需要跳过反向调整。
在 DTM 提供的辅助函数中,一方面,它将根据前向操作插入的 Gid
来确定补偿是否为空补偿,另一方面,它将再次插入 Gid+'compensate'
来确定补偿是否为重复操作。如果是正常的补偿操作,则会在业务上执行数据调整;如果是空补偿或重复补偿,则会跳过业务上的调整。以下是MySQL和Redis的代码。
app.POST(BusiAPI+"/SagaBTransInCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return SagaAdjustBalance(tx, TransInUID, -reqFrom(c).Amount, "")
})
}))
app.POST(BusiAPI+"/SagaRedisTransOutCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), reqFrom(c).Amount, 7*86400)
}))
赔偿服务的代码与前面的转账操作几乎完全相同,只是金额乘以了-1。DTM助手函数会自动处理幂等性和空值赔偿问题。
其他异常
在编写转账和赔偿操作时,实际上还有一种异常叫做“暂停”。当全局事务超时或重试达到配置的限制时,将回滚全局事务。正常情况下,先执行转账操作再执行赔偿操作,但在“进程暂停”情况下,赔偿操作可能会先于转账操作执行。因此,转账操作也需要确定是否已执行赔偿操作,在已执行的情况下,也需要跳过数据调整。
对于DTM用户,这些异常已经得到了优雅而妥善的处理,你只需要按照上面描述的MustBarrierFromGin(c).Call
调用即可,无需关心它们。DTM处理这些异常的原则在此处有详细描述:异常和子事务障碍
启动分布式事务
在编写单个子事务服务后,以下代码部分启动Saga全局事务。
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, dtmcli.MustGenGid(dtmutil.DefaultHTTPServer)).
Add(busi.Busi+"/SagaBTransOut", busi.Busi+"/SagaBTransOutCom", &busi.TransReq{Amount: 50}).
Add(busi.Busi+"/SagaMongoTransIn", busi.Busi+"/SagaMongoTransInCom", &busi.TransReq{Amount: 30}).
Add(busi.Busi+"/SagaRedisTransIn", busi.Busi+"/SagaRedisTransOutIn", &busi.TransReq{Amount: 20})
err := saga.Submit()
在这段代码中,创建了一个由3个子事务组成的Saga全局事务。
- 从MySQL转出50元
- 在Mongo中转入30元
- 在Redis中转入20元
如果在整个事务期间,所有子事务都成功完成,则全局事务成功;如果其中一个子事务返回业务失败,则全局事务将回滚。
运行
如果你想运行上述完整的示例,请按照以下步骤操作。
- 运行DTM
git clone https://github.com/dtm-labs/dtm && cd dtm
go run main.go
- 运行成功示例
git clone https://github.com/dtm-labs/dtm-examples && cd dtm-examples
go run main.go http_saga_multidb
- 运行失败示例
git clone https://github.com/dtm-labs/dtm-examples && cd dtm-examples
go run main.go http_saga_multidb_rollback
你可以修改示例以模拟各种临时故障、空赔偿情况和其他各种数据在整个全局事务完成时保持一致的异常情况。
总结
本文给出了一个跨MySQL、Redis和Mongo的分布式事务示例。它详细描述了需要处理的问题和解决方案。
本文中的原则适用于所有支持ACID事务的存储引擎,你可以快速将其扩展到其他引擎,如TiKV。
欢迎访问github.com/dtm-labs/dtm。这是一个专门为了使微服务中的分布式事务更容易而创建的项目。它支持多种语言和多种模式,如2阶段消息、Saga、Tcc和Xa。
评论(0)