Skip to content

Commit

Permalink
Replaced clock offset with Clock interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ziflex committed May 4, 2024
1 parent 09ef17f commit 7cf20b2
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 62 deletions.
41 changes: 15 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,43 +46,32 @@ func (c *ApiClient) Do(ctx context.Context, req *http.Request) (*http.Response,

## Options

### Clock offset
Since client and server machines have different clocks they are probably out of sync, thus you might want to add a clock offset between the throttler's time windows.

#### Static offset
Just a static value
### Clock
`Clock` type is an interface that allows you to provide custom clock mechanism that's different from the system one.
It has just 2 methods: ``Now()`` and ``Sleep(time.Duration)``.
It might be useful to use a custom implementation to provide a more nuanced timing mechanism.

```go
package myapp

import (
"time"
"github.com/ziflex/throttle"
"time"
"github.com/ziflex/throttle"
)

func main() {
throttler := throttle.New[any](10, throttle.WithStaticClockOffset(time.Millisecond * 250))
type MyClock struct {
offset time.Duration
}
```

#### Dynamic offset
A function the receives the calculated sleep duration and returns an offset that is added to it:

```go
package myapp
func (c *MyClock) Now() time.Time {
return time.Now().Add(c.offset)
}

import (
"time"
"github.com/ziflex/throttle"
)
func (c *MyClock) Sleep(dur time.Duration) {
time.Sleep(dur + c.offset)
}

func main() {
throttler := throttle.New[any](10, throttle.WithDynamicClockOffset(func(sleepDur time.Duration) time.Duration {
if sleepDur < (time.Millisecond * 100) {
return time.Millisecond * 100
}

return sleepDur
}))
throttler := throttle.New[any](10, throttle.WithClock(&MyClock{time.Millisecond * 250}))
}
```
18 changes: 18 additions & 0 deletions clock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package throttle

import "time"

type Clock interface {
Now() time.Time
Sleep(dur time.Duration)
}

type DefaultClock struct{}

func (c *DefaultClock) Now() time.Time {
return time.Now()
}

func (c *DefaultClock) Sleep(dur time.Duration) {
time.Sleep(dur)
}
30 changes: 6 additions & 24 deletions options.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package throttle

import "time"

type (
// options holds configuration settings for the throttler.
options struct {
// additional time to be added when calculating sleep durations
clockOffset ClockOffsetProvider
clock Clock
}

Option func(opts *options)
Expand All @@ -19,31 +16,16 @@ func buildOptions(setters []Option) *options {
setter(opts)
}

if opts.clockOffset == nil {
opts.clockOffset = zeroClockOffset
if opts.clock == nil {
opts.clock = &DefaultClock{}
}

return opts
}

func zeroClockOffset(_ time.Duration) time.Duration {
return 0
}

// WithStaticClockOffset returns an Option that sets the static clock offset in the throttler options.
// This is useful for adding extra time to the throttle's wait periods, for example, to account for clock skew.
func WithStaticClockOffset(offset time.Duration) Option {
return func(opts *options) {
opts.clockOffset = func(_ time.Duration) time.Duration {
return offset
}
}
}

// WithDynamicClockOffset returns an Option that sets the dynamic clock offset in the throttler options.
// This is useful for adding extra time to the throttle's wait periods, for example, to account for clock skew.
func WithDynamicClockOffset(provider ClockOffsetProvider) Option {
// WithClock sets a custom implementation of Clock interface.
func WithClock(clock Clock) Option {
return func(opts *options) {
opts.clockOffset = provider
opts.clock = clock
}
}
23 changes: 11 additions & 12 deletions throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,16 @@ import (
const windowSize = time.Second

type (
ClockOffsetProvider func(sleepDur time.Duration) time.Duration

// Fn represents a function that returns a value of type T and an error.
Fn[T any] func() (T, error)

// Throttler manages the execution of operations so that they don't exceed a specified rate limit.
Throttler[T any] struct {
mu sync.Mutex
window time.Time
clockOffset ClockOffsetProvider
counter uint64
limit uint64
mu sync.Mutex
window time.Time
clock Clock
counter uint64
limit uint64
}
)

Expand All @@ -28,8 +26,8 @@ func New[T any](limit uint64, setters ...Option) *Throttler[T] {
opts := buildOptions(setters)

return &Throttler[T]{
limit: limit,
clockOffset: opts.clockOffset,
limit: limit,
clock: opts.clock,
}
}

Expand All @@ -50,7 +48,8 @@ func (t *Throttler[T]) advance() {
return
}

now := time.Now()
clock := t.clock
now := clock.Now()

// if this is the first operation, initialize the window
if t.window.IsZero() {
Expand Down Expand Up @@ -81,10 +80,10 @@ func (t *Throttler[T]) advance() {

// if the limit is reached, wait until the current window expires
// we use an optional clock offset to account for clock skew.
time.Sleep(sleepDur + t.clockOffset(sleepDur))
clock.Sleep(sleepDur)

// after sleeping, reset to a new window starting now
t.reset(time.Now())
t.reset(clock.Now())
}

// reset starts a new window from the specified start time and resets the operation counter.
Expand Down

0 comments on commit 7cf20b2

Please sign in to comment.