Add memcached #33

Merged · 12 commits · Feb 27, 2024

5 changes: 5 additions & 0 deletions .github/workflows/tests.yml
@@ -23,6 +23,10 @@ jobs:
ALLOW_EMPTY_PASSWORD: yes
ports:
- 6379:6379
memcached:
image: bitnami/memcached
ports:
- 11211:11211
consul:
image: bitnami/consul
ports:
@@ -51,6 +55,7 @@ jobs:
CONSUL_ADDR: 'localhost:8500'
ZOOKEEPER_ENDPOINTS: 'localhost:2181'
AWS_ADDR: 'localhost:8000'
MEMCACHED_ADDR: '127.0.0.1:11211'
run: go test -race -v -coverprofile=coverage.txt -covermode=atomic ./...
- uses: codecov/codecov-action@v3
with:
19 changes: 17 additions & 2 deletions README.md
@@ -11,6 +11,7 @@ Most common implementations are already provided.
- [`Token bucket`](https://en.wikipedia.org/wiki/Token_bucket)
- in-memory (local)
- redis
- memcached
- etcd
- dynamodb

@@ -21,6 +22,7 @@ Most common implementations are already provided.
- [`Leaky bucket`](https://en.wikipedia.org/wiki/Leaky_bucket#As_a_queue)
- in-memory (local)
- redis
- memcached
- etcd
- dynamodb

@@ -31,6 +33,7 @@ Most common implementations are already provided.
- [`Fixed window counter`](https://konghq.com/blog/how-to-design-a-scalable-rate-limiting-algorithm/)
- in-memory (local)
- redis
- memcached
- dynamodb

Simple and resource-efficient algorithm that does not need a lock.
@@ -40,6 +43,7 @@ Most common implementations are already provided.
- [`Sliding window counter`](https://konghq.com/blog/how-to-design-a-scalable-rate-limiting-algorithm/)
- in-memory (local)
- redis
- memcached
- dynamodb

Smooths out bursts around the boundary between two adjacent windows.
@@ -50,6 +54,7 @@ Most common implementations are already provided.
- `Concurrent buffer`
- in-memory (local)
- redis
- memcached

Allows concurrent requests up to the given capacity.
Requires a lock (provided).
@@ -122,13 +127,23 @@ Supported backends:
- [Consul](https://www.consul.io/)
- [Zookeeper](https://zookeeper.apache.org/)
- [Redis](https://redis.io/)
- [Memcached](https://memcached.org/)

## Memcached

Memcached is not well suited for implementing reliable locks or data persistence due to its inherent limitations:

- No guaranteed data retention: Memcached can evict data at any point under memory pressure, even if it appears to have space available. This can lead to unexpected lock releases or data loss.
- No distributed locking primitives: Memcached does not offer the distributed coordination required for consistent locking across multiple servers.

If a Memcached deployment already exists and bursts of traffic caused by unexpectedly evicted data are acceptable, the Memcached-based implementations are convenient; otherwise the Redis-based implementations are a better choice.
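
A minimal usage sketch of the Memcached-backed fixed window limiter. The import path, `NewFixedWindow`, `NewSystemClock`, and `ErrLimitExhausted` are assumed from the surrounding library; the address matches the docker-compose setup below:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/bradfitz/gomemcache/memcache"
	limiters "github.com/mennanov/limiters"
)

func main() {
	// Connect to the Memcached instance started by docker-compose.
	cli := memcache.New("127.0.0.1:11211")

	// Allow at most 5 requests per 1-second window, counted in Memcached.
	limiter := limiters.NewFixedWindow(5, time.Second,
		limiters.NewFixedWindowMemcached(cli, "example-key"),
		limiters.NewSystemClock())

	if _, err := limiter.Limit(context.Background()); err != nil {
		// limiters.ErrLimitExhausted is returned when the capacity is exceeded.
		fmt.Println("request rejected:", err)
	}
}
```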

## Testing

Run tests locally:
```bash
docker-compose up -d # start etcd, Redis, memcached, zookeeper, consul, and localstack
ETCD_ENDPOINTS="127.0.0.1:2379" REDIS_ADDR="127.0.0.1:6379" ZOOKEEPER_ENDPOINTS="127.0.0.1" CONSUL_ADDR="127.0.0.1:8500" AWS_ADDR="127.0.0.1:8000" MEMCACHED_ADDR="127.0.0.1:11211" go test -race -v
```

Run [Drone](https://drone.io) CI tests locally:
122 changes: 122 additions & 0 deletions concurrent_buffer.go
@@ -1,11 +1,14 @@
package limiters

import (
"bytes"
"context"
"encoding/gob"
"fmt"
"sync"
"time"

"github.com/bradfitz/gomemcache/memcache"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
)
@@ -152,3 +155,122 @@ func (c *ConcurrentBufferRedis) Add(ctx context.Context, key string) (int64, err
func (c *ConcurrentBufferRedis) Remove(ctx context.Context, key string) error {
return errors.Wrap(c.cli.ZRem(ctx, c.key, key).Err(), "failed to remove an item from redis set")
}

// ConcurrentBufferMemcached implements ConcurrentBufferBackend in Memcached.
type ConcurrentBufferMemcached struct {
clock Clock
cli *memcache.Client
key string
ttl time.Duration
}

// NewConcurrentBufferMemcached creates a new instance of ConcurrentBufferMemcached.
// When a key's TTL expires, the key is removed from the buffer. This is needed in case the process that added
// that key to the buffer did not call Done() for some reason.
func NewConcurrentBufferMemcached(cli *memcache.Client, key string, ttl time.Duration, clock Clock) *ConcurrentBufferMemcached {
return &ConcurrentBufferMemcached{clock: clock, cli: cli, key: key, ttl: ttl}
}

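// SortedSetNode is a single entry in the buffer. CreatedAt holds the entry's
// expiration timestamp (creation time + TTL) in nanoseconds.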
type SortedSetNode struct {
CreatedAt int64
Value string
}

// Add adds the request with the given key to the list in Memcached and returns the total number of requests in it.
// It also drops entries whose TTL has expired.
func (c *ConcurrentBufferMemcached) Add(ctx context.Context, element string) (int64, error) {
var err error
done := make(chan struct{})
now := c.clock.Now()
var newNodes []SortedSetNode
var casId uint64 = 0

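// Do the Memcached round-trip in a goroutine so the select below can honor ctx cancellation.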
go func() {
defer close(done)
var item *memcache.Item
item, err = c.cli.Get(c.key)
if err != nil {
if !errors.Is(err, memcache.ErrCacheMiss) {
return
}
} else {
casId = item.CasID
b := bytes.NewBuffer(item.Value)
var oldNodes []SortedSetNode
_ = gob.NewDecoder(b).Decode(&oldNodes)
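// Keep only the entries that have not expired yet (CreatedAt holds the expiration timestamp).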
for _, node := range oldNodes {
if node.CreatedAt > now.UnixNano() {
newNodes = append(newNodes, node)
}
}
}
newNodes = append(newNodes, SortedSetNode{CreatedAt: now.Add(c.ttl).UnixNano(), Value: element})
var b bytes.Buffer
_ = gob.NewEncoder(&b).Encode(newNodes)
item = &memcache.Item{
Key: c.key,
Value: b.Bytes(),
CasID: casId,
}
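// Use CAS if the key already existed; otherwise Add it, which fails with ErrNotStored if another client created it first.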
if casId > 0 {
err = c.cli.CompareAndSwap(item)
} else {
err = c.cli.Add(item)
}
}()

select {
case <-ctx.Done():
return 0, ctx.Err()

case <-done:
if err != nil {
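// The value was created, changed, or evicted between Get and the write: retry the whole read-modify-write.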
if errors.Is(err, memcache.ErrCASConflict) || errors.Is(err, memcache.ErrNotStored) || errors.Is(err, memcache.ErrCacheMiss) {
return c.Add(ctx, element)
}
return 0, errors.Wrap(err, "failed to add in memcached")
}
return int64(len(newNodes)), nil
}
}

// Remove removes the request identified by the key from the slice in Memcached.
func (c *ConcurrentBufferMemcached) Remove(ctx context.Context, key string) error {
var err error
now := c.clock.Now()
var newNodes []SortedSetNode
var casId uint64 = 0
deleted := false
item, err := c.cli.Get(c.key)
if err != nil {
if errors.Is(err, memcache.ErrCacheMiss) {
return nil
}
return errors.Wrap(err, "failed to Get")
}
casId = item.CasID
var oldNodes []SortedSetNode
_ = gob.NewDecoder(bytes.NewBuffer(item.Value)).Decode(&oldNodes)
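// Rebuild the list without one instance of the given key, dropping expired entries along the way.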
for _, node := range oldNodes {
if node.CreatedAt > now.UnixNano() {
if node.Value == key && !deleted {
deleted = true
} else {
newNodes = append(newNodes, node)
}
}
}

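// Write the filtered list back with CAS and retry the whole operation on contention.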
var b bytes.Buffer
_ = gob.NewEncoder(&b).Encode(newNodes)
item = &memcache.Item{
Key: c.key,
Value: b.Bytes(),
CasID: casId,
}
err = c.cli.CompareAndSwap(item)
if err != nil && (errors.Is(err, memcache.ErrCASConflict) || errors.Is(err, memcache.ErrNotStored) || errors.Is(err, memcache.ErrCacheMiss)) {
return c.Remove(ctx, key)
}
return errors.Wrap(err, "failed to CompareAndSwap")
}
1 change: 1 addition & 0 deletions concurrent_buffer_test.go
@@ -26,6 +26,7 @@ func (s *LimitersTestSuite) concurrentBufferBackends(ttl time.Duration, clock l.
return []l.ConcurrentBufferBackend{
l.NewConcurrentBufferInMemory(l.NewRegistry(), ttl, clock),
l.NewConcurrentBufferRedis(s.redisClient, uuid.New().String(), ttl, clock),
l.NewConcurrentBufferMemcached(s.memcacheClient, uuid.New().String(), ttl, clock),
}
}

5 changes: 5 additions & 0 deletions docker-compose.yml
@@ -15,6 +15,11 @@ services:
ports:
- "6379:6379"

memcached:
image: bitnami/memcached
ports:
- "11211:11211"

consul:
image: bitnami/consul
ports:
46 changes: 46 additions & 0 deletions fixedwindow.go
@@ -11,6 +11,7 @@ import (
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/bradfitz/gomemcache/memcache"
"github.com/pkg/errors"
"github.com/redis/go-redis/v9"
)
@@ -132,6 +133,51 @@ func (f *FixedWindowRedis) Increment(ctx context.Context, window time.Time, ttl
}
}

// FixedWindowMemcached implements FixedWindow in Memcached.
type FixedWindowMemcached struct {
cli *memcache.Client
prefix string
}

// NewFixedWindowMemcached returns a new instance of FixedWindowMemcached.
// Prefix is the key prefix used to store all the keys used in this implementation in Memcached.
func NewFixedWindowMemcached(cli *memcache.Client, prefix string) *FixedWindowMemcached {
return &FixedWindowMemcached{cli: cli, prefix: prefix}
}

// Increment increments the window's counter in Memcached.
func (f *FixedWindowMemcached) Increment(ctx context.Context, window time.Time, ttl time.Duration) (int64, error) {
var newValue uint64
var err error
done := make(chan struct{})
go func() {
defer close(done)
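// Each window is stored under its own key: the prefix plus the window's start time in nanoseconds.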
key := fmt.Sprintf("%s:%d", f.prefix, window.UnixNano())
newValue, err = f.cli.Increment(key, 1)
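// ErrCacheMiss means the counter for this window does not exist yet, so create it with Add.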
if err != nil && errors.Is(err, memcache.ErrCacheMiss) {
newValue = 1
item := &memcache.Item{
Key: key,
Value: []byte(strconv.FormatUint(newValue, 10)),
// Expire the window key after its TTL so stale windows do not accumulate.
// Note: Memcached expirations have one-second granularity, so a ttl under 1s means no expiration here.
Expiration: int32(ttl / time.Second),
}
err = f.cli.Add(item)
}
}()

select {
case <-done:
if err != nil {
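// Another client created the key between Increment and Add: retry.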
if errors.Is(err, memcache.ErrNotStored) {
return f.Increment(ctx, window, ttl)
}
return 0, errors.Wrap(err, "failed to Increment or Add")
}
return int64(newValue), err
case <-ctx.Done():
return 0, ctx.Err()
}
}

// FixedWindowDynamoDB implements FixedWindow in DynamoDB.
type FixedWindowDynamoDB struct {
client *dynamodb.Client
1 change: 1 addition & 0 deletions fixedwindow_test.go
@@ -22,6 +22,7 @@ func (s *LimitersTestSuite) fixedWindowIncrementers() []l.FixedWindowIncrementer
return []l.FixedWindowIncrementer{
l.NewFixedWindowInMemory(),
l.NewFixedWindowRedis(s.redisClient, uuid.New().String()),
l.NewFixedWindowMemcached(s.memcacheClient, uuid.New().String()),
l.NewFixedWindowDynamoDB(s.dynamodbClient, uuid.New().String(), s.dynamoDBTableProps),
}
}