上卷 程序设计
中卷 标准库
- bufio 1.18
- bytes 1.18
- io 1.18
- container 1.18
- encoding 1.18
- crypto 1.18
- hash 1.18
- index 1.18
- sort 1.18
- context 1.18
- database 1.18
- connection
- query
- queryrow
- exec
- prepare
- transaction
- scan & null
- context
- tcp
- udp
- http
- server
- handler
- client
- h2、tls
- url
- rpc
- exec
- signal
- embed 1.18
- plugin 1.18
- reflect 1.18
- runtime 1.18
- KeepAlived
- ReadMemStats
- SetFinalizer
- Stack
- sync 1.18
- atomic
- mutex
- rwmutex
- waitgroup
- cond
- once
- map
- pool
- copycheck
- nocopy
- unsafe 1.18
- fmt 1.18
- log 1.18
- math 1.18
- time 1.18
- timer
下卷 运行时
源码剖析
附录
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
3.6.2 缓存队列
设计用来在垃圾标记时保存灰色对象。
同样是本地队列(P.gcw)和全局队列(work)两级结构。
workbuf
用来持有数据的是 workbuf 结构,以数组管理多个指针形成数据包。
在本地和全局转移数据时,以 workbuf 为基本单位,实现批量操作。
// mgcwork.go type workbuf struct { workbufhdr obj [(_WorkbufSize - unsafe.Sizeof(workbufhdr{})) / sys.PtrSize]uintptr } type workbufhdr struct { node lfnode // must be first nobj int }
+---------+ +---------+ +------+ | P.gcw | <----- | workbuf | -----> | work | +---------+ +---------+ +------+ | wbuf1 | +---------+ | wbuf2 | +---------+
gcw
透过 gcWork 相关方法,可以看到具体工作方式。
// runtime2.go type p struct { gcw gcWork }
// mgcwork.go type gcWork struct { wbuf1, wbuf2 *workbuf }
gcWork 只所以持有两个 workbuf,是为了:
1. 平衡本地和全局数据;
2. 转移时减少数据复制。
func (w *gcWork) put(obj uintptr) { wbuf := w.wbuf1 if wbuf == nil { w.init() wbuf = w.wbuf1 } else if wbuf.nobj == len(wbuf.obj) { // 已满(计数和数组长度相等)。 w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1 // 交换。 wbuf = w.wbuf1 if wbuf.nobj == len(wbuf.obj) { // 另一个也满了,两个都满了。 putfull(wbuf) // 上交一个,平衡。 wbuf = getempty() // 获取空的复用。 w.wbuf1 = wbuf } } // 保存。 wbuf.obj[wbuf.nobj] = obj wbuf.nobj++ }
当 2 个都满了,put 只上交 1 个,相当于平衡一半到全局。
余下一半则依旧供本地 get 使用。
同理,get 在 wbuf1 找不到任务时,也会交换 wbuf2。
func (w *gcWork) tryGet() uintptr { wbuf := w.wbuf1 if wbuf == nil { w.init() wbuf = w.wbuf1 } if wbuf.nobj == 0 { // 已空。 w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1 // 交换。 wbuf = w.wbuf1 if wbuf.nobj == 0 { // 依然为空,两个都空了。 owbuf := wbuf wbuf = trygetfull() // 从全局提取任务。 if wbuf == nil { return 0 } putempty(owbuf) // 上交空节点。 w.wbuf1 = wbuf // 换上丛全局获取的包。 } } // 提取。 wbuf.nobj-- return wbuf.obj[wbuf.nobj] }
work
全局变量 work 用两个栈结构管理 workbuf 节点。
// mgc.go var work struct { full lfstack // lock-free list of full blocks workbuf empty lfstack // lock-free list of empty blocks workbuf }
- full : 存储有数据 workbuf 节点。
- empty : 存储无数据 workbuf 节点,以待复用。
// mgcwork.go type workbufhdr struct { node lfnode // must be first nobj int }
node 字段被 lfstack 使用,用于维护 stack 结构。
当 gcWork 所持有的 workbuf 已满,则将整个 workbuf (一批数据)提交到 work.full 栈。
然后从 work.empty 获取(或新建)一个空的 workbuf 使用。
// mgcwork.go func putfull(b *workbuf) { work.full.push(&b.node) }
func getempty() *workbuf { var b *workbuf if work.empty != 0 { b = (*workbuf)(work.empty.pop()) } if b == nil { // Allocate more workbufs. ... } return b }
lfstack
全局队列 Lock-Free Stack,就是用 CAS(Compare & Swap) 实现原子操作的链表。
用
Node Pointer + Node.PushCount
实现了 Double-CAS。
// lfstack.go // lfstack is the head of a lock-free stack. type lfstack uint64
// runtime2. go // Lock-free stack node. type lfnode struct { next uint64 pushcnt uintptr }
用循环确保原子操作成功,很常见的无锁算法。
// lfstack.go func (head *lfstack) push(node *lfnode) { // 累加计数器,与地址共同生成唯一流水号。 node.pushcnt++ new := lfstackPack(node, node.pushcnt) for { // 添加到链表头部。 old := atomic.Load64((*uint64)(head)) node.next = old // 将新节点地址保存到 lfstack 做为链表头。 if atomic.Cas64((*uint64)(head), old, new) { // 只有操作成功才跳出,否则循环重试。 break } } }
func (head *lfstack) pop() unsafe.Pointer { for { // 直接从 lfstack 提取头个节点。 old := atomic.Load64((*uint64)(head)) if old == 0 { return nil } // 确保将下一个节点存入头部。 node := lfstackUnpack(old) // 从流水号提取地址。 next := atomic.Load64(&node.next) if atomic.Cas64((*uint64)(head), old, next) { // 链表修改成功,返回。 return unsafe.Pointer(node) } } }
如果 CAS 判断的仅是 old 指针地址,而该地址又被意外重用,那就会造成错误结果,这就是所谓 ABA 问题。利用 地址 + 计数器
生成唯一流水号,实现 Double-CAS,就能避开。
// lfstack_64bit.go func lfstackPack(node *lfnode, cnt uintptr) uint64 { return uint64(uintptr(unsafe.Pointer(node)))<<(64-addrBits) | uint64(cnt&(1<<cntBits-1)) } // 从流水号提取地址。 func lfstackUnpack(val uint64) *lfnode { if GOARCH == "amd64" { return (*lfnode)(unsafe.Pointer(uintptr(int64(val) >> cntBits << 3))) } return (*lfnode)(unsafe.Pointer(uintptr(val >> cntBits << 3))) }
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论