本文将介绍如何在 Golang 中结合 select、goroutines 和 channels 构建并发程序。
建议先阅读以下两篇文章,以熟悉并发、通道和 goroutines 的概念:
- Concurrency in Golang, Goroutines, and Channels Explained
- File Processing Using Concurrency With GoLang
Select
引用自 Go tour 文档:
“
select
语句可以让 goroutine 等待多个通信操作。select
会一直阻塞,直到其中一个通信可以执行,然后执行该 case。如果有多个 case 可以执行,它会随机选择一个执行。”
API 服务器响应
我们将探讨如何使用 select
获取最快 API 调用的响应。让我们直接看一些代码以了解 select
及其强大的功能。
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"time"
)
const (
API_KEY = "f32ee7b348msh230c75aaf106721p1366a6jsn952b266f7ae5"
API_GOOGLE_NEWS_HOST = "google-news.p.rapidapi.com"
API_FREE_NEWS_HOST = "free-news.p.rapidapi.com"
GOOGLE_NEWS_URL = "https://google-news.p.rapidapi.com/v1/top_headlines?lang=en&country=US"
FREE_NEWS_URL = "https://free-news.p.rapidapi.com/v1/search?lang=en&q=Elon"
)
var (
google = make(chan News)
free = make(chan News)
)
type Article struct {
Title string `json:"title"`
Link string `json:"link"`
Id string `json:"id"`
MongoId string `json:"_id"`
}
type News struct {
Source string
Articles []*Article `json:"articles"`
}
type Function struct {
f func(news chan<- News)
channel chan News
}
func main() {
functions := []*Function{
{f: googleNews, channel: google},
{f: freeNews, channel: free},
}
quickestApiResponse(functions)
}
func quickestApiResponse(functions []*Function) {
var articles []*Article
for _, function := range functions {
function.Run()
}
select {
case googleNewsResponse := <-google:
fmt.Printf("Source: %s\n", googleNewsResponse.Source)
articles = googleNewsResponse.Articles
case freeNewsReponse := <-free:
fmt.Printf("Source: %s\n", freeNewsReponse.Source)
articles = freeNewsReponse.Articles
}
fmt.Printf("Articles %v\n", articles)
}
func googleNews(google chan<- News) {
req, err := http.NewRequest("GET", GOOGLE_NEWS_URL, nil)
if err != nil {
fmt.Printf("Error initializing request%v\n", err.Error())
return
}
req.Header.Add("X-RapidAPI-Key", API_KEY)
req.Header.Add("X-RapidAPI-Host", API_GOOGLE_NEWS_HOST)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Printf("Error making request %v\n", err.Error())
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
fmt.Printf("Google News Response StatusCode %v Status %v\n", resp.StatusCode, resp.Status)
return
}
googleNewsArticles := News{Source: "GoogleNewsApi"}
if err := json.NewDecoder(resp.Body).Decode(&googleNewsArticles); err != nil {
fmt.Printf("Error decoding body %v\n", err.Error())
return
}
fmt.Printf("Google Articles %v\n", googleNewsArticles)
fmt.Printf("Google Articles Size %d\n", len(googleNewsArticles.Articles))
google <- googleNewsArticles
}
func freeNews(free chan<- News) {
req, err := http.NewRequest("GET", FREE_NEWS_URL, nil)
if err != nil {
fmt.Printf("Error initializing request%v\n", err.Error())
return
}
req.Header.Add("X-RapidAPI-Key", API_KEY)
req.Header.Add("X-RapidAPI-Host", API_FREE_NEWS_HOST)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
fmt.Printf("Error making request %v\n", err.Error())
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
fmt.Printf("Free News Response StatusCode %v Status %v\n", resp.StatusCode, resp.Status)
return
}
var freeNewsArticles News
if err := json.NewDecoder(resp.Body).Decode(&freeNewsArticles); err != nil {
fmt.Printf("Error decoding body %v\n", err.Error())
return
}
freeNewsArticles.Source = "FreeNewsApi"
fmt.Printf("Free Articles %v\n", freeNewsArticles)
fmt.Printf("Free Articles Size %d\n", len(freeNewsArticles.Articles))
free <- freeNewsArticles
}
func (a *Article) GetId() string {
if a.Id == "" {
return a.MongoId
}
return a.Id
}
func (f *Function) Run() {
go f.f(f.channel)
}
以下实现重点突出了 select 将等待直到其中一个 case 执行。
这个示例中有许多要理解的部分,所以让我们一一看一下。
在查看 select 逻辑之前,让我们先看一下如何进行 API 调用。
Function
结构体表示一个单独的 API 调用,它的属性是一个名为 f
的函数,该函数接收一个类型为 News
的通道,注意该函数的签名已经强制要求将该通道视为“仅发送”通道,第二个属性是一个类型为 News
的通道,一旦 API 调用被执行并解析响应,该通道将用于发送结果。
News
结构体是用于保存文章及其来源的对象。
在第 43 行,我们初始化了一个 Function
切片,其中包含两个元素,第一个使用 google
通道的 googleNews
函数,第二个使用 free
通道的 freeNews
函数。
由于两个 API 调用都将获取新闻,因此这些通道具有相同的类型,但每个函数使用一个通道。
在第 69 和 102 行,我们有这两个 API 的实现。每个 API 都会向其各自的 URL 发送 HTTP 请求并解析响应,完成后,新闻将通过其各自的通道发送。
现在让我们聚焦于 quickestApiResponse
方法。此方法的目的是将文章变量设置为最快 API 的响应。在第 54 行,通过调用 Run
方法来执行每个函数。此方法会在该函数上启动一个新的 goroutine 并传递通道。重要的是要注意,这些 API 调用需要在单独的 goroutine 中运行,因为我们不希望它们按顺序运行。
然后,select 将等待 google
或 free
通道中的任何一个发送响应。一旦任何一个 API 调用通过其各自的通道发送响应,select 将执行该 case 下的代码并忽略另一个 case。这将有效地将文章设置为最快 API 调用的响应。
让我们运行程序以查看输出:
API server response output
The FreeNewsApi
ran faster!.
这种逻辑可以应用于许多其他用例,允许程序运行多个 goroutines,使用通道进行通信,并使用 select 等待它们。
在这个示例中,我们还可以实现一件事,即强制实施某种超时,如果 API 调用时间超过限制,我们将留下文章为空。以下代码通过在 select 中添加一个 case 来实现这一点。
const (
API_MAX_TIMEOUT = 3 * time.Second
)
func quickestApiResponse(functions []*Function) {
var articles []*Article
for _, function := range functions {
function.Run()
}
select {
case googleNewsResponse := <-google:
fmt.Printf("Source: %s\n", googleNewsResponse.Source)
articles = googleNewsResponse.Articles
case freeNewsReponse := <-free:
fmt.Printf("Source: %s\n", freeNewsReponse.Source)
articles = freeNewsReponse.Articles
case <-time.After(API_MAX_TIMEOUT):
fmt.Println("Time out! API calls took too long!!")
}
fmt.Printf("Articles %v\n", articles)
}
time.After
返回一个类型为 time.Time
的通道,一旦指定的时间过去,它就会发送当前时间。请注意,这里我们没有将该通道的值分配给变量,这是因为我们不关心该通道将发送的数据,我们只关心接收信号。如果我们在两个 API 上都睡眠三秒钟,我们将看到执行超时 case,而其他两个 case 将被忽略。
API server response timeout
带超时的 Context
Context 几乎总是存在于程序中,我们可以使用上下文对象来为特定的昂贵任务设置超时。下面的示例演示了如何使用 context.WithTimeout
来限制程序等待昂贵任务的时间。
package main
import (
"context"
"fmt"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
signal := make(chan bool)
go expensiveTask(ctx, signal)
select {
case <-ctx.Done():
fmt.Println("Expensive task took too long to complete")
return
case <-signal:
fmt.Println("Expensive task was completed on time")
}
}
func expensiveTask(ctx context.Context, signal chan<- bool) {
time.Sleep(6 * time.Second)
signal <- true
}
上面的示例中使用 context.Background()
作为父上下文,但在更现实的设置中,上下文已经存在。具有超时的上下文将返回一个上下文,该上下文在指定的时间持续时间过去时通过 ctx.Done
通道发送信号。
在第 13 行,昂贵的任务在单独的 goroutine 中运行,上下文和信号通道作为参数传递。该昂贵任务休眠 6 秒钟以模拟延迟,但上下文具有 5 秒的超时时间。
select 有两个 case,等待 ctx.Done
运行或等待昂贵任务发送信号,表示它已完成。
在这种情况下,运行此示例将产生以下输出:
Expensive task took too long to complete
运行循环进程
让我们看看如何使用 select
运行循环进程。对于此程序,我们将具有以下场景:
当该进程应该开始运行和每次运行之间的间隔时间时,程序需要让我们传递任何函数作为循环进程。
以下是最初的代码,请看:
package chaptereight
import (
"fmt"
"math/rand"
"time"
"github.com/brianvoe/gofakeit/v6"
)
type PendingUserNotifications map[int][]*Notification
type Notification struct {
Content string
UserId int
}
func sendUserBatchNotificationsEmail(userId int, notifications []*Notification) {
fmt.Printf("Sending email to user with userId %d for pending notifications %v\n", userId, notifications)
}
func handlePendingUsersNotifications(pendingNotifications PendingUserNotifications, handler func(userId int, notifications []*Notification)) {
for userId, notifications := range pendingNotifications {
handler(userId, notifications)
delete(pendingNotifications, userId)
}
}
func collectNewUsersNotifications(notifications PendingUserNotifications) {
randomNotifications := getRandomNotifications()
if len(randomNotifications) > 0 {
notifications[randomNotifications[0].UserId] = randomNotifications
}
}
func getRandomNotifications() (notifications []*Notification) {
rand.Seed(time.Now().UnixNano())
userId := rand.Intn(100-10+1) + 10
numOfNotifications := rand.Intn(5-0+1) + 0
fmt.Printf("numOfNotifications %v\n", numOfNotifications)
for i := 0; i < numOfNotifications; i++ {
notifications = append(notifications, &Notification{Content: gofakeit.Paragraph(1, 2, 10, " "), UserId: userId})
}
return
}
上面的代码反映了我们想要运行的任务。我们有两个主要函数 collectNewUsersNotifications
和 handlePendingUsersNotifications
,第一个函数用于收集所有新用户通知,理想的实现是该函数在数据库中查找未读通知,但为了本例子,我们正在模拟为某些用户获取随机通知。通知是使用Notification
结构体创建的,只有两个字段,一个是内容,一个是用户ID。
collect
函数使用PendingUserNotifications
类型存储通知。这个类型是一个映射,其中键是表示用户ID的整数,值是Notification
的切片。
在收集所有通知后,我们想使用handlePendingUserNotifications
函数迭代通知并在每个通知上运行处理程序函数。在处理完每个用户的通知后,它们将从映射中删除。在这种情况下,我们将使用的处理程序是sendUserBatchNotificationsEmail
。它的目的是向用户发送包含所有待处理通知的电子邮件,以便他们查看。
现在让我们专注于如何使用select
以重复的方式运行此任务。正如我之前提到的,我们必须考虑以下几点:
- 允许传递时间间隔
- 允许传递进程的开始时间
- 允许调用者在需要时取消/停止重复进程
下面的代码显示了如何实现这一点:
package main
import (
"fmt"
"math/rand"
"time"
"github.com/brianvoe/gofakeit/v6"
)
type PendingUserNotifications map[int][]*Notification
type ProcessHandler func()
type Notification struct {
Content string
UserId int
}
type RecurringProcess struct {
name string
interval time.Duration
startTime time.Time
handler func()
stop chan struct{}
}
func main() {
pendingNotificationsProcess()
}
func pendingNotificationsProcess() {
process := &RecurringProcess{}
notifications := PendingUserNotifications{}
handler := func() {
collectNewUsersNotifications(notifications)
handlePendingUsersNotifications(notifications, sendUserBatchNotificationsEmail, process)
}
interval := 10 * time.Second
startTime := time.Now().Add(3 * time.Minute)
process = createRecurringProcess("Pending User Notifications", handler, interval, startTime)
<-process.stop
}
func sendUserBatchNotificationsEmail(userId int, notifications []*Notification) {
fmt.Printf("Sending email to user with userId %d for pending notifications %v\n", userId, notifications)
}
func handlePendingUsersNotifications(pendingNotifications PendingUserNotifications, handler func(userId int, notifications []*Notification), process *RecurringProcess) {
userNotificationCount := 0
for userId, notifications := range pendingNotifications {
userNotificationCount++
handler(userId, notifications)
delete(pendingNotifications, userId)
}
if userNotificationCount == 0 {
process.Cancel()
}
}
func collectNewUsersNotifications(notifications PendingUserNotifications) {
randomNotifications := getRandomNotifications()
if len(randomNotifications) > 0 {
notifications[randomNotifications[0].UserId] = randomNotifications
}
}
func getRandomNotifications() (notifications []*Notification) {
rand.Seed(time.Now().UnixNano())
userId := rand.Intn(100-10+1) + 10
numOfNotifications := rand.Intn(5-0+1) + 0
fmt.Printf("numOfNotifications %v\n", numOfNotifications)
for i := 0; i < numOfNotifications; i++ {
notifications = append(notifications, &Notification{Content: gofakeit.Paragraph(1, 2, 10, " "), UserId: userId})
}
return
}
func createRecurringProcess(name string, handler ProcessHandler, interval time.Duration, startTime time.Time) *RecurringProcess {
process := &RecurringProcess{
name: name,
interval: interval,
startTime: startTime,
handler: handler,
stop: make(chan struct{}),
}
go process.Start()
return process
}
func (p *RecurringProcess) Start() {
startTicker := &time.Timer{}
ticker := &time.Ticker{C: nil}
defer func() { ticker.Stop() }()
if p.startTime.Before(time.Now()) {
p.startTime = time.Now()
}
startTicker = time.NewTimer(time.Until(p.startTime))
for {
select {
case <-startTicker.C:
ticker = time.NewTicker(p.interval)
fmt.Println("Starting recurring process")
p.handler()
case <-ticker.C:
fmt.Println("Next run")
p.handler()
case <-p.stop:
fmt.Println("Stoping recurring process")
return
}
}
}
func (p *RecurringProcess) Cancel() {
close(p.stop)
}
我们引入了一个新的结构体来表示重复的进程RecurringProcess
。这个结构包含以下字段:
name
——进程的名称interval
——每次运行之间的时间间隔startTime
——进程将开始的时间handler
——每次运行时调用的处理程序函数stop
——停止进程的通道
在pendingNotificationsProcess
函数中,我们初始化一个新的重复进程和通知分别在第30行和第31行。我们将使用的处理程序函数是一个函数,它在其中包含collectNewUsersNotifications
和handlePendingUsersNotifications
函数。请注意,在这里,我们将进程传递给handlePendingUsersNotifications
,因为它将需要停止进程。
我们还指定了间隔和开始时间。
然后我们调用createRecurringProcess
,这个函数创建了重复的进程并启动了它。让我们专注于第88行,在那里我们使用一个goroutine来启动进程。
在第40行,我们通过从停止通道中读取来阻塞主goroutine,这意味着主goroutine将被阻塞,直到向此通道发送消息。
让我们看一下第93行的Start
函数,它包含运行重复进程的所有逻辑。
这个函数使用startTicker
变量来使用开始时间启动重复进程。如果开始时间在过去,进程将立即启动。
time.NewTimer
会在经过指定的持续时间后在其通道上发送当前时间,这将允许我们启动进程。这就是为什么我们有第一个情况等待通道接收信号的原因。
我们还在第95行有一个ticker
变量,它是一个time.Ticker
。go中的一个ticker会在其通道上发送指定间隔的tick。一旦startTicker.C
通道发送信号,我们在第106行中将一个新的ticker分配给ticker
变量,并且我们调用处理程序函数。
在此之后,ticker
将开始在第二个选择情况下接收tick,并且每次接收到一个tick时,处理程序函数也将被调用。
在选择的最后一种情况下,我们等待发送停止进程的信号,只需返回即可。
请注意,select
在无限的for
循环内。这是因为我们想要不断循环,直到其中一个情况明确打破循环。每次接收到一个tick时,第二个情况将被执行,然后它将再次进入相同的循环,其中选择将再次等待某些情况运行。
为了停止进程,我们在第55行添加了一些逻辑,我们计算通知的数量,如果在任何时候没有待处理的通知,程序将取消进程。Cancel
函数关闭停止通道,这将使程序完成。
让我们运行程序看看它的工作原理:
程序输出
太好了,程序按预期工作。这只是运行重复进程的示例。这可以是实现更复杂内容的基础代码。你可以使用select
构建复杂的程序。
结论
构建并发程序可能一开始会很具有挑战性,特别是如果你难以理解goroutine,通道和select的工作原理。
我希望通过本文,你感到不那么困惑,并找到了一些可以使用select
的用例。
译自:https://betterprogramming.pub/concurrency-with-select-goroutines-and-channels-9786e0c6be3c
评论(0)