返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

3.6.2 缓存队列

发布于 2024-10-12 19:16:04 字数 5217 浏览 0 评论 0 收藏 0

设计用来在垃圾标记时保存灰色对象。

同样是本地队列(P.gcw)和全局队列(work)两级结构。

workbuf

用来持有数据的是 workbuf 结构,以数组管理多个指针形成数据包。

在本地和全局转移数据时,以 workbuf 为基本单位,实现批量操作。

// mgcwork.go

type workbuf struct {
    workbufhdr
    obj [(_WorkbufSize - unsafe.Sizeof(workbufhdr{})) / sys.PtrSize]uintptr
}

type workbufhdr struct {
    node lfnode           // must be first
    nobj int
}
  +---------+        +---------+        +------+
  | P.gcw   | <----- | workbuf | -----> | work |
  +---------+        +---------+        +------+
  |   wbuf1 |
  +---------+
  |   wbuf2 |
  +---------+

gcw

透过 gcWork 相关方法,可以看到具体工作方式。

// runtime2.go

type p struct {
    gcw gcWork
}
// mgcwork.go

type gcWork struct {
    wbuf1, wbuf2 *workbuf
}

gcWork 只所以持有两个 workbuf,是为了:

1. 平衡本地和全局数据;

2. 转移时减少数据复制。

func (w *gcWork) put(obj uintptr) {
    
    wbuf := w.wbuf1
    
    if wbuf == nil {
        w.init()
        wbuf = w.wbuf1
    } else if wbuf.nobj == len(wbuf.obj) {     // 已满(计数和数组长度相等)。
        
        w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1    // 交换。
        wbuf = w.wbuf1
        
        if wbuf.nobj == len(wbuf.obj) {        // 另一个也满了,两个都满了。
            putfull(wbuf)                      // 上交一个,平衡。
            wbuf = getempty()                  // 获取空的复用。
            w.wbuf1 = wbuf
        }
    }
    
    // 保存。
    wbuf.obj[wbuf.nobj] = obj
    wbuf.nobj++
}

当 2 个都满了,put 只上交 1 个,相当于平衡一半到全局。

余下一半则依旧供本地 get 使用。

同理,get 在 wbuf1 找不到任务时,也会交换 wbuf2。

func (w *gcWork) tryGet() uintptr {
    
    wbuf := w.wbuf1
    if wbuf == nil {
        w.init()
        wbuf = w.wbuf1
    }
    
    if wbuf.nobj == 0 {                       // 已空。
        w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1   // 交换。
        wbuf = w.wbuf1
        
        if wbuf.nobj == 0 {                   // 依然为空,两个都空了。
            owbuf := wbuf
            wbuf = trygetfull()               // 从全局提取任务。
            if wbuf == nil {
                return 0
            }
            
            putempty(owbuf)                   // 上交空节点。
            w.wbuf1 = wbuf                    // 换上丛全局获取的包。
        }
    }
    
    // 提取。
    wbuf.nobj--
    
    return wbuf.obj[wbuf.nobj]
}

work

全局变量 work 用两个栈结构管理 workbuf 节点。

// mgc.go

var work struct {
    full  lfstack        // lock-free list of full blocks workbuf
    empty lfstack        // lock-free list of empty blocks workbuf
}
  • full : 存储有数据 workbuf 节点。
  • empty : 存储无数据 workbuf 节点,以待复用。
// mgcwork.go

type workbufhdr struct {
    node lfnode              // must be first
    nobj int
}

node 字段被 lfstack 使用,用于维护 stack 结构。

当 gcWork 所持有的 workbuf 已满,则将整个 workbuf (一批数据)提交到 work.full 栈。

然后从 work.empty 获取(或新建)一个空的 workbuf 使用。

// mgcwork.go

func putfull(b *workbuf) {
    work.full.push(&b.node)
}
func getempty() *workbuf {
    var b *workbuf
    
    if work.empty != 0 {
        b = (*workbuf)(work.empty.pop())
    }
    
    if b == nil {
        // Allocate more workbufs.
        ...
    }
    
    return b
}

lfstack

全局队列 Lock-Free Stack,就是用 CAS(Compare & Swap) 实现原子操作的链表。

Node Pointer + Node.PushCount 实现了 Double-CAS。

// lfstack.go

// lfstack is the head of a lock-free stack.

type lfstack uint64
// runtime2. go

// Lock-free stack node.

type lfnode struct {
    next    uint64
    pushcnt uintptr
}

用循环确保原子操作成功,很常见的无锁算法。

// lfstack.go

func (head *lfstack) push(node *lfnode) {
    
    // 累加计数器,与地址共同生成唯一流水号。
    node.pushcnt++
    
    new := lfstackPack(node, node.pushcnt)
    
    for {
        // 添加到链表头部。
        old := atomic.Load64((*uint64)(head))
        node.next = old
        
        // 将新节点地址保存到 lfstack 做为链表头。
        if atomic.Cas64((*uint64)(head), old, new) {
            // 只有操作成功才跳出,否则循环重试。
            break
        }
    }
}
func (head *lfstack) pop() unsafe.Pointer {
    for {
        // 直接从 lfstack 提取头个节点。
        old := atomic.Load64((*uint64)(head))
        if old == 0 {
            return nil
        }
        
        // 确保将下一个节点存入头部。
        node := lfstackUnpack(old)           // 从流水号提取地址。
        next := atomic.Load64(&node.next)
        if atomic.Cas64((*uint64)(head), old, next) {
            // 链表修改成功,返回。
            return unsafe.Pointer(node)
        }
    }
}

如果 CAS 判断的仅是 old 指针地址,而该地址又被意外重用,那就会造成错误结果,这就是所谓 ABA 问题。利用 地址 + 计数器 生成唯一流水号,实现 Double-CAS,就能避开。

// lfstack_64bit.go

func lfstackPack(node *lfnode, cnt uintptr) uint64 {
    return uint64(uintptr(unsafe.Pointer(node)))<<(64-addrBits) | uint64(cnt&(1<<cntBits-1))
}

// 从流水号提取地址。
func lfstackUnpack(val uint64) *lfnode {
    if GOARCH == "amd64" {
        return (*lfnode)(unsafe.Pointer(uintptr(int64(val) >> cntBits << 3)))
    }
    
    return (*lfnode)(unsafe.Pointer(uintptr(val >> cntBits << 3)))
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文