首页
Preview

防止 Golang 中的数据库连接泄漏:从十亿美元的错误中吸取教训

这篇文章主要讲解我们为什么需要一个独立的事务层?如何将其抽象出来,并使用单元测试和集成测试进行测试。

在我之前的博客中,《十亿美元的 Go 错误》,我讨论了 Golang 中常见但是天真的错误,可能会导致连接泄漏。虽然我提供了几种解决此问题的方法,但仍有一个问题困扰着我和我的读者。我们可以通过将事务机制抽象到不同的层中来解决此问题。许多人与我联系以建议此解决方案。

因此,我有一个好奇的问题:

我们真的需要一个事务层吗?

我阅读了几篇其他博客和 GitHub 代码。我注意到其中许多都很好,但没有包括证明抽象层有效的测试。但是,如果没有测试,我们无法确认事务层是否有效。连接泄漏可能会发生,我们无法通过检查代码来检测它。

这导致了我另一个重要的问题:

“如何证明新层没有连接泄漏?”

测试是验证某些东西是否按预期工作的唯一方法。我决定从我脑海中想到的两个简单选项开始:

  • 我将运行一些更改表的查询,并验证代码将数据提交到数据库。
  • 我将使用一些公开连接信息的机制。我可以验证代码在最后关闭连接。 第一种方法很简单,我在我的项目中使用了很多 😅。但是,第二种方法对我来说完全是未知的 👾。当我在理解事物如何工作方面遇到困难时,我通常会查看语言的源代码。

为了解决我的问题,我在 Golang 的标准数据库库代码中寻求帮助。我发现了一些可以帮助我的东西:DBStats 结构体。

// DBStats contains database statistics.
type DBStats struct {
 MaxOpenConnections int // Maximum number of open connections to the database.

 // Pool Status
 OpenConnections int // The number of established connections both in use and idle.
 InUse           int // The number of connections currently in use.
 Idle            int // The number of idle connections.

 // Counters
 WaitCount         int64         // The total number of connections waited for.
 WaitDuration      time.Duration // The total time blocked waiting for a new connection.
 MaxIdleClosed     int64         // The total number of connections closed due to SetMaxIdleConns.
 MaxIdleTimeClosed int64         // The total number of connections closed due to SetConnMaxIdleTime.
 MaxLifetimeClosed int64         // The total number of connections closed due to SetConnMaxLifetime.
}

这正是我正在寻找的解决方案。事务完成后,“MaxOpenConnections”和“InUse”计数应为“0”。如果不是这种情况,则表示抽象层中可能存在潜在泄漏。

我很高兴找到了解决第二个场景的理想解决方案 🎉

要访问 DBStats,我们可以在 sql.DB 实例上使用 Stats() 方法,如下所示:

db, _ := sqlx.Open("postgres", "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable")
db.Stats()

Stats() 方法的源代码如下所示。你可以通过在这里跟踪源代码 here 来了解它的工作方式以及 go 代码如何记录信息。

// Stats returns database statistics.
func (db *DB) Stats() DBStats {
 wait := db.waitDuration.Load()

 db.mu.Lock()
 defer db.mu.Unlock()

 stats := DBStats{
  MaxOpenConnections: db.maxOpen,

  Idle:            len(db.freeConn),
  OpenConnections: db.numOpen,
  InUse:           db.numOpen - len(db.freeConn),

  WaitCount:         db.waitCount,
  WaitDuration:      time.Duration(wait),
  MaxIdleClosed:     db.maxIdleClosed,
  MaxIdleTimeClosed: db.maxIdleTimeClosed,
  MaxLifetimeClosed: db.maxLifetimeClosed,
 }
 return stats
}

这已经足够让我实现事务层。因此,现在让我们深入了解代码。但在我们开始之前,让我向你介绍一些我将在代码中使用的库:

有趣的是,在事务层中很难再现连接泄漏的情况。相信我,我尝试过并失败了。😂

我将展示如何使用 DBStats 断言测试旧代码。通过这个示例,那些不会抽象事务层的人可以更新他们的测试,以避免任何连接泄漏。稍后,我们将探讨如何提取事务层并对其进行测试。

package apptest

import (
 "context"
 "database/sql"
 "github.com/jmoiron/sqlx"
)

type Subscription struct {
 ID         int64        `db:"id"`
 Status     string       `db:"status"`
 CanceledAt sql.NullTime `db:"canceled_at"`
}

// ------------------------------ Repository ------------------------------

type srepo struct {
}

// GetSubscription fetches the subscription by id
func (r *srepo) GetSubscription(tx *sqlx.Tx, id int64) (Subscription, error) {
 var sub Subscription
 err := tx.Get(&sub, "SELECT * FROM subscription WHERE id = $1", id)
 if err != nil {
  return sub, err
 }

 return sub, nil
}

// CancelSubscription cancels a given subscription by setting canceled_at to now()
func (r *srepo) CancelSubscription(tx *sqlx.Tx, id int64) (Subscription, error) {
 var sub Subscription
 err := tx.Get(&sub, "UPDATE subscription SET canceled_at = NOW(), status='canceled' WHERE id = $1 RETURNING *", id)
 if err != nil {
  return sub, err
 }

 return sub, nil
}

// ------------------------------ Service ------------------------------

type Service struct {
 db   *sqlx.DB
 repo *srepo
}

func NewService(db *sqlx.DB, repo *srepo) *Service {
 return &Service{repo: repo, db: db}
}

func (s *Service) CancelSubscription(ctx context.Context, id int64) (*Subscription, error) {
 tx, err := s.db.BeginTxx(ctx, nil)
 if err != nil {
  return nil, err
 }

 defer func() {
  // !!! This would not work if the subscriptions is already canceled 
  // and the error is not returned
  if err != nil {
   _ = tx.Rollback()
   return
  }
 }()

 sub, err := s.repo.GetSubscription(tx, id)
 if err != nil {
  return nil, err
 }

 if sub.Status != "active" {
  return &sub, nil
 }

 if sub.CanceledAt.Valid {
  return &sub, nil
 }

 sub, err = s.repo.CancelSubscription(tx, id)
 if err != nil {
  return nil, err
 }

 err = tx.Commit()

 return &sub, err
}

上述代码有什么问题?

当订阅已经取消时,它将返回而不出错。当函数返回而不出错时,连接既不回滚也不提交。这会导致连接泄漏。

以下是集成测试,以查看我们如何捕获泄漏。

func Test_ConnectionLeak(t *testing.T) {
 pg, err := apptest.StartTestPostgres(t) // Please use the source code to learn more about this code
 require.NoError(t, err)

 _, err = pg.DB.Exec("CREATE TABLE IF NOT EXISTS subscription (id serial PRIMARY KEY, status varchar(25) NOT NULL, canceled_at timestamp NULL)")
 require.NoError(t, err)

 _, err = pg.DB.Exec("INSERT INTO subscription (status, canceled_at) VALUES ('active', NULL)")
 require.NoError(t, err)

 _, err = pg.DB.Exec("INSERT INTO subscription (status, canceled_at) VALUES ('canceled', '2023-02-02 01:00:00')")
 require.NoError(t, err)

 subscription, err := NewService(pg.DB, &srepo{}).CancelSubscription(context.Background(), 2)
 require.NoError(t, err)

 stats := pg.DB.Stats()
 require.Equal(t, 0, stats.InUse, "expected no connections in use")
 require.Equal(t, 0, stats.MaxOpenConnections, "expected no max open connection")

 require.Equal(t, "canceled", subscription.Status)
 require.Equal(t, "2023-02-02 01:00:00 +0000 +0000", subscription.CanceledAt.Time.String())
}

我连接到 Docker 中的 Postgres DB 实例并运行测试。从下面的测试结果中,你可以看到代码存在事务问题。实际计数 InUse1。在函数调用结束时,连接未关闭。这是我们将在下一节中使用事务抽象层解决的问题。

=== RUN   Test_ConnectionLeak
    service-connection-leak_test.go:27: 
         Error Trace: /pkg/service-connection-leak_test.go:27
         Error:       Not equal: 
                      expected: 0
                      actual  : 1
         Test:        Test_ConnectionLeak
         Messages:    expected no connections in use
--- FAIL: Test_ConnectionLeak (6.69s)

Expected :0
Actual   :1
<Click to see difference>

FAIL

提取事务层

为了解决连接泄漏问题,一种方法是修复失败的部分。通过手动测试服务并审查代码来关闭连接。但是同样的问题可能会在未来再次出现。正确的方法是提取事务层。这个提取的想法很简单:我们提供一个公共方法

func InTx(ctx context.Context, db *sqlx.DB, txFunc func(*TxWrap) error) (err error) 

其中 txFunc 参数接受基于事务的业务逻辑。

采用这种方法,开发人员不再需要手动处理事务,因为 InTx 方法将事务机制从业务逻辑中抽象出来。通过将 txFunc 参数传递给 InTx,开发人员可以专注于实际的业务操作,而不必担心底层的事务管理。

出于博客目的,我保持代码简单并避免任何复杂性。

package db

import (
 "context"
 "database/sql"
 "github.com/jmoiron/sqlx"
)

// TxWrap is a wrapper around sqlx.Tx that adds a context
// and redirects calls to methods like Get, Select to GetContext and SelectContext
// with the context it wraps.
type TxWrap struct {
 tx  *sqlx.Tx        // underlying transaction
 ctx context.Context // context to use for all calls
}

// Get is a wrapper around sqlx.Tx.GetContext
// that uses the context it wraps.
func (tx *TxWrap) Get(dest interface{}, query string, args ...interface{}) error {
 return tx.tx.GetContext(tx.ctx, dest, query, args...)
}

// Select is a wrapper around sqlx.Tx.SelectContext
// that uses the context it wraps.
func (tx *TxWrap) Select(dest interface{}, query string, args ...interface{}) error {
 return tx.tx.SelectContext(tx.ctx, dest, query, args...)
}

// IMPLEMENT OTHER RELATED sqlx Methods to use wrapped context

// InTx executes a function in a transaction.
// If the function returns an error, the transaction is rolled back.
// If the function panics, the transaction is rolled back and the panic is re-raised.
// If the function returns nil, the transaction is committed.
func InTx(ctx context.Context, db *sqlx.DB, txFunc func(*TxWrap) error) error {
 var err error
 ctx, cancel := context.WithCancel(ctx)
 defer cancel()

 tx, err := db.BeginTxx(ctx, nil)
 if err != nil {
  return err
 }

 txWrap := &TxWrap{
  tx:  tx,
  ctx: ctx,
 }

 defer func() {
  if p := recover(); p != nil {
   _ = txWrap.tx.Rollback()
   panic(p)
  }
  if err != nil {
   _ = txWrap.tx.Rollback()
   return
  }
  err = txWrap.tx.Commit()
 }()

 return txFunc(txWrap)
}

InTxTxWrap 结构中围绕实际的 sqlx.Tx,并在该方法中封装了事务管理逻辑。InTx 负责调用 Begin、Commit 和 Rollback

TxWrap 结构还包含一个派生的取消上下文(ctx),它在我的上一篇文章中讨论,以确保在方法调用结束时发生上下文取消。在 InTx 方法内部,我们有一个 defer 块,处理事务逻辑的三种可能结果:

  • Panic:如果发生任何未处理的异常,事务将自动回滚。
  • Error:如果事务期间发生任何错误,它也将被回滚。
  • Success:如果一切如预期那样,事务将提交。 为了提供进一步的安全性,我们还实现了与 sqlx 库中相同的 GetSelect 函数,但我们将这些函数的调用代理到这些函数的上下文版本。这确保了如果上下文被取消,则取消任何正在进行的请求,如果客户端取消 HTTP 请求,则取消任何正在进行的请求。现在真正的刺激开始了!测试!🚀 🥹

我正在进行单元测试和集成测试两种方法。你可以选择你想用的方法。我更喜欢集成测试。它们模拟了接近真实基础架构的行为。

单元测试

对于单元测试,我使用 sqlmock。我按照我们的代码的三种行为设置期望,并断言是否满足期望。我还检查连接计数器在结束时是否重置为 0。单元测试非常快,因此我们还在每个测试中使用 t.Parallel 并初始化一个新的 sqlmock

// ------------------------------ UNIT Test ------------------------------
func Test_Unit(t *testing.T) {
 t.Parallel()

 tests := []struct {
  name      string
  fn        func(tx *TxWrap) error
  setup     func(mock sqlmock.Sqlmock)
  wantErr   bool
  wantPanic bool
 }{
  {
   name: "success path",
   fn: func(tx *TxWrap) error {
    return nil
   },
   setup: func(mock sqlmock.Sqlmock) {
    mock.ExpectBegin()
    mock.ExpectCommit()
   },
  },
  {
   name: "failure path",
   fn: func(tx *TxWrap) error {
    return errors.New("some error")
   },
   setup: func(mock sqlmock.Sqlmock) {
    mock.ExpectBegin()
    mock.ExpectRollback()
   },
   wantErr: true,
  },
  {
   name: "panic",
   fn: func(tx *TxWrap) error {
    panic("some panic")
    return nil
   },
   setup: func(mock sqlmock.Sqlmock) {
    mock.ExpectBegin()
    mock.ExpectRollback()
   },
   wantPanic: true,
  },
 }
 for _, test := range tests {
  test := test
  t.Run(test.name, func(t *testing.T) {
   t.Parallel()

   db, mock, err := sqlmock.New()
   require.NoError(t, err)

   dbx := sqlx.NewDb(db, "sqlmock")

   if test.setup != nil {
      test.setup(mock)
   }

   // Only add this defer when we expect panic to take over the 
  // panic recovery and see if there is a valid error
   if test.wantPanic {
    defer func() {
     require.NotNil(t, recover())
    }()
   }

   err = InTx(context.Background(), dbx, test.fn)

   require.Equal(t, test.wantErr, err != nil)
   require.NoError(t, mock.ExpectationsWereMet())

   stats := dbx.Stats()
   require.Equal(t, 0, stats.InUse)
   require.Equal(t, 0, stats.MaxOpenConnections)
  })
 }
}

集成测试

对于下面的集成测试,我使用的是 Postgres DB。首先我创建一个虚拟的 Employees 表。然后我对不同的情况执行一些插入和选择语句。最后,检查每个测试是否关闭了连接。

需要注意的一点是我没有使用 t.Parallel。并行运行测试中的共享连接会导致问题。InUseMaxOpenConnections 将永远不会是 0。你可以像在单元测试中所做的那样为每个测试创建 单独的连接

// ---------------------------------- INTEGRATION TEST -------------------------------------
type Employee struct {
 ID   int64  `db:"id"`
 Name string `db:"name"`
}

func Test_Integration(t *testing.T) {
 pg, err := apptest.StartTestPostgres(t)
 require.NoError(t, err)

 _, err = pg.DB.Exec("CREATE TABLE IF NOT EXISTS employee (id serial PRIMARY KEY, name varchar(25) NOT NULL)")
 require.NoError(t, err)

 tests := []struct {
  name      string
  txfunc    func(tx *TxWrap) error
  wantErr   bool
  wantPanic bool
 }{
  {
   name: "success path",
   txfunc: func(tx *TxWrap) error {
    _, err := tx.Exec("INSERT INTO employee (name) VALUES ('John Doe')")
    return err
   },
  },
  {
   name: "failure path",
   txfunc: func(tx *TxWrap) error {
    var employee Employee
    err := tx.Get(&employee, "SELECT * FROM employee WHERE id = $1", 100)
    return err
   },
   wantErr: true,
  },
  {
   name: "panic",
   txfunc: func(tx *TxWrap) error {
    panic("some panic")

    return nil
   },
   wantPanic: true,
  },
 }
 for _, test := range tests {
  test := test
  t.Run(test.name, func(t *testing.T) {

   // Wrap the function in a defer to catch panics
   // and assert that the panic is not nil.
   defer func() {
    if test.wantPanic {
     require.NotNil(t, recover())
    }
    stats := pg.DB.Stats()
    require.Equal(t, 0, stats.InUse)
    require.Equal(t, 0, stats.MaxOpenConnections)
   }()

   err = InTx(context.Background(), pg.DB, test.txfunc)

   require.Equal(t, test.wantErr, err != nil)
  })
 }
}

更新服务

我们已经准备好了新的事务层。让我们更改旧代码以使用这个新层。

// ------------------------------ Repository ------------------------------

type txRepo struct {
}

// GetSubscription is a repository method that does not leak connections
// it uses *TxWrap to wrap the transaction
// it uses the context to cancel the transaction if the context is canceled
// but the context is inside the *TxWrap and not exposed to the service
func (r *txRepo) GetSubscription(tx *db.TxWrap, id int64) (Subscription, error) {
 var sub Subscription
 err := tx.Get(&sub, "SELECT * FROM subscription WHERE id = $1", id)
 if err != nil {
  return sub, err
 }

 return sub, nil
}

func (r *txRepo) CancelSubscription(tx *db.TxWrap, id int64) (Subscription, error) {
 var sub Subscription
 err := tx.Get(&sub, "UPDATE subscription SET canceled_at = NOW(), status='canceled' WHERE id = $1 RETURNING *", id)
 if err != nil {
  return sub, err
 }

 return sub, nil
}

// ------------------------------ Service ------------------------------

type txService struct {
 db   *sqlx.DB
 repo *txRepo
}

// CancelSubscriptionWithoutLeak is a service method that does not leak connections
// it uses InTx helper to wrap the transaction
func (s *txService) CancelSubscriptionWithoutLeak(ctx context.Context, id int64) (*Subscription, error) {
 var sub Subscription
 var err error

// So cool!!!!!!!! 🎸
 err = db.InTx(ctx, s.db, func(tx *db.TxWrap) error {
  sub, err = s.repo.GetSubscription(tx, id)
  if err != nil {
   return err
  }

  if sub.Status != "active" {
   return nil
  }

  if sub.CanceledAt.Valid {
   return nil
  }

  sub, err = s.repo.CancelSubscription(tx, id)
  if err != nil {
   return err
  }

  return nil
 })

 return &sub, err
}

func Test_NoConnectionLeak(t *testing.T) {
 pg, err := apptest.StartTestPostgres(t)
 require.NoError(t, err)

 _, err = pg.DB.Exec("CREATE TABLE IF NOT EXISTS subscription (id serial PRIMARY KEY, status varchar(25) NOT NULL, canceled_at timestamp NULL)")
 require.NoError(t, err)

 _, err = pg.DB.Exec("INSERT INTO subscription (status, canceled_at) VALUES ('active', NULL)")
 require.NoError(t, err)

 _, err = pg.DB.Exec("INSERT INTO subscription (status, canceled_at) VALUES ('canceled', '2023-02-02 01:00:00')")
 require.NoError(t, err)

 subscription, err := NewTxService(pg.DB, &txRepo{}).CancelSubscriptionWithoutLeak(context.Background(), 2)
 require.NoError(t, err)

 stats := pg.DB.Stats()
 require.Equal(t, 0, stats.InUse, "expected no connections in use")
 require.Equal(t, 0, stats.MaxOpenConnections, "expected no max open connection")

 require.Equal(t, "canceled", subscription.Status)
 require.Equal(t, "2023-02-02 01:00:00 +0000 +0000", subscription.CanceledAt.Time.String())
}

-------------

=== RUN   Test_NoConnectionLeak
--- PASS: Test_NoConnectionLeak (5.61s)
PASS

上面的测试是我们在连接泄漏示例中使用的确切测试。我必须为新服务和新导入进行调整。测试中的其他所有内容都是相同的。

你可以看到,当我们将服务更改为使用新的事务层时,我们的同一个测试变成了绿色。🥳🥳🥳

我们需要事务层吗?

在旧代码中,我很难使用事务层创建连接泄漏。几乎不可能出现任何问题。

尽管如此,我觉得测试会是一个挑战。DBStruct 为测试提供了一种简单的方法。

在事务内有不同的业务操作时,一行代码就可能在生产中引起问题和问题。但是一个经过考验的单独的层将保护你免受任何问题的困扰。此外,相同的层可以与不同的流共享,以避免重复代码。

因此,如果你还没有提取事务逻辑,请务必这样做!你可以编写自己的库或使用现成的库。我也会在我的 GitHub 上发布这个库,我会和你分享的 🚀

结论

通过这个证明,我非常有信心不会再引起与事务相关的生产问题。我希望这篇博客能帮助你学习关于 Golang 的新知识。

我的一些最初的假设被证明是错误的。测试可以让我回答层是否有效,是否存在连接泄漏。

我要感谢所有的读者 🙏。你们的回复和建议让我感到非常有动力。

当人们阅读博客并以某种方式帮助社区时,这种感觉非常好。如果你有任何建议或想联系我,请使用 AMA

译自: https://www.iamninad.com

版权声明:本文内容由TeHub注册用户自发贡献,版权归原作者所有,TeHub社区不拥有其著作权,亦不承担相应法律责任。 如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

点赞(0)
收藏(0)
菜鸟一只
你就是个黄焖鸡,又黄又闷又垃圾。

评论(0)

添加评论