本文将介绍基于流行的开源项目hystrix
(实际上,我将查看用golang编写的hystrix-go,而不是用Java编写的原始版本)的“熔断器”模式。在本文的第一部分,我将对“熔断器”进行一般介绍,让你了解它是什么以及为什么它很重要。此外,让我们回顾一下项目hystrix-go
和hystrix
的背景,并使用一个小的演示例子了解其基本用法。
熔断器
分布式架构中的软件通常有许多依赖项,即使是最可靠的服务在某些时候也难免会失败。
如果我们失败的服务变得无响应会发生什么?所有依赖它的服务也有风险变得无响应。这被称为“灾难性的级联失败”。
熔断器背后的基本思想非常简单。熔断器通过包装对目标服务的调用并持续监视失败率来工作。一旦故障达到一定的阈值,熔断器将跳闸,并且对电路的所有进一步调用都将返回故障或错误。
熔断器模式的设计哲学是“快速失败”:当一个服务变得无响应时,依赖它的其他服务应该停止等待它并开始处理失败的事实。通过防止单个服务的故障通过整个系统级联,熔断器模式有助于整个系统的“稳定性”和“弹性”。
熔断器模式可以实现为下面展示的有限状态机:
熔断器
有三个状态:开放
、关闭
和半开放
- 关闭: 请求被传递到目标服务。保持监视错误率、请求数量和超时等指标。当这些指标超过开发者设定的特定阈值时,熔断器会跳闸并转换为“开放”状态。
- 开放: 请求不会传递到目标服务,而是会调用“回退”逻辑(同样由开发者定义)来处理故障。熔断器会在一个被称为“休眠窗口”的时间段内保持“开放”状态,之后熔断器可以从“开放”状态转换为“半开放”状态。
- 半开放: 在此状态下,会向目标服务传递有限数量的请求,旨在重置状态。如果目标服务能够成功响应,则熔断器被“重置”回“关闭”状态。否则,熔断器会再次转换为“开放”状态。
这是关于熔断器的基本背景,你可以在网上找到更多信息。
接下来,让我们了解一下项目hystrix
。
hystrix
hystrix
是一个非常流行的开源项目。你可以在这个链接中找到关于它的一切。
我想从上面的链接中引用几个重要的点。Hystrix的设计目标如下:
- 通过第三方客户端库访问的依赖项(通常是通过网络访问)提供保护和控制延迟和故障。
- 阻止复杂分布式系统中的级联失败。
- 快速失败并快速恢复。
- 在可能的情况下回退和优雅降级。
- 启用接近实时的监控、警报和操作控制。
你可以看到,hystrix
完美地实现了我们在上一节中讨论的熔断器模式的思想,对吗?
hystrix
项目是用Java
开发的。在本文中,我更喜欢使用golang版本的hystrix-go
,它是一个简化版本,但实现了有关熔断器的所有主要设计和思想。
关于hystrix-go
的用法,你可以在这个链接中找到,这很容易理解。而且你可以很容易地在网上找到许多其他文章,其中包含演示例子以展示更多的使用级别的东西。请继续阅读。
在我的文章中,我想从hystrix-go
的源代码入手,深入探讨熔断器
的实现。请继续阅读以下部分。
三种服务降级策略
Hystrix
提供了三种不同的服务降级策略,以避免在整个系统中发生“级联故障”:超时
、最大并发请求数
和请求错误率
。
- 超时: 如果服务调用在预定义的时间段内没有成功返回响应,则将运行回退逻辑。这种策略是最简单的。
- 最大并发请求数: 当并发请求数量超过阈值时,回退逻辑将处理后续请求。
- 请求错误率:
hystrix
将记录每个服务调用的响应状态,当错误率达到阈值时,熔断器将打开,并在熔断器状态改变回到关闭之前执行回退逻辑。错误率
策略是最复杂的。
这可以从hystrix
的基本用法中看到:
import (
"github.com/afex/hystrix-go/hystrix"
"time"
)
hystrix.ConfigureCommand("my_command", hystrix.CommandConfig{
Timeout: int(10 * time.Second),
MaxConcurrentRequests: 100,
ErrorPercentThreshold: 25,
})
hystrix.Go("my_command", func() error {
// talk to dependency services
return nil
}, func(err error) error {
// fallback logic when services are down
return nil
})
在上面的用法示例中,你可以看到,超时
设置为10秒,最大请求数为100,错误率阈值为25%。
在消费者应用程序级别上,这几乎是你需要设置的所有配置。hystrix
会在内部使魔术发生。
在本文中,我计划通过查看源代码来展示hystrix
的内部。让我们从简单的最大并发请求
和超时
开始。然后继续探索复杂的策略请求错误率
。
GoC
基于上面的例子,你可以看到Go
函数是进入hystrix
源代码的门,因此我们从下面开始:
func Go(name string, run runFunc, fallback fallbackFunc) chan error {
runC := func(ctx context.Context) error {
return run()
}
var fallbackC fallbackFuncC
if fallback != nil {
fallbackC = func(ctx context.Context, err error) error {
return fallback(err)
}
}
return GoC(context.Background(), name, runC, fallbackC)
}
Go
函数接受三个参数:
- name: 命令名称,它绑定到
hystrix
内创建的电路。 - run: 包含正常逻辑的函数,该函数向依赖服务发送请求。
- fallback: 包含回退逻辑的函数。
Go
函数只是使用Context
包装run
和fallback
,Context
用于控制和取消goroutine。如果你不熟悉它,请参考之前的文章。最后,它将调用GoC
函数。GoC
函数的实现如下:
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
// construct a new command instance
cmd := &command{
run: run,
fallback: fallback,
start: time.Now(),
errChan: make(chan error, 1),
finished: make(chan bool, 1),
}
// get circuit by command name
circuit, _, err := GetCircuit(name)
if err != nil {
cmd.errChan <- err
return cmd.errChan
}
cmd.circuit = circuit
//declare a condition variable sync.Cond: ticketCond, to synchronize among goroutines
//declare a flag variable: ticketChecked, work together with ticketCond
ticketCond := sync.NewCond(cmd)
ticketChecked := false
// declare a function: returnTicket, will execute when a concurrent request is done to return `ticket`
returnTicket := func() {
cmd.Lock()
for !ticketChecked {
ticketCond.Wait()
}
cmd.circuit.executorPool.Return(cmd.ticket)
cmd.Unlock()
}
// declare a sync.Once instance: returnOnce, make sure the returnTicket function execute only once
returnOnce := &sync.Once{}
// declare another function: reportAllEvent, used to collect the metrics
reportAllEvent := func() {
err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
if err != nil {
log.Printf(err.Error())
}
}
// launch a goroutine which executes the `run` logic
go func() {
defer func() { cmd.finished <- true }()
if !cmd.circuit.AllowRequest() {
cmd.Lock()
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrCircuitOpen)
reportAllEvent()
})
return
}
cmd.Lock()
select {
case cmd.ticket = <-circuit.executorPool.Tickets:
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
default:
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrMaxConcurrency)
reportAllEvent()
})
return
}
runStart := time.Now()
runErr := run(ctx)
returnOnce.Do(func() {
defer reportAllEvent()
cmd.runDuration = time.Since(runStart)
returnTicket()
if runErr != nil {
cmd.errorWithFallback(ctx, runErr)
return
}
cmd.reportEvent("success")
})
}()
// launch the second goroutine for timeout strategy
go func() {
timer := time.NewTimer(getSettings(name).Timeout)
defer timer.Stop()
select {
case <-cmd.finished:
case <-ctx.Done():
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ctx.Err())
reportAllEvent()
})
return
case <-timer.C:
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrTimeout)
reportAllEvent()
})
return
}
}()
return cmd.errChan
}
我承认它很复杂,但它也是整个hystrix
项目的核心。耐心一点,我们来仔细地逐个部分地回顾它。
首先,GoC
函数的代码结构如下:
GoC
- 构建一个新的
Command
对象,其中包含每次调用GoC
函数的所有信息。 - 通过调用
GetCircuit(name)
函数获取(如果不存在则创建)名称为name
的circuit breaker
。 - 使用
sync.Cond
声明条件变量ticketCond和ticketChecked,用于在goroutine之间通信。 - 声明函数returnTicket。什么是ticket?什么是returnTicket?我们稍后会详细讨论。
- 声明另一个函数reportAllEvent。该函数对于
error rate
策略至关重要。 - 声明
sync.Once
的一个实例,这是golang提供的另一个有趣的同步原语
。 - 启动两个goroutine,每个goroutine都包含许多逻辑。简单地说,第一个包含向目标服务发送请求的逻辑和
max concurrent request number
策略,第二个包含timeout
策略。 - 返回一个
channel
类型的值。
让我们逐个回顾它们。
command
command
结构如下,它嵌入sync.Mutex并定义了几个字段:
type command struct {
sync.Mutex
ticket *struct{}
start time.Time
errChan chan error
finished chan bool
circuit *CircuitBreaker
run runFuncC
fallback fallbackFuncC
runDuration time.Duration
events []string
}
请注意,command
对象本身不包含命令名称信息,它的生命周期仅在一个GoC
调用的范围内。这意味着关于服务请求的统计指标,如error rate
和concurrent request number
,不存储在命令对象中。相反,这些指标存储在CircuitBreaker
类型的circuit字段中。
CircuitBreaker
正如我们在GoC
函数的工作流程中提到的,调用GetCircuit(name)
来获取或创建circuit breaker
。它在circuit.go
文件中实现如下:
func init() {
circuitBreakersMutex = &sync.RWMutex{}
circuitBreakers = make(map[string]*CircuitBreaker)
}
func GetCircuit(name string) (*CircuitBreaker, bool, error) {
circuitBreakersMutex.RLock()
_, ok := circuitBreakers[name]
if !ok {
circuitBreakersMutex.RUnlock()
circuitBreakersMutex.Lock()
defer circuitBreakersMutex.Unlock()
if cb, ok := circuitBreakers[name]; ok {
return cb, false, nil
}
circuitBreakers[name] = newCircuitBreaker(name)
} else {
defer circuitBreakersMutex.RUnlock()
}
return circuitBreakers[name], !ok, nil
}
逻辑非常直接。所有的熔断器都存储在一个以命令名称为键的map对象circuitBreakers中。
newCircuitBreaker
构造函数和CircuitBreaker
结构如下:
type CircuitBreaker struct {
Name string
open bool
forceOpen bool
mutex *sync.RWMutex
openedOrLastTestedTime int64
executorPool *executorPool // used in the strategy of max concurrent request number
metrics *metricExchange // used in the strategy of request error rate
}
func newCircuitBreaker(name string) *CircuitBreaker {
c := &CircuitBreaker{}
c.Name = name
c.metrics = newMetricExchange(name)
c.executorPool = newExecutorPool(name)
c.mutex = &sync.RWMutex{}
return c
}
CircuitBreaker
的所有字段都很重要,以了解熔断器的工作原理。
有两个字段不是简单类型,需要更多的分析,包括executorPool
和metrics
。
- executorPool:用于
max concurrent request number
策略。 - metrics:用于
request error rate
策略,没错吧?
executorPool
我们可以在pool.go
文件中找到executorPool
逻辑:
type executorPool struct {
Name string
Metrics *poolMetrics
Max int
Tickets chan *struct{} // Tickets channel
}
func newExecutorPool(name string) *executorPool {
p := &executorPool{}
p.Name = name
p.Metrics = newPoolMetrics(name)
p.Max = getSettings(name).MaxConcurrentRequests
p.Tickets = make(chan *struct{}, p.Max)
// send Max numbers of value into the Tickets channel
for i := 0; i < p.Max; i++ {
p.Tickets <- &struct{}{}
}
return p
}
它利用golang channel
来实现max concurrent request number
策略。请注意,创建了一个具有MaxConcurrentRequests容量的缓冲通道Tickets。在下面的for循环中,通过向通道发送值使缓冲通道充满,直到达到容量为止。
正如我们之前展示的那样,在GoC
函数的第一个goroutine中,Tickets
通道的用法如下:
go func() {
...
select {
case cmd.ticket = <-circuit.executorPool.Tickets: // receive ticket from Tickets channel
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
default:
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrMaxConcurrency) // run fallback logic when concurrent requests reach threshold
reportAllEvent()
})
return
}
...
}()
每次调用GoC
函数都将从circuit.executorPool.Tickets通道获取一个ticket,直到没有ticket可用,这意味着并发请求数量达到了阈值。在这种情况下,default
case将执行,并且服务将以优雅的降级逻辑降级。
另一方面,在每次调用GoC
完成后,ticket需要发送回circuit.executorPool.Tickets,对吧?还记得上面提到的returnTicket
函数吗?是的,它就是为此目的而定义的。GoC
函数中定义的returnTicket
函数如下:
returnTicket := func() {
cmd.Lock()
for !ticketChecked {
ticketCond.Wait()
}
cmd.circuit.executorPool.Return(cmd.ticket) // return ticket to the executorPool
cmd.Unlock()
}
它调用了executorPool.Return
函数:
// Return function in pool.go file
func (p *executorPool) Return(ticket *struct{}) {
if ticket == nil {
return
}
p.Metrics.Updates <- poolMetricsUpdate{
activeCount: p.ActiveCount(),
}
p.Tickets <- ticket // send ticket back to Tickets channel
}
Tickets的设计和实现是golang channel
在实际应用中的一个很好的例子。
总之,max concurrent request number
策略可以如下所示:
在上面的部分中,我们仔细回顾了hystrix
中的max concurrent requests
策略,希望你能从中学到一些有趣的东西。
现在让我们一起探讨timeout
策略。
Timeout
与max concurrent request number
策略相比,timeout
非常容易理解。
正如我们在前面的部分中提到的,hystrix
的核心逻辑在GoC
函数内部运行两个goroutine。你已经看到第一个goroutine包含将请求发送到目标服务的逻辑和max concurrent request number
策略。第二个goroutine呢?让我们来看看它:
go func() {
timer := time.NewTimer(getSettings(name).Timeout)
defer timer.Stop()
select {
case <-cmd.finished:
// returnOnce has been executed in another goroutine
case <-ctx.Done():
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ctx.Err())
reportAllEvent()
})
return
case <-timer.C:
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrTimeout)
reportAllEvent()
})
return
}
}()
请注意,使用超时持续时间值从设置中创建了一个Timer。select
语句让这个goroutine等待,直到一个case
条件从通道接收到值。timeout case只是第三个case(当前两个case没有触发时),它将以ErrTimeout错误消息运行fallback逻辑。
到目前为止,你应该清楚了这两个goroutine的主要结构和功能。但是,具体而言,有两个Golang技术需要你的注意:sync.Once
和sync.Cond
。
sync.Once
你可能已经注意到下面的代码块,在GoC
函数内部多次重复:
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrTimeout) // with various error types
reportAllEvent()
})
returnOnce是sync.Once
类型,它确保Do
方法的回调函数在不同的goroutine中仅运行一次。
在这种特定情况下,它可以保证returnTicket()和reportAllEvent()仅执行一次。这确实有意义,因为如果returnTicket() 为一个GoC
调用运行多次,则当前并发请求数量将不正确,对吧?
我还写了一篇关于sync.Once
的详细文章,你可以参考那篇文章获取更深入的解释。
sync.Cond
下面是 returnTicket 函数的实现:
ticketCond := sync.NewCond(cmd)
ticketChecked := false
returnTicket := func() {
cmd.Lock()
for !ticketChecked {
ticketCond.Wait() // hang the current goroutine
}
cmd.circuit.executorPool.Return(cmd.ticket)
cmd.Unlock()
}
ticketCond 是一个条件变量,在 Golang 中它的类型是 sync.Cond
。
条件变量在不同的 goroutine 之间进行通信时非常有用。具体来说,sync.Cond
的 Wait
方法会阻塞当前 goroutine,而 Signal
方法则会唤醒被阻塞的 goroutine 继续执行。
在 hystrix
的情况下,当 ticketChecked 是 false 时,这意味着当前的 GoC
调用尚未完成,ticket 还不应该被返回。因此,调用 ticketCond.Wait() 来阻塞这个 goroutine,直到 GoC
调用完成并由 Signal
方法通知为止。
ticketChecked = true
ticketCond.Signal()
请注意,上面的两行代码总是一起调用的。当 ticketChecked 被设置为 true 时,意味着当前的 GoC
调用已经完成,并且 ticket 已经准备好返回。此外,将用于挂起 goroutine 的 Wait
方法放在了一个 for 循环内,这也是一种最佳实践技巧。
关于 sync.Cond
的更多解释,我会在未来写另一篇文章来解释,请稍等一会儿。
Fallback
最后,让我们看看当目标服务无响应时如何调用 fallback 函数。
我们回想一下,每个 GoC
调用都会创建一个新的 command 实例。而 fallback 函数将被分配到具有相同名称的字段中,稍后将使用它。
cmd := &command{
run: run,
fallback: fallback, // fallback logic here
start: time.Now(),
errChan: make(chan error, 1),
finished: make(chan bool, 1),
}
正如我们在上面的章节中看到的那样,当达到 timeout
或 max concurrent requests
阈值时,会触发 errorWithFallback 方法。
func (c *command) errorWithFallback(ctx context.Context, err error) {
eventType := "failure"
if err == ErrCircuitOpen {
eventType = "short-circuit"
} else if err == ErrMaxConcurrency {
eventType = "rejected"
} else if err == ErrTimeout {
eventType = "timeout"
} else if err == context.Canceled {
eventType = "context_canceled"
} else if err == context.DeadlineExceeded {
eventType = "context_deadline_exceeded"
}
c.reportEvent(eventType)
fallbackErr := c.tryFallback(ctx, err)
if fallbackErr != nil {
c.errChan <- fallbackErr
}
}
errorWithFallback 方法将通过调用 tryFallback 来运行 fallback,并报告度量事件,例如 fallback-failure 和 fallback-success。
func (c *command) tryFallback(ctx context.Context, err error) error {
if c.fallback == nil {
return err
}
fallbackErr := c.fallback(ctx, err) // execute the fallback logic here
if fallbackErr != nil {
c.reportEvent("fallback-failure")
return fmt.Errorf("fallback failed with '%v'. run error was '%v'", fallbackErr, err)
}
c.reportEvent("fallback-success")
return nil
}
在上面,我们讨论了 timeout
策略,它是 hystrix
提供的所有策略中最简单的策略。还回顾了一些详细的 Golang 技术,以更好地理解复杂的代码逻辑。
总结
在本文中,我们讨论了 hystrix
提供的 max concurrent requests
策略和 timeout
策略的详细实现。还回顾了一些详细的 Golang 技术,以更好地理解复杂的代码逻辑。
我把 error rate
策略留给你,希望你能深入代码库,更多地了解断路器。
译自:https://levelup.gitconnected.com/how-to-write-a-circuit-breaker-in-golang-9ebd5644738c
评论(0)