aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRoger Peppe <rogpeppe@gmail.com>2010-12-06 14:19:30 -0500
committerRob Pike <r@golang.org>2010-12-06 14:19:30 -0500
commite2d1595c819ea6baeec42bae2107c141eb935b9e (patch)
treeeed9a906b914a0f1e73a2c63787a3d6808f85b3e
parent415545e539c65cbb6ec7dcf7ef032a2e75f891e1 (diff)
downloadgo-e2d1595c819ea6baeec42bae2107c141eb935b9e.tar.gz
go-e2d1595c819ea6baeec42bae2107c141eb935b9e.zip
time: make After use fewer goroutines and host processes.
With credit to Gustavo Niemeyer, who hinted at this approach in #go-nuts. R=adg, rsc, niemeyer, r CC=golang-dev https://golang.org/cl/3416043
-rw-r--r--src/pkg/time/sleep.go114
-rw-r--r--src/pkg/time/sleep_test.go58
2 files changed, 160 insertions, 12 deletions
diff --git a/src/pkg/time/sleep.go b/src/pkg/time/sleep.go
index 702ced1304..77b7b4a593 100644
--- a/src/pkg/time/sleep.go
+++ b/src/pkg/time/sleep.go
@@ -7,8 +7,26 @@ package time
import (
"os"
"syscall"
+ "sync"
+ "container/heap"
)
+// The event type represents a single After event.
+type event struct {
+ t int64 // The absolute time that the event should fire.
+ c chan<- int64 // The channel to send on.
+ sleeping bool // A sleeper is sleeping for this event.
+}
+
+type eventHeap []*event
+
+var events eventHeap
+var eventMutex sync.Mutex
+
+func init() {
+ events.Push(&event{1 << 62, nil, true}) // sentinel
+}
+
// Sleep pauses the current goroutine for at least ns nanoseconds.
// Higher resolution sleeping may be provided by syscall.Nanosleep
// on some operating systems.
@@ -17,18 +35,6 @@ func Sleep(ns int64) os.Error {
return err
}
-// After waits at least ns nanoseconds before sending the current time
-// on the returned channel.
-func After(ns int64) <-chan int64 {
- t := Nanoseconds()
- ch := make(chan int64, 1)
- go func() {
- t, _ = sleep(t, ns)
- ch <- t
- }()
- return ch
-}
-
// sleep takes the current time and a duration,
// pauses for at least ns nanoseconds, and
// returns the current time and an error.
@@ -44,3 +50,87 @@ func sleep(t, ns int64) (int64, os.Error) {
}
return t, nil
}
+
+// After waits at least ns nanoseconds before sending the current time
+// on the returned channel.
+func After(ns int64) <-chan int64 {
+ c := make(chan int64, 1)
+ t := ns + Nanoseconds()
+ eventMutex.Lock()
+ t0 := events[0].t
+ heap.Push(events, &event{t, c, false})
+ if t < t0 {
+ go sleeper()
+ }
+ eventMutex.Unlock()
+ return c
+}
+
+// sleeper continually looks at the earliest event in the queue, marks it
+// as sleeping, waits until it happens, then removes any events
+// in the queue that are due. It stops when it finds an event that is
+// already marked as sleeping. When an event is inserted before the first item,
+// a new sleeper is started.
+//
+// Scheduling vagaries mean that sleepers may not wake up in
+// exactly the order of the events that they are waiting for,
+// but this does not matter as long as there are at least as
+// many sleepers as events marked sleeping (invariant). This ensures that
+// there is always a sleeper to service the remaining events.
+//
+// A sleeper will remove at least the event it has been waiting for
+// unless the event has already been removed by another sleeper. Both
+// cases preserve the invariant described above.
+func sleeper() {
+ eventMutex.Lock()
+ e := events[0]
+ for !e.sleeping {
+ t := Nanoseconds()
+ if dt := e.t - t; dt > 0 {
+ e.sleeping = true
+ eventMutex.Unlock()
+ if nt, err := sleep(t, dt); err != nil {
+ // If sleep has encountered an error,
+ // there's not much we can do. We pretend
+ // that time really has advanced by the required
+ // amount and lie to the rest of the system.
+ t = e.t
+ } else {
+ t = nt
+ }
+ eventMutex.Lock()
+ e = events[0]
+ }
+ for t >= e.t {
+ e.c <- t
+ heap.Pop(events)
+ e = events[0]
+ }
+ }
+ eventMutex.Unlock()
+}
+
+func (eventHeap) Len() int {
+ return len(events)
+}
+
+func (eventHeap) Less(i, j int) bool {
+ return events[i].t < events[j].t
+}
+
+func (eventHeap) Swap(i, j int) {
+ events[i], events[j] = events[j], events[i]
+}
+
+func (eventHeap) Push(x interface{}) {
+ events = append(events, x.(*event))
+}
+
+func (eventHeap) Pop() interface{} {
+ // TODO: possibly shrink array.
+ n := len(events) - 1
+ e := events[n]
+ events[n] = nil
+ events = events[0:n]
+ return e
+}
diff --git a/src/pkg/time/sleep_test.go b/src/pkg/time/sleep_test.go
index 4934a38691..9fd38d18d1 100644
--- a/src/pkg/time/sleep_test.go
+++ b/src/pkg/time/sleep_test.go
@@ -8,6 +8,7 @@ import (
"os"
"syscall"
"testing"
+ "sort"
. "time"
)
@@ -36,3 +37,60 @@ func TestAfter(t *testing.T) {
t.Fatalf("After(%d) expect >= %d, got %d", delay, min, end)
}
}
+
+func TestAfterTick(t *testing.T) {
+ const (
+ Delta = 100 * 1e6
+ Count = 10
+ )
+ t0 := Nanoseconds()
+ for i := 0; i < Count; i++ {
+ <-After(Delta)
+ }
+ t1 := Nanoseconds()
+ ns := t1 - t0
+ target := int64(Delta * Count)
+ slop := target * 2 / 10
+ if ns < target-slop || ns > target+slop {
+ t.Fatalf("%d ticks of %g ns took %g ns, expected %g", Count, float64(Delta), float64(ns), float64(target))
+ }
+}
+
+var slots = []int{5, 3, 6, 6, 6, 1, 1, 2, 7, 9, 4, 8, 0}
+
+type afterResult struct {
+ slot int
+ t int64
+}
+
+func await(slot int, result chan<- afterResult, ac <-chan int64) {
+ result <- afterResult{slot, <-ac}
+}
+
+func TestAfterQueuing(t *testing.T) {
+ const (
+ Delta = 100 * 1e6
+ )
+ // make the result channel buffered because we don't want
+ // to depend on channel queueing semantics that might
+ // possibly change in the future.
+ result := make(chan afterResult, len(slots))
+
+ t0 := Nanoseconds()
+ for _, slot := range slots {
+ go await(slot, result, After(int64(slot)*Delta))
+ }
+ sort.SortInts(slots)
+ for _, slot := range slots {
+ r := <-result
+ if r.slot != slot {
+ t.Fatalf("after queue got slot %d, expected %d", r.slot, slot)
+ }
+ ns := r.t - t0
+ target := int64(slot * Delta)
+ slop := int64(Delta) / 10
+ if ns < target-slop || ns > target+slop {
+ t.Fatalf("after queue slot %d arrived at %g, expected %g", slot, float64(ns), float64(target))
+ }
+ }
+}