返回介绍

7.3 work

发布于 2024-10-11 12:39:09 字数 8056 浏览 0 评论 0 收藏 0

work 包的目的是展示如何使用无缓冲的通道来创建一个 goroutine 池,这些 goroutine 执行并控制一组工作,让其并发执行。在这种情况下,使用无缓冲的通道要比随意指定一个缓冲区大小的有缓冲的通道好,因为这个情况下既不需要一个工作队列,也不需要一组 goroutine 配合执行。无缓冲的通道保证两个 goroutine 之间的数据交换。这种使用无缓冲的通道的方法允许使用者知道什么时候 goroutine 池正在执行工作,而且如果池里的所有 goroutine 都忙,无法接受新的工作的时候,也能及时通过通道来通知调用者。使用无缓冲的通道不会有工作在队列里丢失或者卡住,所有工作都会被处理。

让我们来看一下 work 包里的 work.go 代码文件,如代码清单 7-28 所示。

代码清单 7-28  work /work.go

01 // Jason Waldrip 协助完成了这个示例
02 // work 包管理一个 goroutine 池来完成工作
03 package work
04
05 import "sync"
06
07 // Worker 必须满足接口类型,
08 // 才能使用工作池
09 type Worker interface {
10   Task()
11 }
12
13 // Pool 提供一个 goroutine 池,这个池可以完成
14 // 任何已提交的 Worker 任务
15 type Pool struct {
16   work chan Worker
17   wg  sync.WaitGroup
18 }
19
20 // New 创建一个新工作池
21 func New(maxGoroutines int) *Pool {
22   p := Pool{
23     tasks: make(chan Worker),
24   }
25
26   p.wg.Add(maxGoroutines)
27   for i := 0; i < maxGoroutines; i++ {
28     go func() {
29       for w := range p.work {
30         w.Task()
31       }
32       p.wg.Done()
33     }()
34   }
35
36   return &p
37 }
38
39 // Run 提交工作到工作池
40 func (p *Pool) Run(w Worker) {
41   p.work <- w
42 }
43
44 // Shutdown 等待所有 goroutine 停止工作
45 func (p *Pool) Shutdown() {
46   close(p.work)
47   p.wg.Wait()
48 }

代码清单 7-28 中展示的 work 包一开始声明了名为 Worker 的接口和名为 Pool 的结构,如代码清单 7-29 所示。

代码清单 7-29  work /work.go:第 07 行到第 18 行

07 // Worker 必须满足接口类型, 
08 // 才能使用工作池
09 type Worker interface {
10   Task()
11 }
12
13 // Pool 提供一个 goroutine 池,这个池可以完成
14 // 任何已提交的 Worker 任务
15 type Pool struct {
16   work chan Worker
17   wg  sync.WaitGroup
18 }

代码清单 7-29 的第 09 行中的 Worker 接口声明了一个名为 Task 的方法。在第 15 行,声明了名为 Pool 的结构,这个结构类型实现了 goroutine 池,并实现了一些处理工作的方法。这个结构类型声明了两个字段,一个名为 work (一个 Worker 接口类型的通道),另一个名为 wgsync.WaitGroup 类型。

接下来,让我们来看一下 work 包的工厂函数,如代码清单 7-30 所示。

代码清单 7-30  work /work.go:第 20 行到第 37 行

20 // New 创建一个新工作池
21 func New(maxGoroutines int) *Pool {
22   p := Pool{
23     tasks: make(chan Worker),
24   }
25
26   p.wg.Add(maxGoroutines)
27   for i := 0; i < maxGoroutines; i++ {
28     go func() {
29       for w := range p.work {
30         w.Task()
31       }
32       p.wg.Done()
33     }()
34   }
35
36   return &p
37 }

代码清单 7-30 展示了 New 函数,这个函数使用固定数量的 goroutine 来创建一个工作池。goroutine 的数量作为参数传给 New 函数。在第 22 行,创建了一个 Pool 类型的值,并使用无缓冲的通道来初始化 work 字段。

之后,在第 26 行,初始化 WaitGroup 需要等待的数量,并在第 27 行到第 34 行,创建了同样数量的 goroutine。这些 goroutine 只接收 Worker 类型的接口值,并调用这个值的 Task 方法,如代码清单 7-31 所示。

代码清单 7-31  work /work.go:第 28 行到第 33 行

28     go func() {
29       for w := range p.work {
30         w.Task()
31       }
32       p.wg.Done()
33     }()

代码清单 7-31 里的 for range 循环会一直阻塞,直到从 work 通道收到一个 Worker 接口值。如果收到一个值,就会执行这个值的 Task 方法。一旦 work 通道被关闭, for range 循环就会结束,并调用 WaitGroupDone 方法。然后 goroutine 终止。

现在我们可以创建一个等待并执行工作的 goroutine 池了。让我们看一下如何向池里提交工作,如代码清单 7-32 所示。

代码清单 7-32  work /work.go:第 39 行到第 42 行

39 // Run 提交工作到工作池
40 func (p *Pool) Run(w Worker) {
41   p.work <- w
42 }

代码清单 7-32 展示了 Run 方法。这个方法可以向池里提交工作。该方法接受一个 Worker 类型的接口值作为参数,并将这个值通过 work 通道发送。由于 work 通道是一个无缓冲的通道,调用者必须等待工作池里的某个 goroutine 接收到这个值才会返回。这正是我们想要的,这样可以保证调用的 Run 返回时,提交的工作已经开始执行。

在某个时间点,需要关闭工作池。这是 Shutdown 方法所做的事情,如代码清单 7-33 所示。

代码清单 7-33  work /work.go:第 44 行到第 48 行

44 // Shutdown 等待所有 goroutine 停止工作
45 func (p *Pool) Shutdown() {
46   close(p.work)
47   p.wg.Wait()
48 }

代码清单 7-33 中的 Shutdown 方法做了两件事,首先,它关闭了 work 通道,这会导致所有池里的 goroutine 停止工作,并调用 WaitGroupDone 方法;然后, Shutdown 方法调用 WaitGroupWait 方法,这会让 Shutdown 方法等待所有 goroutine 终止。

我们看了 work 包的代码,并了解了它是如何工作的,接下来让我们看一下 main.go 源代码文件中的测试程序,如代码清单 7-34 所示。

代码清单 7-34  work /main/main.go

01 // 这个示例程序展示如何使用 work 包
02 // 创建一个 goroutine 池并完成工作
03 package main
04
05 import (
06   "log"
07   "sync"
08   "time"
09
10   "github.com/goinaction/code/chapter7/patterns/work"
11 )
12
13 // names 提供了一组用来显示的名字
14 var names = []string{
15   "steve",
16   "bob",
17   "mary",
18   "therese",
19   "jason",
20 }
21
22 // namePrinter 使用特定方式打印名字
23 type namePrinter struct {
24   name string
25 }
26
27 // Task 实现 Worker 接口
28 func (m *namePrinter) Task() {
29   log.Println(m.name)
30   time.Sleep(time.Second)
31 }
32
33 // main 是所有 Go 程序的入口
34 func main() {
35   // 使用两个 goroutine 来创建工作池
36   p := work.New(2)
37
38   var wg sync.WaitGroup
39   wg.Add(100 * len(names))
40
41   for i := 0; i < 100; i++ {
42     // 迭代 names 切片
43     for _, name := range names {
44       // 创建一个 namePrinter 并提供
45       // 指定的名字
46       np := namePrinter{
47         name: name,
48       }
49
50       go func() {
51         // 将任务提交执行。当 Run 返回时
52         // 我们就知道任务已经处理完成
53         p.Run(&np)
54         wg.Done()
55       }()
56     }
57   }
58
59   wg.Wait()
60
61   // 让工作池停止工作,等待所有现有的
62   // 工作完成
63   p.Shutdown()
64 }

代码清单 7-34 展示了使用 work 包来完成名字显示工作的测试程序。这段代码一开始在第 14 行声明了名为 names 的包级的变量,这个变量被声明为一个字符串切片。这个切片使用 5 个名字进行了初始化。然后声明了名为 namePrinter 的类型,如代码清单 7-35 所示。

代码清单 7-35  work /main/main.go:第 22 行到第 31 行

22 // namePrinter 使用特定方式打印名字
23 type namePrinter struct {
24   name string
25 }
26
27 // Task 实现 Worker 接口
28 func (m *namePrinter) Task() {
29   log.Println(m.name)
30   time.Sleep(time.Second)
31 }

在代码清单 7-35 的第 23 行,声明了 namePrinter 类型,接着是这个类型对 Worker 接口的实现。这个类型的工作任务是在显示器上显示名字。这个类型只包含一个字段,即 name ,它包含要显示的名字。 Worker 接口的实现 Task 函数用 log.Println 函数来显示名字,之后等待 1 秒再退出。等待这 1 秒只是为了让测试程序运行的速度慢一些,以便看到并发的效果。

有了 Worker 接口的实现,我们就可以看一下 main 函数内部的代码了,如代码清单 7-36 所示。

代码清单 7-36  work /main/main.go:第 33 行到第 64 行

33 // main 是所有 Go 程序的入口
34 func main() {
35   // 使用两个 goroutine 来创建工作池
36   p := work.New(2)
37
38   var wg sync.WaitGroup
39   wg.Add(100 * len(names))
40
41   for i := 0; i < 100; i++ {
42     // 迭代 names 切片
43     for _, name := range names {
44       // 创建一个 namePrinter 并提供
45       // 指定的名字
46       np := namePrinter{
47         name: name,
48       }
49
50       go func() {
51         // 将任务提交执行。当 Run 返回时
52         // 我们就知道任务已经处理完成
53         p.Run(&np)
54         wg.Done()
55       }()
56     }
57   }
58
59   wg.Wait()
60
61   // 让工作池停止工作,等待所有现有的
62   // 工作完成
63   p.Shutdown()
64 }

在代码清单 7-36 第 36 行,调用 work 包里的 New 函数创建一个工作池。这个调用传入的参数是 2,表示这个工作池只会包含两个执行任务的 goroutine。在第 38 行和第 39 行,声明了一个 WaitGroup ,并初始化为要执行任务的 goroutine 数。在这个例子里, names 切片里的每个名字都会创建 100 个 goroutine 来提交任务。这样就会有一堆 goroutine 互相竞争,将任务提交到池里。

在第 41 行到第 43 行,内部和外部的 for 循环用来声明并创建所有的 goroutine。每次内部循环都会创建一个 namePrinter 类型的值,并提供一个用来打印的名字。之后,在第 50 行,声明了一个匿名函数,并创建一个 goroutine 执行这个函数。这个 goroutine 会调用工作池的 Run 方法,将 namePrinter 的值提交到池里。一旦工作池里的 goroutine 接收到这个值, Run 方法就会返回。这也会导致 goroutine 将 WaitGroup 的计数递减,并终止 goroutine。

一旦所有的 goroutine 都创建完成, main 函数就会调用 WaitGroupWait 方法。这个调用会等待所有创建的 goroutine 提交它们的工作。一旦 Wait 返回,就会调用工作池的 Shutdown 方法来关闭工作池。 Shutdown 方法直到所有的工作都做完才会返回。在这个例子里,最多只会等待两个工作的完成。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文