首页
Preview

在 Go 中同时监听多个channel

欢迎回到本系列!今天我们将探讨同时监听多个通道的方法。之前的指南帮助你开始了 Go 中的并发编程。虽然简单的方法通常是最好的,但你可能一直在尝试实现更复杂的行为。在阅读本指南后,你将能够使你的并发代码更加灵活。

select 关键字

我们可以使用 select 关键字同时监听多个 goroutine。

package mainimport (
    "fmt"
    "time"
)func main() {
    c1 := make(chan string)
    c2 := make(chan string)    go func() {
        time.Sleep(1 * time.Second)
        c1 <- time.Now().String()
    }()    go func() {
        time.Sleep(2 * time.Second)
        c2 <- time.Now().String()
    }()    for i := 0; i < 2; i++ {
        select {
        case res1 := <-c1:
            fmt.Println("from c1:", res1)
        case res2 := <-c2:
            fmt.Println("from c2:", res2)
        }
    }
}from c1: 2022-09-04 14:30:39.4469184 -0400 EDT m=+1.000172801
from c2: 2022-09-04 14:30:40.4472699 -0400 EDT m=+2.000524401

上面的代码展示了 select 关键字的工作方式。

  • 首先,我们创建两个通道 c1c2 进行监听。
  • 然后,我们分别启动两个 goroutine,每个 goroutine 将当前时间发送到 c1c2
  • 在 for 循环中,我们创建一个 select 语句,并定义两种情况:一种是当我们可以从 c1 接收到消息时,另一种是当我们可以从 c2 接收到消息时。

你可以看到,select 语句在设计上与 switch 语句非常相似。两者都定义了不同的情况,并在满足特定情况时运行相应的代码。此外,我们可以看到 select 语句是阻塞的。也就是说,它会等待直到其中一个情况被满足。

我们在循环中迭代两次,因为只有两个 goroutine 需要监听。更准确地说,每个 goroutine 都是一种 fire-and-forget 的 goroutine,这意味着它们只会在返回之前向一个通道发送一次。因此,在此代码中始终存在两条消息,我们只需要选择两次即可。

如果我们不知道任务何时结束怎么办?

有时我们不知道有多少个任务。在这种情况下,将 select 语句放在 while 循环中。

package mainimport (
    "fmt"
    "math/rand"
    "time"
)func main() {
    c1 := make(chan string)
    rand.Seed(time.Now().UnixNano())    for i := 0; i < rand.Intn(10); i++ {
        go func() {
            time.Sleep(1 * time.Second)
            c1 <- time.Now().String()
        }()
    }    for {
        select {
        case res1 := <-c1:
            fmt.Println("from c1:", res1)
        }
    }
}

由于我们让随机数量的 goroutine 运行,因此我们不知道有多少个任务。幸运的是,底部的 for 循环包含了 select 语句,将捕获每个输出。让我们看看如果运行这段代码会发生什么。

from c1: 2022-09-04 14:48:47.5145341 -0400 EDT m=+1.000257801
from c1: 2022-09-04 14:48:47.5146126 -0400 EDT m=+1.000336201
from c1: 2022-09-04 14:48:47.5146364 -0400 EDT m=+1.000359901
fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan receive]:
main.main()
        /home/jacob/blog/testing/listening-to-multiple-channels-in-go/main.go:22 +0x128
exit status 2

嗯,正如预期的那样,select 语句接收了三次,但程序由于死锁而出错。为什么会这样呢?

请记住,如果发送者没有准备好,从非缓冲通道接收数据将导致程序死锁。这正是我们示例中的情况。

那么我们该如何解决呢?我们可以使用前面文章中介绍的概念的组合:退出通道和 WaitGroups。

package mainimport (
    "fmt"
    "math/rand"
    "sync"
    "time"
)func main() {
    c1 := make(chan string)
    exit := make(chan struct{})
    rand.Seed(time.Now().UnixNano())
    var wg sync.WaitGroup    go func() {
        numJob := rand.Intn(10)
        fmt.Println("number of jobs:", numJob)
        for i := 0; i < numJob; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                time.Sleep(1 * time.Second)
                c1 <- time.Now().String()
            }()
        }
        wg.Wait()
        close(exit)
    }()    for {
        select {
        case res1 := <-c1:
            fmt.Println("from c1:", res1)
        case <-exit:
            return
        }
    }
}3
from c1: 2022-09-04 15:09:08.6936976 -0400 EDT m=+1.000287801
from c1: 2022-09-04 15:09:08.6937788 -0400 EDT m=+1.000369101
from c1: 2022-09-04 15:09:08.6937949 -0400 EDT m=+1.000385101
  • 我们创建一个退出通道和一个 WaitGroup。
  • 任务数量每次运行都是随机的。对于 numJobs 次,我们启动 goroutine。为了等待任务完成,我们将它们添加到 wg 中。当一个任务完成时,我们从 wg 中减去一个。
  • 一旦所有任务完成,我们关闭退出通道。
  • 我们将上述部分包装在一个 goroutine 中,因为我们希望所有内容都是非阻塞的。如果我们不将其包装在 goroutine 中,wg.Wait() 将等待任务完成。这将阻塞代码,不会让底部的 for-select 语句运行。
  • 此外,因为 c1 是非缓冲通道,等待所有 goroutine 将消息发送到 c1 将导致许多消息被发送到 c1,而底部的 for-select 语句无法接收它们。这会导致死锁,因为接收者没有准备好时发送者已经发送了消息。

如何使 select 不阻塞

select 语句默认是阻塞的。我们如何使它不阻塞?很简单 - 我们只需要添加一个默认情况。

package mainimport (
    "fmt"
    "math/rand"
    "sync"
    "time"
)func main() {
    ashleyMsg := make(chan string)
    brianMsg := make(chan string)
    exit := make(chan struct{})
    rand.Seed(time.Now().UnixNano())
    var wg sync.WaitGroup    go func() {
        numJob := rand.Intn(10)
        fmt.Println("number of jobs:", numJob)
        for i := 0; i < numJob; i++ {
            wg.Add(2)
            go func() {
                defer wg.Done()
                time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
                ashleyMsg <- "hi"
            }()
            go func() {
                defer wg.Done()
                time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond)
                brianMsg <- "what's up"
            }()
        }
        wg.Wait()
        close(exit)
    }()    for {
        select {
        case res1 := <-ashleyMsg:
            fmt.Println("ashley:", res1)
        case res2 := <-brianMsg:
            fmt.Println("brian:", res2)
        case <-exit:
            fmt.Println("chat ended")
            return
        default:
            fmt.Println("...")
            time.Sleep(time.Millisecond)
        }
    }
}...
number of jobs: 4
brian: what's up
...
ashley: hi
...
...
brian: what's up
ashley: hi
ashley: hi
brian: what's up
...
...
ashley: hi
...
brian: what's up
...
chat ended

除了无聊的对话,我们可以看到默认情况的工作方式。我们可以在没有通道可以接收数据时执行某些操作。在这个例子中,我们只是打印了省略号,但你可以执行任何你想要的操作。

结论

本篇文章就到这里了!现在你可以同时监听多个通道,这在开发个人项目时可能会是一个巨大的优势。感谢阅读,下次再见。

译自:https://blog.devgenius.io/listening-to-multiple-channels-in-go-11a1c6cd3a21

版权声明:本文内容由TeHub注册用户自发贡献,版权归原作者所有,TeHub社区不拥有其著作权,亦不承担相应法律责任。 如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

点赞(0)
收藏(0)
菜鸟一只
你就是个黄焖鸡,又黄又闷又垃圾。

评论(0)

添加评论