mirror of https://github.com/golang/go synced 2024-11-18 07:04:52 -07:00

runtime: remove unnecessary wakeups of worker threads

Currently we wake up new worker threads whenever we pass
through the scheduler with nmspinning==0. This leads to
lots of unnecessary thread wakeups.
Instead, let only spinning threads wake up new spinning threads.
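
As a rough sketch (not the actual runtime code), the new wakeup rule amounts to the following guard, with package-level stand-ins for sched.nmspinning and sched.npidle:

package sketch

import "sync/atomic"

// Stand-ins for sched.nmspinning and sched.npidle (illustrative only).
var nmspinning, npidle uint32

// wakeupIfNeeded mirrors the guard applied when readying a goroutine:
// wake another worker only if nobody is spinning yet and an idle P exists;
// otherwise the already-spinning thread is responsible for further wakeups.
func wakeupIfNeeded(wakep func()) {
	if atomic.LoadUint32(&nmspinning) == 0 && atomic.LoadUint32(&npidle) > 0 {
		wakep()
	}
}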

For the following program:

package main
import "runtime"
func main() {
	for i := 0; i < 1e7; i++ {
		runtime.Gosched()
	}
}

Before:
$ time ./test
real	0m4.278s
user	0m7.634s
sys	0m1.423s

$ strace -c ./test
% time     seconds  usecs/call     calls    errors syscall
 99.93    9.314936           3   2685009     17536 futex

After:
$ time ./test
real	0m1.200s
user	0m1.181s
sys	0m0.024s

$ strace -c ./test
% time     seconds  usecs/call     calls    errors syscall
  3.11    0.000049          25         2           futex

Fixes #13527

Change-Id: Ia1f5bf8a896dcc25d8b04beb1f4317aa9ff16f74
Reviewed-on: https://go-review.googlesource.com/17540
Reviewed-by: Austin Clements <austin@google.com>
Run-TryBot: Dmitry Vyukov <dvyukov@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Dmitry Vyukov 2015-12-08 15:11:27 +01:00
parent 8545ea9cee
commit fb6f8a96f2
3 changed files with 181 additions and 37 deletions

View File

@@ -22,6 +22,57 @@ import (
 //
 // Design doc at https://golang.org/s/go11sched.
+// Worker thread parking/unparking.
+// We need to balance between keeping enough running worker threads to utilize
+// available hardware parallelism and parking excessive running worker threads
+// to conserve CPU resources and power. This is not simple for two reasons:
+// (1) scheduler state is intentionally distributed (in particular, per-P work
+// queues), so it is not possible to compute global predicates on fast paths;
+// (2) for optimal thread management we would need to know the future (don't park
+// a worker thread when a new goroutine will be readied in the near future).
+//
+// Three rejected approaches that would work badly:
+// 1. Centralize all scheduler state (would inhibit scalability).
+// 2. Direct goroutine handoff. That is, when we ready a new goroutine and there
+// is a spare P, unpark a thread and hand it the P and the goroutine.
+// This would lead to thread state thrashing, as the thread that readied the
+// goroutine can be out of work the very next moment, and we would need to park it.
+// Also, it would destroy locality of computation, since we want to keep
+// dependent goroutines on the same thread, and it would introduce additional latency.
+// 3. Unpark an additional thread whenever we ready a goroutine and there is an
+// idle P, but don't do handoff. This would lead to excessive thread parking/
+// unparking as the additional threads will instantly park without discovering
+// any work to do.
+//
+// The current approach:
+// We unpark an additional thread when we ready a goroutine if (1) there is an
+// idle P and (2) there are no "spinning" worker threads. A worker thread is considered
+// spinning if it is out of local work and did not find work in the global run queue or
+// netpoller; the spinning state is denoted in m.spinning and in sched.nmspinning.
+// Threads unparked this way are also considered spinning; we don't do goroutine
+// handoff, so such threads are out of work initially. Spinning threads do some
+// spinning looking for work in per-P run queues before parking. If a spinning
+// thread finds work, it takes itself out of the spinning state and proceeds to
+// execution. If it does not find work, it takes itself out of the spinning state
+// and then parks.
+// If there is at least one spinning thread (sched.nmspinning>0), we don't unpark
+// new threads when readying goroutines. To compensate for that, if the last spinning
+// thread finds work and stops spinning, it must unpark a new spinning thread.
+// This approach smooths out unjustified spikes of thread unparking,
+// but at the same time guarantees eventual maximal CPU parallelism utilization.
+//
+// The main implementation complication is that we need to be very careful during
+// the spinning->non-spinning thread transition. This transition can race with submission
+// of a new goroutine, and one side or the other needs to unpark another worker
+// thread. If they both fail to do that, we can end up with semi-persistent CPU
+// underutilization. The general pattern for goroutine readying is: submit a goroutine
+// to the local work queue, #StoreLoad-style memory barrier, check sched.nmspinning.
+// The general pattern for the spinning->non-spinning transition is: decrement nmspinning,
+// #StoreLoad-style memory barrier, check all per-P work queues for new work.
+// Note that all this complexity does not apply to the global run queue, as we are not
+// sloppy about thread unparking when submitting to the global queue. Also see comments
+// for nmspinning manipulation.
 var (
 	m0 m
 	g0 g
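
A minimal sketch of the readying/parking protocol described in the comment above. The helper parameters (push, idlePExists, recheckQueues, becomeSpinning) and the package-level counter are illustrative stand-ins, not runtime APIs:

package sketch

import "sync/atomic"

var nmspinning int32 // stand-in for sched.nmspinning

// Readying side: publish the goroutine first, then check for spinners.
// In the runtime a #StoreLoad-style barrier separates the two steps; the
// atomic load below only marks where that barrier sits in this sketch.
func readyGoroutine(push func(), idlePExists func() bool, wakep func()) {
	push() // submit the goroutine to a work queue
	if atomic.LoadInt32(&nmspinning) == 0 && idlePExists() {
		wakep() // nobody is spinning: wake a worker so the work is noticed
	}
}

// Spinning->non-spinning side: drop nmspinning first, then re-check all
// per-P queues; if work shows up, become spinning again before taking it,
// so that resetspinning can later wake a replacement worker.
func stopSpinning(recheckQueues func() bool, becomeSpinning func()) {
	atomic.AddInt32(&nmspinning, -1)
	if recheckQueues() {
		becomeSpinning()
	}
}
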
@@ -1454,8 +1505,7 @@ func stopm() {
 		throw("stopm holding p")
 	}
 	if _g_.m.spinning {
-		_g_.m.spinning = false
-		atomic.Xadd(&sched.nmspinning, -1)
+		throw("stopm spinning")
 	}
 
 retry:
@@ -1476,22 +1526,15 @@
 }
 
 func mspinning() {
-	gp := getg()
-	if !runqempty(gp.m.nextp.ptr()) {
-		// Something (presumably the GC) was readied while the
-		// runtime was starting up this M, so the M is no
-		// longer spinning.
-		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
-			throw("mspinning: nmspinning underflowed")
-		}
-	} else {
-		gp.m.spinning = true
-	}
+	// startm's caller incremented nmspinning. Set the new M's spinning.
+	getg().m.spinning = true
 }
 
 // Schedules some M to run the p (creates an M if necessary).
 // If p==nil, tries to get an idle P, if no idle P's does nothing.
 // May run with m.p==nil, so write barriers are not allowed.
+// If spinning is set, the caller has incremented nmspinning and startm will
+// either decrement nmspinning or set m.spinning in the newly started M.
 //go:nowritebarrier
 func startm(_p_ *p, spinning bool) {
 	lock(&sched.lock)
@@ -1500,7 +1543,11 @@ func startm(_p_ *p, spinning bool) {
 		if _p_ == nil {
 			unlock(&sched.lock)
 			if spinning {
-				atomic.Xadd(&sched.nmspinning, -1)
+				// The caller incremented nmspinning, but there are no idle Ps,
+				// so it's okay to just undo the increment and give up.
+				if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+					throw("startm: negative nmspinning")
+				}
 			}
 			return
 		}
@@ -1510,6 +1557,7 @@ func startm(_p_ *p, spinning bool) {
 	if mp == nil {
 		var fn func()
 		if spinning {
+			// The caller incremented nmspinning, so set m.spinning in the new M.
 			fn = mspinning
 		}
 		newm(fn, _p_)
@@ -1524,6 +1572,7 @@ func startm(_p_ *p, spinning bool) {
 	if spinning && !runqempty(_p_) {
 		throw("startm: p has runnable gs")
 	}
+	// The caller incremented nmspinning, so set m.spinning in the new M.
 	mp.spinning = spinning
 	mp.nextp.set(_p_)
 	notewakeup(&mp.park)
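
The nmspinning hand-off that these comments describe can be summarized with the sketch below; acquireIdleP and launchSpinningM are illustrative placeholders, not runtime functions:

package sketch

import "sync/atomic"

var nmspinning int32 // stand-in for sched.nmspinning

// The caller of startm(p, spinning=true) has already incremented nmspinning.
// startm must either pass that increment on to a new spinning M or, if no
// idle P is available, undo the increment and give up.
func startSpinningM(acquireIdleP func() bool, launchSpinningM func()) {
	if !acquireIdleP() {
		if atomic.AddInt32(&nmspinning, -1) < 0 {
			panic("negative nmspinning")
		}
		return
	}
	launchSpinningM() // the new M starts with m.spinning already set
}
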
@@ -1645,7 +1694,11 @@ func gcstopm() {
 	}
 	if _g_.m.spinning {
 		_g_.m.spinning = false
-		atomic.Xadd(&sched.nmspinning, -1)
+		// OK to just drop nmspinning here,
+		// startTheWorld will unpark threads as necessary.
+		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+			throw("gcstopm: negative nmspinning")
+		}
 	}
 	_p_ := releasep()
 	lock(&sched.lock)
@@ -1818,9 +1871,26 @@ stop:
 	_p_ := releasep()
 	pidleput(_p_)
 	unlock(&sched.lock)
+	// Delicate dance: thread transitions from spinning to non-spinning state,
+	// potentially concurrently with submission of new goroutines. We must
+	// drop nmspinning first and then check all per-P queues again (with
+	// #StoreLoad memory barrier in between). If we do it the other way around,
+	// another thread can submit a goroutine after we've checked all run queues
+	// but before we drop nmspinning; as a result, nobody will unpark a thread
+	// to run the goroutine.
+	// If we discover new work below, we need to restore m.spinning as a signal
+	// for resetspinning to unpark a new worker thread (because there can be more
+	// than one starving goroutine). However, if after discovering new work
+	// we also observe no idle Ps, it is OK to just park the current thread:
+	// the system is fully loaded so no spinning threads are required.
+	// Also see "Worker thread parking/unparking" comment at the top of the file.
+	wasSpinning := _g_.m.spinning
 	if _g_.m.spinning {
 		_g_.m.spinning = false
-		atomic.Xadd(&sched.nmspinning, -1)
+		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
+			throw("findrunnable: negative nmspinning")
+		}
 	}
 
 	// check all runqueues once again
@@ -1832,6 +1902,10 @@ stop:
 			unlock(&sched.lock)
 			if _p_ != nil {
 				acquirep(_p_)
+				if wasSpinning {
+					_g_.m.spinning = true
+					atomic.Xadd(&sched.nmspinning, 1)
+				}
 				goto top
 			}
 			break
@@ -1870,20 +1944,17 @@ stop:
 func resetspinning() {
 	_g_ := getg()
 
-	var nmspinning uint32
-	if _g_.m.spinning {
-		_g_.m.spinning = false
-		nmspinning = atomic.Xadd(&sched.nmspinning, -1)
-		if int32(nmspinning) < 0 {
-			throw("findrunnable: negative nmspinning")
-		}
-	} else {
-		nmspinning = atomic.Load(&sched.nmspinning)
-	}
-	// M wakeup policy is deliberately somewhat conservative (see nmspinning handling),
-	// so see if we need to wakeup another P here.
+	if !_g_.m.spinning {
+		throw("resetspinning: not a spinning m")
+	}
+	_g_.m.spinning = false
+	nmspinning := atomic.Xadd(&sched.nmspinning, -1)
+	if int32(nmspinning) < 0 {
+		throw("findrunnable: negative nmspinning")
+	}
+	// M wakeup policy is deliberately somewhat conservative, so check if we
+	// need to wakeup another P here. See "Worker thread parking/unparking"
+	// comment at the top of the file for details.
 	if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 {
 		wakep()
 	}
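
The compensation rule in resetspinning (the last spinning thread that finds work must wake a replacement) can be sketched as follows; the counters again stand in for sched.nmspinning and sched.npidle:

package sketch

import "sync/atomic"

var nmspinning, npidle uint32 // stand-ins for sched.nmspinning / sched.npidle

// leaveSpinning is called by a spinning thread that found work. If it was
// the last spinner and idle Ps remain, it wakes a new spinning worker so
// any other runnable goroutines are not left waiting.
func leaveSpinning(wakep func()) {
	remaining := atomic.AddUint32(&nmspinning, ^uint32(0)) // decrement by one
	if int32(remaining) < 0 {
		panic("negative nmspinning")
	}
	if remaining == 0 && atomic.LoadUint32(&npidle) > 0 {
		wakep()
	}
}
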
@@ -1944,14 +2015,10 @@ top:
 		if gp != nil {
 			casgstatus(gp, _Gwaiting, _Grunnable)
 			traceGoUnpark(gp, 0)
-			resetspinning()
 		}
 	}
 	if gp == nil && gcBlackenEnabled != 0 {
 		gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
-		if gp != nil {
-			resetspinning()
-		}
 	}
 	if gp == nil {
 		// Check the global runnable queue once in a while to ensure fairness.
@@ -1961,9 +2028,6 @@ top:
 			lock(&sched.lock)
 			gp = globrunqget(_g_.m.p.ptr(), 1)
 			unlock(&sched.lock)
-			if gp != nil {
-				resetspinning()
-			}
 		}
 	}
 	if gp == nil {
@@ -1974,6 +2038,12 @@
 	}
 	if gp == nil {
 		gp, inheritTime = findrunnable() // blocks until work is available
+	}
+
+	// This thread is going to run a goroutine and is not spinning anymore,
+	// so if it was marked as spinning we need to reset it now and potentially
+	// start a new spinning M.
+	if _g_.m.spinning {
 		resetspinning()
 	}

View File

@@ -6,6 +6,7 @@ package runtime_test
 import (
 	"math"
+	"net"
 	"runtime"
 	"runtime/debug"
 	"sync"
@@ -132,6 +133,79 @@ func TestGoroutineParallelism(t *testing.T) {
 	}
 }
 
+// Test that all runnable goroutines are scheduled at the same time.
+func TestGoroutineParallelism2(t *testing.T) {
+	//testGoroutineParallelism2(t, false, false)
+	testGoroutineParallelism2(t, true, false)
+	testGoroutineParallelism2(t, false, true)
+	testGoroutineParallelism2(t, true, true)
+}
+
+func testGoroutineParallelism2(t *testing.T, load, netpoll bool) {
+	if runtime.NumCPU() == 1 {
+		// Takes too long, too easy to deadlock, etc.
+		t.Skip("skipping on uniprocessor")
+	}
+	P := 4
+	N := 10
+	if testing.Short() {
+		N = 3
+	}
+	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P))
+	// If runtime triggers a forced GC during this test then it will deadlock,
+	// since the goroutines can't be stopped/preempted.
+	// Disable GC for this test (see issue #10958).
+	defer debug.SetGCPercent(debug.SetGCPercent(-1))
+	for try := 0; try < N; try++ {
+		if load {
+			// Create P goroutines and wait until they all run.
+			// When we run the actual test below, worker threads
+			// running the goroutines will start parking.
+			done := make(chan bool)
+			x := uint32(0)
+			for p := 0; p < P; p++ {
+				go func() {
+					if atomic.AddUint32(&x, 1) == uint32(P) {
+						done <- true
+						return
+					}
+					for atomic.LoadUint32(&x) != uint32(P) {
+					}
+				}()
+			}
+			<-done
+		}
+		if netpoll {
+			// Enable netpoller, affects scheduler behavior.
+			ln, err := net.Listen("tcp", "localhost:0")
+			if err == nil {
+				defer ln.Close() // yup, defer in a loop
+			}
+		}
+		done := make(chan bool)
+		x := uint32(0)
+		// Spawn P goroutines in a nested fashion just to differ from TestGoroutineParallelism.
+		for p := 0; p < P/2; p++ {
+			go func(p int) {
+				for p2 := 0; p2 < 2; p2++ {
+					go func(p2 int) {
+						for i := 0; i < 3; i++ {
+							expected := uint32(P*i + p*2 + p2)
+							for atomic.LoadUint32(&x) != expected {
+							}
+							atomic.StoreUint32(&x, expected+1)
+						}
+						done <- true
+					}(p2)
+				}
+			}(p)
+		}
+		for p := 0; p < P; p++ {
+			<-done
+		}
+	}
+}
+
 func TestBlockLocked(t *testing.T) {
 	const N = 10
 	c := make(chan bool)

View File

@@ -419,7 +419,7 @@ type schedt struct {
 	pidle      puintptr // idle p's
 	npidle     uint32
-	nmspinning uint32 // limited to [0, 2^31-1]
+	nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
 
 	// Global runnable queue.
 	runqhead guintptr