这篇文章主要讲解我们为什么需要一个独立的事务层?如何将其抽象出来,并使用单元测试和集成测试进行测试。
在我之前的博客中,《十亿美元的 Go 错误》,我讨论了 Golang 中常见但是天真的错误,可能会导致连接泄漏。虽然我提供了几种解决此问题的方法,但仍有一个问题困扰着我和我的读者。我们可以通过将事务机制抽象到不同的层中来解决此问题。许多人与我联系以建议此解决方案。
因此,我有一个好奇的问题:
我们真的需要一个事务层吗?
我阅读了几篇其他博客和 GitHub 代码。我注意到其中许多都很好,但没有包括证明抽象层有效的测试。但是,如果没有测试,我们无法确认事务层是否有效。连接泄漏可能会发生,我们无法通过检查代码来检测它。
这导致了我另一个重要的问题:
“如何证明新层没有连接泄漏?”
测试是验证某些东西是否按预期工作的唯一方法。我决定从我脑海中想到的两个简单选项开始:
- 我将运行一些更改表的查询,并验证代码将数据提交到数据库。
- 我将使用一些公开连接信息的机制。我可以验证代码在最后关闭连接。 第一种方法很简单,我在我的项目中使用了很多 😅。但是,第二种方法对我来说完全是未知的 👾。当我在理解事物如何工作方面遇到困难时,我通常会查看语言的源代码。
为了解决我的问题,我在 Golang 的标准数据库库代码中寻求帮助。我发现了一些可以帮助我的东西:DBStats
结构体。
// DBStats contains database statistics.
type DBStats struct {
MaxOpenConnections int // Maximum number of open connections to the database.
// Pool Status
OpenConnections int // The number of established connections both in use and idle.
InUse int // The number of connections currently in use.
Idle int // The number of idle connections.
// Counters
WaitCount int64 // The total number of connections waited for.
WaitDuration time.Duration // The total time blocked waiting for a new connection.
MaxIdleClosed int64 // The total number of connections closed due to SetMaxIdleConns.
MaxIdleTimeClosed int64 // The total number of connections closed due to SetConnMaxIdleTime.
MaxLifetimeClosed int64 // The total number of connections closed due to SetConnMaxLifetime.
}
这正是我正在寻找的解决方案。事务完成后,“MaxOpenConnections”和“InUse”计数应为“0”。如果不是这种情况,则表示抽象层中可能存在潜在泄漏。
我很高兴找到了解决第二个场景的理想解决方案 🎉
要访问 DBStats
,我们可以在 sql.DB
实例上使用 Stats()
方法,如下所示:
db, _ := sqlx.Open("postgres", "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable")
db.Stats()
Stats()
方法的源代码如下所示。你可以通过在这里跟踪源代码 here 来了解它的工作方式以及 go 代码如何记录信息。
// Stats returns database statistics.
func (db *DB) Stats() DBStats {
wait := db.waitDuration.Load()
db.mu.Lock()
defer db.mu.Unlock()
stats := DBStats{
MaxOpenConnections: db.maxOpen,
Idle: len(db.freeConn),
OpenConnections: db.numOpen,
InUse: db.numOpen - len(db.freeConn),
WaitCount: db.waitCount,
WaitDuration: time.Duration(wait),
MaxIdleClosed: db.maxIdleClosed,
MaxIdleTimeClosed: db.maxIdleTimeClosed,
MaxLifetimeClosed: db.maxLifetimeClosed,
}
return stats
}
这已经足够让我实现事务层。因此,现在让我们深入了解代码。但在我们开始之前,让我向你介绍一些我将在代码中使用的库:
有趣的是,在事务层中很难再现连接泄漏的情况。相信我,我尝试过并失败了。😂
我将展示如何使用 DBStats
断言测试旧代码。通过这个示例,那些不会抽象事务层的人可以更新他们的测试,以避免任何连接泄漏。稍后,我们将探讨如何提取事务层并对其进行测试。
package apptest
import (
"context"
"database/sql"
"github.com/jmoiron/sqlx"
)
type Subscription struct {
ID int64 `db:"id"`
Status string `db:"status"`
CanceledAt sql.NullTime `db:"canceled_at"`
}
// ------------------------------ Repository ------------------------------
type srepo struct {
}
// GetSubscription fetches the subscription by id
func (r *srepo) GetSubscription(tx *sqlx.Tx, id int64) (Subscription, error) {
var sub Subscription
err := tx.Get(&sub, "SELECT * FROM subscription WHERE id = $1", id)
if err != nil {
return sub, err
}
return sub, nil
}
// CancelSubscription cancels a given subscription by setting canceled_at to now()
func (r *srepo) CancelSubscription(tx *sqlx.Tx, id int64) (Subscription, error) {
var sub Subscription
err := tx.Get(&sub, "UPDATE subscription SET canceled_at = NOW(), status='canceled' WHERE id = $1 RETURNING *", id)
if err != nil {
return sub, err
}
return sub, nil
}
// ------------------------------ Service ------------------------------
type Service struct {
db *sqlx.DB
repo *srepo
}
func NewService(db *sqlx.DB, repo *srepo) *Service {
return &Service{repo: repo, db: db}
}
func (s *Service) CancelSubscription(ctx context.Context, id int64) (*Subscription, error) {
tx, err := s.db.BeginTxx(ctx, nil)
if err != nil {
return nil, err
}
defer func() {
// !!! This would not work if the subscriptions is already canceled
// and the error is not returned
if err != nil {
_ = tx.Rollback()
return
}
}()
sub, err := s.repo.GetSubscription(tx, id)
if err != nil {
return nil, err
}
if sub.Status != "active" {
return &sub, nil
}
if sub.CanceledAt.Valid {
return &sub, nil
}
sub, err = s.repo.CancelSubscription(tx, id)
if err != nil {
return nil, err
}
err = tx.Commit()
return &sub, err
}
上述代码有什么问题?
当订阅已经取消时,它将返回而不出错。当函数返回而不出错时,连接既不回滚也不提交。这会导致连接泄漏。
以下是集成测试,以查看我们如何捕获泄漏。
func Test_ConnectionLeak(t *testing.T) {
pg, err := apptest.StartTestPostgres(t) // Please use the source code to learn more about this code
require.NoError(t, err)
_, err = pg.DB.Exec("CREATE TABLE IF NOT EXISTS subscription (id serial PRIMARY KEY, status varchar(25) NOT NULL, canceled_at timestamp NULL)")
require.NoError(t, err)
_, err = pg.DB.Exec("INSERT INTO subscription (status, canceled_at) VALUES ('active', NULL)")
require.NoError(t, err)
_, err = pg.DB.Exec("INSERT INTO subscription (status, canceled_at) VALUES ('canceled', '2023-02-02 01:00:00')")
require.NoError(t, err)
subscription, err := NewService(pg.DB, &srepo{}).CancelSubscription(context.Background(), 2)
require.NoError(t, err)
stats := pg.DB.Stats()
require.Equal(t, 0, stats.InUse, "expected no connections in use")
require.Equal(t, 0, stats.MaxOpenConnections, "expected no max open connection")
require.Equal(t, "canceled", subscription.Status)
require.Equal(t, "2023-02-02 01:00:00 +0000 +0000", subscription.CanceledAt.Time.String())
}
我连接到 Docker 中的 Postgres DB 实例并运行测试。从下面的测试结果中,你可以看到代码存在事务问题。实际计数 InUse
为 1
。在函数调用结束时,连接未关闭。这是我们将在下一节中使用事务抽象层解决的问题。
=== RUN Test_ConnectionLeak
service-connection-leak_test.go:27:
Error Trace: /pkg/service-connection-leak_test.go:27
Error: Not equal:
expected: 0
actual : 1
Test: Test_ConnectionLeak
Messages: expected no connections in use
--- FAIL: Test_ConnectionLeak (6.69s)
Expected :0
Actual :1
<Click to see difference>
FAIL
提取事务层
为了解决连接泄漏问题,一种方法是修复失败的部分。通过手动测试服务并审查代码来关闭连接。但是同样的问题可能会在未来再次出现。正确的方法是提取事务层。这个提取的想法很简单:我们提供一个公共方法
func InTx(ctx context.Context, db *sqlx.DB, txFunc func(*TxWrap) error) (err error)
其中 txFunc
参数接受基于事务的业务逻辑。
采用这种方法,开发人员不再需要手动处理事务,因为 InTx
方法将事务机制从业务逻辑中抽象出来。通过将 txFunc
参数传递给 InTx
,开发人员可以专注于实际的业务操作,而不必担心底层的事务管理。
出于博客目的,我保持代码简单并避免任何复杂性。
package db
import (
"context"
"database/sql"
"github.com/jmoiron/sqlx"
)
// TxWrap is a wrapper around sqlx.Tx that adds a context
// and redirects calls to methods like Get, Select to GetContext and SelectContext
// with the context it wraps.
type TxWrap struct {
tx *sqlx.Tx // underlying transaction
ctx context.Context // context to use for all calls
}
// Get is a wrapper around sqlx.Tx.GetContext
// that uses the context it wraps.
func (tx *TxWrap) Get(dest interface{}, query string, args ...interface{}) error {
return tx.tx.GetContext(tx.ctx, dest, query, args...)
}
// Select is a wrapper around sqlx.Tx.SelectContext
// that uses the context it wraps.
func (tx *TxWrap) Select(dest interface{}, query string, args ...interface{}) error {
return tx.tx.SelectContext(tx.ctx, dest, query, args...)
}
// IMPLEMENT OTHER RELATED sqlx Methods to use wrapped context
// InTx executes a function in a transaction.
// If the function returns an error, the transaction is rolled back.
// If the function panics, the transaction is rolled back and the panic is re-raised.
// If the function returns nil, the transaction is committed.
func InTx(ctx context.Context, db *sqlx.DB, txFunc func(*TxWrap) error) error {
var err error
ctx, cancel := context.WithCancel(ctx)
defer cancel()
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return err
}
txWrap := &TxWrap{
tx: tx,
ctx: ctx,
}
defer func() {
if p := recover(); p != nil {
_ = txWrap.tx.Rollback()
panic(p)
}
if err != nil {
_ = txWrap.tx.Rollback()
return
}
err = txWrap.tx.Commit()
}()
return txFunc(txWrap)
}
InTx
在 TxWrap
结构中围绕实际的 sqlx.Tx
,并在该方法中封装了事务管理逻辑。InTx
负责调用 Begin、Commit 和 Rollback
。
TxWrap
结构还包含一个派生的取消上下文(ctx
),它在我的上一篇文章中讨论,以确保在方法调用结束时发生上下文取消。在 InTx
方法内部,我们有一个 defer
块,处理事务逻辑的三种可能结果:
- Panic:如果发生任何未处理的异常,事务将自动回滚。
- Error:如果事务期间发生任何错误,它也将被回滚。
- Success:如果一切如预期那样,事务将提交。
为了提供进一步的安全性,我们还实现了与
sqlx
库中相同的Get
和Select
函数,但我们将这些函数的调用代理到这些函数的上下文版本。这确保了如果上下文被取消,则取消任何正在进行的请求,如果客户端取消 HTTP 请求,则取消任何正在进行的请求。现在真正的刺激开始了!测试!🚀 🥹
我正在进行单元测试和集成测试两种方法。你可以选择你想用的方法。我更喜欢集成测试。它们模拟了接近真实基础架构的行为。
单元测试
对于单元测试,我使用 sqlmock
。我按照我们的代码的三种行为设置期望,并断言是否满足期望。我还检查连接计数器在结束时是否重置为 0
。单元测试非常快,因此我们还在每个测试中使用 t.Parallel
并初始化一个新的 sqlmock
。
// ------------------------------ UNIT Test ------------------------------
func Test_Unit(t *testing.T) {
t.Parallel()
tests := []struct {
name string
fn func(tx *TxWrap) error
setup func(mock sqlmock.Sqlmock)
wantErr bool
wantPanic bool
}{
{
name: "success path",
fn: func(tx *TxWrap) error {
return nil
},
setup: func(mock sqlmock.Sqlmock) {
mock.ExpectBegin()
mock.ExpectCommit()
},
},
{
name: "failure path",
fn: func(tx *TxWrap) error {
return errors.New("some error")
},
setup: func(mock sqlmock.Sqlmock) {
mock.ExpectBegin()
mock.ExpectRollback()
},
wantErr: true,
},
{
name: "panic",
fn: func(tx *TxWrap) error {
panic("some panic")
return nil
},
setup: func(mock sqlmock.Sqlmock) {
mock.ExpectBegin()
mock.ExpectRollback()
},
wantPanic: true,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
t.Parallel()
db, mock, err := sqlmock.New()
require.NoError(t, err)
dbx := sqlx.NewDb(db, "sqlmock")
if test.setup != nil {
test.setup(mock)
}
// Only add this defer when we expect panic to take over the
// panic recovery and see if there is a valid error
if test.wantPanic {
defer func() {
require.NotNil(t, recover())
}()
}
err = InTx(context.Background(), dbx, test.fn)
require.Equal(t, test.wantErr, err != nil)
require.NoError(t, mock.ExpectationsWereMet())
stats := dbx.Stats()
require.Equal(t, 0, stats.InUse)
require.Equal(t, 0, stats.MaxOpenConnections)
})
}
}
集成测试
对于下面的集成测试,我使用的是 Postgres DB。首先我创建一个虚拟的 Employees
表。然后我对不同的情况执行一些插入和选择语句。最后,检查每个测试是否关闭了连接。
需要注意的一点是我没有使用 t.Parallel
。并行运行测试中的共享连接会导致问题。InUse
和 MaxOpenConnections
将永远不会是 0
。你可以像在单元测试中所做的那样为每个测试创建 单独的连接:
// ---------------------------------- INTEGRATION TEST -------------------------------------
type Employee struct {
ID int64 `db:"id"`
Name string `db:"name"`
}
func Test_Integration(t *testing.T) {
pg, err := apptest.StartTestPostgres(t)
require.NoError(t, err)
_, err = pg.DB.Exec("CREATE TABLE IF NOT EXISTS employee (id serial PRIMARY KEY, name varchar(25) NOT NULL)")
require.NoError(t, err)
tests := []struct {
name string
txfunc func(tx *TxWrap) error
wantErr bool
wantPanic bool
}{
{
name: "success path",
txfunc: func(tx *TxWrap) error {
_, err := tx.Exec("INSERT INTO employee (name) VALUES ('John Doe')")
return err
},
},
{
name: "failure path",
txfunc: func(tx *TxWrap) error {
var employee Employee
err := tx.Get(&employee, "SELECT * FROM employee WHERE id = $1", 100)
return err
},
wantErr: true,
},
{
name: "panic",
txfunc: func(tx *TxWrap) error {
panic("some panic")
return nil
},
wantPanic: true,
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
// Wrap the function in a defer to catch panics
// and assert that the panic is not nil.
defer func() {
if test.wantPanic {
require.NotNil(t, recover())
}
stats := pg.DB.Stats()
require.Equal(t, 0, stats.InUse)
require.Equal(t, 0, stats.MaxOpenConnections)
}()
err = InTx(context.Background(), pg.DB, test.txfunc)
require.Equal(t, test.wantErr, err != nil)
})
}
}
更新服务
我们已经准备好了新的事务层。让我们更改旧代码以使用这个新层。
// ------------------------------ Repository ------------------------------
type txRepo struct {
}
// GetSubscription is a repository method that does not leak connections
// it uses *TxWrap to wrap the transaction
// it uses the context to cancel the transaction if the context is canceled
// but the context is inside the *TxWrap and not exposed to the service
func (r *txRepo) GetSubscription(tx *db.TxWrap, id int64) (Subscription, error) {
var sub Subscription
err := tx.Get(&sub, "SELECT * FROM subscription WHERE id = $1", id)
if err != nil {
return sub, err
}
return sub, nil
}
func (r *txRepo) CancelSubscription(tx *db.TxWrap, id int64) (Subscription, error) {
var sub Subscription
err := tx.Get(&sub, "UPDATE subscription SET canceled_at = NOW(), status='canceled' WHERE id = $1 RETURNING *", id)
if err != nil {
return sub, err
}
return sub, nil
}
// ------------------------------ Service ------------------------------
type txService struct {
db *sqlx.DB
repo *txRepo
}
// CancelSubscriptionWithoutLeak is a service method that does not leak connections
// it uses InTx helper to wrap the transaction
func (s *txService) CancelSubscriptionWithoutLeak(ctx context.Context, id int64) (*Subscription, error) {
var sub Subscription
var err error
// So cool!!!!!!!! 🎸
err = db.InTx(ctx, s.db, func(tx *db.TxWrap) error {
sub, err = s.repo.GetSubscription(tx, id)
if err != nil {
return err
}
if sub.Status != "active" {
return nil
}
if sub.CanceledAt.Valid {
return nil
}
sub, err = s.repo.CancelSubscription(tx, id)
if err != nil {
return err
}
return nil
})
return &sub, err
}
func Test_NoConnectionLeak(t *testing.T) {
pg, err := apptest.StartTestPostgres(t)
require.NoError(t, err)
_, err = pg.DB.Exec("CREATE TABLE IF NOT EXISTS subscription (id serial PRIMARY KEY, status varchar(25) NOT NULL, canceled_at timestamp NULL)")
require.NoError(t, err)
_, err = pg.DB.Exec("INSERT INTO subscription (status, canceled_at) VALUES ('active', NULL)")
require.NoError(t, err)
_, err = pg.DB.Exec("INSERT INTO subscription (status, canceled_at) VALUES ('canceled', '2023-02-02 01:00:00')")
require.NoError(t, err)
subscription, err := NewTxService(pg.DB, &txRepo{}).CancelSubscriptionWithoutLeak(context.Background(), 2)
require.NoError(t, err)
stats := pg.DB.Stats()
require.Equal(t, 0, stats.InUse, "expected no connections in use")
require.Equal(t, 0, stats.MaxOpenConnections, "expected no max open connection")
require.Equal(t, "canceled", subscription.Status)
require.Equal(t, "2023-02-02 01:00:00 +0000 +0000", subscription.CanceledAt.Time.String())
}
-------------
=== RUN Test_NoConnectionLeak
--- PASS: Test_NoConnectionLeak (5.61s)
PASS
上面的测试是我们在连接泄漏示例中使用的确切测试。我必须为新服务和新导入进行调整。测试中的其他所有内容都是相同的。
你可以看到,当我们将服务更改为使用新的事务层时,我们的同一个测试变成了绿色。🥳🥳🥳
我们需要事务层吗?
在旧代码中,我很难使用事务层创建连接泄漏。几乎不可能出现任何问题。
尽管如此,我觉得测试会是一个挑战。DBStruct
为测试提供了一种简单的方法。
在事务内有不同的业务操作时,一行代码就可能在生产中引起问题和问题。但是一个经过考验的单独的层将保护你免受任何问题的困扰。此外,相同的层可以与不同的流共享,以避免重复代码。
因此,如果你还没有提取事务逻辑,请务必这样做!你可以编写自己的库或使用现成的库。我也会在我的 GitHub 上发布这个库,我会和你分享的 🚀
结论
通过这个证明,我非常有信心不会再引起与事务相关的生产问题。我希望这篇博客能帮助你学习关于 Golang 的新知识。
我的一些最初的假设被证明是错误的。测试可以让我回答层是否有效,是否存在连接泄漏。
我要感谢所有的读者 🙏。你们的回复和建议让我感到非常有动力。
当人们阅读博客并以某种方式帮助社区时,这种感觉非常好。如果你有任何建议或想联系我,请使用 AMA
评论(0)