原創(chuàng)文章,歡迎轉(zhuǎn)載,轉(zhuǎn)載請注明出處,謝謝。 0. 前言 在 第四講 我們介紹了 main goroutine 是如何運行的。其中針對 main goroutine 介紹了調(diào)度函數(shù) schedule 是怎么工作的,對于整個調(diào)度器的調(diào)度策略并沒有介紹,這點是不完整的,這一講會完善調(diào)度器的調(diào)度策略部分。
原創(chuàng)文章,歡迎轉(zhuǎn)載,轉(zhuǎn)載請注明出處,謝謝。
在 第四講 我們介紹了 main goroutine 是如何運行的。其中針對 main goroutine 介紹了調(diào)度函數(shù) schedule 是怎么工作的,對于整個調(diào)度器的調(diào)度策略并沒有介紹,這點是不完整的,這一講會完善調(diào)度器的調(diào)度策略部分。
runtime.schedule
實現(xiàn)了調(diào)度器的調(diào)度策略。那么對于調(diào)度時間點,查看哪些函數(shù)調(diào)用的
runtime.schedule
即可順藤摸瓜理出調(diào)度器的調(diào)度時間點,如下圖:
調(diào)度時間點不是本講的重點,這里有興趣的同學可以順藤摸瓜,摸摸觸發(fā)調(diào)度時間點的路徑,這里就跳過了。
調(diào)度策略才是我們的重點,進到
runtime.schedule
:
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
mp := getg().m // 獲取當前執(zhí)行線程
top:
pp := mp.p.ptr() // 獲取執(zhí)行線程綁定的 P
pp.preempt = false
// Safety check: if we are spinning, the run queue should be empty.
// Check this before calling checkTimers, as that might call
// goready to put a ready goroutine on the local run queue.
if mp.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
throw("schedule: spinning with local work")
}
gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
...
execute(gp, inheritTime) // 執(zhí)行找到的 goroutine
}
runtime.schedule
的重點在
findRunnable()
。
findRunnable()
函數(shù)很長,為避免影響可讀性,這里對大部分流程做了注釋,后面在有重點的加以介紹。進入
findRunnable()
:
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
// tryWakeP indicates that the returned goroutine is not normal (GC worker, trace
// reader) so the caller should try to wake a P.
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
mp := getg().m // 獲取當前執(zhí)行線程
top:
pp := mp.p.ptr() // 獲取線程綁定的 P
...
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if pp.schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp := globrunqget(pp, 1)
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}
// local runq
if gp, inheritTime := runqget(pp); gp != nil { // 從 P 的本地隊列中獲取 goroutine
return gp, inheritTime, false
}
// global runq
if sched.runqsize != 0 { // 如果本地隊列獲取不到就判斷全局隊列中有無 goroutine
lock(&sched.lock) // 如果有的話,為全局變量加鎖
gp := globrunqget(pp, 0) // 從全局隊列中拿 goroutine
unlock(&sched.lock) // 為全局變量解鎖
if gp != nil {
return gp, false, false
}
}
// 如果全局隊列中沒有 goroutine 則從 network poller 中取 goroutine
if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
...
}
// 如果 network poller 中也沒有 goroutine,那么嘗試從其它 P 中偷 goroutine
// Spinning Ms: steal work from other Ps.
//
// Limit the number of spinning Ms to half the number of busy Ps.
// This is necessary to prevent excessive CPU consumption when
// GOMAXPROCS>>1 but the program parallelism is low.
// 如果下面兩個條件至少有一個滿足,則進入偷 goroutine 邏輯
// 條件 1: 當前線程是 spinning 自旋狀態(tài)
// 條件 2: 當前活躍的 P 要遠大于自旋的線程,說明需要線程去分擔活躍線程的壓力,不要睡覺了
if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
if !mp.spinning { // 因為是兩個條件至少滿足一個即可,這里首先判斷當前線程是不是自旋狀態(tài)
mp.becomeSpinning() // 如果不是,更新線程的狀態(tài)為自旋狀態(tài)
}
gp, inheritTime, tnow, w, newWork := stealWork(now) // 偷 goroutine
if gp != nil {
// Successfully stole.
return gp, inheritTime, false // 如果 gp 不等于 nil,表示偷到了,返回偷到的 goroutine
}
if newWork {
// There may be new timer or GC work; restart to
// discover.
goto top // 如果 gp 不等于 nil,且 network 為 true,則跳到 top 標簽重新找 goroutine
}
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
// Earlier timer to wait for.
pollUntil = w
}
}
...
if sched.runqsize != 0 { // 偷都沒偷到,還要在找一遍全局隊列,防止偷的過程中,全局隊列又有 goroutine 了
gp := globrunqget(pp, 0)
unlock(&sched.lock)
return gp, false, false
}
if !mp.spinning && sched.needspinning.Load() == 1 { // 在判斷一遍,如果 mp 不是自旋狀態(tài),且 sched.needspinning == 1 則更新 mp 為自旋,調(diào)用 top 重新找一遍 goroutine
// See "Delicate dance" comment below.
mp.becomeSpinning()
unlock(&sched.lock)
goto top
}
// 實在找不到 goroutine,表明當前線程多, goroutine 少,準備掛起線程
// 首先,調(diào)用 releasep 取消線程和 P 的綁定
if releasep() != pp {
throw("findrunnable: wrong p")
}
...
now = pidleput(pp, now) // 將解綁的 P 放到全局空閑隊列中
unlock(&sched.lock)
wasSpinning := mp.spinning // 到這里 mp.spinning == true,線程處于自旋狀態(tài)
if mp.spinning {
mp.spinning = false // 設置 mp.spinning = false,這是要準備休眠了
if sched.nmspinning.Add(-1) < 0 { // 將全局變量的自旋線程數(shù)減 1,因為當前線程準備休眠,不偷 goroutine 了
throw("findrunnable: negative nmspinning")
}
...
}
stopm() // 線程休眠,直到喚醒
goto top // 能執(zhí)行到這里,說明線程已經(jīng)被喚醒了,繼續(xù)找一遍 goroutine
}
看完線程的調(diào)度策略我都要被感動到了,何其的敬業(yè),窮盡一切方式去找活干,找不到活,休眠之前還要在找一遍,真的是勞模啊。
大致流程是比較清楚的,我們把其中一些值得深挖的部分在單拎出來。
首先,從本地隊列中找 goroutine,如果找不到則進入全局隊列找,這里如果看
gp := globrunqget(pp, 0)
可能會覺得疑惑,從全局隊列中拿 goroutine 為什么要把 P 傳進去,我們看這個函數(shù)在做什么:
// Try get a batch of G's from the global runnable queue.
// sched.lock must be held. // 注釋說的挺清晰了,把全局隊列的 goroutine 放到 P 的本地隊列
func globrunqget(pp *p, max int32) *g {
assertLockHeld(&sched.lock)
if sched.runqsize == 0 {
return nil
}
n := sched.runqsize/gomaxprocs + 1 // 全局隊列是線程共享的,這里要除 gomaxprocs 平攤到每個線程綁定的 P
if n > sched.runqsize {
n = sched.runqsize // 執(zhí)行到這里,說明 gomaxprocs == 1
}
if max > 0 && n > max {
n = max
}
if n > int32(len(pp.runq))/2 {
n = int32(len(pp.runq)) / 2 // 如果 n 比本地隊列長度的一半要長,則 n == len(P.runq)/2
}
sched.runqsize -= n // 全局隊列長度減 n,準備從全局隊列中拿 n 個 goroutine 到 P 中
gp := sched.runq.pop() // 把全局隊列隊頭的 goroutine 拿出來,這個 goroutine 是要返回的 goroutine
n-- // 拿出了一個隊頭的 goroutine,這里 n 要減 1
for ; n > 0; n-- {
gp1 := sched.runq.pop() // 循環(huán)拿全局隊列中的 goroutine 出來
runqput(pp, gp1, false) // 將拿出的 goroutine 放到全局隊列中
}
return gp
}
調(diào)用
globrunqget
說明本地隊列沒有 goroutine 要從全局隊列拿,那么就可以把全局隊列中的 goroutine 放到 P 中,提高了全局隊列 goroutine 的優(yōu)先級。
如果全局隊列也沒找到 goroutine,在從
network poller
找,如果
network poller
也沒找到,則準備進入自旋,從別的線程的 P 那里偷活干。我們看線程是怎么偷活的:
// stealWork attempts to steal a runnable goroutine or timer from any P.
//
// If newWork is true, new work may have been readied.
//
// If now is not 0 it is the current time. stealWork returns the passed time or
// the current time if now was passed as 0.
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
pp := getg().m.p.ptr() // pp 是當前線程綁定的 P
ranTimer := false
const stealTries = 4 // 線程偷四次,每次都要隨機循環(huán)一遍所有 P
for i := 0; i < stealTries; i++ {
stealTimersOrRunNextG := i == stealTries-1
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { // 為保證偷的隨機性,隨機開始偷 P。隨機開始,后面每個 P 都可以輪到
...
p2 := allp[enum.position()] // 從 allp 中獲取 P
if pp == p2 {
continue // 如果獲取的是當前線程綁定的 P,則繼續(xù)循環(huán)下一個 P
}
...
// Don't bother to attempt to steal if p2 is idle.
if !idlepMask.read(enum.position()) { // 判斷拿到的 P 是不是 idle 狀態(tài),如果是,表明 P 還沒有 goroutine,跳過它,偷下一家
if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil { // P 不是 idle,調(diào)用 runqsteal 偷它!
return gp, false, now, pollUntil, ranTimer
}
}
}
}
// No goroutines found to steal. Regardless, running a timer may have
// made some goroutine ready that we missed. Indicate the next timer to
// wait for.
return nil, false, now, pollUntil, ranTimer
}
線程隨機的偷一個可偷的 P,偷 P 的實現(xiàn)在
runqsteal
,查看
runqsteal
怎么偷的:
// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed). // 給寶寶餓壞了,直接偷一半的 goroutine 啊,夠狠的!
func runqsteal(pp, p2 *p, stealRunNextG bool) *g {
t := pp.runqtail // t 指向當前 P 本地隊列的隊尾
n := runqgrab(p2, &pp.runq, t, stealRunNextG) // runqgrab 把 P2 本地隊列的一半 goroutine 拿到 P 的 runq 隊列中
if n == 0 {
return nil
}
n--
gp := pp.runq[(t+n)%uint32(len(pp.runq))].ptr() // 把偷到的本地隊列隊尾的 goroutine 拿出來
if n == 0 {
return gp // 如果只偷到了這一個,則直接返回。有總比沒有好
}
h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with consumers
if t-h+n >= uint32(len(pp.runq)) {
throw("runqsteal: runq overflow") // 如果 t-h+n >= len(p.runq) 表示偷多了...
}
atomic.StoreRel(&pp.runqtail, t+n) // 更新 P 的本地隊列的隊尾
return gp
}
這個偷就是把“地主家”(P2)的余糧 (goroutine) 給它搶一半過來,沒辦法我也要吃飯啊。
如果連偷都沒偷到(好吧,太慘了點...),那就準備休眠了,不干活了還不行嘛。不干活之前在去看看全局隊列有沒有 goroutine 了(口是心非的 M 人)。還是沒活,好吧,準備休眠了。
準備休眠,首先解除和 P 的綁定:
func releasep() *p {
gp := getg()
if gp.m.p == 0 {
throw("releasep: invalid arg")
}
pp := gp.m.p.ptr()
if pp.m.ptr() != gp.m || pp.status != _Prunning {
print("releasep: m=", gp.m, " m->p=", gp.m.p.ptr(), " p->m=", hex(pp.m), " p->status=", pp.status, "\n")
throw("releasep: invalid p state")
}
...
gp.m.p = 0
pp.m = 0
pp.status = _Pidle
return pp
}
就是指針的解綁操作,代碼很清晰,連注釋都不用,我們也不講了。
解綁之后,
pidleput
把空閑的 P 放到全局空閑隊列中。
接著,更新線程的狀態(tài),從自旋更新為非自旋,調(diào)用
stopm
準備休眠:
// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
gp := getg() // 當前線程執(zhí)行的 goroutine
...
lock(&sched.lock)
mput(gp.m) // 將線程放到全局空閑線程隊列中
unlock(&sched.lock)
mPark()
acquirep(gp.m.nextp.ptr())
gp.m.nextp = 0
}
stopm
將線程放到全局空閑線程隊列,接著調(diào)用
mPark
休眠線程:
// mPark causes a thread to park itself, returning once woken.
//
//go:nosplit
func mPark() {
gp := getg()
notesleep(&gp.m.park) // notesleep 線程休眠
noteclear(&gp.m.park)
}
func notesleep(n *note) {
gp := getg()
if gp != gp.m.g0 {
throw("notesleep not on g0")
}
ns := int64(-1)
if *cgo_yield != nil {
// Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
ns = 10e6
}
for atomic.Load(key32(&n.key)) == 0 { // 這里通過 n.key 判斷線程是否喚醒,如果等于 0,表示未喚醒,線程繼續(xù)休眠
gp.m.blocked = true
futexsleep(key32(&n.key), 0, ns) // 調(diào)用 futex 休眠線程,線程會“阻塞”在這里,直到被喚醒
if *cgo_yield != nil {
asmcgocall(*cgo_yield, nil)
}
gp.m.blocked = false // “喚醒”,設置線程的 blocked 標記為 false
}
}
// One-time notifications.
func noteclear(n *note) {
n.key = 0 // 執(zhí)行到 noteclear 說明,線程已經(jīng)被喚醒了,這時候線程重置 n.key 標志位為 0
}
線程休眠是通過調(diào)用
futex
進入操作系統(tǒng)內(nèi)核完成線程休眠的,關(guān)于
futex
的內(nèi)容可以參考
這里
。
線程的 n.key 是休眠的標志位,當 n.key 不等于 0 時表示有線程在喚醒休眠線程,線程從休眠狀態(tài)恢復到正常狀態(tài)。喚醒休眠線程通過調(diào)用
notewakeup(&nmp.park)
函數(shù)實現(xiàn):
func notewakeup(n *note) {
old := atomic.Xchg(key32(&n.key), 1)
if old != 0 {
print("notewakeup - double wakeup (", old, ")\n")
throw("notewakeup - double wakeup")
}
futexwakeup(key32(&n.key), 1) // 調(diào)用 futexwakeup 喚醒休眠線程
}
首先,線程是怎么找到休眠線程的?線程通過全局空閑線程隊列找到空閑的線程,并且將空閑線程的休眠標志位 m.park 傳給
notewakeup
,最后調(diào)用
futexwakeup
喚醒休眠線程。
值得一提的是,喚醒的線程在喚醒之后還是會繼續(xù)找可運行的 goroutine 直到找到:
func stopm() {
...
mPark() // 如果 mPark 返回,表示線程被喚醒,開始正常工作
acquirep(gp.m.nextp.ptr()) // 前面休眠前,線程已經(jīng)和 P 解綁了。這里在給線程找一個 P 綁定
gp.m.nextp = 0 // 線程已經(jīng)綁定到 P 了,重置 nextp
}
基本這就是調(diào)度策略中很重要的一部分,線程如何找 goroutine。找到 goroutine 之后調(diào)用
gogo
執(zhí)行該 goroutine。
本講繼續(xù)豐富了調(diào)度器的調(diào)度策略,下一講,我們開始非 main goroutine 的介紹。
本站所有軟件,都由網(wǎng)友上傳,如有侵犯你的版權(quán),請發(fā)郵件[email protected]
湘ICP備2022002427號-10 湘公網(wǎng)安備:43070202000427號© 2013~2025 haote.com 好特網(wǎng)