首页
Preview

使用Go从头构建TCP连接池

作为一名初级软件工程师,我的入门任务是使用Go实现一个用户注册系统。

听起来很简单,对吧?但是,有性能要求和设计约束。

首先,我需要将所有业务逻辑放在TCP服务器中,并通过网络连接与面向客户端的HTTP服务器进行交互。

在此基础上,我不允许使用任何远程过程调用(RPC) 框架,如Go的net/rpc或Google的gRPC。我需要创建自己的逻辑来处理两个服务器之间的连接。

其次,整个系统需要支持1,000个并发HTTP请求和每秒3,000个登录请求,而且没有缓存。数据库中还需要有1千万个现有用户。

破解这个入门任务的关键是正确地管理HTTP和TCP服务器之间的网络连接。如果可用连接太少,你将会遇到请求速度缓慢的问题。如果连接太多,你的服务器将会遇到连接问题。

在本文中,我将与你分享如何构建自定义连接池以实现正确的平衡。我将解释设计概述并附上代码片段。让我们开始吧! 🏃‍♂️

什么是连接池?

连接池是服务器中的一组缓存连接,可以在将来再次使用。

通常,打开一个新的网络连接是昂贵的。以TCP连接为例,客户端和服务器需要执行三次握手。这是一项消耗数据带宽并引起往返延迟的昂贵操作。

通过连接池,我们可以将空闲的连接保持空闲,以便在等待处理另一个请求时再次使用,而不是在使用后关闭连接。这节省了创建新连接所需的资源。

然而,太多的任何东西都是不好的。维护打开的连接也会占用服务器资源。在打开连接和重用旧连接之间需要保持正确的平衡。

连接池需要知道何时创建、保留或删除连接。

在本文中,我们将重点介绍使用两个核心方法getput构建简单连接池。

put方法很简单。一旦服务器完成连接,池就会尝试“保存”空闲连接。如果池已满,则关闭连接。

放置逻辑

get方法稍微复杂一些。除了重用空闲连接和打开新连接之外,如果不能立即返回连接,则此方法还可以排队连接请求。

获取逻辑

Go中的TCP连接

在我们深入池(双关语)之前,让我快速展示一些使用Go与net包建立TCP连接的代码片段。

conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {
    // handle error
}

我们可以使用Dial函数从客户端创建到服务器的连接。

ln, err := net.Listen("tcp", "localhost:8080")
if err != nil {
    // handle error
}
for {
    conn, err := ln.Accept()
    if err != nil {
	// handle error
    }
    go handleConnection(conn)
}

同时,Listen函数启动一个连接服务器。Accept方法等待传入请求并返回连接。我们可以使用Goroutines并发处理多个请求。

我们可以从底层连接中读取数据。bufio.ReadBytesio.ReadFull是许多读取数据的方法之一。

// Reads until the first occurrence of delimiter '\n'
r := bufio.NewReader(conn)
data, err := r.ReadBytes('\n')

// Reads exactly len(buf) bytes from conn into buf
buf := make([]byte, 100)
n, err = io.ReadFull(conn, buf)

将数据写入连接更容易。我们可以使用conn.Write方法。

// Write bytes to a connection
data := "hello tcp"
n, err := conn.Write([]byte(data))

这些函数足以让我们开始!现在,我们可以继续构建我们的功能TCP连接池。

构建连接池

在其核心,连接池必须具有两个配置设置:maxIdleCountmaxOpenCount

maxOpenCount限制池可以打开的连接总数,而maxIdleCount控制池中空闲连接的数量。这两个设置共同设置打开新连接和重用旧连接之间的平衡。

这是我的连接池,作为Go结构体。

// TcpConnPool represents a pool of tcp connections
type TcpConnPool struct {
    host         string
    port         int
    mu           sync.Mutex           // mutex to prevent race conditions
    idleConns    map[string]*tcpConn  // holds the idle connections
    numOpen      int                  // counter that tracks open connections
    maxOpenCount int
    maxIdleCount int
}

请注意,我使用tcpConn表示TCP连接。它是一个自定义包装器,包含底层的net.Conn和对池的引用。

// tcpConn is a wrapper for a single tcp connection
type tcpConn struct {
    id   string       // A unique id to identify a connection
    pool *TcpConnPool // The TCP connecion pool
    conn net.Conn     // The underlying TCP connection
}

现在,我们可以编写我们的getput方法了。在 put 方法中,我们首先检查连接池是否已经达到最大空闲连接数。如果是,我们关闭连接并将 numOpen 计数器减少。如果不是,我们将连接放入连接池中。

另一方面,get 方法比较棘手,因为它需要在无法立即满足连接请求时排队等待。

// put() attempts to return a used connection back to the pool
// It closes the connection if it can't do so
func (p *TcpConnPool) put(c *tcpConn) {
    p.mu.Lock()
    defer p.mu.Unlock()

    if p.maxIdleCount > 0 && p.maxIdleCount > len(p.idleConns) {
        p.idleConns[c.id] = c // put into the pool
    } else {
        c.conn.Close()
        c.pool.numOpen--
    }
}

我们可以使用 Go 的通道来满足这个要求。让我介绍一个叫做 connRequest 的新结构体。它包含一个通道,当请求被满足时,通道会接收到一个 tcpConn。然后,我们还在连接池中添加了一个新的字段 requestChan,它是一个连接请求的队列。

// connRequest wraps a channel to receive a connection
// and a channel to receive an error
type connRequest struct {
    connChan chan *tcpConn
    errChan  chan error
}

type TcpConnPool struct {
    // ...
    // A queue of connection requests
    requestChan  chan *connRequest
}

如果这还是让你感到困惑的话,别担心。我们可以分步骤实现 get 方法。首先,我们处理连接池中有空闲连接的最简单情况。

// get() retrieves a TCP connection
func (p *TcpConnPool) get() (*tcpConn, error) {
    p.mu.Lock()
  
    // Case 1: Gets a free connection from the pool if any
    numIdle := len(p.idleConns)
    if numIdle > 0 {
        // Loop map to get one conn
        for _, c := range p.idleConns {
            // remove from pool
            delete(p.idleConns, c.id)
            p.mu.Unlock()
            return c, nil
        }
    }
    
    // ...
}

接下来,当没有更多的空闲连接并且我们不能打开更多的连接时,我们创建一个连接请求。同样,我们利用 Go 通道来创建请求队列。

func (p *TcpConnPool) get() (*tcpConn, error) {

    // ...
    
    // Case 2: Queue a connection request
    if p.maxOpenCount > 0 && p.numOpen >= p.maxOpenCount {
        // Create the request
        req := &connRequest{
            connChan: make(chan *tcpConn, 1),
            errChan:  make(chan error, 1),
        }
      
        // Queue the request
        p.requestChan <- req
      
        p.mu.Unlock()
      
        // Waits for either
        // 1. Request fulfilled, or
        // 2. An error is returned
        select {
            case tcpConn := <-req.connChan:
                return tcpConn, nil
            case err := <-req.errChan:
                return nil, err
        }
    }

    // ...
}

一旦我们将连接请求发送到队列中,代码块就会阻塞,直到其中一个情况完成。如果接收到连接,则返回它。否则,返回错误。

最后,当没有更多的空闲连接并且打开的连接数少于允许的最大连接数时,我们打开一个新的连接。

func (p *TcpConnPool) get() (*tcpConn, error) {
  
    // ...
    
    // Case 3: Open a new connection
    p.numOpen++
    p.mu.Unlock()
  
    newTcpConn, err := p.openNewTcpConnection()
    if err != nil {
        p.mu.Lock()
        p.numOpen--
        p.mu.Unlock()
        return nil, err
    }
    
    return newTcpConn, nil
}

// openNewTcpConnection() creates a new TCP connection at p.host and p.port
func (p *TcpConnPool) openNewTcpConnection() (*tcpConn, error) {
    addr := fmt.Sprintf("%s:%d", p.host, p.port)
  
    c, err := net.Dial("tcp", addr)
    if err != nil {
        return nil, err
    }
  
    return &tcpConn{
        // Use unix time as id
        id:   fmt.Sprintf("%v", time.Now().UnixNano()),
        conn: c,
        pool: p,
    }, nil
}

注意,在创建新的 TCP 连接之前,我会先增加 numOpen 计数器。这是因为打开新连接可能会有延迟。为了不阻塞其他请求,代码首先假定连接成功,以便可以释放互斥锁。如果连接无法打开,我们会减少计数器并返回错误。

现在我们已经完成了 get 方法的实现。问题是,如何满足连接请求呢?

我们将为此创建一个名为 handleConnectionRequest 的新方法。它作为一个 worker 在单独的 goroutine 中永久运行。让我演示一下。

// handleConnectionRequest() listens to the request queue
// and attempts to fulfil any incoming requests
func (p *TcpConnPool) handleConnectionRequest() {
    for req := range p.requestChan {
        var (
            requestDone = false
            hasTimeout  = false

            // start a 3-second timeout
            timeoutChan = time.After(3 * time.Second)
        )

        for {
            if requestDone || hasTimeout {
                break
            }
            select {
            // request timeout
            case <-timeoutChan:
                hasTimeout = true
                req.errChan <- errors.New("connection request timeout")
            default:
                // 1. get idle conn or open new conn
                // 2. if success, pass conn into req.conn. requestDone!
                // 3. if fail, we retry until timeout
            }
        }
    }
}

worker 不断监听队列中的请求。当有请求时,它开始一个 3 秒的超时计时器。然后,它尝试满足请求,直到超时或请求完成。

默认情况下,它与我们之前的 get 逻辑非常相似。我们只需要将连接传递到 connChan 中,并在成功时将 requestDone 设置为 true。如果不成功,则进入下一个循环并重试该过程,直到超时。


// ...

default:
    p.mu.Lock()

    // First, we try to get an idle conn.
    // If fail, we try to open a new conn.
    // If both does not work, we try again in the next loop until timeout.
    numIdle := len(p.idleConns)
    if numIdle > 0 {
        for _, c := range p.idleConns {
            delete(p.idleConns, c.id)
            p.mu.Unlock()
            req.connChan <- c // give conn
            requestDone = true
            break
        }
    } else if p.maxOpenCount > 0 && p.numOpen < p.maxOpenCount {
        p.numOpen++
        p.mu.Unlock()

        c, err := p.openNewTcpConnection()
        if err != nil {
            p.mu.Lock()
            p.numOpen--
            p.mu.Unlock()
        } else {
            req.connChan <- c // give conn
            requestDone = true
        }
    } else {
        p.mu.Unlock()
    }

// ...

有了 worker,请求连接的 goroutine 将从 connChanerrChan 中得到响应。最后,我们可以创建一个函数,用于初始化连接池并在一个 goroutine 中启动 worker。

const maxQueueLength = 10_000

// TcpConfig is a set of configuration for a TCP connection pool
type TcpConfig struct {
    Host         string
    Port         int
    MaxIdleConns int
    MaxOpenConn  int
}

// CreateTcpConnPool() creates a connection pool
// and starts the worker that handles connection request
func CreateTcpConnPool(cfg *TcpConfig) (*TcpConnPool, error) {
    pool := &TcpConnPool{
    host:         cfg.Host,
    port:         cfg.Port,
    idleConns:    make(map[string]*tcpConn),
    requestChan:  make(chan *connRequest, maxQueueLength),
    maxOpenCount: cfg.MaxOpenConn,
    maxIdleCount: cfg.MaxIdleConns,
}

    go pool.handleConnectionRequest()

    return pool, nil
}

太棒了!我们已经讨论了 TCP 连接池。还有一个未解决的问题,如何正确地从 TCP 连接中读取数据呢?

协议

是的,我确实告诉过你 ReadFullReadBytes。这些读取函数的问题在于,我们无法预先知道要读取的数据量有多大。

ReadFull 为例。我们需要传递一个预定义数据长度的字节切片,而这个长度在读取之前是未知的。而且,创建一个非常大的切片,希望它可以涵盖所有可能长度的数据是浪费空间的。

ReadBytes 呢?使用特定字符作为分隔符有漏洞。你永远不知道这个字符何时会出现在你的数据中间。没有保证所选择的分隔符标记了字节流的结尾。

最终,我们希望协议能够提供有关正在通信的数据量的强有力的保证。同时,它还需要足够灵活,以支持不同的数据格式,例如 JSON 和协议缓冲区

要解决这个问题,我们只需要让接收方知道他们需要接收多少数据。这个信息可以作为实际发送的数据的前缀添加。

协议

这个前缀由一定数量的字节组成。在我的例子中,它是 4 个字节,可以存储最大值为 4,294,967,295 的 uint32。这已经足够大,可以满足我的使用场景中的数据长度。

例如,如果我们要发送 8 个字节的数据,则前缀应存储数字 12,即前缀的 4 个字节加上 8 个字节的数据。通过先读取前缀,我们知道需要提供多大的字节切片才能使用 ReadFull 读取实际数据!

让我们创建一个帮助函数,将前缀添加到我们的数据中。请注意,所使用的 binary 包是官方库,用于在数字和字节之间进行转换。

// 4 bytes
const prefixSize = 4

// createTcpBuffer() implements the TCP protocol used in this application
// A stream of TCP data to be sent over has two parts: a prefix and the actual data itself
// The prefix is a fixed length byte that states how much data is being transferred over
func createTcpBuffer(data []byte) []byte {
    // Create a buffer with size enough to hold a prefix and actual data
    buf := make([]byte, prefixSize+len(data))

    // State the total number of bytes (including prefix) to be transferred over
    binary.BigEndian.PutUint32(buf[:prefixSize], uint32(prefixSize+len(data)))

    // Copy data into the remaining buffer
    copy(buf[prefixSize:], data[:])

    return buf
}

要读取数据,我们只需要先读取前缀,然后再读取实际数据。这可以通过 io.ReadFull 完成。

// Read() reads the data from the underlying TCP connection
func (c *tcpConn) Read() ([]byte, error) {
    prefix := make([]byte, prefixSize)

    // Read the prefix, which contains the length of data expected
    _, err := io.ReadFull(c.conn, prefix)
    if err != nil {
        return nil, err
    }

    totalDataLength := binary.BigEndian.Uint32(prefix[:])

    // Buffer to store the actual data
    data := make([]byte, totalDataLength-prefixSize)

    // Read actual data without prefix
    _, err = io.ReadFull(c.conn, data)
    if err != nil {
        return nil, err
    }

    return data, nil
}

将数据写入连接只需调用 conn.Write。我们只需要确保前缀存在,方法是使用 createTcpBuffer

协议就介绍到这里了!虽然简单,但它足以确保正确读取数据量,只要客户端和服务器同意使用相同的前缀大小。

改进空间

我描述的连接池已经足以满足我的入门任务要求(耶!)。但这并不意味着它不能进一步改进。

最大空闲时间

记得我提到过,保持空闲连接在连接池中会消耗服务器资源吗?如果一段时间内没有传入连接,则这是浪费的。为了节省资源,我们可以关闭打开时间过长的空闲连接。

我们可以通过在连接池中添加一个新的配置项 maxIdleTime 来实现这一点。当将连接放入池中时,可以启动一个个人计时器。当它达到 maxIdleTime 时,连接应该自动关闭。

关闭错误连接

读取和写入错误时常常发生。当发生这种错误时,我们应该假定连接是有问题的。我们可以关闭它,而不是将其放回池中。

优雅的关闭

优雅地关闭 Go 服务器是一个好习惯,可以确保所有正在处理的请求在关闭服务器连接之前得到适当的终止。这可以避免突然关闭,从而损害用户体验并可能导致数据丢失。

Go 服务器可以通过首先捕获 SIGINT 和 SIGTERM 信号,然后根据情况采取行动来进行优雅的关闭。连接池可以为服务器提供一个清理函数,在关闭事件发生时调用。

不需要详细解释,清理过程可以等待连接不再使用,然后再关闭它们。还需要一个超时时间,以防连接被卡住并需要“强制关闭”。# 总结

这篇文章到这里就结束了!如果你想更深入地了解连接池的细节,我强烈建议你阅读Go的sql包的源代码。

译自:https://betterprogramming.pub/build-a-tcp-connection-pool-from-scratch-with-go-d7747023fe14

版权声明:本文内容由TeHub注册用户自发贡献,版权归原作者所有,TeHub社区不拥有其著作权,亦不承担相应法律责任。 如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

点赞(0)
收藏(0)
菜鸟一只
你就是个黄焖鸡,又黄又闷又垃圾。

评论(0)

添加评论