Skip to content

Commit

Permalink
chore: event_payload column can be JSONB, BYTEA or TEXT
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Dec 13, 2024
1 parent f22e8f4 commit 58a72f5
Show file tree
Hide file tree
Showing 3 changed files with 453 additions and 3 deletions.
65 changes: 63 additions & 2 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ const (
pgErrorCodeTableReadonly = "RS001"
)

// payloadColumnType identifies the SQL type used for the event_payload column
// when a jobs table is created.
type payloadColumnType int

// Supported event_payload column types. The numeric values are written out
// explicitly because init() converts an integer configuration value directly
// into a payloadColumnType.
const (
	JSONB payloadColumnType = 0 // column is created as JSONB
	BYTEA payloadColumnType = 1 // column is created as BYTEA
	TEXT  payloadColumnType = 2 // column is created as TEXT
	// JSON is not supported yet; explore afterwards?
)

// QueryConditions holds jobsdb query conditions
type QueryConditions struct {
// if IgnoreCustomValFiltersInQuery is true, CustomValFilters is not going to be used
Expand Down Expand Up @@ -499,6 +508,7 @@ type Handle struct {

config *config.Config
conf struct {
payloadColumnType payloadColumnType
maxTableSize config.ValueLoader[int64]
cacheExpiration config.ValueLoader[time.Duration]
addNewDSLoopSleepDuration config.ValueLoader[time.Duration]
Expand Down Expand Up @@ -702,6 +712,18 @@ func WithStats(s stats.Stats) OptsFunc {
}
}

// WithBinaryPayload returns an option that sets the handle's payload column
// type to BYTEA.
//
// NOTE(review): init() also assigns conf.payloadColumnType from configuration;
// confirm whether that assignment runs after options are applied and therefore
// overrides this option.
func WithBinaryPayload() OptsFunc {
	return func(jd *Handle) {
		// Use the named constant instead of its numeric value so this option
		// stays correct if the payloadColumnType constants are ever reordered.
		jd.conf.payloadColumnType = BYTEA
	}
}

// WithTextPayload returns an option that sets the handle's payload column
// type to TEXT.
//
// NOTE(review): init() also assigns conf.payloadColumnType from configuration;
// confirm whether that assignment runs after options are applied and therefore
// overrides this option.
func WithTextPayload() OptsFunc {
	return func(jd *Handle) {
		// Use the named constant instead of its numeric value so this option
		// stays correct if the payloadColumnType constants are ever reordered.
		jd.conf.payloadColumnType = TEXT
	}
}

func WithSkipMaintenanceErr(ignore bool) OptsFunc {
return func(jd *Handle) {
jd.conf.skipMaintenanceError = ignore
Expand Down Expand Up @@ -770,6 +792,8 @@ func (jd *Handle) init() {
jd.config = config.Default
}

jd.conf.payloadColumnType = payloadColumnType(jd.config.GetIntVar(0, 1, jd.tablePrefix+"."+string(jd.ownerType)+".payloadColumnType", jd.tablePrefix+".payloadColumnType"))

if jd.stats == nil {
jd.stats = stats.Default
}
Expand Down Expand Up @@ -1429,14 +1453,23 @@ func (jd *Handle) createDSInTx(tx *Tx, newDS dataSetT) error {
}

func (jd *Handle) createDSTablesInTx(ctx context.Context, tx *Tx, newDS dataSetT) error {
var payloadColumnType string
switch jd.conf.payloadColumnType {
case JSONB:
payloadColumnType = "JSONB"
case BYTEA:
payloadColumnType = "BYTEA"
case TEXT:
payloadColumnType = "TEXT"
}
if _, err := tx.ExecContext(ctx, fmt.Sprintf(`CREATE TABLE %q (
job_id BIGSERIAL PRIMARY KEY,
workspace_id TEXT NOT NULL DEFAULT '',
uuid UUID NOT NULL,
user_id TEXT NOT NULL,
parameters JSONB NOT NULL,
custom_val VARCHAR(64) NOT NULL,
event_payload JSONB NOT NULL,
event_payload `+payloadColumnType+` NOT NULL,
event_count INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
expire_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW());`, newDS.JobTable)); err != nil {
Expand Down Expand Up @@ -2215,6 +2248,7 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
resultsetStates := map[string]struct{}{}
for rows.Next() {
var job JobT
var payload Payload
var jsState sql.NullString
var jsAttemptNum sql.NullInt64
var jsExecTime sql.NullTime
Expand All @@ -2223,13 +2257,14 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
var jsErrorResponse []byte
var jsParameters []byte
err := rows.Scan(&job.JobID, &job.UUID, &job.UserID, &job.Parameters, &job.CustomVal,
&job.EventPayload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &job.PayloadSize, &runningEventCount, &runningPayloadSize,
&payload, &job.EventCount, &job.CreatedAt, &job.ExpireAt, &job.WorkspaceId, &job.PayloadSize, &runningEventCount, &runningPayloadSize,
&jsState, &jsAttemptNum,
&jsExecTime, &jsRetryTime,
&jsErrorCode, &jsErrorResponse, &jsParameters)
if err != nil {
return JobsResult{}, false, err
}
job.EventPayload = payload.PayloadBytes()
if jsState.Valid {
resultsetStates[jsState.String] = struct{}{}
job.LastJobStatus.JobState = jsState.String
Expand Down Expand Up @@ -2289,6 +2324,32 @@ func (jd *Handle) getJobsDS(ctx context.Context, ds dataSetT, lastDS bool, param
}, true, nil
}

// Payload is a scan destination for the event_payload column. The driver may
// hand the column value over either as a string or as a byte slice; whichever
// form was received by the last Scan is kept in the matching field.
type Payload struct {
	S string // set when the last scanned value was a string
	B []byte // set when the last scanned value was a byte slice (private copy)
}

// PayloadBytes returns the scanned payload as bytes: B when it is non-nil,
// otherwise the bytes of S.
func (p *Payload) PayloadBytes() []byte {
	if p.B != nil {
		return p.B
	}
	return []byte(p.S)
}

// Scan implements the database/sql Scanner interface.
//
// It accepts a []byte or a string and returns an error for any other source
// type (including nil). Each call replaces whatever a previous call stored, so
// a Payload value can safely be reused across rows.
func (p *Payload) Scan(src any) error {
	switch v := src.(type) {
	case []byte:
		// database/sql states that a []byte passed to Scan is owned by the
		// driver and only valid until the next call to Scan, so it must be
		// copied before being retained.
		buf := make([]byte, len(v))
		copy(buf, v)
		p.S, p.B = "", buf
		return nil
	case string:
		p.S, p.B = v, nil
		return nil
	default:
		return fmt.Errorf("neither string nor bytes: %T", src)
	}
}

func (jd *Handle) updateJobStatusDSInTx(ctx context.Context, tx *Tx, ds dataSetT, statusList []*JobStatusT, tags statTags) (updatedStates map[string]map[string]map[ParameterFilterT]struct{}, err error) {
if len(statusList) == 0 {
return
Expand Down
71 changes: 70 additions & 1 deletion jobsdb/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,18 +459,86 @@ func (jd *Handle) getMigrationList(dsList []dataSetT) (migrateFrom []dsWithPendi
return
}

// getColumnConversion returns the SQL expression that reads j.event_payload
// from a table whose column has type srcType so that it can be inserted into a
// column of type destType.
//
// Identical types need no conversion and yield the bare column reference. The
// known types are "jsonb", "bytea" and "text"; any other combination of
// differing types panics.
func getColumnConversion(srcType, destType string) string {
	if srcType == destType {
		return "j.event_payload"
	}

	pair := [2]string{srcType, destType}
	switch pair {
	case [2]string{"jsonb", "text"}:
		return "j.event_payload::TEXT"
	case [2]string{"jsonb", "bytea"}:
		return "convert_to(j.event_payload::TEXT, 'UTF8')"
	case [2]string{"bytea", "text"}:
		return "convert_from(j.event_payload, 'UTF8')"
	case [2]string{"bytea", "jsonb"}:
		return "convert_from(j.event_payload, 'UTF8')::jsonb"
	case [2]string{"text", "jsonb"}:
		return "j.event_payload::jsonb"
	case [2]string{"text", "bytea"}:
		return "convert_to(j.event_payload, 'UTF8')"
	}

	// Anything else is a source/destination pair with no known conversion.
	panic(fmt.Sprintf("unsupported payload column types: src-%s, dest-%s", srcType, destType))
}

func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dataSetT) (int, error) {
defer jd.getTimerStat(
"migration_jobs",
&statTags{CustomValFilters: []string{jd.tablePrefix}},
).RecordDuration()()

columnTypeMap := map[string]string{srcDS.JobTable: "jsonb", destDS.JobTable: "jsonb"}
// find column types first - to differentiate between `bytea`, `jsonb` and `text`
rows, err := tx.QueryContext(
ctx,
fmt.Sprintf(
`select table_name, data_type
from information_schema.columns
where table_name IN ('%[1]s', '%[2]s') and column_name='event_payload';`,
srcDS.JobTable, destDS.JobTable,
),
)
if err != nil {
return 0, fmt.Errorf("failed to get column types: %w", err)
}
defer rows.Close()
var jobsTable, columnType string
for rows.Next() {
if err = rows.Scan(&jobsTable, &columnType); err != nil {
return 0, fmt.Errorf("failed to scan column types: %w", err)
}
if columnType != "bytea" && columnType != "jsonb" && columnType != "text" {
return 0, fmt.Errorf("unsupported column type %s", columnType)
}
columnTypeMap[jobsTable] = columnType
}
if err = rows.Err(); err != nil {
return 0, fmt.Errorf("rows.Err() on column types: %w", err)
}
payloadLiteral := getColumnConversion(columnTypeMap[srcDS.JobTable], columnTypeMap[destDS.JobTable])
jd.logger.Info(payloadLiteral)

compactDSQuery := fmt.Sprintf(
`with last_status as (select * from "v_last_%[1]s"),
inserted_jobs as
(
insert into %[3]q (job_id, workspace_id, uuid, user_id, custom_val, parameters, event_payload, event_count, created_at, expire_at)
(select j.job_id, j.workspace_id, j.uuid, j.user_id, j.custom_val, j.parameters, j.event_payload, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id
(select j.job_id, j.workspace_id, j.uuid, j.user_id, j.custom_val, j.parameters, %[6]s, j.event_count, j.created_at, j.expire_at from %[2]q j left join last_status js on js.job_id = j.job_id
where js.job_id is null or js.job_state = ANY('{%[5]s}') order by j.job_id) returning job_id
),
insertedStatuses as
Expand All @@ -484,6 +552,7 @@ func (jd *Handle) migrateJobsInTx(ctx context.Context, tx *Tx, srcDS, destDS dat
destDS.JobTable,
destDS.JobStatusTable,
strings.Join(validNonTerminalStates, ","),
payloadLiteral,
)

var numJobsMigrated int64
Expand Down
Loading

0 comments on commit 58a72f5

Please sign in to comment.