返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

9.2.1 收发

发布于 2024-10-12 19:15:49 字数 5438 浏览 0 评论 0 收藏 0

正常接收操作是 ok-idiomrange 模式。

  • ok == false :通道被关闭。
  • for ... range :循环,直至通道关闭。
func main() {
	var wg sync.WaitGroup
	wg.Add(2)

	c := make(chan int)
    
	go func() {
		defer wg.Done()

		for {
			x, ok := <-c
			if !ok { return }
			println(x)
		}
	}()
    
    go func() {
    	defer wg.Done()
		defer close(c)

		c <- 1
		c <- 2
		c <- 3	    
	}()

	wg.Wait()
}
func main() {
	var wg sync.WaitGroup
	wg.Add(2)

	c := make(chan int)
    
	go func() {
		defer wg.Done()

		for x := range c{
			println(x)
		}
	}()
    
    go func() {
    	defer wg.Done()
		defer close(c)

		c <- 1
		c <- 2
		c <- 3	    
	}()

	wg.Wait()
}

及时关闭通道,否则可能导致死锁。

fatal error: all goroutines are asleep - deadlock!

单向

通道默认双向,不区分发送和接收端。可限制方向获得更严谨的操作逻辑。

func main() {
	var wg sync.WaitGroup
	wg.Add(2)

	c := make(chan int)
	var send chan<- int = c
	var recv <-chan int = c

    // recv
	go func() {
		defer wg.Done()
		for x := range recv {
			println(x)
		}
	}()

    // send
	go func() {
		defer wg.Done()
		defer close(c)
		for i := 0; i < 3; i++ {
			send <- i
		}
	}()

	wg.Wait()
}

不能在单向通道上做逆向操作。

func main() {
	c := make(chan int, 2)
	
    var send chan<- int = c
	var recv <-chan int = c
    
	// <-send 
	// ~~~~~~ cannot receive from send-only channel

	// recv <- 1
	// ~~~~~~ cannot send to receive-only channel

	// close(recv)
	// ~~~~~~ cannot close receive-only channel
}

无法将单向通道转换回去。

func main() {
	var a, b chan int
    
	a = make(chan int, 2)
	var recv <-chan int = a
	var send chan<- int = a
    
	// b = (chan int)(recv)
	//      ~~~~~~~ cannot convert recv (<-chan int) to type chan int

	// b = (chan int)(send)
	//      ~~~~~~~ cannot convert send (chan<- int) to type chan int
}

尽管可用 make 创建单向通道,但没有任何意义,因为通道必须同时有收发行为。

func main() {
	q := make(<-chan struct{})

	go func(){
		close(q)
        ~~~~~ cannot close receive-only channel
	}()

	<- q
}

选择

select 语句处理多个通道,随机选择可用通道做收发操作。

将失效通道置为 nil (阻塞,不可用),用作结束判断。

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
    
	cha, chb := make(chan int), make(chan int)
    
    // recv
	go func() { 
		defer wg.Done()
        
		for {
			x := 0
			ok := false

			// random
			select {
			case x, ok = <-cha: if !ok { cha = nil }
			case x, ok = <-chb: if !ok { chb = nil }
			}
            
			if (cha == nil) && (chb == nil) { return }

			println(x)
		}
	}()
    
    // send
	go func() { 
		defer wg.Done()
		defer close(cha)
		defer close(chb)
        
		for i := 0; i < 10; i++ {
            // random
			select { 
			case cha <- i:
			case chb <- i * 10:
			}
		}
	}()
    
	wg.Wait()
}

即便是同一通道,也会随机选择 case 执行。

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
    
	cha := make(chan int)
    
    // recv
	go func() { 
		defer wg.Done()
        
		for {
			x := 0
			ok := false

			// random
			select {
			case x, ok = <-cha: println("c1", x)
			case x, ok = <-cha: println("c2", x)
			}
            
			if !ok { return }
		}
	}()
    
    // send
	go func() { 
		defer wg.Done()
		defer close(cha)
        
		for i := 0; i < 10; i++ {
            // random
			select { 
			case cha <- i:
			case cha <- i * 10:
			}
		}
	}()
    
	wg.Wait()
}

所有通道都不可用,则执行 default 分支,避免阻塞。

func main() {
	exit := make(chan struct{})
	c := make(chan int)
    
	go func() {
		defer close(exit)
        
		for {
			select {
			case x, ok := <-c:
				if !ok { return }
				println(x)
			default: 
			}
            
			fmt.Println("wait...")
			time.Sleep(time.Second)
		}
	}()
    
	time.Sleep(time.Second * 3)
    
	c <- 100
	close(c)
    
	<-exit
}

select 语句,一直阻塞或死锁。

select{}

缺省

利用 default 实现判断逻辑。

func main() {
	done := make(chan struct{})
    
    // 多槽构成的数据区。
	data := []chan int{ 
		make(chan int, 3),
	}

	go func() {
		defer close(done)
        
		for i := 0; i < 10; i++ {

			// 向最后一个槽添加数据。
			// 添加失败(已满),则创建新槽。

			select {
			case data[len(data)-1] <- i:
			default:
				data = append(data, make(chan int, 3))
				data[len(data)-1] <- i
			}
		}
	}()

	<-done

	for _, c := range data {
		close(c)
		for x := range c {
			println(x)
		}
	}
}

反射

如果运行期才能确定通道数量,可利用反射( reflect )实现。

func main() {
	exit := make(chan struct{})

    // 运行时动态创建。
	chans := make([]chan int, 0)
	chans = append(chans, make(chan int))
	chans = append(chans, make(chan int))

	go func() {
		defer close(exit)

        // 反射构建 select 操作。
		cases := make([]reflect.SelectCase, len(chans))
		for i, c := range chans {
			cases[i] = reflect.SelectCase {
                Dir: reflect.SelectRecv,
                Chan: reflect.ValueOf(c),
            }
		}

		for {
			index, value, ok := reflect.Select(cases)
            
            // 检查并退出。
			if !ok {
				chans[index] = nil

				n := 0
				for _, c := range chans {
					if c == nil { n++ }
					if n == len(chans) { return }
				}

				continue
			}

			println(index, value.Int(), ok)
		}
	}()

	chans[1] <- 101
	chans[0] <- 100

	for _, c := range chans {
		close(c)
	}

	<- exit
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文