返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

context 1.18

发布于 2024-10-12 19:15:51 字数 11073 浏览 0 评论 0 收藏 0

上下文 (context)用于广播 消息 (cancel、timeout)和传递 数据 (value)。

  • 链式流程 :每节点都可检查消息状态,以决定是否放弃后续调用。
  • 并发模型 :创建并发任务单元,给予额外协作控制(取消、超时)。
  • 消息只能向后广播。且只是建议,非强制。
  • 数据沿调用链传递,子知父态,反之不行。
 root (Background)
   |
   +--- A                 cancel / timeout                 value
   |    |                         .                          ^
   |    +--- C                    .                          .
   |         |                    .                          .
   |         +--- D               v                          . 
   |
   +--- B              broadcast: A -> C -> D     recursion: D -> C -> A
type Context interface {
    
    // 截止时间:未设置时,ok == false。
    Deadline() (deadline time.Time, ok bool)
    
    // 消息通知:closed。(cancel, deadline, timeoout)
    Done() <-chan struct{}
    
    // 取消原因:nil, Canceled, DeadlineExceeded。
    Err() error
    
    // 关联数据:不适合传递业务参数(request)。
    Value(key any) any
}

消息 = Done + Err

  • Done :检查消息是否发生。
  • Err :确认是哪类消息。

说明:

  • 上下文对象不可变,以 With 函数生成有继承关系的子对象。
  • BackgroundTODO 为根对象,以 ctx 为参数名。
  • Value 方法获取关联值时,逐级向上(父)递归查找。
  • 所有 cancel 函数应显式调用,避免正常结束未及释放资源。
package main

import (
	"context"
	"fmt"
	"time"
)

func request(ctx context.Context) {
	ctx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	resp := make(chan int)
	go handle(ctx, resp)

	// do something ...

	select {
	case v := <- resp: 
		println(v)
	case <- ctx.Done():
		println(ctx.Err().Error())
	}
}

func handle(ctx context.Context, resp chan<- int) {
	println("1/3 handle")
	cache(ctx, resp)
}

func cache(ctx context.Context, resp chan<- int) {
	println("2/3 cache")
	
    // long-time working ...
	time.Sleep(time.Second * 2)  // !!!!!

	database(ctx, resp)
}

func database(ctx context.Context, resp chan<- int) {

    // check done!
	select {
	case <- ctx.Done(): 
		println("3/3 database: timeout!")
		return
	default:
	}

	println("3/3 database.")
	resp <- 100
}

func main() {

    // chain: request -> handle -> cache -> database
	request(context.Background())

	fmt.Scanln()
}

/*

1/3 handle
2/3 cache

context deadline exceeded

3/3 database: timeout!

*/
package main

import (
	"context"
	"time"
)

func request(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	ctx = context.WithValue(ctx, "A", "a")
	go handle(ctx)

	// do something ...
	time.Sleep(time.Second)
}

func handle(ctx context.Context) {
	println("1/3 handle")

	ctx = context.WithValue(ctx, "B", "b")
	cache(ctx)
}

func cache(ctx context.Context) {
	println("2/3 cache:")
	println("  ", ctx.Value("A").(string))	

	ctx = context.WithValue(ctx, "A", "a2")
	database(ctx)
}

func database(ctx context.Context) {
	println("3/3 database.")
	println("  ", ctx.Value("A").(string))	
	println("  ", ctx.Value("B").(string))	
}

func main() {
	request(context.Background())
}

/*

1/3 handle
2/3 cache:
   a
3/3 database.
   a2
   b
   
*/

源码剖析

实现 Context 接口的几种上下文类型。

                    +---------+
                    | Context |
                    +---------+
                         |
         +---------------+---------------+
         |               |               |
   +----------+    +-----------+    +----------+
   | emptyCtx |    | cancelCtx |    | valueCtx |
   +----------+    +-----------+    +----------+
         |               |               |
    Background           |           WithValue
       TODO              |
                +--------+-------+
                |                |
           WithCancel      +----------+
                           | timerCtx |
                           +----------+
                                 |
                      WithTimeout, WithDeadline
                 
// context/context.go

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) { return }
func (*emptyCtx) Done() <-chan struct{} { return nil }
func (*emptyCtx) Err() error { return nil }
func (*emptyCtx) Value(key any) any { return nil }

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)

func Background() Context {
	return background
}

cancel

向下(子)发送 “取消” 广播。

  • 所谓消息,就是 done 被关闭。
  • 即便正常结束,也应该调用 cancel 函数清理资源。
  • 内部同步保护,多次调用 cancel 没有影响。
// A cancelCtx can be canceled. When canceled, it also cancels 
// any children that implement canceler.

type cancelCtx struct {
	Context                          // parent

	mu       sync.Mutex
	done     atomic.Value            // channel
	children map[canceler]struct{}   // broadcast
	err      error
}
  • Context :直属父辈,用于往上递归。
  • children :广播字典。后辈可能跳过几层 Context ,找到并加入。

所有 With 函数都返回一个包装上下文,以 Context 嵌入字段记录父级上下文。
WithCancel 还返回一个用来发送消息的 cancel 函数,其中包括 Err 返回值。

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
    
    // 返回上下文和取消函数(内含取消原因)。
	return &c, func() { c.cancel(true, Canceled) }
}

func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

创建时,尝试加入父辈广播网( children ),以便接收消息。

func propagateCancel(parent Context, child canceler) {
    
	done := parent.Done()
	if done == nil {
		return // parent is never canceled
	}

    // 父辈已经取消。
	select {
	case <-done:
		// parent is already canceled
		child.cancel(false, parent.Err())
		return
	default:
	}

    // 寻找 cancelCtx 类型的祖先。
	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
        
		if p.err != nil {
            
			// 立即响应!
			child.cancel(false, p.err)
            
		} else {
            
            // 加入广播网。
            
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
		}
        
		p.mu.Unlock()
	} else {
        
        // 其他情形,比如没找到 cancelCtx 类型祖先。
        // 新建 goroutine 监控父辈消息,自己广播。
        
		atomic.AddInt32(&goroutines, +1)
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}
func (c *cancelCtx) Done() <-chan struct{} {
	d := c.done.Load()
	if d != nil {
		return d.(chan struct{})
	}
    
	c.mu.Lock()
	defer c.mu.Unlock()
    
	d = c.done.Load()
	if d == nil {
		d = make(chan struct{})  // !!!!
		c.done.Store(d)
	}
    
	return d.(chan struct{})
}

往上递归查找 cancelCtx 类型祖先,只有它有广播字典( children )。

func parentCancelCtx(parent Context) (*cancelCtx, bool) {
	done := parent.Done()
	if done == closedchan || done == nil {
		return nil, false
	}
    
    // 用特别标识,让父辈自行判断.
	p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
	if !ok {
		return nil, false
	}
    
    // 找到!
	pdone, _ := p.done.Load().(chan struct{})
	if pdone != done {
		return nil, false
	}
    
	return p, true
}
func (c *cancelCtx) Value(key any) any {
    
    // 当前已是 cancelCtx,返回自己。
	if key == &cancelCtxKey {
		return c
	}
    
    // 取关联值。
	return value(c.Context, key)
}


func (c *valueCtx) Value(key any) any {
    
    // key != &cancelCtxKey
	if c.key == key {
		return c.val
	}
    
    // 往上递归。
	return value(c.Context, key)
}


func value(c Context, key any) any {
	for {
		switch ctx := c.(type) {
		case *valueCtx:
			if key == ctx.key { return ctx.val }
			c = ctx.Context
		case *cancelCtx:
			if key == &cancelCtxKey { return c }
			c = ctx.Context
		case *timerCtx:
			if key == &cancelCtxKey { return &ctx.cancelCtx }
			c = ctx.Context
		case *emptyCtx:
			return nil
		default:
			return c.Value(key)  // 递归 !!!
		}
	}
}

canel 被调用,广播(递归)消息。

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    c.mu.Lock()
    
    if c.err != nil {
        c.mu.Unlock()
        return // already canceled
    }
    
    c.err = err
    
    // 关闭 done channel。
    d, _ := c.done.Load().(chan struct{})
    if d == nil {
        c.done.Store(closedchan)
    } else {
        close(d)
    }
    
    // 向下递归广播取消。
    for child := range c.children {
        child.cancel(false, err)
    }
    c.children = nil
    
    c.mu.Unlock()
    
    // 从广播字典中移除(delete)。
    if removeFromParent {
        removeChild(c.Context, c)
    }
}

timer

这是 WithTimeoutWithDeadline 的内核。

简单点说,就是继承 cancelCtx 手动取消功能后,另加定时器自动执行。

// A timerCtx carries a timer and a deadline. It embeds 
// a cancelCtx to implement Done and Err. It implements 
// cancel by stopping its timer then delegating to 
// cancelCtx.cancel.

type timerCtx struct {
	cancelCtx
    
	timer *time.Timer // Under cancelCtx.mu.
	deadline time.Time
}
func WithTimeout(parent Context, timeout Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    
    // 如果父辈截止时间更早,以父辈为准。
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
		return WithCancel(parent)
	}
    
    // 创建上下文。
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  d,
	}
    
    // 加入父辈广播网
	propagateCancel(parent, c)
    
    // 如果已过了截止时间,立即执行。
	dur := time.Until(d)
	if dur <= 0 {
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(false, Canceled) }
	}

	c.mu.Lock()
	defer c.mu.Unlock()

    // 创建定时器,用于自动执行取消操作。
	if c.err == nil {
		c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
    
	return c, func() { c.cancel(true, Canceled) }
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
    
    // 取消,从广播字典移除。
	c.cancelCtx.cancel(false, err)
	if removeFromParent {
		// Remove this timerCtx from its parent cancelCtx's children.
		removeChild(c.cancelCtx.Context, c)
	}
    
    // 停止定时器。(截止时间之前被手动取消)
	c.mu.Lock()
	if c.timer != nil {
		c.timer.Stop()
		c.timer = nil
	}
	c.mu.Unlock()
}

value

以递归方式通过层级关系向上(父)查找。

// A valueCtx carries a key-value pair. 
// It implements Value for that key and delegates all 
// other calls to the embedded Context.

type valueCtx struct {
	Context
	key, val any
}
func (c *valueCtx) Value(key any) any {
    
    // 本地。
	if c.key == key {
		return c.val
	}
    
    // 向上递归。
	return value(c.Context, key)
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文