通道是一种类型化的、线程安全的队列。通道允许不同的goroutine相互通信。
使用通道操作符发送和接收值:
ch <- v // Send v to channel ch.
v := <-ch // Receive from ch, and
// assign value to v.
数据沿箭头方向流动。
与map和slice一样,通道必须在使用之前创建。它们也使用相同的make
关键字:
ch := make(chan int)
默认情况下,发送和接收操作是_阻塞_的(这意味着你的代码会安全等待),直到另一端准备好。这使得goroutine能够在没有显式锁或条件变量的情况下进行同步。
死锁
死锁是指一组goroutine被阻塞,因此它们中的任何一个都无法继续。这是并发编程中的常见问题。
运行程序。你会发现它死锁并且永远不会退出。sendEmails
函数正在尝试在通道上发送值,但没有其他的goroutine在运行,可以从通道中接收值。
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan bool)
go sendEmails(c)
// don't touch below this line
isOld := <-c
fmt.Println("email 1 is old:", isOld)
isOld = <-c
fmt.Println("email 2 is old:", isOld)
isOld = <-c
fmt.Println("email 3 is old:", isOld)
}
func isOld(e email, c chan bool) {
if e.date.Before(time.Date(2020, 0, 0, 0, 0, 0, 0, time.UTC)) {
c <- true
}
c <- false
}
type email struct {
body string
date time.Time
}
func sendEmails(c chan bool) {
emails := []email{
{
body: "Are you going to make it?",
date: time.Date(2019, 0, 0, 0, 0, 0, 0, time.UTC),
},
{
body: "I need a break",
date: time.Date(2021, 0, 0, 0, 0, 0, 0, time.UTC),
},
{
body: "What we're you thinking?",
date: time.Date(2022, 0, 0, 0, 0, 0, 0, time.UTC),
},
}
for _, email := range emails {
isOld(email, c)
}
close(c)
}
// output:
email 1 is old: true
email 2 is old: false
email 3 is old: false
通道继续:
在Go程序中,空结构体经常被用作“令牌”。在这个上下文中,令牌是一个一元运算值。换句话说,我们不关心通过通道传递的内容是什么。我们关心的是_何时_和_是否_传递了它。
请注意,我们可以使用以下语法阻塞并等待直到通道中发送了某些内容
<-ch
这将阻塞,直到它从通道中弹出一个单独的项目,然后继续,丢弃该项目。
package main
import (
"fmt"
"time"
)
func main() {
ch := getDatabasesChannel()
<-ch
<-ch
<-ch
<-ch
time.Sleep(1 * time.Second)
fmt.Println("mailio server ready")
}
// don't touch below this line
func getDatabasesChannel() chan struct{} {
ch := make(chan struct{})
go func() {
ch <- struct{}{}
fmt.Println("first db online")
ch <- struct{}{}
fmt.Println("second db online")
ch <- struct{}{}
fmt.Println("third db online")
ch <- struct{}{}
fmt.Println("fourth db online")
}()
return ch
}
//output:
first db online
second db online
third db online
fourth db online
mailio server ready
缓冲通道
通道可以选择性地缓冲。你可以通过将缓冲区长度作为make()
的第二个参数提供来创建缓冲通道:
ch := make(chan int, 100)
仅当缓冲区_满_时,发送才会被阻塞。
仅当缓冲区_为空_时,接收才会被阻塞。
package main
import "fmt"
func main() {
emailsToSend := make(chan string, 2)
emailsToSend <- "Hello John, tell Kathy I said hi"
emailsToSend <- "Whazzup bruther"
fmt.Println(<-emailsToSend)
fmt.Println(<-emailsToSend)
}
//output:
Hello John, tell Kathy I said hi
Whazzup bruther
在Go中关闭通道
发送者可以显式关闭通道:
close(ch)
类似于在map
中访问数据时的ok
值,当从通道接收数据时,接收者可以检查ok
值以测试通道是否已关闭。
v, ok := <-ch
如果通道为空且已关闭,则ok为false
。
在关闭的通道上发送将导致恐慌。
关闭不是必要的操作。将通道保持打开没有问题,如果它们未被使用,则仍将被垃圾收集。通常关闭通道是为了明确告诉接收者,不会再有其他数据过来了。
例如:
package main
import (
"time"
"fmt"
)
func main() {
ch := make(chan int)
go sendReports(ch)
for count := range ch {
fmt.Printf("%d emails sent in the last second\n", count)
}
}
// don't touch below this line
func sendReports(ch chan int) {
time.Sleep(time.Second)
ch <- 5
time.Sleep(time.Second)
ch <- 6
time.Sleep(time.Second)
ch <- 5
time.Sleep(time.Second)
ch <- 109
time.Sleep(time.Second)
ch <- 3
time.Sleep(time.Second)
ch <- 17
close(ch)
}
//output:
5 emails sent in the last second
6 emails sent in the last second
5 emails sent in the last second
109 emails sent in the last second
3 emails sent in the last second
17 emails sent in the last second
范围
与slice和map类似,通道可以范围。
for item := range ch{
}
上面的代码将在每次迭代时从通道中接收值(如果没有新内容,则在每次迭代时阻塞),并仅在通道关闭时退出。
package main
import "fmt"
func main() {
ch := make(chan int, 10)
go fibonacci(cap(ch), ch)
for num := range ch{
fmt.Println(num)
}
}
func fibonacci(n int, ch chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
ch <- x
x, y = y, x+y
}
close(ch)
}
//output:
0
1
1
2
3
5
8
13
21
34
Select
有时,我们有一个goroutine监听多个通道,并希望按照数据到达的顺序处理数据,无论哪个通道先发送数据。
select {
case i, ok := <- chInts:
fmt.Println(i)
case s, ok := <- chStrings:
fmt.Println(s)
}
如果多个case同时准备执行,则随机选择一个。还要记住,示例中的ok
变量是指通道是否已关闭。
package main
import (
"time"
"fmt"
)
func main() {
chEmails := make(chan string)
chSms := make(chan string)
go smsReady(chSms)
go emailReady(chEmails)
for {
select {
case msg, ok := <-chSms:
if !ok {
fmt.Println("Sms channel closed")
return
}
fmt.Printf("sms: sending sms: %s\n", msg)
case msg, ok := <-chEmails:
if !ok {
fmt.Println("Email channel closed")
return
}
fmt.Printf("email: sending email: %s\n", msg)
}
}
}
// don't touch below this line
func smsReady(ch chan string) {
time.Sleep(time.Millisecond)
ch <- "hi friend"
time.Sleep(time.Millisecond * 100)
ch <- "What's going on?"
time.Sleep(time.Second)
ch <- "Will you make your appointment?"
time.Sleep(time.Millisecond * 350)
ch <- "Let's be friends"
close(ch)
}
func emailReady(ch chan string) {
time.Sleep(time.Millisecond * 503)
ch <- "Welcome to the business"
time.Sleep(time.Millisecond * 43)
ch <- "I'll pay you to be my friend"
time.Sleep(time.Second)
ch <- "How's the family?"
time.Sleep(time.Millisecond * 440)
ch <- "Want to go out tonight?"
close(ch)
}
//output:
sms: sending sms: hi friend
sms: sending sms: What's going on?
email: sending email: Welcome to the business
email: sending email: I'll pay you to be my friend
sms: sending sms: Will you make your appointment?
sms: sending sms: Let's be friends
Sms channel closed
选择默认情况
在选择语句中,如果没有其他case准备好,则默认情况立即执行。
select {
case i := <-cH:
// use i
default:
// receiving from ch would block
}
time.Ticker
time.Tick()是一个标准库函数,它返回一个通道,以在给定时间间隔上发送值。
time.After()在持续时间过去后一次发送一个值。
time.Sleep()阻塞当前goroutine指定的时间量。
package main
import (
"fmt"
"time"
)
func main() {
snapshotInterval := time.Tick(800 * time.Millisecond)
saveAfter := time.After(2800 * time.Millisecond)
for {
select {
case <-snapshotInterval:
fmt.Println("taking backup snapshot...")
case <-saveAfter:
fmt.Println("all backups saved!")
return
default:
fmt.Println("nothing to do, waiting...")
time.Sleep(500 * time.Millisecond)
}
}
}
//output:
nothing to do, waiting...
nothing to do, waiting...
taking backup snapshot...
nothing to do, waiting...
nothing to do, waiting...
taking backup snapshot...
nothing to do, waiting...
taking backup snapshot...
nothing to do, waiting...
all backups saved!
超时
通常情况下,我们希望等待某些内容通过通道传输。但是,如果没有内容传输,则希望退出并打印错误或其他内容。
time.After()非常适合实现此目的。你可以直接在选择语句中使用它。
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan struct{})
go streamTweets(ch)
for {
select {
case <-ch:
fmt.Println("got a tweet")
case <-time.After(5 * time.Second):
fmt.Println("too long since last tweet, disconnecting")
return
}
}
}
// don't touch below this line
func streamTweets(ch chan struct{}) {
t := time.Millisecond * 670
for {
ch <- struct{}{}
time.Sleep(t)
t *= 2
}
}
// output:
got a tweet
got a tweet
got a tweet
got a tweet
too long since last tweet, disconnecting
解决main
中的错误的示例,该错误防止了pinger
和ponger
函数按预期运行。
package main
import (
"fmt"
"time"
)
func main() {
pings := make(chan struct{})
pongs := make(chan struct{})
go ponger(pings, pongs)
go pinger(pings, pongs)
// add a for loop that reads from the pongs channel and exits the main function when it is closed.
for range pongs {
}
}
// don't touch below this line
func pinger(pings, pongs chan struct{}) {
go func() {
i := 0
for range pongs {
fmt.Println("pong", i, "got")
i++
}
fmt.Println("pongs done")
}()
sleepTime := 200 * time.Millisecond
for i := 0; i < 4; i++ {
fmt.Println("ping", i, "sent")
pings <- struct{}{}
time.Sleep(sleepTime)
sleepTime *= 2
}
close(pings)
time.Sleep(time.Second)
}
func ponger(pings, pongs chan struct{}) {
i := 0
for range pings {
fmt.Println("ping", i, "got", "pong", i, "sent")
pongs <- struct{}{}
i++
}
fmt.Println("pings done")
close(pongs)
}
// output:
ping 0 sent
ping 0 got pong 0 sent
ping 1 sent
ping 1 got pong 1 sent
pong 0 got
ping 2 sent
ping 2 got pong 2 sent
ping 3 sent
ping 3 got pong 3 sent
pong 1 got
pings done
你如何解决上述主要代码的问题?如果你有更好的解决方案,请留下评论。
另一个例子:
package main
import (
"fmt"
"time"
)
func main() {
go calculateDBUsage()
// don't touch above this line
time.Sleep(time.Second)
}
// don't touch below this line
func calculateDBUsage() {
total := 0
for i := 0; i < 10000; i++ {
total += i
}
fmt.Println(total, "database rows accessed today")
}
在上面的代码中,calculateDBUsage
函数执行一个耗时的操作,即对从0到9999的数字进行求和,然后将结果打印到控制台。如果在main()
函数中同步调用此函数,它将阻塞程序的执行,直到操作完成。
通过使用go
关键字将calculateDBUsage
函数作为goroutine调用,main()
函数可以继续并发执行其他任务,而无需等待calculateDBUsage
完成。在这种情况下,main()
只是睡眠一秒钟,然后退出。
总之,在main()
中使用goroutine的目的是在不阻塞程序的其余部分的情况下并发执行长时间运行的函数。
通道的高级主题:
大多数新的Go程序员很快就会理解通道作为值队列的概念,并且对通道操作可能在满或空时阻塞的概念感到舒适。
我将探讨通道的四个不太常见的属性:
- 发送到nil通道会永远阻塞
- 从nil通道接收会永远阻塞
- 向关闭的通道发送会导致panic
- 从关闭的通道接收会立即返回零值
发送到nil通道会永远阻塞
对于新手来说有点惊讶的第一种情况是在nil
通道上发送会永远阻塞。
这个例子程序将在第5行死锁,因为未初始化通道的零值是nil
。
package main
func main() {
var c chan string
c <- "let's get started" // deadlock
}
// fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send (nil chan)]:
main.main()
/tmp/sandbox1491956371/prog.go:5 +0x25
Program exited.
使用以下Go Playground进行检查:Go Playground - Go编程语言
从一个空通道接收数据会一直阻塞
同样地,从一个 nil
通道接收数据也会使接收方一直阻塞。
package main
import "fmt"
func main() {
var c chan string
fmt.Println(<-c) // deadlock
}
http://play.golang.org/p/tjwSfLi7x0
那么为什么会这样呢?下面是一个可能的解释:
- 通道缓冲区的大小不是其 类型 声明的一部分,所以必须作为通道的值的一部分。
- 如果通道未被初始化,则其缓冲区大小将为零。
- 如果通道缓冲区的大小为零,则通道是 无缓冲 的。
- 如果通道是无缓冲的,则发送方将一直阻塞,直到另一个goroutine准备好接收。
- 如果通道是
nil
,则发送方和接收方彼此没有引用;它们都被阻塞,等待独立的通道,并且永远不会解除阻塞。
向一个已关闭的通道发送数据会导致panic
以下程序可能会导致panic,因为首个goroutine到达10时会关闭通道,而它的兄弟们还没有时间完成发送它们的值。
package main
import "fmt"
func main() {
var c = make(chan int, 100)
for i := 0; i < 10; i++ {
go func() {
for j := 0; j < 10; j++ {
c <- j
}
close(c)
}()
}
for i := range c {
fmt.Println(i)
}
}
http://play.golang.org/p/hxUVqy7Qj-
那么为什么没有一个版本的 close()
函数可以让你检查一个通道是否已关闭呢?
if !isClosed(c) {
// c isn't closed, send the value
c <- v
}
但是这个函数将会存在固有的竞争条件。在代码到达 c <- v
之前,有人可能会关闭通道。
从一个已关闭的通道接收数据会立即返回零值
最后一种情况与前一种情况相反。一旦通道被关闭,并且其缓冲区中的所有值都被取出,通道将始终立即返回零值。
package main
import "fmt"
func main() {
c := make(chan int, 3)
c <- 1
c <- 2
c <- 3
close(c)
for i := 0; i < 4; i++ {
fmt.Printf("%d ", <-c) // prints 1 2 3 0
}
}
http://play.golang.org/p/ajtVMsu8EO
解决此问题的正确方法是使用 for range
循环。
for v := range c {
// do something with v
}
for v, ok := <- c; ok ; v, ok = <- c {
// do something with v
}
这两个语句在功能上是等效的,并演示了 for range
在内部是如何工作的。
译自:https://medium.com/@lordmoma/go-channels-the-ultimate-guide-3a2552a2a458
评论(0)