Skip to content

Commit

Permalink
feat: add alarms to express future time intent
Browse files Browse the repository at this point in the history
  • Loading branch information
hpidcock committed Jul 3, 2024
1 parent 18a29fa commit c4d7d79
Show file tree
Hide file tree
Showing 6 changed files with 437 additions and 34 deletions.
48 changes: 36 additions & 12 deletions clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,57 @@ type Clock interface {
// NewTimer creates a new Timer that will send the current time
// on its channel after at least duration d.
NewTimer(d time.Duration) Timer
}

// Alarm returns a channel that will have the time sent on it at some point
// after the supplied time occurs.
//
// This is short for c.After(t.Sub(c.Now())).
func Alarm(c Clock, t time.Time) <-chan time.Time {
return c.After(t.Sub(c.Now()))
// At waits for the time to pass and then sends the
// current time on the returned channel.
At(t time.Time) <-chan time.Time

// AtFunc waits for the time to pass and then calls f in its own goroutine.
// It returns an Alarm that can be used to cancel the call using its Stop method.
AtFunc(t time.Time, f func()) Alarm

// NewAlarm creates a new Alarm that will send the current time
// on its channel at or after time t has passed.
NewAlarm(t time.Time) Alarm
}

// The Timer type represents a single event.
// A Timer must be created with AfterFunc.
// Timer type represents a single event.
// Timers must be created with AfterFunc or NewTimer.
// This interface follows time.Timer's methods but provides easier mocking.
type Timer interface {
// When the Timer expires, the current time will be sent on the
// channel returned from Chan, unless the Timer was created by
// When the timer expires, the current time will be sent on the
// channel returned from Chan, unless the timer was created by
// AfterFunc.
Chan() <-chan time.Time

// Reset changes the timer to expire after duration d.
// It returns true if the timer had been active, false if
// the timer had expired or been stopped.
Reset(time.Duration) bool
Reset(d time.Duration) bool

// Stop prevents the Timer from firing. It returns true if
// the call stops the timer, false if the timer has already expired or been stopped.
// Stop does not close the channel, to prevent a read
// from the channel succeeding incorrectly.
Stop() bool
}

// Alarm type represents a single event.
// Alarms must be created with AtFunc or NewAlarm.
type Alarm interface {
// When the alarm expires, the current time will be sent on the
// channel returned from Chan, unless the alarm was created by
// AtFunc.
Chan() <-chan time.Time

// Reset changes the alarm to expire at or after time t.
// It returns true if the alarm had been active, false if
// the alarm had fired or been stopped.
Reset(t time.Time) bool

// Stop prevents the alarm from firing. It returns true if
// the call stops the alarm, false if the alarm has already fired or been stopped.
// Stop does not close the channel, to prevent a read
// from the channel succeeding incorrectly.
Stop() bool
}
77 changes: 75 additions & 2 deletions testclock/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/juju/loggo"
)

// timer implements a mock clock.Timer for testing purposes.
// timer implements the Timer interface.
type timer struct {
deadline time.Time
clock *Clock
Expand All @@ -41,6 +41,16 @@ func (t *timer) Chan() <-chan time.Time {
return t.c
}

// alarm implements the Alarm interface.
type alarm struct {
*timer
}

// Reset is part of the clock.Timer interface.
func (a *alarm) Reset(t time.Time) bool {
return a.clock.resetTime(a.timer, t)
}

// Clock implements a mock clock.Clock for testing purposes.
type Clock struct {
mu sync.Mutex
Expand Down Expand Up @@ -74,6 +84,7 @@ func (clock *Clock) After(d time.Duration) <-chan time.Time {
return clock.NewTimer(d).Chan()
}

// NewTimer is part of the clock.Clock interface.
func (clock *Clock) NewTimer(d time.Duration) clock.Timer {
c := make(chan time.Time, 1)
return clock.addAlarm(d, c, func() {
Expand All @@ -88,6 +99,26 @@ func (clock *Clock) AfterFunc(d time.Duration, f func()) clock.Timer {
})
}

// At is part of the clock.Clock interface.
func (clock *Clock) At(t time.Time) <-chan time.Time {
return clock.NewAlarm(t).Chan()
}

// NewAlarm is part of the clock.Clock interface.
func (clock *Clock) NewAlarm(t time.Time) clock.Alarm {
c := make(chan time.Time, 1)
return clock.addTimeAlarm(t, c, func() {
c <- clock.now
})
}

// AtFunc is part of the clock.Clock interface.
func (clock *Clock) AtFunc(t time.Time, f func()) clock.Alarm {
return clock.addTimeAlarm(t, nil, func() {
go f()
})
}

func (clock *Clock) addAlarm(d time.Duration, c chan time.Time, trigger func()) *timer {
defer clock.notifyAlarm()
clock.mu.Lock()
Expand All @@ -104,6 +135,22 @@ func (clock *Clock) addAlarm(d time.Duration, c chan time.Time, trigger func())
return t
}

func (clock *Clock) addTimeAlarm(deadline time.Time, c chan time.Time, trigger func()) *alarm {
defer clock.notifyAlarm()
clock.mu.Lock()
defer clock.mu.Unlock()
t := &timer{
c: c,
deadline: deadline,
clock: clock,
trigger: trigger,
stack: debug.Stack(),
}
clock.addTimer(t)
clock.triggerAll()
return &alarm{t}
}

// Advance advances the result of Now by the supplied duration, and sends
// the "current" time on all alarms which are no longer "in the future".
func (clock *Clock) Advance(d time.Duration) {
Expand Down Expand Up @@ -206,7 +253,33 @@ func (clock *Clock) reset(t *timer, d time.Duration) bool {
sort.Sort(byDeadline(clock.waiting))
if d <= 0 {
// If duration is <= 0, that means we should be triggering the
// Timer right away, as "now" has already occured.
// Timer right away, as "now" has already occurred.
clock.triggerAll()
}
return found
}

// resetTime is the underlying implementation of clock.Alarm.Reset, which may be
// called by any Alarm backed by this Clock.
func (clock *Clock) resetTime(t *timer, deadline time.Time) bool {
defer clock.notifyAlarm()
clock.mu.Lock()
defer clock.mu.Unlock()

found := false
for _, wt := range clock.waiting {
if wt == t {
found = true
}
}
if !found {
clock.waiting = append(clock.waiting, t)
}
t.deadline = deadline
sort.Sort(byDeadline(clock.waiting))
if clock.now.After(t.deadline) {
// If the time has already passed, that means we should be triggering the
// Timer right away, as "now" has already occurred.
clock.triggerAll()
}
return found
Expand Down
102 changes: 102 additions & 0 deletions testclock/clock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,30 @@ func (*clockSuite) TestAdvanceWithAfter(c *gc.C) {
c.Assert(cl.Now().UTC(), gc.Equals, t0.Add(4*time.Second).UTC())
}

func (*clockSuite) TestAdvanceWithAt(c *gc.C) {
t0 := time.Now()
cl := testclock.NewClock(t0)

// Use current time to schedule for 1 minute, but advance between.
target := cl.Now().Add(time.Minute)
cl.Advance(59 * time.Second)
ch := cl.At(target)
select {
case <-ch:
c.Fatalf("received unexpected event")
case <-time.After(shortWait):
}

cl.Advance(time.Second)

select {
case t1 := <-ch:
c.Assert(t1.Sub(target) >= 0, jc.IsTrue)
case <-time.After(shortWait):
c.Fatalf("expected event to be triggered")
}
}

func (*clockSuite) TestAdvanceWithAfterFunc(c *gc.C) {
// Most of the details have been checked in TestAdvanceWithAfter,
// so just check that AfterFunc is wired up correctly.
Expand All @@ -122,6 +146,32 @@ func (*clockSuite) TestAdvanceWithAfterFunc(c *gc.C) {
}
}

func (*clockSuite) TestAdvanceWithAtFunc(c *gc.C) {
t0 := time.Now()
cl := testclock.NewClock(t0)

fired := make(chan struct{})
// Use current time to schedule for 1 minute, but advance between.
target := cl.Now().Add(time.Minute)
cl.Advance(59 * time.Second)
cl.AtFunc(target, func() {
close(fired)
})
select {
case <-fired:
c.Fatalf("received unexpected event")
case <-time.After(shortWait):
}

cl.Advance(time.Second)

select {
case <-fired:
case <-time.After(shortWait):
c.Fatalf("expected event to be triggered")
}
}

func (*clockSuite) TestAfterFuncStop(c *gc.C) {
t0 := time.Now()
cl := testclock.NewClock(t0)
Expand All @@ -138,6 +188,22 @@ func (*clockSuite) TestAfterFuncStop(c *gc.C) {
}
}

func (*clockSuite) TestAtFuncStop(c *gc.C) {
t0 := time.Now()
cl := testclock.NewClock(t0)
fired := make(chan struct{})
alarm := cl.AtFunc(cl.Now().Add(time.Second), func() {
close(fired)
})
cl.Advance(50 * time.Millisecond)
alarm.Stop()
select {
case <-fired:
c.Fatalf("received unexpected event")
case <-time.After(shortWait):
}
}

func (*clockSuite) TestNewTimerReset(c *gc.C) {
t0 := time.Now()
cl := testclock.NewClock(t0)
Expand All @@ -160,6 +226,31 @@ func (*clockSuite) TestNewTimerReset(c *gc.C) {
}
}

func (*clockSuite) TestNewAlarmReset(c *gc.C) {
t0 := time.Now()
cl := testclock.NewClock(t0)
alarm := cl.NewAlarm(cl.Now().Add(time.Second))
cl.Advance(time.Second)
select {
case t := <-alarm.Chan():
c.Assert(t.UTC(), gc.Equals, t0.Add(time.Second).UTC())
case <-time.After(longWait):
c.Fatalf("expected event to be triggered")
}

neverFired := alarm.Reset(cl.Now().Add(time.Hour))
c.Assert(neverFired, jc.IsFalse)
neverFired = alarm.Reset(cl.Now().Add(50 * time.Millisecond))
c.Assert(neverFired, jc.IsTrue)
cl.Advance(100 * time.Millisecond)
select {
case t := <-alarm.Chan():
c.Assert(t.UTC(), gc.Equals, t0.Add(time.Second+100*time.Millisecond).UTC())
case <-time.After(longWait):
c.Fatalf("expected event to be triggered")
}
}

func (*clockSuite) TestNewTimerAsyncReset(c *gc.C) {
t0 := time.Now()
clock := testclock.NewClock(t0)
Expand Down Expand Up @@ -323,3 +414,14 @@ func (*clockSuite) TestMultipleWaiters(c *gc.C) {
}

}

func (*clockSuite) TestPastAlarmFired(c *gc.C) {
t0 := time.Now()
cl := testclock.NewClock(t0)
alarm := cl.NewAlarm(cl.Now().Add(-time.Nanosecond))
select {
case <-alarm.Chan():
case <-time.After(testing.ShortWait):
c.Fatal("alarm did not fire by deadline")
}
}
Loading

0 comments on commit c4d7d79

Please sign in to comment.