mirror of
https://github.com/golang/go
synced 2024-11-22 08:04:39 -07:00
time: make After use fewer goroutines and host processes.
With credit to Gustavo Niemeyer, who hinted at this approach in #go-nuts. R=adg, rsc, niemeyer, r CC=golang-dev https://golang.org/cl/3416043
This commit is contained in:
parent
415545e539
commit
e2d1595c81
@ -7,8 +7,26 @@ package time
|
|||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"sync"
|
||||||
|
"container/heap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// The event type represents a single After event.
|
||||||
|
type event struct {
|
||||||
|
t int64 // The absolute time that the event should fire.
|
||||||
|
c chan<- int64 // The channel to send on.
|
||||||
|
sleeping bool // A sleeper is sleeping for this event.
|
||||||
|
}
|
||||||
|
|
||||||
|
type eventHeap []*event
|
||||||
|
|
||||||
|
var events eventHeap
|
||||||
|
var eventMutex sync.Mutex
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
events.Push(&event{1 << 62, nil, true}) // sentinel
|
||||||
|
}
|
||||||
|
|
||||||
// Sleep pauses the current goroutine for at least ns nanoseconds.
|
// Sleep pauses the current goroutine for at least ns nanoseconds.
|
||||||
// Higher resolution sleeping may be provided by syscall.Nanosleep
|
// Higher resolution sleeping may be provided by syscall.Nanosleep
|
||||||
// on some operating systems.
|
// on some operating systems.
|
||||||
@ -17,18 +35,6 @@ func Sleep(ns int64) os.Error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// After waits at least ns nanoseconds before sending the current time
|
|
||||||
// on the returned channel.
|
|
||||||
func After(ns int64) <-chan int64 {
|
|
||||||
t := Nanoseconds()
|
|
||||||
ch := make(chan int64, 1)
|
|
||||||
go func() {
|
|
||||||
t, _ = sleep(t, ns)
|
|
||||||
ch <- t
|
|
||||||
}()
|
|
||||||
return ch
|
|
||||||
}
|
|
||||||
|
|
||||||
// sleep takes the current time and a duration,
|
// sleep takes the current time and a duration,
|
||||||
// pauses for at least ns nanoseconds, and
|
// pauses for at least ns nanoseconds, and
|
||||||
// returns the current time and an error.
|
// returns the current time and an error.
|
||||||
@ -44,3 +50,87 @@ func sleep(t, ns int64) (int64, os.Error) {
|
|||||||
}
|
}
|
||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// After waits at least ns nanoseconds before sending the current time
|
||||||
|
// on the returned channel.
|
||||||
|
func After(ns int64) <-chan int64 {
|
||||||
|
c := make(chan int64, 1)
|
||||||
|
t := ns + Nanoseconds()
|
||||||
|
eventMutex.Lock()
|
||||||
|
t0 := events[0].t
|
||||||
|
heap.Push(events, &event{t, c, false})
|
||||||
|
if t < t0 {
|
||||||
|
go sleeper()
|
||||||
|
}
|
||||||
|
eventMutex.Unlock()
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
// sleeper continually looks at the earliest event in the queue, marks it
|
||||||
|
// as sleeping, waits until it happens, then removes any events
|
||||||
|
// in the queue that are due. It stops when it finds an event that is
|
||||||
|
// already marked as sleeping. When an event is inserted before the first item,
|
||||||
|
// a new sleeper is started.
|
||||||
|
//
|
||||||
|
// Scheduling vagaries mean that sleepers may not wake up in
|
||||||
|
// exactly the order of the events that they are waiting for,
|
||||||
|
// but this does not matter as long as there are at least as
|
||||||
|
// many sleepers as events marked sleeping (invariant). This ensures that
|
||||||
|
// there is always a sleeper to service the remaining events.
|
||||||
|
//
|
||||||
|
// A sleeper will remove at least the event it has been waiting for
|
||||||
|
// unless the event has already been removed by another sleeper. Both
|
||||||
|
// cases preserve the invariant described above.
|
||||||
|
func sleeper() {
|
||||||
|
eventMutex.Lock()
|
||||||
|
e := events[0]
|
||||||
|
for !e.sleeping {
|
||||||
|
t := Nanoseconds()
|
||||||
|
if dt := e.t - t; dt > 0 {
|
||||||
|
e.sleeping = true
|
||||||
|
eventMutex.Unlock()
|
||||||
|
if nt, err := sleep(t, dt); err != nil {
|
||||||
|
// If sleep has encountered an error,
|
||||||
|
// there's not much we can do. We pretend
|
||||||
|
// that time really has advanced by the required
|
||||||
|
// amount and lie to the rest of the system.
|
||||||
|
t = e.t
|
||||||
|
} else {
|
||||||
|
t = nt
|
||||||
|
}
|
||||||
|
eventMutex.Lock()
|
||||||
|
e = events[0]
|
||||||
|
}
|
||||||
|
for t >= e.t {
|
||||||
|
e.c <- t
|
||||||
|
heap.Pop(events)
|
||||||
|
e = events[0]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
eventMutex.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (eventHeap) Len() int {
|
||||||
|
return len(events)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (eventHeap) Less(i, j int) bool {
|
||||||
|
return events[i].t < events[j].t
|
||||||
|
}
|
||||||
|
|
||||||
|
func (eventHeap) Swap(i, j int) {
|
||||||
|
events[i], events[j] = events[j], events[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (eventHeap) Push(x interface{}) {
|
||||||
|
events = append(events, x.(*event))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (eventHeap) Pop() interface{} {
|
||||||
|
// TODO: possibly shrink array.
|
||||||
|
n := len(events) - 1
|
||||||
|
e := events[n]
|
||||||
|
events[n] = nil
|
||||||
|
events = events[0:n]
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
@ -8,6 +8,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"syscall"
|
"syscall"
|
||||||
"testing"
|
"testing"
|
||||||
|
"sort"
|
||||||
. "time"
|
. "time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -36,3 +37,60 @@ func TestAfter(t *testing.T) {
|
|||||||
t.Fatalf("After(%d) expect >= %d, got %d", delay, min, end)
|
t.Fatalf("After(%d) expect >= %d, got %d", delay, min, end)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestAfterTick(t *testing.T) {
|
||||||
|
const (
|
||||||
|
Delta = 100 * 1e6
|
||||||
|
Count = 10
|
||||||
|
)
|
||||||
|
t0 := Nanoseconds()
|
||||||
|
for i := 0; i < Count; i++ {
|
||||||
|
<-After(Delta)
|
||||||
|
}
|
||||||
|
t1 := Nanoseconds()
|
||||||
|
ns := t1 - t0
|
||||||
|
target := int64(Delta * Count)
|
||||||
|
slop := target * 2 / 10
|
||||||
|
if ns < target-slop || ns > target+slop {
|
||||||
|
t.Fatalf("%d ticks of %g ns took %g ns, expected %g", Count, float64(Delta), float64(ns), float64(target))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var slots = []int{5, 3, 6, 6, 6, 1, 1, 2, 7, 9, 4, 8, 0}
|
||||||
|
|
||||||
|
type afterResult struct {
|
||||||
|
slot int
|
||||||
|
t int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func await(slot int, result chan<- afterResult, ac <-chan int64) {
|
||||||
|
result <- afterResult{slot, <-ac}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAfterQueuing(t *testing.T) {
|
||||||
|
const (
|
||||||
|
Delta = 100 * 1e6
|
||||||
|
)
|
||||||
|
// make the result channel buffered because we don't want
|
||||||
|
// to depend on channel queueing semantics that might
|
||||||
|
// possibly change in the future.
|
||||||
|
result := make(chan afterResult, len(slots))
|
||||||
|
|
||||||
|
t0 := Nanoseconds()
|
||||||
|
for _, slot := range slots {
|
||||||
|
go await(slot, result, After(int64(slot)*Delta))
|
||||||
|
}
|
||||||
|
sort.SortInts(slots)
|
||||||
|
for _, slot := range slots {
|
||||||
|
r := <-result
|
||||||
|
if r.slot != slot {
|
||||||
|
t.Fatalf("after queue got slot %d, expected %d", r.slot, slot)
|
||||||
|
}
|
||||||
|
ns := r.t - t0
|
||||||
|
target := int64(slot * Delta)
|
||||||
|
slop := int64(Delta) / 10
|
||||||
|
if ns < target-slop || ns > target+slop {
|
||||||
|
t.Fatalf("after queue slot %d arrived at %g, expected %g", slot, float64(ns), float64(target))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user