Skip to content

Commit

Permalink
chore: add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 committed Jan 2, 2025
1 parent 5ab3717 commit b013c87
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 19 deletions.
3 changes: 1 addition & 2 deletions warehouse/integrations/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"time"

"github.com/rudderlabs/rudder-server/warehouse/integrations/types"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand All @@ -22,6 +20,7 @@ import (
"github.com/rudderlabs/rudder-server/warehouse/integrations/postgres"
"github.com/rudderlabs/rudder-server/warehouse/integrations/redshift"
"github.com/rudderlabs/rudder-server/warehouse/integrations/snowflake"
"github.com/rudderlabs/rudder-server/warehouse/integrations/types"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)
Expand Down
5 changes: 2 additions & 3 deletions warehouse/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"

"github.com/samber/lo"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
Expand Down Expand Up @@ -204,7 +203,7 @@ func (r *Router) Start(ctx context.Context) error {
return r.CronTracker(gCtx)
}))
g.Go(crash.NotifyWarehouse(func() error {
return r.syncRemoteSchema(gCtx)
return r.sync(gCtx)
}))
return g.Wait()
}
Expand Down
3 changes: 1 addition & 2 deletions warehouse/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ import (
"testing"
"time"

"golang.org/x/sync/errgroup"

"github.com/ory/dockertest/v3"
"github.com/samber/lo"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"golang.org/x/sync/errgroup"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
Expand Down
24 changes: 14 additions & 10 deletions warehouse/router/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,24 @@ package router

import (
"context"
"encoding/json"
"fmt"
"time"

warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/schema"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

func (r *Router) syncRemoteSchema(ctx context.Context) error {
// syncSchemaRepo is the set of schema operations the router's remote schema
// sync relies on. Declaring it here, at the consumer, lets tests substitute a
// lightweight fake for the concrete schema handler.
type syncSchemaRepo interface {
	// GetLocalSchema returns the locally stored schema.
	GetLocalSchema(ctx context.Context) (model.Schema, error)
	// FetchSchemaFromWarehouse retrieves the schema from the warehouse using m.
	FetchSchemaFromWarehouse(ctx context.Context, m schema.FetchSchemaRepo) (model.Schema, error)
	// HasSchemaChanged reports whether the given schema differs from the one
	// currently held.
	HasSchemaChanged(schema model.Schema) bool
	// UpdateLocalSchemaWithWarehouse stores the given warehouse schema as the
	// local schema.
	UpdateLocalSchemaWithWarehouse(ctx context.Context, schema model.Schema) error
}

func (r *Router) sync(ctx context.Context) error {
for {
r.configSubscriberLock.RLock()
warehouses := append([]model.Warehouse{}, r.warehouses...)
Expand All @@ -37,10 +42,12 @@ func (r *Router) syncRemoteSchema(ctx context.Context) error {
r.logger.Child("syncer"),
r.statsFactory,
)
if err := r.SyncRemoteSchema(ctx, whManager, sh); err != nil {
if err := r.syncRemoteSchema(ctx, whManager, sh); err != nil {
r.logger.Errorn("failed to sync schema", obskit.Error(err))
whManager.Cleanup(ctx)
continue
}
whManager.Cleanup(ctx)
}
nextExecTime := execTime.Add(r.config.syncSchemaFrequency)
select {
Expand All @@ -52,8 +59,8 @@ func (r *Router) syncRemoteSchema(ctx context.Context) error {
}
}

func (r *Router) SyncRemoteSchema(ctx context.Context, m manager.Manager, sh *schema.Schema) error {
localSchema, err := sh.GetLocalSchema(ctx)
func (r *Router) syncRemoteSchema(ctx context.Context, m schema.FetchSchemaRepo, sh syncSchemaRepo) error {
_, err := sh.GetLocalSchema(ctx)
if err != nil {
return fmt.Errorf("fetching schema from local: %w", err)
}
Expand All @@ -63,9 +70,6 @@ func (r *Router) SyncRemoteSchema(ctx context.Context, m manager.Manager, sh *sc
return fmt.Errorf("fetching schema from warehouse: %w", err)
}

res, _ := json.Marshal(schemaFromWarehouse)
res2, _ := json.Marshal(localSchema)
r.logger.Infof("schema from warehouse %v with local schema %v", string(res), string(res2))
if sh.HasSchemaChanged(schemaFromWarehouse) {
err := sh.UpdateLocalSchemaWithWarehouse(ctx, schemaFromWarehouse)
if err != nil {
Expand Down
261 changes: 261 additions & 0 deletions warehouse/router/sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
package router

import (
"context"
"errors"
"fmt"
"reflect"
"testing"
"time"

miniogo "github.com/minio/minio-go/v7"
"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/schema"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// mockFetchSchemaRepo is a test double passed wherever a
// schema.FetchSchemaRepo is expected. It returns either a canned schema or a
// canned error.
type mockFetchSchemaRepo struct {
	// err, when non-nil, is returned by FetchSchema instead of a schema.
	err error
	// schemaInWarehouse is the schema FetchSchema reports on success.
	schemaInWarehouse model.Schema
}

// FetchSchema returns the configured warehouse schema, or an empty schema
// together with the configured error when one has been set.
func (m *mockFetchSchemaRepo) FetchSchema(_ context.Context) (model.Schema, error) {
	if m.err == nil {
		return m.schemaInWarehouse, nil
	}
	return model.Schema{}, m.err
}

// mockSyncSchemaRepo is a configurable test double for syncSchemaRepo.
// Each error field is returned by the correspondingly named method.
type mockSyncSchemaRepo struct {
	// Errors to inject, one per operation.
	getLocalSchemaerr, updateLocalSchemaErr, fetchSchemaErr error

	// hasChanged is the answer given by HasSchemaChanged.
	hasChanged bool

	// schemaMap holds canned warehouse schemas indexed by schemaKey.
	schemaMap map[string]model.Schema
}

// schemaKey builds the lookup key used by mockSyncSchemaRepo.schemaMap by
// joining the source ID, destination ID and namespace with underscores.
func schemaKey(sourceID, destinationID, namespace string) string {
	const keyFormat = "%s_%s_%s"

	key := fmt.Sprintf(keyFormat, sourceID, destinationID, namespace)
	return key
}

// GetLocalSchema always yields an empty schema; the accompanying error is
// whatever getLocalSchemaerr was configured to (nil by default).
func (m *mockSyncSchemaRepo) GetLocalSchema(_ context.Context) (model.Schema, error) {
	// Both the success and the failure path return an empty schema, so the
	// injected error (an interface value, nil when unset) can be returned as is.
	return model.Schema{}, m.getLocalSchemaerr
}

// UpdateLocalSchemaWithWarehouse returns the configured updateLocalSchemaErr
// (nil when unset) every time it is invoked.
//
// The error is deliberately NOT gated on hasChanged: the code under test is
// expected to call this method only when HasSchemaChanged reports true, so a
// test that sets updateLocalSchemaErr together with hasChanged == false must
// fail if the update is attempted anyway. Gating on hasChanged here would
// hide exactly that regression.
func (m *mockSyncSchemaRepo) UpdateLocalSchemaWithWarehouse(_ context.Context, _ model.Schema) error {
	return m.updateLocalSchemaErr
}

// HasSchemaChanged reports the preconfigured hasChanged flag, ignoring the
// schema it is given.
func (m *mockSyncSchemaRepo) HasSchemaChanged(_ model.Schema) bool {
	changed := m.hasChanged
	return changed
}

// FetchSchemaFromWarehouse returns the injected fetchSchemaErr when set.
// Otherwise it looks up the canned schema stored in schemaMap under the fixed
// test source/destination/namespace key; a missing entry (or a nil map)
// yields a nil schema and no error.
func (m *mockSyncSchemaRepo) FetchSchemaFromWarehouse(_ context.Context, _ schema.FetchSchemaRepo) (model.Schema, error) {
	if err := m.fetchSchemaErr; err != nil {
		return nil, err
	}

	key := schemaKey("test-sourceID", "test-destinationID", "test-namespace")
	return m.schemaMap[key], nil
}

// TestSync_SyncRemoteSchema exercises Router.syncRemoteSchema against mocked
// repositories, covering every error path and the success paths.
//
// Each case configures a mockSyncSchemaRepo and states whether an error is
// expected. When wantIs is set, the returned error must additionally wrap
// that sentinel.
func TestSync_SyncRemoteSchema(t *testing.T) {
	var (
		errLocal  = errors.New("error fetching schema from local")
		errFetch  = errors.New("error fetching schema from warehouse")
		errUpdate = errors.New("error updating local schema")
	)

	testCases := []struct {
		name    string
		repo    *mockSyncSchemaRepo
		wantErr bool
		wantIs  error // optional: sentinel the returned error must wrap
	}{
		{
			name:    "fetching schema from local fails",
			repo:    &mockSyncSchemaRepo{getLocalSchemaerr: errLocal},
			wantErr: true,
			wantIs:  errLocal,
		},
		{
			name:    "fetching schema from warehouse fails",
			repo:    &mockSyncSchemaRepo{fetchSchemaErr: errFetch},
			wantErr: true,
			wantIs:  errFetch,
		},
		{
			// Only the presence of an error is asserted here, matching the
			// original test; no sentinel check is made for the update path.
			name:    "schema has changed and updating errors",
			repo:    &mockSyncSchemaRepo{hasChanged: true, updateLocalSchemaErr: errUpdate},
			wantErr: true,
		},
		{
			// The update error is configured but must never surface because
			// an unchanged schema should not trigger an update at all.
			name: "schema has not changed and update is skipped",
			repo: &mockSyncSchemaRepo{hasChanged: false, updateLocalSchemaErr: errUpdate},
		},
		{
			name: "fetching schema succeeds with no error",
			repo: &mockSyncSchemaRepo{},
		},
	}

	for _, tc := range testCases {
		tc := tc // keep the closure safe on toolchains before Go 1.22
		t.Run(tc.name, func(t *testing.T) {
			r := &Router{
				logger: logger.NOP,
			}

			err := r.syncRemoteSchema(context.Background(), &mockFetchSchemaRepo{}, tc.repo)
			if !tc.wantErr {
				require.NoError(t, err)
				return
			}

			require.Error(t, err)
			if tc.wantIs != nil {
				require.True(t, errors.Is(err, tc.wantIs))
			}
		})
	}
}

// TestSync_SyncRemoteSchemaIntegration runs the router's schema sync loop
// against real Postgres and MinIO containers and verifies that a table
// created in the warehouse eventually shows up in the local schema.
//
// The sync loop runs in a background goroutine. Its result is handed back over
// a channel and asserted on the test goroutine, because require/FailNow must
// only be called from the goroutine running the test. The context is always
// cancelled before waiting for the loop, so a failing subtest cannot leave the
// test blocked forever.
func TestSync_SyncRemoteSchemaIntegration(t *testing.T) {
	destinationType := warehouseutils.POSTGRES
	bucket := "some-bucket"
	sourceID := "test-source-id"
	destinationID := "test-destination-id"
	workspaceID := "test-workspace-id"
	provider := "MINIO"
	sslMode := "disable"
	testNamespace := "test_namespace"
	testTable := "test_table"

	ctx, cancel := context.WithCancel(context.Background())
	// Safety net for early exits (e.g. a failed setup assertion below).
	defer cancel()

	pool, err := dockertest.NewPool("")
	require.NoError(t, err)
	pgResource, err := postgres.Setup(pool, t)
	require.NoError(t, err)
	minioResource, err := minio.Setup(pool, t)
	require.NoError(t, err)

	err = minioResource.Client.MakeBucket(ctx, bucket, miniogo.MakeBucketOptions{
		Region: "us-east-1",
	})
	require.NoError(t, err)
	t.Log("db:", pgResource.DBDsn)
	conf := config.New()
	err = (&migrator.Migrator{
		Handle:          pgResource.DB,
		MigrationsTable: "wh_schema_migrations",
	}).Migrate("warehouse")
	require.NoError(t, err)

	db := sqlmiddleware.New(pgResource.DB)

	warehouse := model.Warehouse{
		WorkspaceID: workspaceID,
		Source: backendconfig.SourceT{
			ID: sourceID,
		},
		Destination: backendconfig.DestinationT{
			ID: destinationID,
			DestinationDefinition: backendconfig.DestinationDefinitionT{
				Name: destinationType,
			},
			Config: map[string]interface{}{
				"host":            pgResource.Host,
				"port":            pgResource.Port,
				"database":        pgResource.Database,
				"user":            pgResource.User,
				"password":        pgResource.Password,
				"sslMode":         sslMode,
				"bucketProvider":  provider,
				"bucketName":      minioResource.BucketName,
				"accessKeyID":     minioResource.AccessKeyID,
				"secretAccessKey": minioResource.AccessKeySecret,
				"endPoint":        minioResource.Endpoint,
			},
		},
		Namespace:  testNamespace,
		Identifier: "RS:test-source-id:test-destination-id-create-jobs",
	}
	r := Router{
		logger:       logger.NOP,
		conf:         conf,
		db:           db,
		warehouses:   []model.Warehouse{warehouse},
		destType:     destinationType,
		statsFactory: stats.NOP,
	}

	// Buffered so the goroutine can always deliver its result and exit, even
	// if the test goroutine has already bailed out.
	syncErrCh := make(chan error, 1)
	go func() {
		syncErrCh <- r.sync(ctx)
	}()

	t.Run("fetching schema from postgres", func(t *testing.T) {
		schemaSql := fmt.Sprintf("CREATE SCHEMA %q;", testNamespace)
		_, err := pgResource.DB.Exec(schemaSql)
		require.NoError(t, err)

		tableSql := fmt.Sprintf(`CREATE TABLE %q.%q (
job_id BIGSERIAL PRIMARY KEY,
workspace_id TEXT NOT NULL DEFAULT '',
uuid UUID NOT NULL,
user_id TEXT NOT NULL,
parameters JSONB NOT NULL,
custom_val VARCHAR(64) NOT NULL,
event_payload JSONB NOT NULL,
event_count INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
expire_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW());`, testNamespace, testTable)
		_, err = pgResource.DB.Exec(tableSql)
		require.NoError(t, err)

		sh := schema.New(
			r.db,
			warehouse,
			r.conf,
			r.logger.Child("syncer"),
			r.statsFactory,
		)
		expected := model.Schema{
			"test_table": model.TableSchema{
				"created_at":    "datetime",
				"event_count":   "int",
				"event_payload": "json",
				"expire_at":     "datetime",
				"job_id":        "int",
				"parameters":    "json",
				"user_id":       "string",
				"workspace_id":  "string",
			},
		}
		require.Eventually(t, func() bool {
			// The condition runs on a goroutine owned by Eventually, so it
			// must not call require (FailNow). A read error is treated as
			// "not yet"; a persistent one surfaces as a timeout.
			localSchema, err := sh.GetLocalSchema(ctx)
			if err != nil {
				return false
			}
			return reflect.DeepEqual(localSchema, expected)
		}, 30*time.Second, 100*time.Millisecond)
	})

	// Stop the sync loop regardless of the subtest outcome, then assert its
	// result on the test goroutine.
	cancel()
	require.NoError(t, <-syncErrCh)
}
4 changes: 2 additions & 2 deletions warehouse/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type stagingFileRepo interface {
GetSchemasByIDs(ctx context.Context, ids []int64) ([]model.Schema, error)
}

type fetchSchemaRepo interface {
type FetchSchemaRepo interface {
FetchSchema(ctx context.Context) (model.Schema, error)
}

Expand Down Expand Up @@ -309,7 +309,7 @@ func (sh *Schema) GetLocalSchema(ctx context.Context) (model.Schema, error) {
// 1. Fetches schema from warehouse
// 2. Removes deprecated columns from schema
// 3. Updates local warehouse schema and unrecognized schema instance
func (sh *Schema) FetchSchemaFromWarehouse(ctx context.Context, repo fetchSchemaRepo) (model.Schema, error) {
func (sh *Schema) FetchSchemaFromWarehouse(ctx context.Context, repo FetchSchemaRepo) (model.Schema, error) {
warehouseSchema, err := repo.FetchSchema(ctx)
if err != nil {
return nil, fmt.Errorf("fetching schema: %w", err)
Expand Down

0 comments on commit b013c87

Please sign in to comment.