runtime: synchronize P wakeup and dropping Ps
CL 310850 dropped work re-checks on non-spinning Ms to fix #43997. This introduced a new race condition: a non-spinning M may drop its P and then park at the same time a spinning M attempts to wake a P to handle some new work. The spinning M fails to find an idle P (because the non-spinning M hasn't quite made its P idle yet) and does nothing, assuming that the system is fully loaded. This results in loss of work conservation. In the worst case we could have a complete deadlock if injectglist fails to wake anything just as all Ps are going idle.

sched.needspinning adds new synchronization to cover this case. If work submission fails to find a P, it sets needspinning to indicate that a spinning M is required. When non-spinning Ms prepare to drop their P, they check needspinning and abort going idle to become a spinning M instead. This addresses the race without extra spurious wakeups: in the normal (non-racing) case, an M will become spinning via the normal path and clear the flag.

injectglist must change in addition to wakep because it is a similar form of work submission, notably used following netpoll at a point when we might not have a P that would guarantee the work runs.

Fixes #45867

Change-Id: Ieb623a6d4162fb8c2be7b4ff8acdebcc3a0d69a8
Reviewed-on: https://go-review.googlesource.com/c/go/+/389014
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
Run-TryBot: Michael Pratt <mpratt@google.com>
Auto-Submit: Michael Pratt <mpratt@google.com>
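The protocol is easiest to see with both halves side by side. Below is a minimal toy model of the handshake described above; it is a sketch, not runtime code: sched here is a local stand-in with a plain mutex playing the role of sched.lock, and submitWork/prepareToPark are hypothetical names. For example, without the flag, a non-spinning M can decide to park at the same moment another M injects netpoll work: the injector sees no idle P (the parker still holds it) and does nothing, then the parker idles its P, leaving runnable work, an idle P, and no spinner. Because both halves below run under the same lock, every interleaving is work-conserving: either the submitter finds the idle P, or the parker sees the flag and becomes the spinning M on its behalf.

package main

import (
	"fmt"
	"sync"
)

var sched struct {
	lock         sync.Mutex // stand-in for sched.lock
	idlePs       int        // stand-in for the _Pidle list
	needspinning bool       // stand-in for the new sched.needspinning flag
}

// submitWork models the submission side (wakep, injectglist): take an idle P
// if one exists; otherwise set needspinning so that a non-spinning M about to
// drop its P takes over instead.
func submitWork() {
	sched.lock.Lock()
	defer sched.lock.Unlock()
	if sched.idlePs > 0 {
		sched.idlePs--
		fmt.Println("submitter: got an idle P, starting a spinning M")
		return
	}
	// Like pidlegetSpinning: no P available, ask a parking M for help.
	sched.needspinning = true
	fmt.Println("submitter: no idle P, set needspinning")
}

// prepareToPark models the non-spinning side (findrunnable): before making
// its P idle, the M re-checks needspinning under the same lock and aborts
// the park if the flag is set.
func prepareToPark() {
	sched.lock.Lock()
	defer sched.lock.Unlock()
	if sched.needspinning {
		sched.needspinning = false
		fmt.Println("parker: needspinning set, becoming the spinning M instead")
		return
	}
	sched.idlePs++
	fmt.Println("parker: no pending submission, P goes idle and M parks")
}

func main() {
	done := make(chan struct{})
	go func() { submitWork(); close(done) }()
	prepareToPark()
	<-done
}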
Commit 8cb350d69a (parent 54cf1b107d): 2 changed files with 140 additions and 46 deletions
src/runtime/proc.go

@@ -73,7 +73,7 @@ var modinfo string
 // If there is at least one spinning thread (sched.nmspinning>1), we don't
 // unpark new threads when submitting work. To compensate for that, if the last
 // spinning thread finds work and stops spinning, it must unpark a new spinning
 // thread. This approach smooths out unjustified spikes of thread unparking,
 // but at the same time guarantees eventual maximal CPU parallelism
 // utilization.
 //
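As a small, hedged illustration of the invariant described in this comment (toy code, not the runtime's: nmspinning and the unparkAnother callback stand in for sched.nmspinning and the wakep call made by resetspinning):

package main

import "sync/atomic"

var nmspinning atomic.Int32 // stand-in for sched.nmspinning

// stopSpinning models the rule above: when the last spinning thread finds
// work and stops spinning, it must unpark a replacement spinner so that
// later work submissions are still noticed.
func stopSpinning(unparkAnother func()) {
	if nmspinning.Add(-1) == 0 {
		unparkAnother()
	}
}

func main() {
	nmspinning.Store(1) // we are the only spinning thread
	stopSpinning(func() { println("unpark a new spinning thread") })
}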
@@ -827,6 +827,12 @@ func mcommoninit(mp *m, id int64) {
 }
 
+func (mp *m) becomeSpinning() {
+	mp.spinning = true
+	sched.nmspinning.Add(1)
+	sched.needspinning.Store(0)
+}
+
 var fastrandseed uintptr
 
 func fastrandinit() {
@@ -2242,8 +2248,8 @@ func mspinning() {
 // Schedules some M to run the p (creates an M if necessary).
 // If p==nil, tries to get an idle P, if no idle P's does nothing.
 // May run with m.p==nil, so write barriers are not allowed.
-// If spinning is set, the caller has incremented nmspinning and startm will
-// either decrement nmspinning or set m.spinning in the newly started M.
+// If spinning is set, the caller has incremented nmspinning and must provide a
+// P. startm will set m.spinning in the newly started M.
 //
 // Callers passing a non-nil P must call from a non-preemptible context. See
 // comment on acquirem below.
@@ -2271,16 +2277,15 @@ func startm(pp *p, spinning bool) {
 	mp := acquirem()
 	lock(&sched.lock)
 	if pp == nil {
+		if spinning {
+			// TODO(prattmic): All remaining calls to this function
+			// with _p_ == nil could be cleaned up to find a P
+			// before calling startm.
+			throw("startm: P required for spinning=true")
+		}
 		pp, _ = pidleget(0)
 		if pp == nil {
 			unlock(&sched.lock)
-			if spinning {
-				// The caller incremented nmspinning, but there are no idle Ps,
-				// so it's okay to just undo the increment and give up.
-				if sched.nmspinning.Add(-1) < 0 {
-					throw("startm: negative nmspinning")
-				}
-			}
 			releasem(mp)
 			return
 		}
@@ -2358,6 +2363,7 @@ func handoffp(pp *p) {
 	// no local work, check that there are no spinning/idle M's,
 	// otherwise our help is not required
 	if sched.nmspinning.Load()+sched.npidle.Load() == 0 && sched.nmspinning.CompareAndSwap(0, 1) { // TODO: fast atomic
+		sched.needspinning.Store(0)
 		startm(pp, true)
 		return
 	}
@@ -2404,15 +2410,41 @@ func handoffp(pp *p) {
 
 // Tries to add one more P to execute G's.
 // Called when a G is made runnable (newproc, ready).
+// Must be called with a P.
 func wakep() {
-	if sched.npidle.Load() == 0 {
-		return
-	}
-	// be conservative about spinning threads
+	// Be conservative about spinning threads, only start one if none exist
+	// already.
 	if sched.nmspinning.Load() != 0 || !sched.nmspinning.CompareAndSwap(0, 1) {
 		return
 	}
-	startm(nil, true)
+
+	// Disable preemption until ownership of pp transfers to the next M in
+	// startm. Otherwise preemption here would leave pp stuck waiting to
+	// enter _Pgcstop.
+	//
+	// See preemption comment on acquirem in startm for more details.
+	mp := acquirem()
+
+	var pp *p
+	lock(&sched.lock)
+	pp, _ = pidlegetSpinning(0)
+	if pp == nil {
+		if sched.nmspinning.Add(-1) < 0 {
+			throw("wakep: negative nmspinning")
+		}
+		unlock(&sched.lock)
+		releasem(mp)
+		return
+	}
+	// Since we always have a P, the race in the "No M is available"
+	// comment in startm doesn't apply during the small window between the
+	// unlock here and lock in startm. A checkdead in between will always
+	// see at least one running M (ours).
+	unlock(&sched.lock)
+
+	startm(pp, true)
+
+	releasem(mp)
 }
 
 // Stops execution of the current m that is locked to a g until the g is runnable again.
@@ -2646,8 +2678,7 @@ top:
 	// GOMAXPROCS>>1 but the program parallelism is low.
 	if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
 		if !mp.spinning {
-			mp.spinning = true
-			sched.nmspinning.Add(1)
+			mp.becomeSpinning()
 		}
 
 		gp, inheritTime, tnow, w, newWork := stealWork(now)
@@ -2723,6 +2754,12 @@ top:
 		unlock(&sched.lock)
 		return gp, false, false
 	}
+	if !mp.spinning && sched.needspinning.Load() == 1 {
+		// See "Delicate dance" comment below.
+		mp.becomeSpinning()
+		unlock(&sched.lock)
+		goto top
+	}
 	if releasep() != pp {
 		throw("findrunnable: wrong p")
 	}
@@ -2743,12 +2780,28 @@ top:
 	// * New/modified-earlier timers on a per-P timer heap.
 	// * Idle-priority GC work (barring golang.org/issue/19112).
 	//
-	// If we discover new work below, we need to restore m.spinning as a signal
-	// for resetspinning to unpark a new worker thread (because there can be more
-	// than one starving goroutine). However, if after discovering new work
-	// we also observe no idle Ps it is OK to skip unparking a new worker
-	// thread: the system is fully loaded so no spinning threads are required.
-	// Also see "Worker thread parking/unparking" comment at the top of the file.
+	// If we discover new work below, we need to restore m.spinning as a
+	// signal for resetspinning to unpark a new worker thread (because
+	// there can be more than one starving goroutine).
+	//
+	// However, if after discovering new work we also observe no idle Ps
+	// (either here or in resetspinning), we have a problem. We may be
+	// racing with a non-spinning M in the block above, having found no
+	// work and preparing to release its P and park. Allowing that P to go
+	// idle will result in loss of work conservation (idle P while there is
+	// runnable work). This could result in complete deadlock in the
+	// unlikely event that we discover new work (from netpoll) right as we
+	// are racing with _all_ other Ps going idle.
+	//
+	// We use sched.needspinning to synchronize with non-spinning Ms going
+	// idle. If needspinning is set when they are about to drop their P,
+	// they abort the drop and instead become a new spinning M on our
+	// behalf. If we are not racing and the system is truly fully loaded
+	// then no spinning threads are required, and the next thread to
+	// naturally become spinning will clear the flag.
+	//
+	// Also see "Worker thread parking/unparking" comment at the top of the
+	// file.
 	wasSpinning := mp.spinning
 	if mp.spinning {
 		mp.spinning = false
@@ -2758,16 +2811,18 @@ top:
 
 		// Note the for correctness, only the last M transitioning from
 		// spinning to non-spinning must perform these rechecks to
-		// ensure no missed work. We are performing it on every M that
-		// transitions as a conservative change to monitor effects on
-		// latency. See golang.org/issue/43997.
+		// ensure no missed work. However, the runtime has some cases
+		// of transient increments of nmspinning that are decremented
+		// without going through this path, so we must be conservative
+		// and perform the check on all spinning Ms.
+		//
+		// See https://go.dev/issue/43997.
 
 		// Check all runqueues once again.
 		pp := checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)
 		if pp != nil {
 			acquirep(pp)
-			mp.spinning = true
-			sched.nmspinning.Add(1)
+			mp.becomeSpinning()
 			goto top
 		}
@@ -2775,8 +2830,7 @@ top:
 		pp, gp := checkIdleGCNoP()
 		if pp != nil {
 			acquirep(pp)
-			mp.spinning = true
-			sched.nmspinning.Add(1)
+			mp.becomeSpinning()
 
 			// Run the idle worker.
 			pp.gcMarkWorkerMode = gcMarkWorkerIdleMode
@@ -2844,8 +2898,7 @@ top:
 			return gp, false, false
 		}
 		if wasSpinning {
-			mp.spinning = true
-			sched.nmspinning.Add(1)
+			mp.becomeSpinning()
 		}
 		goto top
 	}
@@ -2964,17 +3017,18 @@ func checkRunqsNoP(allpSnapshot []*p, idlepMaskSnapshot pMask) *p {
 	for id, p2 := range allpSnapshot {
 		if !idlepMaskSnapshot.read(uint32(id)) && !runqempty(p2) {
 			lock(&sched.lock)
-			pp, _ := pidleget(0)
-			unlock(&sched.lock)
-			if pp != nil {
-				return pp
+			pp, _ := pidlegetSpinning(0)
+			if pp == nil {
+				// Can't get a P, don't bother checking remaining Ps.
+				unlock(&sched.lock)
+				return nil
 			}
-
-			// Can't get a P, don't bother checking remaining Ps.
-			break
+			unlock(&sched.lock)
+			return pp
 		}
 	}
 
 	// No work available.
 	return nil
 }
@@ -3030,7 +3084,7 @@ func checkIdleGCNoP() (*p, *g) {
 	// the assumption in gcControllerState.findRunnableGCWorker that an
 	// empty gcBgMarkWorkerPool is only possible if gcMarkDone is running.
 	lock(&sched.lock)
-	pp, now := pidleget(0)
+	pp, now := pidlegetSpinning(0)
 	if pp == nil {
 		unlock(&sched.lock)
 		return nil, nil
@@ -3130,8 +3184,20 @@ func injectglist(glist *gList) {
 	*glist = gList{}
 
 	startIdle := func(n int) {
-		for ; n != 0 && sched.npidle.Load() != 0; n-- {
-			startm(nil, false)
+		for i := 0; i < n; i++ {
+			mp := acquirem() // See comment in startm.
+			lock(&sched.lock)
+
+			pp, _ := pidlegetSpinning(0)
+			if pp == nil {
+				unlock(&sched.lock)
+				releasem(mp)
+				break
+			}
+
+			unlock(&sched.lock)
+			startm(pp, false)
+			releasem(mp)
 		}
 	}
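Note the shape of the new startIdle loop: the old version checked sched.npidle and called startm(nil, false), which could find no P by the time it re-checked under the lock and then silently start nothing, exactly the lost-wakeup window this CL closes. The new version reserves a P first and only then starts an M for it; a failed reservation goes through pidlegetSpinning, which sets needspinning. A condensed sketch of that reserve-then-start discipline, with hypothetical stand-ins (pidlegetLocked, startM) rather than real runtime calls:

package main

import "sync"

type p struct{ id int }

var sched struct {
	lock  sync.Mutex
	pidle []*p // toy stand-in for the runtime's idle P list
}

// pidlegetLocked pops an idle P; the caller must hold sched.lock.
func pidlegetLocked() *p {
	if len(sched.pidle) == 0 {
		return nil
	}
	pp := sched.pidle[len(sched.pidle)-1]
	sched.pidle = sched.pidle[:len(sched.pidle)-1]
	return pp
}

// startIdle mirrors the new injectglist loop: reserve a P under the lock,
// then start an M that is guaranteed to own it.
func startIdle(n int, startM func(*p)) {
	for i := 0; i < n; i++ {
		sched.lock.Lock()
		pp := pidlegetLocked()
		sched.lock.Unlock()
		if pp == nil {
			// Out of Ps. In the runtime, pidlegetSpinning has set
			// needspinning at this point, so the work is not lost.
			break
		}
		startM(pp) // the new M starts with its P already in hand
	}
}

func main() {
	sched.pidle = []*p{{id: 1}, {id: 2}}
	startIdle(3, func(pp *p) { println("started M on P", pp.id) })
}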
@@ -5406,7 +5472,7 @@ func schedtrace(detailed bool) {
 	}
 
 	lock(&sched.lock)
-	print("SCHED ", (now-starttime)/1e6, "ms: gomaxprocs=", gomaxprocs, " idleprocs=", sched.npidle.Load(), " threads=", mcount(), " spinningthreads=", sched.nmspinning.Load(), " idlethreads=", sched.nmidle, " runqueue=", sched.runqsize)
+	print("SCHED ", (now-starttime)/1e6, "ms: gomaxprocs=", gomaxprocs, " idleprocs=", sched.npidle.Load(), " threads=", mcount(), " spinningthreads=", sched.nmspinning.Load(), " needspinning=", sched.needspinning.Load(), " idlethreads=", sched.nmidle, " runqueue=", sched.runqsize)
 	if detailed {
 		print(" gcwaiting=", sched.gcwaiting.Load(), " nmidlelocked=", sched.nmidlelocked, " stopwait=", sched.stopwait, " sysmonwait=", sched.sysmonwait.Load(), "\n")
 	}
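For reference, with GODEBUG=schedtrace=1000 the new field appears in every trace line. Based on the print call above, a line would look like the following (the values here are invented for illustration):

SCHED 4052ms: gomaxprocs=8 idleprocs=0 threads=12 spinningthreads=1 needspinning=1 idlethreads=3 runqueue=4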
@@ -5742,6 +5808,31 @@ func pidleget(now int64) (*p, int64) {
 	return pp, now
 }
 
+// pidlegetSpinning tries to get a p from the _Pidle list, acquiring ownership.
+// This is called by spinning Ms (or callers than need a spinning M) that have
+// found work. If no P is available, this must synchronized with non-spinning
+// Ms that may be preparing to drop their P without discovering this work.
+//
+// sched.lock must be held.
+//
+// May run during STW, so write barriers are not allowed.
+//
+//go:nowritebarrierrec
+func pidlegetSpinning(now int64) (*p, int64) {
+	assertLockHeld(&sched.lock)
+
+	pp, now := pidleget(now)
+	if pp == nil {
+		// See "Delicate dance" comment in findrunnable. We found work
+		// that we cannot take, we must synchronize with non-spinning
+		// Ms that may be preparing to drop their P.
+		sched.needspinning.Store(1)
+		return nil, now
+	}
+
+	return pp, now
+}
+
 // runqempty reports whether pp has no Gs on its local run queue.
 // It never returns true spuriously.
 func runqempty(pp *p) bool {
src/runtime/runtime2.go

@@ -777,9 +777,10 @@ type schedt struct {
 
 	ngsys atomic.Int32 // number of system goroutines
 
-	pidle      puintptr // idle p's
-	npidle     atomic.Int32
-	nmspinning atomic.Int32 // See "Worker thread parking/unparking" comment in proc.go.
+	pidle        puintptr // idle p's
+	npidle       atomic.Int32
+	nmspinning   atomic.Int32  // See "Worker thread parking/unparking" comment in proc.go.
+	needspinning atomic.Uint32 // See "Delicate dance" comment in proc.go. Boolean. Must hold sched.lock to set to 1.
 
 	// Global runnable queue.
 	runq gQueue
@@ -840,6 +841,8 @@ type schedt struct {
 	// with the rest of the runtime.
 	sysmonlock mutex
 
+	_ uint32 // ensure timeToRun has 8-byte alignment
+
 	// timeToRun is a distribution of scheduling latencies, defined
 	// as the sum of time a G spends in the _Grunnable state before
 	// it transitions to _Grunning.