From 58a72f574847167273e86ebd37700f277bcd4d27 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Fri, 13 Dec 2024 15:58:00 +0530 Subject: [PATCH 1/9] chore: event_payload column can be JSONB, BYTEA or TEXT --- jobsdb/jobsdb.go | 65 +++++++- jobsdb/migration.go | 71 ++++++++- jobsdb/migration_test.go | 320 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 453 insertions(+), 3 deletions(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 2d728509e7..0035cf86f7 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -73,6 +73,15 @@ const ( pgErrorCodeTableReadonly = "RS001" ) +type payloadColumnType int + +const ( + JSONB payloadColumnType = iota + BYTEA + TEXT + // JSON // Explore afterwards? +) + // QueryConditions holds jobsdb query conditions type QueryConditions struct { // if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used @@ -499,6 +508,7 @@ type Handle struct { config *config.Config conf struct { + payloadColumnType payloadColumnType maxTableSize config.ValueLoader[int64] cacheExpiration config.ValueLoader[time.Duration] addNewDSLoopSleepDuration config.ValueLoader[time.Duration] @@ -702,6 +712,18 @@ func WithStats(s stats.Stats) OptsFunc { } } +func WithBinaryPayload() OptsFunc { + return func(jd *Handle) { + jd.conf.payloadColumnType = 1 + } +} + +func WithTextPayload() OptsFunc { + return func(jd *Handle) { + jd.conf.payloadColumnType = 2 + } +} + func WithSkipMaintenanceErr(ignore bool) OptsFunc { return func(jd *Handle) { jd.conf.skipMaintenanceError = ignore @@ -770,6 +792,8 @@ func (jd *Handle) init() { jd.config = config.Default } + jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, jd.tablePrefix+"."+string(jd.ownerType)+".payloadColumnType", jd.tablePrefix+".payloadColumnType")) + if jd.stats == nil { jd.stats = stats.Default } @@ -1429,6 +1453,15 @@ func (jd *Handle) createDSInTx(tx *Tx, newDS dataSetT) error { } func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT) error { + var payloadColumnType string + switch jd.conf.payloadColumnType { + case JSONB: + payloadColumnType = "JSONB" + case BYTEA: + payloadColumnType = "BYTEA" + case TEXT: + payloadColumnType = "TEXT" + } if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %q ( job_id BIGSERIAL PRIMARY KEY, workspace_id TEXT NOT NULL DEFAULT '', @@ -1436,7 +1469,7 @@ func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT user_id TEXT NOT NULL, parameters JSONB NOT NULL, custom_val VARCHAR(64) NOT NULL, - event_payload JSONB NOT NULL, + event_payload `+payloadColumnType+` NOT NULL, event_count INTEGER NOT NULL DEFAULT 1, created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), expire_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW());`, newDS.JobTable)); err != nil { @@ -2215,6 +2248,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param resultsetStates := map[string]struct{}{} for rows.Next() { var job JobT + var payload Payload var jsState sql.NullString var jsAttemptNum sql.NullInt64 var jsExecTime sql.NullTime @@ -2223,13 +2257,14 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param var jsErrorResponse []byte var jsParameters []byte err := rows.Scan(&job.JobID, &job.UUID, &job.UserID, &job.Parameters, &job.CustomVal, - &job.EventPayload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &job.PayloadSize, &runningEventCount, &runningPayloadSize, + &payload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, 
&job.WorkspaceId, &job.PayloadSize, &runningEventCount, &runningPayloadSize, &jsState, &jsAttemptNum, &jsExecTime, &jsRetryTime, &jsErrorCode, &jsErrorResponse, &jsParameters) if err != nil { return JobsResult{}, false, err } + job.EventPayload = payload.PayloadBytes() if jsState.Valid { resultsetStates[jsState.String] = struct{}{} job.LastJobStatus.JobState = jsState.String @@ -2289,6 +2324,32 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param }, true, nil } +type Payload struct { + S string + B []byte +} + +func (p *Payload) PayloadBytes() []byte { + if p.B != nil { + return p.B + } + return []byte(p.S) +} + +func (p *Payload) Scan(src interface{}) error { + b, ok := src.([]byte) + if !ok { + s, ok := src.(string) + if !ok { + return errors.New("neither string nor bytes") + } + p.S = s + return nil + } + p.B = b + return nil +} + func (jd *Handle) updateJobStatusDSInTx(ctx context.Context, tx *Tx, ds dataSetT, statusList []*JobStatusT, tags statTags) (updatedStates map[string]map[string]map[ParameterFilterT]struct{}, err error) { if len(statusList) == 0 { return diff --git a/jobsdb/migration.go b/jobsdb/migration.go index 37594d6175..ea3c8bf825 100644 --- a/jobsdb/migration.go +++ b/jobsdb/migration.go @@ -459,18 +459,86 @@ func (jd *Handle) getMigrationList(dsList []dataSetT) (migrateFrom []dsWithPendi return } +func getColumnConversion(srcType, destType string) string { + if srcType == destType { + return "j.event_payload" + } + switch srcType { + case "jsonb": + switch destType { + case "text": + return "j.event_payload::TEXT" + case "bytea": + return "convert_to(j.event_payload::TEXT, 'UTF8')" + default: + panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType)) + } + case "bytea": + switch destType { + case "text": + return "convert_from(j.event_payload, 'UTF8')" + case "jsonb": + return "convert_from(j.event_payload, 'UTF8')::jsonb" + default: + panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType)) + } + case "text": + switch destType { + case "jsonb": + return "j.event_payload::jsonb" + case "bytea": + return "convert_to(j.event_payload, 'UTF8')" + default: + panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType)) + } + default: + panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType)) + } +} + func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dataSetT) (int, error) { defer jd.getTimerStat( "migration_jobs", &statTags{CustomValFilters: []string{jd.tablePrefix}}, ).RecordDuration()() + columnTypeMap := map[string]string{srcDS.JobTable: "jsonb", destDS.JobTable: "jsonb"} + // find colummn types first - to differentiate between `bytea` and `jsonb` + rows, err := tx.QueryContext( + ctx, + fmt.Sprintf( + `select table_name, data_type + from information_schema.columns + where table_name IN ('%[1]s', '%[2]s') and column_name='event_payload';`, + srcDS.JobTable, destDS.JobTable, + ), + ) + if err != nil { + return 0, fmt.Errorf("failed to get column types: %w", err) + } + defer rows.Close() + var jobsTable, columnType string + for rows.Next() { + if err = rows.Scan(&jobsTable, &columnType); err != nil { + return 0, fmt.Errorf("failed to scan column types: %w", err) + } + if columnType != "bytea" && columnType != "jsonb" && columnType != "text" { + return 0, fmt.Errorf("unsupported column type %s", columnType) + } + columnTypeMap[jobsTable] = columnType + } + if err = rows.Err(); err != nil { 
+ return 0, fmt.Errorf("rows.Err() on column types: %w", err) + } + payloadLiteral := getColumnConversion(columnTypeMap[srcDS.JobTable], columnTypeMap[destDS.JobTable]) + jd.logger.Info(payloadLiteral) + compactDSQuery := fmt.Sprintf( `with last_status as (select * from "v_last_%[1]s"), inserted_jobs as ( insert into %[3]q (job_id, workspace_id, uuid, user_id, custom_val, parameters, event_payload, event_count, created_at, expire_at) - (select j.job_id, j.workspace_id, j.uuid, j.user_id, j.custom_val, j.parameters, j.event_payload, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id + (select j.job_id, j.workspace_id, j.uuid, j.user_id, j.custom_val, j.parameters, %[6]s, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id where js.job_id is null or js.job_state = ANY('{%[5]s}') order by j.job_id) returning job_id ), insertedStatuses as @@ -484,6 +552,7 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat destDS.JobTable, destDS.JobStatusTable, strings.Join(validNonTerminalStates, ","), + payloadLiteral, ) var numJobsMigrated int64 diff --git a/jobsdb/migration_test.go b/jobsdb/migration_test.go index 8036df455e..b834fe0a5b 100644 --- a/jobsdb/migration_test.go +++ b/jobsdb/migration_test.go @@ -11,6 +11,7 @@ import ( "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/testhelper/rand" + "github.com/rudderlabs/rudder-server/utils/tx" ) func TestMigration(t *testing.T) { @@ -326,4 +327,323 @@ func TestMigration(t *testing.T) { updatedTableSizes := getTableSizes(jobDB.getDSList()) require.Equal(t, newTableSizes[fmt.Sprintf("%s_job_status_1", tablePrefix)], updatedTableSizes[fmt.Sprintf("%s_job_status_1", tablePrefix)]) }) + + t.Run("migration between different table types(jsonb, text, bytea)", func(t *testing.T) { + config.Reset() + c := config.New() + c.Set("JobsDB.maxDSSize", 1) + + _ = startPostgres(t) + + triggerAddNewDS := make(chan time.Time) + triggerMigrateDS := make(chan time.Time) + + jobDB := Handle{ + TriggerAddNewDS: func() <-chan time.Time { + return triggerAddNewDS + }, + TriggerMigrateDS: func() <-chan time.Time { + return triggerMigrateDS + }, + config: c, + } + tablePrefix := strings.ToLower(rand.String(5)) + err := jobDB.Setup( + ReadWrite, + true, + tablePrefix, + ) + require.NoError(t, err) + defer jobDB.TearDown() + + c.Set("JobsDB."+tablePrefix+"."+"maxDSRetention", "1ms") + + customVal := rand.String(5) + jobs := genJobs(defaultWorkspaceID, customVal, 30, 1) + require.NoError(t, jobDB.Store(context.Background(), jobs[:10])) + + // let 8 jobs succeed, and 2 repeatedly fail + require.NoError( + t, + jobDB.UpdateJobStatus( + context.Background(), + genJobStatuses(jobs[:8], "executing"), + []string{customVal}, + []ParameterFilterT{}, + ), + ) + require.NoError( + t, + jobDB.UpdateJobStatus( + context.Background(), + genJobStatuses(jobs[:8], "succeeded"), + []string{customVal}, + []ParameterFilterT{}, + ), + ) + + require.NoError( + t, + jobDB.UpdateJobStatus( + context.Background(), + genJobStatuses(jobs[8:10], "executing"), + []string{customVal}, + []ParameterFilterT{}, + ), + `status update failed in 1st DS`, + ) + require.NoError( + t, + jobDB.UpdateJobStatus( + context.Background(), + genJobStatuses(jobs[8:10], "failed"), + []string{customVal}, + []ParameterFilterT{}, + ), + `status update failed in 1st DS`, + ) + require.EqualValues(t, 1, jobDB.GetMaxDSIndex()) + + jobDB.conf.payloadColumnType = 1 + 
triggerAddNewDS <- time.Now() // trigger addNewDSLoop to run
+		triggerAddNewDS <- time.Now() // Second time, waits for the first loop to finish
+		require.EqualValues(t, 2, jobDB.GetMaxDSIndex())
+
+		var payloadType string
+		secondTableName := fmt.Sprintf("%s_jobs_2", tablePrefix)
+		err = jobDB.dbHandle.QueryRowContext(context.Background(), fmt.Sprintf(`select data_type from information_schema.columns where table_name='%s' and column_name='event_payload';`, secondTableName)).Scan(&payloadType)
+		require.NoError(t, err)
+		require.EqualValues(t, "bytea", payloadType)
+
+		// add some more jobs to the new DS
+		require.NoError(t, jobDB.Store(context.Background(), jobs[10:20]))
+
+		// triggerMigrateDS <- time.Now()
+		// triggerMigrateDS <- time.Now()
+		// var payloadType_1_1 string
+		// err = jobDB.dbHandle.QueryRowContext(context.Background(), fmt.Sprintf(`select data_type from information_schema.columns where table_name='%s' and column_name='event_payload';`, tablePrefix+"_jobs_1_1")).Scan(&payloadType_1_1)
+		// require.NoError(t, err)
+		// require.EqualValues(t, "bytea", payloadType_1_1)
+
+		triggerAddNewDS <- time.Now() // trigger addNewDSLoop to run
+		triggerAddNewDS <- time.Now() // Second time, waits for the first loop to finish
+		require.EqualValues(t, 3, jobDB.GetMaxDSIndex())
+		thirdTableName := fmt.Sprintf("%s_jobs_3", tablePrefix)
+		err = jobDB.dbHandle.QueryRowContext(context.Background(), fmt.Sprintf(`select data_type from information_schema.columns where table_name='%s' and column_name='event_payload';`, thirdTableName)).Scan(&payloadType)
+		require.NoError(t, err)
+		require.EqualValues(t, "bytea", payloadType)
+
+		// last DS:
+		// accumulate enough statuses for a cleanup to be triggered,
+		// while keeping every job non-terminal
+		require.NoError(t, jobDB.Store(context.Background(), jobs[20:30]))
+		for i := 0; i < 10; i++ {
+			require.NoError(
+				t,
+				jobDB.UpdateJobStatus(
+					context.Background(),
+					genJobStatuses(jobs[20:30], "executing"),
+					[]string{customVal},
+					[]ParameterFilterT{},
+				),
+				`status update failed in 3rd DS`,
+			)
+			require.NoError(
+				t,
+				jobDB.UpdateJobStatus(
+					context.Background(),
+					genJobStatuses(jobs[20:30], "failed"),
+					[]string{customVal},
+					[]ParameterFilterT{},
+				),
+				`status update failed in 3rd DS`,
+			)
+		}
+
+		c.Set("JobsDB.maxDSSize", 100000)
+		jobDB.conf.payloadColumnType = 2
+		triggerMigrateDS <- time.Now() // trigger migrateDSLoop to run
+		triggerMigrateDS <- time.Now() // waits for last loop to finish
+
+		// data is moved from both jsonb and bytea columns into a text column
+
+		// after the migration we should be left with two DSs:
+		// the first two DSs are compacted into a single jobs_2_1 table, holding the two
+		// non-terminal jobs from DS 1 (with only their last status) plus all ten jobs from DS 2,
+		// while the third DS is left untouched, keeping all ten of its jobs and their full status history
+
+		// check that the merged DS contains the expected jobs
+		dsList := jobDB.getDSList()
+		require.Len(t, dsList, 2) // 2_1, 3
+		require.Equal(t, `2_1`, dsList[0].Index)
+		var count int64
+		err = jobDB.dbHandle.QueryRow(
+			fmt.Sprintf(
+				`SELECT COUNT(*) FROM %[1]s_jobs_2_1 WHERE %[1]s_jobs_2_1.custom_val = $1`,
+				tablePrefix,
+			),
+			customVal,
+		).Scan(&count)
+		require.NoError(t, err)
+		require.EqualValues(t, 12, count)
+
+		err = jobDB.dbHandle.QueryRowContext(context.Background(), fmt.Sprintf(`select data_type from information_schema.columns where table_name='%s' and column_name='event_payload';`, tablePrefix+"_jobs_2_1")).Scan(&payloadType)
+		require.NoError(t, err)
+		require.EqualValues(t, "text", payloadType)
+
+		require.Equal(t, 
`3`, dsList[1].Index) + err = jobDB.dbHandle.QueryRow( + fmt.Sprintf( + `SELECT COUNT(*) FROM %[1]s_jobs_3 WHERE %[1]s_jobs_3.custom_val = $1`, + tablePrefix, + ), + customVal, + ).Scan(&count) + require.NoError(t, err) + require.EqualValues(t, 10, count) + + err = jobDB.dbHandle.QueryRow( + fmt.Sprintf( + `SELECT COUNT(*) FROM %[1]s_job_status_3 where job_state = 'failed';`, + tablePrefix, + ), + ).Scan(&count) + require.NoError(t, err) + require.EqualValues(t, 100, count) + + getJobs, err := jobDB.GetToProcess(context.Background(), GetQueryParams{ + IgnoreCustomValFiltersInQuery: true, + EventsLimit: 1, + JobsLimit: 1, + }, nil) + require.NoError(t, err) + require.Equal(t, 1, getJobs.EventsCount) + require.JSONEq( + t, + `{"receivedAt":"2021-06-06T20:26:39.598+05:30","writeKey":"writeKey","requestIP":"[::1]", "batch": [{"anonymousId":"anon_id","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"49e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"Demo Track","integrations":{"All":true},"messageId":"b96f3d8a-7c26-4329-9671-4e3202f42f15","originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","floatVal":4.501,"label":"Demo Label","testArray":[{"id":"elem1","value":"e1"},{"id":"elem2","value":"e2"}],"testMap":{"t1":"a","t2":4},"value":5},"rudderId":"a-292e-4e79-9880-f8009e0ae4a3","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`, + string(getJobs.Jobs[0].EventPayload), + ) + }) +} + +func TestPayloadLiteral(t *testing.T) { + config.Reset() + c := config.New() + c.Set("JobsDB.maxDSSize", 1) + + pg := startPostgres(t) + db := pg.DB + + byteJD := Handle{ + config: c, + } + byteJD.conf.payloadColumnType = 1 + require.NoError(t, byteJD.Setup( + ReadWrite, + true, + "bytea", + )) + defer byteJD.TearDown() + + jsonbJD := Handle{ + config: c, + } + jsonbJD.conf.payloadColumnType = 0 + require.NoError(t, jsonbJD.Setup( + ReadWrite, + true, + "jsonb", + )) + defer jsonbJD.TearDown() + + textJD := Handle{ + config: c, + } + textJD.conf.payloadColumnType = 2 + require.NoError(t, textJD.Setup( + ReadWrite, + true, + "text", + )) + defer textJD.TearDown() + + ctx := context.Background() + jobs := genJobs("wsid", "cv", 1, 1) + require.NoError(t, byteJD.Store(ctx, jobs)) + require.NoError(t, textJD.Store(ctx, jobs)) + require.NoError(t, jsonbJD.Store(ctx, jobs)) + + prefixes := []string{"text", "jsonb", "bytea"} + for i := range prefixes { + _, err := db.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %[1]s_job_status_1 DROP CONSTRAINT fk_%[1]s_job_status_1_job_id`, prefixes[i])) + require.NoError(t, err) + _, err = db.ExecContext(ctx, fmt.Sprintf(`ALTER TABLE %[1]s_jobs_1 DROP CONSTRAINT %[1]s_jobs_1_pkey`, prefixes[i])) + require.NoError(t, err) + } // we drop these two because migrateJobsInTx moved jobIDs too, and we're only interested in moving jobs between two different column types + txn, err := db.Begin() + require.NoError(t, err) + for i := range prefixes { + for j := range prefixes { + if i == j { + continue + } + src := prefixes[i] + dest := prefixes[j] + _, err := textJD.migrateJobsInTx( + ctx, + &tx.Tx{Tx: txn}, 
+ dataSetT{ + JobTable: src + "_jobs_1", + JobStatusTable: src + "_job_status_1", + Index: "1", + }, + dataSetT{ + JobTable: dest + "_jobs_1", + JobStatusTable: dest + "_job_status_1", + Index: "1", + }, + ) + require.NoError(t, err) + } + } + require.NoError(t, txn.Commit()) + + byteJobs, err := byteJD.GetUnprocessed(ctx, GetQueryParams{ + EventsLimit: 100, JobsLimit: 100, IgnoreCustomValFiltersInQuery: true, + }) + require.NoError(t, err) + textJobs, err := textJD.GetUnprocessed(ctx, GetQueryParams{ + EventsLimit: 100, JobsLimit: 100, IgnoreCustomValFiltersInQuery: true, + }) + require.NoError(t, err) + jsonbJobs, err := jsonbJD.GetUnprocessed(ctx, GetQueryParams{ + EventsLimit: 100, JobsLimit: 100, IgnoreCustomValFiltersInQuery: true, + }) + require.NoError(t, err) + require.Equal(t, 4, byteJobs.EventsCount) + require.Equal(t, 7, textJobs.EventsCount) + require.Equal(t, 6, jsonbJobs.EventsCount) + expectedPayload := `{"receivedAt":"2021-06-06T20:26:39.598+05:30","writeKey":"writeKey","requestIP":"[::1]", "batch": [{"anonymousId":"anon_id","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"49e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"Demo Track","integrations":{"All":true},"messageId":"b96f3d8a-7c26-4329-9671-4e3202f42f15","originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","floatVal":4.501,"label":"Demo Label","testArray":[{"id":"elem1","value":"e1"},{"id":"elem2","value":"e2"}],"testMap":{"t1":"a","t2":4},"value":5},"rudderId":"a-292e-4e79-9880-f8009e0ae4a3","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}` + + for i := range byteJobs.Jobs { + require.JSONEq( + t, + expectedPayload, + string(byteJobs.Jobs[i].EventPayload), + ) + } + for i := range textJobs.Jobs { + require.JSONEq( + t, + expectedPayload, + string(textJobs.Jobs[i].EventPayload), + ) + } + for i := range jsonbJobs.Jobs { + require.JSONEq( + t, + expectedPayload, + string(jsonbJobs.Jobs[i].EventPayload), + ) + } } From c5c21e30819b0756b75c9d692a7d201cd70c410e Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Fri, 13 Dec 2024 16:10:00 +0530 Subject: [PATCH 2/9] fixup! 
chore: event_payload column can be JSONB, BYTEA or TEXT --- jobsdb/jobsdb.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 0035cf86f7..23403395a8 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -792,7 +792,9 @@ func (jd *Handle) init() { jd.config = config.Default } - jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, jd.tablePrefix+"."+string(jd.ownerType)+".payloadColumnType", jd.tablePrefix+".payloadColumnType")) + if jd.conf.payloadColumnType == 0 { + jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, "JobsDB."+jd.tablePrefix+"."+string(jd.ownerType)+".payloadColumnType", "JobsDB."+jd.tablePrefix+".payloadColumnType")) + } if jd.stats == nil { jd.stats = stats.Default From 08c358abf4b7081da3bba1df700a4b8d08cf2b09 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Tue, 17 Dec 2024 14:38:09 +0530 Subject: [PATCH 3/9] chore: accommodate jobsdb startup to create/update lastDS - only w, rw jobsdbs --- jobsdb/jobsdb.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 23403395a8..eff626ee5e 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -793,7 +793,7 @@ func (jd *Handle) init() { } if jd.conf.payloadColumnType == 0 { - jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, "JobsDB."+jd.tablePrefix+"."+string(jd.ownerType)+".payloadColumnType", "JobsDB."+jd.tablePrefix+".payloadColumnType")) + jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, "JobsDB.payloadColumnType")) } if jd.stats == nil { @@ -1083,8 +1083,59 @@ func (jd *Handle) writerSetup(ctx context.Context, l lock.LockToken) { jd.assertError(jd.doRefreshDSRangeList(l)) // If no DS present, add one - if len(jd.getDSList()) == 0 { + var createDS bool + var updateColumnType bool + dsList := jd.getDSList() + if len(dsList) == 0 { + createDS = true + } else { + // first check column type + var columnType string + err := jd.dbHandle.QueryRowContext( + ctx, + fmt.Sprintf( + `select data_type + from information_schema.columns + where table_name = '%[1]s' and column_name='event_payload';`, + dsList[len(dsList)-1].JobTable, + ), + ).Scan(&columnType) + jd.assertError(err) + jd.logger.Infow("previous column type", "type", columnType) + if columnType != string(jd.conf.payloadColumnType) { + var jobID int64 + err := jd.dbHandle.QueryRowContext( + ctx, + fmt.Sprintf(`select job_id from %q order by job_id asc limit 1`, dsList[len(dsList)-1].JobTable), + ).Scan(&jobID) + if errors.Is(err, sql.ErrNoRows) { + updateColumnType = true + } else if err == nil { + createDS = true + } else { + jd.assertError(err) + } + } + } + if createDS { jd.addNewDS(l, newDataSet(jd.tablePrefix, jd.computeNewIdxForAppend(l))) + } else if updateColumnType { + var payloadType string + switch jd.conf.payloadColumnType { + case payloadColumnType(0): + payloadType = "jsonb" + case payloadColumnType(1): + payloadType = "bytea" + case payloadColumnType(2): + payloadType = "text" + default: + jd.assertError(fmt.Errorf("invalid type: %d", jd.conf.payloadColumnType)) + } + _, err := jd.dbHandle.ExecContext( + ctx, + fmt.Sprintf(`alter table %q alter column event_payload type %s`, dsList[len(dsList)-1].JobTable, payloadType), + ) + jd.assertError(err) } jd.backgroundGroup.Go(crash.Wrapper(func() error { From 4129e5e3d44bdd8a66c1f2bf2a9eaae7c24b7450 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Tue, 17 Dec 
2024 15:24:15 +0530 Subject: [PATCH 4/9] fixup! chore: accommodate jobsdb startup to create/update lastDS - only w, rw jobsdbs --- jobsdb/jobsdb.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index eff626ee5e..c7afd5b9c6 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -73,6 +73,12 @@ const ( pgErrorCodeTableReadonly = "RS001" ) +var payloadTypes = map[payloadColumnType]string{ + JSONB: "jsonb", + TEXT: "text", + BYTEA: "bytea", +} + type payloadColumnType int const ( @@ -1102,7 +1108,7 @@ func (jd *Handle) writerSetup(ctx context.Context, l lock.LockToken) { ).Scan(&columnType) jd.assertError(err) jd.logger.Infow("previous column type", "type", columnType) - if columnType != string(jd.conf.payloadColumnType) { + if columnType != payloadTypes[jd.conf.payloadColumnType] { var jobID int64 err := jd.dbHandle.QueryRowContext( ctx, From de47b4ee15f03f0f805b86c7255da9aadac63c7b Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Tue, 17 Dec 2024 15:54:26 +0530 Subject: [PATCH 5/9] fixup! chore: accommodate jobsdb startup to create/update lastDS - only w, rw jobsdbs --- jobsdb/jobsdb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index c7afd5b9c6..148d4194ce 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -799,7 +799,7 @@ func (jd *Handle) init() { } if jd.conf.payloadColumnType == 0 { - jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, "JobsDB.payloadColumnType")) + jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(2, 1, "JobsDB.payloadColumnType")) } if jd.stats == nil { From 06749955abbaa75db2d3f418255ce18f89fcc802 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Tue, 17 Dec 2024 18:03:49 +0530 Subject: [PATCH 6/9] chore: jobsdb_sanitizeJSON test --- jobsdb/integration_test.go | 175 ++++++++++++++++++++++++++++++++++++- 1 file changed, 172 insertions(+), 3 deletions(-) diff --git a/jobsdb/integration_test.go b/jobsdb/integration_test.go index 085990081d..fabc701b2e 100644 --- a/jobsdb/integration_test.go +++ b/jobsdb/integration_test.go @@ -1026,7 +1026,9 @@ func requireSequential(t *testing.T, jobs []*JobT) { func TestJobsDB_SanitizeJSON(t *testing.T) { _ = startPostgres(t) - jobDB := Handle{config: config.New()} + conf := config.New() + conf.Set("JobsDB.payloadColumnType", 0) + jobDB := Handle{config: conf} ch := func(n int) string { return strings.Repeat("�", n) } @@ -1071,9 +1073,8 @@ func TestJobsDB_SanitizeJSON(t *testing.T) { err := jobDB.Setup(ReadWrite, false, strings.ToLower(rand.String(5))) require.NoError(t, err) - defer jobDB.TearDown() - eventPayload := []byte(`{"batch": [{"anonymousId":"anon_id","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`) + eventPayload := []byte(`{"batch":[{"anonymousId":"anon_id","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`) for i, tt := range toValidUTF8Tests { customVal := fmt.Sprintf("TEST_%d", i) @@ -1111,6 +1112,174 @@ func TestJobsDB_SanitizeJSON(t *testing.T) { string(unprocessedJob.Jobs[0].EventPayload), ) } + jobDB.TearDown() + + conf.Set("JobsDB.payloadColumnType", 2) + textDB := &Handle{config: conf} + require.NoError(t, textDB.Setup(ReadWrite, true, strings.ToLower(rand.String(5)))) + + toValidUTF8TestsForText := []struct { + in string + out string + err error + }{ + {`\u0000`, `\u0000`, nil}, + {`\u0000☺\u0000b☺`, `\u0000☺\u0000b☺`, nil}, + // NOTE: we are not handling the following: + // {"\u0000", ""}, + // {"\u0000☺\u0000b☺", "☺b☺"}, + + {"", "", 
nil}, + {"abc", "abc", nil}, + {"\uFDDD", "\uFDDD", nil}, + {"a\xffb", `a\ufffdb`, nil}, + {"a\xffb\uFFFD", `a\ufffdb�`, nil}, + {"a☺\xffb☺\xC0\xAFc☺\xff", `a☺\ufffdb☺\ufffd\ufffdc☺\ufffd`, nil}, + {"\xC0\xAF", `\ufffd\ufffd`, nil}, + {"\xE0\x80\xAF", `\ufffd\ufffd\ufffd`, nil}, + {"\xed\xa0\x80", `\ufffd\ufffd\ufffd`, nil}, + {"\xed\xbf\xbf", `\ufffd\ufffd\ufffd`, nil}, + {"\xF0\x80\x80\xaf", `\ufffd\ufffd\ufffd\ufffd`, nil}, + {"\xF8\x80\x80\x80\xAF", `\ufffd\ufffd\ufffd\ufffd\ufffd`, nil}, + {"\xFC\x80\x80\x80\x80\xAF", `\ufffd\ufffd\ufffd\ufffd\ufffd\ufffd`, nil}, + + // {"\ud800", ""}, + // 15 + {`\ud800`, `\ud800`, nil}, + {`\uDEAD`, `\uDEAD`, nil}, + + {`\uD83D\ub000`, `\uD83D\ub000`, nil}, + {`\uD83D\ude04`, `\uD83D\ude04`, nil}, + + {`\u4e2d\u6587`, `\u4e2d\u6587`, nil}, + {`\ud83d\udc4a`, `\ud83d\udc4a`, nil}, + + // 21 + {`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)}, + {`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)}, + } + for i, tt := range toValidUTF8TestsForText { + + customVal := fmt.Sprintf("TEST_%d", i) + + jobs := []*JobT{{ + Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`), + EventPayload: bytes.Replace(eventPayload, []byte("track"), []byte(tt.in), 1), + UserID: uuid.New().String(), + UUID: uuid.New(), + CustomVal: customVal, + WorkspaceId: defaultWorkspaceID, + EventCount: 1, + }} + + err := textDB.Store(context.Background(), jobs) + if tt.err != nil { + require.NoError(t, err, "text column should never error", i) + continue + } + + require.NoError(t, err) + + unprocessedJob, err := textDB.GetUnprocessed(context.Background(), GetQueryParams{ + CustomValFilters: []string{customVal}, + JobsLimit: 10, + ParameterFilters: []ParameterFilterT{}, + }) + require.NoError(t, err, "should not error") + + require.Len(t, unprocessedJob.Jobs, 1) + + require.Equal(t, + string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)), + string(unprocessedJob.Jobs[0].EventPayload), + "testCase", i, + ) + } + textDB.TearDown() + + conf.Set("JobsDB.payloadColumnType", 1) + byteaDB := &Handle{config: conf} + require.NoError(t, byteaDB.Setup(ReadWrite, true, strings.ToLower(rand.String(5)))) + + byteaInvalidInputSyntaxError := errors.New("pq: invalid input syntax for type bytea") + toValidUTF8TestsForBytea := []struct { + in string + out string + err error + }{ + {`\u0000`, "", nil}, + {`\u0000☺\u0000b☺`, "☺b☺", nil}, + // NOTE: we are not handling the following: + // {"\u0000", ""}, + // {"\u0000☺\u0000b☺", "☺b☺"}, + + {"", "", nil}, + {"abc", "abc", nil}, + {"\uFDDD", "\uFDDD", nil}, + {"a\xffb", "a" + ch(1) + "b", byteaInvalidInputSyntaxError}, + {"a\xffb\uFFFD", "a" + ch(1) + "b\uFFFD", byteaInvalidInputSyntaxError}, + {"a☺\xffb☺\xC0\xAFc☺\xff", "a☺" + ch(1) + "b☺" + ch(2) + "c☺" + ch(1), byteaInvalidInputSyntaxError}, + {"\xC0\xAF", ch(2), byteaInvalidInputSyntaxError}, + {"\xE0\x80\xAF", ch(3), byteaInvalidInputSyntaxError}, + {"\xed\xa0\x80", ch(3), byteaInvalidInputSyntaxError}, + {"\xed\xbf\xbf", ch(3), byteaInvalidInputSyntaxError}, + {"\xF0\x80\x80\xaf", ch(4), byteaInvalidInputSyntaxError}, + {"\xF8\x80\x80\x80\xAF", ch(5), byteaInvalidInputSyntaxError}, + {"\xFC\x80\x80\x80\x80\xAF", ch(6), byteaInvalidInputSyntaxError}, + + // {"\ud800", ""}, + // 15 + {`\ud800`, ch(1), nil}, + {`\uDEAD`, ch(1), nil}, + + {`\uD83D\ub000`, string([]byte{239, 191, 189, 235, 128, 128}), nil}, + {`\uD83D\ude04`, "😄", nil}, + + {`\u4e2d\u6587`, "中文", nil}, + {`\ud83d\udc4a`, "\xf0\x9f\x91\x8a", 
nil}, + + {`\U0001f64f`, ch(1), errors.New(`readEscapedChar: invalid escape char after`)}, + {`\uD83D\u00`, ch(1), errors.New(`readU4: expects 0~9 or a~f, but found`)}, + } + for i, tt := range toValidUTF8TestsForBytea { + + customVal := fmt.Sprintf("TEST_%d", i) + + jobs := []*JobT{{ + Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`), + EventPayload: bytes.Replace(eventPayload, []byte("track"), []byte(tt.in), 1), + UserID: uuid.New().String(), + UUID: uuid.New(), + CustomVal: customVal, + WorkspaceId: defaultWorkspaceID, + EventCount: 1, + }} + + err := byteaDB.Store(context.Background(), jobs) + if tt.err != nil { + require.Error(t, err, "should error", i) + require.Contains(t, err.Error(), tt.err.Error(), "should contain error", i) + continue + } + + require.NoError(t, err, i) + + unprocessedJob, err := byteaDB.GetUnprocessed(context.Background(), GetQueryParams{ + CustomValFilters: []string{customVal}, + JobsLimit: 10, + ParameterFilters: []ParameterFilterT{}, + }) + require.NoError(t, err, "should not error", i) + + require.Len(t, unprocessedJob.Jobs, 1) + + require.JSONEq(t, + string(bytes.Replace(eventPayload, []byte("track"), []byte(tt.out), 1)), + string(unprocessedJob.Jobs[0].EventPayload), + i, + ) + } + byteaDB.TearDown() } // BenchmarkJobsdb takes time... keep waiting From 3b802a9910a2f2903f5bf81a82e9245f039fd858 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Wed, 18 Dec 2024 12:30:34 +0530 Subject: [PATCH 7/9] chore: address tests for text payload column --- jobsdb/jobsdb.go | 8 +++- processor/processor_geolocation_test.go | 40 +++++++++---------- .../batchrouter/batchrouter_isolation_test.go | 10 ++++- 3 files changed, 36 insertions(+), 22 deletions(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 148d4194ce..08584d1a85 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -2392,7 +2392,13 @@ func (p *Payload) PayloadBytes() []byte { if p.B != nil { return p.B } - return []byte(p.S) + // return []byte(p.S) + buffer := new(bytes.Buffer) + if err := json.Compact(buffer, []byte(p.S)); err != nil { + return []byte(`{}`) + } + + return buffer.Bytes() } func (p *Payload) Scan(src interface{}) error { diff --git a/processor/processor_geolocation_test.go b/processor/processor_geolocation_test.go index 17b143528b..a11e687d9a 100644 --- a/processor/processor_geolocation_test.go +++ b/processor/processor_geolocation_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "database/sql" - "encoding/json" "fmt" "io" "net/http" @@ -44,8 +43,8 @@ func TestProcessorGeolocation(t *testing.T) { WithGeolocationEnabledAtSource(true). WithClientIP(boxfordIP). WithContextIP(londonIP). - Run(t, func(t *testing.T, event json.RawMessage) { - require.Empty(t, gjson.GetBytes(event, "context.geo").Raw, "no geolocation information should be present when the feature is disabled") + Run(t, func(t *testing.T, event string) { + require.Empty(t, gjson.Get(event, "context.geo").Raw, "no geolocation information should be present when the feature is disabled") }) }) @@ -55,8 +54,8 @@ func TestProcessorGeolocation(t *testing.T) { WithGeolocationEnabledAtSource(false). WithClientIP(boxfordIP). WithContextIP(londonIP). 
-		Run(t, func(t *testing.T, event json.RawMessage) {
-			require.Empty(t, gjson.GetBytes(event, "context.geo").Raw, "no geolocation information should be present when geolocation is disabled at source")
+		Run(t, func(t *testing.T, event string) {
+			require.Empty(t, gjson.Get(event, "context.geo").Raw, "no geolocation information should be present when geolocation is disabled at source")
 		})
 	})
 
@@ -66,10 +65,10 @@ func TestProcessorGeolocation(t *testing.T) {
 			WithGeolocationEnabledAtSource(true).
 			WithClientIP(boxfordIP).
 			WithContextIP(londonIP).
-			Run(t, func(t *testing.T, event json.RawMessage) {
-				require.NotEmpty(t, gjson.GetBytes(event, "context.geo").Raw, string(event), "geolocation information should be present")
-				require.Equal(t, londonIP, gjson.GetBytes(event, "context.geo.ip").String(), "contex.ip should take precedence over clientIP")
-				require.Equal(t, "London", gjson.GetBytes(event, "context.geo.city").String(), "contex.ip should take precedence over clientIP")
+			Run(t, func(t *testing.T, event string) {
+				require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, string(event), "geolocation information should be present")
+				require.Equal(t, londonIP, gjson.Get(event, "context.geo.ip").String(), "context.ip should take precedence over clientIP")
+				require.Equal(t, "London", gjson.Get(event, "context.geo.city").String(), "context.ip should take precedence over clientIP")
 			})
 	})
 
@@ -79,10 +78,10 @@ func TestProcessorGeolocation(t *testing.T) {
 			WithGeolocationEnabledAtSource(true).
 			WithClientIP(boxfordIP).
 			WithContextIP("").
-			Run(t, func(t *testing.T, event json.RawMessage) {
-				require.NotEmpty(t, gjson.GetBytes(event, "context.geo").Raw, string(event), "geolocation information should be present")
-				require.Equal(t, boxfordIP, gjson.GetBytes(event, "context.geo.ip").String(), "clientIP should be used by the geolocation service")
-				require.Equal(t, "Boxford", gjson.GetBytes(event, "context.geo.city").String(), "clientIP should be used by the geolocation service")
+			Run(t, func(t *testing.T, event string) {
+				require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, string(event), "geolocation information should be present")
+				require.Equal(t, boxfordIP, gjson.Get(event, "context.geo.ip").String(), "clientIP should be used by the geolocation service")
+				require.Equal(t, "Boxford", gjson.Get(event, "context.geo.city").String(), "clientIP should be used by the geolocation service")
 			})
 	})
 
@@ -92,10 +91,10 @@ func TestProcessorGeolocation(t *testing.T) {
 			WithGeolocationEnabledAtSource(true).
 			WithClientIP(londonIP).
 			WithContextIP(invalidIP). 
- Run(t, func(t *testing.T, event json.RawMessage) { - require.NotEmpty(t, gjson.GetBytes(event, "context.geo").Raw, string(event), "geolocation information should be present") - require.Equal(t, invalidIP, gjson.GetBytes(event, "context.geo.ip").String(), "geolocation service should use the first non blank context.ip even if invalid") - require.Equal(t, "", gjson.GetBytes(event, "context.geo.city").String(), "geolocation service should use the first non blank context.ip even if invalid") + Run(t, func(t *testing.T, event string) { + require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, string(event), "geolocation information should be present") + require.Equal(t, invalidIP, gjson.Get(event, "context.geo.ip").String(), "geolocation service should use the first non blank context.ip even if invalid") + require.Equal(t, "", gjson.Get(event, "context.geo.city").String(), "geolocation service should use the first non blank context.ip even if invalid") }) }) } @@ -127,7 +126,7 @@ func (s *geolocationScenario) WithClientIP(ip string) *geolocationScenario { return s } -func (s *geolocationScenario) Run(t *testing.T, verification func(t *testing.T, event json.RawMessage)) { +func (s *geolocationScenario) Run(t *testing.T, verification func(t *testing.T, event string)) { config.Reset() defer config.Reset() writeKey := "writekey-1" @@ -141,8 +140,8 @@ func (s *geolocationScenario) Run(t *testing.T, verification func(t *testing.T, s.requireJobsCount(t, db, "gw", "succeeded", 1) s.requireJobsCount(t, db, "rt", "aborted", 1) - var payload json.RawMessage - require.NoError(t, db.QueryRow("SELECT event_payload FROM unionjobsdb('rt',1)").Scan(&payload)) + var payload string + require.NoError(t, db.QueryRow("SELECT event_payload FROM rt_jobs_1").Scan(&payload)) verification(t, payload) } @@ -192,6 +191,7 @@ func (s *geolocationScenario) startAll(t *testing.T, writeKey string) (gatewayUr } func (s *geolocationScenario) runRudderServer(ctx context.Context, port int, postgresContainer *postgres.Resource, cbURL, transformerURL, tmpDir string) (err error) { + config.Set("enableStats", false) config.Set("CONFIG_BACKEND_URL", cbURL) config.Set("WORKSPACE_TOKEN", "token") config.Set("DB.host", postgresContainer.Host) diff --git a/router/batchrouter/batchrouter_isolation_test.go b/router/batchrouter/batchrouter_isolation_test.go index d147255f52..13209b92af 100644 --- a/router/batchrouter/batchrouter_isolation_test.go +++ b/router/batchrouter/batchrouter_isolation_test.go @@ -242,6 +242,7 @@ func BatchrouterIsolationScenario(t testing.TB, spec *BrtIsolationScenarioSpec) config.Set("DB.name", postgresContainer.Database) config.Set("DB.password", postgresContainer.Password) config.Set("DB.host", postgresContainer.Host) + config.Set("enableStats", false) config.Set("Warehouse.mode", "off") config.Set("DestinationDebugger.disableEventDeliveryStatusUploads", true) @@ -358,7 +359,14 @@ func BatchrouterIsolationScenario(t testing.TB, spec *BrtIsolationScenarioSpec) } var minExecTime, maxExecTime time.Time - require.NoError(t, postgresContainer.DB.QueryRow("SELECT min(exec_time), max(exec_time) FROM unionjobsdbmetadata('batch_rt',20)").Scan(&minExecTime, &maxExecTime), "it should be able to query the min and max execution times") + require.NoError( + t, + postgresContainer.DB.QueryRowContext( + ctx, + "SELECT min(exec_time), max(exec_time) FROM unionjobsdbmetadata('batch_rt',20)", + ).Scan(&minExecTime, &maxExecTime), + "it should be able to query the min and max execution times", + ) overallDuration = 
maxExecTime.Sub(minExecTime)
 
 	cancel()
 
From ef0001a98f31d105835c4b8ebef5e886c7338dd9 Mon Sep 17 00:00:00 2001
From: "siddarth.msv"
Date: Wed, 18 Dec 2024 12:57:58 +0530
Subject: [PATCH 8/9] fixup! chore: address tests for text payload column

---
 processor/processor_geolocation_test.go | 6 +++---
 processor/processor_isolation_test.go   | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/processor/processor_geolocation_test.go b/processor/processor_geolocation_test.go
index a11e687d9a..b4b0e37cab 100644
--- a/processor/processor_geolocation_test.go
+++ b/processor/processor_geolocation_test.go
@@ -66,7 +66,7 @@ func TestProcessorGeolocation(t *testing.T) {
 			WithClientIP(boxfordIP).
 			WithContextIP(londonIP).
 			Run(t, func(t *testing.T, event string) {
-				require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, string(event), "geolocation information should be present")
+				require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, event, "geolocation information should be present")
 				require.Equal(t, londonIP, gjson.Get(event, "context.geo.ip").String(), "context.ip should take precedence over clientIP")
 				require.Equal(t, "London", gjson.Get(event, "context.geo.city").String(), "context.ip should take precedence over clientIP")
 			})
@@ -79,7 +79,7 @@ func TestProcessorGeolocation(t *testing.T) {
 			WithClientIP(boxfordIP).
 			WithContextIP("").
 			Run(t, func(t *testing.T, event string) {
-				require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, string(event), "geolocation information should be present")
+				require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, event, "geolocation information should be present")
 				require.Equal(t, boxfordIP, gjson.Get(event, "context.geo.ip").String(), "clientIP should be used by the geolocation service")
 				require.Equal(t, "Boxford", gjson.Get(event, "context.geo.city").String(), "clientIP should be used by the geolocation service")
 			})
@@ -92,7 +92,7 @@ func TestProcessorGeolocation(t *testing.T) {
 			WithClientIP(londonIP).
 			WithContextIP(invalidIP). 
Run(t, func(t *testing.T, event string) { - require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, string(event), "geolocation information should be present") + require.NotEmpty(t, gjson.Get(event, "context.geo").Raw, event, "geolocation information should be present") require.Equal(t, invalidIP, gjson.Get(event, "context.geo.ip").String(), "geolocation service should use the first non blank context.ip even if invalid") require.Equal(t, "", gjson.Get(event, "context.geo.city").String(), "geolocation service should use the first non blank context.ip even if invalid") }) diff --git a/processor/processor_isolation_test.go b/processor/processor_isolation_test.go index 70d4703cf8..891cccfe69 100644 --- a/processor/processor_isolation_test.go +++ b/processor/processor_isolation_test.go @@ -31,6 +31,7 @@ import ( trand "github.com/rudderlabs/rudder-go-kit/testhelper/rand" "github.com/rudderlabs/rudder-server/processor/isolation" "github.com/rudderlabs/rudder-server/runner" + "github.com/rudderlabs/rudder-server/services/rmetrics" "github.com/rudderlabs/rudder-server/testhelper/health" "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig" "github.com/rudderlabs/rudder-server/utils/types/deployment" @@ -227,6 +228,7 @@ func ProcIsolationScenario(t testing.TB, spec *ProcIsolationScenarioSpec) (overa config.Set("JobsDB.migrateDSLoopSleepDuration", "60m") config.Set("Router.toAbortDestinationIDs", destinationID) config.Set("archival.Enabled", false) + config.Set("enableStats", false) config.Set("Processor.isolationMode", string(spec.isolationMode)) @@ -330,9 +332,7 @@ func ProcIsolationScenario(t testing.TB, spec *ProcIsolationScenarioSpec) (overa overallDuration = maxJobTime.Sub(gwMinJobTime) require.Eventually(t, func() bool { - var pendingJobsCount int - require.NoError(t, postgresContainer.DB.QueryRow("SELECT count(*) FROM unionjobsdb('rt',5) WHERE COALESCE(job_state, 'pending') != 'aborted'").Scan(&pendingJobsCount)) - return pendingJobsCount == 0 + return rmetrics.PendingEvents("rt", rmetrics.All, rmetrics.All).IntValue() == 0 }, 100*time.Second, 1*time.Second, "all rt jobs should be aborted") cancel() <-svcDone From 85249336739d3cd0ded179507b777850a3fe9131 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Thu, 19 Dec 2024 15:10:22 +0530 Subject: [PATCH 9/9] fixup! 
chore: address tests for text payload column

---
 jobsdb/migration.go                           |  3 +-
 .../000014_event_payload_column_type.up.sql   | 68 +++++++++++++++++++
 2 files changed, 69 insertions(+), 2 deletions(-)
 create mode 100644 sql/migrations/node/000014_event_payload_column_type.up.sql

diff --git a/jobsdb/migration.go b/jobsdb/migration.go
index ea3c8bf825..ba8e0204a1 100644
--- a/jobsdb/migration.go
+++ b/jobsdb/migration.go
@@ -503,7 +503,7 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat
 	).RecordDuration()()
 
 	columnTypeMap := map[string]string{srcDS.JobTable: "jsonb", destDS.JobTable: "jsonb"}
-	// find colummn types first - to differentiate between `bytea` and `jsonb`
+	// find column types first - to differentiate between `text`, `bytea` and `jsonb`
 	rows, err := tx.QueryContext(
 		ctx,
 		fmt.Sprintf(
@@ -531,7 +531,6 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat
 		return 0, fmt.Errorf("rows.Err() on column types: %w", err)
 	}
 	payloadLiteral := getColumnConversion(columnTypeMap[srcDS.JobTable], columnTypeMap[destDS.JobTable])
-	jd.logger.Info(payloadLiteral)
 
 	compactDSQuery := fmt.Sprintf(
 		`with last_status as (select * from "v_last_%[1]s"),
diff --git a/sql/migrations/node/000014_event_payload_column_type.up.sql b/sql/migrations/node/000014_event_payload_column_type.up.sql
new file mode 100644
index 0000000000..d43a6fb139
--- /dev/null
+++ b/sql/migrations/node/000014_event_payload_column_type.up.sql
@@ -0,0 +1,68 @@
+DROP FUNCTION IF EXISTS payloadColumnType(TEXT);
+DROP FUNCTION IF EXISTS payloadColumnConvertToText(TEXT);
+DROP FUNCTION IF EXISTS unionjobsdb(text,integer);
+
+-- returns the payload column type of the jobs table
+CREATE OR REPLACE FUNCTION payloadColumnType(tableName TEXT)
+RETURNS TEXT
+AS $$
+BEGIN
+RETURN(SELECT data_type FROM information_schema.columns WHERE table_name = tableName and column_name='event_payload' LIMIT 1);
+END;
+$$ LANGUAGE plpgsql;
+
+-- returns the expression that converts the payload column to text
+CREATE OR REPLACE FUNCTION payloadColumnConvertToText(columnType TEXT)
+RETURNS TEXT
+AS $$
+DECLARE
+    ret TEXT;
+BEGIN
+CASE
+    WHEN columnType = 'text' THEN
+        ret = 'event_payload';
+    WHEN columnType = 'jsonb' THEN
+        ret = 'event_payload::TEXT';
+    WHEN columnType = 'bytea' THEN
+        ret = 'convert_from(event_payload, ''UTF8'')';
+    ELSE
+        ret = 'invalid';
+END CASE;
+RETURN ret;
+END
+$$ LANGUAGE plpgsql;
+
+
+-- redefine unionjobsdb so that its return table exposes event_payload as text
+CREATE OR REPLACE FUNCTION unionjobsdb(prefix text, num int)
+RETURNS table (
+    t_name text,
+    job_id bigint,
+    workspace_id text,
+    uuid uuid,
+    user_id text,
+    parameters jsonb,
+    custom_val character varying(64),
+    event_payload text,
+    event_count integer,
+    created_at timestamp with time zone,
+    expire_at timestamp with time zone,
+    status_id bigint,
+    job_state character varying(64),
+    attempt smallint,
+    exec_time timestamp with time zone,
+    error_code character varying(32),
+    error_response jsonb
+)
+AS $$
+DECLARE
+    qry text;
+BEGIN
+SELECT string_agg(
+    format('SELECT %1$L, j.job_id, j.workspace_id, j.uuid, j.user_id, j.parameters, j.custom_val, (j.event_payload::TEXT), j.event_count, j.created_at, j.expire_at, latest_status.id, latest_status.job_state, latest_status.attempt, latest_status.exec_time, latest_status.error_code, latest_status.error_response FROM %1$I j LEFT JOIN %2$I latest_status on latest_status.job_id = j.job_id', alltables.table_name, 'v_last_' || prefix || '_job_status_'|| substring(alltables.table_name, char_length(prefix)+7,30)),
+    ' UNION 
') INTO qry + FROM (select table_name from information_schema.tables +WHERE table_name LIKE prefix || '_jobs_%' order by table_name asc LIMIT num) alltables; +RETURN QUERY EXECUTE qry; +END; +$$ LANGUAGE plpgsql; \ No newline at end of file
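
Note: once this migration has run, unionjobsdb returns event_payload as text for every dataset, whatever the underlying column type, so existing ad-hoc inspection queries keep working unchanged. A minimal sketch of such a query, assuming a 'gw' table prefix (the prefix and the dataset limit of 2 are illustrative):

    SELECT t_name, job_id, custom_val, job_state, left(event_payload, 64) AS payload_head
    FROM unionjobsdb('gw', 2)
    ORDER BY job_id;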