分布式系统中最常见的可靠性模式之一是限制正在处理的任务的速率。这个任务可以是请求或事件处理。这是为了平滑流量的形状和避免流量突发或仅允许最大数量的操作在任何给定时间内进行操作,因为底层系统正在操作。速率限制器模式在负载均衡器、公共API以及不同层次的网络策略中使用。
实现速率限制器的一种方法是一种称为“漏桶”算法的算法。我在面试中也看到过这个算法的编码挑战。虽然我知道这个算法,但我从未自己实现过。通常使用算法实现概念验证可以帮助我更好地理解设计决策和注意事项。在撰写本文时,我利用我以前从未使用过的Go泛型机会进行了演示。
该算法的想法很简单:想象一下有一个正在填充着传入任务的任务桶。这个桶底部也有孔,任务可以从中滴下到任务处理器中。
漏桶算法的图片描述
桶可能处于几种状态之一:空、满或正在处理一些任务但既不为空也不满。只要桶没有满,它就会以恒定的速率将任务交给处理器。当它满时,它应该“溢出”,并且不允许进入任何传入的任务,直到完成一些任务。
这概述了漏桶速率限制器应该如何工作。现在让我们来实现它。
我通常从定义我想向外界公开的接口开始。这些API在开发过程中是流动的,但它们使我保持抽象层次并隐藏实现的复杂性:
package main
// Task is an abstraction that represents a task that can be submitted to the rate
// limiter for processing.
type Task interface {
}
type RateLimiter[T Task] interface {
// Start initializes the rate limiter and starts the intake & processing of
// tasks. Until it's called, no task is processed.
Start()
// Stop stops the intake & processing of tasks. Once Stop is called, until Start
// is called again, the rate limiter stays inactive.
Stop()
}
速率限制器的接口定义
它们还允许作者独立于向用户公开的内容演变实现,并通过统一接口支持不同的选项,例如不同的速率限制策略。虽然目前的Task
可以是任何东西,但我认为从抽象层面开始会更好,以保持其灵活性。例如,将来我们可能决定为任务实现不同的权重-意味着有些任务可能比其他任务更昂贵,并在桶中占用更多空间。现在,我们只是将一切都视为相同。
在相同的思路下,定义用户将与之交互的另一个部分是我的下一步:
漏桶速率限制器构造函数
package main
// NewLeakyBucketRateLimiter returns a RateLimiter that uses the "leaky bucket"
// algorithm to limit the rate tasks added to the input channel are being
// processed. These tasks are passed to the output channel at a maximum rate
// denoted by "ratePerSecond". "input" & "output" may be a buffered channels
// that can be used to control the max concurrency the bucket can be filled
// and emptied.
//
// Once either of these channels are closed, the rate limiter will
// not be able to rate limit & output further tasks.
func NewLeakyBucketRateLimiter[T Task](
ratePerSecond uint,
input <-chan T,
output chan<- T,
) RateLimiter[T] {
// TODO
}
我选择使用Go通道来从速率限制器中提交和读取任务,因为我认为这将产生更简单的实现,更容易理解本文,但这可能不是用于生产的最佳选项。
通道带来的另一个有趣的属性是用户可以控制输入和输出任务的并发级别。这在语言规范中解释如下:
容量以元素数量为单位,设置通道中缓冲区的大小。如果容量为零或不存在,则通道是无缓冲的,只有在发送者和接收者都准备好时通信才会成功。否则,通道是有缓冲的,并且如果缓冲区不满(发送)或不为空(接收),则通信成功而不会阻塞。
你可以将输出通道的容量视为任务从桶中滴下的速率,而输入通道的容量可以视为桶的大小。当我们达到这些限制时,发送器或处理器将被阻止,直到通道不再满为止。我们将看到如何处理桶满的情况。
接下来,我们将研究使用漏桶算法实现接口RateLimiter
。
我们首先定义内部参数,例如我们在执行过程中需要的参数以及保持速率限制器的当前状态的其他参数:
package main
import (
"sync"
"time"
)
type lbLimiter[T Task] struct {
sync.Mutex
interval time.Duration
pollInterval time.Duration
out chan<- T
in <-chan T
running bool
}
这里,interval
表示传递任务到输出通道之间的理想时间,而pollInterval
是从输入通道读取任务的尝试之间的时间。
此外,running
是一个私有变量,指示速率限制器是否“活动”-这意味着它正在尝试从输入和输出通道中读取和写入。由于我们可能正在处理多线程环境,因此我选择使用<a class="af nq" href="https://pkg.go.dev/sync#RWMutex" rel="noopener ugc nofollow" target="_blank">sync.RWMutex</a>
来确保没有并发写入同时检查其状态。
func (l *lbLimiter[T]) Start() {
l.Lock()
defer l.Unlock()
l.running = true
go func() {
for l.isRunning() {
select {
case w := <-l.in:
go func() { l.out <- w }()
time.Sleep(l.interval)
default:
time.Sleep(l.pollInterval)
}
}
}()
}
通过我们的构造函数创建速率限制器后,用户需要调用Start
方法,该方法执行两个主要操作:通过running
字段标记自身为“活动”,并启动执行主任务的goroutine-从输入中读取并将任务传递到输出通道以进行处理。如果输入通道中有待处理的任务,我们将将其传递到输出通道并休眠一段时间,以便我们以每秒ratePerSecond
个任务的速度分发任务。否则,我们将等待一段时间,直到再次尝试从输入通道读取。
在接收操作中阻塞输入通道是另一种选择,但在这种情况下,如果用户想停止速率限制,则需要关闭输入通道以退出阻塞状态。这就是我选择在此处轮询输入通道中的项目的原因。
Stop
方法非常简单:
func (l *lbLimiter[T]) Stop() {
l.Lock()
defer l.Unlock()
l.running = false
}
func (l *lbLimiter[T]) isRunning() bool {
l.RLock()
defer l.RUnlock()
return l.running
}
它设置了running
标志,该标志由轮询输入通道中的任务的goroutine检查。当设置标志时,它将停止轮询并退出。
最后,让我们看一下这个速率限制器如何通过示例应用程序使用:
演示速率限制器使用的示例程序
func main() {
out := make(chan int64)
in := make(chan int64, 10)
// Create a rate limiter that allows processing 5 tasks / second
rl := NewLeakyBucketRateLimiter[int64](5, in, out)
rl.Start()
// Stop processing tasks in 10 seconds and exit the program
go func() {
time.Sleep(10 * time.Second)
rl.Stop()
close(out)
}()
// Try adding tasks at a rate of 10 tasks / seconds
go func() {
for i := 0; ; i++ {
ts := time.Now().Format(time.RFC3339)
select {
case in <- int64(i):
fmt.Printf("[%s] Added one item in the bucket\n", ts)
default:
fmt.Printf("[%s] The bucket is full\n", ts)
}
time.Sleep(time.Millisecond * 100)
}
}()
// Process tasks
t := time.Now()
for c := range out {
now := time.Now()
dt := now.Sub(t)
t = now
fmt.Printf("[%s] Ran the operation %d after %d milliseconds\n",
now.Format(time.RFC3339), c, dt.Milliseconds())
}
}
构建速率限制器后,我们设置了一些准备处理任务的东西:一个goroutine将任务添加到输入通道中,另一个goroutine控制何时停止处理以避免无限运行示例程序。这里需要注意的另一部分是如何处理“满桶”:使用 select
发送任务到输入通道时,我们可以检查输入通道(或桶)是否已满并调整我们的行为。在现实世界的场景中,我们可能会向这些用户返回诸如 429 Too Many Requests 状态码 的 HTTP 响应,表明他们已经超过了速率限制。在这种情况下,我们记录桶已满并在一段时间后继续尝试。
还有其他策略,如 “Token bucket” 算法,可作为漏桶算法的替代方案。如果你有一个有趣的速率限制策略,可以在评论中告诉我,我可以查看并实现它。
你可以在这里找到完整的代码。
译自:https://itnext.io/rate-limiting-with-leaky-bucket-algorithm-using-go-generics-7a5086c02695
评论(0)