diff --git a/flow/connectors/postgres/sink_pg.go b/flow/connectors/postgres/sink_pg.go index 8d9d06e747..df2315d4c8 100644 --- a/flow/connectors/postgres/sink_pg.go +++ b/flow/connectors/postgres/sink_pg.go @@ -53,17 +53,18 @@ func (p PgCopyWriter) ExecuteQueryWithTx( ) (int, error) { defer shared.RollbackTx(tx, qe.logger) - if qe.snapshot != "" { - _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)) - if err != nil { - qe.logger.Error("[pg_query_executor] failed to set snapshot", - slog.Any("error", err), slog.String("query", query)) - err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err) - p.Close(err) - return 0, err - } - } - + // if qe.snapshot != "" { + // _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)) + // if err != nil { + // qe.logger.Error("[pg_query_executor] failed to set snapshot", + // slog.Any("error", err), slog.String("query", query)) + // err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err) + // p.Close(err) + // return 0, err + // } + // } + // log that we are ignoring snapshot for customer-orange + qe.logger.Info("[pg_query_executor] ignoring snapshot for customer-orange") norows, err := tx.Query(ctx, query+" limit 0", args...) if err != nil { return 0, err diff --git a/flow/connectors/postgres/sink_q.go b/flow/connectors/postgres/sink_q.go index 89dab6a94f..7b7102b952 100644 --- a/flow/connectors/postgres/sink_q.go +++ b/flow/connectors/postgres/sink_q.go @@ -24,16 +24,18 @@ func (stream RecordStreamSink) ExecuteQueryWithTx( ) (int, error) { defer shared.RollbackTx(tx, qe.logger) - if qe.snapshot != "" { - _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)) - if err != nil { - qe.logger.Error("[pg_query_executor] failed to set snapshot", - slog.Any("error", err), slog.String("query", query)) - err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err) - stream.Close(err) - return 0, err - } - } + // if qe.snapshot != "" { + // _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot)) + // if err != nil { + // qe.logger.Error("[pg_query_executor] failed to set snapshot", + // slog.Any("error", err), slog.String("query", query)) + // err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err) + // stream.Close(err) + // return 0, err + // } + // } + // log that we are ignoring snapshot for customer-orange + qe.logger.Info("[pg_query_executor] ignoring snapshot for customer-orange") randomUint, err := shared.RandomUInt64() if err != nil {