返回介绍

6. 看几个主动让出 CPU 的场景

发布于 2024-04-25 12:55:11 字数 13913 浏览 0 评论 0 收藏 0

6.1 time.Sleep

当代码中调用 time.Sleep 的时候我们是要 black 住程序不在继续往下执行,此时该 goroutine 不会做其他事情了,理应把 CPU 资源释放出来,下面看下实现:

// runtime/time.go

func timeSleep(ns int64) {
  if ns <= 0 {
    return
  }

  t := getg().timer
  if t == nil {
    t = new(timer)
    getg().timer = t
  }
  *t = timer{} // 每个定时任务都创建一个 timer
  t.when = nanotime() + ns
  t.f = goroutineReady // 记录唤醒该 G 的方法,唤醒时通过该方法执行唤醒
  t.arg = getg()       // 把 timer 与当前 G 关联,时间到了唤醒时通过该参数找到所在的 G
  lock(&timers.lock)
  addtimerLocked(t)                                      // 把 timer 添加到最小堆里
  goparkunlock(&timers.lock, "sleep", traceEvGoSleep, 2) // 切到 G0 让出 CPU,进入休眠
}



// runtime/proc.go

func goparkunlock(lock *mutex, reason string, traceEv byte, traceskip int) {
  gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}

timeSleep 函数里通过 addtimerLocked 把定时器加入到 timer 管理器(timer 通过最小堆的数据结构存放每个定时器,在这不做详细说明)后,再通过 goparkunlock 实现把当前 G 休眠,这里看到了上面提到的 gopark 方法进行调度循环的上下文切换。

上面介绍的是一个 G 如何进入到休眠状态的过程,该例子是个定时器,当时间到了的话,当前 G 就要被唤醒继续执行了。下面就介绍下唤醒的流程。

返回到最开始 timeSleep 方法里在进入调度方法之前有一个 addtimerLocked 方法,看下这个方法做了什么。

// runtime/time.go

func addtimerLocked(t *timer) {
  // when must never be negative; otherwise timerproc will overflow
  // during its delta calculation and never expire other runtime timers.
  if t.when < 0 {
    t.when = 1<<63 - 1
  }
  t.i = len(timers.t)
  timers.t = append(timers.t, t) //将当前 timer 添加到 timer 管理器里
  siftupTimer(t.i)
  
  ......
  
  // 如果没有启动 timer 管理定时器,则启动。timerproc 只会启动一次,即全局 timer 管理器
  if !timers.created {
    timers.created = true
    go timerproc()
  }
}
// runtime/time.go

// Timerproc runs the time-driven events.
// It sleeps until the next event in the timers heap.
// If addtimer inserts a new earlier event, it wakes timerproc early.
func timerproc() {
  timers.gp = getg()
  for {
    lock(&timers.lock)
    timers.sleeping = false
    now := nanotime()
    delta := int64(-1)
    for {
      if len(timers.t) == 0 {
        delta = -1
        break
      }
      t := timers.t[0]
      delta = t.when - now
      if delta > 0 {
        break
      }
      if t.period > 0 {
        // leave in heap but adjust next time to fire
        t.when += t.period * (1 + -delta/t.period)
        siftdownTimer(0)
      } else {
        // remove from heap
        last := len(timers.t) - 1
        if last > 0 {
          timers.t[0] = timers.t[last]
          timers.t[0].i = 0
        }
        timers.t[last] = nil
        timers.t = timers.t[:last]
        if last > 0 {
          siftdownTimer(0)
        }
        t.i = -1 // mark as removed
      }
      f := t.f
      arg := t.arg
      seq := t.seq
      unlock(&timers.lock)
      if raceenabled {
        raceacquire(unsafe.Pointer(t))
      }
      f(arg, seq)
      lock(&timers.lock)
    }
    ......
  }
}

在 addtimerLocked 方法的最下面有个逻辑在运行期间开启了’全局时间事件驱动器’timerproc,该方法会全程遍历最小堆,寻找最早进入 timer 管理器的定时器,然后唤醒。他是怎么找到要唤醒哪个 G 的?回头看下 timeSleep 方法里把当时正在执行的 G 以及唤醒方法 goroutineReady 带到了每个定时器里,而在 timerproc 则通过找到期的定时器执行 f(arg, seq)

即通过 goroutineReady 方法唤醒。方法调用过程: goroutineReady() -> ready()

/// runtime/time.go

func goroutineReady(arg interface{}, seq uintptr) {
  goready(arg.(*g), 0)
}
// runtime/proc.go

func goready(gp *g, traceskip int) {
  systemstack(func() {
    ready(gp, traceskip, true)
  })
}

// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
  if trace.enabled {
    traceGoUnpark(gp, traceskip)
  }

  status := readgstatus(gp)

  // Mark runnable.
  _g_ := getg()
  _g_.m.locks++ // disable preemption because it can be holding p in a local var
  if status&^_Gscan != _Gwaiting {
    dumpgstatus(gp)
    throw("bad g->status in ready")
  }

  // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
  casgstatus(gp, _Gwaiting, _Grunnable)
  runqput(_g_.m.p.ptr(), gp, next)
  
  ......
}

在上面的方法里可以看到先把休眠的 G 从_Gwaiting 切换到_Grunnable 状态,表明已经可运行。然后通过 runqput 方法把 G 放到 P 的待运行队列里,就进入到调度器的调度循环里了。

总结:time.Sleep 想要进入阻塞(休眠) 状态,其实是通过 gopark 方法给自己标记个_Gwaiting 状态,然后把自己所占用的 CPU 线程资源给释放出来,继续执行调度任务,调度其它的 G 来运行。而唤醒是通过把 G 更改回_Grunnable 状态后,然后把 G 放入到 P 的待运行队列里等待执行。通过这点还可以看出休眠中的 G 其实并不占用 CPU 资源,最多是占用内存,是个很轻量级的阻塞。

6.2 sync.Mutex

// sync/mutex.go

func (m *Mutex) Lock() {
  // Fast path: grab unlocked mutex.
  // 首先尝试抢锁,如果抢到则直接返回,并标记 mutexLocked 状态
  if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    if race.Enabled {
      race.Acquire(unsafe.Pointer(m))
    }
    return
  }

  var waitStartTime int64
  starving := false
  awoke := false
  iter := 0
  old := m.state
  for {
    // Don't spin in starvation mode, ownership is handed off to waiters
    // so we won't be able to acquire the mutex anyway.
    // 尝试自璇,但有如下几个条件跳过自璇,这里的自璇是用户态自璇,基本 lock 的 cpu 消耗都耗到这里了
    // 1.不在饥饿模式自璇
    // 2.超过 4 次循环,则不再自璇. (runtime_canSpin 里面)
    // 3.全部 P 空闲时,不自璇.(runtime_canSpin 里面)
    // 4.当前 P 里无运行 G 时,不自璇.(runtime_canSpin 里面)
    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
      // Active spinning makes sense.
      // Try to set mutexWoken flag to inform Unlock
      // to not wake other blocked goroutines.
      if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
        awoke = true
      }
      runtime_doSpin() // doSpin 其实就是用户态自璇 30 次
      iter++
      old = m.state
      continue
    }
    
    ......
    
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
      ......
      
      runtime_SemacquireMutex(&m.sema, queueLifo)                                     // 这里会再次自璇几次,然后最后切换到 g0 把 G 标记_Gwaiting 状态阻塞在这里
      starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 如果锁等了 1 毫秒才被唤醒,才会标记为饥饿模式
      old = m.state
      
      ......
    } else {
      old = m.state
    }
  }

  if race.Enabled {
    race.Acquire(unsafe.Pointer(m))
  }
}
// runtime/sema.go

func sync_runtime_Semacquire(addr *uint32) {
  semacquire1(addr, false, semaBlockProfile)
}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) {
  ......
  
  for {
    ......
    
    // Any semrelease after the cansemacquire knows we're waiting
    // (we set nwait above), so go to sleep.
    root.queue(addr, s, lifo)                                     // 把当前锁的信息存起来以便以后唤醒时找到当前 G,G 是在 queue 里面获取的。
    goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4) // 进行休眠,然后阻塞在这里
    if s.ticket != 0 || cansemacquire(addr) {
      break
    }
  }
}

// queue adds s to the blocked goroutines in semaRoot.
func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
  s.g = getg() // 这里记录了当前的 G,以便唤醒的时候找到要被唤醒的 G
  s.elem = unsafe.Pointer(addr)
  s.next = nil
  s.prev = nil

  var last *sudog
  pt := &root.treap
  for t := *pt; t != nil; t = *pt {
    ......
    
    last = t
    if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) {
      pt = &t.prev
    } else {
      pt = &t.next
    }
  }

  ......

Mutex.Lock 方法通过调用 runtime_SemacquireMutex 最终还是调用 goparkunlock 实现把 G 进入到休眠状态。在进入休眠之前先把自己加入到队列里 root.queue(addr, s, lifo),在 queue 方法里,记录了当前的 G,以便以后找到并唤醒。

// sync/mutex.go

func (m *Mutex) Unlock() {
  ......
  
  if new&mutexStarving == 0 { // 如果不是饥饿模式
    old := new
    for {
      ......
      
      if atomic.CompareAndSwapInt32(&m.state, old, new) {
        runtime_Semrelease(&m.sema, false) // 唤醒锁
        return
      }
      old = m.state
    }
  } else {
    // Starving mode: handoff mutex ownership to the next waiter.
    // Note: mutexLocked is not set, the waiter will set it after wakeup.
    // But mutex is still considered locked if mutexStarving is set,
    // so new coming goroutines won't acquire it.
    runtime_Semrelease(&m.sema, true) // 唤醒锁
  }
}
// runtime/sema.go

func sync_runtime_Semrelease(addr *uint32, handoff bool) {
  semrelease1(addr, handoff)
}

func semrelease1(addr *uint32, handoff bool) {
  root := semroot(addr)
  s, t0 := root.dequeue(addr)
  if s != nil {
    atomic.Xadd(&root.nwait, -1)
  }
  
  ......
  
  if s != nil { // May be slow, so unlock first
    ......
    
    readyWithTime(s, 5)
  }
}

func readyWithTime(s *sudog, traceskip int) {
  if s.releasetime != 0 {
    s.releasetime = cputicks()
  }
  goready(s.g, traceskip)
}

Mutex. Unlock 方法通过调用 runtime_Semrelease 最终还是调用 goready 实现把 G 唤醒。

6.3 channel

// runtime/chan.go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // 寻找一个等待中的 receiver,直接把值传给这个 receiver,绕过下面 channel buffer,
  // 避免从 sender buffer->chan buffer->receiver buffer,而是直接 sender buffer->receiver buffer,仍然做了内存 copy
  if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
  }

  // 如果没有 receiver 等待:
  // 如果当前 chan 里的元素个数小于环形队列大小(也就是 chan 还没满),则把内存拷贝到 channel buffer 里,然后直接返回。
  // 注意 dataqsiz 是允许为 0 的,当为 0 时,也不存在该 if 里面的内存 copy
  if c.qcount < c.dataqsiz {
    // Space is available in the channel buffer. Enqueue the element to send.
    qp := chanbuf(c, c.sendx) // 获取即将要写入的 chan buffer 的指针地址
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }
    // 把元素内存拷贝进去.
    // 注意这里产生了一次内存 copy,也就是说如果没有 receiver 的话,就一定会产生内存拷贝
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++ // 发送索引+1
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++ // 队列元素计数器+1
    unlock(&c.lock)
    return true
  }

  if !block { // 如果是非阻塞的,到这里就可以结束了
    unlock(&c.lock)
    return false
  }

  // ########下面是进入阻塞模式的如何实现阻塞的处理逻辑

  // Block on the channel. Some receiver will complete our operation for us.
  // 把元素相关信息、当前的 G 信息打包到一个 sudog 里,然后扔进 send 队列
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  // No stack splits between assigning elem and enqueuing mysg
  // on gp.waiting where copystack can find it.
  mysg.elem = ep
  mysg.waitlink = nil
  mysg.g = gp // 把当前 G 也扔进 sudog 里,用于别人唤醒该 G 的时候找到该 G
  mysg.selectdone = nil
  mysg.c = c
  gp.waiting = mysg // 记录当前 G 正在等待的 sudog
  gp.param = nil
  c.sendq.enqueue(mysg)
  // 切换到 g0,把当前 G 切换到_Gwaiting 状态,然后唤醒 lock.
  // 此时当前 G 被阻塞了,P 就继续执行其它 G 去了.
  goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

  ......
  
  return true
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  ......
  
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  goready(gp, skip+1)
}

当给一个 chan 发送消息的时候,实质触发的方法是 chansend。在该方法里不是先进入休眠状态。

1)如果此时有接收者接收这个 chan 的消息则直接把数据通过 send 方法扔给接收者,并唤醒接收者的 G,然后当前 G 则继续执行。

2)如果没有接收者,就把数据 copy 到 chan 的临时内存里,且内存没有满就继续执行当前 G。

3)如果没有接收者且 chan 满了,依然是通过 goparkunlock 方法进入休眠。在休眠前把当前的 G 相关信息存到队列(sendq)以便有接收者接收数据的时候唤醒当前 G。

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ......
  
  if sg := c.sendq.dequeue(); sg != nil {
    // Found a waiting sender. If buffer is size 0, receive value
    // directly from sender. Otherwise, receive from head of queue
    // and add sender's value to the tail of the queue (both map to
    // the same buffer slot because the queue is full).
    // 寻找一个正在等待的 sender
    // 如果 buffer size 是 0,则尝试直接从 sender 获取(这种情况是在环形队列长度(dataqsiz) 为 0 的时候出现)
    // 否则(buffer full 的时候) 从队列 head 接收,并且帮助 sender 在队列满时的阻塞的元素信息拷贝到队列里,然后将 sender 的 G 状态切换为_Grunning,这样 sender 就不阻塞了。
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
  }

  // 如果有数据则从 channel buffer 里获取数据后返回(此时环形队列长度 dataqsiz!=0)
  if c.qcount > 0 {
    // Receive directly from queue
    qp := chanbuf(c, c.recvx) // 获取即将要读取的 chan buffer 的指针地址
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp) // copy 元素数据内存到 channel buffer
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }

  if !block {
    unlock(&c.lock)
    return false, false
  }

  // ##########下面是无任何数据准备把当前 G 切换为_Gwaiting 状态的逻辑

  // no sender available: block on this channel.
  gp := getg()
  mysg := acquireSudog()
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  // No stack splits between assigning elem and enqueuing mysg
  // on gp.waiting where copystack can find it.
  mysg.elem = ep
  mysg.waitlink = nil
  gp.waiting = mysg
  mysg.g = gp
  mysg.selectdone = nil
  mysg.c = c
  gp.param = nil
  c.recvq.enqueue(mysg)
  // 释放了锁,然后把当前 G 切换为_Gwaiting 状态,阻塞在这里等待有数据进来被唤醒
  goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

  ......
  
  return true, !closed
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  ......
  
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  goready(gp, skip+1)
}

chanrecv 方法是在 chan 接收者的地方调用的方法。

1)如果有发送者被休眠,则取出数据然后唤醒发送者,当前接收者的 G 拿到数据继续执行。

2)如果没有等待的发送者就看下有没有发送的数据还没被接收,有的话就直接取出数据然后返回,当前接收者的 G 拿到数据继续执行。(注意:这里取的数据不是正在等待的 sender 的数据,而是从 chan 的开头的内存取,如果是 sender 的数据则读出来的数据顺序就乱了)

3)如果即没有发送者,chan 里也没数据就通过 goparkunlock 进行休眠,在休眠之前把当前的 G 相关信息存到 recvq 里面,以便有数据时找到要唤醒的 G。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文