refactor: review comments 1

shekhar-rudder committed Dec 30, 2024
1 parent 9df59d0 commit e268f73

Showing 5 changed files with 122 additions and 81 deletions.
File 1 of 5: router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go

@@ -12,11 +12,26 @@ import (
     whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
 )
 
-type snowpipeAuthzError struct {
-    err error
+type errCode int
+
+const (
+    errBackoff errCode = iota
+    errAuthz
+)
+
+type snowflakeConnectionErr struct {
+    code errCode
+    err  error
 }
 
+func newSnowflakeConnectionErr(code errCode, err error) *snowflakeConnectionErr {
+    return &snowflakeConnectionErr{
+        code: code,
+        err:  err,
+    }
+}
+
-func (sae *snowpipeAuthzError) Error() string {
+func (sae *snowflakeConnectionErr) Error() string {
     return sae.err.Error()
 }

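The error code lets callers tell a connection attempt that was skipped due to backoff apart from a real authorization failure. Below is a minimal, self-contained sketch of the caller-side branching; the wrapping and messages are illustrative, and the Upload changes further down show the actual call sites:

package main

import (
    "errors"
    "fmt"
)

// Mirrors the types introduced in this commit.
type errCode int

const (
    errBackoff errCode = iota
    errAuthz
)

type snowflakeConnectionErr struct {
    code errCode
    err  error
}

func (e *snowflakeConnectionErr) Error() string { return e.err.Error() }

func main() {
    // A wrapped connection error, roughly as the manager would surface it.
    err := fmt.Errorf("creating schema: %w",
        &snowflakeConnectionErr{code: errAuthz, err: errors.New("insufficient privileges")})

    var sfErr *snowflakeConnectionErr
    if errors.As(err, &sfErr) {
        switch sfErr.code {
        case errAuthz:
            fmt.Println("authorization failure: extend backoff, mark jobs failed for retry")
        case errBackoff:
            fmt.Println("inside backoff window: skip manager creation, mark jobs failed for retry")
        }
    }
}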
@@ -74,8 +89,7 @@ func (m *Manager) addColumns(ctx context.Context, namespace, tableName string, c
         snowflakeManager.Cleanup(ctx)
     }()
     if err = snowflakeManager.AddColumns(ctx, tableName, columns); err != nil {
-        m.authzBackoff.set()
-        return &snowpipeAuthzError{fmt.Errorf("adding column: %w", err)}
+        return newSnowflakeConnectionErr(errAuthz, fmt.Errorf("adding column: %w", err))
     }
     return nil
 }
@@ -166,12 +180,10 @@ func (m *Manager) handleSchemaError(
         snowflakeManager.Cleanup(ctx)
     }()
     if err := snowflakeManager.CreateSchema(ctx); err != nil {
-        m.authzBackoff.set()
-        return nil, &snowpipeAuthzError{fmt.Errorf("creating schema: %w", err)}
+        return nil, newSnowflakeConnectionErr(errAuthz, fmt.Errorf("creating schema: %w", err))
     }
     if err := snowflakeManager.CreateTable(ctx, channelReq.TableConfig.Table, eventSchema); err != nil {
-        m.authzBackoff.set()
-        return nil, &snowpipeAuthzError{fmt.Errorf("creating table: %w", err)}
+        return nil, newSnowflakeConnectionErr(errAuthz, fmt.Errorf("creating table: %w", err))
     }
     return m.api.CreateChannel(ctx, channelReq)
 }
@@ -196,8 +208,7 @@ func (m *Manager) handleTableError(
         snowflakeManager.Cleanup(ctx)
     }()
     if err := snowflakeManager.CreateTable(ctx, channelReq.TableConfig.Table, eventSchema); err != nil {
-        m.authzBackoff.set()
-        return nil, &snowpipeAuthzError{fmt.Errorf("creating table: %w", err)}
+        return nil, newSnowflakeConnectionErr(errAuthz, fmt.Errorf("creating table: %w", err))
     }
     return m.api.CreateChannel(ctx, channelReq)
 }
@@ -237,8 +248,8 @@ func (m *Manager) deleteChannel(ctx context.Context, tableName, channelID string
 }
 
 func (m *Manager) createSnowflakeManager(ctx context.Context, namespace string) (manager.Manager, error) {
-    if m.now().Before(m.authzBackoff.nextBackoffTime()) {
-        return nil, &snowpipeAuthzError{fmt.Errorf("skipping snowflake manager creation due to backoff")}
+    if m.authzBackoff.isInBackoff() {
+        return nil, newSnowflakeConnectionErr(errBackoff, fmt.Errorf("skipping snowflake manager creation due to backoff"))
     }
     modelWarehouse := whutils.ModelWarehouse{
         WorkspaceID: m.destination.WorkspaceID,
File 2 of 5 (snowpipestreaming package)

@@ -38,22 +38,29 @@ import (
 
 var json = jsoniter.ConfigCompatibleWithStandardLibrary
 
+type systemClock struct{}
+
+func (t systemClock) Now() time.Time {
+    return timeutil.Now()
+}
+
 func New(
     conf *config.Config,
-    mLogger logger.Logger,
+    log logger.Logger,
     statsFactory stats.Stats,
     destination *backendconfig.DestinationT,
 ) *Manager {
+    clock := systemClock{}
     m := &Manager{
         appConfig: conf,
-        logger: mLogger.Child("snowpipestreaming").Withn(
+        logger: log.Child("snowpipestreaming").Withn(
             obskit.WorkspaceID(destination.WorkspaceID),
             obskit.DestinationID(destination.ID),
             obskit.DestinationType(destination.DestinationDefinition.Name),
         ),
         statsFactory: statsFactory,
         destination:  destination,
-        now: timeutil.Now,
+        now: clock.Now,
         channelCache: sync.Map{},
         polledImportInfoMap: make(map[string]*importInfo),
     }
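Threading a clock through New gives the backoff an injectable time source: production code wires systemClock (a thin wrapper over timeutil.Now) into newAuthzBackoff in the next hunk, while the tests further down substitute a mockClock to advance time deterministically.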
@@ -69,7 +76,7 @@ func New(
     m.config.client.retryMax = conf.GetInt("SnowpipeStreaming.Client.retryMax", 5)
     m.config.instanceID = conf.GetString("INSTANCE_ID", "1")
     m.config.maxBufferCapacity = conf.GetReloadableInt64Var(512*bytesize.KB, bytesize.B, "SnowpipeStreaming.maxBufferCapacity")
-    m.authzBackoff = newAuthzBackoff(conf.GetDuration("SnowpipeStreaming.backoffDuration", 1, time.Second))
+    m.authzBackoff = newAuthzBackoff(conf.GetDuration("SnowpipeStreaming.backoffDuration", 1, time.Second), clock)
 
     tags := stats.Tags{
         "module": "batch_router",
@@ -168,15 +175,12 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncUploadOutput {
 
     discardsChannel, err := m.initializeChannelWithSchema(ctx, asyncDest.Destination.ID, &destConf, discardsTable(), discardsSchema())
     if err != nil {
-        var authzErr *snowpipeAuthzError
-        if errors.As(err, &authzErr) {
-            // Ignoring this error so that the jobs are marked as failed and not aborted since
-            // we want these jobs to be retried the next time.
-            m.logger.Warnn("Failed to initialize channel with schema",
-                logger.NewStringField("table", discardsTable()),
-                obskit.Error(err),
-            )
-            shouldResetBackoff = false
+        var sfConnectionErr *snowflakeConnectionErr
+        if errors.As(err, &sfConnectionErr) {
+            if sfConnectionErr.code == errAuthz {
+                m.authzBackoff.set()
+            }
+            return m.failedJobs(asyncDest, err.Error())
         } else {
             return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error())
         }
@@ -214,11 +218,12 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncUploadOutput {
     for _, info := range uploadInfos {
         imInfo, discardImInfo, err := m.sendEventsToSnowpipe(ctx, asyncDest.Destination.ID, &destConf, info)
         if err != nil {
-            var authzErr *snowpipeAuthzError
-            if errors.As(err, &authzErr) {
-                if shouldResetBackoff {
-                    shouldResetBackoff = false
+            var sfConnectionErr *snowflakeConnectionErr
+            if errors.As(err, &sfConnectionErr) {
+                if sfConnectionErr.code == errAuthz {
+                    m.authzBackoff.set()
+                }
+                shouldResetBackoff = false
             }
             m.logger.Warnn("Failed to send events to Snowpipe",
                 logger.NewStringField("table", info.tableName),
@@ -398,6 +403,16 @@ func (m *Manager) abortJobs(asyncDest *common.AsyncDestinationStruct, abortReason string) common.AsyncUploadOutput {
     }
 }
 
+func (m *Manager) failedJobs(asyncDest *common.AsyncDestinationStruct, failedReason string) common.AsyncUploadOutput {
+    m.stats.jobs.failed.Count(len(asyncDest.ImportingJobIDs))
+    return common.AsyncUploadOutput{
+        ImportingJobIDs: asyncDest.ImportingJobIDs,
+        ImportingCount:  len(asyncDest.ImportingJobIDs),
+        FailedReason:    failedReason,
+        DestinationID:   asyncDest.Destination.ID,
+    }
+}
+
 // Poll checks the status of multiple imports using the import ID from pollInput.
 // For the ones that have reached a terminal state (success or failure), it caches the import infos in polledImportInfoMap, so a later Poll call does not need to repeat the status check.
 // Once all the imports have reached a terminal state, if any imports have failed, it deletes the channels for those imports.
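The new failedJobs helper is the retry-side counterpart of abortJobs: it keeps the batch in the importing state (ImportingJobIDs/ImportingCount) and attaches a FailedReason, so connection and authorization problems surface as retryable failures rather than permanent aborts. This preserves the intent of the inline comment removed from Upload above ("marked as failed and not aborted since we want these jobs to be retried").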
File 3 of 5 (snowpipestreaming package tests)

@@ -50,27 +50,31 @@ func (m *mockAPI) GetStatus(_ context.Context, channelID string) (*model.StatusR
 
 type mockManager struct {
     manager.Manager
-    throwSchemaErr bool
+    createSchemaErr error
 }
 
-func newMockManager(m manager.Manager, throwSchemaErr bool) *mockManager {
+func newMockManager(m manager.Manager) *mockManager {
     return &mockManager{
-        Manager:        m,
-        throwSchemaErr: throwSchemaErr,
+        Manager: m,
     }
 }
 
 func (m *mockManager) CreateSchema(ctx context.Context) (err error) {
-    if m.throwSchemaErr {
-        return fmt.Errorf("failed to create schema")
-    }
-    return nil
+    return m.createSchemaErr
 }
 
-func (m *mockManager) CreateTable(ctx context.Context, tableName string, columnMap manager.ModelTableSchema) (err error) {
+func (m *mockManager) CreateTable(ctx context.Context, tableName string, columnMap whutils.ModelTableSchema) (err error) {
     return nil
 }
 
+type mockClock struct {
+    nowTime time.Time
+}
+
+func (m *mockClock) Now() time.Time {
+    return m.nowTime
+}
+
 var (
     usersChannelResponse = &model.ChannelResponse{
         ChannelID: "test-users-channel",
@@ -358,49 +362,47 @@ func TestSnowpipeStreaming(t *testing.T) {
         sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
             sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
             managerCreatorCallCount++
-            return newMockManager(sm, true), nil
+            mockManager := newMockManager(sm)
+            mockManager.createSchemaErr = fmt.Errorf("failed to create schema")
+            return mockManager, nil
         }
-        sm.authzBackoff = newAuthzBackoff(time.Second * 10)
+        mockClock := &mockClock{}
+        mockClock.nowTime = sm.now()
+        sm.authzBackoff = newAuthzBackoff(time.Second*10, mockClock)
         asyncDestStruct := &common.AsyncDestinationStruct{
             Destination: destination,
            FileName:    "testdata/successful_user_records.txt",
         }
+        require.Equal(t, false, sm.authzBackoff.isInBackoff())
         output1 := sm.Upload(asyncDestStruct)
         require.Equal(t, 2, output1.FailedCount)
         require.Equal(t, 0, output1.AbortCount)
         require.Equal(t, 1, managerCreatorCallCount)
-        require.Equal(t, time.Second*10, sm.authzBackoff.backoffDuration)
-        require.Equal(t, false, sm.authzBackoff.lastestErrorTime.IsZero())
+        require.Equal(t, true, sm.authzBackoff.isInBackoff())
 
         sm.Upload(asyncDestStruct)
         // client is not created again due to backoff error
         require.Equal(t, 1, managerCreatorCallCount)
-        require.Equal(t, time.Second*10, sm.authzBackoff.backoffDuration)
-        require.Equal(t, false, sm.authzBackoff.lastestErrorTime.IsZero())
+        require.Equal(t, true, sm.authzBackoff.isInBackoff())
 
-        sm.now = func() time.Time {
-            return time.Now().UTC().Add(time.Second * 100)
-        }
+        mockClock.nowTime = sm.now().Add(time.Second * 5)
+        require.Equal(t, true, sm.authzBackoff.isInBackoff())
+        mockClock.nowTime = sm.now().Add(time.Second * 20)
+        require.Equal(t, false, sm.authzBackoff.isInBackoff())
 
         sm.Upload(asyncDestStruct)
         // client created again since backoff duration has been exceeded
         require.Equal(t, 2, managerCreatorCallCount)
-        require.Equal(t, time.Second*20, sm.authzBackoff.backoffDuration)
-        require.Equal(t, false, sm.authzBackoff.lastestErrorTime.IsZero())
+        require.Equal(t, false, sm.authzBackoff.isInBackoff())
 
         sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
             sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
             managerCreatorCallCount++
-            return newMockManager(sm, false), nil
-        }
-        sm.now = func() time.Time {
-            return time.Now().UTC().Add(time.Second * 200)
+            return newMockManager(sm), nil
         }
         sm.Upload(asyncDestStruct)
         require.Equal(t, 3, managerCreatorCallCount)
         // no error should reset the backoff config
-        require.Equal(t, time.Duration(0), sm.authzBackoff.backoffDuration)
-        require.Equal(t, true, sm.authzBackoff.lastestErrorTime.IsZero())
+        require.Equal(t, false, sm.authzBackoff.isInBackoff())
     })
 
     t.Run("Upload with discards table authorization error should not abort the job", func(t *testing.T) {
@@ -420,14 +422,21 @@
         }
         sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
             sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
-            return newMockManager(sm, true), nil
+            mockManager := newMockManager(sm)
+            mockManager.createSchemaErr = fmt.Errorf("failed to create schema")
+            return mockManager, nil
         }
         output := sm.Upload(&common.AsyncDestinationStruct{
-            Destination: destination,
-            FileName:    "testdata/successful_user_records.txt",
+            ImportingJobIDs: []int64{1},
+            Destination:     destination,
+            FileName:        "testdata/successful_user_records.txt",
         })
-        require.Equal(t, 2, output.FailedCount)
+        require.Equal(t, 0, output.FailedCount)
         require.Equal(t, 0, output.AbortCount)
+        require.Equal(t, 1, output.ImportingCount)
+        require.NotEmpty(t, output.FailedReason)
+        require.Empty(t, output.AbortReason)
+        require.Equal(t, true, sm.authzBackoff.isInBackoff())
     })
 
     t.Run("Upload insert error for all events", func(t *testing.T) {
File 4 of 5 (snowpipestreaming package)

@@ -13,7 +13,8 @@ import (
     "github.com/rudderlabs/rudder-go-kit/stats"
 
     "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model"
-    "github.com/rudderlabs/rudder-server/utils/timeutil"
+
+    "github.com/cenkalti/backoff/v4"
 
     backendconfig "github.com/rudderlabs/rudder-server/backend-config"
     "github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
@@ -133,12 +134,12 @@ type (
     }
 
     authzBackoff struct {
-        // time at which the an attempt was made to create a resource but it failed likely due to permission issues.
-        lastestErrorTime time.Time
-        // If lastAttemptedTime is not zero, then the next attempt to create a SF connection will be made after backoffDuration.
+        // If an attempt was made to create a resource but it failed likely due to permission issues,
+        // then the next attempt to create a SF connection will be made after backoffDuration.
         // This approach prevents repeatedly activating the warehouse even though the permission issue remains unresolved.
-        backoffDuration        time.Duration
-        initialBackoffDuration time.Duration
+        backoff     *backoff.ExponentialBackOff
+        options     []backoff.ExponentialBackOffOpts
+        nextBackoff time.Duration
     }
 )

@@ -163,26 +164,34 @@ func (e *event) setUUIDTimestamp(formattedTimestamp string) {
     }
 }
 
-func newAuthzBackoff(initialBackoffDuration time.Duration) *authzBackoff {
+func newAuthzBackoff(initialInterval time.Duration, clock backoff.Clock) *authzBackoff {
     return &authzBackoff{
-        initialBackoffDuration: initialBackoffDuration,
+        options: []backoff.ExponentialBackOffOpts{
+            backoff.WithInitialInterval(initialInterval),
+            backoff.WithMultiplier(2),
+            backoff.WithClockProvider(clock),
+            backoff.WithRandomizationFactor(0),
+            backoff.WithMaxElapsedTime(0),
+            backoff.WithMaxInterval(0),
+        },
     }
 }
 
 func (abe *authzBackoff) set() {
-    abe.lastestErrorTime = timeutil.Now()
-    if abe.backoffDuration == 0 {
-        abe.backoffDuration = abe.initialBackoffDuration
-    } else {
-        abe.backoffDuration = abe.backoffDuration * 2
+    if abe.backoff == nil {
+        abe.backoff = backoff.NewExponentialBackOff(abe.options...)
     }
+    // nextBackoff can't be a derived field since every time NextBackOff is called, the internal state of backoff is updated.
+    abe.nextBackoff = abe.backoff.NextBackOff()
 }
 
 func (abe *authzBackoff) reset() {
-    abe.lastestErrorTime = time.Time{}
-    abe.backoffDuration = 0
+    abe.backoff = nil
 }
 
-func (abe *authzBackoff) nextBackoffTime() time.Time {
-    return abe.lastestErrorTime.Add(abe.backoffDuration)
+func (abe *authzBackoff) isInBackoff() bool {
+    if abe.backoff == nil {
+        return false
+    }
+    return abe.backoff.GetElapsedTime() < abe.nextBackoff
 }
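For reference, here is a minimal, self-contained sketch of how cenkalti/backoff/v4 behaves under this setup. The fakeClock type is illustrative, and unlike the diff, MaxInterval and MaxElapsedTime are left at the library defaults here; the final comparison is the same GetElapsedTime() < nextBackoff check isInBackoff performs:

package main

import (
    "fmt"
    "time"

    "github.com/cenkalti/backoff/v4"
)

// fakeClock plays the role the injected clock plays in the diff
// (systemClock in production, mockClock in the tests).
type fakeClock struct{ now time.Time }

func (c *fakeClock) Now() time.Time { return c.now }

func main() {
    clock := &fakeClock{now: time.Unix(0, 0)}

    // Same shape as newAuthzBackoff's options; MaxInterval and
    // MaxElapsedTime stay at the library defaults in this sketch.
    eb := backoff.NewExponentialBackOff(
        backoff.WithInitialInterval(time.Second),
        backoff.WithMultiplier(2),
        backoff.WithRandomizationFactor(0), // deterministic intervals
        backoff.WithClockProvider(clock),
    )

    // NextBackOff mutates internal state on every call, which is why the
    // commit caches its result in nextBackoff instead of deriving it.
    next := eb.NextBackOff()
    fmt.Println(next) // 1s

    clock.now = clock.now.Add(500 * time.Millisecond)
    fmt.Println(eb.GetElapsedTime() < next) // true  -> still in backoff

    clock.now = clock.now.Add(time.Second)
    fmt.Println(eb.GetElapsedTime() < next) // false -> backoff window passed

    fmt.Println(eb.NextBackOff()) // 2s -> the interval doubles in this sketch
}

Because NextBackOff advances the internal interval on every call, caching its result in nextBackoff is what keeps the isInBackoff check side-effect free.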
File 5 of 5: warehouse/integrations/manager/manager.go (3 changes: 0 additions, 3 deletions)

@@ -26,9 +26,6 @@ import (
     warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
 )
 
-// Creating an alias since "model.TableSchema" is defined in an internal module
-type ModelTableSchema = model.TableSchema
-
 type Manager interface {
     Setup(ctx context.Context, warehouse model.Warehouse, uploader warehouseutils.Uploader) error
     FetchSchema(ctx context.Context) (model.Schema, error)
