返回介绍

上卷 程序设计

中卷 标准库

下卷 运行时

源码剖析

附录

4.2.1 P

发布于 2024-10-12 19:15:57 字数 3409 浏览 0 评论 0 收藏 0

调度器初始化函数最重要的事情是初始化 P 数量。

// proc.go

func schedinit() {
	procs := ncpu
	if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
		procs = n
	}
	if procresize(procs) != nil {
		throw("unknown runnable goroutine during bootstrap")
	}    
}

调整 P 数量会导致 STW。

// runtime2.go

// len(allp) == gomaxprocs; may change at safe points, otherwise
// immutable.
var	allp []*p
// Change number of processors.
// Returns list of Ps with local work, they need to be scheduled by the caller.

func procresize(nprocs int32) *p {

    // !!!!
	assertWorldStopped()

    // 当前值。
	old := gomaxprocs

	// 必要时对 allp 扩容。
	if nprocs > int32(len(allp)) {
		if nprocs <= int32(cap(allp)) {
			allp = allp[:nprocs]
		} else {
			nallp := make([]*p, nprocs)
			copy(nallp, allp[:cap(allp)])
			allp = nallp
		}
	}

	// 初始化新建 P。
	for i := old; i < nprocs; i++ {
		pp := allp[i]
		if pp == nil {
			pp = new(p)
		}
		pp.init(i)
		atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
	}

    // 检查当前 P 是否在裁剪范围。(nprocs < old)
	_g_ := getg()
	if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
		// 未被裁剪,继续。
		_g_.m.p.ptr().status = _Prunning
		_g_.m.p.ptr().mcache.prepareForSweep()
	} else {
        
        // 在裁剪范围,解除当前绑定。
		if _g_.m.p != 0 {
			_g_.m.p.ptr().m = 0
		}
        
		_g_.m.p = 0
        
        // 重新绑定 allp[0]。
		p := allp[0]
		p.m = 0
		p.status = _Pidle
		acquirep(p)
	}

	// 释放被裁剪 P 资源。
	for i := nprocs; i < old; i++ {
		p := allp[i]
		p.destroy()
	}

	// 剔除 allp 被裁剪部分。
	if int32(len(allp)) != nprocs {
		allp = allp[:nprocs]
	}

	var runnablePs *p
	for i := nprocs - 1; i >= 0; i-- {
		p := allp[i]
        
        // 跳过当前 P。
		if _g_.m.p.ptr() == p {
			continue
		}
        
        // 将有本地任务的 P 构成链表,用于返回。
		p.status = _Pidle
		if runqempty(p) {
			pidleput(p)
		} else {
			p.m.set(mget())
			p.link.set(runnablePs)
			runnablePs = p
		}
	}
    
	return runnablePs
}

初始化和释放操作,会对相关绑定资源进行处理。

func (pp *p) init(id int32) {
	pp.id = id
	pp.status = _Pgcstop
    
	pp.sudogcache = pp.sudogbuf[:0]
	pp.deferpool = pp.deferpoolbuf[:0]
	pp.wbBuf.reset()
    
	if pp.mcache == nil {
		if id == 0 {
			pp.mcache = mcache0
		} else {
			pp.mcache = allocmcache()
		}
	}
}
func (pp *p) destroy() {
    
    // !!!
	assertWorldStopped()

	// 将本地队列的任务转移到全局队列。
	for pp.runqhead != pp.runqtail {
		pp.runqtail--
		gp := pp.runq[pp.runqtail%uint32(len(pp.runq))].ptr()
		globrunqputhead(gp)
	}
	if pp.runnext != 0 {
		globrunqputhead(pp.runnext.ptr())
		pp.runnext = 0
	}
    
    // 停止并转移其他相关资源 ...
	if len(pp.timers) > 0 {
		plocal := getg().m.p.ptr()
		moveTimers(plocal, pp.timers)
		pp.timers = nil
		pp.numTimers = 0
		pp.deletedTimers = 0
		atomic.Store64(&pp.timer0When, 0)
	}
    
	if gcphase != _GCoff {
		wbBufFlush1(pp)
		pp.gcw.dispose()
	}
    
	for i := range pp.sudogbuf {
		pp.sudogbuf[i] = nil
	}
	pp.sudogcache = pp.sudogbuf[:0]
    
	for j := range pp.deferpoolbuf {
		pp.deferpoolbuf[j] = nil
	}
	pp.deferpool = pp.deferpoolbuf[:0]
    
	systemstack(func() {
		for i := 0; i < pp.mspancache.len; i++ {
			mheap_.spanalloc.free(unsafe.Pointer(pp.mspancache.buf[i]))
		}
		pp.mspancache.len = 0
		pp.pcache.flush(&mheap_.pages)
	})
	freemcache(pp.mcache)
	pp.mcache = nil
    
	gfpurge(pp)
	traceProcFree(pp)
    
    // 重置状态。
	pp.gcAssistTime = 0
	pp.status = _Pdead
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文