Skip to content
This repository was archived by the owner on Aug 30, 2025. It is now read-only.

Commit fb6e598

Browse files
Fixes Postgres insert identity with OnConflict bug + fixes flaky postgres integration test (#3408)
1 parent 2a99947 commit fb6e598

File tree

3 files changed

+15
-10
lines changed

3 files changed

+15
-10
lines changed

internal/integration-tests/worker/workflow/workflow-integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func Test_Workflow(t *testing.T) {
112112
t.Parallel()
113113
test_postgres_schema_reconciliation(t, ctx, postgres, neosyncApi, dbManagers, accountId, sourceConn, destConn, true)
114114
})
115-
t.Run("no_truncate", func(t *testing.T) {
115+
t.Run("retain_data", func(t *testing.T) {
116116
t.Parallel()
117117
test_postgres_schema_reconciliation(t, ctx, postgres, neosyncApi, dbManagers, accountId, sourceConn, destConn, false)
118118
})

internal/testutil/testdata/postgres/schema-init/alter-job-mappings.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ func GetAlteredSyncJobMappings(schema string) []*mgmtv1alpha1.JobMapping {
2424
Column: "region_number",
2525
Transformer: &mgmtv1alpha1.JobMappingTransformer{
2626
Config: &mgmtv1alpha1.TransformerConfig{
27-
Config: &mgmtv1alpha1.TransformerConfig_GenerateDefaultConfig{
28-
GenerateDefaultConfig: &mgmtv1alpha1.GenerateDefault{},
27+
Config: &mgmtv1alpha1.TransformerConfig_PassthroughConfig{
28+
PassthroughConfig: &mgmtv1alpha1.Passthrough{},
2929
},
3030
},
3131
},

worker/pkg/query-builder/insert-query-builder.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,19 @@ type PostgresDriver struct {
134134
}
135135

136136
func (d *PostgresDriver) BuildInsertQuery(rows []map[string]any) (query string, queryargs []any, err error) {
137-
goquRows := toGoquRecords(rows)
137+
insertQuery, args, err := d.buildInsertQuery(rows)
138+
if err != nil {
139+
return "", nil, fmt.Errorf("failed to build postgres insert query: %w", err)
140+
}
141+
142+
if d.options.shouldOverrideColumnDefault {
143+
insertQuery = sqlmanager_postgres.BuildPgInsertIdentityAlwaysSql(insertQuery)
144+
}
145+
return insertQuery, args, nil
146+
}
138147

148+
func (d *PostgresDriver) buildInsertQuery(rows []map[string]any) (sql string, args []any, err error) {
149+
goquRows := toGoquRecords(rows)
139150
if d.options.conflictConfig.onConflictDoUpdate != nil {
140151
if len(rows) == 0 {
141152
return "", []any{}, errors.New("no rows to insert")
@@ -147,9 +158,6 @@ func (d *PostgresDriver) BuildInsertQuery(rows []map[string]any) (query string,
147158
if err != nil {
148159
return "", nil, fmt.Errorf("failed to build insert query on conflict do nothing fallback: %w", err)
149160
}
150-
if d.options.shouldOverrideColumnDefault {
151-
insertQuery = sqlmanager_postgres.BuildPgInsertIdentityAlwaysSql(insertQuery)
152-
}
153161
return insertQuery, args, nil
154162
}
155163

@@ -168,9 +176,6 @@ func (d *PostgresDriver) BuildInsertQuery(rows []map[string]any) (query string,
168176
if err != nil {
169177
return "", nil, fmt.Errorf("failed to build insert query: %w", err)
170178
}
171-
if d.options.shouldOverrideColumnDefault {
172-
insertQuery = sqlmanager_postgres.BuildPgInsertIdentityAlwaysSql(insertQuery)
173-
}
174179
return insertQuery, args, nil
175180
}
176181

0 commit comments

Comments
 (0)