Skip to content

Commit

Permalink
Refactor and reorder the code and document
Browse files Browse the repository at this point in the history
  • Loading branch information
leeym committed Feb 23, 2024
1 parent f497b4b commit b2ed121
Show file tree
Hide file tree
Showing 12 changed files with 341 additions and 376 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ jobs:
ALLOW_EMPTY_PASSWORD: yes
ports:
- 6379:6379
memcached:
image: bitnami/memcached
ports:
- 11211:11211
consul:
image: bitnami/consul
ports:
Expand All @@ -33,10 +37,6 @@ jobs:
ALLOW_ANONYMOUS_LOGIN: yes
ports:
- 2181:2181
memcached:
image: bitnami/memcached
ports:
- 11211:11211

steps:
- uses: actions/checkout@v3
Expand Down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ Most common implementations are already provided.
- [`Token bucket`](https://en.wikipedia.org/wiki/Token_bucket)
- in-memory (local)
- redis
- memcached
- etcd
- dynamodb
- memcached

Allows requests at a certain input rate with possible bursts configured by the capacity parameter.
The output rate equals to the input rate.
Expand All @@ -22,9 +22,9 @@ Most common implementations are already provided.
- [`Leaky bucket`](https://en.wikipedia.org/wiki/Leaky_bucket#As_a_queue)
- in-memory (local)
- redis
- memcached
- etcd
- dynamodb
- memcached

Puts requests in a FIFO queue to be processed at a constant rate.
There are no restrictions on the input rate except for the capacity of the queue.
Expand All @@ -33,8 +33,8 @@ Most common implementations are already provided.
- [`Fixed window counter`](https://konghq.com/blog/how-to-design-a-scalable-rate-limiting-algorithm/)
- in-memory (local)
- redis
- dynamodb
- memcached
- dynamodb

Simple and resources efficient algorithm that does not need a lock.
Precision may be adjusted by the size of the window.
Expand All @@ -43,8 +43,8 @@ Most common implementations are already provided.
- [`Sliding window counter`](https://konghq.com/blog/how-to-design-a-scalable-rate-limiting-algorithm/)
- in-memory (local)
- redis
- dynamodb
- memcached
- dynamodb

Smoothes out the bursts around the boundary between 2 adjacent windows.
Needs as twice more memory as the `Fixed Window` algorithm (2 windows instead of 1 at a time).
Expand Down Expand Up @@ -133,7 +133,7 @@ Supported backends:

Run tests locally:
```bash
docker-compose up -d # start etcd, Redis, zookeeper, consul, memcached, and localstack
docker-compose up -d # start etcd, Redis, memcached, zookeeper, consul, and localstack
ETCD_ENDPOINTS="127.0.0.1:2379" REDIS_ADDR="127.0.0.1:6379" ZOOKEEPER_ENDPOINTS="127.0.0.1" CONSUL_ADDR="127.0.0.1:8500" AWS_ADDR="127.0.0.1:8000" MEMCACHED_ADDR="127.0.0.1:11211" go test -race -v
```

Expand Down
69 changes: 21 additions & 48 deletions concurrent_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ type SortedSetNode struct {
Value string
}

// Add adds the request with the given key to the serialize slice in Memcached and returns the total number of requests in it.
// Add adds the request with the given key to the slice in Memcached and returns the total number of requests in it.
// It also removes the keys with expired TTL.
func (c *ConcurrentBufferMemcached) Add(ctx context.Context, element string) (int64, error) {
var err error
Expand Down Expand Up @@ -218,27 +218,15 @@ func (c *ConcurrentBufferMemcached) Add(ctx context.Context, element string) (in
err = errors.Wrap(err, "failed to Encode")
return
}
item = &memcache.Item{
Key: c.key,
Value: b.Bytes(),
CasID: casId,
}
if casId > 0 {
err = c.cli.CompareAndSwap(&memcache.Item{
Key: c.key,
Value: b.Bytes(),
Expiration: int32(c.clock.Now().Add(c.ttl).Unix()),
CasID: casId,
})
if err != nil {
err = errors.Wrap(err, "failed to CompareAndSwap")
return
}
err = c.cli.CompareAndSwap(item)
} else {
err = c.cli.Add(&memcache.Item{
Key: c.key,
Value: b.Bytes(),
Expiration: int32(c.clock.Now().Add(c.ttl).Unix()),
})
if err != nil {
err = errors.Wrap(err, "failed to Add")
return
}
err = c.cli.Add(item)
}
}()

Expand All @@ -258,7 +246,7 @@ func (c *ConcurrentBufferMemcached) Add(ctx context.Context, element string) (in
}
}

// Remove removes the request identified by the key from the serialized slice in Memcached.
// Remove removes the request identified by the key from the slice in Memcached.
func (c *ConcurrentBufferMemcached) Remove(ctx context.Context, key string) error {
var err error
now := c.clock.Now()
Expand Down Expand Up @@ -296,33 +284,18 @@ func (c *ConcurrentBufferMemcached) Remove(ctx context.Context, key string) erro
if err != nil {
return errors.Wrap(err, "failed to Encode")
}
if casId > 0 {
err = c.cli.CompareAndSwap(&memcache.Item{
Key: c.key,
Value: b.Bytes(),
Expiration: int32(c.clock.Now().Add(c.ttl).Unix()),
CasID: casId,
})
if err != nil {
if errors.Is(err, memcache.ErrCASConflict) || errors.Is(err, memcache.ErrNotStored) || errors.Is(err, memcache.ErrCacheMiss) {
return c.Remove(ctx, key)
} else {
return errors.Wrap(err, "failed to CompareAndSwap")
}
}
} else {
err = c.cli.Add(&memcache.Item{
Key: c.key,
Value: b.Bytes(),
Expiration: int32(c.clock.Now().Add(c.ttl).Unix()),
})
if err != nil {
if errors.Is(err, memcache.ErrCASConflict) || errors.Is(err, memcache.ErrNotStored) || errors.Is(err, memcache.ErrCacheMiss) {
return c.Remove(ctx, key)
} else {
return errors.Wrap(err, "failed to Add")
}
item = &memcache.Item{
Key: c.key,
Value: b.Bytes(),
CasID: casId,
}
err = c.cli.CompareAndSwap(item)
if err != nil {
if errors.Is(err, memcache.ErrCASConflict) || errors.Is(err, memcache.ErrNotStored) || errors.Is(err, memcache.ErrCacheMiss) {
return c.Remove(ctx, key)
} else {
return errors.Wrap(err, "failed to CompareAndSwap")
}
}
return err
return nil
}
10 changes: 5 additions & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ services:
ports:
- "6379:6379"

memcached:
image: bitnami/memcached
ports:
- "11211:11211"

consul:
image: bitnami/consul
ports:
Expand All @@ -32,8 +37,3 @@ services:
command: "-jar DynamoDBLocal.jar -inMemory"
ports:
- "8000:8000"

memcached:
image: bitnami/memcached
ports:
- "11211:11211"
94 changes: 47 additions & 47 deletions fixedwindow.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,53 @@ func (f *FixedWindowRedis) Increment(ctx context.Context, window time.Time, ttl
}
}

// FixedWindowMemcached implements FixedWindow in Memcached.
type FixedWindowMemcached struct {
cli *memcache.Client
prefix string
}

// NewFixedWindowMemcached returns a new instance of FixedWindowMemcached.
// Prefix is the key prefix used to store all the keys used in this implementation in Memcached.
func NewFixedWindowMemcached(cli *memcache.Client, prefix string) *FixedWindowMemcached {
return &FixedWindowMemcached{cli: cli, prefix: prefix + ":FixedWindow"}
}

// Increment increments the window's counter in Memcached.
func (f *FixedWindowMemcached) Increment(ctx context.Context, window time.Time, ttl time.Duration) (int64, error) {
var newValue uint64
var err error
done := make(chan struct{})
go func() {
defer close(done)
key := fmt.Sprintf("%s:%d", f.prefix, window.UnixNano())
newValue, err = f.cli.Increment(key, 1)
if err != nil && errors.Is(err, memcache.ErrCacheMiss) {
newValue = 1
item := &memcache.Item{
Key: key,
Value: []byte(strconv.FormatUint(newValue, 10)),
}
err = f.cli.Add(item)
}
}()

select {
case <-done:
if err != nil {
if errors.Is(err, memcache.ErrNotStored) {
return f.Increment(ctx, window, ttl)
} else {
return 0, errors.Wrap(err, "failed to Increment or Add")
}
} else {
return int64(newValue), err
}
case <-ctx.Done():
return 0, ctx.Err()
}
}

// FixedWindowDynamoDB implements FixedWindow in DynamoDB.
type FixedWindowDynamoDB struct {
client *dynamodb.Client
Expand Down Expand Up @@ -205,50 +252,3 @@ func (f *FixedWindowDynamoDB) Increment(ctx context.Context, window time.Time, t

return int64(count), nil
}

// FixedWindowMemcached implements FixedWindow in Memcached.
type FixedWindowMemcached struct {
cli *memcache.Client
prefix string
}

// NewFixedWindowMemcached returns a new instance of FixedWindowMemcached.
// Prefix is the key prefix used to store all the keys used in this implementation in Memcached.
func NewFixedWindowMemcached(cli *memcache.Client, prefix string) *FixedWindowMemcached {
return &FixedWindowMemcached{cli: cli, prefix: prefix + ":FixedWindow"}
}

// Increment increments the window's counter in Memcached.
func (f *FixedWindowMemcached) Increment(ctx context.Context, window time.Time, ttl time.Duration) (int64, error) {
var newValue uint64
var err error
done := make(chan struct{})
go func() {
defer close(done)
key := fmt.Sprintf("%s:%d", f.prefix, window.UnixNano())
newValue, err = f.cli.Increment(key, 1)
if err != nil && errors.Is(err, memcache.ErrCacheMiss) {
newValue = 1
err = f.cli.Add(&memcache.Item{
Key: key,
Value: []byte(strconv.FormatUint(newValue, 10)),
Expiration: int32(time.Now().Add(ttl).Unix()),
})
}
}()

select {
case <-done:
if err != nil {
if errors.Is(err, memcache.ErrNotStored) {
return f.Increment(ctx, window, ttl)
} else {
return 0, errors.Wrap(err, "memcached transaction failed")
}
} else {
return int64(newValue), err
}
case <-ctx.Done():
return 0, ctx.Err()
}
}
2 changes: 1 addition & 1 deletion fixedwindow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func (s *LimitersTestSuite) fixedWindowIncrementers() []l.FixedWindowIncrementer
return []l.FixedWindowIncrementer{
l.NewFixedWindowInMemory(),
l.NewFixedWindowRedis(s.redisClient, uuid.New().String()),
l.NewFixedWindowDynamoDB(s.dynamodbClient, uuid.New().String(), s.dynamoDBTableProps),
l.NewFixedWindowMemcached(s.memcacheClient, uuid.New().String()),
l.NewFixedWindowDynamoDB(s.dynamodbClient, uuid.New().String(), s.dynamoDBTableProps),
}
}

Expand Down
Loading

0 comments on commit b2ed121

Please sign in to comment.