返回介绍

7.2 pool

发布于 2024-10-11 12:39:09 字数 14922 浏览 0 评论 0 收藏 0

本章会介绍 pool 。这个包用于展示如何使用有缓冲的通道实现资源池,来管理可以在任意数量的 goroutine 之间共享及独立使用的资源。这种模式在需要共享一组静态资源的情况(如共享数据库连接或者内存缓冲区)下非常有用。如果 goroutine 需要从池里得到这些资源中的一个,它可以从池里申请,使用完后归还到资源池里。

让我们看一下 pool 包里的 pool.go 代码文件,如代码清单 7-14 所示。

代码清单 7-14  pool /pool.go

 01 // Fatih Arslan 和 Gabriel Aszalos 协助完成了这个示例
 02 // 包 pool 管理用户定义的一组资源
 03 package pool
 04
 05 import (
 06   "errors"
 07   "log"
 08   "io"
 09   "sync"
 10 )
 11
 12 // Pool 管理一组可以安全地在多个 goroutine 间
 13 // 共享的资源。被管理的资源必须
 14 // 实现 io.Closer 接口
 15 type Pool struct {
 16   m     sync.Mutex
 17   resources chan io.Closer
 18   factory  func() (io.Closer, error)
 19   closed   bool
 20 }
 21
 22 // ErrPoolClosed 表示请求(Acquire)了一个
 23 // 已经关闭的池
 24 var ErrPoolClosed = errors.New("Pool has been closed.")
 25
 26 // New 创建一个用来管理资源的池。
 27 // 这个池需要一个可以分配新资源的函数, 
 28 // 并规定池的大小
 29 func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
 30   if size <= 0 {
 31     return nil, errors.New("Size value too small.")
 32   }
 33
 34   return &Pool{
 35     factory:  fn,
 36     resources: make(chan io.Closer, size),
 37   }, nil
 38 }
 39
 40 // Acquire 从池中获取一个资源
 41 func (p *Pool) Acquire() (io.Closer, error) {
 42   select {
 43   // 检查是否有空闲的资源
 44   case r, ok := <-p.resources:
 45     log.Println("Acquire:", "Shared Resource")
 46     if !ok {
 47       return nil, ErrPoolClosed
 48     }
 49     return r, nil
 50
 51   // 因为没有空闲资源可用,所以提供一个新资源
 52   default:
 53     log.Println("Acquire:", "New Resource")
 54     return p.factory()
 55   }
 56 }
 57
 58 // Release 将一个使用后的资源放回池里
 59 func (p *Pool) Release(r io.Closer) {
 60   // 保证本操作和 Close 操作的安全
 61   p.m.Lock()
 62   defer p.m.Unlock()
 63
 64   // 如果池已经被关闭,销毁这个资源
 65   if p.closed {
 66     r.Close()
 67     return
 68   }
 69
 70   select {
 71   // 试图将这个资源放入队列
 72   case p.resources <- r:
 73     log.Println("Release:", "In Queue")
 74
 75   // 如果队列已满,则关闭这个资源
 76   default:
 77     log.Println("Release:", "Closing")
 78     r.Close()
 79   }
 80 }
 81
 82 // Close 会让资源池停止工作,并关闭所有现有的资源
 83 func (p *Pool) Close() {
 84   // 保证本操作与 Release 操作的安全
 85   p.m.Lock()
 86   defer p.m.Unlock()
 87
 88   // 如果 pool 已经被关闭,什么也不做
 89   if p.closed {
 90     return
 91   }
 92
 93   // 将池关闭
 94   p.closed = true
 95
 96   // 在清空通道里的资源之前,将通道关闭
 97   // 如果不这样做,会发生死锁
 98   close(p.resources)
 99
100   // 关闭资源
101   for r := range p.resources {
102     r.Close()
103   }
104 }

代码清单 7-14 中的 pool 包的代码声明了一个名为 Pool 的结构,该结构允许调用者根据所需数量创建不同的资源池。只要某类资源实现了 io.Closer 接口,就可以用这个资源池来管理。让我们看一下 Pool 结构的声明,如代码清单 7-15 所示。

代码清单 7-15  pool /pool.go:第 12 行到第 20 行

12 // Pool 管理一组可以安全地在多个 goroutine 间
13 // 共享的资源。被管理的资源必须
14 // 实现 io.Closer 接口
15 type Pool struct {
16   m     sync.Mutex
17   resources chan io.Closer
18   factory  func() (io.Closer, error)
19   closed  bool
20 }

Pool 结构声明了 4 个字段,每个字段都用来辅助以 goroutine 安全的方式来管理资源池。在第 16 行,结构以一个 sync.Mutex 类型的字段开始。这个互斥锁用来保证在多个 goroutine 访问资源池时,池内的值是安全的。第二个字段名为 resources ,被声明为 io.Closer 接口类型的通道。这个通道是作为一个有缓冲的通道创建的,用来保存共享的资源。由于通道的类型是一个接口,所以池可以管理任意实现了 io.Closer 接口的资源类型。

factory 字段是一个函数类型。任何一个没有输入参数且返回一个 io.Closer 和一个 error 接口值的函数,都可以赋值给这个字段。这个函数的目的是,当池需要一个新资源时,可以用这个函数创建。这个函数的实现细节超出了 pool 包的范围,并且需要由包的使用者实现并提供。

第 19 行中的最后一个字段是 closed 字段。这个字段是一个标志,表示 Pool 是否已经被关闭。现在已经了解了 Pool 结构的声明,让我们看一下第 24 行声明的 error 接口变量,如代码清单 7-16 所示。

代码清单 7-16  pool /pool.go:第 22 行到第 24 行

22 // ErrPoolClosed 表示请求(Acquire)了一个
23 // 已经关闭的池
24 var ErrPoolClosed = errors.New("Pool has been closed.")

Go 语言里会经常创建 error 接口变量。这可以让调用者来判断某个包里的函数或者方法返回的具体的错误值。当调用者对一个已经关闭的池调用 Acquire 方法时,会返回代码清单 7-16 里的 error 接口变量。因为 Acquire 方法可能返回多个不同类型的错误,所以 Pool 已经关闭时会关闭时返回这个错误变量可以让调用者从其他错误中识别出这个特定的错误。

既然已经声明了 Pool 类型和 error 接口值,我们就可以开始看一下 pool 包里声明的函数和方法了。让我们从池的工厂函数开始,这个函数名为 New ,如代码清单 7-17 所示。

代码清单 7-17  pool /pool.go:第 26 行到第 38 行

26 // New 创建一个用来管理资源的池。
27 // 这个池需要一个可以分配新资源的函数, 
28 // 并规定池的大小
29 func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
30   if size <= 0 {
31     return nil, errors.New("Size value too small.")
32   }
33
34   return &Pool{
35     factory:  fn,
36     resources: make(chan io.Closer, size),
37   }, nil
38 }

代码清单 7-17 中的 New 函数接受两个参数,并返回两个值。第一个参数 fn 声明为一个函数类型,这个函数不接受任何参数,返回一个 io.Closer 和一个 error 接口值。这个作为参数的函数是一个工厂函数,用来创建由池管理的资源的值。第二个参数 size 表示为了保存资源而创建的有缓冲的通道的缓冲区大小。

第 30 行检查了 size 的值,保证这个值不小于等于 0。如果这个值小于等于 0,就会使用 nil 值作为返回的 pool 指针值,然后为该错误创建一个 error 接口值。因为这是这个函数唯一可能返回的错误值,所以不需要为这个错误单独创建和使用一个 error 接口变量。如果能够接受传入的 size ,就会创建并初始化一个新的 Pool 值。在第 35 行,函数参数 fn 被赋值给 factory 字段,并且在第 36 行,使用 size 值创建有缓冲的通道。在 return 语句里,可以构造并初始化任何值。因此,第 34 行的 return 语句用指向新创建的 Pool 类型值的指针和 nil 值作为 error 接口值,返回给函数的调用者。

在创建并初始化 Pool 类型的值之后,接下来让我们来看一下 Acquire 方法,如代码清单 7-18 所示。这个方法可以让调用者从池里获得资源。

代码清单 7-18  pool /pool.go:第 40 行到第 56 行

40 // Acquire 从池中获取一个资源
41 func (p *Pool) Acquire() (io.Closer, error) {
42   select {
43   // 检查是否有空闲的资源
44   case r, ok := <-p.resources:
45     log.Println("Acquire:", "Shared Resource")
46     if !ok {
47       return nil, ErrPoolClosed
48     }
49     return r, nil
50
51   // 因为没有空闲资源可用,所以提供一个新资源
52   default:
53     log.Println("Acquire:", "New Resource")
54     return p.factory()
55   }
56 }

代码清单 7-18 包含了 Acquire 方法的代码。这个方法在还有可用资源时会从资源池里返回一个资源,否则会为该调用创建并返回一个新的资源。这个实现是通过 select/case 语句来检查有缓冲的通道里是否还有资源来完成的。如果通道里还有资源,如第 44 行到第 49 行所写,就取出这个资源,并返回给调用者。如果该通道里没有资源可取,就会执行 default 分支。在这个示例中,在第 54 行执行用户提供的工厂函数,并且创建并返回一个新资源。

如果不再需要已经获得的资源,必须将这个资源释放回资源池里。这是 Release 方法的任务。不过在理解 Release 方法的代码背后的机制之前,我们需要先看一下 Close 方法,如代码清单 7-19 所示。

代码清单 7-19  pool /pool.go:第 82 行到第 104 行

 82 // Close 会让资源池停止工作,并关闭所有现有的资源
 83 func (p *Pool) Close() {
 84   // 保证本操作与 Release 操作的安全
 85   p.m.Lock()
 86   defer p.m.Unlock()
 87
 88   // 如果 pool 已经被关闭,什么也不做
 89   if p.closed {
 90     return
 91   }
 92
 93   // 将池关闭
 94   p.closed = true
 95
 96   // 在清空通道里的资源之前,将通道关闭
 97   // 如果不这样做,会发生死锁
 98   close(p.resources)
 99
100   // 关闭资源
101   for r := range p.resources {
102     r.Close()
103   }
104 }

一旦程序不再使用资源池,需要调用这个资源池的 Close 方法。代码清单 7-19 中展示了 Close 方法的代码。在第 98 行到第 101 行,这个方法关闭并清空了有缓冲的通道,并将缓冲的空闲资源关闭。需要注意的是,在同一时刻只能有一个 goroutine 执行这段代码。事实上,当这段代码被执行时,必须保证其他 goroutine 中没有同时执行 Release 方法。你一会儿就会理解为什么这很重要。

在第 85 行到第 86 行,互斥量被加锁,并在函数返回时解锁。在第 89 行,检查 closed 标志,判断池是不是已经关闭。如果已经关闭,该方法会直接返回,并释放锁。如果这个方法第一次被调用,就会将这个标志设置为 true ,并关闭且清空 resources 通道。

现在我们可以看一下 Release 方法,看看这个方法是如何和 Close 方法配合的,如代码清单 7-20 所示。

代码清单 7-20  pool /pool.go:第 58 行到第 80 行

58 // Release 将一个使用后的资源放回池里
59 func (p *Pool) Release(r io.Closer) {
60   // 保证本操作和 Close 操作的安全
61   p.m.Lock()
62   defer p.m.Unlock()
63
64   // 如果池已经被关闭,销毁这个资源
65   if p.closed {
66     r.Close()
67     return
68   }
69
70   select {
71   // 试图将这个资源放入队列
72   case p.resources <- r:
73     log.Println("Release:", "In Queue")
74
75   // 如果队列已满,则关闭这个资源
76   default:
77     log.Println("Release:", "Closing")
78     r.Close()
79   }
80 }

在代码清单 7-20 中可以找到 Release 方法的实现。该方法一开始在第 61 行和第 62 行对互斥量进行加锁和解锁。这和 Close 方法中的互斥量是同一个互斥量。这样可以阻止这两个方法在不同 goroutine 里同时运行。使用互斥量有两个目的。第一,可以保护第 65 行中读取 closed 标志的行为,保证同一时刻不会有其他 goroutine 调用 Close 方法写同一个标志。第二,我们不想往一个已经关闭的通道里发送数据,因为那样会引起崩溃。如果 closed 标志是 true ,我们就知道 resources 通道已经被关闭。

在第 66 行,如果池已经被关闭,会直接调用资源值 rClose 方法。因为这时已经清空并关闭了池,所以无法将资源重新放回到该资源池里。对 closed 标志的读写必须进行同步,否则可能误导其他 goroutine,让其认为该资源池依旧是打开的,并试图对通道进行无效的操作。

现在看过了池的代码,了解了池是如何工作的,让我们看一下 main.go 代码文件里的测试程序,如代码清单 7-21 所示。

代码清单 7-21  pool /main/main.go

01 // 这个示例程序展示如何使用 pool 包
02 // 来共享一组模拟的数据库连接
03 package main
04
05 import (
06   "log"
07   "io"
08   "math/rand"
09   "sync"
10   "sync/atomic"
11   "time"
12
13   "github.com/goinaction/code/chapter7/patterns/pool"
14 )
15
16 const (
17   maxGoroutines  = 25 // 要使用的 goroutine 的数量
18   pooledResources = 2 // 池中的资源的数量
19 )
20
21 // dbConnection 模拟要共享的资源
22 type dbConnection struct {
23   ID int32
24 }
25
26 // Close 实现了 io.Closer 接口,以便 dbConnection
27 // 可以被池管理。Close 用来完成任意资源的
28 // 释放管理
29 func (dbConn *dbConnection) Close() error {
30   log.Println("Close: Connection", dbConn.ID)
31   return nil
32 }
33
34 // idCounter 用来给每个连接分配一个独一无二的 id
35 var idCounter int32
36
37 // createConnection 是一个工厂函数, 
38 // 当需要一个新连接时,资源池会调用这个函数
39 func createConnection() (io.Closer, error) {
40   id := atomic.AddInt32(&idCounter, 1)
41   log.Println("Create: New Connection", id)
42
43   return &dbConnection{id}, nil
44 }
45
46 // main 是所有 Go 程序的入口
47 func main() {
48   var wg sync.WaitGroup
49   wg.Add(maxGoroutines)
50
51   // 创建用来管理连接的池
52   p, err := pool.New(createConnection, pooledResources)
53   if err != nil {
54     log.Println(err)
55   }
56
57   // 使用池里的连接来完成查询
58   for query := 0; query < maxGoroutines; query++ {
59     // 每个 goroutine 需要自己复制一份要
60     // 查询值的副本,不然所有的查询会共享
61     // 同一个查询变量
62     go func(q int) {
63       performQueries(q, p)
64       wg.Done()
65     }(query)
66   }
67
68   // 等待 goroutine 结束
69   wg.Wait()
70
71   // 关闭池
72   log.Println("Shutdown Program.")
73   p.Close()
74 }
75
76 // performQueries 用来测试连接的资源池
77 func performQueries(query int, p *pool.Pool) {
78   // 从池里请求一个连接
79   conn, err := p.Acquire()
80   if err != nil {
81     log.Println(err)
82     return
83   }
84
85   // 将该连接释放回池里
86   defer p.Release(conn)
87
88   // 用等待来模拟查询响应
89   time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
90   log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
91 }

代码清单 7-21 展示的 main.go 中的代码使用 pool 包来管理一组模拟数据库连接的连接池。代码一开始声明了两个常量 maxGoroutinespooledResource ,用来设置 goroutine 的数量以及程序将要使用资源的数量。资源的声明以及 io.Closer 接口的实现如代码清单 7-22 所示。

代码清单 7-22  pool /main/main.go:第 21 行到第 32 行

21 // dbConnection 模拟要共享的资源
22 type dbConnection struct {
23   ID int32
24 }
25
26 // Close 实现了 io.Closer 接口,以便 dbConnection
27 // 可以被池管理。Close 用来完成任意资源的
28 // 释放管理
29 func (dbConn *dbConnection) Close() error {
30   log.Println("Close: Connection", dbConn.ID)
31   return nil
32 }

代码清单 7-22 展示了 dbConnection 结构的声明以及 io.Closer 接口的实现。 dbConnection 类型模拟了管理数据库连接的结构,当前版本只包含一个字段 ID ,用来保存每个连接的唯一标识。 Close 方法只是报告了连接正在被关闭,并显示出要关闭连接的标识。

接下来我们来看一下创建 dbConnection 值的工厂函数,如代码清单 7-23 所示。

代码清单 7-23  pool /main/main.go:第 34 行到第 44 行

34 // idCounter 用来给每个连接分配一个独一无二的 id
35 var idCounter int32
36
37 // createConnection 是一个工厂函数, 
38 // 当需要一个新连接时,资源池会调用这个函数
39 func createConnection() (io.Closer, error) {
40   id := atomic.AddInt32(&idCounter, 1)
41   log.Println("Create: New Connection", id)
42
43   return &dbConnection{id}, nil
44 }

代码清单 7-23 展示了 createConnection 函数的实现。这个函数给连接生成了一个唯一标识,显示连接正在被创建,并返回指向带有唯一标识的 dbConnection 类型值的指针。唯一标识是通过 atomic.AddInt32 函数生成的。这个函数可以安全地增加包级变量 idCounter 的值。现在有了资源以及工厂函数,我们可以配合使用 pool 包了。

接下来让我们看一下 main 函数的代码,如代码清单 7-24 所示。

代码清单 7-24  pool /main/main.go:第 48 行到第 55 行

48   var wg sync.WaitGroup
49   wg.Add(maxGoroutines)
50
51   // 创建用来管理连接的池
52   p, err := pool.New(createConnection, pooledResources)
53   if err != nil {
54     log.Println(err)
55   }

在第 48 行, main 函数一开始就声明了一个 WaitGroup 值,并将 WaitGroup 的值设置为要创建的 goroutine 的数量。之后使用 pool 包里的 New 函数创建了一个新的 Pool 类型。工厂函数和要管理的资源的数量会传入 New 函数。这个函数会返回一个指向 Pool 值的指针,并检查可能的错误。现在我们有了一个 Pool 类型的资源池实例,就可以创建 goroutine,并使用这个资源池在 goroutine 之间共享资源,如代码清单 7-25 所示。

代码清单 7-25  pool /main/main.go:第 57 行到第 66 行

57   // 使用池里的连接来完成查询
58   for query := 0; query < maxGoroutines; query++ {
59     // 每个 goroutine 需要自己复制一份要
60     // 查询值的副本,不然所有的查询会共享
61     // 同一个查询变量
62     go func(q int) {
63       performQueries(q, p)
64       wg.Done()
65     }(query)
66   }

代码清单 7-25 中用一个 for 循环创建要使用池的 goroutine。每个 goroutine 调用一次 performQueries 函数然后退出。 performQueries 函数需要传入一个唯一的 ID 值用于做日志以及一个指向 Pool 的指针。一旦所有的 goroutine 都创建完成, main 函数就等待所有 goroutine 执行完毕,如代码清单 7-26 所示。

代码清单 7-26  pool /main/main.go:第 68 行到第 73 行

68   // 等待 goroutine 结束
69   wg.Wait()
70
71   // 关闭池
72   log.Println("Shutdown Program.")
73   p.Close()

在代码清单 7-26 中, main 函数等待 WaitGroup 实例的 Wait 方法执行完成。一旦所有 goroutine 都报告其执行完成,就关闭 Pool ,并且终止程序。接下来,让我们看一下 performQueries 函数。这个函数使用了池的 Acquire 方法和 Release 方法,如代码清单 7-27 所示。

代码清单 7-27  pool /main/main.go:第 76 行到第 91 行

76 // performQueries 用来测试连接的资源池
77 func performQueries(query int, p *pool.Pool) {
78   // 从池里请求一个连接
79   conn, err := p.Acquire()
80   if err != nil {
81     log.Println(err)
82     return
83   }
84
85   // 将该连接释放回池里
86   defer p.Release(conn)
87
88   // 用等待来模拟查询响应
89   time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
90   log.Printf("QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
91 }

代码清单 7-27 展示了 performQueries 的实现。这个实现使用了 poolAcquire 方法和 Release 方法。这个函数首先调用了 Acquire 方法,从池里获得 dbConnection 。之后会检查返回的 error 接口值,在第 86 行,再使用 defer 语句在函数退出时将 dbConnection 释放回池里。在第 89 行和第 90 行,随机休眠一段时间,以此来模拟使用 dbConnection 工作时间。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文