Skip to content

Commit

Permalink
add LockPostgreSQL and BenchmarkDistLockers
Browse files Browse the repository at this point in the history
  • Loading branch information
leeym committed Mar 15, 2024
1 parent c1474e1 commit 52e5fb6
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 6 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ jobs:
ALLOW_ANONYMOUS_LOGIN: yes
ports:
- 2181:2181
postgresql:
image: bitnami/postgresql
env:
ALLOW_EMPTY_PASSWORD: yes
ports:
- 5432:5432

steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ all: test

test:
docker-compose up -d
ETCD_ENDPOINTS="127.0.0.1:2379" REDIS_ADDR="127.0.0.1:6379" ZOOKEEPER_ENDPOINTS="127.0.0.1" CONSUL_ADDR="127.0.0.1:8500" AWS_ADDR="127.0.0.1:8000" MEMCACHED_ADDR="127.0.0.1:11211" go test -race -v -failfast
ETCD_ENDPOINTS="127.0.0.1:2379" REDIS_ADDR="127.0.0.1:6379" ZOOKEEPER_ENDPOINTS="127.0.0.1" CONSUL_ADDR="127.0.0.1:8500" AWS_ADDR="127.0.0.1:8000" MEMCACHED_ADDR="127.0.0.1:11211" go test -race -v -failfast -bench=.
7 changes: 7 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,10 @@ services:
command: "-jar DynamoDBLocal.jar -inMemory"
ports:
- "8000:8000"

postgresql:
image: bitnami/postgresql
environment:
ALLOW_EMPTY_PASSWORD: yes
ports:
- "5432:5432"
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/aws/aws-sdk-go-v2 v1.17.6
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.19.1
github.com/cenkalti/backoff/v3 v3.2.2
github.com/lib/pq v1.10.9
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lyft/protoc-gen-star v0.6.0/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA=
github.com/lyft/protoc-gen-star v0.6.1/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA=
github.com/lyft/protoc-gen-star/v2 v2.0.1/go.mod h1:RcCdONR2ScXaYnQC5tUzxzlpA3WVYF7/opLeUgcQs/o=
Expand Down
31 changes: 26 additions & 5 deletions limiters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package limiters_test

import (
"context"
"database/sql"
"fmt"
"hash/fnv"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -71,6 +73,7 @@ type LimitersTestSuite struct {
dynamodbClient *dynamodb.Client
dynamoDBTableProps l.DynamoDBTableProperties
memcacheClient *memcache.Client
pgDb *sql.DB
}

func (s *LimitersTestSuite) SetupSuite() {
Expand Down Expand Up @@ -118,13 +121,19 @@ func (s *LimitersTestSuite) SetupSuite() {

s.memcacheClient = memcache.New(strings.Split(os.Getenv("MEMCACHED_ADDR"), ",")...)
s.Require().NoError(s.memcacheClient.Ping())

psqlconn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", "localhost", 5432, "postgres", "", "")
s.pgDb, err = sql.Open("postgres", psqlconn)
s.Require().NoError(err)
s.Require().NoError(s.pgDb.Ping())
}

func (s *LimitersTestSuite) TearDownSuite() {
s.Assert().NoError(s.etcdClient.Close())
s.Assert().NoError(s.redisClient.Close())
s.Assert().NoError(DeleteTestDynamoDBTable(context.Background(), s.dynamodbClient))
s.Assert().NoError(s.memcacheClient.Close())
s.Assert().NoError(s.pgDb.Close())
}

func TestLimitersTestSuite(t *testing.T) {
Expand All @@ -138,6 +147,15 @@ func (s *LimitersTestSuite) lockers(generateKeys bool) map[string]l.DistLocker {
return lockers
}

func hash(s string) uint32 {
h := fnv.New32a()
_, err := h.Write([]byte(s))
if err != nil {
panic(err)
}
return h.Sum32()
}

// distLockers returns distributed lockers only.
func (s *LimitersTestSuite) distLockers(generateKeys bool) map[string]l.DistLocker {
randomKey := uuid.New().String()
Expand All @@ -146,21 +164,24 @@ func (s *LimitersTestSuite) distLockers(generateKeys bool) map[string]l.DistLock
zkKey := "/" + randomKey
redisKey := randomKey
memcacheKey := randomKey
pgKey := randomKey
if !generateKeys {
consulKey = "dist_locker"
etcdKey = "dist_locker"
zkKey = "/dist_locker"
redisKey = "dist_locker"
memcacheKey = "dist_locker"
pgKey = "dist_locker"
}
consulLock, err := s.consulClient.LockKey(consulKey)
s.Require().NoError(err)
return map[string]l.DistLocker{
"LockEtcd": l.NewLockEtcd(s.etcdClient, etcdKey, s.logger),
"LockConsul": l.NewLockConsul(consulLock),
"LockZookeeper": l.NewLockZookeeper(zk.NewLock(s.zkConn, zkKey, zk.WorldACL(zk.PermAll))),
"LockRedis": l.NewLockRedis(goredis.NewPool(s.redisClient), redisKey),
"LockMemcached": l.NewLockMemcached(s.memcacheClient, memcacheKey),
"LockEtcd": l.NewLockEtcd(s.etcdClient, etcdKey, s.logger),
"LockConsul": l.NewLockConsul(consulLock),
"LockZookeeper": l.NewLockZookeeper(zk.NewLock(s.zkConn, zkKey, zk.WorldACL(zk.PermAll))),
"LockRedis": l.NewLockRedis(goredis.NewPool(s.redisClient), redisKey),
"LockMemcached": l.NewLockMemcached(s.memcacheClient, memcacheKey),
"LockPostgreSQL": l.NewLockPostgreSQL(s.pgDb, hash(pgKey)),
}
}

Expand Down
28 changes: 28 additions & 0 deletions locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package limiters

import (
"context"
"database/sql"
"github.com/alessandro-c/gomemcached-lock"
"github.com/alessandro-c/gomemcached-lock/adapters/gomemcache"
"github.com/bradfitz/gomemcache/memcache"
"github.com/cenkalti/backoff/v3"
"github.com/go-redsync/redsync/v4"
redsyncredis "github.com/go-redsync/redsync/v4/redis"
"github.com/hashicorp/consul/api"
_ "github.com/lib/pq"
"github.com/pkg/errors"
"github.com/samuel/go-zookeeper/zk"
clientv3 "go.etcd.io/etcd/client/v3"
Expand Down Expand Up @@ -187,3 +189,29 @@ func (l *LockMemcached) Lock(ctx context.Context) error {
func (l *LockMemcached) Unlock(ctx context.Context) error {
return l.locker.Release()
}

// LockPostgreSQL is an implementation of the DistLocker interface using PostgreSQL's advisory lock.
type LockPostgreSQL struct {
db *sql.DB
id uint32
}

// NewLockPostgreSQL creates a new LockPostgreSQL.
func NewLockPostgreSQL(db *sql.DB, id uint32) *LockPostgreSQL {
return &LockPostgreSQL{db, id}
}

// Make sure LockPostgreSQL implements DistLocker interface
var _ DistLocker = (*LockPostgreSQL)(nil)

// Lock acquire an advisory lock in PostgreSQL
func (l *LockPostgreSQL) Lock(ctx context.Context) error {
_, err := l.db.ExecContext(ctx, "SELECT pg_advisory_lock($1)", l.id)
return err
}

// Unlock releases an advisory lock in PostgreSQL
func (l *LockPostgreSQL) Unlock(ctx context.Context) error {
_, err := l.db.ExecContext(ctx, "SELECT pg_advisory_unlock($1)", l.id)
return err
}
17 changes: 17 additions & 0 deletions locks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package limiters_test
import (
"context"
"sync"
"testing"
"time"

"github.com/mennanov/limiters"
Expand Down Expand Up @@ -44,3 +45,19 @@ func (s *LimitersTestSuite) TestDistLockers() {
})
}
}

func BenchmarkDistLockers(b *testing.B) {
s := new(LimitersTestSuite)
s.SetT(&testing.T{})
s.SetupSuite()
lockers := s.distLockers(false)
for name, locker := range lockers {
b.Run(name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
s.Require().NoError(locker.Lock(context.Background()))
s.Require().NoError(locker.Unlock(context.Background()))
}
})
}
s.TearDownSuite()
}

0 comments on commit 52e5fb6

Please sign in to comment.