Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added Reset to TokenBucket #90

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions tokenbucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,33 @@ func (t *TokenBucket) Limit(ctx context.Context) (time.Duration, error) {
return t.Take(ctx, 1)
}

// Reset the bucket to zero state
func (t *TokenBucket) Reset(ctx context.Context) (time.Duration, error) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the returned time.Duration value used for?
Is it needed? If so, please add a comment explaining what it means.

t.mu.Lock()
defer t.mu.Unlock()
if err := t.locker.Lock(ctx); err != nil {
return 0, err
}
defer func() {
if err := t.locker.Unlock(ctx); err != nil {
t.logger.Log(err)
}
}()
state, err := t.backend.State(ctx)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you don't need you retrieve the state first and immediately override its fields.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohh yeah, my bad i will remove the code. Is the other stuff fine?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition occurs while using a new TokenBucketState. Without retrieving the state and using it, error occurs.

For Reference Memcache CompareAndSwap. It expects the item's key to be the same while other values may differ.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can skip that check if the state is zero.

diff --git a/tokenbucket.go b/tokenbucket.go
index 16fc8ec..0b499b1 100644
--- a/tokenbucket.go
+++ b/tokenbucket.go
@@ -562,7 +562,7 @@ func (t *TokenBucketMemcached) SetState(ctx context.Context, state TokenBucketSt
                        Value: b.Bytes(),
                        CasID: t.casId,
                }
-               if t.raceCheck && t.casId > 0 {
+               if t.raceCheck && t.casId > 0 && !state.isZero() {
                        err = t.cli.CompareAndSwap(item)
                } else {
                        err = t.cli.Set(item)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively we may consider adding a separate Reset() method to the TokenBucketStateBackend interface.
That way we won't need a special logic in the SetState() to handle a reset.
However we will need to implement it for every backend.

What do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like this? #91

if err != nil {
return 0, err
}

// Reset bucket to initial state
state.Available = 0
state.Last = 0

if err = t.backend.SetState(ctx, state); err != nil {
return 0, err
}
return 0, nil
}

// TokenBucketInMemory is an in-memory implementation of TokenBucketStateBackend.
//
// The state is not shared nor persisted so it won't survive restarts or failures.
Expand Down
44 changes: 44 additions & 0 deletions tokenbucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,50 @@ func (s *LimitersTestSuite) TestTokenBucketRefill() {
}
}

func (s *LimitersTestSuite) TestTokenBucketReset() {
for name, backend := range s.tokenBucketBackends() {
s.Run(name, func() {
clock := newFakeClock()

bucketCapacity := 2
bucket := l.NewTokenBucket(int64(bucketCapacity), time.Second*1, l.NewLockNoop(), backend, clock, s.logger)

// Check while Bucket is full, partially full and empty
noOfAccess := 4

for access := range noOfAccess {
clock.reset()
for i := range access {
_, err := bucket.Limit(context.TODO())

// Handle err when bucket capacity is reached
if i >= bucketCapacity {
s.Require().Equal(l.ErrLimitExhausted, err)
} else {
s.Require().NoError(err)
}
}

_, err := bucket.Reset(context.TODO())
s.Require().NoError(err)

// Since bucket state is zero valued, Limit needs to be invoked
wait, err := bucket.Limit(context.TODO())
s.Require().NoError(err)
s.Equal(time.Duration(0), wait)

state, err := backend.State(context.TODO())
s.Require().NoError(err, "unable to retrieve backend state")
s.Require().Equal(int64(bucketCapacity-1), state.Available)

// Cleanup, Set Bucket to initial state
_, err = bucket.Reset(context.TODO())
s.Require().NoError(err)
}
})
}
}

func BenchmarkTokenBuckets(b *testing.B) {
s := new(LimitersTestSuite)
s.SetT(&testing.T{})
Expand Down
Loading