6.5 通道
原子函数和互斥锁都能工作,但是依靠它们都不会让编写并发程序变得更简单,更不容易出错,或者更有趣。在 Go 语言里,你不仅可以使用原子函数和互斥锁来保证对共享资源的安全访问以及消除竞争状态,还可以使用通道,通过发送和接收需要共享的资源,在 goroutine 之间做同步。
当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。声明通道时,需要指定将要被共享的数据的类型。可以通过通道共享内置类型、命名类型、结构类型和引用类型的值或者指针。
在 Go 语言中需要使用内置函数 make
来创建一个通道,如代码清单 6-17 所示。
代码清单 6-17 使用 make
创建通道
// 无缓冲的整型通道
unbuffered := make(chan int)
// 有缓冲的字符串通道
buffered := make(chan string, 10)
在代码清单 6-17 中,可以看到使用内置函数 make
创建了两个通道,一个无缓冲的通道,一个有缓冲的通道。 make
的第一个参数需要是关键字 chan
,之后跟着允许通道交换的数据的类型。如果创建的是一个有缓冲的通道,之后还需要在第二个参数指定这个通道的缓冲区的大小。
向通道发送值或者指针需要用到 <-
操作符,如代码清单 6-18 所示。
代码清单 6-18 向通道发送值
// 有缓冲的字符串通道
buffered := make(chan string, 10)
// 通过通道发送一个字符串
buffered <- "Gopher"
在代码清单 6-18 里,我们创建了一个有缓冲的通道,数据类型是字符串,包含一个 10 个值的缓冲区。之后我们通过通道发送字符串 "Gopher"
。为了让另一个 goroutine 可以从该通道里接收到这个字符串,我们依旧使用 <-
操作符,但这次是一元运算符,如代码清单 6-19 所示。
代码清单 6-19 从通道里接收值
// 从通道接收一个字符串
value := <-buffered
当从通道里接收一个值或者指针时, <-
运算符在要操作的通道变量的左侧,如代码清单 6-19 所示。
通道是否带有缓冲,其行为会有一些不同。理解这个差异对决定到底应该使用还是不使用缓冲很有帮助。下面我们分别介绍一下这两种类型。
6.5.1 无缓冲的通道
无缓冲的通道 (unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。其中任意一个操作都无法离开另一个操作单独存在。
在图 6-6 里,可以看到一个例子,展示两个 goroutine 如何利用无缓冲的通道来共享一个值。在第 1 步,两个 goroutine 都到达通道,但哪个都没有开始执行发送或者接收。在第 2 步,左侧的 goroutine 将它的手伸进了通道,这模拟了向通道发送数据的行为。这时,这个 goroutine 会在通道中被锁住,直到交换完成。在第 3 步,右侧的 goroutine 将它的手放入通道,这模拟了从通道里接收数据。这个 goroutine 一样也会在通道中被锁住,直到交换完成。在第 4 步和第 5 步,进行交换,并最终,在第 6 步,两个 goroutine 都将它们的手从通道里拿出来,这模拟了被锁住的 goroutine 得到释放。两个 goroutine 现在都可以去做别的事情了。
图 6-6 使用无缓冲的通道在 goroutine 之间同步
为了讲得更清楚,让我们来看两个完整的例子。这两个例子都会使用无缓冲的通道在两个 goroutine 之间同步交换数据。
在网球比赛中,两位选手会把球在两个人之间来回传递。选手总是处在以下两种状态之一:要么在等待接球,要么将球打向对方。可以使用两个 goroutine 来模拟网球比赛,并使用无缓冲的通道来模拟球的来回,如代码清单 6-20 所示。
代码清单 6-20 listing20.go
01 // 这个示例程序展示如何用无缓冲的通道来模拟
02 // 2 个 goroutine 间的网球比赛
03 package main
04
05 import (
06 "fmt"
07 "math/rand"
08 "sync"
09 "time"
10 )
11
12 // wg 用来等待程序结束
13 var wg sync.WaitGroup
14
15 func init() {
16 rand.Seed(time.Now().UnixNano())
17 }
18
19 // main 是所有 Go 程序的入口
20 func main() {
21 // 创建一个无缓冲的通道
22 court := make(chan int)
23
24 // 计数加 2,表示要等待两个 goroutine
25 wg.Add(2)
26
27 // 启动两个选手
28 go player("Nadal", court)
29 go player("Djokovic", court)
30
31 // 发球
32 court <- 1
33
34 // 等待游戏结束
35 wg.Wait()
36 }
37
38 // player 模拟一个选手在打网球
39 func player(name string, court chan int) {
40 // 在函数退出时调用 Done 来通知 main 函数工作已经完成
41 defer wg.Done()
42
43 for {
44 // 等待球被击打过来
45 ball, ok := <-court
46 if !ok {
47 // 如果通道被关闭,我们就赢了
48 fmt.Printf("Player %s Won\n", name)
49 return
50 }
51
52 // 选随机数,然后用这个数来判断我们是否丢球
53 n := rand.Intn(100)
54 if n%13 == 0 {
55 fmt.Printf("Player %s Missed\n", name)
56
57 // 关闭通道,表示我们输了
58 close(court)
59 return
60 }
61
62 // 显示击球数,并将击球数加 1
63 fmt.Printf("Player %s Hit %d\n", name, ball)
64 ball++
65
66 // 将球打向对手
67 court <- ball
68 }
69 }
运行这个程序会得到代码清单 6-21 所示的输出。
代码清单 6-21 listing20.go 的输出
Player Nadal Hit 1
Player Djokovic Hit 2
Player Nadal Hit 3
Player Djokovic Missed
Player Nadal Won
在 main
函数的第 22 行,创建了一个 int
类型的无缓冲的通道,让两个 goroutine 在击球时能够互相同步。之后在第 28 行和第 29 行,创建了参与比赛的两个 goroutine。在这个时候,两个 goroutine 都阻塞住等待击球。在第 32 行,将球发到通道里,程序开始执行这个比赛,直到某个 goroutine 输掉比赛。
在 player
函数里,在第 43 行可以找到一个无限循环的 for
语句。在这个循环里,是玩游戏的过程。在第 45 行,goroutine 从通道接收数据,用来表示等待接球。这个接收动作会锁住 goroutine,直到有数据发送到通道里。通道的接收动作返回时,第 46 行会检测 ok
标志是否为 false
。如果这个值是 false
,表示通道已经被关闭,游戏结束。在第 53 行到第 60 行,会产生一个随机数,用来决定 goroutine 是否击中了球。如果击中了球,在第 64 行 ball
的值会递增 1,并在第 67 行,将 ball
作为球重新放入通道,发送给另一位选手。在这个时刻,两个 goroutine 都会被锁住,直到交换完成。最终,某个 goroutine 没有打中球,在第 58 行关闭通道。之后两个 goroutine 都会返回,通过 defer
声明的 Done
会被执行,程序终止。
另一个例子,用不同的模式,使用无缓冲的通道,在 goroutine 之间同步数据,来模拟接力比赛。在接力比赛里,4 个跑步者围绕赛道轮流跑(如代码清单 6-22 所示)。第二个、第三个和第四个跑步者要接到前一位跑步者的接力棒后才能起跑。比赛中最重要的部分是要传递接力棒,要求同步传递。在同步接力棒的时候,参与接力的两个跑步者必须在同一时刻准备好交接。
代码清单 6-22 listing22.go
01 // 这个示例程序展示如何用无缓冲的通道来模拟
02 // 4 个 goroutine 间的接力比赛
03 package main
04
05 import (
06 "fmt"
07 "sync"
08 "time"
09 )
10
11 // wg 用来等待程序结束
12 var wg sync.WaitGroup
13
14 // main 是所有 Go 程序的入口
15 func main() {
16 // 创建一个无缓冲的通道
17 baton := make(chan int)
18
19 // 为最后一位跑步者将计数加 1
20 wg.Add(1)
21
22 // 第一位跑步者持有接力棒
23 go Runner(baton)
24
25 // 开始比赛
26 baton <- 1
27
28 // 等待比赛结束
29 wg.Wait()
30 }
31
32 // Runner 模拟接力比赛中的一位跑步者
33 func Runner(baton chan int) {
34 var newRunner int
35
36 // 等待接力棒
37 runner := <-baton
38
39 // 开始绕着跑道跑步
40 fmt.Printf("Runner %d Running With Baton\n", runner)
41
42 // 创建下一位跑步者
43 if runner != 4 {
44 newRunner = runner + 1
45 fmt.Printf("Runner %d To The Line\n", newRunner)
46 go Runner(baton)
47 }
48
49 // 围绕跑道跑
50 time.Sleep(100 * time.Millisecond)
51
52 // 比赛结束了吗?
53 if runner == 4 {
54 fmt.Printf("Runner %d Finished, Race Over\n", runner)
55 wg.Done()
56 return
57 }
58
59 // 将接力棒交给下一位跑步者
60 fmt.Printf("Runner %d Exchange With Runner %d\n",
61 runner,
62 newRunner)
63
64 baton <- newRunner
65 }
运行这个程序会得到代码清单 6-23 所示的输出。
代码清单 6-23 listing22.go 的输出
Runner 1 Running With Baton
Runner 1 To The Line
Runner 1 Exchange With Runner 2
Runner 2 Running With Baton
Runner 2 To The Line
Runner 2 Exchange With Runner 3
Runner 3 Running With Baton
Runner 3 To The Line
Runner 3 Exchange With Runner 4
Runner 4 Running With Baton
Runner 4 Finished, Race Over
在 main
函数的第 17 行,创建了一个无缓冲的 int
类型的通道 baton
,用来同步传递接力棒。在第 20 行,我们给 WaitGroup
加 1,这样 main
函数就会等最后一位跑步者跑步结束。在第 23 行创建了一个 goroutine,用来表示第一位跑步者来到跑道。之后在第 26 行,将接力棒交给这个跑步者,比赛开始。最终,在第 29 行, main
函数阻塞在 WaitGroup
,等候最后一位跑步者完成比赛。
在 Runner
goroutine 里,可以看到接力棒 baton
是如何在跑步者之间传递的。在第 37 行,goroutine 对 baton
通道执行接收操作,表示等候接力棒。一旦接力棒传了进来,在第 46 行就会创建一位新跑步者,准备接力下一棒,直到 goroutine 是第四个跑步者。在第 50 行,跑步者围绕跑道跑 100 ms。在第 55 行,如果第四个跑步者完成了比赛,就调用 Done
,将 WaitGroup
减 1,之后 goroutine 返回。如果这个 goroutine 不是第四个跑步者,那么在第 64 行,接力棒会交到下一个已经在等待的跑步者手上。在这个时候,goroutine 会被锁住,直到交接完成。
在这两个例子里,我们使用无缓冲的通道同步 goroutine,模拟了网球和接力赛。代码的流程与这两个活动在真实世界中的流程完全一样,这样的代码很容易读懂。现在知道了无缓冲的通道是如何工作的,接下来我们会学习有缓冲的通道的工作方法。
6.5.2 有缓冲的通道
有缓冲的通道 (buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同:无缓冲的通道保证进行发送和接收的 goroutine 会在同一时间进行数据交换;有缓冲的通道没有这种保证。
在图 6-7 中可以看到两个 goroutine 分别向有缓冲的通道里增加一个值和从有缓冲的通道里移除一个值。在第 1 步,右侧的 goroutine 正在从通道接收一个值。在第 2 步,右侧的这个 goroutine 独立完成了接收值的动作,而左侧的 goroutine 正在发送一个新值到通道里。在第 3 步,左侧的 goroutine 还在向通道发送新值,而右侧的 goroutine 正在从通道接收另外一个值。这个步骤里的两个操作既不是同步的,也不会互相阻塞。最后,在第 4 步,所有的发送和接收都完成,而通道里还有几个值,也有一些空间可以存更多的值。
图 6-7 使用有缓冲的通道在 goroutine 之间同步数据
让我们看一个使用有缓冲的通道的例子,这个例子管理一组 goroutine 来接收并完成工作。有缓冲的通道提供了一种清晰而直观的方式来实现这个功能,如代码清单 6-24 所示。
代码清单 6-24 listing24.go
01 // 这个示例程序展示如何使用
02 // 有缓冲的通道和固定数目的
03 // goroutine 来处理一堆工作
04 package main
05
06 import (
07 "fmt"
08 "math/rand"
09 "sync"
10 "time"
11 )
12
13 const (
14 numberGoroutines = 4 // 要使用的 goroutine 的数量
15 taskLoad = 10 // 要处理的工作的数量
16 )
17
18 // wg 用来等待程序完成
19 var wg sync.WaitGroup
20
21 // init 初始化包,Go 语言运行时会在其他代码执行之前
22 // 优先执行这个函数
23 func init() {
24 // 初始化随机数种子
25 rand.Seed(time.Now().Unix())
26 }
27
28 // main 是所有 Go 程序的入口
29 func main() {
30 // 创建一个有缓冲的通道来管理工作
31 tasks := make(chan string, taskLoad)
32
33 // 启动 goroutine 来处理工作
34 wg.Add(numberGoroutines)
35 for gr := 1; gr <= numberGoroutines; gr++ {
36 go worker(tasks, gr)
37 }
38
39 // 增加一组要完成的工作
40 for post := 1; post <= taskLoad; post++ {
41 tasks <- fmt.Sprintf("Task : %d", post)
42 }
43
44 // 当所有工作都处理完时关闭通道
45 // 以便所有 goroutine 退出
46 close(tasks)
47
48 // 等待所有工作完成
49 wg.Wait()
50 }
51
52 // worker 作为 goroutine 启动来处理
53 // 从有缓冲的通道传入的工作
54 func worker(tasks chan string, worker int) {
55 // 通知函数已经返回
56 defer wg.Done()
57
58 for {
59 // 等待分配工作
60 task, ok := <-tasks
61 if !ok {
62 // 这意味着通道已经空了,并且已被关闭
63 fmt.Printf("Worker: %d : Shutting Down\n", worker)
64 return
65 }
66
67 // 显示我们开始工作了
68 fmt.Printf("Worker: %d : Started %s\n", worker, task)
69
70 // 随机等一段时间来模拟工作
71 sleep := rand.Int63n(100)
72 time.Sleep(time.Duration(sleep) * time.Millisecond)
73
74 // 显示我们完成了工作
75 fmt.Printf("Worker: %d : Completed %s\n", worker, task)
76 }
77 }
运行这个程序会得到代码清单 6-25 所示的输出。
代码清单 6-25 listing24.go 的输出
Worker: 1 : Started Task : 1
Worker: 2 : Started Task : 2
Worker: 3 : Started Task : 3
Worker: 4 : Started Task : 4
Worker: 1 : Completed Task : 1
Worker: 1 : Started Task : 5
Worker: 4 : Completed Task : 4
Worker: 4 : Started Task : 6
Worker: 1 : Completed Task : 5
Worker: 1 : Started Task : 7
Worker: 2 : Completed Task : 2
Worker: 2 : Started Task : 8
Worker: 3 : Completed Task : 3
Worker: 3 : Started Task : 9
Worker: 1 : Completed Task : 7
Worker: 1 : Started Task : 10
Worker: 4 : Completed Task : 6
Worker: 4 : Shutting Down
Worker: 3 : Completed Task : 9
Worker: 3 : Shutting Down
Worker: 2 : Completed Task : 8
Worker: 2 : Shutting Down
Worker: 1 : Completed Task : 10
Worker: 1 : Shutting Down
由于程序和 Go 语言的调度器带有随机成分,这个程序每次执行得到的输出会不一样。不过,通过有缓冲的通道,使用所有 4 个 goroutine 来完成工作,这个流程不会变。从输出可以看到每个 goroutine 是如何接收从通道里分发的工作。
在 main
函数的第 31 行,创建了一个 string
类型的有缓冲的通道,缓冲的容量是 10。在第 34 行,给 WaitGroup
赋值为 4,代表创建了 4 个工作 goroutine。之后在第 35 行到第 37 行,创建了 4 个 goroutine,并传入用来接收工作的通道。在第 40 行到第 42 行,将 10 个字符串发送到通道,模拟发给 goroutine 的工作。一旦最后一个字符串发送到通道,通道就会在第 46 行关闭,而 main
函数就会在第 49 行等待所有工作的完成。
第 46 行中关闭通道的代码非常重要。当通道关闭后,goroutine 依旧可以从通道接收数据,但是不能再向通道里发送数据。能够从已经关闭的通道接收数据这一点非常重要,因为这允许通道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,并返回一个通道类型的零值。如果在获取通道时还加入了可选的标志,就能得到通道的状态信息。
在 worker
函数里,可以在第 58 行看到一个无限的 for
循环。在这个循环里,会处理所有接收到的工作。每个 goroutine 都会在第 60 行阻塞,等待从通道里接收新的工作。一旦接收到返回,就会检查 ok
标志,看通道是否已经清空而且关闭。如果 ok
的值是 false
,goroutine 就会终止,并调用第 56 行通过 defer
声明的 Done
函数,通知 main
有工作结束。
如果 ok
标志是 true
,表示接收到的值是有效的。第 71 行和第 72 行模拟了处理的工作。一旦工作完成,goroutine 会再次阻塞在第 60 行从通道获取数据的语句。一旦通道被关闭,这个从通道获取数据的语句会立刻返回,goroutine 也会终止自己。
有缓冲的通道和无缓冲的通道的例子很好地展示了如何编写使用通道的代码。在下一章,我们会介绍真实世界里的一些可能会在工程里用到的并发模式。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论