上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
context 1.18
上下文 (context)用于广播 消息 (cancel、timeout)和传递 数据 (value)。
- 链式流程 :每节点都可检查消息状态,以决定是否放弃后续调用。
- 并发模型 :创建并发任务单元,给予额外协作控制(取消、超时)。
- 消息只能向后广播。且只是建议,非强制。
- 数据沿调用链传递,子知父态,反之不行。
root (Background) | +--- A cancel / timeout value | | . ^ | +--- C . . | | . . | +--- D v . | +--- B broadcast: A -> C -> D recursion: D -> C -> A
type Context interface { // 截止时间:未设置时,ok == false。 Deadline() (deadline time.Time, ok bool) // 消息通知:closed。(cancel, deadline, timeoout) Done() <-chan struct{} // 取消原因:nil, Canceled, DeadlineExceeded。 Err() error // 关联数据:不适合传递业务参数(request)。 Value(key any) any }
消息 = Done + Err
:
Done
:检查消息是否发生。Err
:确认是哪类消息。
说明:
- 上下文对象不可变,以
With
函数生成有继承关系的子对象。 - 以
Background
或TODO
为根对象,以ctx
为参数名。 - 以
Value
方法获取关联值时,逐级向上(父)递归查找。 - 所有
cancel
函数应显式调用,避免正常结束未及释放资源。
package main import ( "context" "fmt" "time" ) func request(ctx context.Context) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() resp := make(chan int) go handle(ctx, resp) // do something ... select { case v := <- resp: println(v) case <- ctx.Done(): println(ctx.Err().Error()) } } func handle(ctx context.Context, resp chan<- int) { println("1/3 handle") cache(ctx, resp) } func cache(ctx context.Context, resp chan<- int) { println("2/3 cache") // long-time working ... time.Sleep(time.Second * 2) // !!!!! database(ctx, resp) } func database(ctx context.Context, resp chan<- int) { // check done! select { case <- ctx.Done(): println("3/3 database: timeout!") return default: } println("3/3 database.") resp <- 100 } func main() { // chain: request -> handle -> cache -> database request(context.Background()) fmt.Scanln() } /* 1/3 handle 2/3 cache context deadline exceeded 3/3 database: timeout! */
package main import ( "context" "time" ) func request(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() ctx = context.WithValue(ctx, "A", "a") go handle(ctx) // do something ... time.Sleep(time.Second) } func handle(ctx context.Context) { println("1/3 handle") ctx = context.WithValue(ctx, "B", "b") cache(ctx) } func cache(ctx context.Context) { println("2/3 cache:") println(" ", ctx.Value("A").(string)) ctx = context.WithValue(ctx, "A", "a2") database(ctx) } func database(ctx context.Context) { println("3/3 database.") println(" ", ctx.Value("A").(string)) println(" ", ctx.Value("B").(string)) } func main() { request(context.Background()) } /* 1/3 handle 2/3 cache: a 3/3 database. a2 b */
源码剖析
实现 Context
接口的几种上下文类型。
+---------+ | Context | +---------+ | +---------------+---------------+ | | | +----------+ +-----------+ +----------+ | emptyCtx | | cancelCtx | | valueCtx | +----------+ +-----------+ +----------+ | | | Background | WithValue TODO | +--------+-------+ | | WithCancel +----------+ | timerCtx | +----------+ | WithTimeout, WithDeadline
// context/context.go type emptyCtx int func (*emptyCtx) Deadline() (deadline time.Time, ok bool) { return } func (*emptyCtx) Done() <-chan struct{} { return nil } func (*emptyCtx) Err() error { return nil } func (*emptyCtx) Value(key any) any { return nil } var ( background = new(emptyCtx) todo = new(emptyCtx) ) func Background() Context { return background }
cancel
向下(子)发送 “取消” 广播。
- 所谓消息,就是
done
被关闭。 - 即便正常结束,也应该调用
cancel
函数清理资源。 - 内部同步保护,多次调用
cancel
没有影响。
// A cancelCtx can be canceled. When canceled, it also cancels // any children that implement canceler. type cancelCtx struct { Context // parent mu sync.Mutex done atomic.Value // channel children map[canceler]struct{} // broadcast err error }
Context
:直属父辈,用于往上递归。children
:广播字典。后辈可能跳过几层Context
,找到并加入。
所有 With
函数都返回一个包装上下文,以 Context
嵌入字段记录父级上下文。
而 WithCancel
还返回一个用来发送消息的 cancel
函数,其中包括 Err
返回值。
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { c := newCancelCtx(parent) propagateCancel(parent, &c) // 返回上下文和取消函数(内含取消原因)。 return &c, func() { c.cancel(true, Canceled) } } func newCancelCtx(parent Context) cancelCtx { return cancelCtx{Context: parent} }
创建时,尝试加入父辈广播网( children
),以便接收消息。
func propagateCancel(parent Context, child canceler) { done := parent.Done() if done == nil { return // parent is never canceled } // 父辈已经取消。 select { case <-done: // parent is already canceled child.cancel(false, parent.Err()) return default: } // 寻找 cancelCtx 类型的祖先。 if p, ok := parentCancelCtx(parent); ok { p.mu.Lock() if p.err != nil { // 立即响应! child.cancel(false, p.err) } else { // 加入广播网。 if p.children == nil { p.children = make(map[canceler]struct{}) } p.children[child] = struct{}{} } p.mu.Unlock() } else { // 其他情形,比如没找到 cancelCtx 类型祖先。 // 新建 goroutine 监控父辈消息,自己广播。 atomic.AddInt32(&goroutines, +1) go func() { select { case <-parent.Done(): child.cancel(false, parent.Err()) case <-child.Done(): } }() } }
func (c *cancelCtx) Done() <-chan struct{} { d := c.done.Load() if d != nil { return d.(chan struct{}) } c.mu.Lock() defer c.mu.Unlock() d = c.done.Load() if d == nil { d = make(chan struct{}) // !!!! c.done.Store(d) } return d.(chan struct{}) }
往上递归查找 cancelCtx
类型祖先,只有它有广播字典( children
)。
func parentCancelCtx(parent Context) (*cancelCtx, bool) { done := parent.Done() if done == closedchan || done == nil { return nil, false } // 用特别标识,让父辈自行判断. p, ok := parent.Value(&cancelCtxKey).(*cancelCtx) if !ok { return nil, false } // 找到! pdone, _ := p.done.Load().(chan struct{}) if pdone != done { return nil, false } return p, true }
func (c *cancelCtx) Value(key any) any { // 当前已是 cancelCtx,返回自己。 if key == &cancelCtxKey { return c } // 取关联值。 return value(c.Context, key) } func (c *valueCtx) Value(key any) any { // key != &cancelCtxKey if c.key == key { return c.val } // 往上递归。 return value(c.Context, key) } func value(c Context, key any) any { for { switch ctx := c.(type) { case *valueCtx: if key == ctx.key { return ctx.val } c = ctx.Context case *cancelCtx: if key == &cancelCtxKey { return c } c = ctx.Context case *timerCtx: if key == &cancelCtxKey { return &ctx.cancelCtx } c = ctx.Context case *emptyCtx: return nil default: return c.Value(key) // 递归 !!! } } }
当 canel
被调用,广播(递归)消息。
// cancel closes c.done, cancels each of c's children, and, if // removeFromParent is true, removes c from its parent's children. func (c *cancelCtx) cancel(removeFromParent bool, err error) { c.mu.Lock() if c.err != nil { c.mu.Unlock() return // already canceled } c.err = err // 关闭 done channel。 d, _ := c.done.Load().(chan struct{}) if d == nil { c.done.Store(closedchan) } else { close(d) } // 向下递归广播取消。 for child := range c.children { child.cancel(false, err) } c.children = nil c.mu.Unlock() // 从广播字典中移除(delete)。 if removeFromParent { removeChild(c.Context, c) } }
timer
这是 WithTimeout
、 WithDeadline
的内核。
简单点说,就是继承 cancelCtx
手动取消功能后,另加定时器自动执行。
// A timerCtx carries a timer and a deadline. It embeds // a cancelCtx to implement Done and Err. It implements // cancel by stopping its timer then delegating to // cancelCtx.cancel. type timerCtx struct { cancelCtx timer *time.Timer // Under cancelCtx.mu. deadline time.Time }
func WithTimeout(parent Context, timeout Duration) (Context, CancelFunc) { return WithDeadline(parent, time.Now().Add(timeout)) } func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) { // 如果父辈截止时间更早,以父辈为准。 if cur, ok := parent.Deadline(); ok && cur.Before(d) { return WithCancel(parent) } // 创建上下文。 c := &timerCtx{ cancelCtx: newCancelCtx(parent), deadline: d, } // 加入父辈广播网 propagateCancel(parent, c) // 如果已过了截止时间,立即执行。 dur := time.Until(d) if dur <= 0 { c.cancel(true, DeadlineExceeded) // deadline has already passed return c, func() { c.cancel(false, Canceled) } } c.mu.Lock() defer c.mu.Unlock() // 创建定时器,用于自动执行取消操作。 if c.err == nil { c.timer = time.AfterFunc(dur, func() { c.cancel(true, DeadlineExceeded) }) } return c, func() { c.cancel(true, Canceled) } }
func (c *timerCtx) cancel(removeFromParent bool, err error) { // 取消,从广播字典移除。 c.cancelCtx.cancel(false, err) if removeFromParent { // Remove this timerCtx from its parent cancelCtx's children. removeChild(c.cancelCtx.Context, c) } // 停止定时器。(截止时间之前被手动取消) c.mu.Lock() if c.timer != nil { c.timer.Stop() c.timer = nil } c.mu.Unlock() }
value
以递归方式通过层级关系向上(父)查找。
// A valueCtx carries a key-value pair. // It implements Value for that key and delegates all // other calls to the embedded Context. type valueCtx struct { Context key, val any }
func (c *valueCtx) Value(key any) any { // 本地。 if c.key == key { return c.val } // 向上递归。 return value(c.Context, key) }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论