代码
调试技术
数据库
- 《Getting started with impala》
- 《mysql 必知必会》
- 《mysql 性能调优与架构实践》
- 《Mysql 技术内幕 InnoDB 存储引擎》
- 《Redis 实战》
- 《Redis 深度历险核心原理和应用实践》
- 《redis设计与实现》
- 《七周七数据库》
- 《深入浅出mysql》
- 《高性能mysql第三版》
- 《MySQL是怎样运行的》
前端
GOLANG
- 《1 The Go Programming Language》
- 《2 The Go Programming Language》
- 《3 The Go Programming Language》
- 《Build Web Application With Golang》
- 《Go101》
- 《Network Programming with go》
- 《Building Microservices With Go》
- 《Building Restful Web Services with Go》
- 《Concurrency In Go》
- 《Go In Action(Go 实战)》
- 《Go学习笔记语言详解》
- 《Go学习笔记源码剖析》
- 《Go语言编程》
JAVA
网络
心理学
PYTHON
创业
UNIX/LINUX
分布式
系统设计
搜索引擎
开发工具
- 《Practical Vim》
- 《Vim8文本处理实战》
- 《Learn vim scrpt the hard way》
- 《Pro Git》
- 《Mastering Vim》
- 《Mastering Vim Quickly》
思维
源码
网站架构微服务
- 《微服务架构设计模式》
- 《从0开始学架构》
- 《web scalavility for startup engineers》
- 《designdatainstensive_application》
- 《designdatainstensive_application2》
- 《clean_architecture》
- 《微服务实战》
- 《微服务设计》
软件工程/项目管理
运维
金融理财
写作
互联网
区块链
技术演讲网课
- 《哔哩哔哩的go微服务实战》
- 《go业务基础库之Error&Context》
- 《Go同步和并发设计模式》
- 《300分钟吃透分布式缓存》
- 《DDD实战课》
- 《分布式技术原理与实战45讲》
- 《架构设计面试精讲》
- 《高并发系统设计40问》
- 《java并发编程78讲》
- 《中间件核心技术与实战讲》
职场
《2 The Go Programming Language》
8 Goroutines and Channels
go 支持两种风格的并发编程。本章讲 goroutines and channels,支持 CSP(communicating sequential processes) 模型的并发。 CSP模型里值通过独立的 activities(goroutines) 传递但是大部分变量被限定在一个 activity 里。下一章讲传统的基于多线程共享内存的并发。
8.1 Goroutines
在 go 里每个并发执行的活动叫做 goroutine。考虑一个场景,两个函数一个计算一个写输出,但是互不调用,串行方式是第一个调用 完成了再去调用另一个。但是在两个及多个 goroutine 的并发场景下,两个函数可以同时执行。 程序执行的开始,只有一个调用 main 的goroutine,我们叫它 main goroutine,新的 goroutine 可以通过 go 语句创建,语法上就是普通的函数或方法加上 go 关键字,go 声明使得函数在被创建的新的 goroutine 里执行,go 语句则会立刻返回,不会阻塞住。
f() // call f(); wait for it to return
go f() // create a new goroutine that calls f(); don't wait
我们看一个好玩的例子,这里输出斐波那契数,在计算的同时,屏幕上会显示一个转动的指针:
package main
import (
"fmt"
"time"
)
func main() {
go spinner(100 * time.Millisecond)
const n = 45
fibN := fib(n) // slow
fmt.Printf("\rFib(%d) = %d\n", n, fibN)
}
func spinner(delay time.Duration) {
for {
for _, r := range `-\|/` { //实现指针旋转的等待效果
fmt.Printf("\r%c", r)
time.Sleep(delay)
}
}
}
func fib(x int) int {
if x < 2 {
return x
}
return fib(x-1) + fib(x-2)
}
运行的话就能看到一个指针在转,然后过会就输出了 fib(10)(这个递归计算很耗时)。并没有一种直接的编程方式让一个 goroutine 去结束掉另一个 goroutine,但是有方式可以结束自己。
8.2 Example: Concurrent Clock Server
web server 是最常用的使用并发的地方,要处理来自客户端的大量独立的连接。 我们先来写个 tcp serer发送时间:
package main
import (
"io"
"log"
"net"
"time"
)
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept() //blocks until an incoming connection request is made
if err != nil {
log.Print(err) // e.g., coonnection aborted
continue
}
handleConn(conn) // handle one connection at a time
}
}
func handleConn(c net.Conn) {
defer c.Close()
for {
_, err := io.WriteString(c, time.Now().Format("15:04:05\n"))
if err != nil {
return // e.g., client disconnected
}
time.Sleep(1 * time.Second)
}
}
接下来写一个 只读的 tcp client:
package main
import (
"io"
"log"
"net"
"os"
)
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
mustCopy(os.Stdout, conn)
}
func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
如果你在两个终端里运行 client,你会发现另一个一直没有输出,server 一次只能处理一个 cilent(server 里是一直循环)。但是重点来了,我们只需要加上一个 go 关键字,就能并发处理多个 client 啦,so easy
for {
conn, err := listener.Accept() //blocks until an incoming connection request is made
if err != nil {
log.Print(err) // e.g., coonnection aborted
continue
}
go handleConn(conn) // NOTE: 并发处理连接,就是这么简单
}
这时候再运行 server,然后打开俩终端同时运行 client,你会发现俩 client 都有输出啦。
8.3 Example: Concurrent Echo Server
之前向客户端端输出时间的 clock server对每个连接使用了一个 goroutine,这一节我们对每个连接使用多个 goroutine
// netcat2
package main
import (
"bufio"
"fmt"
"io"
"log"
"net"
"os"
"strings"
"time"
)
func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
func echo(c net.Conn, shout string, delay time.Duration) {
fmt.Fprintln(c, "\t", strings.ToUpper(shout))
time.Sleep(delay)
fmt.Fprintln(c, "\t", shout)
time.Sleep(delay)
fmt.Fprintln(c, "\t", strings.ToLower(shout))
}
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
echo(c, input.Text(), 1*time.Second)
}
c.Close()
}
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
go mustCopy(os.Stdout, conn)
mustCopy(conn, os.Stdin)
}
// reverb2
package main
import (
"bufio"
"fmt"
"net"
"strings"
"time"
)
func echo(c net.Conn, shout string, delay time.Duration) {
fmt.Fprintln(c, "\t", strings.ToUpper(shout))
time.Sleep(delay)
fmt.FPrintln(c, "\t", shout)
time.Sleep(delay)
fmt.Fprintln(c, "\t", strings.ToLower(shout))
}
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
go echo(c, input.Text(), 1*time.Second) //input.Text() is evaluated in the main goroutine.
}
// NOTE: ignoring potential errors from input.Err()
c.Close()
}
8.4 Channels
如果 goroutine 是并行 go 程序的活动单元,channels 就是它们之间的连接通道。 channel是一种通信机制允许一个 goroutine 向另一个 goroutine 发送值。每个 channel 都是特定类型的值的管道,称为channel 的元素类型(element type)。比如 chan int
,使用内置的 make 函数创建管道: ch := make(chan int) // ch has type 'chan int'
和 map 一样,channel 也是引用类型,作为参数传递的时候会拷贝该引用本身,零值是 nil。 channel 有两个基本操作,send 和 receive。一个 send 把值从一个 goroutine 发送到另一个对应使用 receive 接收的 goroutine。
ch <- x // send 语句
x = <- ch // 赋值语句中的 receive 表达式
<- ch // receive 语句,丢弃结果
close(ch) // 关闭之后send 将 panic,但是却可以接受
channel 还支持第三个操作 close(ch),设置 flag 指示没有值将会发送到 channel,后续尝试 send 将会 panic。 在一个关闭的 channel 接收值将会一直接收到 channel 没有剩余的值,之后任何 receive 操作会立刻完成并且接收到 channel 的元素类型零值。 make 创建 channel 还可以指定容量:
ch = make(chan int) // unbuffered channel
ch = make(chan int, 0) // unbuffered channel
ch = make(chan int, 3) // buffered channel with capacity 3
8.4.1 Unbuffered Channels
在一个unbufferd channel 执行 send后 操作会 block 发送goroutine,直到另一个 goroutine 在相同 channel 执行对应的 receive,这样值才会被传输然后两个 goroutine 才有可能继续执行。如果 receive 先执行了,会被 block 直到对应的另一个 goroutine 执行了send 操作。在 unbuffered channel 上通信使得接收和发送者 goroutine 同步,所以也叫 synchronous channels。 在并发的讨论中,当我们说 x 在 y 之前发生,并不意味着时间上提前发生,而是保证它之前的操作(更新变量等),已经完成并且可以依赖它们了。 如果说 x 不在 y 前也不在 y 后发生,我们就说 x 和 y 是并发的。
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
done := make(chan struct{}) // 这里的 done 只是为了同步用,不需要值,防止main 结束了有些 coroutine 还没结束
go func() {
io.Copy(os.Stdout, conn) // NOTE: ignoring errors
log.Println("done")
done <- struct{}{} // signal the main goroutine
}()
mustCopy(conn, os.Stdin)
conn.Close()
<-done // wait for background goroutine to finish,即使main先执行了,也会block到goroutine 执行send
}
8.4.2 Pipelines
channel 可以通过把一个 goroutine 的输出当成另一个的输入,成为 pipeline。
package main
import "fmt"
// counter -> squarer -> printer
func main() {
naturals := make(chan int)
squares := make(chan int)
// Counter
go func() {
for x := 0; ; x++ {
naturals <- x
}
}()
//Squarer
go func() {
for {
x := <-naturals
squares <- x * x
}
}()
// Printer( in main goroutine)
for {
fmt.Println(<-squares)
}
}
go 提供了 range 语法用来迭代 channels,用来接收一个 channel 上被 send 的所有值,然后迭代到最后一个值后结束循环。 在一个已经 close 或者 nil 的channel执行 close 会 panic
8.4.3 Unidirectional Channel Types
上面例子最好能分开成几个函数,如下:
func counter(out chan int)
func squarer(out, in chan int)
func printer(in chan int)
go 里提供了单向 channel 允许只接收或者发送值。
chan<- int
, send-only channel of int ,只允许 send<-chan int
, receive-only channel of int,只允许 receive,close 一个只接收的 channel 会导致编译时错误
好,重构下上边的代码:
package main
import "fmt"
// counter -> squarer -> printer
func main() {
naturals := make(chan int)
squares := make(chan int)
go counter(naturals) // 允许双向 channel 隐式转成单向的 channel
go squarer(squares, naturals)
printer(squares)
}
func counter(out chan<- int) { // 只发送
for x := 0; x < 100; x++ {
out <- x
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for v := range in {
out <- v * v
}
close(out)
}
func printer(in <-chan int) {
for v := range in {
fmt.Println(v)
}
}
8.4.4 Buffered Channels
有容量的 channel: ch = make(chan string, 3)
。当 channel 满的时候会 block send 一直到其他goroutine receive 释放了空间。 当 channel 为空的时候接收者被 block 一直到其他 goroutine send 值。 可以用内置 cap 函数获取 channel 容量 cap(ch)
,而 len(ch)
返回元素个数。 通常 bufferd channel 在多个 goroutine 使用,go 新手经常在一个 goroutine 里当队列用,这个是错误的,channel 和 goroutine 调度是深度关联的,如果没有另一个 goroutine 从 channel 接收,sender 或者整个程序有可能被永久阻塞。简单的队列应该用 slice。
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response ,慢的 goroutine 会泄露
}
func request(hostname string) (response string) { /* ... */ }
goroutine leak: goroutine 泄露(视为bug)。NOTE: 泄露的 goroutine 不会被自动回收,必须确定不需要的时候自行终结。
8.5 Looping in Parallel
本节介绍几个把循环改成并发执行的几种模式。
package thumbnail
import "log"
func ImageFile(infile string) (string, error)
// makeThumbnails makes thumbnails of the specified files.
func makeThumbnails(filenames []string) {
for _, f := range filenames {
if _, err := thumbnail.ImageFile(f); err != nil {
log.Println(err)
}
}
}
注意这里的循环操作每个都是独立的,互不依赖,叫做 embarrassingly parallel,这种方式是最容易实现并发的。你能会立马写出如下代码:
// NOTE: incorrect!
func makeThumbnails2(filenames []string) {
for _, f := range filenames {
go thumbnail.ImageFile(f) // NOTE: ignoring errors
}
}
但是这是不对的,这段代码会启动所有 goroutine,然后没等他们完成直接退出了(运行它你会发现执行很快,但是没卵用,并非是并发的效果)。并没有一种直接的方法来等待 goroutine 完成,但是我们可以让内部的 goroutine 通过向一个共享 channel 发送事件来通知外部 goroutine 它完成了。
// makeThumbnails3 makes thumbnails of the specified files in parallel.
func makeThumbnails3(filenames []string) {
ch := make(chan struct{})
for _, f := range filenames {
go func(f string) {
thumbnail.ImageFile(f) // NOTE: ignoring errors
ch <- struct{}{}
}(f) // 注意这里使用参数防止只使用最后一个循环
}
// wait for goroutine complete
for range filenames {
<-ch
}
}
下面加上错误处理:
// makeThumbnails4 makes thumbnails for the specified files in parallel.
// It returns an error if any step failed.
func makeThumbnails4(filenames []string) error {
errors := make(chan error)
for _, f := range filenames {
go func(f string) {
_, err := thumbnail.ImageFile(f)
errors <- err
}(f)
}
for range filenames {
if err := <-errors; err != nil {
return err // NOTE: incorrect: goroutine leak! 注意直接返回第一个err 会造成 goroutine 泄露
}
}
return nil
}
注意这里有个隐含的bug,当遇到第一个 non-nil error 时, 返回error给调用者,导致没有 goroutine 消费 errors channel。每个还在工作的 worker goroutine 想要往 errors channel send 值的时候会被永久阻塞,无法终止,出现了 goroutine 泄露,程序被阻塞或者出现内存被用光。
两种解决方式:简单的方式是用一个有足够空间的 bufferd channel,当它发送消息的时候没有工作的 goruotine 被 block。另一种是在main goroutine 返回第一个 error 的时候创建一个新的 goroutine 消费 channel。
// makeThumbnails5 makes thumbnails for the specified files in parallel.
// It returns the generated file names in an arbitrary order,
// or an error if any step failed.
func makeThumbnails5(filenames []string) (thumbfiles []string, err error) {
type item struct {
thumbfile string
err error
}
ch := make(chan item, len(filenames)) // buffered channel
for _, f := range filenames {
go func(f string) {
var it item
it.thumbfile, it.err = thumbnail.ImageFile(f)
ch <- it
}(f)
}
for range filenames {
it := <-ch
if it.err != nil {
return nil, it.err
}
thumbfiles = append(thumbfiles, it.thumbfile)
}
return thumbfiles, nil
}
当我们不知道会循环多少次的时候,可以使用 sync.WaitGroup 记录 goroutine 数(Add and Done):
// makeThumbnails6 makes thumbnails for each file received from the channel.
// It returns the number of bytes occupied by the files it creates.
func makeThumbnails6(filenames <-chan string) int64 {
sizes := make(chan int64)
var wg sync.WaitGroup // number of working goroutines
for f := range filenames {
wg.Add(1) // Add 必须在 goroutine 前调用
// worker
go func(f string) {
defer wg.Done() // 等价于 Add(-1)
thumb, err := thumbnail.ImageFile(f)
if err != nil {
log.Println(err)
return
}
info, _ := os.Stat(thumb) // OK to ignore error
sizes <- info.Size()
}(f)
}
// closer
go func() {
wg.Wait()
close(sizes)
}()
var total int64
for size := range sizes {
total += size
}
return total
}
8.6 Expmple: Conrurrent Web Crawler
package main
import (
"fmt"
"log"
"os"
"gopl.io/ch5/links"
)
func crawl(url string) []string {
fmt.Println(url)
list, err := links.Extract(url)
if err != nil {
log.Print(err)
}
return list
}
func main() {
worklist := make(chan []string)
go func() { worklist <- os.Args[1:] }()
// crawl the web concurrently
seen := make(map[string]bool)
for list := range worklist {
for _, link := range list {
if !seen[link] {
seen[link] = true
go func(link string) {
worklist <- crawl(link)
}(link)
}
}
}
}
但是这个程序太 "parallel",我们想限制下它的并发。可以通过有 n 个容量的bufferd channel来限制并发数(counting semaphore)
// tokens is a counting semaphore used to
// enforce a limit of 20 concurrent requests.
var tokens = make(chan struct{}, 20)
func crawl(url string) []string {
fmt.Println(url)
tokens <- struct{}{} // acquire a token ,初始化一个空 struct
list, err := links.Extract(url)
<-tokens // release the token
if er != nil {
log.Print(err)
}
return list
}
但是还有个问题,这个程序不会结束。我们需要当 worklist 为空并且没有爬虫 goroutine 活动的时候结束 main 里的循环。
8.7 Multiplexing with select
写一个火箭发射的倒计时程序:
package main
import (
"fmt"
"time"
)
//!+
func main() {
fmt.Println("Commencing countdown.")
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
<-tick
}
launch()
}
func launch() {
fmt.Println("Lift off!")
}
然后增加一个 abort 功能:
func main() {
//!+abort
abort := make(chan struct{})
go func() {
os.Stdin.Read(make([]byte, 1)) // read a single byte
abort <- struct{}{}
}()
//!-abort
//!+
fmt.Println("Commencing countdown. Press return to abort.")
select {
case <-time.After(10 * time.Second):
// Do nothing.
case <-abort:
fmt.Println("Launch aborted!")
return
}
launch()
}
select 允许我们轮训 channel (polling a channel):
select {
case <-abort:
fmt.Printf("Launch aborted!\n")
return
default:
// do nothing
}
8.8 Example: Concurrent Directory Traversal
这一节实现一个类似 du 的命令,显示目录的使用量。main 函数用两个 goroutine,一个用来背后遍历目录,一个用来打印最后结果。
package main
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
)
// 递归访问目录树
func walkDir(dir string, fileSizes chan<- int64) {
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
// 返回目录的入口
func dirents(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir) // returns a slice of os.FileInfo
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
return nil
}
return entries
}
func main() {
// 获取目录
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."} //没有命令行参数默认是当前目录
}
// 遍历文件树
fileSizes := make(chan int64)
go func() {
for _, root := range roots {
walkDir(root, fileSizes)
}
close(fileSizes)
}()
// 打印结果
var nfiles, nbytes int64
for size := range fileSizes {
nfiles++
nbytes += size
}
printDiskUsage(nfiles, nbytes)
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
}
如果我们加上个进度输出会更好,用户给了 -v
参数,就定期打印出来结果
func main() {
// ...start background goroutine...
//!-
// Determine the initial directories.
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
// Traverse the file tree.
fileSizes := make(chan int64)
go func() {
for _, root := range roots {
walkDir(root, fileSizes)
}
close(fileSizes)
}()
//!+
// Print the results periodically.
var tick <-chan time.Time
if *verbose {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop // fileSizes was closed, labeled break statement breaks out of both the select and the for loop;
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes) // final totals
}
但是这个程序还是比较慢,我们还可以对每个 WalkDir 调用都开个 goroutine,我们使用sync.WaitGroup计算有多少个活跃的 WalkDir 调用,还有计数同步原语来限制太多的并发数。(从这里开始程序就开始难懂了,
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论