From d984ff52d858ba916d8ca63692ba60c422773285 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 15 Feb 2024 17:48:02 +0530 Subject: [PATCH 1/4] explicitly call EnsurePullability while adding tables --- flow/connectors/postgres/client.go | 103 +++++++++++++++++++++++++++++ flow/workflows/cdc_flow.go | 26 +++++++- 2 files changed, 128 insertions(+), 1 deletion(-) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index f707a04347..70ca0b7391 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "regexp" + "strconv" "strings" "github.com/jackc/pglogrepl" @@ -625,3 +626,105 @@ func (c *PostgresConnector) getCurrentLSN(ctx context.Context) (pglogrepl.LSN, e func (c *PostgresConnector) getDefaultPublicationName(jobName string) string { return "peerflow_pub_" + jobName } + +func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames []string, pubName string) error { + if c.conn == nil { + return errors.New("check tables: conn is nil") + } + + // Check that we can select from all tables + tableArr := make([]string, 0, len(tableNames)) + for _, table := range tableNames { + var row pgx.Row + schemaName, tableName, found := strings.Cut(table, ".") + if !found { + return fmt.Errorf("invalid source table identifier: %s", table) + } + + tableArr = append(tableArr, fmt.Sprintf(`(%s::text, %s::text)`, QuoteLiteral(schemaName), QuoteLiteral(tableName))) + err := c.conn.QueryRow(ctx, + fmt.Sprintf("SELECT * FROM %s.%s LIMIT 0;", QuoteIdentifier(schemaName), QuoteIdentifier(tableName))).Scan(&row) + if err != nil && err != pgx.ErrNoRows { + return err + } + } + + tableStr := strings.Join(tableArr, ",") + if pubName != "" { + // Check if publication exists + err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil) + if err != nil { + if err == pgx.ErrNoRows { + return fmt.Errorf("publication does not exist: %s", pubName) + } + return fmt.Errorf("error while checking for publication existence: %w", err) + } + + // Check if tables belong to publication + var pubTableCount int + err = c.conn.QueryRow(ctx, fmt.Sprintf(` + with source_table_components (sname, tname) as (values %s) + select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables + INNER JOIN source_table_components stc + ON schemaname=stc.sname and tablename=stc.tname where pubname=$1;`, tableStr), pubName).Scan(&pubTableCount) + if err != nil { + return err + } + + if pubTableCount != len(tableNames) { + return errors.New("not all tables belong to publication") + } + } + + return nil +} + +func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, username string) error { + if c.conn == nil { + return errors.New("check replication permissions: conn is nil") + } + + var replicationRes bool + err := c.conn.QueryRow(ctx, "SELECT rolreplication FROM pg_roles WHERE rolname = $1", username).Scan(&replicationRes) + if err != nil { + return err + } + + if !replicationRes { + // RDS case: check pg_settings for rds.logical_replication + var setting string + err := c.conn.QueryRow(ctx, "SELECT setting FROM pg_settings WHERE name = 'rds.logical_replication'").Scan(&setting) + if err != nil || setting != "on" { + return errors.New("postgres user does not have replication role") + } + } + + // check wal_level + var walLevel string + err = c.conn.QueryRow(ctx, "SHOW wal_level").Scan(&walLevel) + if err != nil { + return err + } + + if walLevel != "logical" { + return fmt.Errorf("wal_level is not logical") + } + + // max_wal_senders must be at least 2 + var maxWalSendersRes string + err = c.conn.QueryRow(ctx, "SHOW max_wal_senders").Scan(&maxWalSendersRes) + if err != nil { + return err + } + + maxWalSenders, err := strconv.Atoi(maxWalSendersRes) + if err != nil { + return err + } + + if maxWalSenders < 2 { + return fmt.Errorf("max_wal_senders must be at least 2") + } + + return nil +} diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index afc17927ce..cb35b335c1 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -164,6 +164,29 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont return err } + ensurePullabilityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + ensurePullabilityFuture := workflow.ExecuteActivity( + ensurePullabilityCtx, + flowable.EnsurePullability, + &protos.EnsurePullabilityBatchInput{ + PeerConnectionConfig: cfg.Source, + FlowJobName: cfg.FlowJobName, + SourceTableIdentifiers: func() []string { + additionalSourceTables := make([]string, 0, len(flowConfigUpdate.AdditionalTables)) + for _, additionalSourceTable := range flowConfigUpdate.AdditionalTables { + additionalSourceTables = append(additionalSourceTables, additionalSourceTable.SourceTableIdentifier) + } + return additionalSourceTables + }(), + CheckConstraints: true, + }) + if err := ensurePullabilityFuture.Get(ctx, nil); err != nil { + w.logger.Error("failed to ensure pullability for additional tables: ", err) + return err + } + additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs) additionalTablesWorkflowCfg.DoInitialSnapshot = true additionalTablesWorkflowCfg.InitialSnapshotOnly = true @@ -217,7 +240,7 @@ func CDCFlowWorkflowWithConfig( state *CDCFlowWorkflowState, ) (*CDCFlowWorkflowResult, error) { if cfg == nil { - return nil, fmt.Errorf("invalid connection configs") + return nil, errors.New("invalid connection configs") } if state == nil { state = NewCDCFlowWorkflowState(cfg.TableMappings) @@ -471,6 +494,7 @@ func CDCFlowWorkflowWithConfig( // only place we block on receive, so signal processing is immediate mainLoopSelector.Select(ctx) if state.ActiveSignal == shared.NoopSignal { + state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch) if err != nil { return state, err From ebca80a203fbc3a6c9b25c74ee3c81145fea44c4 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 15 Feb 2024 17:50:08 +0530 Subject: [PATCH 2/4] swapping order of addpub and ensurepullability --- flow/workflows/cdc_flow.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index cb35b335c1..df9eaa4b49 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -152,18 +152,6 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont continue } - alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: 5 * time.Minute, - }) - alterPublicationAddAdditionalTablesFuture := workflow.ExecuteActivity( - alterPublicationAddAdditionalTablesCtx, - flowable.AddTablesToPublication, - cfg, flowConfigUpdate.AdditionalTables) - if err := alterPublicationAddAdditionalTablesFuture.Get(ctx, nil); err != nil { - w.logger.Error("failed to alter publication for additional tables: ", err) - return err - } - ensurePullabilityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) @@ -187,6 +175,18 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont return err } + alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + alterPublicationAddAdditionalTablesFuture := workflow.ExecuteActivity( + alterPublicationAddAdditionalTablesCtx, + flowable.AddTablesToPublication, + cfg, flowConfigUpdate.AdditionalTables) + if err := alterPublicationAddAdditionalTablesFuture.Get(ctx, nil); err != nil { + w.logger.Error("failed to alter publication for additional tables: ", err) + return err + } + additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs) additionalTablesWorkflowCfg.DoInitialSnapshot = true additionalTablesWorkflowCfg.InitialSnapshotOnly = true From 97a23c76c69648fad6ed165a2f8eb4ee414cac5b Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Tue, 20 Feb 2024 01:13:03 +0530 Subject: [PATCH 3/4] switched to Slack alerts instead of erroring --- flow/activities/flowable.go | 37 ++++++++++++++++------------ flow/connectors/postgres/postgres.go | 8 ++++-- flow/shared/alerting/alerting.go | 37 ++++++++++++++++++++-------- flow/workflows/cdc_flow.go | 27 +++++++++++--------- protos/flow.proto | 1 + 5 files changed, 71 insertions(+), 39 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 4fc6c9d7c4..73dcaf1552 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -94,6 +94,11 @@ func (a *FlowableActivity) EnsurePullability( output, err := srcConn.EnsurePullability(ctx, config) if err != nil { + if config.OnlyAlertOnConstraintsFail && errors.Is(err, connpostgres.ErrCDCNotSupportedForTable) { + a.Alerter.AlertGeneric(ctx, config.FlowJobName+"-add-tables-failed", + fmt.Sprintf("failed to add tables for mirror %s: %v", config.FlowJobName, err)) + return nil, nil + } a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return nil, fmt.Errorf("failed to ensure pullability: %w", err) } @@ -190,9 +195,9 @@ func (a *FlowableActivity) CreateNormalizedTable( numTablesSetup.Add(1) if !existing { - logger.Info(fmt.Sprintf("created table %s", tableIdentifier)) + logger.Info("created table " + tableIdentifier) } else { - logger.Info(fmt.Sprintf("table already exists %s", tableIdentifier)) + logger.Info("table already exists " + tableIdentifier) } } @@ -231,14 +236,14 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, } defer connectors.CloseConnector(ctx, srcConn) - slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName) + slotNameForMetrics := "peerflow_slot_" + input.FlowConnectionConfigs.FlowJobName if input.FlowConnectionConfigs.ReplicationSlotName != "" { slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName } shutdown := utils.HeartbeatRoutine(ctx, func() string { jobName := input.FlowConnectionConfigs.FlowJobName - return fmt.Sprintf("transferring records for job - %s", jobName) + return "transferring records for job - " + jobName }) defer shutdown() @@ -414,7 +419,7 @@ func (a *FlowableActivity) StartNormalize( defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName) + return "normalizing records from batch for job - " + input.FlowConnectionConfigs.FlowJobName }) defer shutdown() @@ -482,7 +487,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, defer connectors.CloseConnector(ctx, srcConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName) + return "getting partitions for job - " + config.FlowJobName }) defer shutdown() @@ -573,7 +578,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } defer connectors.CloseConnector(ctx, dstConn) - logger.Info(fmt.Sprintf("replicating partition %s", partition.PartitionId)) + logger.Info("replicating partition " + partition.PartitionId) shutdown := utils.HeartbeatRoutine(ctx, func() string { return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total) }) @@ -633,7 +638,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } if rowsSynced == 0 { - logger.Info(fmt.Sprintf("no records to push for partition %s", partition.PartitionId)) + logger.Info("no records to push for partition " + partition.PartitionId) pullCancel() } else { wg.Wait() @@ -666,7 +671,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName) + return "consolidating partitions for job - " + config.FlowJobName }) defer shutdown() @@ -868,7 +873,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return fmt.Sprintf("renaming tables for job - %s", config.FlowJobName) + return "renaming tables for job - " + config.FlowJobName }) defer shutdown() @@ -876,18 +881,18 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector) if !ok { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return nil, fmt.Errorf("failed to cast connector to snowflake connector") + return nil, errors.New("failed to cast connector to snowflake connector") } return sfConn.RenameTables(ctx, config) } else if config.Peer.Type == protos.DBType_BIGQUERY { bqConn, ok := dstConn.(*connbigquery.BigQueryConnector) if !ok { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return nil, fmt.Errorf("failed to cast connector to bigquery connector") + return nil, errors.New("failed to cast connector to bigquery connector") } return bqConn.RenameTables(ctx, config) } - return nil, fmt.Errorf("rename tables is only supported on snowflake and bigquery") + return nil, errors.New("rename tables is only supported on snowflake and bigquery") } func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) ( @@ -903,18 +908,18 @@ func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *pr if req.Peer.Type == protos.DBType_SNOWFLAKE { sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector) if !ok { - return nil, fmt.Errorf("failed to cast connector to snowflake connector") + return nil, errors.New("failed to cast connector to snowflake connector") } return sfConn.CreateTablesFromExisting(ctx, req) } else if req.Peer.Type == protos.DBType_BIGQUERY { bqConn, ok := dstConn.(*connbigquery.BigQueryConnector) if !ok { - return nil, fmt.Errorf("failed to cast connector to bigquery connector") + return nil, errors.New("failed to cast connector to bigquery connector") } return bqConn.CreateTablesFromExisting(ctx, req) } a.Alerter.LogFlowError(ctx, req.FlowJobName, err) - return nil, fmt.Errorf("create tables from existing is only supported on snowflake and bigquery") + return nil, errors.New("create tables from existing is only supported on snowflake and bigquery") } // ReplicateXminPartition replicates a XminPartition from the source to the destination. diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 1079431c17..64d2f73863 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -37,6 +37,8 @@ type PostgresConnector struct { logger log.Logger } +var ErrCDCNotSupportedForTable = errors.New("table has no primary keys and does not have REPLICA IDENTITY FULL") + func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) { connectionString := utils.GetPGConnectionString(pgConfig) @@ -515,7 +517,9 @@ type SlotCheckResult struct { } // CreateRawTable creates a raw table, implementing the Connector interface. -func (c *PostgresConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { +func (c *PostgresConnector) CreateRawTable(ctx context.Context, + req *protos.CreateRawTableInput, +) (*protos.CreateRawTableOutput, error) { rawTableIdentifier := getRawTableIdentifier(req.FlowJobName) err := c.createMetadataSchema(ctx) @@ -788,7 +792,7 @@ func (c *PostgresConnector) EnsurePullability( // we only allow no primary key if the table has REPLICA IDENTITY FULL // this is ok for replica identity index as we populate the primary key columns if len(pKeyCols) == 0 && replicaIdentity != ReplicaIdentityFull { - return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable) + return nil, fmt.Errorf("%w: %s", ErrCDCNotSupportedForTable, schemaTable.String()) } utils.RecordHeartbeat(ctx, "ensured pullability table "+tableName) diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index 4e95f1d5d1..5eb43d8a97 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -84,20 +84,20 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo alertKey := peerName + "-slot-lag-threshold-exceeded" alertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %%dMB, "+ - `currently at %.2fMB! - cc: `, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb) + "currently at %.2fMB!", + deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb) if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) && a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) { for _, slackAlertSender := range slackAlertSenders { if slackAlertSender.slotLagMBAlertThreshold > 0 { if slotInfo.LagInMb > float32(slackAlertSender.slotLagMBAlertThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, + a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey, fmt.Sprintf(alertMessageTemplate, slackAlertSender.slotLagMBAlertThreshold)) } } else { if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, + a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey, fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold)) } } @@ -130,20 +130,20 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, alertKey := peerName + "-max-open-connections-threshold-exceeded" alertMessageTemplate := fmt.Sprintf("%sOpen connections from PeerDB user `%s` on peer `%s`"+ - ` has exceeded threshold size of %%d connections, currently at %d connections! - cc: `, deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections) + " has exceeded threshold size of %%d connections, currently at %d connections!", + deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections) if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) && a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) { for _, slackAlertSender := range slackAlertSenders { if slackAlertSender.openConnectionsAlertThreshold > 0 { if openConnections.CurrentOpenConnections > int64(slackAlertSender.openConnectionsAlertThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, + a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey, fmt.Sprintf(alertMessageTemplate, slackAlertSender.openConnectionsAlertThreshold)) } } else { if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, + a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey, fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold)) } } @@ -151,9 +151,26 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, } } -func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlertSender, alertKey string, alertMessage string) { +func (a *Alerter) AlertGeneric(ctx context.Context, alertKey string, alertMessage string) { + if a.checkAndAddAlertToCatalog(ctx, alertKey, alertMessage) { + slackAlertSenders, err := a.registerSendersFromPool(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err)) + return + } + + for _, slackAlertSender := range slackAlertSenders { + a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey, alertMessage) + } + } +} + +func (a *Alerter) alertToSpecificSlackSender(ctx context.Context, slackAlertSender *slackAlertSender, + alertKey string, alertMessage string, +) { err := slackAlertSender.sendAlert(ctx, - ":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage) + ":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage+` +cc: `) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err)) return diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index df9eaa4b49..a1355be8ce 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -152,6 +152,11 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont continue } + additionalSourceTables := make([]string, 0, len(flowConfigUpdate.AdditionalTables)) + for _, additionalSourceTable := range flowConfigUpdate.AdditionalTables { + additionalSourceTables = append(additionalSourceTables, additionalSourceTable.SourceTableIdentifier) + } + ensurePullabilityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) @@ -159,21 +164,21 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont ensurePullabilityCtx, flowable.EnsurePullability, &protos.EnsurePullabilityBatchInput{ - PeerConnectionConfig: cfg.Source, - FlowJobName: cfg.FlowJobName, - SourceTableIdentifiers: func() []string { - additionalSourceTables := make([]string, 0, len(flowConfigUpdate.AdditionalTables)) - for _, additionalSourceTable := range flowConfigUpdate.AdditionalTables { - additionalSourceTables = append(additionalSourceTables, additionalSourceTable.SourceTableIdentifier) - } - return additionalSourceTables - }(), - CheckConstraints: true, + PeerConnectionConfig: cfg.Source, + FlowJobName: cfg.FlowJobName, + SourceTableIdentifiers: additionalSourceTables, + CheckConstraints: true, + OnlyAlertOnConstraintsFail: true, }) - if err := ensurePullabilityFuture.Get(ctx, nil); err != nil { + var ensurePullabilityOutput *protos.EnsurePullabilityBatchOutput + if err := ensurePullabilityFuture.Get(ctx, &ensurePullabilityOutput); err != nil { w.logger.Error("failed to ensure pullability for additional tables: ", err) return err } + // if err == nil and output == nil, constraints failed, so ignore batch + if ensurePullabilityOutput == nil { + continue + } alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, diff --git a/protos/flow.proto b/protos/flow.proto index 2147ee2fc8..037bb30f72 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -132,6 +132,7 @@ message EnsurePullabilityBatchInput { string flow_job_name = 2; repeated string source_table_identifiers = 3; bool check_constraints = 4; + bool only_alert_on_constraints_fail = 5; } message PostgresTableIdentifier { From 4ee863af851e6241a38fd162063846377ecfca8a Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Tue, 20 Feb 2024 01:21:27 +0530 Subject: [PATCH 4/4] rebase oopsie --- flow/connectors/postgres/client.go | 103 ----------------------------- 1 file changed, 103 deletions(-) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index 70ca0b7391..f707a04347 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "regexp" - "strconv" "strings" "github.com/jackc/pglogrepl" @@ -626,105 +625,3 @@ func (c *PostgresConnector) getCurrentLSN(ctx context.Context) (pglogrepl.LSN, e func (c *PostgresConnector) getDefaultPublicationName(jobName string) string { return "peerflow_pub_" + jobName } - -func (c *PostgresConnector) CheckSourceTables(ctx context.Context, tableNames []string, pubName string) error { - if c.conn == nil { - return errors.New("check tables: conn is nil") - } - - // Check that we can select from all tables - tableArr := make([]string, 0, len(tableNames)) - for _, table := range tableNames { - var row pgx.Row - schemaName, tableName, found := strings.Cut(table, ".") - if !found { - return fmt.Errorf("invalid source table identifier: %s", table) - } - - tableArr = append(tableArr, fmt.Sprintf(`(%s::text, %s::text)`, QuoteLiteral(schemaName), QuoteLiteral(tableName))) - err := c.conn.QueryRow(ctx, - fmt.Sprintf("SELECT * FROM %s.%s LIMIT 0;", QuoteIdentifier(schemaName), QuoteIdentifier(tableName))).Scan(&row) - if err != nil && err != pgx.ErrNoRows { - return err - } - } - - tableStr := strings.Join(tableArr, ",") - if pubName != "" { - // Check if publication exists - err := c.conn.QueryRow(ctx, "SELECT pubname FROM pg_publication WHERE pubname=$1", pubName).Scan(nil) - if err != nil { - if err == pgx.ErrNoRows { - return fmt.Errorf("publication does not exist: %s", pubName) - } - return fmt.Errorf("error while checking for publication existence: %w", err) - } - - // Check if tables belong to publication - var pubTableCount int - err = c.conn.QueryRow(ctx, fmt.Sprintf(` - with source_table_components (sname, tname) as (values %s) - select COUNT(DISTINCT(schemaname,tablename)) from pg_publication_tables - INNER JOIN source_table_components stc - ON schemaname=stc.sname and tablename=stc.tname where pubname=$1;`, tableStr), pubName).Scan(&pubTableCount) - if err != nil { - return err - } - - if pubTableCount != len(tableNames) { - return errors.New("not all tables belong to publication") - } - } - - return nil -} - -func (c *PostgresConnector) CheckReplicationPermissions(ctx context.Context, username string) error { - if c.conn == nil { - return errors.New("check replication permissions: conn is nil") - } - - var replicationRes bool - err := c.conn.QueryRow(ctx, "SELECT rolreplication FROM pg_roles WHERE rolname = $1", username).Scan(&replicationRes) - if err != nil { - return err - } - - if !replicationRes { - // RDS case: check pg_settings for rds.logical_replication - var setting string - err := c.conn.QueryRow(ctx, "SELECT setting FROM pg_settings WHERE name = 'rds.logical_replication'").Scan(&setting) - if err != nil || setting != "on" { - return errors.New("postgres user does not have replication role") - } - } - - // check wal_level - var walLevel string - err = c.conn.QueryRow(ctx, "SHOW wal_level").Scan(&walLevel) - if err != nil { - return err - } - - if walLevel != "logical" { - return fmt.Errorf("wal_level is not logical") - } - - // max_wal_senders must be at least 2 - var maxWalSendersRes string - err = c.conn.QueryRow(ctx, "SHOW max_wal_senders").Scan(&maxWalSendersRes) - if err != nil { - return err - } - - maxWalSenders, err := strconv.Atoi(maxWalSendersRes) - if err != nil { - return err - } - - if maxWalSenders < 2 { - return fmt.Errorf("max_wal_senders must be at least 2") - } - - return nil -}