diff --git a/src/runtime/proc1.go b/src/runtime/proc1.go
index d37c4f1a5ac..166d7c84eb3 100644
--- a/src/runtime/proc1.go
+++ b/src/runtime/proc1.go
@@ -145,7 +145,7 @@ func ready(gp *g, traceskip int) {
 	// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
 	casgstatus(gp, _Gwaiting, _Grunnable)
-	runqput(_g_.m.p.ptr(), gp)
+	runqput(_g_.m.p.ptr(), gp, true)
 	if atomicload(&sched.npidle) != 0 && atomicload(&sched.nmspinning) == 0 { // TODO: fast atomic
 		wakep()
 	}
 }
@@ -185,12 +185,12 @@ func readyExecute(gp *g, traceskip int) {
 
 		// Preempt the current g
 		casgstatus(_g_, _Grunning, _Grunnable)
-		runqput(_g_.m.p.ptr(), _g_)
+		runqput(_g_.m.p.ptr(), _g_, false)
 		dropg()
 
 		// Ready gp and switch to it
 		casgstatus(gp, _Gwaiting, _Grunnable)
-		execute(gp)
+		execute(gp, false)
 	})
 }
 
@@ -1233,15 +1233,19 @@ func gcstopm() {
 }
 
 // Schedules gp to run on the current M.
+// If inheritTime is true, gp inherits the remaining time in the
+// current time slice. Otherwise, it starts a new time slice.
 // Never returns.
-func execute(gp *g) {
+func execute(gp *g, inheritTime bool) {
 	_g_ := getg()
 
 	casgstatus(gp, _Grunnable, _Grunning)
 	gp.waitsince = 0
 	gp.preempt = false
 	gp.stackguard0 = gp.stack.lo + _StackGuard
-	_g_.m.p.ptr().schedtick++
+	if !inheritTime {
+		_g_.m.p.ptr().schedtick++
+	}
 	_g_.m.curg = gp
 	gp.m = _g_.m
 
@@ -1260,7 +1264,7 @@
 
 // Finds a runnable goroutine to execute.
 // Tries to steal from other P's, get g from global queue, poll network.
-func findrunnable() *g {
+func findrunnable() (gp *g, inheritTime bool) {
 	_g_ := getg()
 
 top:
@@ -1275,8 +1279,8 @@
 	}
 
 	// local runq
-	if gp := runqget(_g_.m.p.ptr()); gp != nil {
-		return gp
+	if gp, inheritTime := runqget(_g_.m.p.ptr()); gp != nil {
+		return gp, inheritTime
 	}
 
 	// global runq
@@ -1285,7 +1289,7 @@
 		gp := globrunqget(_g_.m.p.ptr(), 0)
 		unlock(&sched.lock)
 		if gp != nil {
-			return gp
+			return gp, false
 		}
 	}
 
@@ -1303,7 +1307,7 @@
 			if trace.enabled {
 				traceGoUnpark(gp, 0)
 			}
-			return gp
+			return gp, false
 		}
 	}
 
@@ -1325,12 +1329,12 @@
 		_p_ := allp[fastrand1()%uint32(gomaxprocs)]
 		var gp *g
 		if _p_ == _g_.m.p.ptr() {
-			gp = runqget(_p_)
+			gp, _ = runqget(_p_)
 		} else {
 			gp = runqsteal(_g_.m.p.ptr(), _p_)
 		}
 		if gp != nil {
-			return gp
+			return gp, false
 		}
 	}
stop:
@@ -1344,7 +1348,7 @@
 		if trace.enabled {
 			traceGoUnpark(gp, 0)
 		}
-		return gp
+		return gp, false
 	}
 
 	// return P and block
@@ -1356,7 +1360,7 @@
 	if sched.runqsize != 0 {
 		gp := globrunqget(_g_.m.p.ptr(), 0)
 		unlock(&sched.lock)
-		return gp
+		return gp, false
 	}
 	_p_ := releasep()
 	pidleput(_p_)
@@ -1402,7 +1406,7 @@
 			if trace.enabled {
 				traceGoUnpark(gp, 0)
 			}
-			return gp
+			return gp, false
 		}
 		injectglist(gp)
 	}
@@ -1468,7 +1472,7 @@ func schedule() {
 
 	if _g_.m.lockedg != nil {
 		stoplockedm()
-		execute(_g_.m.lockedg) // Never returns.
+		execute(_g_.m.lockedg, false) // Never returns.
 	}
 
top:
@@ -1478,6 +1482,7 @@
 	}
 
 	var gp *g
+	var inheritTime bool
 	if trace.enabled || trace.shutdown {
 		gp = traceReader()
 		if gp != nil {
@@ -1506,13 +1511,13 @@
 		}
 	}
 	if gp == nil {
-		gp = runqget(_g_.m.p.ptr())
+		gp, inheritTime = runqget(_g_.m.p.ptr())
 		if gp != nil && _g_.m.spinning {
 			throw("schedule: spinning with local work")
 		}
 	}
 	if gp == nil {
-		gp = findrunnable() // blocks until work is available
+		gp, inheritTime = findrunnable() // blocks until work is available
 		resetspinning()
 	}
 
@@ -1523,7 +1528,7 @@
 		goto top
 	}
 
-	execute(gp)
+	execute(gp, inheritTime)
 }
 
 // dropg removes the association between m and the current goroutine m->curg (gp for short).
@@ -1568,7 +1573,7 @@ func park_m(gp *g) {
 				traceGoUnpark(gp, 2)
 			}
 			casgstatus(gp, _Gwaiting, _Grunnable)
-			execute(gp) // Schedule it back, never returns.
+			execute(gp, true) // Schedule it back, never returns.
 		}
 	}
 	schedule()
@@ -2007,12 +2012,12 @@ func exitsyscall0(gp *g) {
 	unlock(&sched.lock)
 	if _p_ != nil {
 		acquirep(_p_)
-		execute(gp) // Never returns.
+		execute(gp, false) // Never returns.
 	}
 	if _g_.m.lockedg != nil {
 		// Wait until another thread schedules gp and so m again.
 		stoplockedm()
-		execute(gp) // Never returns.
+		execute(gp, false) // Never returns.
 	}
 	stopm()
 	schedule() // Never returns.
@@ -2168,7 +2173,7 @@ func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr
 	if trace.enabled {
 		traceGoCreate(newg, newg.startpc)
 	}
-	runqput(_p_, newg)
+	runqput(_p_, newg, true)
 	if atomicload(&sched.npidle) != 0 && atomicload(&sched.nmspinning) == 0 && unsafe.Pointer(fn.fn) != unsafe.Pointer(funcPC(main)) { // TODO: fast atomic
 		wakep()
 	}
@@ -2596,12 +2601,11 @@ func procresize(nprocs int32) *p {
 		p.runqtail--
 		gp := p.runq[p.runqtail%uint32(len(p.runq))]
 		// push onto head of global queue
-		gp.schedlink = sched.runqhead
-		sched.runqhead.set(gp)
-		if sched.runqtail == 0 {
-			sched.runqtail.set(gp)
-		}
-		sched.runqsize++
+		globrunqputhead(gp)
+	}
+	if p.runnext != 0 {
+		globrunqputhead(p.runnext.ptr())
+		p.runnext = 0
 	}
 	// if there's a background worker, make it runnable and put
 	// it on the global queue so it can clean itself up
@@ -3150,6 +3154,19 @@ func globrunqput(gp *g) {
 	sched.runqsize++
 }
 
+// Put gp at the head of the global runnable queue.
+// Sched must be locked.
+// May run during STW, so write barriers are not allowed.
+//go:nowritebarrier
+func globrunqputhead(gp *g) {
+	gp.schedlink = sched.runqhead
+	sched.runqhead.set(gp)
+	if sched.runqtail == 0 {
+		sched.runqtail.set(gp)
+	}
+	sched.runqsize++
+}
+
 // Put a batch of runnable goroutines on the global runnable queue.
 // Sched must be locked.
 func globrunqputbatch(ghead *g, gtail *g, n int32) {
@@ -3192,7 +3209,7 @@ func globrunqget(_p_ *p, max int32) *g {
 	for ; n > 0; n-- {
 		gp1 := sched.runqhead.ptr()
 		sched.runqhead = gp1.schedlink
-		runqput(_p_, gp1)
+		runqput(_p_, gp1, false)
 	}
 	return gp
 }
@@ -3223,13 +3240,28 @@ func pidleget() *p {
 // runqempty returns true if _p_ has no Gs on its local run queue.
 // Note that this test is generally racy.
 func runqempty(_p_ *p) bool {
-	return _p_.runqhead == _p_.runqtail
+	return _p_.runqhead == _p_.runqtail && _p_.runnext == 0
 }
 
-// Try to put g on local runnable queue.
-// If it's full, put onto global queue.
+// runqput tries to put g on the local runnable queue.
+// If next is false, runqput adds g to the tail of the runnable queue.
+// If next is true, runqput puts g in the _p_.runnext slot.
+// If the run queue is full, runqput puts g on the global queue.
 // Executed only by the owner P.
-func runqput(_p_ *p, gp *g) {
+func runqput(_p_ *p, gp *g, next bool) {
+	if next {
+	retryNext:
+		oldnext := _p_.runnext
+		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
+			goto retryNext
+		}
+		if oldnext == 0 {
+			return
+		}
+		// Kick the old runnext out to the regular run queue.
+		gp = oldnext.ptr()
+	}
+
retry:
 	h := atomicload(&_p_.runqhead) // load-acquire, synchronize with consumers
 	t := _p_.runqtail
@@ -3277,17 +3309,30 @@ func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
 }
 
 // Get g from local runnable queue.
+// If inheritTime is true, gp should inherit the remaining time in the
+// current time slice. Otherwise, it should start a new time slice.
 // Executed only by the owner P.
-func runqget(_p_ *p) *g {
+func runqget(_p_ *p) (gp *g, inheritTime bool) {
+	// If there's a runnext, it's the next G to run.
+	for {
+		next := _p_.runnext
+		if next == 0 {
+			break
+		}
+		if _p_.runnext.cas(next, 0) {
+			return next.ptr(), true
+		}
+	}
+
 	for {
 		h := atomicload(&_p_.runqhead) // load-acquire, synchronize with other consumers
 		t := _p_.runqtail
 		if t == h {
-			return nil
+			return nil, false
 		}
 		gp := _p_.runq[h%uint32(len(_p_.runq))]
 		if cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume
-			return gp
+			return gp, false
 		}
 	}
 }
@@ -3302,6 +3347,14 @@ func runqgrab(_p_ *p, batch []*g) uint32 {
 		n := t - h
 		n = n - n/2
 		if n == 0 {
+			// Try to steal from _p_.runnext.
+			if next := _p_.runnext; next != 0 {
+				if !_p_.runnext.cas(next, 0) {
+					continue
+				}
+				batch[0] = next.ptr()
+				return 1
+			}
 			return 0
 		}
 		if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
@@ -3347,19 +3400,19 @@ func testSchedLocalQueue() {
 	_p_ := new(p)
 	gs := make([]g, len(_p_.runq))
 	for i := 0; i < len(_p_.runq); i++ {
-		if runqget(_p_) != nil {
+		if g, _ := runqget(_p_); g != nil {
 			throw("runq is not empty initially")
 		}
 		for j := 0; j < i; j++ {
-			runqput(_p_, &gs[i])
+			runqput(_p_, &gs[i], false)
 		}
 		for j := 0; j < i; j++ {
-			if runqget(_p_) != &gs[i] {
+			if g, _ := runqget(_p_); g != &gs[i] {
 				print("bad element at iter ", i, "/", j, "\n")
 				throw("bad element")
 			}
 		}
-		if runqget(_p_) != nil {
+		if g, _ := runqget(_p_); g != nil {
 			throw("runq is not empty afterwards")
 		}
 	}
@@ -3372,7 +3425,7 @@ func testSchedLocalQueueSteal() {
 	for i := 0; i < len(p1.runq); i++ {
 		for j := 0; j < i; j++ {
 			gs[j].sig = 0
-			runqput(p1, &gs[j])
+			runqput(p1, &gs[j], false)
 		}
 		gp := runqsteal(p2, p1)
 		s := 0
@@ -3381,7 +3434,7 @@
 			gp.sig++
 		}
 		for {
-			gp = runqget(p2)
+			gp, _ = runqget(p2)
 			if gp == nil {
 				break
 			}
@@ -3389,7 +3442,7 @@
 			gp.sig++
 		}
 		for {
-			gp = runqget(p1)
+			gp, _ = runqget(p1)
 			if gp == nil {
 				break
 			}
diff --git a/src/runtime/proc_test.go b/src/runtime/proc_test.go
index af90215238f..fccf3970621 100644
--- a/src/runtime/proc_test.go
+++ b/src/runtime/proc_test.go
@@ -292,6 +292,57 @@ func main() {
 }
 `
 
+func TestPingPongHog(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping in -short mode")
+	}
+
+	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))
+	done := make(chan bool)
+	hogChan, lightChan := make(chan bool), make(chan bool)
+	hogCount, lightCount := 0, 0
+
+	run := func(limit int, counter *int, wake chan bool) {
+		for {
+			select {
+			case <-done:
+				return
+
+			case <-wake:
+				for i := 0; i < limit; i++ {
+					*counter++
+				}
+				wake <- true
+			}
+		}
+	}
+
+	// Start two co-scheduled hog goroutines.
+	for i := 0; i < 2; i++ {
+		go run(1e6, &hogCount, hogChan)
+	}
+
+	// Start two co-scheduled light goroutines.
+	for i := 0; i < 2; i++ {
+		go run(1e3, &lightCount, lightChan)
+	}
+
+	// Start goroutine pairs and wait for a few preemption rounds.
+	hogChan <- true
+	lightChan <- true
+	time.Sleep(100 * time.Millisecond)
+	close(done)
+	<-hogChan
+	<-lightChan
+
+	// Check that hogCount and lightCount are within a factor of
+	// 2, which indicates that both pairs of goroutines handed off
+	// the P within a time-slice to their buddy.
+	if hogCount > lightCount*2 || lightCount > hogCount*2 {
+		t.Fatalf("want hogCount/lightCount in [0.5, 2]; got %d/%d = %g", hogCount, lightCount, float64(hogCount)/float64(lightCount))
+	}
+}
+
 func BenchmarkPingPongHog(b *testing.B) {
 	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))
 
diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go
index eacf5f094bb..476108e36ca 100644
--- a/src/runtime/runtime2.go
+++ b/src/runtime/runtime2.go
@@ -129,6 +129,9 @@
 type guintptr uintptr
 
 func (gp guintptr) ptr() *g { return (*g)(unsafe.Pointer(gp)) }
 func (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) }
+func (gp *guintptr) cas(old, new guintptr) bool {
+	return casuintptr((*uintptr)(unsafe.Pointer(gp)), uintptr(old), uintptr(new))
+}
 
 type puintptr uintptr
@@ -350,10 +353,20 @@
 	goidcache    uint64
 	goidcacheend uint64
 
-	// Queue of runnable goroutines.
+	// Queue of runnable goroutines. Accessed without lock.
 	runqhead uint32
 	runqtail uint32
 	runq     [256]*g
+	// runnext, if non-nil, is a runnable G that was ready'd by
+	// the current G and should be run next instead of what's in
+	// runq if there's time remaining in the running G's time
+	// slice. It will inherit the time left in the current time
+	// slice. If a set of goroutines is locked in a
+	// communicate-and-wait pattern, this schedules that set as a
+	// unit and eliminates the (potentially large) scheduling
+	// latency that otherwise arises from adding the ready'd
+	// goroutines to the end of the run queue.
+	runnext guintptr

 	// Available G's (status == Gdead)
 	gfree *g
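The queue discipline this patch introduces can also be seen in isolation. The following is a minimal, self-contained sketch, not runtime code: toyQueue, put, and get are invented names standing in for p's runq/runnext fields, runqput, and runqget, and the scheduler's atomics are deliberately omitted. It shows why a freshly readied goroutine runs next and inherits the time slice:

// Toy model of the runnext optimization (illustrative only).
package main

import "fmt"

// toyQueue models a P's local run queue: a FIFO (p.runq) plus the
// one-element runnext slot added by this patch.
type toyQueue struct {
	runnext string
	runq    []string
}

// put mirrors runqput: with next=true the goroutine lands in runnext,
// kicking any previous occupant out to the tail of the regular queue
// (the retryNext path above).
func (q *toyQueue) put(g string, next bool) {
	if next {
		if q.runnext != "" {
			q.runq = append(q.runq, q.runnext)
		}
		q.runnext = g
		return
	}
	q.runq = append(q.runq, g)
}

// get mirrors runqget's (gp, inheritTime) contract: runnext is drained
// first and inherits the remaining time slice (so execute skips the
// schedtick++); FIFO entries start a fresh slice.
func (q *toyQueue) get() (string, bool) {
	if q.runnext != "" {
		g := q.runnext
		q.runnext = ""
		return g, true
	}
	if len(q.runq) == 0 {
		return "", false
	}
	g := q.runq[0]
	q.runq = q.runq[1:]
	return g, false
}

func main() {
	q := &toyQueue{}
	q.put("hog1", false) // newproc/globrunqget path: tail of runq
	q.put("hog2", false)
	q.put("pong", true) // ready() path: freshly unblocked G goes to runnext

	// Prints pong (inheritTime=true) before hog1 and hog2: the
	// communicate-and-wait pair keeps the P within one time slice,
	// which is the hand-off TestPingPongHog checks for.
	for g, inherit := q.get(); g != ""; g, inherit = q.get() {
		fmt.Println(g, inherit)
	}
}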