runtime: convert schedt.npidle to atomic type

Note that this converts npidle from uint32 to int32 for consistency with
the other count fields in schedt and the type of p.id.

Note that this changes previously unsynchronized operations to
synchronized operations in:
  * handoffp
  * injectglist
  * schedtrace
  * schedEnableUser
  * sync_runtime_canSpin

For #53821.

Change-Id: I36d1b3b4a28131c9d47884fade6bc44439dd6937
Reviewed-on: https://go-review.googlesource.com/c/go/+/419445
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Austin Clements <austin@google.com>
Run-TryBot: Michael Pratt <mpratt@google.com>
Michael Pratt authored 2022-07-20 18:01:31 -04:00
parent 449691b3ef
commit 58a1dabfd1
3 changed files with 16 additions and 17 deletions
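
For orientation before the diffs: a minimal, self-contained sketch of the access pattern the conversion adopts. Reads and updates of the idle-P count now go through Load/Add methods on an atomic integer instead of atomic.Load/atomic.Xadd calls on a raw uint32 (or, in the functions listed above, plain unsynchronized reads). The sketch uses the exported sync/atomic.Int32 (Go 1.19+) and an illustrative demoSched type; the runtime itself uses runtime/internal/atomic.Int32, which exposes the same method set.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// demoSched stands in for the runtime's schedt. The npidle field is an
// atomic.Int32, mirroring the shape this CL gives sched.npidle (the runtime
// itself uses runtime/internal/atomic.Int32, which has the same methods).
type demoSched struct {
	npidle atomic.Int32
}

func main() {
	var s demoSched
	var wg sync.WaitGroup

	// Simulate Ps going onto and off of the idle list concurrently.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.npidle.Add(1)  // like pidleput: one more idle P
			s.npidle.Add(-1) // like pidleget: one fewer idle P
		}()
	}
	wg.Wait()

	// Readers never touch the raw field; they go through Load, so the access
	// is synchronized for the race detector and for weakly ordered hardware.
	fmt.Println("idle Ps:", s.npidle.Load())
}

Running this under the race detector reports no races, which is the point of routing every access through the atomic methods.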

src/runtime/mgcpacer.go

@@ -750,7 +750,7 @@ func (c *gcControllerState) enlistWorker() {
 	// If there are idle Ps, wake one so it will run an idle worker.
 	// NOTE: This is suspected of causing deadlocks. See golang.org/issue/19112.
 	//
-	// if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
+	// if sched.npidle.Load() != 0 && atomic.Load(&sched.nmspinning) == 0 {
 	// 	wakep()
 	// 	return
 	// }

src/runtime/proc.go

@@ -2362,7 +2362,7 @@ func handoffp(pp *p) {
 	}
 	// no local work, check that there are no spinning/idle M's,
 	// otherwise our help is not required
-	if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
+	if int32(atomic.Load(&sched.nmspinning))+sched.npidle.Load() == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
 		startm(pp, true)
 		return
 	}
@@ -2390,7 +2390,7 @@ func handoffp(pp *p) {
 	}
 	// If this is the last running P and nobody is polling network,
 	// need to wakeup another M to poll network.
-	if sched.npidle == uint32(gomaxprocs-1) && sched.lastpoll.Load() != 0 {
+	if sched.npidle.Load() == gomaxprocs-1 && sched.lastpoll.Load() != 0 {
 		unlock(&sched.lock)
 		startm(pp, false)
 		return
@@ -2410,7 +2410,7 @@ func handoffp(pp *p) {
 // Tries to add one more P to execute G's.
 // Called when a G is made runnable (newproc, ready).
 func wakep() {
-	if atomic.Load(&sched.npidle) == 0 {
+	if sched.npidle.Load() == 0 {
 		return
 	}
 	// be conservative about spinning threads
@@ -2649,8 +2649,7 @@ top:
 	// Limit the number of spinning Ms to half the number of busy Ps.
 	// This is necessary to prevent excessive CPU consumption when
 	// GOMAXPROCS>>1 but the program parallelism is low.
-	procs := uint32(gomaxprocs)
-	if mp.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
+	if mp.spinning || int32(2*atomic.Load(&sched.nmspinning)) < gomaxprocs-sched.npidle.Load() {
 		if !mp.spinning {
 			mp.spinning = true
 			atomic.Xadd(&sched.nmspinning, 1)
@@ -3136,7 +3135,7 @@ func injectglist(glist *gList) {
 	*glist = gList{}
 
 	startIdle := func(n int) {
-		for ; n != 0 && sched.npidle != 0; n-- {
+		for ; n != 0 && sched.npidle.Load() != 0; n-- {
 			startm(nil, false)
 		}
 	}
@@ -3150,7 +3149,7 @@ func injectglist(glist *gList) {
 		return
 	}
 
-	npidle := int(atomic.Load(&sched.npidle))
+	npidle := int(sched.npidle.Load())
 	var globq gQueue
 	var n int
 	for n = 0; n < npidle && !q.empty(); n++ {
@@ -5156,9 +5155,9 @@ func sysmon() {
 		// from a timer to avoid adding system load to applications that spend
 		// most of their time sleeping.
 		now := nanotime()
-		if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
+		if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || sched.npidle.Load() == gomaxprocs) {
 			lock(&sched.lock)
-			if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
+			if atomic.Load(&sched.gcwaiting) != 0 || sched.npidle.Load() == gomaxprocs {
 				syscallWake := false
 				next := timeSleepUntil()
 				if next > now {
@@ -5318,7 +5317,7 @@ func retake(now int64) uint32 {
 			// On the one hand we don't want to retake Ps if there is no other work to do,
 			// but on the other hand we want to retake them eventually
 			// because they can prevent the sysmon thread from deep sleep.
-			if runqempty(pp) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
+			if runqempty(pp) && atomic.Load(&sched.nmspinning)+uint32(sched.npidle.Load()) > 0 && pd.syscallwhen+10*1000*1000 > now {
 				continue
 			}
 			// Drop allpLock so we can take sched.lock.
@@ -5409,7 +5408,7 @@ func schedtrace(detailed bool) {
 	}
 
 	lock(&sched.lock)
-	print("SCHED ", (now-starttime)/1e6, "ms: gomaxprocs=", gomaxprocs, " idleprocs=", sched.npidle, " threads=", mcount(), " spinningthreads=", sched.nmspinning, " idlethreads=", sched.nmidle, " runqueue=", sched.runqsize)
+	print("SCHED ", (now-starttime)/1e6, "ms: gomaxprocs=", gomaxprocs, " idleprocs=", sched.npidle.Load(), " threads=", mcount(), " spinningthreads=", sched.nmspinning, " idlethreads=", sched.nmidle, " runqueue=", sched.runqsize)
 	if detailed {
 		print(" gcwaiting=", sched.gcwaiting, " nmidlelocked=", sched.nmidlelocked, " stopwait=", sched.stopwait, " sysmonwait=", sched.sysmonwait, "\n")
 	}
@@ -5505,7 +5504,7 @@ func schedEnableUser(enable bool) {
 		sched.disable.n = 0
 		globrunqputbatch(&sched.disable.runnable, n)
 		unlock(&sched.lock)
-		for ; n != 0 && sched.npidle != 0; n-- {
+		for ; n != 0 && sched.npidle.Load() != 0; n-- {
 			startm(nil, false)
 		}
 	} else {
@@ -5713,7 +5712,7 @@ func pidleput(pp *p, now int64) int64 {
 	idlepMask.set(pp.id)
 	pp.link = sched.pidle
 	sched.pidle.set(pp)
-	atomic.Xadd(&sched.npidle, 1)
+	sched.npidle.Add(1)
 	if !pp.limiterEvent.start(limiterEventIdle, now) {
 		throw("must be able to track idle limiter event")
 	}
@@ -5739,7 +5738,7 @@ func pidleget(now int64) (*p, int64) {
 		timerpMask.set(pp.id)
 		idlepMask.clear(pp.id)
 		sched.pidle = pp.link
-		atomic.Xadd(&sched.npidle, -1)
+		sched.npidle.Add(-1)
 		pp.limiterEvent.stop(limiterEventIdle, now)
 	}
 	return pp, now
@@ -6194,7 +6193,7 @@ func sync_runtime_canSpin(i int) bool {
 	// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
 	// As opposed to runtime mutex we don't do passive spinning here,
 	// because there can be work on global runq or on other Ps.
-	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
+	if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+int32(sched.nmspinning)+1 {
 		return false
 	}
 	if p := getg().m.p.ptr(); !runqempty(p) {

src/runtime/runtime2.go

@@ -778,7 +778,7 @@ type schedt struct {
 	ngsys atomic.Int32 // number of system goroutines
 
 	pidle      puintptr // idle p's
-	npidle     uint32
+	npidle     atomic.Int32
 	nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
 
 	// Global runnable queue.
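
To round out the struct change above, a small sketch (illustrative names only, not runtime code) of the uint32-to-int32 part of the conversion: gomaxprocs and p.id are int32, so an int32-typed atomic counter compares against them without the conversions the old uint32 field forced, as in the handoffp and sysmon hunks.

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	// gomaxprocsDemo plays the role of the runtime's gomaxprocs, which is an
	// int32; npidle is the counter this CL converts to an atomic type.
	var gomaxprocsDemo int32 = 8
	var npidle atomic.Int32
	npidle.Store(7)

	// With an int32-typed counter, comparisons against other int32 scheduler
	// values need no conversions, unlike the old uint32 field, which required
	// casts such as uint32(gomaxprocs-1) in handoffp and sysmon.
	if npidle.Load() == gomaxprocsDemo-1 {
		fmt.Println("last running P: wake another M to poll the network")
	}
}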