返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

9.2.2 模式

发布于 2024-10-12 19:15:49 字数 3862 浏览 0 评论 0 收藏 0

通常以工厂方法将 goroutine 和 channel 绑定。

func newRecv[T any](cap int) (data chan T, done chan struct{}) {
	data = make(chan T, cap)
	done = make(chan struct{})

	go func() {
		defer close(done)

		for v := range data {
			println(v)
		}
	}()

	return
}

func main() {
	data, done := newRecv[int](3)

	for i := 0; i < 10; i++ {
		data <- i
	}

	close(data)
	<- done
}

超时

如果 channel 阻塞且没有关闭,那么可能导致 goroutine 泄漏(leak)。

解决办法是用 select default ,或 time.After 设置超时。

func main() {
	quit := make(chan struct{})
	c := make(chan int)

	go func() {
		defer close(quit)

		select {
		case x, ok := <- c: println(x, ok)
		case <- time.After(time.Second): return
		}
	}()

	<- quit
}

信号量

用通道实现信号量(semaphore),在同一时刻仅指定数量的 goroutine 参与工作。

type Sema struct {
	c chan struct{}
}

func NewSema(n int) *Sema {
	return &Sema{
		c: make(chan struct{}, n),
	}
}

func (m *Sema) Acquire() {
	m.c <- struct{}{}
}

func (m *Sema) Release() {
	<- m.c
}

// ------------------------------

func main() {
	var wg sync.WaitGroup

    // runtime: 4
    //    sema: 2
	runtime.GOMAXPROCS(4)
	sem := NewSema(2)

	for i := 0; i < 5; i++ {
		wg.Add(1)

		go func(id int) {
			defer wg.Done()

			sem.Acquire()
			defer sem.Release()

			for n := 0; n < 3; n++ {
				time.Sleep(time.Second * 2)
				fmt.Println(id, time.Now())
			}
		}(i)
	}

	wg.Wait()
}

对象池

鉴于通道本身就是一个并发安全的队列,可用作 ID generator、Pool 用途。

type Pool[T any] chan T

func NewPool[T any](cap int) Pool[T] {
	return make(chan T, cap)
}

func (p Pool[T]) Get() (v T, ok bool) {
	select {
	case v = <-p: ok = true
	default:
	}

	return
}

func (p Pool[T]) Put(v T) bool {
	select {
	case p <- v: return true
	default:
	}

	return false
}

// -----------------------------

func main() {
	p := NewPool[int](2)

	println(p.Put(1))
	println(p.Put(2))
	println(p.Put(3))

	for {
		v, ok := p.Get()
		if !ok { break }
		println(v)
	}
}

退出

捕获 INTTERM 信号,顺便实现一个简易的 atexit 和 函数。

package main

import (
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

var exs = &struct {
	sync.RWMutex
	funcs   []func()
	signals chan os.Signal
}{}

func atexit(f func()) {
	exs.Lock()
	defer exs.Unlock()
	exs.funcs = append(exs.funcs, f)
}

func wait(code int) {
    
    // 信号注册。
	if exs.signals == nil {
		exs.signals = make(chan os.Signal)
		signal.Notify(exs.signals, syscall.SIGINT, syscall.SIGTERM)
	}

    // 独立函数,确保 atexit 函数能按 FILO 顺序执行。
    // 不受 os.Exit 影响。
	func() {
		exs.RLock()
		for _, f := range exs.funcs {
			defer f() 
		}             
		exs.RUnlock()
		<-exs.signals
	}()

    // 终止进程。
	os.Exit(code)
}

func main() {
	atexit(func() { 
		time.Sleep(time.Second * 3) 
		println("1 ..."); 
	})

	atexit(func() { 
		println("2 ...") 
	})

	println("Press CTRL + C to exit.")
	wait(1)
}

队列

通道本就是队列,需要关心的是如何优雅(gracefully)地关闭通道。

package main

import (
	"sync"
	"sync/atomic"
)

func main() {
	max := int64(100)  // 最大发送计数。
	m   := 3           // 接收者数量。
	n   := 3           // 发送者数量。

	var wg sync.WaitGroup
	wg.Add(m + n)

	data := make(chan int)        // 数据通道。
	done := make(chan struct{})   // 结束通知。
    
	// m recv
    for i := 0; i < m; i++ {	
		go func() {
			defer wg.Done()

			for {
				select {
				case <- done: return
				case v := <- data: println(v)
				}
			}
		}()
	}

    // n send
    for i := 0; i < n; i++ {
		go func(id int) {
			defer wg.Done()
			defer func(){ recover() }()

			for {
				select {
				case <- done: return
				case data <- id:
					if atomic.AddInt64(&max, -1) <= 0 {
						close(done)
						return
					}
				default:
				}
			}
		}(i)
	}
    
    wg.Wait()
}

注意:为便于阅读,以上示例代码并未完善!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文