Skip to content

Commit

Permalink
feat: add Reset to leaky bucket and token bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
leeym committed Oct 26, 2024
1 parent 3ac06eb commit 55d2944
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 0 deletions.
46 changes: 46 additions & 0 deletions leakybucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ func (t *LeakyBucket) Limit(ctx context.Context) (time.Duration, error) {
return time.Duration(wait), nil
}

// Reset the bucket
func (t *LeakyBucket) Reset(ctx context.Context) error {
return t.backend.Reset(ctx)
}

// LeakyBucketInMemory is an in-memory implementation of LeakyBucketStateBackend.
type LeakyBucketInMemory struct {
state LeakyBucketState
Expand All @@ -131,6 +136,14 @@ func (l *LeakyBucketInMemory) SetState(ctx context.Context, state LeakyBucketSta
return ctx.Err()
}

// Reset resets the current state of the bucket.
func (l *LeakyBucketInMemory) Reset(ctx context.Context) error {
state := LeakyBucketState{
Last: 0,
}
return l.SetState(ctx, state)
}

const (
etcdKeyLBLease = "lease"
etcdKeyLBLast = "last"
Expand Down Expand Up @@ -266,6 +279,14 @@ func (l *LeakyBucketEtcd) SetState(ctx context.Context, state LeakyBucketState)
return l.save(ctx, state)
}

// Reset resets the state of the bucket in etcd.
func (l *LeakyBucketEtcd) Reset(ctx context.Context) error {
state := LeakyBucketState{
Last: 0,
}
return l.SetState(ctx, state)
}

const (
redisKeyLBLast = "last"
redisKeyLBVersion = "version"
Expand Down Expand Up @@ -401,6 +422,14 @@ func (t *LeakyBucketRedis) SetState(ctx context.Context, state LeakyBucketState)
return errors.Wrap(err, "failed to save keys to redis")
}

// Reset resets the state in Redis.
func (t *LeakyBucketRedis) Reset(ctx context.Context) error {
state := LeakyBucketState{
Last: 0,
}
return t.SetState(ctx, state)
}

// LeakyBucketMemcached is a Memcached implementation of a LeakyBucketStateBackend.
type LeakyBucketMemcached struct {
cli *memcache.Client
Expand Down Expand Up @@ -491,6 +520,15 @@ func (t *LeakyBucketMemcached) SetState(ctx context.Context, state LeakyBucketSt
return errors.Wrap(err, "failed to save keys to memcached")
}

// Reset resets the state in Memcached.
func (t *LeakyBucketMemcached) Reset(ctx context.Context) error {
state := LeakyBucketState{
Last: 0,
}
t.casId = 0
return t.SetState(ctx, state)
}

// LeakyBucketDynamoDB is a DyanamoDB implementation of a LeakyBucketStateBackend.
type LeakyBucketDynamoDB struct {
client *dynamodb.Client
Expand Down Expand Up @@ -562,6 +600,14 @@ func (t *LeakyBucketDynamoDB) SetState(ctx context.Context, state LeakyBucketSta
return err
}

// Reset resets the state in DynamoDB.
func (t *LeakyBucketDynamoDB) Reset(ctx context.Context) error {
state := LeakyBucketState{
Last: 0,
}
return t.SetState(ctx, state)
}

const (
dynamodbBucketRaceConditionExpression = "Version <= :version"
dynamoDBBucketLastKey = "Last"
Expand Down
30 changes: 30 additions & 0 deletions leakybucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,36 @@ func (s *LimitersTestSuite) TestLeakyBucketOverflow() {
}
}

func (s *LimitersTestSuite) TestLeakyBucketReset() {
rate := time.Second
capacity := int64(2)
clock := newFakeClock()
for name, bucket := range s.leakyBuckets(capacity, rate, clock) {
s.Run(name, func() {
clock.reset()
// The first call has no wait since there were no calls before.
wait, err := bucket.Limit(context.TODO())
s.Require().NoError(err)
s.Equal(time.Duration(0), wait)
// The second call increments the queue size by 1.
wait, err = bucket.Limit(context.TODO())
s.Require().NoError(err)
s.Equal(rate, wait)
// The third call overflows the bucket capacity.
wait, err = bucket.Limit(context.TODO())
s.Require().Equal(l.ErrLimitExhausted, err)
s.Equal(rate*2, wait)
// Reset the bucket
err = bucket.Reset(context.TODO())
s.Require().NoError(err)
// Retry the last call. This time it should succeed.
wait, err = bucket.Limit(context.TODO())
s.Require().NoError(err)
s.Equal(time.Duration(0), wait)
})
}
}

func TestLeakyBucket_ZeroCapacity_ReturnsError(t *testing.T) {
capacity := int64(0)
rate := time.Hour
Expand Down
51 changes: 51 additions & 0 deletions tokenbucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ func (t *TokenBucket) Limit(ctx context.Context) (time.Duration, error) {
return t.Take(ctx, 1)
}

// Reset resets the bucket.
func (t *TokenBucket) Reset(ctx context.Context) error {
return t.backend.Reset(ctx)
}

// TokenBucketInMemory is an in-memory implementation of TokenBucketStateBackend.
//
// The state is not shared nor persisted so it won't survive restarts or failures.
Expand Down Expand Up @@ -151,6 +156,15 @@ func (t *TokenBucketInMemory) SetState(ctx context.Context, state TokenBucketSta
return ctx.Err()
}

// Reset resets the current bucket's state.
func (t *TokenBucketInMemory) Reset(ctx context.Context) error {
state := TokenBucketState{
Last: 0,
Available: 0,
}
return t.SetState(ctx, state)
}

const (
etcdKeyTBLease = "lease"
etcdKeyTBAvailable = "available"
Expand Down Expand Up @@ -327,6 +341,15 @@ func (t *TokenBucketEtcd) SetState(ctx context.Context, state TokenBucketState)
return t.save(ctx, state)
}

// Reset resets the state of the bucket.
func (t *TokenBucketEtcd) Reset(ctx context.Context) error {
state := TokenBucketState{
Last: 0,
Available: 0,
}
return t.SetState(ctx, state)
}

const (
redisKeyTBAvailable = "available"
redisKeyTBLast = "last"
Expand Down Expand Up @@ -489,6 +512,15 @@ func (t *TokenBucketRedis) SetState(ctx context.Context, state TokenBucketState)
return errors.Wrap(err, "failed to save keys to redis")
}

// Reset resets the state in Redis.
func (t *TokenBucketRedis) Reset(ctx context.Context) error {
state := TokenBucketState{
Last: 0,
Available: 0,
}
return t.SetState(ctx, state)
}

// TokenBucketMemcached is a Memcached implementation of a TokenBucketStateBackend.
//
// Memcached is a distributed memory object caching system.
Expand Down Expand Up @@ -581,6 +613,16 @@ func (t *TokenBucketMemcached) SetState(ctx context.Context, state TokenBucketSt
return errors.Wrap(err, "failed to save keys to memcached")
}

// Reset resets the state in Memcached.
func (t *TokenBucketMemcached) Reset(ctx context.Context) error {
state := TokenBucketState{
Last: 0,
Available: 0,
}
t.casId = 0
return t.SetState(ctx, state)
}

// TokenBucketDynamoDB is a DynamoDB implementation of a TokenBucketStateBackend.
type TokenBucketDynamoDB struct {
client *dynamodb.Client
Expand Down Expand Up @@ -652,6 +694,15 @@ func (t *TokenBucketDynamoDB) SetState(ctx context.Context, state TokenBucketSta
return err
}

// Reset resets the state in DynamoDB.
func (t *TokenBucketDynamoDB) Reset(ctx context.Context) error {
state := TokenBucketState{
Last: 0,
Available: 0,
}
return t.SetState(ctx, state)
}

const dynamoDBBucketAvailableKey = "Available"

func (t *TokenBucketDynamoDB) getGetItemInput() *dynamodb.GetItemInput {
Expand Down
26 changes: 26 additions & 0 deletions tokenbucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,32 @@ func (s *LimitersTestSuite) TestTokenBucketOverflow() {
}
}

func (s *LimitersTestSuite) TestTokenBucketReset() {
clock := newFakeClock()
rate := time.Second
for name, bucket := range s.tokenBuckets(2, rate, clock) {
s.Run(name, func() {
clock.reset()
wait, err := bucket.Limit(context.TODO())
s.Require().NoError(err)
s.Equal(time.Duration(0), wait)
wait, err = bucket.Limit(context.TODO())
s.Require().NoError(err)
s.Equal(time.Duration(0), wait)
// The third call should fail.
wait, err = bucket.Limit(context.TODO())
s.Require().Equal(l.ErrLimitExhausted, err)
s.Equal(rate, wait)
err = bucket.Reset(context.TODO())
s.Require().NoError(err)
// Retry the last call.
wait, err = bucket.Limit(context.TODO())
s.Require().NoError(err)
s.Equal(time.Duration(0), wait)
})
}
}

func (s *LimitersTestSuite) TestTokenBucketRefill() {
for name, backend := range s.tokenBucketBackends() {
s.Run(name, func() {
Expand Down

0 comments on commit 55d2944

Please sign in to comment.