Skip to content

Commit

Permalink
Refactors Badger usage for transform in separate steps
Browse files Browse the repository at this point in the history
  • Loading branch information
cuducos committed Oct 29, 2024
1 parent fc2e4cc commit 6628490
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 64 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,3 @@ require (
google.golang.org/grpc v1.65.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)

// +heroku goVersion go1.22
8 changes: 4 additions & 4 deletions transform/company_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestNewCompany(t *testing.T) {
}

t.Run("with privacy", func(t *testing.T) {
tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405")))
tmp, err := os.MkdirTemp("", fmt.Sprintf("minha-receita-%s-*", time.Now().Format("20060102150405")))
if err != nil {
t.Fatal("error creating temporary key-value storage: %w", err)
}
Expand All @@ -117,7 +117,7 @@ func TestNewCompany(t *testing.T) {
if err != nil {
t.Errorf("expected no error creating badger, got %s", err)
}
defer kv.close(false)
defer kv.close()
lookups, err := newLookups(testdata)
if err != nil {
t.Errorf("expected no errors creating look up tables, got %v", err)
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestNewCompany(t *testing.T) {
}
})
t.Run("without privacy", func(t *testing.T) {
tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405")))
tmp, err := os.MkdirTemp("", fmt.Sprintf("minha-receita-%s-*", time.Now().Format("20060102150405")))
if err != nil {
t.Fatal("error creating temporary key-value storage: %w", err)
}
Expand All @@ -273,7 +273,7 @@ func TestNewCompany(t *testing.T) {
if err != nil {
t.Errorf("expected no error creating badger, got %s", err)
}
defer kv.close(false)
defer kv.close()
lookups, err := newLookups(testdata)
if err != nil {
t.Errorf("expected no errors creating look up tables, got %v", err)
Expand Down
28 changes: 11 additions & 17 deletions transform/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,31 +162,25 @@ func (kv *badgerStorage) enrichCompany(c *company) error {
return nil
}

func (b *badgerStorage) close(k bool) error {
b.db.Close()
if !k {
if err := os.RemoveAll(b.path); err != nil {
return fmt.Errorf("error cleaning up badger storage directory: %w", err)
}
}
return nil
func (b *badgerStorage) close() error {
return b.db.Close()
}

type badgerLogger struct{}
type noLogger struct{}

func (*badgerLogger) Errorf(string, ...interface{}) {}
func (*badgerLogger) Warningf(string, ...interface{}) {}
func (*badgerLogger) Infof(string, ...interface{}) {}
func (*badgerLogger) Debugf(string, ...interface{}) {}
func (*noLogger) Errorf(string, ...interface{}) {}
func (*noLogger) Warningf(string, ...interface{}) {}
func (*noLogger) Infof(string, ...interface{}) {}
func (*noLogger) Debugf(string, ...interface{}) {}

func newBadgerStorage(dir string) (*badgerStorage, error) {
var err error
var opt badger.Options
opt := badger.DefaultOptions(dir)
if os.Getenv("DEBUG") != "" {
log.Output(1, fmt.Sprintf("Creating temporary key-value storage at %s", dir))
} else {
opt = opt.WithLogger(&noLogger{})
}
opt = badger.DefaultOptions(dir)
db, err := badger.Open(opt.WithLogger(&badgerLogger{}))
db, err := badger.Open(opt)
if err != nil {
return nil, fmt.Errorf("error creating badger key-value object: %w", err)
}
Expand Down
15 changes: 6 additions & 9 deletions transform/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestBadgerStorageClose(t *testing.T) {
tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405")))
tmp, err := os.MkdirTemp("", fmt.Sprintf("minha-receita-%s-*", time.Now().Format("20060102150405")))
if err != nil {
t.Fatal("error creating temporary key-value storage: %w", err)
}
Expand All @@ -19,12 +19,9 @@ func TestBadgerStorageClose(t *testing.T) {
if err != nil {
t.Errorf("expected no error creating badger storage, got %s", err)
}
if err := kv.close(false); err != nil {
if err := kv.close(); err != nil {
t.Errorf("expected no error closing badger storage, got %s", err)
}
if _, err := os.Stat(kv.path); err == nil || !os.IsNotExist(err) {
t.Errorf("expected %s to be gone, but got %s when opening it", kv.path, err)
}
}

func TestNewItem(t *testing.T) {
Expand Down Expand Up @@ -65,7 +62,7 @@ func TestLoad(t *testing.T) {
if err != nil {
t.Fatalf("could not create lookups: %s", err)
}
tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405")))
tmp, err := os.MkdirTemp("", fmt.Sprintf("minha-receita-%s-*", time.Now().Format("20060102150405")))
if err != nil {
t.Fatal("error creating temporary key-value storage: %w", err)
}
Expand All @@ -74,7 +71,7 @@ func TestLoad(t *testing.T) {
if err != nil {
t.Fatalf("could not create badger storage: %s", err)
}
defer kv.close(false)
defer kv.close()
if err := kv.load(testdata, &l); err != nil {
t.Errorf("expected no error loading data, got %s", err)
}
Expand All @@ -94,7 +91,7 @@ func TestEnrichCompany(t *testing.T) {
if err != nil {
t.Fatalf("could not create lookups: %s", err)
}
tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405")))
tmp, err := os.MkdirTemp("", fmt.Sprintf("minha-receita-%s-*", time.Now().Format("20060102150405")))
if err != nil {
t.Fatal("error creating temporary key-value storage: %w", err)
}
Expand All @@ -103,7 +100,7 @@ func TestEnrichCompany(t *testing.T) {
if err != nil {
t.Fatalf("could not create badger storage: %s", err)
}
defer kv.close(false)
defer kv.close()
if err := kv.load(testdata, &l); err != nil {
t.Errorf("expected no error loading data, got %s", err)
}
Expand Down
62 changes: 32 additions & 30 deletions transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ const (
// BatchSize determines the size of the batches used to create the initial JSON
// data in the database.
BatchSize = 8192

badgerFilePrefix = "minha-receita-badger-"
)

type database interface {
Expand All @@ -33,7 +31,7 @@ type database interface {
type kvStorage interface {
load(string, *lookups) error
enrichCompany(*company) error
close(bool) error
close() error
}

type mode int
Expand Down Expand Up @@ -75,31 +73,24 @@ func saveUpdatedAt(db database, dir string) error {
return db.MetaSave("updated-at", string(v))
}

func runStepOne(dir string, l lookups, isolated bool) (string, error) {
tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405")))
if err != nil {
return "", fmt.Errorf("error creating temporary key-value storage: %w", err)
}
kv, err := newBadgerStorage(tmp)
func runStepOne(dir string, pth string, l lookups) error {
kv, err := newBadgerStorage(pth)
if err != nil {
return "", fmt.Errorf("could not create badger storage: %w", err)
return fmt.Errorf("could not create badger storage: %w", err)
}
defer kv.close(isolated)
defer kv.close()
if err := kv.load(dir, &l); err != nil {
return "", fmt.Errorf("error loading data to badger: %w", err)
}
if isolated {
fmt.Println(kv.path)
return fmt.Errorf("error loading data to badger: %w", err)
}
return kv.path, nil
return nil
}

func runStepTwo(dir string, tmp string, db database, l lookups, maxParallelDBQueries, batchSize int, privacy, isolated bool) error {
kv, err := newBadgerStorage(tmp)
func runStepTwo(dir string, pth string, db database, l lookups, maxParallelDBQueries, batchSize int, privacy bool) error {
kv, err := newBadgerStorage(pth)
if err != nil {
return fmt.Errorf("could not create badger storage: %w", err)
}
defer kv.close(isolated)
defer kv.close()
j, err := createJSONRecordsTask(dir, db, &l, kv, batchSize, privacy)
if err != nil {
return fmt.Errorf("error creating new task for venues in %s: %w", dir, err)
Expand All @@ -112,27 +103,38 @@ func runStepTwo(dir string, tmp string, db database, l lookups, maxParallelDBQue

// Transform the downloaded files for company venues, creating a database record
// per CNPJ.
func Transform(dir string, db database, maxParallelDBQueries, batchSize int, privacy, s1 bool, s2 string) error {
func Transform(dir string, db database, max, s int, p, s1 bool, s2 string) error {
m, err := transformMode(s1, s2)
if err != nil {
return fmt.Errorf("error determining transform mode: %w", err)
}
var tmp string
var pth string
if m == stepTwo {
pth = s2
} else {
pth, err = os.MkdirTemp("", fmt.Sprintf("minha-receita-%s-*", time.Now().Format("20060102150405")))
}
if err != nil {
return fmt.Errorf("error creating temporary key-value storage: %w", err)
}
defer os.RemoveAll(pth)
l, err := newLookups(dir)
if err != nil {
return fmt.Errorf("error creating look up tables from %s: %w", dir, err)
}
if m != stepTwo {
tmp, err = runStepOne(dir, l, m == stepOne)
if err != nil {
return fmt.Errorf("error creating key-value storage: %w", err)
switch m {
case stepOne:
if err := runStepOne(dir, pth, l); err != nil {
return err
}
}
if m != stepOne {
if s2 != "" {
tmp = s2
fmt.Println(pth)
case stepTwo:
return runStepTwo(dir, pth, db, l, max, s, p)
case both:
if err := runStepOne(dir, pth, l); err != nil {
return err
}
return runStepTwo(dir, tmp, db, l, maxParallelDBQueries, batchSize, privacy, m == stepTwo)
return runStepTwo(dir, pth, db, l, max, s, p)
}
return nil
}
4 changes: 2 additions & 2 deletions transform/venues_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

func TestTaskRun(t *testing.T) {
db := newTestDB(t)
tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405")))
tmp, err := os.MkdirTemp("", fmt.Sprintf("minha-receita-%s-*", time.Now().Format("20060102150405")))
if err != nil {
t.Fatal("error creating temporary key-value storage: %w", err)
}
Expand All @@ -18,7 +18,7 @@ func TestTaskRun(t *testing.T) {
if err != nil {
t.Errorf("expected no error creating badger, got %s", err)
}
defer kv.close(false)
defer kv.close()
lookups, err := newLookups(testdata)
if err != nil {
t.Errorf("expected no errors creating look up tables, got %v", err)
Expand Down

0 comments on commit 6628490

Please sign in to comment.