Skip to content

Commit

Permalink
Merge branch 'master' into feat.fetchRemoteSchemaAsync
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Dec 9, 2024
2 parents ff6aca3 + 446666b commit 9415887
Show file tree
Hide file tree
Showing 18 changed files with 1,185 additions and 71 deletions.
156 changes: 156 additions & 0 deletions enterprise/reporting/event_sampler/badger_event_sampler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package event_sampler

import (
"context"
"fmt"
"os"
"sync"
"time"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/badger/v4/options"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/utils/misc"
)

type BadgerEventSampler struct {
db *badger.DB
mu sync.Mutex
ttl config.ValueLoader[time.Duration]
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

func DefaultPath(pathName string) (string, error) {
tmpDirPath, err := misc.CreateTMPDIR()
if err != nil {
return "", err
}
return fmt.Sprintf(`%v%v`, tmpDirPath, pathName), nil
}

func NewBadgerEventSampler(ctx context.Context, pathName string, ttl config.ValueLoader[time.Duration], conf *config.Config, log logger.Logger) (*BadgerEventSampler, error) {
dbPath, err := DefaultPath(pathName)
if err != nil || dbPath == "" {
return nil, err
}

opts := badger.DefaultOptions(dbPath).
WithLogger(badgerLogger{log}).
WithCompression(options.None).
WithIndexCacheSize(16 << 20). // 16mb
WithNumGoroutines(1).
WithBlockCacheSize(0).
WithNumVersionsToKeep(1).
WithNumMemtables(conf.GetInt("Reporting.eventSampling.badgerDB.numMemtable", 5)).
WithValueThreshold(conf.GetInt64("Reporting.eventSampling.badgerDB.valueThreshold", 1048576)).
WithNumLevelZeroTables(conf.GetInt("Reporting.eventSampling.badgerDB.numLevelZeroTables", 5)).
WithNumLevelZeroTablesStall(conf.GetInt("Reporting.eventSampling.badgerDB.numLevelZeroTablesStall", 15)).
WithSyncWrites(conf.GetBool("Reporting.eventSampling.badgerDB.syncWrites", false)).
WithDetectConflicts(conf.GetBool("Reporting.eventSampling.badgerDB.detectConflicts", false))

ctx, cancel := context.WithCancel(ctx)

db, err := badger.Open(opts)

es := &BadgerEventSampler{
db: db,
ttl: ttl,
ctx: ctx,
cancel: cancel,
wg: sync.WaitGroup{},
}

if err != nil {
return nil, err
}

es.wg.Add(1)
rruntime.Go(func() {
defer es.wg.Done()
es.gcLoop()
})

return es, nil
}

func (es *BadgerEventSampler) Get(key string) (bool, error) {
es.mu.Lock()
defer es.mu.Unlock()

var found bool

err := es.db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(key))
if err != nil {
return err
}

found = item != nil
return nil
})

if err == badger.ErrKeyNotFound {
return false, nil
} else if err != nil {
return false, err
}

return found, nil
}

func (es *BadgerEventSampler) Put(key string) error {
es.mu.Lock()
defer es.mu.Unlock()

return es.db.Update(func(txn *badger.Txn) error {
entry := badger.NewEntry([]byte(key), []byte{1}).WithTTL(es.ttl.Load())
return txn.SetEntry(entry)
})
}

func (es *BadgerEventSampler) gcLoop() {
for {
select {
case <-es.ctx.Done():
_ = es.db.RunValueLogGC(0.5)
return
case <-time.After(5 * time.Minute):
}
again:
if es.ctx.Err() != nil {
return
}
// One call would only result in removal of at max one log file.
// As an optimization, you could also immediately re-run it whenever it returns nil error
// (this is why `goto again` is used).
err := es.db.RunValueLogGC(0.5)
if err == nil {
goto again
}
}
}

func (es *BadgerEventSampler) Close() {
es.cancel()
es.wg.Wait()
if es.db != nil {
_ = es.db.Close()
}
}

type badgerLogger struct {
logger.Logger
}

func (badgerLogger) Errorf(format string, a ...interface{}) {
_, _ = fmt.Fprintf(os.Stderr, format, a...)
}

func (badgerLogger) Warningf(format string, a ...interface{}) {
_, _ = fmt.Fprintf(os.Stderr, format, a...)
}
45 changes: 45 additions & 0 deletions enterprise/reporting/event_sampler/event_sampler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package event_sampler

import (
"context"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
)

const (
BadgerTypeEventSampler = "badger"
InMemoryCacheTypeEventSampler = "in_memory_cache"
BadgerEventSamplerPathName = "/reporting-badger"
)

type EventSampler interface {
Put(key string) error
Get(key string) (bool, error)
Close()
}

func NewEventSampler(
ctx context.Context,
ttl config.ValueLoader[time.Duration],
eventSamplerType config.ValueLoader[string],
eventSamplingCardinality config.ValueLoader[int],
conf *config.Config,
log logger.Logger,
) (es EventSampler, err error) {
switch eventSamplerType.Load() {
case BadgerTypeEventSampler:
es, err = NewBadgerEventSampler(ctx, BadgerEventSamplerPathName, ttl, conf, log)
case InMemoryCacheTypeEventSampler:
es, err = NewInMemoryCacheEventSampler(ctx, ttl, eventSamplingCardinality)
default:
log.Warnf("invalid event sampler type: %s. Using default badger event sampler", eventSamplerType.Load())
es, err = NewBadgerEventSampler(ctx, BadgerEventSamplerPathName, ttl, conf, log)
}

if err != nil {
return nil, err
}
return es, nil
}
178 changes: 178 additions & 0 deletions enterprise/reporting/event_sampler/event_sampler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package event_sampler

import (
"context"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
)

func TestBadger(t *testing.T) {
ctx := context.Background()
conf := config.New()
ttl := conf.GetReloadableDurationVar(3000, time.Millisecond, "Reporting.eventSampling.durationInMinutes")
eventSamplerType := conf.GetReloadableStringVar("badger", "Reporting.eventSampling.type")
eventSamplingCardinality := conf.GetReloadableIntVar(10, 1, "Reporting.eventSampling.cardinality")
log := logger.NewLogger()

t.Run("should put and get keys", func(t *testing.T) {
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log)
_ = es.Put("key1")
_ = es.Put("key2")
_ = es.Put("key3")
val1, _ := es.Get("key1")
val2, _ := es.Get("key2")
val3, _ := es.Get("key3")
val4, _ := es.Get("key4")

assert.True(t, val1, "Expected key1 to be present")
assert.True(t, val2, "Expected key2 to be present")
assert.True(t, val3, "Expected key3 to be present")
assert.False(t, val4, "Expected key4 to not be present")
es.Close()
})

t.Run("should not get evicted keys", func(t *testing.T) {
conf.Set("Reporting.eventSampling.durationInMinutes", 100)
assert.Equal(t, 100*time.Millisecond, ttl.Load())

es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log)
defer es.Close()

_ = es.Put("key1")

require.Eventually(t, func() bool {
val1, _ := es.Get("key1")
return !val1
}, 1*time.Second, 50*time.Millisecond)
})
}

func TestInMemoryCache(t *testing.T) {
ctx := context.Background()
conf := config.New()
eventSamplerType := conf.GetReloadableStringVar("in_memory_cache", "Reporting.eventSampling.type")
eventSamplingCardinality := conf.GetReloadableIntVar(3, 1, "Reporting.eventSampling.cardinality")
ttl := conf.GetReloadableDurationVar(3000, time.Millisecond, "Reporting.eventSampling.durationInMinutes")
log := logger.NewLogger()

t.Run("should put and get keys", func(t *testing.T) {
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log)
_ = es.Put("key1")
_ = es.Put("key2")
_ = es.Put("key3")
val1, _ := es.Get("key1")
val2, _ := es.Get("key2")
val3, _ := es.Get("key3")
val4, _ := es.Get("key4")

assert.True(t, val1, "Expected key1 to be present")
assert.True(t, val2, "Expected key2 to be present")
assert.True(t, val3, "Expected key3 to be present")
assert.False(t, val4, "Expected key4 to not be present")
})

t.Run("should not get evicted keys", func(t *testing.T) {
conf.Set("Reporting.eventSampling.durationInMinutes", 100)
assert.Equal(t, 100*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log)
_ = es.Put("key1")

require.Eventually(t, func() bool {
val1, _ := es.Get("key1")
return !val1
}, 1*time.Second, 50*time.Millisecond)
})

t.Run("should not add keys if length exceeds", func(t *testing.T) {
conf.Set("Reporting.eventSampling.durationInMinutes", 3000)
assert.Equal(t, 3000*time.Millisecond, ttl.Load())
es, _ := NewEventSampler(ctx, ttl, eventSamplerType, eventSamplingCardinality, conf, log)
_ = es.Put("key1")
_ = es.Put("key2")
_ = es.Put("key3")
_ = es.Put("key4")
_ = es.Put("key5")

val1, _ := es.Get("key1")
val2, _ := es.Get("key2")
val3, _ := es.Get("key3")
val4, _ := es.Get("key4")
val5, _ := es.Get("key5")

assert.True(t, val1, "Expected key1 to be present")
assert.True(t, val2, "Expected key2 to be present")
assert.True(t, val3, "Expected key3 to be present")
assert.False(t, val4, "Expected key4 to not be added")
assert.False(t, val5, "Expected key5 to not be added")
})
}

func BenchmarkEventSampler(b *testing.B) {
testCases := []struct {
name string
eventSamplerType string
}{
{
name: "Badger",
eventSamplerType: "badger",
},
{
name: "InMemoryCache",
eventSamplerType: "in_memory_cache",
},
}

ctx := context.Background()
conf := config.New()
ttl := conf.GetReloadableDurationVar(1, time.Minute, "Reporting.eventSampling.durationInMinutes")
eventSamplerType := conf.GetReloadableStringVar("default", "Reporting.eventSampling.type")
eventSamplingCardinality := conf.GetReloadableIntVar(10, 1, "Reporting.eventSampling.cardinality")
log := logger.NewLogger()

for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
conf.Set("Reporting.eventSampling.type", tc.eventSamplerType)

eventSampler, err := NewEventSampler(
ctx,
ttl,
eventSamplerType,
eventSamplingCardinality,
conf,
log,
)
require.NoError(b, err)

b.Run("Put", func(b *testing.B) {
for i := 0; i < b.N; i++ {
key := uuid.New().String()
err := eventSampler.Put(key)
require.NoError(b, err)
}
})

b.Run("Get", func(b *testing.B) {
for i := 0; i < b.N; i++ {
key := uuid.New().String()

err := eventSampler.Put(key)
require.NoError(b, err)

_, err = eventSampler.Get(key)
require.NoError(b, err)
}
})

eventSampler.Close()
})
}
}
Loading

0 comments on commit 9415887

Please sign in to comment.