Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

explicitly call EnsurePullability while adding tables #1298

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func (a *FlowableActivity) EnsurePullability(

output, err := srcConn.EnsurePullability(ctx, config)
if err != nil {
if config.OnlyAlertOnConstraintsFail && errors.Is(err, connpostgres.ErrCDCNotSupportedForTable) {
a.Alerter.AlertGeneric(ctx, config.FlowJobName+"-add-tables-failed",
fmt.Sprintf("failed to add tables for mirror %s: %v", config.FlowJobName, err))
return nil, nil
}
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to ensure pullability: %w", err)
}
Expand Down Expand Up @@ -190,9 +195,9 @@ func (a *FlowableActivity) CreateNormalizedTable(

numTablesSetup.Add(1)
if !existing {
logger.Info(fmt.Sprintf("created table %s", tableIdentifier))
logger.Info("created table " + tableIdentifier)
} else {
logger.Info(fmt.Sprintf("table already exists %s", tableIdentifier))
logger.Info("table already exists " + tableIdentifier)
}
}

Expand Down Expand Up @@ -231,14 +236,14 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
}
defer connectors.CloseConnector(ctx, srcConn)

slotNameForMetrics := fmt.Sprintf("peerflow_slot_%s", input.FlowConnectionConfigs.FlowJobName)
slotNameForMetrics := "peerflow_slot_" + input.FlowConnectionConfigs.FlowJobName
if input.FlowConnectionConfigs.ReplicationSlotName != "" {
slotNameForMetrics = input.FlowConnectionConfigs.ReplicationSlotName
}

shutdown := utils.HeartbeatRoutine(ctx, func() string {
jobName := input.FlowConnectionConfigs.FlowJobName
return fmt.Sprintf("transferring records for job - %s", jobName)
return "transferring records for job - " + jobName
})
defer shutdown()

Expand Down Expand Up @@ -414,7 +419,7 @@ func (a *FlowableActivity) StartNormalize(
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("normalizing records from batch for job - %s", input.FlowConnectionConfigs.FlowJobName)
return "normalizing records from batch for job - " + input.FlowConnectionConfigs.FlowJobName
})
defer shutdown()

Expand Down Expand Up @@ -482,7 +487,7 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
defer connectors.CloseConnector(ctx, srcConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("getting partitions for job - %s", config.FlowJobName)
return "getting partitions for job - " + config.FlowJobName
})
defer shutdown()

Expand Down Expand Up @@ -573,7 +578,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}
defer connectors.CloseConnector(ctx, dstConn)

logger.Info(fmt.Sprintf("replicating partition %s", partition.PartitionId))
logger.Info("replicating partition " + partition.PartitionId)
shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("syncing partition - %s: %d of %d total.", partition.PartitionId, idx, total)
})
Expand Down Expand Up @@ -633,7 +638,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
}

if rowsSynced == 0 {
logger.Info(fmt.Sprintf("no records to push for partition %s", partition.PartitionId))
logger.Info("no records to push for partition " + partition.PartitionId)
pullCancel()
} else {
wg.Wait()
Expand Down Expand Up @@ -666,7 +671,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("consolidating partitions for job - %s", config.FlowJobName)
return "consolidating partitions for job - " + config.FlowJobName
})
defer shutdown()

Expand Down Expand Up @@ -868,26 +873,26 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena
defer connectors.CloseConnector(ctx, dstConn)

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("renaming tables for job - %s", config.FlowJobName)
return "renaming tables for job - " + config.FlowJobName
})
defer shutdown()

if config.Peer.Type == protos.DBType_SNOWFLAKE {
sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector)
if !ok {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to cast connector to snowflake connector")
return nil, errors.New("failed to cast connector to snowflake connector")
}
return sfConn.RenameTables(ctx, config)
} else if config.Peer.Type == protos.DBType_BIGQUERY {
bqConn, ok := dstConn.(*connbigquery.BigQueryConnector)
if !ok {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return nil, fmt.Errorf("failed to cast connector to bigquery connector")
return nil, errors.New("failed to cast connector to bigquery connector")
}
return bqConn.RenameTables(ctx, config)
}
return nil, fmt.Errorf("rename tables is only supported on snowflake and bigquery")
return nil, errors.New("rename tables is only supported on snowflake and bigquery")
}

func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *protos.CreateTablesFromExistingInput) (
Expand All @@ -903,18 +908,18 @@ func (a *FlowableActivity) CreateTablesFromExisting(ctx context.Context, req *pr
if req.Peer.Type == protos.DBType_SNOWFLAKE {
sfConn, ok := dstConn.(*connsnowflake.SnowflakeConnector)
if !ok {
return nil, fmt.Errorf("failed to cast connector to snowflake connector")
return nil, errors.New("failed to cast connector to snowflake connector")
}
return sfConn.CreateTablesFromExisting(ctx, req)
} else if req.Peer.Type == protos.DBType_BIGQUERY {
bqConn, ok := dstConn.(*connbigquery.BigQueryConnector)
if !ok {
return nil, fmt.Errorf("failed to cast connector to bigquery connector")
return nil, errors.New("failed to cast connector to bigquery connector")
}
return bqConn.CreateTablesFromExisting(ctx, req)
}
a.Alerter.LogFlowError(ctx, req.FlowJobName, err)
return nil, fmt.Errorf("create tables from existing is only supported on snowflake and bigquery")
return nil, errors.New("create tables from existing is only supported on snowflake and bigquery")
}

// ReplicateXminPartition replicates a XminPartition from the source to the destination.
Expand Down
8 changes: 6 additions & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type PostgresConnector struct {
logger log.Logger
}

var ErrCDCNotSupportedForTable = errors.New("table has no primary keys and does not have REPLICA IDENTITY FULL")

func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) {
connectionString := utils.GetPGConnectionString(pgConfig)

Expand Down Expand Up @@ -515,7 +517,9 @@ type SlotCheckResult struct {
}

// CreateRawTable creates a raw table, implementing the Connector interface.
func (c *PostgresConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
func (c *PostgresConnector) CreateRawTable(ctx context.Context,
req *protos.CreateRawTableInput,
) (*protos.CreateRawTableOutput, error) {
rawTableIdentifier := getRawTableIdentifier(req.FlowJobName)

err := c.createMetadataSchema(ctx)
Expand Down Expand Up @@ -788,7 +792,7 @@ func (c *PostgresConnector) EnsurePullability(
// we only allow no primary key if the table has REPLICA IDENTITY FULL
// this is ok for replica identity index as we populate the primary key columns
if len(pKeyCols) == 0 && replicaIdentity != ReplicaIdentityFull {
return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable)
return nil, fmt.Errorf("%w: %s", ErrCDCNotSupportedForTable, schemaTable.String())
}

utils.RecordHeartbeat(ctx, "ensured pullability table "+tableName)
Expand Down
37 changes: 27 additions & 10 deletions flow/shared/alerting/alerting.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,20 @@ func (a *Alerter) AlertIfSlotLag(ctx context.Context, peerName string, slotInfo

alertKey := peerName + "-slot-lag-threshold-exceeded"
alertMessageTemplate := fmt.Sprintf("%sSlot `%s` on peer `%s` has exceeded threshold size of %%dMB, "+
`currently at %.2fMB!
cc: <!channel>`, deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb)
"currently at %.2fMB!",
deploymentUIDPrefix, slotInfo.SlotName, peerName, slotInfo.LagInMb)

if slotInfo.LagInMb > float32(lowestSlotLagMBAlertThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestSlotLagMBAlertThreshold)) {
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.slotLagMBAlertThreshold > 0 {
if slotInfo.LagInMb > float32(slackAlertSender.slotLagMBAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, slackAlertSender.slotLagMBAlertThreshold))
}
} else {
if slotInfo.LagInMb > float32(defaultSlotLagMBAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultSlotLagMBAlertThreshold))
}
}
Expand Down Expand Up @@ -130,30 +130,47 @@ func (a *Alerter) AlertIfOpenConnections(ctx context.Context, peerName string,

alertKey := peerName + "-max-open-connections-threshold-exceeded"
alertMessageTemplate := fmt.Sprintf("%sOpen connections from PeerDB user `%s` on peer `%s`"+
` has exceeded threshold size of %%d connections, currently at %d connections!
cc: <!channel>`, deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections)
" has exceeded threshold size of %%d connections, currently at %d connections!",
deploymentUIDPrefix, openConnections.UserName, peerName, openConnections.CurrentOpenConnections)

if openConnections.CurrentOpenConnections > int64(lowestOpenConnectionsThreshold) &&
a.checkAndAddAlertToCatalog(ctx, alertKey, fmt.Sprintf(alertMessageTemplate, lowestOpenConnectionsThreshold)) {
for _, slackAlertSender := range slackAlertSenders {
if slackAlertSender.openConnectionsAlertThreshold > 0 {
if openConnections.CurrentOpenConnections > int64(slackAlertSender.openConnectionsAlertThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, slackAlertSender.openConnectionsAlertThreshold))
}
} else {
if openConnections.CurrentOpenConnections > int64(defaultOpenConnectionsThreshold) {
a.alertToSlack(ctx, slackAlertSender, alertKey,
a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey,
fmt.Sprintf(alertMessageTemplate, defaultOpenConnectionsThreshold))
}
}
}
}
}

func (a *Alerter) alertToSlack(ctx context.Context, slackAlertSender *slackAlertSender, alertKey string, alertMessage string) {
func (a *Alerter) AlertGeneric(ctx context.Context, alertKey string, alertMessage string) {
if a.checkAndAddAlertToCatalog(ctx, alertKey, alertMessage) {
slackAlertSenders, err := a.registerSendersFromPool(ctx)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to set Slack senders", slog.Any("error", err))
return
}

for _, slackAlertSender := range slackAlertSenders {
a.alertToSpecificSlackSender(ctx, slackAlertSender, alertKey, alertMessage)
}
}
}

func (a *Alerter) alertToSpecificSlackSender(ctx context.Context, slackAlertSender *slackAlertSender,
alertKey string, alertMessage string,
) {
err := slackAlertSender.sendAlert(ctx,
":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage)
":rotating_light:Alert:rotating_light:: "+alertKey, alertMessage+`
cc: <!channel>`)
if err != nil {
logger.LoggerFromCtx(ctx).Warn("failed to send alert", slog.Any("error", err))
return
Expand Down
31 changes: 30 additions & 1 deletion flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,34 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
continue
}

additionalSourceTables := make([]string, 0, len(flowConfigUpdate.AdditionalTables))
for _, additionalSourceTable := range flowConfigUpdate.AdditionalTables {
additionalSourceTables = append(additionalSourceTables, additionalSourceTable.SourceTableIdentifier)
}

ensurePullabilityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
ensurePullabilityFuture := workflow.ExecuteActivity(
ensurePullabilityCtx,
flowable.EnsurePullability,
&protos.EnsurePullabilityBatchInput{
PeerConnectionConfig: cfg.Source,
FlowJobName: cfg.FlowJobName,
SourceTableIdentifiers: additionalSourceTables,
CheckConstraints: true,
OnlyAlertOnConstraintsFail: true,
})
var ensurePullabilityOutput *protos.EnsurePullabilityBatchOutput
if err := ensurePullabilityFuture.Get(ctx, &ensurePullabilityOutput); err != nil {
w.logger.Error("failed to ensure pullability for additional tables: ", err)
return err
Copy link
Contributor

@Amogh-Bharadwaj Amogh-Bharadwaj Feb 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be return nil?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processCDCFlowConfigUpdates return value should not be aborting whole workflow. Any flow config update error should result in an error being reported to user in some way, & then being ignored. Then this can remain return err

}
// if err == nil and output == nil, constraints failed, so ignore batch
if ensurePullabilityOutput == nil {
continue
}

alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 5 * time.Minute,
})
Expand Down Expand Up @@ -217,7 +245,7 @@ func CDCFlowWorkflowWithConfig(
state *CDCFlowWorkflowState,
) (*CDCFlowWorkflowResult, error) {
if cfg == nil {
return nil, fmt.Errorf("invalid connection configs")
return nil, errors.New("invalid connection configs")
}
if state == nil {
state = NewCDCFlowWorkflowState(cfg.TableMappings)
Expand Down Expand Up @@ -471,6 +499,7 @@ func CDCFlowWorkflowWithConfig(
// only place we block on receive, so signal processing is immediate
mainLoopSelector.Select(ctx)
if state.ActiveSignal == shared.NoopSignal {
state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT
err = w.processCDCFlowConfigUpdates(ctx, cfg, state, mirrorNameSearch)
if err != nil {
return state, err
Expand Down
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ message EnsurePullabilityBatchInput {
string flow_job_name = 2;
repeated string source_table_identifiers = 3;
bool check_constraints = 4;
bool only_alert_on_constraints_fail = 5;
}

message PostgresTableIdentifier {
Expand Down
Loading