Skip to content

Commit aa4f042

Browse files
refactor: review comments 1
1 parent 9df59d0 commit aa4f042

File tree

3 files changed

+26
-27
lines changed

3 files changed

+26
-27
lines changed

router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go

+14-12
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary
4040

4141
func New(
4242
conf *config.Config,
43-
mLogger logger.Logger,
43+
log logger.Logger,
4444
statsFactory stats.Stats,
4545
destination *backendconfig.DestinationT,
4646
) *Manager {
4747
m := &Manager{
4848
appConfig: conf,
49-
logger: mLogger.Child("snowpipestreaming").Withn(
49+
logger: log.Child("snowpipestreaming").Withn(
5050
obskit.WorkspaceID(destination.WorkspaceID),
5151
obskit.DestinationID(destination.ID),
5252
obskit.DestinationType(destination.DestinationDefinition.Name),
@@ -170,13 +170,7 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU
170170
if err != nil {
171171
var authzErr *snowpipeAuthzError
172172
if errors.As(err, &authzErr) {
173-
// Ignoring this error so that the jobs are marked as failed and not aborted since
174-
// we want these jobs to be retried the next time.
175-
m.logger.Warnn("Failed to initialize channel with schema",
176-
logger.NewStringField("table", discardsTable()),
177-
obskit.Error(err),
178-
)
179-
shouldResetBackoff = false
173+
return m.failedJobs(asyncDest, err.Error())
180174
} else {
181175
return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error())
182176
}
@@ -216,9 +210,7 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU
216210
if err != nil {
217211
var authzErr *snowpipeAuthzError
218212
if errors.As(err, &authzErr) {
219-
if shouldResetBackoff {
220-
shouldResetBackoff = false
221-
}
213+
shouldResetBackoff = false
222214
}
223215
m.logger.Warnn("Failed to send events to Snowpipe",
224216
logger.NewStringField("table", info.tableName),
@@ -398,6 +390,16 @@ func (m *Manager) abortJobs(asyncDest *common.AsyncDestinationStruct, abortReaso
398390
}
399391
}
400392

393+
func (m *Manager) failedJobs(asyncDest *common.AsyncDestinationStruct, failedReason string) common.AsyncUploadOutput {
394+
m.stats.jobs.failed.Count(len(asyncDest.ImportingJobIDs))
395+
return common.AsyncUploadOutput{
396+
AbortJobIDs: asyncDest.ImportingJobIDs,
397+
AbortCount: len(asyncDest.ImportingJobIDs),
398+
FailedReason: failedReason,
399+
DestinationID: asyncDest.Destination.ID,
400+
}
401+
}
402+
401403
// Poll checks the status of multiple imports using the import ID from pollInput.
402404
// For the once which have reached the terminal state (success or failure), it caches the import infos in polledImportInfoMap. Later if Poll is called again, it does not need to do the status check again.
403405
// Once all the imports have reached the terminal state, if any imports have failed, it deletes the channels for those imports.

router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming_test.go

+12-12
Original file line numberDiff line numberDiff line change
@@ -50,24 +50,20 @@ func (m *mockAPI) GetStatus(_ context.Context, channelID string) (*model.StatusR
5050

5151
type mockManager struct {
5252
manager.Manager
53-
throwSchemaErr bool
53+
createSchemaErr error
5454
}
5555

56-
func newMockManager(m manager.Manager, throwSchemaErr bool) *mockManager {
56+
func newMockManager(m manager.Manager) *mockManager {
5757
return &mockManager{
58-
Manager: m,
59-
throwSchemaErr: throwSchemaErr,
58+
Manager: m,
6059
}
6160
}
6261

6362
func (m *mockManager) CreateSchema(ctx context.Context) (err error) {
64-
if m.throwSchemaErr {
65-
return fmt.Errorf("failed to create schema")
66-
}
67-
return nil
63+
return m.createSchemaErr
6864
}
6965

70-
func (m *mockManager) CreateTable(ctx context.Context, tableName string, columnMap manager.ModelTableSchema) (err error) {
66+
func (m *mockManager) CreateTable(ctx context.Context, tableName string, columnMap whutils.ModelTableSchema) (err error) {
7167
return nil
7268
}
7369

@@ -358,7 +354,9 @@ func TestSnowpipeStreaming(t *testing.T) {
358354
sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
359355
sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
360356
managerCreatorCallCount++
361-
return newMockManager(sm, true), nil
357+
mockManager := newMockManager(sm)
358+
mockManager.createSchemaErr = fmt.Errorf("failed to create schema")
359+
return mockManager, nil
362360
}
363361
sm.authzBackoff = newAuthzBackoff(time.Second * 10)
364362
asyncDestStruct := &common.AsyncDestinationStruct{
@@ -391,7 +389,7 @@ func TestSnowpipeStreaming(t *testing.T) {
391389
sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
392390
sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
393391
managerCreatorCallCount++
394-
return newMockManager(sm, false), nil
392+
return newMockManager(sm), nil
395393
}
396394
sm.now = func() time.Time {
397395
return time.Now().UTC().Add(time.Second * 200)
@@ -420,7 +418,9 @@ func TestSnowpipeStreaming(t *testing.T) {
420418
}
421419
sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
422420
sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
423-
return newMockManager(sm, true), nil
421+
mockManager := newMockManager(sm)
422+
mockManager.createSchemaErr = fmt.Errorf("failed to create schema")
423+
return mockManager, nil
424424
}
425425
output := sm.Upload(&common.AsyncDestinationStruct{
426426
Destination: destination,

warehouse/integrations/manager/manager.go

-3
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@ import (
2626
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
2727
)
2828

29-
// Creating an alias since "model.TableSchema" is defined in an internal module
30-
type ModelTableSchema = model.TableSchema
31-
3229
type Manager interface {
3330
Setup(ctx context.Context, warehouse model.Warehouse, uploader warehouseutils.Uploader) error
3431
FetchSchema(ctx context.Context) (model.Schema, error)

0 commit comments

Comments
 (0)