runtime: improve timers scalability on multi-CPU systems
Use per-P timers, so each P may work with its own timers.

This CL improves performance on multi-CPU systems in the following cases:

- When serving a high number of concurrent connections with read/write
  deadlines set (for instance, a highly loaded net/http server).

- When using a high number of concurrent timers. These timers may be
  implicitly created via context.WithDeadline or context.WithTimeout.

Production servers should usually set timeouts on connections and external
requests in order to prevent resource leakage.
See https://blog.cloudflare.com/the-complete-guide-to-golang-net-http-timeouts/

Below are relevant benchmark results for various GOMAXPROCS values on
linux/amd64:

context package:

name                              old time/op  new time/op  delta
WithTimeout/concurrency=40        4.92µs ± 0%  5.17µs ± 1%   +5.07%  (p=0.000 n=9+9)
WithTimeout/concurrency=4000      6.03µs ± 1%  6.49µs ± 0%   +7.63%  (p=0.000 n=8+10)
WithTimeout/concurrency=400000    8.58µs ± 7%  9.02µs ± 4%   +5.02%  (p=0.019 n=10+10)

name                              old time/op  new time/op  delta
WithTimeout/concurrency=40-2      3.70µs ± 1%  2.78µs ± 4%  -24.90%  (p=0.000 n=8+9)
WithTimeout/concurrency=4000-2    4.49µs ± 4%  3.67µs ± 5%  -18.26%  (p=0.000 n=10+10)
WithTimeout/concurrency=400000-2  6.16µs ±10%  5.15µs ±13%  -16.30%  (p=0.000 n=10+10)

name                              old time/op  new time/op  delta
WithTimeout/concurrency=40-4      3.58µs ± 1%  2.64µs ± 2%  -26.13%  (p=0.000 n=9+10)
WithTimeout/concurrency=4000-4    4.17µs ± 0%  3.32µs ± 1%  -20.36%  (p=0.000 n=10+10)
WithTimeout/concurrency=400000-4  5.57µs ± 9%  4.83µs ±10%  -13.27%  (p=0.001 n=10+10)

time package:

name                     old time/op  new time/op  delta
AfterFunc                6.15ms ± 3%  6.07ms ± 2%     ~     (p=0.133 n=10+9)
AfterFunc-2              3.43ms ± 1%  3.56ms ± 1%   +3.91%  (p=0.000 n=10+9)
AfterFunc-4              5.04ms ± 2%  2.36ms ± 0%  -53.20%  (p=0.000 n=10+9)
After                    6.54ms ± 2%  6.49ms ± 3%     ~     (p=0.393 n=10+10)
After-2                  3.68ms ± 1%  3.87ms ± 0%   +5.14%  (p=0.000 n=9+9)
After-4                  6.66ms ± 1%  2.87ms ± 1%  -56.89%  (p=0.000 n=10+10)
Stop                      698µs ± 2%   689µs ± 1%   -1.26%  (p=0.011 n=10+10)
Stop-2                    729µs ± 2%   434µs ± 3%  -40.49%  (p=0.000 n=10+10)
Stop-4                    837µs ± 3%   333µs ± 2%  -60.20%  (p=0.000 n=10+10)
SimultaneousAfterFunc     694µs ± 1%   692µs ± 7%     ~     (p=0.481 n=10+10)
SimultaneousAfterFunc-2   714µs ± 3%   569µs ± 2%  -20.33%  (p=0.000 n=10+10)
SimultaneousAfterFunc-4   782µs ± 2%   386µs ± 2%  -50.67%  (p=0.000 n=10+10)
StartStop                 267µs ± 3%   274µs ± 0%   +2.64%  (p=0.000 n=8+9)
StartStop-2               238µs ± 2%   140µs ± 3%  -40.95%  (p=0.000 n=10+8)
StartStop-4               320µs ± 1%   125µs ± 1%  -61.02%  (p=0.000 n=9+9)
Reset                    75.0µs ± 1%  77.5µs ± 2%   +3.38%  (p=0.000 n=10+10)
Reset-2                   150µs ± 2%    40µs ± 5%  -73.09%  (p=0.000 n=10+9)
Reset-4                   226µs ± 1%    33µs ± 1%  -85.42%  (p=0.000 n=10+10)
Sleep                     857µs ± 6%   878µs ± 9%     ~     (p=0.079 n=10+9)
Sleep-2                   617µs ± 4%   585µs ± 2%   -5.21%  (p=0.000 n=10+10)
Sleep-4                   689µs ± 3%   465µs ± 4%  -32.53%  (p=0.000 n=10+10)
Ticker                   55.9ms ± 2%  55.9ms ± 2%     ~     (p=0.971 n=10+10)
Ticker-2                 28.7ms ± 2%  28.1ms ± 1%   -2.06%  (p=0.000 n=10+10)
Ticker-4                 14.6ms ± 0%  13.6ms ± 1%   -6.80%  (p=0.000 n=9+10)

Fixes #15133

Change-Id: I6f4b09d2db8c5bec93146db6501b44dbfe5c0ac4
Reviewed-on: https://go-review.googlesource.com/34784
Reviewed-by: Austin Clements <austin@google.com>
commit 76f4fd8a52
parent 7377d0c7e9
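The heart of this change is a classic sharding pattern: split one mutex-protected structure into a fixed array of cache-line-padded shards and index them by the current P, so goroutines running on different CPUs rarely contend on the same lock. The sketch below illustrates the same pattern in user space with a hypothetical sharded counter; the names shard and shardedCounter and the 64-byte cache line are assumptions for illustration, and since user code cannot see P ids it shards by an arbitrary key instead:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

const numShards = 64 // fixed shard count, mirroring timersLen in the CL

// shard holds its own counter; the pad keeps adjacent shards on
// different cache lines so they do not false-share.
type shard struct {
	n   uint64
	pad [64 - 8]byte // assumes a 64-byte cache line
}

// shardedCounter spreads increments across shards to reduce contention,
// much as the CL spreads timers across timersBucket values.
type shardedCounter struct {
	shards [numShards]shard
}

func (c *shardedCounter) inc(key uint64) {
	atomic.AddUint64(&c.shards[key%numShards].n, 1)
}

func (c *shardedCounter) total() uint64 {
	var sum uint64
	for i := range c.shards {
		sum += atomic.LoadUint64(&c.shards[i].n)
	}
	return sum
}

func main() {
	var c shardedCounter
	var wg sync.WaitGroup
	for g := 0; g < 8; g++ {
		wg.Add(1)
		go func(g int) {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				c.inc(uint64(g)) // each goroutine mostly hits its own shard
			}
		}(g)
	}
	wg.Wait()
	fmt.Println(c.total()) // 8000
}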
src/context/benchmark_test.go
@@ -7,10 +7,64 @@ package context_test
 import (
 	. "context"
 	"fmt"
+	"runtime"
+	"sync"
 	"testing"
+	"time"
 )
 
-func BenchmarkContextCancelTree(b *testing.B) {
+func BenchmarkWithTimeout(b *testing.B) {
+	for concurrency := 40; concurrency <= 4e5; concurrency *= 100 {
+		name := fmt.Sprintf("concurrency=%d", concurrency)
+		b.Run(name, func(b *testing.B) {
+			benchmarkWithTimeout(b, concurrency)
+		})
+	}
+}
+
+func benchmarkWithTimeout(b *testing.B, concurrentContexts int) {
+	gomaxprocs := runtime.GOMAXPROCS(0)
+	perPContexts := concurrentContexts / gomaxprocs
+	root := Background()
+
+	// Generate concurrent contexts.
+	var wg sync.WaitGroup
+	ccf := make([][]CancelFunc, gomaxprocs)
+	for i := range ccf {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			cf := make([]CancelFunc, perPContexts)
+			for j := range cf {
+				_, cf[j] = WithTimeout(root, time.Hour)
+			}
+			ccf[i] = cf
+		}(i)
+	}
+	wg.Wait()
+
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		wcf := make([]CancelFunc, 10)
+		for pb.Next() {
+			for i := range wcf {
+				_, wcf[i] = WithTimeout(root, time.Hour)
+			}
+			for _, f := range wcf {
+				f()
+			}
+		}
+	})
+	b.StopTimer()
+
+	for _, cf := range ccf {
+		for _, f := range cf {
+			f()
+		}
+	}
+}
+
+func BenchmarkCancelTree(b *testing.B) {
 	depths := []int{1, 10, 100, 1000}
 	for _, d := range depths {
 		b.Run(fmt.Sprintf("depth=%d", d), func(b *testing.B) {
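A note for readers reproducing the numbers in the commit message: the old time/op, new time/op, delta tables are benchstat output. A plausible workflow, assuming the golang.org/x/perf/cmd/benchstat tool, is to run the benchmarks ten times against each toolchain, e.g. go test -run=NONE -bench=WithTimeout -count=10 in src/context, save the output to old.txt and new.txt, and compare with benchstat old.txt new.txt.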
src/internal/trace/parser.go
@@ -270,8 +270,9 @@ func parseHeader(buf []byte) (int, error) {
 // It does analyze and verify per-event-type arguments.
 func parseEvents(ver int, rawEvents []rawEvent, strings map[uint64]string) (events []*Event, stacks map[uint64][]*Frame, err error) {
 	var ticksPerSec, lastSeq, lastTs int64
-	var lastG, timerGoid uint64
+	var lastG uint64
 	var lastP int
+	timerGoids := make(map[uint64]bool)
 	lastGs := make(map[int]uint64) // last goroutine running on P
 	stacks = make(map[uint64][]*Frame)
 	batches := make(map[int][]*Event) // events by P
@@ -308,7 +309,7 @@
 				return
 			}
 		case EvTimerGoroutine:
-			timerGoid = raw.args[0]
+			timerGoids[raw.args[0]] = true
 		case EvStack:
 			if len(raw.args) < 2 {
 				err = fmt.Errorf("EvStack has wrong number of arguments at offset 0x%x: want at least 2, got %v",
@@ -431,7 +432,7 @@
 	for _, ev := range events {
 		ev.Ts = int64(float64(ev.Ts-minTs) * freq)
 		// Move timers and syscalls to separate fake Ps.
-		if timerGoid != 0 && ev.G == timerGoid && ev.Type == EvGoUnblock {
+		if timerGoids[ev.G] && ev.Type == EvGoUnblock {
 			ev.P = TimerP
 		}
 		if ev.Type == EvGoSysExit {
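With per-P buckets the runtime can start up to timersLen timer goroutines (one per active bucket) instead of exactly one, and a trace now records an EvTimerGoroutine event for each of them (see the src/runtime/trace.go hunk below). The parser therefore has to track a set of timer-goroutine ids, hence the switch from the scalar timerGoid to the timerGoids map used as a set.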
src/runtime/proc.go
@@ -3863,15 +3863,11 @@ func sysmon() {
 		}
 		shouldRelax := true
 		if osRelaxMinNS > 0 {
-			lock(&timers.lock)
-			if timers.sleeping {
-				now := nanotime()
-				next := timers.sleepUntil
-				if next-now < osRelaxMinNS {
-					shouldRelax = false
-				}
-			}
-			unlock(&timers.lock)
+			next := timeSleepUntil()
+			now := nanotime()
+			if next-now < osRelaxMinNS {
+				shouldRelax = false
+			}
 		}
 		if shouldRelax {
 			osRelax(true)
src/runtime/time.go
@@ -6,14 +6,18 @@
 
 package runtime
 
-import "unsafe"
+import (
+	"runtime/internal/sys"
+	"unsafe"
+)
 
 // Package time knows the layout of this structure.
 // If this struct changes, adjust ../time/sleep.go:/runtimeTimer.
 // For GOOS=nacl, package syscall knows the layout of this structure.
 // If this struct changes, adjust ../syscall/net_nacl.go:/runtimeTimer.
 type timer struct {
-	i int // heap index
+	tb *timersBucket // the bucket the timer lives in
+	i  int           // heap index
 
 	// Timer wakes up at when, and then at when+period, ... (period > 0 only)
 	// each time calling f(arg, now) in the timer goroutine, so f must be
@@ -25,7 +29,37 @@ type timer struct {
 	seq uintptr
 }
 
-var timers struct {
+// timersLen is the length of timers array.
+//
+// Ideally, this would be set to GOMAXPROCS, but that would require
+// dynamic reallocation
+//
+// The current value is a compromise between memory usage and performance
+// that should cover the majority of GOMAXPROCS values used in the wild.
+const timersLen = 64
+
+// timers contains "per-P" timer heaps.
+//
+// Timers are queued into timersBucket associated with the current P,
+// so each P may work with its own timers independently of other P instances.
+//
+// Each timersBucket may be associated with multiple P
+// if GOMAXPROCS > timersLen.
+var timers [timersLen]struct {
+	timersBucket
+
+	// The padding should eliminate false sharing
+	// between timersBucket values.
+	pad [sys.CacheLineSize - unsafe.Sizeof(timersBucket{})%sys.CacheLineSize]byte
+}
+
+func (t *timer) assignBucket() *timersBucket {
+	id := uint8(getg().m.p.ptr().id) % timersLen
+	t.tb = &timers[id].timersBucket
+	return t.tb
+}
+
+type timersBucket struct {
 	lock         mutex
 	gp           *g
 	created      bool
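The pad array rounds each array element up to a whole number of cache lines, so two buckets never share a line and updates on one P do not invalidate another P's cached bucket. Below is a standalone sketch of the same size arithmetic, with a hypothetical bucket type and an assumed 64-byte cache line (the real sys.CacheLineSize is per-architecture):

package main

import (
	"fmt"
	"unsafe"
)

const cacheLineSize = 64 // assumption; stands in for sys.CacheLineSize

// bucket is a hypothetical stand-in for timersBucket.
type bucket struct {
	a, b uint64
	t    []int
}

// padded mirrors the timers array element in the CL: the pad length is
// computed at compile time and rounds the element size up to the next
// multiple of the cache line size.
type padded struct {
	bucket
	pad [cacheLineSize - unsafe.Sizeof(bucket{})%cacheLineSize]byte
}

func main() {
	fmt.Println(unsafe.Sizeof(bucket{})) // 40 on 64-bit: 2*8 plus a 24-byte slice header
	fmt.Println(unsafe.Sizeof(padded{})) // 64: rounded up to exactly one cache line
}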
@@ -51,18 +85,20 @@ func timeSleep(ns int64) {
 		return
 	}
 
-	t := getg().timer
+	gp := getg()
+	t := gp.timer
 	if t == nil {
 		t = new(timer)
-		getg().timer = t
+		gp.timer = t
 	}
 	*t = timer{}
 	t.when = nanotime() + ns
 	t.f = goroutineReady
-	t.arg = getg()
-	lock(&timers.lock)
-	addtimerLocked(t)
-	goparkunlock(&timers.lock, "sleep", traceEvGoSleep, 2)
+	t.arg = gp
+	tb := t.assignBucket()
+	lock(&tb.lock)
+	tb.addtimerLocked(t)
+	goparkunlock(&tb.lock, "sleep", traceEvGoSleep, 2)
 }
 
 // startTimer adds t to the timer heap.
@@ -89,87 +125,86 @@ func goroutineReady(arg interface{}, seq uintptr) {
 }
 
 func addtimer(t *timer) {
-	lock(&timers.lock)
-	addtimerLocked(t)
-	unlock(&timers.lock)
+	tb := t.assignBucket()
+	lock(&tb.lock)
+	tb.addtimerLocked(t)
+	unlock(&tb.lock)
 }
 
 // Add a timer to the heap and start or kick timerproc if the new timer is
 // earlier than any of the others.
 // Timers are locked.
-func addtimerLocked(t *timer) {
+func (tb *timersBucket) addtimerLocked(t *timer) {
 	// when must never be negative; otherwise timerproc will overflow
 	// during its delta calculation and never expire other runtime timers.
 	if t.when < 0 {
 		t.when = 1<<63 - 1
 	}
-	t.i = len(timers.t)
-	timers.t = append(timers.t, t)
-	siftupTimer(t.i)
+	t.i = len(tb.t)
+	tb.t = append(tb.t, t)
+	tb.siftupTimer(t.i)
 	if t.i == 0 {
 		// siftup moved to top: new earliest deadline.
-		if timers.sleeping {
-			timers.sleeping = false
-			notewakeup(&timers.waitnote)
+		if tb.sleeping {
+			tb.sleeping = false
+			notewakeup(&tb.waitnote)
 		}
-		if timers.rescheduling {
-			timers.rescheduling = false
-			goready(timers.gp, 0)
+		if tb.rescheduling {
+			tb.rescheduling = false
+			goready(tb.gp, 0)
 		}
 	}
-	if !timers.created {
-		timers.created = true
-		go timerproc()
+	if !tb.created {
+		tb.created = true
+		go timerproc(tb)
 	}
 }
 
 // Delete timer t from the heap.
 // Do not need to update the timerproc: if it wakes up early, no big deal.
 func deltimer(t *timer) bool {
-	// Dereference t so that any panic happens before the lock is held.
-	// Discard result, because t might be moving in the heap.
-	_ = t.i
+	tb := t.tb
 
-	lock(&timers.lock)
+	lock(&tb.lock)
 	// t may not be registered anymore and may have
 	// a bogus i (typically 0, if generated by Go).
 	// Verify it before proceeding.
 	i := t.i
-	last := len(timers.t) - 1
-	if i < 0 || i > last || timers.t[i] != t {
-		unlock(&timers.lock)
+	last := len(tb.t) - 1
+	if i < 0 || i > last || tb.t[i] != t {
+		unlock(&tb.lock)
 		return false
 	}
 	if i != last {
-		timers.t[i] = timers.t[last]
-		timers.t[i].i = i
+		tb.t[i] = tb.t[last]
+		tb.t[i].i = i
 	}
-	timers.t[last] = nil
-	timers.t = timers.t[:last]
+	tb.t[last] = nil
+	tb.t = tb.t[:last]
 	if i != last {
-		siftupTimer(i)
-		siftdownTimer(i)
+		tb.siftupTimer(i)
+		tb.siftdownTimer(i)
 	}
-	unlock(&timers.lock)
+	unlock(&tb.lock)
 	return true
 }
 
 // Timerproc runs the time-driven events.
-// It sleeps until the next event in the timers heap.
+// It sleeps until the next event in the tb heap.
 // If addtimer inserts a new earlier event, it wakes timerproc early.
-func timerproc() {
-	timers.gp = getg()
+func timerproc(tb *timersBucket) {
+	tb.gp = getg()
 	for {
-		lock(&timers.lock)
-		timers.sleeping = false
+		lock(&tb.lock)
+		tb.sleeping = false
 		now := nanotime()
 		delta := int64(-1)
 		for {
-			if len(timers.t) == 0 {
+			if len(tb.t) == 0 {
 				delta = -1
 				break
 			}
-			t := timers.t[0]
+			t := tb.t[0]
 			delta = t.when - now
 			if delta > 0 {
 				break
@@ -177,43 +212,43 @@ func timerproc() {
 			if t.period > 0 {
 				// leave in heap but adjust next time to fire
 				t.when += t.period * (1 + -delta/t.period)
-				siftdownTimer(0)
+				tb.siftdownTimer(0)
 			} else {
 				// remove from heap
-				last := len(timers.t) - 1
+				last := len(tb.t) - 1
 				if last > 0 {
-					timers.t[0] = timers.t[last]
-					timers.t[0].i = 0
+					tb.t[0] = tb.t[last]
+					tb.t[0].i = 0
 				}
-				timers.t[last] = nil
-				timers.t = timers.t[:last]
+				tb.t[last] = nil
+				tb.t = tb.t[:last]
 				if last > 0 {
-					siftdownTimer(0)
+					tb.siftdownTimer(0)
 				}
 				t.i = -1 // mark as removed
 			}
 			f := t.f
 			arg := t.arg
 			seq := t.seq
-			unlock(&timers.lock)
+			unlock(&tb.lock)
 			if raceenabled {
 				raceacquire(unsafe.Pointer(t))
 			}
 			f(arg, seq)
-			lock(&timers.lock)
+			lock(&tb.lock)
 		}
 		if delta < 0 || faketime > 0 {
 			// No timers left - put goroutine to sleep.
-			timers.rescheduling = true
-			goparkunlock(&timers.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
+			tb.rescheduling = true
+			goparkunlock(&tb.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
 			continue
 		}
 		// At least one timer pending. Sleep until then.
-		timers.sleeping = true
-		timers.sleepUntil = now + delta
-		noteclear(&timers.waitnote)
-		unlock(&timers.lock)
-		notetsleepg(&timers.waitnote, delta)
+		tb.sleeping = true
+		tb.sleepUntil = now + delta
+		noteclear(&tb.waitnote)
+		unlock(&tb.lock)
+		notetsleepg(&tb.waitnote, delta)
 	}
 }
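One subtle line above is the catch-up arithmetic for periodic timers, t.when += t.period * (1 + -delta/t.period): if the goroutine fell behind by several periods, the integer division skips the missed firings in a single step and lands when strictly after now. A worked example with assumed values:

package main

import "fmt"

func main() {
	// Hypothetical values: a periodic timer due at when=700 with
	// period=100, observed late at now=1000 (units are arbitrary).
	var when, period, now int64 = 700, 100, 1000

	delta := when - now                  // -300: the timer is three periods late
	when += period * (1 + -delta/period) // same expression as in timerproc
	fmt.Println(when, when > now)        // 1100 true: missed firings are skipped
}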
@@ -222,28 +257,68 @@ func timejump() *g {
 		return nil
 	}
 
-	lock(&timers.lock)
-	if !timers.created || len(timers.t) == 0 {
-		unlock(&timers.lock)
+	for i := range timers {
+		lock(&timers[i].lock)
+	}
+	gp := timejumpLocked()
+	for i := range timers {
+		unlock(&timers[i].lock)
+	}
+
+	return gp
+}
+
+func timejumpLocked() *g {
+	// Determine a timer bucket with minimum when.
+	var minT *timer
+	for i := range timers {
+		tb := &timers[i]
+		if !tb.created || len(tb.t) == 0 {
+			continue
+		}
+		t := tb.t[0]
+		if minT == nil || t.when < minT.when {
+			minT = t
+		}
+	}
+	if minT == nil || minT.when <= faketime {
 		return nil
 	}
 
-	var gp *g
-	if faketime < timers.t[0].when {
-		faketime = timers.t[0].when
-		if timers.rescheduling {
-			timers.rescheduling = false
-			gp = timers.gp
-		}
-	}
-	unlock(&timers.lock)
-	return gp
+	faketime = minT.when
+	tb := minT.tb
+	if !tb.rescheduling {
+		return nil
+	}
+	tb.rescheduling = false
+	return tb.gp
+}
+
+func timeSleepUntil() int64 {
+	next := int64(1<<63 - 1)
+
+	// Determine minimum sleepUntil across all the timer buckets.
+	//
+	// The function can not return a precise answer,
+	// as another timer may pop in as soon as timers have been unlocked.
+	// So lock the timers one by one instead of all at once.
+	for i := range timers {
+		tb := &timers[i]
+
+		lock(&tb.lock)
+		if tb.sleeping && tb.sleepUntil < next {
+			next = tb.sleepUntil
+		}
+		unlock(&tb.lock)
+	}
+
+	return next
 }
 
 // Heap maintenance algorithms.
 
-func siftupTimer(i int) {
-	t := timers.t
+func (tb *timersBucket) siftupTimer(i int) {
+	t := tb.t
 	when := t[i].when
 	tmp := t[i]
 	for i > 0 {
@@ -259,8 +334,8 @@ func siftupTimer(i int) {
 	}
 }
 
-func siftdownTimer(i int) {
-	t := timers.t
+func (tb *timersBucket) siftdownTimer(i int) {
+	t := tb.t
 	n := len(t)
 	when := t[i].when
 	tmp := t[i]
src/runtime/trace.go
@@ -408,9 +408,12 @@ func ReadTrace() []byte {
 		var data []byte
 		data = append(data, traceEvFrequency|0<<traceArgCountShift)
 		data = traceAppend(data, uint64(freq))
-		if timers.gp != nil {
-			data = append(data, traceEvTimerGoroutine|0<<traceArgCountShift)
-			data = traceAppend(data, uint64(timers.gp.goid))
+		for i := range timers {
+			tb := &timers[i]
+			if tb.gp != nil {
+				data = append(data, traceEvTimerGoroutine|0<<traceArgCountShift)
+				data = traceAppend(data, uint64(tb.gp.goid))
+			}
 		}
 		// This will emit a bunch of full buffers, we will pick them up
 		// on the next iteration.
src/syscall/net_nacl.go
@@ -16,11 +16,13 @@ import (
 )
 
 // Interface to timers implemented in package runtime.
-// Must be in sync with ../runtime/runtime.h:/^struct.Timer$
+// Must be in sync with ../runtime/time.go:/^type timer
 // Really for use by package time, but we cannot import time here.
 
 type runtimeTimer struct {
-	i      int
+	tb     uintptr
+	i      int
+
 	when   int64
 	period int64
 	f      func(interface{}, uintptr) // NOTE: must not be closure
@@ -49,13 +51,14 @@ func (t *timer) start(q *queue, deadline int64) {
 }
 
 func (t *timer) stop() {
+	if t.r.f == nil {
+		return
+	}
 	stopTimer(&t.r)
 }
 
 func (t *timer) reset(q *queue, deadline int64) {
-	if t.r.f != nil {
-		t.stop()
-	}
+	t.stop()
 	if deadline == 0 {
 		return
 	}
src/time/sleep.go
@@ -14,7 +14,9 @@ func runtimeNano() int64
 // Interface to timers implemented in package runtime.
 // Must be in sync with ../runtime/time.go:/^type timer
 type runtimeTimer struct {
-	i      int
+	tb     uintptr
+	i      int
+
 	when   int64
 	period int64
 	f      func(interface{}, uintptr) // NOTE: must not be closure
src/time/sleep_test.go
@@ -82,21 +82,36 @@ func TestAfterStress(t *testing.T) {
 }
 
 func benchmark(b *testing.B, bench func(n int)) {
-	garbage := make([]*Timer, 1<<17)
-	for i := 0; i < len(garbage); i++ {
-		garbage[i] = AfterFunc(Hour, nil)
-	}
-	b.ResetTimer()
 
+	// Create equal number of garbage timers on each P before starting
+	// the benchmark.
+	var wg sync.WaitGroup
+	garbageAll := make([][]*Timer, runtime.GOMAXPROCS(0))
+	for i := range garbageAll {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			garbage := make([]*Timer, 1<<15)
+			for j := range garbage {
+				garbage[j] = AfterFunc(Hour, nil)
+			}
+			garbageAll[i] = garbage
+		}(i)
+	}
+	wg.Wait()
+
+	b.ResetTimer()
 	b.RunParallel(func(pb *testing.PB) {
 		for pb.Next() {
 			bench(1000)
 		}
 	})
-
 	b.StopTimer()
-	for i := 0; i < len(garbage); i++ {
-		garbage[i].Stop()
+
+	for _, garbage := range garbageAll {
+		for _, t := range garbage {
+			t.Stop()
+		}
 	}
 }
 
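Note that the benchmark setup itself was parallelized to match the new runtime: each of GOMAXPROCS setup goroutines creates 1<<15 garbage timers (at GOMAXPROCS=4 that equals the old single-goroutine total of 1<<17), so the pre-created timers are spread across the per-P buckets rather than piled into one.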
@@ -158,6 +173,30 @@ func BenchmarkStartStop(b *testing.B) {
 	})
 }
 
+func BenchmarkReset(b *testing.B) {
+	benchmark(b, func(n int) {
+		t := NewTimer(Hour)
+		for i := 0; i < n; i++ {
+			t.Reset(Hour)
+		}
+		t.Stop()
+	})
+}
+
+func BenchmarkSleep(b *testing.B) {
+	benchmark(b, func(n int) {
+		var wg sync.WaitGroup
+		wg.Add(n)
+		for i := 0; i < n; i++ {
+			go func() {
+				Sleep(Nanosecond)
+				wg.Done()
+			}()
+		}
+		wg.Wait()
+	})
+}
+
 func TestAfter(t *testing.T) {
 	const delay = 100 * Millisecond
 	start := Now()
src/time/tick_test.go
@@ -67,12 +67,11 @@ func TestNewTickerLtZeroDuration(t *testing.T) {
 }
 
 func BenchmarkTicker(b *testing.B) {
-	ticker := NewTicker(1)
-	b.ResetTimer()
-	b.StartTimer()
-	for i := 0; i < b.N; i++ {
-		<-ticker.C
-	}
-	b.StopTimer()
-	ticker.Stop()
+	benchmark(b, func(n int) {
+		ticker := NewTicker(Nanosecond)
+		for i := 0; i < n; i++ {
+			<-ticker.C
+		}
+		ticker.Stop()
+	})
 }