上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
9.2.2 模式
通常以工厂方法将 goroutine 和 channel 绑定。
func newRecv[T any](cap int) (data chan T, done chan struct{}) { data = make(chan T, cap) done = make(chan struct{}) go func() { defer close(done) for v := range data { println(v) } }() return } func main() { data, done := newRecv[int](3) for i := 0; i < 10; i++ { data <- i } close(data) <- done }
超时
如果 channel 阻塞且没有关闭,那么可能导致 goroutine 泄漏(leak)。
解决办法是用 select
default
,或 time.After
设置超时。
func main() { quit := make(chan struct{}) c := make(chan int) go func() { defer close(quit) select { case x, ok := <- c: println(x, ok) case <- time.After(time.Second): return } }() <- quit }
信号量
用通道实现信号量(semaphore),在同一时刻仅指定数量的 goroutine 参与工作。
type Sema struct { c chan struct{} } func NewSema(n int) *Sema { return &Sema{ c: make(chan struct{}, n), } } func (m *Sema) Acquire() { m.c <- struct{}{} } func (m *Sema) Release() { <- m.c } // ------------------------------ func main() { var wg sync.WaitGroup // runtime: 4 // sema: 2 runtime.GOMAXPROCS(4) sem := NewSema(2) for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() sem.Acquire() defer sem.Release() for n := 0; n < 3; n++ { time.Sleep(time.Second * 2) fmt.Println(id, time.Now()) } }(i) } wg.Wait() }
对象池
鉴于通道本身就是一个并发安全的队列,可用作 ID generator、Pool 用途。
type Pool[T any] chan T func NewPool[T any](cap int) Pool[T] { return make(chan T, cap) } func (p Pool[T]) Get() (v T, ok bool) { select { case v = <-p: ok = true default: } return } func (p Pool[T]) Put(v T) bool { select { case p <- v: return true default: } return false } // ----------------------------- func main() { p := NewPool[int](2) println(p.Put(1)) println(p.Put(2)) println(p.Put(3)) for { v, ok := p.Get() if !ok { break } println(v) } }
退出
捕获 INT
、 TERM
信号,顺便实现一个简易的 atexit
和 函数。
package main import ( "os" "os/signal" "sync" "syscall" "time" ) var exs = &struct { sync.RWMutex funcs []func() signals chan os.Signal }{} func atexit(f func()) { exs.Lock() defer exs.Unlock() exs.funcs = append(exs.funcs, f) } func wait(code int) { // 信号注册。 if exs.signals == nil { exs.signals = make(chan os.Signal) signal.Notify(exs.signals, syscall.SIGINT, syscall.SIGTERM) } // 独立函数,确保 atexit 函数能按 FILO 顺序执行。 // 不受 os.Exit 影响。 func() { exs.RLock() for _, f := range exs.funcs { defer f() } exs.RUnlock() <-exs.signals }() // 终止进程。 os.Exit(code) } func main() { atexit(func() { time.Sleep(time.Second * 3) println("1 ..."); }) atexit(func() { println("2 ...") }) println("Press CTRL + C to exit.") wait(1) }
队列
通道本就是队列,需要关心的是如何优雅(gracefully)地关闭通道。
package main import ( "sync" "sync/atomic" ) func main() { max := int64(100) // 最大发送计数。 m := 3 // 接收者数量。 n := 3 // 发送者数量。 var wg sync.WaitGroup wg.Add(m + n) data := make(chan int) // 数据通道。 done := make(chan struct{}) // 结束通知。 // m recv for i := 0; i < m; i++ { go func() { defer wg.Done() for { select { case <- done: return case v := <- data: println(v) } } }() } // n send for i := 0; i < n; i++ { go func(id int) { defer wg.Done() defer func(){ recover() }() for { select { case <- done: return case data <- id: if atomic.AddInt64(&max, -1) <= 0 { close(done) return } default: } } }(i) } wg.Wait() }
注意:为便于阅读,以上示例代码并未完善!
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论