Skip to content

Commit

Permalink
ignore snapshot for customer-orange
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Dec 7, 2024
1 parent 87a529f commit 09ea542
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 21 deletions.
23 changes: 12 additions & 11 deletions flow/connectors/postgres/sink_pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,18 @@ func (p PgCopyWriter) ExecuteQueryWithTx(
) (int, error) {
defer shared.RollbackTx(tx, qe.logger)

if qe.snapshot != "" {
_, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
if err != nil {
qe.logger.Error("[pg_query_executor] failed to set snapshot",
slog.Any("error", err), slog.String("query", query))
err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
p.Close(err)
return 0, err
}
}

// if qe.snapshot != "" {
// _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
// if err != nil {
// qe.logger.Error("[pg_query_executor] failed to set snapshot",
// slog.Any("error", err), slog.String("query", query))
// err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
// p.Close(err)
// return 0, err
// }
// }
// log that we are ignoring snapshot for customer-orange
qe.logger.Info("[pg_query_executor] ignoring snapshot for customer-orange")
norows, err := tx.Query(ctx, query+" limit 0", args...)
if err != nil {
return 0, err
Expand Down
22 changes: 12 additions & 10 deletions flow/connectors/postgres/sink_q.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,18 @@ func (stream RecordStreamSink) ExecuteQueryWithTx(
) (int, error) {
defer shared.RollbackTx(tx, qe.logger)

if qe.snapshot != "" {
_, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
if err != nil {
qe.logger.Error("[pg_query_executor] failed to set snapshot",
slog.Any("error", err), slog.String("query", query))
err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
stream.Close(err)
return 0, err
}
}
// if qe.snapshot != "" {
// _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+QuoteLiteral(qe.snapshot))
// if err != nil {
// qe.logger.Error("[pg_query_executor] failed to set snapshot",
// slog.Any("error", err), slog.String("query", query))
// err := fmt.Errorf("[pg_query_executor] failed to set snapshot: %w", err)
// stream.Close(err)
// return 0, err
// }
// }
// log that we are ignoring snapshot for customer-orange
qe.logger.Info("[pg_query_executor] ignoring snapshot for customer-orange")

randomUint, err := shared.RandomUInt64()
if err != nil {
Expand Down

0 comments on commit 09ea542

Please sign in to comment.