diff --git a/src/runtime/proc.go b/src/runtime/proc.go index 09ef784a04..edff454491 100644 --- a/src/runtime/proc.go +++ b/src/runtime/proc.go @@ -4122,6 +4122,7 @@ func (pp *p) destroy() { // The world is stopped so we don't need to hold timersLock. moveTimers(plocal, pp.timers) pp.timers = nil + pp.adjustTimers = 0 } // If there's a background worker, make it runnable and put // it on the global queue so it can clean itself up. diff --git a/src/runtime/runtime2.go b/src/runtime/runtime2.go index f44cd2fb14..b57ae75baf 100644 --- a/src/runtime/runtime2.go +++ b/src/runtime/runtime2.go @@ -607,6 +607,12 @@ type p struct { // Must hold timersLock to access. timers []*timer + // Number of timerModifiedEarlier timers on P's heap. + // This should only be modified while holding timersLock, + // or while the timer status is in a transient state + // such as timerModifying. + adjustTimers uint32 + pad cpu.CacheLinePad } diff --git a/src/runtime/time.go b/src/runtime/time.go index f2dd40e6b4..54cbb0b6e4 100644 --- a/src/runtime/time.go +++ b/src/runtime/time.go @@ -131,6 +131,16 @@ type timersBucket struct { // timerRunning -> wait until status changes // timerMoving -> wait until status changes // timerModifying -> panic: concurrent deltimer/modtimer calls +// modtimer: +// timerWaiting -> timerModifying -> timerModifiedXX +// timerModifiedXX -> timerModifying -> timerModifiedYY +// timerNoStatus -> timerWaiting +// timerRemoved -> timerWaiting +// timerRunning -> wait until status changes +// timerMoving -> wait until status changes +// timerRemoving -> wait until status changes +// timerDeleted -> panic: concurrent modtimer/deltimer calls +// timerModifying -> panic: concurrent modtimer calls // Values for the timer status field. const ( @@ -270,6 +280,11 @@ func addtimer(t *timer) { } t.status = timerWaiting + addInitializedTimer(t) +} + +// addInitializedTimer adds an initialized timer to the current P. +func addInitializedTimer(t *timer) { when := t.when pp := getg().m.p.ptr() @@ -363,7 +378,9 @@ func deltimer(t *timer) bool { return true } case timerModifiedEarlier: + tpp := t.pp.ptr() if atomic.Cas(&t.status, s, timerModifying) { + atomic.Xadd(&tpp.adjustTimers, -1) if !atomic.Cas(&t.status, timerModifying, timerDeleted) { badTimer() } @@ -438,12 +455,94 @@ func (tb *timersBucket) deltimerLocked(t *timer) (removed, ok bool) { return true, ok } +// modtimer modifies an existing timer. +// This is called by the netpoll code. func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) { if oldTimers { modtimerOld(t, when, period, f, arg, seq) return } - throw("new modtimer not yet implemented") + + if when < 0 { + when = maxWhen + } + + status := uint32(timerNoStatus) + wasRemoved := false +loop: + for { + switch status = atomic.Load(&t.status); status { + case timerWaiting, timerModifiedEarlier, timerModifiedLater: + if atomic.Cas(&t.status, status, timerModifying) { + break loop + } + case timerNoStatus, timerRemoved: + // Timer was already run and t is no longer in a heap. + // Act like addtimer. + wasRemoved = true + atomic.Store(&t.status, timerWaiting) + break loop + case timerRunning, timerRemoving, timerMoving: + // The timer is being run or moved, by a different P. + // Wait for it to complete. + osyield() + case timerDeleted: + // Simultaneous calls to modtimer and deltimer. + badTimer() + case timerModifying: + // Multiple simultaneous calls to modtimer. + badTimer() + default: + badTimer() + } + } + + t.period = period + t.f = f + t.arg = arg + t.seq = seq + + if wasRemoved { + t.when = when + addInitializedTimer(t) + } else { + // The timer is in some other P's heap, so we can't change + // the when field. If we did, the other P's heap would + // be out of order. So we put the new when value in the + // nextwhen field, and let the other P set the when field + // when it is prepared to resort the heap. + t.nextwhen = when + + newStatus := uint32(timerModifiedLater) + if when < t.when { + newStatus = timerModifiedEarlier + } + + // Update the adjustTimers field. Subtract one if we + // are removing a timerModifiedEarlier, add one if we + // are adding a timerModifiedEarlier. + tpp := t.pp.ptr() + adjust := int32(0) + if status == timerModifiedEarlier { + adjust-- + } + if newStatus == timerModifiedEarlier { + adjust++ + } + if adjust != 0 { + atomic.Xadd(&tpp.adjustTimers, adjust) + } + + // Set the new status of the timer. + if !atomic.Cas(&t.status, timerModifying, newStatus) { + badTimer() + } + + // If the new status is earlier, wake up the poller. + if newStatus == timerModifiedEarlier { + wakeNetPoller(when) + } + } } func modtimerOld(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) {