首页
Preview

使用 Select、Goroutines 和 Channels 掌握 Go 中的并发编程

本文将介绍如何在 Golang 中结合 select、goroutines 和 channels 构建并发程序。

建议先阅读以下两篇文章,以熟悉并发、通道和 goroutines 的概念:

Select

引用自 Go tour 文档:

select 语句可以让 goroutine 等待多个通信操作。 select 会一直阻塞,直到其中一个通信可以执行,然后执行该 case。如果有多个 case 可以执行,它会随机选择一个执行。”

API 服务器响应

我们将探讨如何使用 select 获取最快 API 调用的响应。让我们直接看一些代码以了解 select 及其强大的功能。

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"time"
)

const (
	API_KEY              = "f32ee7b348msh230c75aaf106721p1366a6jsn952b266f7ae5"
	API_GOOGLE_NEWS_HOST = "google-news.p.rapidapi.com"
	API_FREE_NEWS_HOST   = "free-news.p.rapidapi.com"
	GOOGLE_NEWS_URL      = "https://google-news.p.rapidapi.com/v1/top_headlines?lang=en&country=US"
	FREE_NEWS_URL        = "https://free-news.p.rapidapi.com/v1/search?lang=en&q=Elon"
)

var (
	google = make(chan News)
	free   = make(chan News)
)

type Article struct {
	Title   string `json:"title"`
	Link    string `json:"link"`
	Id      string `json:"id"`
	MongoId string `json:"_id"`
}

type News struct {
	Source   string
	Articles []*Article `json:"articles"`
}

type Function struct {
	f       func(news chan<- News)
	channel chan News
}

func main() {
	functions := []*Function{
		{f: googleNews, channel: google},
		{f: freeNews, channel: free},
	}
	quickestApiResponse(functions)
}

func quickestApiResponse(functions []*Function) {
	var articles []*Article

	for _, function := range functions {
		function.Run()
	}

	select {
	case googleNewsResponse := <-google:
		fmt.Printf("Source: %s\n", googleNewsResponse.Source)
		articles = googleNewsResponse.Articles
	case freeNewsReponse := <-free:
		fmt.Printf("Source: %s\n", freeNewsReponse.Source)
		articles = freeNewsReponse.Articles
	}

	fmt.Printf("Articles %v\n", articles)
}

func googleNews(google chan<- News) {
	req, err := http.NewRequest("GET", GOOGLE_NEWS_URL, nil)
	if err != nil {
		fmt.Printf("Error initializing request%v\n", err.Error())
		return
	}

	req.Header.Add("X-RapidAPI-Key", API_KEY)
	req.Header.Add("X-RapidAPI-Host", API_GOOGLE_NEWS_HOST)
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Printf("Error making request %v\n", err.Error())
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		fmt.Printf("Google News Response StatusCode %v Status %v\n", resp.StatusCode, resp.Status)
		return
	}

	googleNewsArticles := News{Source: "GoogleNewsApi"}
	if err := json.NewDecoder(resp.Body).Decode(&googleNewsArticles); err != nil {
		fmt.Printf("Error decoding body %v\n", err.Error())
		return
	}

	fmt.Printf("Google Articles %v\n", googleNewsArticles)
	fmt.Printf("Google Articles Size %d\n", len(googleNewsArticles.Articles))
	google <- googleNewsArticles
}

func freeNews(free chan<- News) {
	req, err := http.NewRequest("GET", FREE_NEWS_URL, nil)
	if err != nil {
		fmt.Printf("Error initializing request%v\n", err.Error())
		return
	}

	req.Header.Add("X-RapidAPI-Key", API_KEY)
	req.Header.Add("X-RapidAPI-Host", API_FREE_NEWS_HOST)
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Printf("Error making request %v\n", err.Error())
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != 200 {
		fmt.Printf("Free News Response StatusCode %v Status %v\n", resp.StatusCode, resp.Status)
		return
	}

	var freeNewsArticles News
	if err := json.NewDecoder(resp.Body).Decode(&freeNewsArticles); err != nil {
		fmt.Printf("Error decoding body %v\n", err.Error())
		return
	}

	freeNewsArticles.Source = "FreeNewsApi"
	fmt.Printf("Free Articles %v\n", freeNewsArticles)
	fmt.Printf("Free Articles Size %d\n", len(freeNewsArticles.Articles))
	free <- freeNewsArticles
}

func (a *Article) GetId() string {
	if a.Id == "" {
		return a.MongoId
	}

	return a.Id
}

func (f *Function) Run() {
	go f.f(f.channel)
}

以下实现重点突出了 select 将等待直到其中一个 case 执行。

这个示例中有许多要理解的部分,所以让我们一一看一下。

在查看 select 逻辑之前,让我们先看一下如何进行 API 调用。

Function 结构体表示一个单独的 API 调用,它的属性是一个名为 f 的函数,该函数接收一个类型为 News 的通道,注意该函数的签名已经强制要求将该通道视为“仅发送”通道,第二个属性是一个类型为 News 的通道,一旦 API 调用被执行并解析响应,该通道将用于发送结果。

News 结构体是用于保存文章及其来源的对象。

在第 43 行,我们初始化了一个 Function 切片,其中包含两个元素,第一个使用 google 通道的 googleNews 函数,第二个使用 free 通道的 freeNews 函数。

由于两个 API 调用都将获取新闻,因此这些通道具有相同的类型,但每个函数使用一个通道。

在第 69 和 102 行,我们有这两个 API 的实现。每个 API 都会向其各自的 URL 发送 HTTP 请求并解析响应,完成后,新闻将通过其各自的通道发送。

现在让我们聚焦于 quickestApiResponse 方法。此方法的目的是将文章变量设置为最快 API 的响应。在第 54 行,通过调用 Run 方法来执行每个函数。此方法会在该函数上启动一个新的 goroutine 并传递通道。重要的是要注意,这些 API 调用需要在单独的 goroutine 中运行,因为我们不希望它们按顺序运行。

然后,select 将等待 googlefree 通道中的任何一个发送响应。一旦任何一个 API 调用通过其各自的通道发送响应,select 将执行该 case 下的代码并忽略另一个 case。这将有效地将文章设置为最快 API 调用的响应。

让我们运行程序以查看输出:

API server response output

The FreeNewsApi ran faster!.

这种逻辑可以应用于许多其他用例,允许程序运行多个 goroutines,使用通道进行通信,并使用 select 等待它们。

在这个示例中,我们还可以实现一件事,即强制实施某种超时,如果 API 调用时间超过限制,我们将留下文章为空。以下代码通过在 select 中添加一个 case 来实现这一点。

const (
  API_MAX_TIMEOUT      = 3 * time.Second 
)

func quickestApiResponse(functions []*Function) {
	var articles []*Article

	for _, function := range functions {
		function.Run()
	}

	select {
	case googleNewsResponse := <-google:
		fmt.Printf("Source: %s\n", googleNewsResponse.Source)
		articles = googleNewsResponse.Articles
	case freeNewsReponse := <-free:
		fmt.Printf("Source: %s\n", freeNewsReponse.Source)
		articles = freeNewsReponse.Articles
         case <-time.After(API_MAX_TIMEOUT):
		fmt.Println("Time out! API calls took too long!!")
	}

	fmt.Printf("Articles %v\n", articles)
}

time.After 返回一个类型为 time.Time 的通道,一旦指定的时间过去,它就会发送当前时间。请注意,这里我们没有将该通道的值分配给变量,这是因为我们不关心该通道将发送的数据,我们只关心接收信号。如果我们在两个 API 上都睡眠三秒钟,我们将看到执行超时 case,而其他两个 case 将被忽略。

API server response timeout

带超时的 Context

Context 几乎总是存在于程序中,我们可以使用上下文对象来为特定的昂贵任务设置超时。下面的示例演示了如何使用 context.WithTimeout 来限制程序等待昂贵任务的时间。

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	signal := make(chan bool)
	go expensiveTask(ctx, signal)

	select {
	case <-ctx.Done():
		fmt.Println("Expensive task took too long to complete")
		return
	case <-signal:
		fmt.Println("Expensive task was completed on time")
	}
}

func expensiveTask(ctx context.Context, signal chan<- bool) {
	time.Sleep(6 * time.Second)
	signal <- true
}

上面的示例中使用 context.Background() 作为父上下文,但在更现实的设置中,上下文已经存在。具有超时的上下文将返回一个上下文,该上下文在指定的时间持续时间过去时通过 ctx.Done 通道发送信号。

在第 13 行,昂贵的任务在单独的 goroutine 中运行,上下文和信号通道作为参数传递。该昂贵任务休眠 6 秒钟以模拟延迟,但上下文具有 5 秒的超时时间。

select 有两个 case,等待 ctx.Done 运行或等待昂贵任务发送信号,表示它已完成。

在这种情况下,运行此示例将产生以下输出:

Expensive task took too long to complete

运行循环进程

让我们看看如何使用 select 运行循环进程。对于此程序,我们将具有以下场景:

当该进程应该开始运行和每次运行之间的间隔时间时,程序需要让我们传递任何函数作为循环进程。

以下是最初的代码,请看:

package chaptereight

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/brianvoe/gofakeit/v6"
)

type PendingUserNotifications map[int][]*Notification
type Notification struct {
	Content string
	UserId  int
}

func sendUserBatchNotificationsEmail(userId int, notifications []*Notification) {
	fmt.Printf("Sending email to user with userId %d for pending notifications %v\n", userId, notifications)
}

func handlePendingUsersNotifications(pendingNotifications PendingUserNotifications, handler func(userId int, notifications []*Notification)) {
	for userId, notifications := range pendingNotifications {
		handler(userId, notifications)
		delete(pendingNotifications, userId)
	}
}

func collectNewUsersNotifications(notifications PendingUserNotifications) {
	randomNotifications := getRandomNotifications()
	if len(randomNotifications) > 0 {
		notifications[randomNotifications[0].UserId] = randomNotifications
	}
}

func getRandomNotifications() (notifications []*Notification) {
	rand.Seed(time.Now().UnixNano())
	userId := rand.Intn(100-10+1) + 10
	numOfNotifications := rand.Intn(5-0+1) + 0
	fmt.Printf("numOfNotifications %v\n", numOfNotifications)
	for i := 0; i < numOfNotifications; i++ {
		notifications = append(notifications, &Notification{Content: gofakeit.Paragraph(1, 2, 10, " "), UserId: userId})
	}

	return
}

上面的代码反映了我们想要运行的任务。我们有两个主要函数 collectNewUsersNotificationshandlePendingUsersNotifications,第一个函数用于收集所有新用户通知,理想的实现是该函数在数据库中查找未读通知,但为了本例子,我们正在模拟为某些用户获取随机通知。通知是使用Notification结构体创建的,只有两个字段,一个是内容,一个是用户ID。

collect函数使用PendingUserNotifications类型存储通知。这个类型是一个映射,其中键是表示用户ID的整数,值是Notification的切片。

在收集所有通知后,我们想使用handlePendingUserNotifications函数迭代通知并在每个通知上运行处理程序函数。在处理完每个用户的通知后,它们将从映射中删除。在这种情况下,我们将使用的处理程序是sendUserBatchNotificationsEmail。它的目的是向用户发送包含所有待处理通知的电子邮件,以便他们查看。

现在让我们专注于如何使用select以重复的方式运行此任务。正如我之前提到的,我们必须考虑以下几点:

  • 允许传递时间间隔
  • 允许传递进程的开始时间
  • 允许调用者在需要时取消/停止重复进程

下面的代码显示了如何实现这一点:

package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/brianvoe/gofakeit/v6"
)

type PendingUserNotifications map[int][]*Notification
type ProcessHandler func()
type Notification struct {
	Content string
	UserId  int
}
type RecurringProcess struct {
	name      string
	interval  time.Duration
	startTime time.Time
	handler   func()
	stop      chan struct{}
}

func main() {
	pendingNotificationsProcess()
}

func pendingNotificationsProcess() {
	process := &RecurringProcess{}
	notifications := PendingUserNotifications{}
	handler := func() {
		collectNewUsersNotifications(notifications)
		handlePendingUsersNotifications(notifications, sendUserBatchNotificationsEmail, process)
	}
	interval := 10 * time.Second
	startTime := time.Now().Add(3 * time.Minute)
	process = createRecurringProcess("Pending User Notifications", handler, interval, startTime)

	<-process.stop
}

func sendUserBatchNotificationsEmail(userId int, notifications []*Notification) {
	fmt.Printf("Sending email to user with userId %d for pending notifications %v\n", userId, notifications)
}

func handlePendingUsersNotifications(pendingNotifications PendingUserNotifications, handler func(userId int, notifications []*Notification), process *RecurringProcess) {
	userNotificationCount := 0
	for userId, notifications := range pendingNotifications {
		userNotificationCount++
		handler(userId, notifications)
		delete(pendingNotifications, userId)
	}

	if userNotificationCount == 0 {
		process.Cancel()
	}
}

func collectNewUsersNotifications(notifications PendingUserNotifications) {
	randomNotifications := getRandomNotifications()
	if len(randomNotifications) > 0 {
		notifications[randomNotifications[0].UserId] = randomNotifications
	}
}

func getRandomNotifications() (notifications []*Notification) {
	rand.Seed(time.Now().UnixNano())
	userId := rand.Intn(100-10+1) + 10
	numOfNotifications := rand.Intn(5-0+1) + 0
	fmt.Printf("numOfNotifications %v\n", numOfNotifications)
	for i := 0; i < numOfNotifications; i++ {
		notifications = append(notifications, &Notification{Content: gofakeit.Paragraph(1, 2, 10, " "), UserId: userId})
	}

	return
}

func createRecurringProcess(name string, handler ProcessHandler, interval time.Duration, startTime time.Time) *RecurringProcess {
	process := &RecurringProcess{
		name:      name,
		interval:  interval,
		startTime: startTime,
		handler:   handler,
		stop:      make(chan struct{}),
	}

	go process.Start()

	return process
}

func (p *RecurringProcess) Start() {
	startTicker := &time.Timer{}
	ticker := &time.Ticker{C: nil}
	defer func() { ticker.Stop() }()

	if p.startTime.Before(time.Now()) {
		p.startTime = time.Now()
	}
	startTicker = time.NewTimer(time.Until(p.startTime))

	for {
		select {
		case <-startTicker.C:
			ticker = time.NewTicker(p.interval)
			fmt.Println("Starting recurring process")
			p.handler()
		case <-ticker.C:
			fmt.Println("Next run")
			p.handler()
		case <-p.stop:
			fmt.Println("Stoping recurring process")
			return
		}
	}
}

func (p *RecurringProcess) Cancel() {
	close(p.stop)
}

我们引入了一个新的结构体来表示重复的进程RecurringProcess。这个结构包含以下字段:

  • name——进程的名称
  • interval——每次运行之间的时间间隔
  • startTime——进程将开始的时间
  • handler——每次运行时调用的处理程序函数
  • stop——停止进程的通道

pendingNotificationsProcess函数中,我们初始化一个新的重复进程和通知分别在第30行和第31行。我们将使用的处理程序函数是一个函数,它在其中包含collectNewUsersNotificationshandlePendingUsersNotifications函数。请注意,在这里,我们将进程传递给handlePendingUsersNotifications,因为它将需要停止进程。

我们还指定了间隔和开始时间。

然后我们调用createRecurringProcess,这个函数创建了重复的进程并启动了它。让我们专注于第88行,在那里我们使用一个goroutine来启动进程。

在第40行,我们通过从停止通道中读取来阻塞主goroutine,这意味着主goroutine将被阻塞,直到向此通道发送消息。

让我们看一下第93行的Start函数,它包含运行重复进程的所有逻辑。

这个函数使用startTicker变量来使用开始时间启动重复进程。如果开始时间在过去,进程将立即启动。

time.NewTimer会在经过指定的持续时间后在其通道上发送当前时间,这将允许我们启动进程。这就是为什么我们有第一个情况等待通道接收信号的原因。

我们还在第95行有一个ticker变量,它是一个time.Ticker。go中的一个ticker会在其通道上发送指定间隔的tick。一旦startTicker.C通道发送信号,我们在第106行中将一个新的ticker分配给ticker变量,并且我们调用处理程序函数。

在此之后,ticker将开始在第二个选择情况下接收tick,并且每次接收到一个tick时,处理程序函数也将被调用。

在选择的最后一种情况下,我们等待发送停止进程的信号,只需返回即可。

请注意,select在无限的for循环内。这是因为我们想要不断循环,直到其中一个情况明确打破循环。每次接收到一个tick时,第二个情况将被执行,然后它将再次进入相同的循环,其中选择将再次等待某些情况运行。

为了停止进程,我们在第55行添加了一些逻辑,我们计算通知的数量,如果在任何时候没有待处理的通知,程序将取消进程。Cancel函数关闭停止通道,这将使程序完成。

让我们运行程序看看它的工作原理:

程序输出

太好了,程序按预期工作。这只是运行重复进程的示例。这可以是实现更复杂内容的基础代码。你可以使用select构建复杂的程序。

结论

构建并发程序可能一开始会很具有挑战性,特别是如果你难以理解goroutine,通道和select的工作原理。

我希望通过本文,你感到不那么困惑,并找到了一些可以使用select的用例。

译自:https://betterprogramming.pub/concurrency-with-select-goroutines-and-channels-9786e0c6be3c

版权声明:本文内容由TeHub注册用户自发贡献,版权归原作者所有,TeHub社区不拥有其著作权,亦不承担相应法律责任。 如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

点赞(0)
收藏(0)
菜鸟一只
你就是个黄焖鸡,又黄又闷又垃圾。

评论(0)

添加评论