From 06ac26279cb93140bb2b03bcef9a3300c166cade Mon Sep 17 00:00:00 2001
From: Ian Lance Taylor
Date: Fri, 5 Apr 2019 16:24:14 -0700
Subject: [PATCH] runtime: initial scheduler changes for timers on P's

Add support to the main scheduler loop for handling timers on P's.
This is not used yet, as timers are not yet put on P's.

Updates #6239
Updates #27707

Change-Id: I6a359df408629f333a9232142ce19e8be8496dae
Reviewed-on: https://go-review.googlesource.com/c/go/+/171826
Run-TryBot: Ian Lance Taylor
TryBot-Result: Gobot Gobot
Reviewed-by: Michael Knyszek
---
 src/runtime/proc.go     | 166 ++++++++++++++++++++++++++++++++++++----
 src/runtime/runtime2.go |  14 +++-
 src/runtime/time.go     |  21 +++++
 3 files changed, 186 insertions(+), 15 deletions(-)

diff --git a/src/runtime/proc.go b/src/runtime/proc.go
index c419dee771..1a51b1d83b 100644
--- a/src/runtime/proc.go
+++ b/src/runtime/proc.go
@@ -2221,6 +2221,9 @@ top:
 	if _p_.runSafePointFn != 0 {
 		runSafePointFn()
 	}
+
+	now, pollUntil, _ := checkTimers(_p_, 0)
+
 	if fingwait && fingwake {
 		if gp := wakefing(); gp != nil {
 			ready(gp, 0, true)
@@ -2266,12 +2269,7 @@ top:
 
 	// Steal work from other P's.
 	procs := uint32(gomaxprocs)
-	if atomic.Load(&sched.npidle) == procs-1 {
-		// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
-		// New work can appear from returning syscall/cgocall, network or timers.
-		// Neither of that submits to local run queues, so no point in stealing.
-		goto stop
-	}
+	ranTimer := false
 	// If number of spinning M's >= number of busy P's, block.
 	// This is necessary to prevent excessive CPU consumption
 	// when GOMAXPROCS>>1 but the program parallelism is low.
@@ -2288,11 +2286,48 @@ top:
 				goto top
 			}
 			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
-			if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
+			p2 := allp[enum.position()]
+			if _p_ == p2 {
+				continue
+			}
+			if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
 				return gp, false
 			}
+
+			// Consider stealing timers from p2.
+			// This call to checkTimers is the only place where
+			// we hold a lock on a different P's timers.
+			// Lock contention can be a problem here, so avoid
+			// grabbing the lock if p2 is running and not marked
+			// for preemption. If p2 is running and not being
+			// preempted we assume it will handle its own timers.
+			if i > 2 && shouldStealTimers(p2) {
+				tnow, w, ran := checkTimers(p2, now)
+				now = tnow
+				if w != 0 && (pollUntil == 0 || w < pollUntil) {
+					pollUntil = w
+				}
+				if ran {
+					// Running the timers may have
+					// made an arbitrary number of G's
+					// ready and added them to this P's
+					// local run queue. That invalidates
+					// the assumption of runqsteal
+					// that it always has room to add
+					// stolen G's. So check now if there
+					// is a local G to run.
+					if gp, inheritTime := runqget(_p_); gp != nil {
+						return gp, inheritTime
+					}
+					ranTimer = true
+				}
+			}
 		}
 	}
+	if ranTimer {
+		// Running a timer may have made some goroutine ready.
+		goto top
+	}
 
 stop:
 
@@ -2309,6 +2344,12 @@ stop:
 		return gp, false
 	}
 
+	delta := int64(-1)
+	if pollUntil != 0 {
+		// checkTimers ensures that pollUntil > now.
+		delta = pollUntil - now
+	}
+
 	// wasm only:
 	// If a callback returned and no other goroutine is awake,
 	// then pause execution until a callback was triggered.
@@ -2400,14 +2441,16 @@ stop:
 	}
 
 	// poll network
-	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
+	if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
+		atomic.Store64(&sched.pollUntil, uint64(pollUntil))
 		if _g_.m.p != 0 {
 			throw("findrunnable: netpoll with p")
 		}
 		if _g_.m.spinning {
 			throw("findrunnable: netpoll with spinning")
 		}
-		list := netpoll(-1) // block until new work is available
+		list := netpoll(delta) // block until new work is available
+		atomic.Store64(&sched.pollUntil, 0)
 		atomic.Store64(&sched.lastpoll, uint64(nanotime()))
 		lock(&sched.lock)
 		_p_ = pidleget()
@@ -2431,6 +2474,11 @@ stop:
 			}
 			goto top
 		}
+	} else if pollUntil != 0 && netpollinited() {
+		pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
+		if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
+			netpollBreak()
+		}
 	}
 	stopm()
 	goto top
@@ -2457,6 +2505,22 @@ func pollWork() bool {
 	return false
 }
 
+// wakeNetPoller wakes up the thread sleeping in the network poller,
+// if there is one, and if it isn't going to wake up anyhow before
+// the when argument.
+func wakeNetPoller(when int64) {
+	if atomic.Load64(&sched.lastpoll) == 0 {
+		// In findrunnable we ensure that when polling the pollUntil
+		// field is either zero or the time to which the current
+		// poll is expected to run. This can have a spurious wakeup
+		// but should never miss a wakeup.
+		pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
+		if pollerPollUntil == 0 || pollerPollUntil > when {
+			netpollBreak()
+		}
+	}
+}
+
 func resetspinning() {
 	_g_ := getg()
 	if !_g_.m.spinning {
@@ -2525,10 +2589,20 @@ top:
 		gcstopm()
 		goto top
 	}
-	if _g_.m.p.ptr().runSafePointFn != 0 {
+	pp := _g_.m.p.ptr()
+	if pp.runSafePointFn != 0 {
 		runSafePointFn()
 	}
 
+	// Sanity check: if we are spinning, the run queue should be empty.
+	// Check this before calling checkTimers, as that might call
+	// goready to put a ready goroutine on the local run queue.
+	if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
+		throw("schedule: spinning with local work")
+	}
+
+	checkTimers(pp, 0)
+
 	var gp *g
 	var inheritTime bool
 
@@ -2560,9 +2634,8 @@ top:
 	}
 	if gp == nil {
 		gp, inheritTime = runqget(_g_.m.p.ptr())
-		if gp != nil && _g_.m.spinning {
-			throw("schedule: spinning with local work")
-		}
+		// We can see gp != nil here even if the M is spinning,
+		// if checkTimers added a local goroutine via goready.
 	}
 	if gp == nil {
 		gp, inheritTime = findrunnable() // blocks until work is available
@@ -2623,6 +2696,60 @@ func dropg() {
 	setGNoWB(&_g_.m.curg, nil)
 }
 
+// checkTimers runs any timers for the P that are ready.
+// If now is not 0 it is the current time.
+// It returns the current time or 0 if it is not known,
+// and the time when the next timer should run or 0 if there is no next timer,
+// and reports whether it ran any timers.
+// If the time when the next timer should run is not 0,
+// it is always larger than the returned time.
+// We pass now in and out to avoid extra calls of nanotime.
+//go:yeswritebarrierrec
+func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
+	lock(&pp.timersLock)
+
+	adjusttimers(pp)
+
+	rnow = now
+	if len(pp.timers) > 0 {
+		if rnow == 0 {
+			rnow = nanotime()
+		}
+		for len(pp.timers) > 0 {
+			if tw := runtimer(pp, rnow); tw != 0 {
+				if tw > 0 {
+					pollUntil = tw
+				}
+				break
+			}
+			ran = true
+		}
+	}
+
+	unlock(&pp.timersLock)
+
+	return rnow, pollUntil, ran
+}
+
+// shouldStealTimers reports whether we should try stealing the timers from p2.
+// We don't steal timers from a running P that is not marked for preemption,
+// on the assumption that it will run its own timers. This reduces
+// contention on the timers lock.
+func shouldStealTimers(p2 *p) bool {
+	if p2.status != _Prunning {
+		return true
+	}
+	mp := p2.m.ptr()
+	if mp == nil || mp.locks > 0 {
+		return false
+	}
+	gp := mp.curg
+	if gp == nil || gp.atomicstatus != _Grunning || !gp.preempt {
+		return false
+	}
+	return true
+}
+
 func parkunlock_c(gp *g, lock unsafe.Pointer) bool {
 	unlock((*mutex)(lock))
 	return true
@@ -4305,6 +4432,13 @@ func checkdead() {
 		return
 	}
 
+	// There are no goroutines running, so we can look at the P's.
+	for _, _p_ := range allp {
+		if len(_p_.timers) > 0 {
+			return
+		}
+	}
+
 	getg().m.throwing = -1 // do not dump full stacks
 	throw("all goroutines are asleep - deadlock!")
 }
@@ -4392,6 +4526,12 @@ func sysmon() {
 				incidlelocked(1)
 			}
 		}
+		if timeSleepUntil() < now {
+			// There are timers that should have already run,
+			// perhaps because there is an unpreemptible P.
+			// Try to start an M to run them.
+			startm(nil, false)
+		}
 		// retake P's blocked in syscalls
 		// and preempt long running G's
 		if retake(now) != 0 {
diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go
index dd399e00a6..f44cd2fb14 100644
--- a/src/runtime/runtime2.go
+++ b/src/runtime/runtime2.go
@@ -598,13 +598,23 @@ type p struct {
 
 	runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point
 
+	// Lock for timers. We normally access the timers while running
+	// on this P, but the scheduler can also do it from a different P.
+	timersLock mutex
+
+	// Actions to take at some time. This is used to implement the
+	// standard library's time package.
+	// Must hold timersLock to access.
+	timers []*timer
+
 	pad cpu.CacheLinePad
 }
 
 type schedt struct {
 	// accessed atomically. keep at top to ensure alignment on 32-bit systems.
-	goidgen  uint64
-	lastpoll uint64
+	goidgen   uint64
+	lastpoll  uint64 // time of last network poll, 0 if currently polling
+	pollUntil uint64 // time to which current poll is sleeping
 
 	lock mutex
 
diff --git a/src/runtime/time.go b/src/runtime/time.go
index 5521b8a807..1bbb5684cb 100644
--- a/src/runtime/time.go
+++ b/src/runtime/time.go
@@ -325,6 +325,27 @@ func timerproc(tb *timersBucket) {
 	}
 }
 
+// adjusttimers looks through the timers in the current P's heap for
+// any timers that have been modified to run earlier, and puts them in
+// the correct place in the heap.
+// The caller must have locked the timers for pp.
+func adjusttimers(pp *p) {
+	if len(pp.timers) == 0 {
+		return
+	}
+	throw("adjusttimers: not yet implemented")
+}
+
+// runtimer examines the first timer in timers. If it is ready based on now,
+// it runs the timer and removes or updates it.
+// Returns 0 if it ran a timer, -1 if there are no more timers, or the time
+// when the first timer should run.
+// The caller must have locked the timers for pp.
+func runtimer(pp *p, now int64) int64 {
+	throw("runtimer: not yet implemented")
+	return -1
+}
+
 func timejump() *g {
 	if faketime == 0 {
 		return nil
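adjusttimers and runtimer are deliberately stubbed out in this CL; later changes in the series implement them over the per-P timer heap. What follows is a rough illustrative sketch only, not the runtime's implementation: it models the contract the stub comments describe (run the earliest ready timer; return 0 if one ran, -1 if none remain, or the time the earliest timer should run) using an invented simpleTimer type and a plain sorted slice in place of the heap.

// Illustrative sketch only: not the Go runtime's code. simpleTimer,
// runOneTimer, and checkAllTimers are invented names that mimic the
// documented runtimer/checkTimers contract over a sorted slice.
package main

import (
	"fmt"
	"time"
)

type simpleTimer struct {
	when int64  // nanosecond timestamp at which the timer should fire
	f    func() // callback run when the timer fires
}

// runOneTimer mirrors the documented runtimer contract: it returns 0 if it
// ran the earliest timer, -1 if there are no timers, or the earliest
// timer's run time if that timer is not yet ready.
func runOneTimer(timers *[]simpleTimer, now int64) int64 {
	if len(*timers) == 0 {
		return -1
	}
	t := (*timers)[0]
	if t.when > now {
		return t.when
	}
	*timers = (*timers)[1:] // remove the timer before running it
	t.f()
	return 0
}

// checkAllTimers mirrors checkTimers' inner loop: run ready timers until one
// reports a future run time (returned as pollUntil) or none remain.
func checkAllTimers(timers *[]simpleTimer, now int64) (pollUntil int64, ran bool) {
	for len(*timers) > 0 {
		tw := runOneTimer(timers, now)
		if tw != 0 {
			if tw > 0 {
				pollUntil = tw
			}
			return pollUntil, ran
		}
		ran = true
	}
	return pollUntil, ran
}

func main() {
	now := time.Now().UnixNano()
	timers := []simpleTimer{ // assumed to be kept sorted by when
		{when: now - 1, f: func() { fmt.Println("overdue timer ran") }},
		{when: now + int64(time.Hour), f: func() { fmt.Println("future timer") }},
	}
	pollUntil, ran := checkAllTimers(&timers, now)
	fmt.Println("ran a timer:", ran, "next timer is in the future:", pollUntil > now)
}

In the patch itself, checkTimers treats a positive return value from runtimer as pollUntil, and findrunnable uses that value (via delta and sched.pollUntil) to bound how long the blocking netpoll may sleep.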