7.2 pool
本章会介绍 pool
包 ① 。这个包用于展示如何使用有缓冲的通道实现资源池,来管理可以在任意数量的 goroutine 之间共享及独立使用的资源。这种模式在需要共享一组静态资源的情况(如共享数据库连接或者内存缓冲区)下非常有用。如果 goroutine 需要从池里得到这些资源中的一个,它可以从池里申请,使用完后归还到资源池里。
让我们看一下 pool
包里的 pool.go 代码文件,如代码清单 7-14 所示。
代码清单 7-14 pool
/pool.go
01 // Fatih Arslan 和 Gabriel Aszalos 协助完成了这个示例
02 // 包 pool 管理用户定义的一组资源
03 package pool
04
05 import (
06 "errors"
07 "log"
08 "io"
09 "sync"
10 )
11
12 // Pool 管理一组可以安全地在多个 goroutine 间
13 // 共享的资源。被管理的资源必须
14 // 实现 io.Closer 接口
15 type Pool struct {
16 m sync.Mutex
17 resources chan io.Closer
18 factory func() (io.Closer, error)
19 closed bool
20 }
21
22 // ErrPoolClosed 表示请求(Acquire)了一个
23 // 已经关闭的池
24 var ErrPoolClosed = errors.New("Pool has been closed.")
25
26 // New 创建一个用来管理资源的池。
27 // 这个池需要一个可以分配新资源的函数,
28 // 并规定池的大小
29 func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
30 if size <= 0 {
31 return nil, errors.New("Size value too small.")
32 }
33
34 return &Pool{
35 factory: fn,
36 resources: make(chan io.Closer, size),
37 }, nil
38 }
39
40 // Acquire 从池中获取一个资源
41 func (p *Pool) Acquire() (io.Closer, error) {
42 select {
43 // 检查是否有空闲的资源
44 case r, ok := <-p.resources:
45 log.Println("Acquire:", "Shared Resource")
46 if !ok {
47 return nil, ErrPoolClosed
48 }
49 return r, nil
50
51 // 因为没有空闲资源可用,所以提供一个新资源
52 default:
53 log.Println("Acquire:", "New Resource")
54 return p.factory()
55 }
56 }
57
58 // Release 将一个使用后的资源放回池里
59 func (p *Pool) Release(r io.Closer) {
60 // 保证本操作和 Close 操作的安全
61 p.m.Lock()
62 defer p.m.Unlock()
63
64 // 如果池已经被关闭,销毁这个资源
65 if p.closed {
66 r.Close()
67 return
68 }
69
70 select {
71 // 试图将这个资源放入队列
72 case p.resources <- r:
73 log.Println("Release:", "In Queue")
74
75 // 如果队列已满,则关闭这个资源
76 default:
77 log.Println("Release:", "Closing")
78 r.Close()
79 }
80 }
81
82 // Close 会让资源池停止工作,并关闭所有现有的资源
83 func (p *Pool) Close() {
84 // 保证本操作与 Release 操作的安全
85 p.m.Lock()
86 defer p.m.Unlock()
87
88 // 如果 pool 已经被关闭,什么也不做
89 if p.closed {
90 return
91 }
92
93 // 将池关闭
94 p.closed = true
95
96 // 在清空通道里的资源之前,将通道关闭
97 // 如果不这样做,会发生死锁
98 close(p.resources)
99
100 // 关闭资源
101 for r := range p.resources {
102 r.Close()
103 }
104 }
代码清单 7-14 中的 pool
包的代码声明了一个名为 Pool
的结构,该结构允许调用者根据所需数量创建不同的资源池。只要某类资源实现了 io.Closer
接口,就可以用这个资源池来管理。让我们看一下 Pool
结构的声明,如代码清单 7-15 所示。
代码清单 7-15 pool
/pool.go:第 12 行到第 20 行
12 // Pool 管理一组可以安全地在多个 goroutine 间
13 // 共享的资源。被管理的资源必须
14 // 实现 io.Closer 接口
15 type Pool struct {
16 m sync.Mutex
17 resources chan io.Closer
18 factory func() (io.Closer, error)
19 closed bool
20 }
Pool
结构声明了 4 个字段,每个字段都用来辅助以 goroutine 安全的方式来管理资源池。在第 16 行,结构以一个 sync.Mutex
类型的字段开始。这个互斥锁用来保证在多个 goroutine 访问资源池时,池内的值是安全的。第二个字段名为 resources
,被声明为 io.Closer
接口类型的通道。这个通道是作为一个有缓冲的通道创建的,用来保存共享的资源。由于通道的类型是一个接口,所以池可以管理任意实现了 io.Closer
接口的资源类型。
factory
字段是一个函数类型。任何一个没有输入参数且返回一个 io.Closer
和一个 error
接口值的函数,都可以赋值给这个字段。这个函数的目的是,当池需要一个新资源时,可以用这个函数创建。这个函数的实现细节超出了 pool
包的范围,并且需要由包的使用者实现并提供。
第 19 行中的最后一个字段是 closed
字段。这个字段是一个标志,表示 Pool
是否已经被关闭。现在已经了解了 Pool
结构的声明,让我们看一下第 24 行声明的 error
接口变量,如代码清单 7-16 所示。
代码清单 7-16 pool
/pool.go:第 22 行到第 24 行
22 // ErrPoolClosed 表示请求(Acquire)了一个
23 // 已经关闭的池
24 var ErrPoolClosed = errors.New("Pool has been closed.")
Go 语言里会经常创建 error
接口变量。这可以让调用者来判断某个包里的函数或者方法返回的具体的错误值。当调用者对一个已经关闭的池调用 Acquire
方法时,会返回代码清单 7-16 里的 error
接口变量。因为 Acquire
方法可能返回多个不同类型的错误,所以 Pool
已经关闭时会关闭时返回这个错误变量可以让调用者从其他错误中识别出这个特定的错误。
既然已经声明了 Pool
类型和 error
接口值,我们就可以开始看一下 pool
包里声明的函数和方法了。让我们从池的工厂函数开始,这个函数名为 New
,如代码清单 7-17 所示。
代码清单 7-17 pool
/pool.go:第 26 行到第 38 行
26 // New 创建一个用来管理资源的池。
27 // 这个池需要一个可以分配新资源的函数,
28 // 并规定池的大小
29 func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
30 if size <= 0 {
31 return nil, errors.New("Size value too small.")
32 }
33
34 return &Pool{
35 factory: fn,
36 resources: make(chan io.Closer, size),
37 }, nil
38 }
代码清单 7-17 中的 New
函数接受两个参数,并返回两个值。第一个参数 fn
声明为一个函数类型,这个函数不接受任何参数,返回一个 io.Closer
和一个 error
接口值。这个作为参数的函数是一个工厂函数,用来创建由池管理的资源的值。第二个参数 size
表示为了保存资源而创建的有缓冲的通道的缓冲区大小。
第 30 行检查了 size
的值,保证这个值不小于等于 0。如果这个值小于等于 0,就会使用 nil
值作为返回的 pool
指针值,然后为该错误创建一个 error
接口值。因为这是这个函数唯一可能返回的错误值,所以不需要为这个错误单独创建和使用一个 error
接口变量。如果能够接受传入的 size
,就会创建并初始化一个新的 Pool
值。在第 35 行,函数参数 fn
被赋值给 factory
字段,并且在第 36 行,使用 size
值创建有缓冲的通道。在 return
语句里,可以构造并初始化任何值。因此,第 34 行的 return
语句用指向新创建的 Pool
类型值的指针和 nil
值作为 error
接口值,返回给函数的调用者。
在创建并初始化 Pool
类型的值之后,接下来让我们来看一下 Acquire
方法,如代码清单 7-18 所示。这个方法可以让调用者从池里获得资源。
代码清单 7-18 pool
/pool.go:第 40 行到第 56 行
40 // Acquire 从池中获取一个资源
41 func (p *Pool) Acquire() (io.Closer, error) {
42 select {
43 // 检查是否有空闲的资源
44 case r, ok := <-p.resources:
45 log.Println("Acquire:", "Shared Resource")
46 if !ok {
47 return nil, ErrPoolClosed
48 }
49 return r, nil
50
51 // 因为没有空闲资源可用,所以提供一个新资源
52 default:
53 log.Println("Acquire:", "New Resource")
54 return p.factory()
55 }
56 }
代码清单 7-18 包含了 Acquire
方法的代码。这个方法在还有可用资源时会从资源池里返回一个资源,否则会为该调用创建并返回一个新的资源。这个实现是通过 select/case
语句来检查有缓冲的通道里是否还有资源来完成的。如果通道里还有资源,如第 44 行到第 49 行所写,就取出这个资源,并返回给调用者。如果该通道里没有资源可取,就会执行 default
分支。在这个示例中,在第 54 行执行用户提供的工厂函数,并且创建并返回一个新资源。
如果不再需要已经获得的资源,必须将这个资源释放回资源池里。这是 Release
方法的任务。不过在理解 Release
方法的代码背后的机制之前,我们需要先看一下 Close
方法,如代码清单 7-19 所示。
代码清单 7-19 pool
/pool.go:第 82 行到第 104 行
82 // Close 会让资源池停止工作,并关闭所有现有的资源
83 func (p *Pool) Close() {
84 // 保证本操作与 Release 操作的安全
85 p.m.Lock()
86 defer p.m.Unlock()
87
88 // 如果 pool 已经被关闭,什么也不做
89 if p.closed {
90 return
91 }
92
93 // 将池关闭
94 p.closed = true
95
96 // 在清空通道里的资源之前,将通道关闭
97 // 如果不这样做,会发生死锁
98 close(p.resources)
99
100 // 关闭资源
101 for r := range p.resources {
102 r.Close()
103 }
104 }
一旦程序不再使用资源池,需要调用这个资源池的 Close
方法。代码清单 7-19 中展示了 Close
方法的代码。在第 98 行到第 101 行,这个方法关闭并清空了有缓冲的通道,并将缓冲的空闲资源关闭。需要注意的是,在同一时刻只能有一个 goroutine 执行这段代码。事实上,当这段代码被执行时,必须保证其他 goroutine 中没有同时执行 Release
方法。你一会儿就会理解为什么这很重要。
在第 85 行到第 86 行,互斥量被加锁,并在函数返回时解锁。在第 89 行,检查 closed
标志,判断池是不是已经关闭。如果已经关闭,该方法会直接返回,并释放锁。如果这个方法第一次被调用,就会将这个标志设置为 true
,并关闭且清空 resources
通道。
现在我们可以看一下 Release
方法,看看这个方法是如何和 Close
方法配合的,如代码清单 7-20 所示。
代码清单 7-20 pool
/pool.go:第 58 行到第 80 行
58 // Release 将一个使用后的资源放回池里
59 func (p *Pool) Release(r io.Closer) {
60 // 保证本操作和 Close 操作的安全
61 p.m.Lock()
62 defer p.m.Unlock()
63
64 // 如果池已经被关闭,销毁这个资源
65 if p.closed {
66 r.Close()
67 return
68 }
69
70 select {
71 // 试图将这个资源放入队列
72 case p.resources <- r:
73 log.Println("Release:", "In Queue")
74
75 // 如果队列已满,则关闭这个资源
76 default:
77 log.Println("Release:", "Closing")
78 r.Close()
79 }
80 }
在代码清单 7-20 中可以找到 Release
方法的实现。该方法一开始在第 61 行和第 62 行对互斥量进行加锁和解锁。这和 Close
方法中的互斥量是同一个互斥量。这样可以阻止这两个方法在不同 goroutine 里同时运行。使用互斥量有两个目的。第一,可以保护第 65 行中读取 closed
标志的行为,保证同一时刻不会有其他 goroutine 调用 Close
方法写同一个标志。第二,我们不想往一个已经关闭的通道里发送数据,因为那样会引起崩溃。如果 closed
标志是 true
,我们就知道 resources
通道已经被关闭。
在第 66 行,如果池已经被关闭,会直接调用资源值 r
的 Close
方法。因为这时已经清空并关闭了池,所以无法将资源重新放回到该资源池里。对 closed
标志的读写必须进行同步,否则可能误导其他 goroutine,让其认为该资源池依旧是打开的,并试图对通道进行无效的操作。
现在看过了池的代码,了解了池是如何工作的,让我们看一下 main.go 代码文件里的测试程序,如代码清单 7-21 所示。
代码清单 7-21 pool
/main/main.go
01 // 这个示例程序展示如何使用 pool 包
02 // 来共享一组模拟的数据库连接
03 package main
04
05 import (
06 "log"
07 "io"
08 "math/rand"
09 "sync"
10 "sync/atomic"
11 "time"
12
13 "github.com/goinaction/code/chapter7/patterns/pool"
14 )
15
16 const (
17 maxGoroutines = 25 // 要使用的 goroutine 的数量
18 pooledResources = 2 // 池中的资源的数量
19 )
20
21 // dbConnection 模拟要共享的资源
22 type dbConnection struct {
23 ID int32
24 }
25
26 // Close 实现了 io.Closer 接口,以便 dbConnection
27 // 可以被池管理。Close 用来完成任意资源的
28 // 释放管理
29 func (dbConn *dbConnection) Close() error {
30 log.Println("Close: Connection", dbConn.ID)
31 return nil
32 }
33
34 // idCounter 用来给每个连接分配一个独一无二的 id
35 var idCounter int32
36
37 // createConnection 是一个工厂函数,
38 // 当需要一个新连接时,资源池会调用这个函数
39 func createConnection() (io.Closer, error) {
40 id := atomic.AddInt32(&idCounter, 1)
41 log.Println("Create: New Connection", id)
42
43 return &dbConnection{id}, nil
44 }
45
46 // main 是所有 Go 程序的入口
47 func main() {
48 var wg sync.WaitGroup
49 wg.Add(maxGoroutines)
50
51 // 创建用来管理连接的池
52 p, err := pool.New(createConnection, pooledResources)
53 if err != nil {
54 log.Println(err)
55 }
56
57 // 使用池里的连接来完成查询
58 for query := 0; query < maxGoroutines; query++ {
59 // 每个 goroutine 需要自己复制一份要
60 // 查询值的副本,不然所有的查询会共享
61 // 同一个查询变量
62 go func(q int) {
63 performQueries(q, p)
64 wg.Done()
65 }(query)
66 }
67
68 // 等待 goroutine 结束
69 wg.Wait()
70
71 // 关闭池
72 log.Println("Shutdown Program.")
73 p.Close()
74 }
75
76 // performQueries 用来测试连接的资源池
77 func performQueries(query int, p *pool.Pool) {
78 // 从池里请求一个连接
79 conn, err := p.Acquire()
80 if err != nil {
81 log.Println(err)
82 return
83 }
84
85 // 将该连接释放回池里
86 defer p.Release(conn)
87
88 // 用等待来模拟查询响应
89 time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
90 log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
91 }
代码清单 7-21 展示的 main.go 中的代码使用 pool
包来管理一组模拟数据库连接的连接池。代码一开始声明了两个常量 maxGoroutines
和 pooledResource
,用来设置 goroutine 的数量以及程序将要使用资源的数量。资源的声明以及 io.Closer
接口的实现如代码清单 7-22 所示。
代码清单 7-22 pool
/main/main.go:第 21 行到第 32 行
21 // dbConnection 模拟要共享的资源
22 type dbConnection struct {
23 ID int32
24 }
25
26 // Close 实现了 io.Closer 接口,以便 dbConnection
27 // 可以被池管理。Close 用来完成任意资源的
28 // 释放管理
29 func (dbConn *dbConnection) Close() error {
30 log.Println("Close: Connection", dbConn.ID)
31 return nil
32 }
代码清单 7-22 展示了 dbConnection
结构的声明以及 io.Closer
接口的实现。 dbConnection
类型模拟了管理数据库连接的结构,当前版本只包含一个字段 ID
,用来保存每个连接的唯一标识。 Close
方法只是报告了连接正在被关闭,并显示出要关闭连接的标识。
接下来我们来看一下创建 dbConnection
值的工厂函数,如代码清单 7-23 所示。
代码清单 7-23 pool
/main/main.go:第 34 行到第 44 行
34 // idCounter 用来给每个连接分配一个独一无二的 id
35 var idCounter int32
36
37 // createConnection 是一个工厂函数,
38 // 当需要一个新连接时,资源池会调用这个函数
39 func createConnection() (io.Closer, error) {
40 id := atomic.AddInt32(&idCounter, 1)
41 log.Println("Create: New Connection", id)
42
43 return &dbConnection{id}, nil
44 }
代码清单 7-23 展示了 createConnection
函数的实现。这个函数给连接生成了一个唯一标识,显示连接正在被创建,并返回指向带有唯一标识的 dbConnection
类型值的指针。唯一标识是通过 atomic.AddInt32
函数生成的。这个函数可以安全地增加包级变量 idCounter
的值。现在有了资源以及工厂函数,我们可以配合使用 pool
包了。
接下来让我们看一下 main
函数的代码,如代码清单 7-24 所示。
代码清单 7-24 pool
/main/main.go:第 48 行到第 55 行
48 var wg sync.WaitGroup
49 wg.Add(maxGoroutines)
50
51 // 创建用来管理连接的池
52 p, err := pool.New(createConnection, pooledResources)
53 if err != nil {
54 log.Println(err)
55 }
在第 48 行, main
函数一开始就声明了一个 WaitGroup
值,并将 WaitGroup
的值设置为要创建的 goroutine 的数量。之后使用 pool
包里的 New
函数创建了一个新的 Pool
类型。工厂函数和要管理的资源的数量会传入 New
函数。这个函数会返回一个指向 Pool
值的指针,并检查可能的错误。现在我们有了一个 Pool
类型的资源池实例,就可以创建 goroutine,并使用这个资源池在 goroutine 之间共享资源,如代码清单 7-25 所示。
代码清单 7-25 pool
/main/main.go:第 57 行到第 66 行
57 // 使用池里的连接来完成查询
58 for query := 0; query < maxGoroutines; query++ {
59 // 每个 goroutine 需要自己复制一份要
60 // 查询值的副本,不然所有的查询会共享
61 // 同一个查询变量
62 go func(q int) {
63 performQueries(q, p)
64 wg.Done()
65 }(query)
66 }
代码清单 7-25 中用一个 for
循环创建要使用池的 goroutine。每个 goroutine 调用一次 performQueries
函数然后退出。 performQueries
函数需要传入一个唯一的 ID
值用于做日志以及一个指向 Pool
的指针。一旦所有的 goroutine 都创建完成, main
函数就等待所有 goroutine 执行完毕,如代码清单 7-26 所示。
代码清单 7-26 pool
/main/main.go:第 68 行到第 73 行
68 // 等待 goroutine 结束
69 wg.Wait()
70
71 // 关闭池
72 log.Println("Shutdown Program.")
73 p.Close()
在代码清单 7-26 中, main
函数等待 WaitGroup
实例的 Wait
方法执行完成。一旦所有 goroutine 都报告其执行完成,就关闭 Pool
,并且终止程序。接下来,让我们看一下 performQueries
函数。这个函数使用了池的 Acquire
方法和 Release
方法,如代码清单 7-27 所示。
代码清单 7-27 pool
/main/main.go:第 76 行到第 91 行
76 // performQueries 用来测试连接的资源池
77 func performQueries(query int, p *pool.Pool) {
78 // 从池里请求一个连接
79 conn, err := p.Acquire()
80 if err != nil {
81 log.Println(err)
82 return
83 }
84
85 // 将该连接释放回池里
86 defer p.Release(conn)
87
88 // 用等待来模拟查询响应
89 time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
90 log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
91 }
代码清单 7-27 展示了 performQueries
的实现。这个实现使用了 pool
的 Acquire
方法和 Release
方法。这个函数首先调用了 Acquire
方法,从池里获得 dbConnection
。之后会检查返回的 error
接口值,在第 86 行,再使用 defer
语句在函数退出时将 dbConnection
释放回池里。在第 89 行和第 90 行,随机休眠一段时间,以此来模拟使用 dbConnection
工作时间。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论