首页
Preview

如何在Golang中编写熔断器

本文将介绍基于流行的开源项目hystrix(实际上,我将查看用golang编写的hystrix-go,而不是用Java编写的原始版本)的“熔断器”模式。在本文的第一部分,我将对“熔断器”进行一般介绍,让你了解它是什么以及为什么它很重要。此外,让我们回顾一下项目hystrix-gohystrix的背景,并使用一个小的演示例子了解其基本用法。

熔断器

分布式架构中的软件通常有许多依赖项,即使是最可靠的服务在某些时候也难免会失败。

如果我们失败的服务变得无响应会发生什么?所有依赖它的服务也有风险变得无响应。这被称为“灾难性的级联失败”。

熔断器背后的基本思想非常简单。熔断器通过包装对目标服务的调用并持续监视失败率来工作。一旦故障达到一定的阈值,熔断器将跳闸,并且对电路的所有进一步调用都将返回故障或错误。

熔断器模式的设计哲学是“快速失败”:当一个服务变得无响应时,依赖它的其他服务应该停止等待它并开始处理失败的事实。通过防止单个服务的故障通过整个系统级联,熔断器模式有助于整个系统的“稳定性”和“弹性”。

熔断器模式可以实现为下面展示的有限状态机:

熔断器

有三个状态:开放关闭半开放

  • 关闭: 请求被传递到目标服务。保持监视错误率、请求数量和超时等指标。当这些指标超过开发者设定的特定阈值时,熔断器会跳闸并转换为“开放”状态。
  • 开放: 请求不会传递到目标服务,而是会调用“回退”逻辑(同样由开发者定义)来处理故障。熔断器会在一个被称为“休眠窗口”的时间段内保持“开放”状态,之后熔断器可以从“开放”状态转换为“半开放”状态。
  • 半开放: 在此状态下,会向目标服务传递有限数量的请求,旨在重置状态。如果目标服务能够成功响应,则熔断器被“重置”回“关闭”状态。否则,熔断器会再次转换为“开放”状态。

这是关于熔断器的基本背景,你可以在网上找到更多信息

接下来,让我们了解一下项目hystrix

hystrix

hystrix是一个非常流行的开源项目。你可以在这个链接中找到关于它的一切。

我想从上面的链接中引用几个重要的点。Hystrix的设计目标如下:

  • 通过第三方客户端库访问的依赖项(通常是通过网络访问)提供保护和控制延迟和故障。
  • 阻止复杂分布式系统中的级联失败。
  • 快速失败并快速恢复。
  • 在可能的情况下回退和优雅降级。
  • 启用接近实时的监控、警报和操作控制。

你可以看到,hystrix完美地实现了我们在上一节中讨论的熔断器模式的思想,对吗?

hystrix项目是用Java开发的。在本文中,我更喜欢使用golang版本的hystrix-go,它是一个简化版本,但实现了有关熔断器的所有主要设计和思想。

关于hystrix-go的用法,你可以在这个链接中找到,这很容易理解。而且你可以很容易地在网上找到许多其他文章,其中包含演示例子以展示更多的使用级别的东西。请继续阅读。

在我的文章中,我想从hystrix-go的源代码入手,深入探讨熔断器的实现。请继续阅读以下部分。

三种服务降级策略

Hystrix提供了三种不同的服务降级策略,以避免在整个系统中发生“级联故障”:超时最大并发请求数请求错误率

  • 超时: 如果服务调用在预定义的时间段内没有成功返回响应,则将运行回退逻辑。这种策略是最简单的。
  • 最大并发请求数: 当并发请求数量超过阈值时,回退逻辑将处理后续请求。
  • 请求错误率: hystrix将记录每个服务调用的响应状态,当错误率达到阈值时,熔断器将打开,并在熔断器状态改变回到关闭之前执行回退逻辑。错误率策略是最复杂的。

这可以从hystrix的基本用法中看到:

import (
    "github.com/afex/hystrix-go/hystrix"
    "time"
)

hystrix.ConfigureCommand("my_command", hystrix.CommandConfig{
	Timeout:               int(10 * time.Second),
	MaxConcurrentRequests: 100,
	ErrorPercentThreshold: 25,
})

hystrix.Go("my_command", func() error {
	// talk to dependency services
	return nil
}, func(err error) error {
	// fallback logic when services are down
	return nil
})

在上面的用法示例中,你可以看到,超时设置为10秒,最大请求数为100,错误率阈值为25%。

在消费者应用程序级别上,这几乎是你需要设置的所有配置。hystrix会在内部使魔术发生。

在本文中,我计划通过查看源代码来展示hystrix的内部。让我们从简单的最大并发请求超时开始。然后继续探索复杂的策略请求错误率

GoC

基于上面的例子,你可以看到Go函数是进入hystrix源代码的门,因此我们从下面开始:

func Go(name string, run runFunc, fallback fallbackFunc) chan error {
	runC := func(ctx context.Context) error {
		return run()
	}
	var fallbackC fallbackFuncC
	if fallback != nil {
		fallbackC = func(ctx context.Context, err error) error {
			return fallback(err)
		}
	}
	return GoC(context.Background(), name, runC, fallbackC)
}

Go函数接受三个参数:

  • name: 命令名称,它绑定到hystrix内创建的电路。
  • run: 包含正常逻辑的函数,该函数向依赖服务发送请求。
  • fallback: 包含回退逻辑的函数。

Go函数只是使用Context包装runfallbackContext用于控制和取消goroutine。如果你不熟悉它,请参考之前的文章。最后,它将调用GoC函数。GoC函数的实现如下:

func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
	// construct a new command instance
	cmd := &command{
		run:      run,
		fallback: fallback,
		start:    time.Now(),
		errChan:  make(chan error, 1),
		finished: make(chan bool, 1),
	}
	// get circuit by command name
	circuit, _, err := GetCircuit(name)
	if err != nil {
		cmd.errChan <- err
		return cmd.errChan
	}
	cmd.circuit = circuit
	//declare a condition variable sync.Cond: ticketCond, to synchronize among goroutines
	//declare a flag variable: ticketChecked, work together with ticketCond
	ticketCond := sync.NewCond(cmd)
	ticketChecked := false
	// declare a function: returnTicket, will execute when a concurrent request is done to return `ticket`
	returnTicket := func() {
		cmd.Lock()
		for !ticketChecked {
			ticketCond.Wait()
		}
		cmd.circuit.executorPool.Return(cmd.ticket)
		cmd.Unlock()
	}
	// declare a sync.Once instance: returnOnce, make sure the returnTicket function execute only once
	returnOnce := &sync.Once{}

	// declare another function: reportAllEvent, used to collect the metrics
	reportAllEvent := func() {
		err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
		if err != nil {
			log.Printf(err.Error())
		}
	}
	// launch a goroutine which executes the `run` logic
	go func() {
		defer func() { cmd.finished <- true }()

		if !cmd.circuit.AllowRequest() {
			cmd.Lock()
			ticketChecked = true
			ticketCond.Signal()
			cmd.Unlock()
			returnOnce.Do(func() {
				returnTicket()
				cmd.errorWithFallback(ctx, ErrCircuitOpen)
				reportAllEvent()
			})
			return
		}

		cmd.Lock()
		select {
		case cmd.ticket = <-circuit.executorPool.Tickets:
			ticketChecked = true
			ticketCond.Signal()
			cmd.Unlock()
		default:
			ticketChecked = true
			ticketCond.Signal()
			cmd.Unlock()
			returnOnce.Do(func() {
				returnTicket()
				cmd.errorWithFallback(ctx, ErrMaxConcurrency)
				reportAllEvent()
			})
			return
		}

		runStart := time.Now()
		runErr := run(ctx)
		returnOnce.Do(func() {
			defer reportAllEvent()
			cmd.runDuration = time.Since(runStart)
			returnTicket()
			if runErr != nil {
				cmd.errorWithFallback(ctx, runErr)
				return
			}
			cmd.reportEvent("success")
		})
	}()
	// launch the second goroutine for timeout strategy
	go func() {
		timer := time.NewTimer(getSettings(name).Timeout)
		defer timer.Stop()

		select {
		case <-cmd.finished:
		case <-ctx.Done():
			returnOnce.Do(func() {
				returnTicket()
				cmd.errorWithFallback(ctx, ctx.Err())
				reportAllEvent()
			})
			return
		case <-timer.C:
			returnOnce.Do(func() {
				returnTicket()
				cmd.errorWithFallback(ctx, ErrTimeout)
				reportAllEvent()
			})
			return
		}
	}()

	return cmd.errChan
}

我承认它很复杂,但它也是整个hystrix项目的核心。耐心一点,我们来仔细地逐个部分地回顾它。

首先,GoC函数的代码结构如下:

GoC

  • 构建一个新的Command对象,其中包含每次调用GoC函数的所有信息。
  • 通过调用GetCircuit(name)函数获取(如果不存在则创建)名称为namecircuit breaker
  • 使用sync.Cond声明条件变量ticketCondticketChecked,用于在goroutine之间通信。
  • 声明函数returnTicket。什么是ticket?什么是returnTicket?我们稍后会详细讨论。
  • 声明另一个函数reportAllEvent。该函数对于error rate策略至关重要。
  • 声明sync.Once的一个实例,这是golang提供的另一个有趣的同步原语
  • 启动两个goroutine,每个goroutine都包含许多逻辑。简单地说,第一个包含向目标服务发送请求的逻辑和max concurrent request number策略,第二个包含timeout策略。
  • 返回一个channel类型的值。

让我们逐个回顾它们。

command

command结构如下,它嵌入sync.Mutex并定义了几个字段:

type command struct {
	sync.Mutex

	ticket      *struct{}
	start       time.Time
	errChan     chan error
	finished    chan bool
	circuit     *CircuitBreaker
	run         runFuncC
	fallback    fallbackFuncC
	runDuration time.Duration
	events      []string
}

请注意,command对象本身不包含命令名称信息,它的生命周期仅在一个GoC调用的范围内。这意味着关于服务请求的统计指标,如error rateconcurrent request number,不存储在命令对象中。相反,这些指标存储在CircuitBreaker类型的circuit字段中。

CircuitBreaker

正如我们在GoC函数的工作流程中提到的,调用GetCircuit(name)来获取或创建circuit breaker。它在circuit.go文件中实现如下:

func init() {
	circuitBreakersMutex = &sync.RWMutex{}
	circuitBreakers = make(map[string]*CircuitBreaker)
}

func GetCircuit(name string) (*CircuitBreaker, bool, error) {
	circuitBreakersMutex.RLock()
	_, ok := circuitBreakers[name]
	if !ok {
		circuitBreakersMutex.RUnlock()
		circuitBreakersMutex.Lock()
		defer circuitBreakersMutex.Unlock()

		if cb, ok := circuitBreakers[name]; ok {
			return cb, false, nil
		}
		circuitBreakers[name] = newCircuitBreaker(name)
	} else {
		defer circuitBreakersMutex.RUnlock()
	}

	return circuitBreakers[name], !ok, nil
}

逻辑非常直接。所有的熔断器都存储在一个以命令名称为键的map对象circuitBreakers中。

newCircuitBreaker构造函数和CircuitBreaker结构如下:

type CircuitBreaker struct {
	Name                   string
	open                   bool
	forceOpen              bool
	mutex                  *sync.RWMutex
	openedOrLastTestedTime int64

	executorPool *executorPool   // used in the strategy of max concurrent request number 
	metrics      *metricExchange // used in the strategy of request error rate
}

func newCircuitBreaker(name string) *CircuitBreaker {
	c := &CircuitBreaker{}
	c.Name = name
	c.metrics = newMetricExchange(name)
	c.executorPool = newExecutorPool(name)
	c.mutex = &sync.RWMutex{}

	return c
}

CircuitBreaker的所有字段都很重要,以了解熔断器的工作原理。

有两个字段不是简单类型,需要更多的分析,包括executorPoolmetrics

  • executorPool:用于max concurrent request number策略。
  • metrics:用于request error rate策略,没错吧?

executorPool

我们可以在pool.go文件中找到executorPool逻辑:

type executorPool struct {
	Name    string
	Metrics *poolMetrics
	Max     int
	Tickets chan *struct{} // Tickets channel 
}

func newExecutorPool(name string) *executorPool {
	p := &executorPool{}
	p.Name = name
	p.Metrics = newPoolMetrics(name)
	p.Max = getSettings(name).MaxConcurrentRequests

	p.Tickets = make(chan *struct{}, p.Max)
	// send Max numbers of value into the Tickets channel
	for i := 0; i < p.Max; i++ {
		p.Tickets <- &struct{}{}
	}

	return p
}

它利用golang channel来实现max concurrent request number策略。请注意,创建了一个具有MaxConcurrentRequests容量的缓冲通道Tickets。在下面的for循环中,通过向通道发送值使缓冲通道充满,直到达到容量为止。

正如我们之前展示的那样,在GoC函数的第一个goroutine中,Tickets通道的用法如下:

go func() {
	...
	select {
	case cmd.ticket = <-circuit.executorPool.Tickets: // receive ticket from Tickets channel
		ticketChecked = true
		ticketCond.Signal()
		cmd.Unlock()
	default:
		ticketChecked = true
		ticketCond.Signal()
		cmd.Unlock()
		returnOnce.Do(func() {
			returnTicket()
			cmd.errorWithFallback(ctx, ErrMaxConcurrency) // run fallback logic when concurrent requests reach threshold
			reportAllEvent()
		})
		return
	}
	...
}()

每次调用GoC函数都将从circuit.executorPool.Tickets通道获取一个ticket,直到没有ticket可用,这意味着并发请求数量达到了阈值。在这种情况下,default case将执行,并且服务将以优雅的降级逻辑降级。

另一方面,在每次调用GoC完成后,ticket需要发送回circuit.executorPool.Tickets,对吧?还记得上面提到的returnTicket函数吗?是的,它就是为此目的而定义的。GoC函数中定义的returnTicket函数如下:

returnTicket := func() {
	cmd.Lock()
	for !ticketChecked {
		ticketCond.Wait()
	}
	cmd.circuit.executorPool.Return(cmd.ticket) // return ticket to the executorPool
	cmd.Unlock()
}

它调用了executorPool.Return函数:

// Return function in pool.go file
func (p *executorPool) Return(ticket *struct{}) {
	if ticket == nil {
		return
	}

	p.Metrics.Updates <- poolMetricsUpdate{
		activeCount: p.ActiveCount(),
	}
	p.Tickets <- ticket // send ticket back to Tickets channel
}

Tickets的设计和实现是golang channel在实际应用中的一个很好的例子。

总之,max concurrent request number策略可以如下所示:

在上面的部分中,我们仔细回顾了hystrix中的max concurrent requests策略,希望你能从中学到一些有趣的东西。

现在让我们一起探讨timeout策略。

Timeout

max concurrent request number策略相比,timeout非常容易理解。

正如我们在前面的部分中提到的,hystrix的核心逻辑在GoC函数内部运行两个goroutine。你已经看到第一个goroutine包含将请求发送到目标服务的逻辑和max concurrent request number策略。第二个goroutine呢?让我们来看看它:

go func() {
	timer := time.NewTimer(getSettings(name).Timeout)
	defer timer.Stop()

	select {
	case <-cmd.finished:
		// returnOnce has been executed in another goroutine
	case <-ctx.Done():
		returnOnce.Do(func() {
			returnTicket()
			cmd.errorWithFallback(ctx, ctx.Err())
			reportAllEvent()
		})
		return
	case <-timer.C:
		returnOnce.Do(func() {
			returnTicket()
			cmd.errorWithFallback(ctx, ErrTimeout)
			reportAllEvent()
		})
		return
	}
}()

请注意,使用超时持续时间值从设置中创建了一个Timerselect语句让这个goroutine等待,直到一个case条件从通道接收到值。timeout case只是第三个case(当前两个case没有触发时),它将以ErrTimeout错误消息运行fallback逻辑。

到目前为止,你应该清楚了这两个goroutine的主要结构和功能。但是,具体而言,有两个Golang技术需要你的注意:sync.Oncesync.Cond

sync.Once

你可能已经注意到下面的代码块,在GoC函数内部多次重复:

returnOnce.Do(func() {
	returnTicket()
	cmd.errorWithFallback(ctx, ErrTimeout) // with various error types 
	reportAllEvent()
})

returnOncesync.Once类型,它确保Do方法的回调函数在不同的goroutine中仅运行一次。

在这种特定情况下,它可以保证returnTicket()reportAllEvent()仅执行一次。这确实有意义,因为如果returnTicket() 为一个GoC调用运行多次,则当前并发请求数量将不正确,对吧?

我还写了一篇关于sync.Once的详细文章,你可以参考那篇文章获取更深入的解释。

sync.Cond

下面是 returnTicket 函数的实现:

ticketCond := sync.NewCond(cmd)
ticketChecked := false
returnTicket := func() {
	cmd.Lock()
	for !ticketChecked {
		ticketCond.Wait() // hang the current goroutine
	}
	cmd.circuit.executorPool.Return(cmd.ticket)
	cmd.Unlock()
}

ticketCond 是一个条件变量,在 Golang 中它的类型是 sync.Cond

条件变量在不同的 goroutine 之间进行通信时非常有用。具体来说,sync.CondWait 方法会阻塞当前 goroutine,而 Signal 方法则会唤醒被阻塞的 goroutine 继续执行。

hystrix 的情况下,当 ticketCheckedfalse 时,这意味着当前的 GoC 调用尚未完成,ticket 还不应该被返回。因此,调用 ticketCond.Wait() 来阻塞这个 goroutine,直到 GoC 调用完成并由 Signal 方法通知为止。

ticketChecked = true
ticketCond.Signal()

请注意,上面的两行代码总是一起调用的。当 ticketChecked 被设置为 true 时,意味着当前的 GoC 调用已经完成,并且 ticket 已经准备好返回。此外,将用于挂起 goroutine 的 Wait 方法放在了一个 for 循环内,这也是一种最佳实践技巧。

关于 sync.Cond 的更多解释,我会在未来写另一篇文章来解释,请稍等一会儿。

Fallback

最后,让我们看看当目标服务无响应时如何调用 fallback 函数。

我们回想一下,每个 GoC 调用都会创建一个新的 command 实例。而 fallback 函数将被分配到具有相同名称的字段中,稍后将使用它。

cmd := &command{
	run:      run,
	fallback: fallback, // fallback logic here
	start:    time.Now(),
	errChan:  make(chan error, 1),
	finished: make(chan bool, 1),
}

正如我们在上面的章节中看到的那样,当达到 timeoutmax concurrent requests 阈值时,会触发 errorWithFallback 方法。

func (c *command) errorWithFallback(ctx context.Context, err error) {
	eventType := "failure"
	if err == ErrCircuitOpen {
		eventType = "short-circuit"
	} else if err == ErrMaxConcurrency {
		eventType = "rejected"
	} else if err == ErrTimeout {
		eventType = "timeout"
	} else if err == context.Canceled {
		eventType = "context_canceled"
	} else if err == context.DeadlineExceeded {
		eventType = "context_deadline_exceeded"
	}

	c.reportEvent(eventType)
	fallbackErr := c.tryFallback(ctx, err)
	if fallbackErr != nil {
		c.errChan <- fallbackErr
	}
}

errorWithFallback 方法将通过调用 tryFallback 来运行 fallback,并报告度量事件,例如 fallback-failurefallback-success

func (c *command) tryFallback(ctx context.Context, err error) error {
	if c.fallback == nil {
		return err
	}
	fallbackErr := c.fallback(ctx, err) // execute the fallback logic here
	if fallbackErr != nil {
		c.reportEvent("fallback-failure")
		return fmt.Errorf("fallback failed with '%v'. run error was '%v'", fallbackErr, err)
	}

	c.reportEvent("fallback-success")

	return nil
}

在上面,我们讨论了 timeout 策略,它是 hystrix 提供的所有策略中最简单的策略。还回顾了一些详细的 Golang 技术,以更好地理解复杂的代码逻辑。

总结

在本文中,我们讨论了 hystrix 提供的 max concurrent requests 策略和 timeout 策略的详细实现。还回顾了一些详细的 Golang 技术,以更好地理解复杂的代码逻辑。

我把 error rate 策略留给你,希望你能深入代码库,更多地了解断路器。

译自:https://levelup.gitconnected.com/how-to-write-a-circuit-breaker-in-golang-9ebd5644738c

版权声明:本文内容由TeHub注册用户自发贡献,版权归原作者所有,TeHub社区不拥有其著作权,亦不承担相应法律责任。 如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

点赞(0)
收藏(0)
菜鸟一只
你就是个黄焖鸡,又黄又闷又垃圾。

评论(0)

添加评论