From c4d7d79755ab1dbe5ab491e0978e74831d553d74 Mon Sep 17 00:00:00 2001 From: Harry Pidcock Date: Tue, 2 Jul 2024 15:37:26 +1000 Subject: [PATCH] feat: add alarms to express future time intent --- clock.go | 48 ++++++++++++---- testclock/clock.go | 77 ++++++++++++++++++++++++- testclock/clock_test.go | 102 +++++++++++++++++++++++++++++++++ testclock/dilated.go | 97 ++++++++++++++++++++++++------- testclock/dilated_test.go | 117 ++++++++++++++++++++++++++++++++++++++ wall.go | 30 ++++++++++ 6 files changed, 437 insertions(+), 34 deletions(-) diff --git a/clock.go b/clock.go index 59a511d..6303cf6 100644 --- a/clock.go +++ b/clock.go @@ -21,29 +21,33 @@ type Clock interface { // NewTimer creates a new Timer that will send the current time // on its channel after at least duration d. NewTimer(d time.Duration) Timer -} -// Alarm returns a channel that will have the time sent on it at some point -// after the supplied time occurs. -// -// This is short for c.After(t.Sub(c.Now())). -func Alarm(c Clock, t time.Time) <-chan time.Time { - return c.After(t.Sub(c.Now())) + // At waits for the time to pass and then sends the + // current time on the returned channel. + At(t time.Time) <-chan time.Time + + // AtFunc waits for the time to pass and then calls f in its own goroutine. + // It returns an Alarm that can be used to cancel the call using its Stop method. + AtFunc(t time.Time, f func()) Alarm + + // NewAlarm creates a new Alarm that will send the current time + // on its channel at or after time t has passed. + NewAlarm(t time.Time) Alarm } -// The Timer type represents a single event. -// A Timer must be created with AfterFunc. +// Timer type represents a single event. +// Timers must be created with AfterFunc or NewTimer. // This interface follows time.Timer's methods but provides easier mocking. type Timer interface { - // When the Timer expires, the current time will be sent on the - // channel returned from Chan, unless the Timer was created by + // When the timer expires, the current time will be sent on the + // channel returned from Chan, unless the timer was created by // AfterFunc. Chan() <-chan time.Time // Reset changes the timer to expire after duration d. // It returns true if the timer had been active, false if // the timer had expired or been stopped. - Reset(time.Duration) bool + Reset(d time.Duration) bool // Stop prevents the Timer from firing. It returns true if // the call stops the timer, false if the timer has already expired or been stopped. @@ -51,3 +55,23 @@ type Timer interface { // from the channel succeeding incorrectly. Stop() bool } + +// Alarm type represents a single event. +// Alarms must be created with AtFunc or NewAlarm. +type Alarm interface { + // When the alarm expires, the current time will be sent on the + // channel returned from Chan, unless the alarm was created by + // AtFunc. + Chan() <-chan time.Time + + // Reset changes the alarm to expire at or after time t. + // It returns true if the alarm had been active, false if + // the alarm had fired or been stopped. + Reset(t time.Time) bool + + // Stop prevents the alarm from firing. It returns true if + // the call stops the alarm, false if the alarm has already fired or been stopped. + // Stop does not close the channel, to prevent a read + // from the channel succeeding incorrectly. + Stop() bool +} diff --git a/testclock/clock.go b/testclock/clock.go index dfa95aa..f6c5c9c 100644 --- a/testclock/clock.go +++ b/testclock/clock.go @@ -15,7 +15,7 @@ import ( "github.com/juju/loggo" ) -// timer implements a mock clock.Timer for testing purposes. +// timer implements the Timer interface. type timer struct { deadline time.Time clock *Clock @@ -41,6 +41,16 @@ func (t *timer) Chan() <-chan time.Time { return t.c } +// alarm implements the Alarm interface. +type alarm struct { + *timer +} + +// Reset is part of the clock.Timer interface. +func (a *alarm) Reset(t time.Time) bool { + return a.clock.resetTime(a.timer, t) +} + // Clock implements a mock clock.Clock for testing purposes. type Clock struct { mu sync.Mutex @@ -74,6 +84,7 @@ func (clock *Clock) After(d time.Duration) <-chan time.Time { return clock.NewTimer(d).Chan() } +// NewTimer is part of the clock.Clock interface. func (clock *Clock) NewTimer(d time.Duration) clock.Timer { c := make(chan time.Time, 1) return clock.addAlarm(d, c, func() { @@ -88,6 +99,26 @@ func (clock *Clock) AfterFunc(d time.Duration, f func()) clock.Timer { }) } +// At is part of the clock.Clock interface. +func (clock *Clock) At(t time.Time) <-chan time.Time { + return clock.NewAlarm(t).Chan() +} + +// NewAlarm is part of the clock.Clock interface. +func (clock *Clock) NewAlarm(t time.Time) clock.Alarm { + c := make(chan time.Time, 1) + return clock.addTimeAlarm(t, c, func() { + c <- clock.now + }) +} + +// AtFunc is part of the clock.Clock interface. +func (clock *Clock) AtFunc(t time.Time, f func()) clock.Alarm { + return clock.addTimeAlarm(t, nil, func() { + go f() + }) +} + func (clock *Clock) addAlarm(d time.Duration, c chan time.Time, trigger func()) *timer { defer clock.notifyAlarm() clock.mu.Lock() @@ -104,6 +135,22 @@ func (clock *Clock) addAlarm(d time.Duration, c chan time.Time, trigger func()) return t } +func (clock *Clock) addTimeAlarm(deadline time.Time, c chan time.Time, trigger func()) *alarm { + defer clock.notifyAlarm() + clock.mu.Lock() + defer clock.mu.Unlock() + t := &timer{ + c: c, + deadline: deadline, + clock: clock, + trigger: trigger, + stack: debug.Stack(), + } + clock.addTimer(t) + clock.triggerAll() + return &alarm{t} +} + // Advance advances the result of Now by the supplied duration, and sends // the "current" time on all alarms which are no longer "in the future". func (clock *Clock) Advance(d time.Duration) { @@ -206,7 +253,33 @@ func (clock *Clock) reset(t *timer, d time.Duration) bool { sort.Sort(byDeadline(clock.waiting)) if d <= 0 { // If duration is <= 0, that means we should be triggering the - // Timer right away, as "now" has already occured. + // Timer right away, as "now" has already occurred. + clock.triggerAll() + } + return found +} + +// resetTime is the underlying implementation of clock.Alarm.Reset, which may be +// called by any Alarm backed by this Clock. +func (clock *Clock) resetTime(t *timer, deadline time.Time) bool { + defer clock.notifyAlarm() + clock.mu.Lock() + defer clock.mu.Unlock() + + found := false + for _, wt := range clock.waiting { + if wt == t { + found = true + } + } + if !found { + clock.waiting = append(clock.waiting, t) + } + t.deadline = deadline + sort.Sort(byDeadline(clock.waiting)) + if clock.now.After(t.deadline) { + // If the time has already passed, that means we should be triggering the + // Timer right away, as "now" has already occurred. clock.triggerAll() } return found diff --git a/testclock/clock_test.go b/testclock/clock_test.go index df453b8..8671317 100644 --- a/testclock/clock_test.go +++ b/testclock/clock_test.go @@ -105,6 +105,30 @@ func (*clockSuite) TestAdvanceWithAfter(c *gc.C) { c.Assert(cl.Now().UTC(), gc.Equals, t0.Add(4*time.Second).UTC()) } +func (*clockSuite) TestAdvanceWithAt(c *gc.C) { + t0 := time.Now() + cl := testclock.NewClock(t0) + + // Use current time to schedule for 1 minute, but advance between. + target := cl.Now().Add(time.Minute) + cl.Advance(59 * time.Second) + ch := cl.At(target) + select { + case <-ch: + c.Fatalf("received unexpected event") + case <-time.After(shortWait): + } + + cl.Advance(time.Second) + + select { + case t1 := <-ch: + c.Assert(t1.Sub(target) >= 0, jc.IsTrue) + case <-time.After(shortWait): + c.Fatalf("expected event to be triggered") + } +} + func (*clockSuite) TestAdvanceWithAfterFunc(c *gc.C) { // Most of the details have been checked in TestAdvanceWithAfter, // so just check that AfterFunc is wired up correctly. @@ -122,6 +146,32 @@ func (*clockSuite) TestAdvanceWithAfterFunc(c *gc.C) { } } +func (*clockSuite) TestAdvanceWithAtFunc(c *gc.C) { + t0 := time.Now() + cl := testclock.NewClock(t0) + + fired := make(chan struct{}) + // Use current time to schedule for 1 minute, but advance between. + target := cl.Now().Add(time.Minute) + cl.Advance(59 * time.Second) + cl.AtFunc(target, func() { + close(fired) + }) + select { + case <-fired: + c.Fatalf("received unexpected event") + case <-time.After(shortWait): + } + + cl.Advance(time.Second) + + select { + case <-fired: + case <-time.After(shortWait): + c.Fatalf("expected event to be triggered") + } +} + func (*clockSuite) TestAfterFuncStop(c *gc.C) { t0 := time.Now() cl := testclock.NewClock(t0) @@ -138,6 +188,22 @@ func (*clockSuite) TestAfterFuncStop(c *gc.C) { } } +func (*clockSuite) TestAtFuncStop(c *gc.C) { + t0 := time.Now() + cl := testclock.NewClock(t0) + fired := make(chan struct{}) + alarm := cl.AtFunc(cl.Now().Add(time.Second), func() { + close(fired) + }) + cl.Advance(50 * time.Millisecond) + alarm.Stop() + select { + case <-fired: + c.Fatalf("received unexpected event") + case <-time.After(shortWait): + } +} + func (*clockSuite) TestNewTimerReset(c *gc.C) { t0 := time.Now() cl := testclock.NewClock(t0) @@ -160,6 +226,31 @@ func (*clockSuite) TestNewTimerReset(c *gc.C) { } } +func (*clockSuite) TestNewAlarmReset(c *gc.C) { + t0 := time.Now() + cl := testclock.NewClock(t0) + alarm := cl.NewAlarm(cl.Now().Add(time.Second)) + cl.Advance(time.Second) + select { + case t := <-alarm.Chan(): + c.Assert(t.UTC(), gc.Equals, t0.Add(time.Second).UTC()) + case <-time.After(longWait): + c.Fatalf("expected event to be triggered") + } + + neverFired := alarm.Reset(cl.Now().Add(time.Hour)) + c.Assert(neverFired, jc.IsFalse) + neverFired = alarm.Reset(cl.Now().Add(50 * time.Millisecond)) + c.Assert(neverFired, jc.IsTrue) + cl.Advance(100 * time.Millisecond) + select { + case t := <-alarm.Chan(): + c.Assert(t.UTC(), gc.Equals, t0.Add(time.Second+100*time.Millisecond).UTC()) + case <-time.After(longWait): + c.Fatalf("expected event to be triggered") + } +} + func (*clockSuite) TestNewTimerAsyncReset(c *gc.C) { t0 := time.Now() clock := testclock.NewClock(t0) @@ -323,3 +414,14 @@ func (*clockSuite) TestMultipleWaiters(c *gc.C) { } } + +func (*clockSuite) TestPastAlarmFired(c *gc.C) { + t0 := time.Now() + cl := testclock.NewClock(t0) + alarm := cl.NewAlarm(cl.Now().Add(-time.Nanosecond)) + select { + case <-alarm.Chan(): + case <-time.After(testing.ShortWait): + c.Fatal("alarm did not fire by deadline") + } +} diff --git a/testclock/dilated.go b/testclock/dilated.go index ed91a4f..211784c 100644 --- a/testclock/dilated.go +++ b/testclock/dilated.go @@ -55,18 +55,34 @@ func (dc *dilationClock) nowWithOffset() (time.Time, time.Duration) { // After implements Clock.After func (dc *dilationClock) After(d time.Duration) <-chan time.Time { - t := newDilatedWallTimer(dc, d, nil) + t := newDilatedWallTimer(dc, instant{d: &d}, nil) return t.c } // AfterFunc implements Clock.AfterFunc func (dc *dilationClock) AfterFunc(d time.Duration, f func()) clock.Timer { - return newDilatedWallTimer(dc, d, f) + return newDilatedWallTimer(dc, instant{d: &d}, f) } // NewTimer implements Clock.NewTimer func (dc *dilationClock) NewTimer(d time.Duration) clock.Timer { - return newDilatedWallTimer(dc, d, nil) + return newDilatedWallTimer(dc, instant{d: &d}, nil) +} + +// At implements Clock.At +func (dc *dilationClock) At(t time.Time) <-chan time.Time { + timer := newDilatedWallTimer(dc, instant{t: &t}, nil) + return timer.c +} + +// AtFunc implements Clock.AtFunc +func (dc *dilationClock) AtFunc(t time.Time, f func()) clock.Alarm { + return &dilatedWallAlarm{newDilatedWallTimer(dc, instant{t: &t}, f)} +} + +// NewAlarm implements Clock.NewAlarm +func (dc *dilationClock) NewAlarm(t time.Time) clock.Alarm { + return &dilatedWallAlarm{newDilatedWallTimer(dc, instant{t: &t}, nil)} } // Advance implements AdvanceableClock.Advance @@ -93,27 +109,33 @@ type dilatedWallTimer struct { stopChan chan chan bool } +// dilatedWallAlarm implements the Alarm interface. +type dilatedWallAlarm struct { + *dilatedWallTimer +} + type resetReq struct { - d time.Duration + f instant r chan bool } -func newDilatedWallTimer(dc *dilationClock, d time.Duration, after func()) *dilatedWallTimer { +func newDilatedWallTimer(dc *dilationClock, f instant, after func()) *dilatedWallTimer { t := &dilatedWallTimer{ dc: dc, c: make(chan time.Time), resetChan: make(chan resetReq), stopChan: make(chan chan bool), } - t.start(d, after) + t.start(f, after) return t } -func (t *dilatedWallTimer) start(d time.Duration, after func()) { +func (t *dilatedWallTimer) start(f instant, after func()) { t.dc.offsetChangedMutex.RLock() - dialatedNow, offset := t.dc.nowWithOffset() - realDuration := time.Duration(float64(d) * t.dc.realSecondDuration.Seconds()) - t.target = dialatedNow.Add(d) + dilatedNow, offset := t.dc.nowWithOffset() + until := f.until(dilatedNow) + realDuration := time.Duration(float64(until) * t.dc.realSecondDuration.Seconds()) + t.target = f.deadline(dilatedNow) t.timer = time.NewTimer(realDuration) t.offset = offset t.after = after @@ -130,9 +152,10 @@ func (t *dilatedWallTimer) run() { select { case reset := <-t.resetChan: realNow := time.Now() - dialatedNow := dilateTime(t.dc.epoch, realNow, t.dc.realSecondDuration, t.offset) - realDuration := time.Duration(float64(reset.d) * t.dc.realSecondDuration.Seconds()) - t.target = dialatedNow.Add(reset.d) + dilatedNow := dilateTime(t.dc.epoch, realNow, t.dc.realSecondDuration, t.offset) + until := reset.f.until(dilatedNow) + realDuration := time.Duration(float64(until) * t.dc.realSecondDuration.Seconds()) + t.target = reset.f.deadline(dilatedNow) sendChan = nil sendTime = time.Time{} reset.r <- t.timer.Reset(realDuration) @@ -165,14 +188,14 @@ func (t *dilatedWallTimer) run() { continue } realNow := time.Now() - dialatedNow := dilateTime(t.dc.epoch, realNow, t.dc.realSecondDuration, t.offset) - dialatedDuration := t.target.Sub(dialatedNow) - if dialatedDuration <= 0 { + dilatedNow := dilateTime(t.dc.epoch, realNow, t.dc.realSecondDuration, t.offset) + dilatedDuration := t.target.Sub(dilatedNow) + if dilatedDuration <= 0 { sendChan = t.c - sendTime = dialatedNow + sendTime = dilatedNow continue } - realDuration := time.Duration(float64(dialatedDuration) * t.dc.realSecondDuration.Seconds()) + realDuration := time.Duration(float64(dilatedDuration) * t.dc.realSecondDuration.Seconds()) t.timer.Reset(realDuration) } } @@ -185,15 +208,24 @@ func (t *dilatedWallTimer) Chan() <-chan time.Time { // Chan implements Timer.Reset func (t *dilatedWallTimer) Reset(d time.Duration) bool { + return t.reset(instant{d: &d}) +} + +// Chan implements Alarm.Reset +func (a *dilatedWallAlarm) Reset(t time.Time) bool { + return a.reset(instant{t: &t}) +} + +func (t *dilatedWallTimer) reset(f instant) bool { t.resetMutex.Lock() defer t.resetMutex.Unlock() reset := resetReq{ - d: d, + f: f, r: make(chan bool), } select { case <-t.done: - t.start(d, nil) + t.start(f, nil) return true case t.resetChan <- reset: return <-reset.r @@ -219,3 +251,28 @@ func dilateTime(epoch, realNow time.Time, } return epoch.Add(dilatedOffset).Add(time.Duration(float64(delta) / realSecondDuration.Seconds())) } + +type instant struct { + t *time.Time + d *time.Duration +} + +func (f instant) until(now time.Time) time.Duration { + switch { + case f.t != nil: + return f.t.Sub(now) + case f.d != nil: + return *f.d + } + return time.Duration(0) +} + +func (f instant) deadline(now time.Time) time.Time { + switch { + case f.t != nil: + return *f.t + case f.d != nil: + return now.Add(*f.d) + } + return now +} diff --git a/testclock/dilated_test.go b/testclock/dilated_test.go index 77e81c5..ebf85a8 100644 --- a/testclock/dilated_test.go +++ b/testclock/dilated_test.go @@ -50,6 +50,28 @@ func (*dilatedClockSuite) TestFastAfter(c *gc.C) { c.Assert(d1.Sub(d0).Milliseconds(), jc.LessThan, 1010) } +func (*dilatedClockSuite) TestSlowedAt(c *gc.C) { + cl := testclock.NewDilatedWallClock(doubleSecond) + t0 := time.Now() + d0 := cl.Now() + d1 := <-cl.At(d0.Add(time.Second)) + t1 := time.Now() + c.Assert(t1.Sub(t0).Seconds(), jc.GreaterThan, 1.9) + c.Assert(d1.Sub(d0).Seconds(), jc.GreaterThan, 0.9) + c.Assert(d1.Sub(d0).Seconds(), jc.LessThan, 1.1) +} + +func (*dilatedClockSuite) TestFastAt(c *gc.C) { + cl := testclock.NewDilatedWallClock(halfSecond) + t0 := time.Now() + d0 := cl.Now() + d1 := <-cl.At(d0.Add(time.Second)) + t1 := time.Now() + c.Assert(t1.Sub(t0).Milliseconds(), jc.LessThan, 600) + c.Assert(d1.Sub(d0).Milliseconds(), jc.GreaterThan, 990) + c.Assert(d1.Sub(d0).Milliseconds(), jc.LessThan, 1010) +} + func (*dilatedClockSuite) TestSlowedAfterFunc(c *gc.C) { t0 := time.Now() cl := testclock.NewDilatedWallClock(doubleSecond) @@ -74,6 +96,32 @@ func (*dilatedClockSuite) TestFastAfterFunc(c *gc.C) { mut.Lock() } +func (*dilatedClockSuite) TestSlowedAtFunc(c *gc.C) { + t0 := time.Now() + cl := testclock.NewDilatedWallClock(doubleSecond) + d0 := cl.Now() + mut := sync.Mutex{} + mut.Lock() + cl.AtFunc(d0.Add(time.Second), func() { + defer mut.Unlock() + c.Check(time.Since(t0).Seconds(), jc.GreaterThan, 1.9) + }) + mut.Lock() +} + +func (*dilatedClockSuite) TestFastAtFunc(c *gc.C) { + t0 := time.Now() + cl := testclock.NewDilatedWallClock(halfSecond) + d0 := cl.Now() + mut := sync.Mutex{} + mut.Lock() + cl.AtFunc(d0.Add(time.Second), func() { + defer mut.Unlock() + c.Check(time.Since(t0).Milliseconds(), jc.LessThan, 600) + }) + mut.Lock() +} + func (*dilatedClockSuite) TestSlowedNow(c *gc.C) { t0 := time.Now() cl := testclock.NewDilatedWallClock(doubleSecond) @@ -114,6 +162,20 @@ func (*dilatedClockSuite) TestAdvance(c *gc.C) { } } +func (*dilatedClockSuite) TestAdvanceAt(c *gc.C) { + t0 := time.Now() + cl := testclock.NewDilatedWallClock(halfSecond) + first := cl.At(cl.Now().Add(time.Second)) + cl.Advance(halfSecond) + <-time.After(250 * time.Millisecond) + select { + case t := <-first: + c.Assert(t.Sub(t0).Milliseconds(), jc.GreaterThan, 249) + case <-time.After(shortWait): + c.Fatal("timer failed to trigger early") + } +} + func (*dilatedClockSuite) TestAdvanceMulti(c *gc.C) { cl := testclock.NewDilatedWallClock(halfSecond) first := cl.After(time.Second) @@ -272,6 +334,28 @@ func (*dilatedClockSuite) TestAdvanceReset(c *gc.C) { } } +func (*dilatedClockSuite) TestAdvanceAlarmReset(c *gc.C) { + cl := testclock.NewDilatedWallClock(time.Minute) + alarms := make([]clock.Alarm, 0, 10) + d0 := cl.Now() + for i := 0; i < 10; i++ { + alarms = append(alarms, cl.NewAlarm(d0.Add(time.Millisecond))) + } + deadline := time.After(10 * time.Second) + for i := 0; i < 1000; i++ { + cl.Advance(time.Millisecond) + d0 = cl.Now() + for _, alarm := range alarms { + select { + case <-alarm.Chan(): + case <-deadline: + c.Fatal("alarm did not fire by deadline") + } + alarm.Reset(d0.Add(time.Millisecond)) + } + } +} + func (*dilatedClockSuite) TestAdvanceResetRacey(c *gc.C) { cl := testclock.NewDilatedWallClock(time.Second) timers := make([]clock.Timer, 0, 10) @@ -292,3 +376,36 @@ func (*dilatedClockSuite) TestAdvanceResetRacey(c *gc.C) { } } } + +func (*dilatedClockSuite) TestAdvanceAlarmResetRacey(c *gc.C) { + cl := testclock.NewDilatedWallClock(time.Second) + alarms := make([]clock.Alarm, 0, 10) + d0 := cl.Now() + for i := 0; i < 10; i++ { + alarms = append(alarms, cl.NewAlarm(d0.Add(time.Millisecond))) + } + deadline := time.After(2 * time.Second) + for i := 0; i < 1000; i++ { + time.Sleep(999 * time.Microsecond) + cl.Advance(time.Microsecond * time.Duration(rand.Intn(2))) + d0 = cl.Now() + for _, alarm := range alarms { + select { + case <-alarm.Chan(): + case <-deadline: + c.Fatal("alarm did not fire by deadline") + } + alarm.Reset(d0.Add(time.Millisecond)) + } + } +} + +func (*dilatedClockSuite) TestPastAlarmFired(c *gc.C) { + cl := testclock.NewDilatedWallClock(time.Second) + alarm := cl.NewAlarm(cl.Now().Add(-time.Nanosecond)) + select { + case <-alarm.Chan(): + case <-time.After(testing.ShortWait): + c.Fatal("alarm did not fire by deadline") + } +} diff --git a/wall.go b/wall.go index 9bfc351..1a4b021 100644 --- a/wall.go +++ b/wall.go @@ -45,3 +45,33 @@ type wallTimer struct { func (t wallTimer) Chan() <-chan time.Time { return t.C } + +// At implements Clock.At. +func (wallClock) At(t time.Time) <-chan time.Time { + return time.After(time.Until(t)) +} + +// AtFunc implements Clock.AtFunc. +func (wallClock) AtFunc(t time.Time, f func()) Alarm { + return wallAlarm{time.AfterFunc(time.Until(t), f)} +} + +// NewAlarm implements Clock.NewAlarm. +func (wallClock) NewAlarm(t time.Time) Alarm { + return wallAlarm{time.NewTimer(time.Until(t))} +} + +// wallAlarm implements the Alarm interface. +type wallAlarm struct { + *time.Timer +} + +// Chan implements Alarm.Chan. +func (a wallAlarm) Chan() <-chan time.Time { + return a.C +} + +// Reset implements Alarm.Reset +func (a wallAlarm) Reset(t time.Time) bool { + return a.Timer.Reset(time.Until(t)) +}