diff options
author | Roger Peppe <rogpeppe@gmail.com> | 2010-12-06 14:19:30 -0500 |
---|---|---|
committer | Rob Pike <r@golang.org> | 2010-12-06 14:19:30 -0500 |
commit | e2d1595c819ea6baeec42bae2107c141eb935b9e (patch) | |
tree | eed9a906b914a0f1e73a2c63787a3d6808f85b3e | |
parent | 415545e539c65cbb6ec7dcf7ef032a2e75f891e1 (diff) | |
download | go-e2d1595c819ea6baeec42bae2107c141eb935b9e.tar.gz go-e2d1595c819ea6baeec42bae2107c141eb935b9e.zip |
time: make After use fewer goroutines and host processes.
With credit to Gustavo Niemeyer, who hinted at this approach
in #go-nuts.
R=adg, rsc, niemeyer, r
CC=golang-dev
https://golang.org/cl/3416043
-rw-r--r-- | src/pkg/time/sleep.go | 114 | ||||
-rw-r--r-- | src/pkg/time/sleep_test.go | 58 |
2 files changed, 160 insertions, 12 deletions
diff --git a/src/pkg/time/sleep.go b/src/pkg/time/sleep.go index 702ced1304..77b7b4a593 100644 --- a/src/pkg/time/sleep.go +++ b/src/pkg/time/sleep.go @@ -7,8 +7,26 @@ package time import ( "os" "syscall" + "sync" + "container/heap" ) +// The event type represents a single After event. +type event struct { + t int64 // The absolute time that the event should fire. + c chan<- int64 // The channel to send on. + sleeping bool // A sleeper is sleeping for this event. +} + +type eventHeap []*event + +var events eventHeap +var eventMutex sync.Mutex + +func init() { + events.Push(&event{1 << 62, nil, true}) // sentinel +} + // Sleep pauses the current goroutine for at least ns nanoseconds. // Higher resolution sleeping may be provided by syscall.Nanosleep // on some operating systems. @@ -17,18 +35,6 @@ func Sleep(ns int64) os.Error { return err } -// After waits at least ns nanoseconds before sending the current time -// on the returned channel. -func After(ns int64) <-chan int64 { - t := Nanoseconds() - ch := make(chan int64, 1) - go func() { - t, _ = sleep(t, ns) - ch <- t - }() - return ch -} - // sleep takes the current time and a duration, // pauses for at least ns nanoseconds, and // returns the current time and an error. @@ -44,3 +50,87 @@ func sleep(t, ns int64) (int64, os.Error) { } return t, nil } + +// After waits at least ns nanoseconds before sending the current time +// on the returned channel. +func After(ns int64) <-chan int64 { + c := make(chan int64, 1) + t := ns + Nanoseconds() + eventMutex.Lock() + t0 := events[0].t + heap.Push(events, &event{t, c, false}) + if t < t0 { + go sleeper() + } + eventMutex.Unlock() + return c +} + +// sleeper continually looks at the earliest event in the queue, marks it +// as sleeping, waits until it happens, then removes any events +// in the queue that are due. It stops when it finds an event that is +// already marked as sleeping. When an event is inserted before the first item, +// a new sleeper is started. +// +// Scheduling vagaries mean that sleepers may not wake up in +// exactly the order of the events that they are waiting for, +// but this does not matter as long as there are at least as +// many sleepers as events marked sleeping (invariant). This ensures that +// there is always a sleeper to service the remaining events. +// +// A sleeper will remove at least the event it has been waiting for +// unless the event has already been removed by another sleeper. Both +// cases preserve the invariant described above. +func sleeper() { + eventMutex.Lock() + e := events[0] + for !e.sleeping { + t := Nanoseconds() + if dt := e.t - t; dt > 0 { + e.sleeping = true + eventMutex.Unlock() + if nt, err := sleep(t, dt); err != nil { + // If sleep has encountered an error, + // there's not much we can do. We pretend + // that time really has advanced by the required + // amount and lie to the rest of the system. + t = e.t + } else { + t = nt + } + eventMutex.Lock() + e = events[0] + } + for t >= e.t { + e.c <- t + heap.Pop(events) + e = events[0] + } + } + eventMutex.Unlock() +} + +func (eventHeap) Len() int { + return len(events) +} + +func (eventHeap) Less(i, j int) bool { + return events[i].t < events[j].t +} + +func (eventHeap) Swap(i, j int) { + events[i], events[j] = events[j], events[i] +} + +func (eventHeap) Push(x interface{}) { + events = append(events, x.(*event)) +} + +func (eventHeap) Pop() interface{} { + // TODO: possibly shrink array. + n := len(events) - 1 + e := events[n] + events[n] = nil + events = events[0:n] + return e +} diff --git a/src/pkg/time/sleep_test.go b/src/pkg/time/sleep_test.go index 4934a38691..9fd38d18d1 100644 --- a/src/pkg/time/sleep_test.go +++ b/src/pkg/time/sleep_test.go @@ -8,6 +8,7 @@ import ( "os" "syscall" "testing" + "sort" . "time" ) @@ -36,3 +37,60 @@ func TestAfter(t *testing.T) { t.Fatalf("After(%d) expect >= %d, got %d", delay, min, end) } } + +func TestAfterTick(t *testing.T) { + const ( + Delta = 100 * 1e6 + Count = 10 + ) + t0 := Nanoseconds() + for i := 0; i < Count; i++ { + <-After(Delta) + } + t1 := Nanoseconds() + ns := t1 - t0 + target := int64(Delta * Count) + slop := target * 2 / 10 + if ns < target-slop || ns > target+slop { + t.Fatalf("%d ticks of %g ns took %g ns, expected %g", Count, float64(Delta), float64(ns), float64(target)) + } +} + +var slots = []int{5, 3, 6, 6, 6, 1, 1, 2, 7, 9, 4, 8, 0} + +type afterResult struct { + slot int + t int64 +} + +func await(slot int, result chan<- afterResult, ac <-chan int64) { + result <- afterResult{slot, <-ac} +} + +func TestAfterQueuing(t *testing.T) { + const ( + Delta = 100 * 1e6 + ) + // make the result channel buffered because we don't want + // to depend on channel queueing semantics that might + // possibly change in the future. + result := make(chan afterResult, len(slots)) + + t0 := Nanoseconds() + for _, slot := range slots { + go await(slot, result, After(int64(slot)*Delta)) + } + sort.SortInts(slots) + for _, slot := range slots { + r := <-result + if r.slot != slot { + t.Fatalf("after queue got slot %d, expected %d", r.slot, slot) + } + ns := r.t - t0 + target := int64(slot * Delta) + slop := int64(Delta) / 10 + if ns < target-slop || ns > target+slop { + t.Fatalf("after queue slot %d arrived at %g, expected %g", slot, float64(ns), float64(target)) + } + } +} |