From 97a23c76c69648fad6ed165a2f8eb4ee414cac5b Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Tue, 20 Feb 2024 01:13:03 +0530 Subject: [PATCH] switched to Slack alerts instead of erroring --- flow/activities/flowable.go | 37 ++++++++++++++++------------ flow/connectors/postgres/postgres.go | 8 ++++-- flow/shared/alerting/alerting.go | 37 ++++++++++++++++++++-------- flow/workflows/cdc_flow.go | 27 +++++++++++--------- protos/flow.proto | 1 + 5 files changed, 71 insertions(+), 39 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 4fc6c9d7c4..73dcaf1552 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -94,6 +94,11 @@ func (a *FlowableActivity) EnsurePullability( output, err := srcConn.EnsurePullability(ctx, config) if err != nil { + if config.OnlyAlertOnConstraintsFail && errors.Is(err, connpostgres.ErrCDCNotSupportedForTable) { + a.Alerter.AlertGeneric(ctx, config.FlowJobName+"-add-tables-failed", + fmt.Sprintf("failed to add tables for mirror %s: %v", config.FlowJobName, err)) + return nil, nil + } a.Alerter.LogFlowError(ctx, config.FlowJobName, err) return nil, fmt.Errorf("failed to ensure pullability: %w", err) } @@ -190,9 +195,9 @@ func (a *FlowableActivity) CreateNormalizedTable( numTablesSetup.Add(1) if !existing { - logger.Info(fmt.Sprintf("created table %s", tableIdentifier)) + logger.Info("created table " + tableIdentifier) } else { - logger.Info(fmt.Sprintf("table already exists %s", tableIdentifier)) + logger.Info("table already exists " + tableIdentifier) } } @@ -231,14 +236,14 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, } defer connectors.CloseConnector(ctx, srcConn) - slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName) + slotNameForMetrics := "peerflow_slot_" + input.FlowConnectionConfigs.FlowJobName if input.FlowConnectionConfigs.ReplicationSlotName != "" { slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName 
} shutdown := utils.HeartbeatRoutine(ctx, func() string { jobName := input.FlowConnectionConfigs.FlowJobName - return fmt.Sprintf("transferring records for job - %s", jobName) + return "transferring records for job - " + jobName }) defer shutdown() @@ -414,7 +419,7 @@ func (a *FlowableActivity) StartNormalize( defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName) + return "normalizing records from batch for job - " + input.FlowConnectionConfigs.FlowJobName }) defer shutdown() @@ -482,7 +487,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, defer connectors.CloseConnector(ctx, srcConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName) + return "getting partitions for job - " + config.FlowJobName }) defer shutdown() @@ -573,7 +578,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } defer connectors.CloseConnector(ctx, dstConn) - logger.Info(fmt.Sprintf("replicating partition %s", partition.PartitionId)) + logger.Info("replicating partition " + partition.PartitionId) shutdown := utils.HeartbeatRoutine(ctx, func() string { return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total) }) @@ -633,7 +638,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context, } if rowsSynced == 0 { - logger.Info(fmt.Sprintf("no records to push for partition %s", partition.PartitionId)) + logger.Info("no records to push for partition " + partition.PartitionId) pullCancel() } else { wg.Wait() @@ -666,7 +671,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return fmt.Sprintf("consolidating partitions for job - %s", 
config.FlowJobName) + return "consolidating partitions for job - " + config.FlowJobName }) defer shutdown() @@ -868,7 +873,7 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena defer connectors.CloseConnector(ctx, dstConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { - return fmt.Sprintf("renaming tables for job - %s", config.FlowJobName) + return "renaming tables for job - " + config.FlowJobName }) defer shutdown() @@ -876,18 +881,18 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector) if !ok { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return nil, fmt.Errorf("failed to cast connector to snowflake connector") + return nil, errors.New("failed to cast connector to snowflake connector") } return sfConn.RenameTables(ctx, config) } else if config.Peer.Type == protos.DBType_BIGQUERY { bqConn, ok := dstConn.(*connbigquery.BigQueryConnector) if !ok { a.Alerter.LogFlowError(ctx, config.FlowJobName, err) - return nil, fmt.Errorf("failed to cast connector to bigquery connector") + return nil, errors.New("failed to cast connector to bigquery connector") } return bqConn.RenameTables(ctx, config) } - return nil, fmt.Errorf("rename tables is only supported on snowflake and bigquery") + return nil, errors.New("rename tables is only supported on snowflake and bigquery") } func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) ( @@ -903,18 +908,18 @@ func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *pr if req.Peer.Type == protos.DBType_SNOWFLAKE { sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector) if !ok { - return nil, fmt.Errorf("failed to cast connector to snowflake connector") + return nil, errors.New("failed to cast connector to snowflake connector") } return sfConn.CreateTablesFromExisting(ctx, req) } else if req.Peer.Type == 
protos.DBType_BIGQUERY { bqConn, ok := dstConn.(*connbigquery.BigQueryConnector) if !ok { - return nil, fmt.Errorf("failed to cast connector to bigquery connector") + return nil, errors.New("failed to cast connector to bigquery connector") } return bqConn.CreateTablesFromExisting(ctx, req) } a.Alerter.LogFlowError(ctx, req.FlowJobName, err) - return nil, fmt.Errorf("create tables from existing is only supported on snowflake and bigquery") + return nil, errors.New("create tables from existing is only supported on snowflake and bigquery") } // ReplicateXminPartition replicates a XminPartition from the source to the destination. diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 1079431c17..64d2f73863 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -37,6 +37,8 @@ type PostgresConnector struct { logger log.Logger } +var ErrCDCNotSupportedForTable = errors.New("table has no primary keys and does not have REPLICA IDENTITY FULL") + func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) { connectionString := utils.GetPGConnectionString(pgConfig) @@ -515,7 +517,9 @@ type SlotCheckResult struct { } // CreateRawTable creates a raw table, implementing the Connector interface. 
-func (c *PostgresConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) { +func (c *PostgresConnector) CreateRawTable(ctx context.Context, + req *protos.CreateRawTableInput, +) (*protos.CreateRawTableOutput, error) { rawTableIdentifier := getRawTableIdentifier(req.FlowJobName) err := c.createMetadataSchema(ctx) @@ -788,7 +792,7 @@ func (c *PostgresConnector) EnsurePullability( // we only allow no primary key if the table has REPLICA IDENTITY FULL // this is ok for replica identity index as we populate the primary key columns if len(pKeyCols) == 0 && replicaIdentity != ReplicaIdentityFull { - return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable) + return nil, fmt.Errorf("%w: %s", ErrCDCNotSupportedForTable, schemaTable.String()) } utils.RecordHeartbeat(ctx, "ensured pullability table "+tableName) diff --git a/flow/shared/alerting/alerting.go b/flow/shared/alerting/alerting.go index 4e95f1d5d1..5eb43d8a97 100644 --- a/flow/shared/alerting/alerting.go +++ b/flow/shared/alerting/alerting.go @@ -84,20 +84,20 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo alertKey := peerName + "-slot-lag-threshold-exceeded" alertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %%dMB, "+ - `currently at %.2fMB! 
- cc: `, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb) + "currently at %.2fMB!", + deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb) if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) && a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) { for _, slackAlertSender := range slackAlertSenders { if slackAlertSender.slotLagMBAlertThreshold > 0 { if slotInfo.LagInMb > float32(slackAlertSender.slotLagMBAlertThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, + a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey, fmt.Sprintf(alertMessageTemplate, slackAlertSender.slotLagMBAlertThreshold)) } } else { if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, + a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey, fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold)) } } @@ -130,20 +130,20 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, alertKey := peerName + "-max-open-connections-threshold-exceeded" alertMessageTemplate := fmt.Sprintf("%sOpen connections from PeerDB user `%s` on peer `%s`"+ - ` has exceeded threshold size of %%d connections, currently at %d connections! 
- cc: `, deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections) + " has exceeded threshold size of %%d connections, currently at %d connections!", + deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections) if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) && a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) { for _, slackAlertSender := range slackAlertSenders { if slackAlertSender.openConnectionsAlertThreshold > 0 { if openConnections.CurrentOpenConnections > int64(slackAlertSender.openConnectionsAlertThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, + a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey, fmt.Sprintf(alertMessageTemplate, slackAlertSender.openConnectionsAlertThreshold)) } } else { if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) { - a.alertToSlack(ctx, slackAlertSender, alertKey, + a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey, fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold)) } } @@ -151,9 +151,26 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string, } } -func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlertSender, alertKey string, alertMessage string) { +func (a *Alerter) AlertGeneric(ctx context.Context, alertKey string, alertMessage string) { + if a.checkAndAddAlertToCatalog(ctx, alertKey, alertMessage) { + slackAlertSenders, err := a.registerSendersFromPool(ctx) + if err != nil { + logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err)) + return + } + + for _, slackAlertSender := range slackAlertSenders { + a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey, alertMessage) + } + } +} + +func (a *Alerter) alertToSpecificSlackSender(ctx context.Context, slackAlertSender *slackAlertSender, + 
alertKey string, alertMessage string, +) { err := slackAlertSender.sendAlert(ctx, - ":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage) + ":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage+` +cc: <!channel>`) if err != nil { logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err)) return diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index df9eaa4b49..a1355be8ce 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -152,6 +152,11 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont continue } + additionalSourceTables := make([]string, 0, len(flowConfigUpdate.AdditionalTables)) + for _, additionalSourceTable := range flowConfigUpdate.AdditionalTables { + additionalSourceTables = append(additionalSourceTables, additionalSourceTable.SourceTableIdentifier) + } + ensurePullabilityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, }) @@ -159,21 +164,21 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont ensurePullabilityCtx, flowable.EnsurePullability, &protos.EnsurePullabilityBatchInput{ - PeerConnectionConfig: cfg.Source, - FlowJobName: cfg.FlowJobName, - SourceTableIdentifiers: func() []string { - additionalSourceTables := make([]string, 0, len(flowConfigUpdate.AdditionalTables)) - for _, additionalSourceTable := range flowConfigUpdate.AdditionalTables { - additionalSourceTables = append(additionalSourceTables, additionalSourceTable.SourceTableIdentifier) - } - return additionalSourceTables - }(), - CheckConstraints: true, + PeerConnectionConfig: cfg.Source, + FlowJobName: cfg.FlowJobName, + SourceTableIdentifiers: additionalSourceTables, + CheckConstraints: true, + OnlyAlertOnConstraintsFail: true, }) - if err := ensurePullabilityFuture.Get(ctx, nil); err != nil { + var ensurePullabilityOutput *protos.EnsurePullabilityBatchOutput + if err := 
ensurePullabilityFuture.Get(ctx, &ensurePullabilityOutput); err != nil { w.logger.Error("failed to ensure pullability for additional tables: ", err) return err } + // if err == nil and output == nil, constraints failed, so ignore batch + if ensurePullabilityOutput == nil { + continue + } alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, diff --git a/protos/flow.proto b/protos/flow.proto index 2147ee2fc8..037bb30f72 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -132,6 +132,7 @@ message EnsurePullabilityBatchInput { string flow_job_name = 2; repeated string source_table_identifiers = 3; bool check_constraints = 4; + bool only_alert_on_constraints_fail = 5; } message PostgresTableIdentifier {