From fb6f8a96f24f6b30e99cc77d78bc0194ffec7a41 Mon Sep 17 00:00:00 2001
From: Dmitry Vyukov
Date: Tue, 8 Dec 2015 15:11:27 +0100
Subject: [PATCH] runtime: remove unnecessary wakeups of worker threads

Currently we wake up new worker threads whenever we pass through the
scheduler with nmspinning==0. This leads to lots of unnecessary thread
wakeups. Instead, let only spinning threads wake up new spinning threads.

For the following program:

package main

import "runtime"

func main() {
	for i := 0; i < 1e7; i++ {
		runtime.Gosched()
	}
}

Before:
$ time ./test
real	0m4.278s
user	0m7.634s
sys	0m1.423s

$ strace -c ./test
% time     seconds  usecs/call     calls    errors syscall
 99.93    9.314936           3   2685009     17536 futex

After:
$ time ./test
real	0m1.200s
user	0m1.181s
sys	0m0.024s

$ strace -c ./test
% time     seconds  usecs/call     calls    errors syscall
  3.11    0.000049          25         2           futex

Fixes #13527

Change-Id: Ia1f5bf8a896dcc25d8b04beb1f4317aa9ff16f74
Reviewed-on: https://go-review.googlesource.com/17540
Reviewed-by: Austin Clements
Run-TryBot: Dmitry Vyukov
TryBot-Result: Gobot Gobot
---
 src/runtime/proc.go      | 142 +++++++++++++++++++++++++++++----------
 src/runtime/proc_test.go |  74 ++++++++++++++++++++
 src/runtime/runtime2.go  |   2 +-
 3 files changed, 181 insertions(+), 37 deletions(-)

diff --git a/src/runtime/proc.go b/src/runtime/proc.go
index 9ef7bfb954..c0df6f1d05 100644
--- a/src/runtime/proc.go
+++ b/src/runtime/proc.go
@@ -22,6 +22,57 @@ import (
 //
 // Design doc at https://golang.org/s/go11sched.
 
+// Worker thread parking/unparking.
+// We need to balance between keeping enough running worker threads to utilize
+// available hardware parallelism and parking excessive running worker threads
+// to conserve CPU resources and power. This is not simple for two reasons:
+// (1) scheduler state is intentionally distributed (in particular, per-P work
+// queues), so it is not possible to compute global predicates on fast paths;
+// (2) for optimal thread management we would need to know the future (don't park
+// a worker thread when a new goroutine will be readied in the near future).
+//
+// Three rejected approaches that would work badly:
+// 1. Centralize all scheduler state (would inhibit scalability).
+// 2. Direct goroutine handoff. That is, when we ready a new goroutine and there
+//    is a spare P, unpark a thread and hand it the P and the goroutine.
+//    This would lead to thread state thrashing, as the thread that readied the
+//    goroutine can be out of work the very next moment, and we would then need
+//    to park it. Also, it would destroy locality of computation, as we want to
+//    keep dependent goroutines on the same thread; and it would introduce
+//    additional latency.
+// 3. Unpark an additional thread whenever we ready a goroutine and there is an
+//    idle P, but don't do handoff. This would lead to excessive thread parking/
+//    unparking as the additional threads will instantly park without discovering
+//    any work to do.
+//
+// The current approach:
+// We unpark an additional thread when we ready a goroutine if there is an
+// idle P and there are no "spinning" worker threads. A worker thread is considered
+// spinning if it is out of local work and did not find work in the global run queue/
+// netpoller; the spinning state is denoted in m.spinning and in sched.nmspinning.
+// Threads unparked this way are also considered spinning; we don't do goroutine
+// handoff so such threads are out of work initially. Spinning threads do some
+// spinning looking for work in per-P run queues before parking. If a spinning
+// thread finds work it takes itself out of the spinning state and proceeds to
+// execution. If it does not find work it takes itself out of the spinning state
+// and then parks.
+// If there is at least one spinning thread (sched.nmspinning>0), we don't unpark
+// new threads when readying goroutines. To compensate for that, if the last spinning
+// thread finds work and stops spinning, it must unpark a new spinning thread.
+// This approach smooths out unjustified spikes of thread unparking,
+// but at the same time guarantees eventual maximal CPU parallelism utilization.
+//
+// The main implementation complication is that we need to be very careful during
+// the spinning->non-spinning thread transition. This transition can race with the
+// submission of a new goroutine, and either one party or the other needs to unpark
+// another worker thread. If they both fail to do that, we can end up with
+// semi-persistent CPU underutilization. The general pattern for goroutine readying
+// is: submit a goroutine to the local work queue, #StoreLoad-style memory barrier,
+// check sched.nmspinning.
+// The general pattern for the spinning->non-spinning transition is: decrement
+// nmspinning, #StoreLoad-style memory barrier, check all per-P work queues for new
+// work. Note that all this complexity does not apply to the global run queue, as
+// we are not sloppy about thread unparking when submitting to the global queue.
+// Also see comments for nmspinning manipulation.
+
 var (
 	m0 m
 	g0 g
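
[Editorial aside] To make the two racing patterns above concrete, here is a minimal, self-contained toy model, runnable as an ordinary Go program. The names nmspinning, runq, and the helper functions are illustrative stand-ins for the scheduler state described in the comment, not the runtime's internals:

package main

import (
	"fmt"
	"sync/atomic"
)

var (
	nmspinning int32                 // stand-in for sched.nmspinning
	runq       = make(chan int, 128) // stand-in for a per-P run queue
)

// unparkSpinning stands in for waking a spinning worker: the waker
// increments nmspinning on behalf of the worker it is about to wake.
func unparkSpinning() {
	atomic.AddInt32(&nmspinning, 1)
	fmt.Println("unparked a spinning worker")
}

// ready follows the goroutine-readying pattern: publish the work first,
// then check nmspinning (in the real runtime, a #StoreLoad-style barrier
// sits between the two steps).
func ready(g int) {
	runq <- g
	if atomic.LoadInt32(&nmspinning) == 0 {
		unparkSpinning()
	}
}

// stopSpinning follows the opposite pattern: decrement nmspinning first,
// then re-check the queues; only if both steps see no work may we park.
func stopSpinning() (park bool) {
	atomic.AddInt32(&nmspinning, -1)
	select {
	case g := <-runq:
		fmt.Println("found work after decrement:", g) // must not park
		return false
	default:
		return true
	}
}

func main() {
	ready(1)
	fmt.Println("park:", stopSpinning())
}

The essential point is the ordering: ready publishes work before checking the counter, and stopSpinning decrements the counter before re-checking for work, so whichever side loses the race still observes the other side's effect and no goroutine is stranded.
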
@@ -1454,8 +1505,7 @@ func stopm() {
 		throw("stopm holding p")
 	}
 	if _g_.m.spinning {
-		_g_.m.spinning = false
-		atomic.Xadd(&sched.nmspinning, -1)
+		throw("stopm spinning")
 	}
 
 retry:
@@ -1476,22 +1526,15 @@ retry:
 }
 
 func mspinning() {
-	gp := getg()
-	if !runqempty(gp.m.nextp.ptr()) {
-		// Something (presumably the GC) was readied while the
-		// runtime was starting up this M, so the M is no
-		// longer spinning.
-		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
-			throw("mspinning: nmspinning underflowed")
-		}
-	} else {
-		gp.m.spinning = true
-	}
+	// startm's caller incremented nmspinning. Set the new M's spinning.
+	getg().m.spinning = true
 }
 
 // Schedules some M to run the p (creates an M if necessary).
 // If p==nil, tries to get an idle P; if there are no idle P's, does nothing.
 // May run with m.p==nil, so write barriers are not allowed.
+// If spinning is set, the caller has incremented nmspinning and startm will
+// either decrement nmspinning or set m.spinning in the newly started M.
 //go:nowritebarrier
 func startm(_p_ *p, spinning bool) {
 	lock(&sched.lock)
@@ -1500,7 +1543,11 @@ func startm(_p_ *p, spinning bool) {
 	if _p_ == nil {
 		unlock(&sched.lock)
 		if spinning {
-			atomic.Xadd(&sched.nmspinning, -1)
+			// The caller incremented nmspinning, but there are no idle Ps,
+			// so it's okay to just undo the increment and give up.
+			if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+				throw("startm: negative nmspinning")
+			}
 		}
 		return
 	}
@@ -1510,6 +1557,7 @@ func startm(_p_ *p, spinning bool) {
 	if mp == nil {
 		var fn func()
 		if spinning {
+			// The caller incremented nmspinning, so set m.spinning in the new M.
 			fn = mspinning
 		}
 		newm(fn, _p_)
@@ -1524,6 +1572,7 @@
 	if spinning && !runqempty(_p_) {
 		throw("startm: p has runnable gs")
 	}
+	// The caller incremented nmspinning, so set m.spinning in the new M.
 	mp.spinning = spinning
 	mp.nextp.set(_p_)
 	notewakeup(&mp.park)
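
[Editorial aside] The contract described in startm's new doc comment is easiest to see from the caller's side. The waking side lives in wakep, which this patch leaves unchanged; at this revision it reads approximately as follows, and it is the conservative CAS here that performs the increment startm is then obligated to settle:

// Tries to add one more P to execute G's.
// (Not part of this diff; quoted approximately, for context.)
func wakep() {
	// be conservative about spinning threads
	if !atomic.Cas(&sched.nmspinning, 0, 1) {
		return
	}
	startm(nil, true)
}
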
@@ -1645,7 +1694,11 @@ func gcstopm() {
 	}
 	if _g_.m.spinning {
 		_g_.m.spinning = false
-		atomic.Xadd(&sched.nmspinning, -1)
+		// OK to just drop nmspinning here,
+		// startTheWorld will unpark threads as necessary.
+		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+			throw("gcstopm: negative nmspinning")
+		}
 	}
 	_p_ := releasep()
 	lock(&sched.lock)
@@ -1818,9 +1871,26 @@ stop:
 	_p_ := releasep()
 	pidleput(_p_)
 	unlock(&sched.lock)
+
+	// Delicate dance: the thread transitions from the spinning to the
+	// non-spinning state, potentially concurrently with submission of new
+	// goroutines. We must drop nmspinning first and then check all per-P
+	// queues again (with a #StoreLoad memory barrier in between). If we do
+	// it the other way around, another thread can submit a goroutine after
+	// we've checked all run queues but before we drop nmspinning; as a
+	// result nobody will unpark a thread to run the goroutine.
+	// If we discover new work below, we need to restore m.spinning as a signal
+	// for resetspinning to unpark a new worker thread (because there can be more
+	// than one starving goroutine). However, if after discovering new work
+	// we also observe no idle Ps, it is OK to just park the current thread:
+	// the system is fully loaded so no spinning threads are required.
+	// Also see the "Worker thread parking/unparking" comment at the top of the file.
+	wasSpinning := _g_.m.spinning
 	if _g_.m.spinning {
 		_g_.m.spinning = false
-		atomic.Xadd(&sched.nmspinning, -1)
+		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+			throw("findrunnable: negative nmspinning")
+		}
 	}
 
 	// check all runqueues once again
@@ -1832,6 +1902,10 @@ stop:
 			unlock(&sched.lock)
 			if _p_ != nil {
 				acquirep(_p_)
+				if wasSpinning {
+					_g_.m.spinning = true
+					atomic.Xadd(&sched.nmspinning, 1)
+				}
 				goto top
 			}
 			break
@@ -1870,20 +1944,17 @@ stop:
 
 func resetspinning() {
 	_g_ := getg()
-
-	var nmspinning uint32
-	if _g_.m.spinning {
-		_g_.m.spinning = false
-		nmspinning = atomic.Xadd(&sched.nmspinning, -1)
-		if int32(nmspinning) < 0 {
-			throw("findrunnable: negative nmspinning")
-		}
-	} else {
-		nmspinning = atomic.Load(&sched.nmspinning)
+	if !_g_.m.spinning {
+		throw("resetspinning: not a spinning m")
 	}
-
-	// M wakeup policy is deliberately somewhat conservative (see nmspinning handling),
-	// so see if we need to wakeup another P here.
+	_g_.m.spinning = false
+	nmspinning := atomic.Xadd(&sched.nmspinning, -1)
+	if int32(nmspinning) < 0 {
+		throw("resetspinning: negative nmspinning")
+	}
+	// M wakeup policy is deliberately somewhat conservative, so check if we
+	// need to wakeup another P here. See the "Worker thread parking/unparking"
+	// comment at the top of the file for details.
 	if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 {
 		wakep()
 	}
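
[Editorial aside] The compensation rule enforced by the new resetspinning is the heart of the "last spinner wakes a successor" invariant. Below is a minimal, self-contained sketch of that rule; nmspinning and npidle mirror the scheduler fields in name only, and wakeSpinning/foundWork are illustrative stand-ins rather than runtime code:

package main

import (
	"fmt"
	"sync/atomic"
)

var (
	nmspinning int32 // mirrors sched.nmspinning in name only
	npidle     int32 // mirrors sched.npidle in name only
)

// wakeSpinning stands in for the wakep/startm(nil, true) path:
// conservatively start at most one new spinning worker at a time.
func wakeSpinning() {
	if atomic.CompareAndSwapInt32(&nmspinning, 0, 1) {
		fmt.Println("unparked a replacement spinning worker")
	}
}

// foundWork stands in for resetspinning: this worker found a goroutine
// to run and leaves the spinning state; if it was the last spinner and
// idle Ps remain, it must wake a replacement so that work discovery
// continues for any other starving goroutines.
func foundWork() {
	if atomic.AddInt32(&nmspinning, -1) == 0 && atomic.LoadInt32(&npidle) > 0 {
		wakeSpinning()
	}
}

func main() {
	atomic.StoreInt32(&nmspinning, 1) // this worker was spinning
	atomic.StoreInt32(&npidle, 2)     // idle Ps are available
	foundWork()
}
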
@@ -1944,14 +2015,10 @@ top:
 		if gp != nil {
 			casgstatus(gp, _Gwaiting, _Grunnable)
 			traceGoUnpark(gp, 0)
-			resetspinning()
 		}
 	}
 	if gp == nil && gcBlackenEnabled != 0 {
 		gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
-		if gp != nil {
-			resetspinning()
-		}
 	}
 	if gp == nil {
 		// Check the global runnable queue once in a while to ensure fairness.
@@ -1961,9 +2028,6 @@ top:
 			lock(&sched.lock)
 			gp = globrunqget(_g_.m.p.ptr(), 1)
 			unlock(&sched.lock)
-			if gp != nil {
-				resetspinning()
-			}
 		}
 	}
 	if gp == nil {
@@ -1974,6 +2038,12 @@ top:
 	}
 	if gp == nil {
 		gp, inheritTime = findrunnable() // blocks until work is available
+	}
+
+	// This thread is going to run a goroutine and is not spinning anymore,
+	// so if it was marked as spinning we need to reset it now and potentially
+	// start a new spinning M.
+	if _g_.m.spinning {
 		resetspinning()
 	}
diff --git a/src/runtime/proc_test.go b/src/runtime/proc_test.go
index 2be103e3a6..c0213086b3 100644
--- a/src/runtime/proc_test.go
+++ b/src/runtime/proc_test.go
@@ -6,6 +6,7 @@ package runtime_test
 
 import (
 	"math"
+	"net"
 	"runtime"
 	"runtime/debug"
 	"sync"
@@ -132,6 +133,79 @@ func TestGoroutineParallelism(t *testing.T) {
 	}
 }
 
+// Test that all runnable goroutines are scheduled at the same time.
+func TestGoroutineParallelism2(t *testing.T) {
+	testGoroutineParallelism2(t, false, false)
+	testGoroutineParallelism2(t, true, false)
+	testGoroutineParallelism2(t, false, true)
+	testGoroutineParallelism2(t, true, true)
+}
+
+func testGoroutineParallelism2(t *testing.T, load, netpoll bool) {
+	if runtime.NumCPU() == 1 {
+		// Takes too long, too easy to deadlock, etc.
+		t.Skip("skipping on uniprocessor")
+	}
+	P := 4
+	N := 10
+	if testing.Short() {
+		N = 3
+	}
+	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P))
+	// If the runtime triggers a forced GC during this test then it will deadlock,
+	// since the goroutines can't be stopped/preempted.
+	// Disable GC for this test (see issue #10958).
+	defer debug.SetGCPercent(debug.SetGCPercent(-1))
+	for try := 0; try < N; try++ {
+		if load {
+			// Create P goroutines and wait until they all run.
+			// When we run the actual test below, worker threads
+			// running the goroutines will start parking.
+			done := make(chan bool)
+			x := uint32(0)
+			for p := 0; p < P; p++ {
+				go func() {
+					if atomic.AddUint32(&x, 1) == uint32(P) {
+						done <- true
+						return
+					}
+					for atomic.LoadUint32(&x) != uint32(P) {
+					}
+				}()
+			}
+			<-done
+		}
+		if netpoll {
+			// Enable the netpoller, which affects scheduler behavior.
+			ln, err := net.Listen("tcp", "localhost:0")
+			if err == nil {
+				defer ln.Close() // yup, defer in a loop
+			}
+		}
+		done := make(chan bool)
+		x := uint32(0)
+		// Spawn P goroutines in a nested fashion just to differ from TestGoroutineParallelism.
+		for p := 0; p < P/2; p++ {
+			go func(p int) {
+				for p2 := 0; p2 < 2; p2++ {
+					go func(p2 int) {
+						for i := 0; i < 3; i++ {
+							expected := uint32(P*i + p*2 + p2)
+							for atomic.LoadUint32(&x) != expected {
+							}
+							atomic.StoreUint32(&x, expected+1)
+						}
+						done <- true
+					}(p2)
+				}
+			}(p)
+		}
+		for p := 0; p < P; p++ {
+			<-done
+		}
+	}
+}
+
 func TestBlockLocked(t *testing.T) {
 	const N = 10
 	c := make(chan bool)
diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go
index cfe4589448..86ed846064 100644
--- a/src/runtime/runtime2.go
+++ b/src/runtime/runtime2.go
@@ -419,7 +419,7 @@ type schedt struct {
 
 	pidle      puintptr // idle p's
 	npidle     uint32
-	nmspinning uint32 // limited to [0, 2^31-1]
+	nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
 
 	// Global runnable queue.
 	runqhead guintptr
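
[Editorial aside] To reproduce the before/after numbers quoted in the commit message, save the Gosched loop shown there as test.go, then build and measure with the same tools used above (no extra flags are required; the binary name "test" is just the one used in the message):

$ go build -o test test.go
$ time ./test
$ strace -c ./test

The futex call count in the strace summary is the figure to compare: the unnecessary wakeups show up as millions of futex calls before this change and only a handful after it.
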