Skip to content

Commit

Permalink
skip custom pub check
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Oct 4, 2024
1 parent cd2ded7 commit 1bc875b
Showing 1 changed file with 6 additions and 20 deletions.
26 changes: 6 additions & 20 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,29 +1244,13 @@ func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *pro
additionalSrcTables = append(additionalSrcTables, additionalTableMapping.SourceTableIdentifier)
}

// just check if we have all the tables already in the publication for custom publications
if req.PublicationName != "" {
rows, err := c.conn.Query(ctx,
"SELECT schemaname || '.' || tablename FROM pg_publication_tables WHERE pubname=$1", req.PublicationName)
for _, additionalSrcTable := range additionalSrcTables {
schemaTable, err := utils.ParseSchemaTable(additionalSrcTable)
if err != nil {
return fmt.Errorf("failed to check tables in publication: %w", err)
return err
}

tableNames, err := pgx.CollectRows[string](rows, pgx.RowTo)
if err != nil {
return fmt.Errorf("failed to check tables in publication: %w", err)
}
notPresentTables := shared.ArrayMinus(additionalSrcTables, tableNames)
if len(notPresentTables) > 0 {
return fmt.Errorf("some additional tables not present in custom publication: %s",
strings.Join(notPresentTables, ", "))
}
} else {
for _, additionalSrcTable := range additionalSrcTables {
schemaTable, err := utils.ParseSchemaTable(additionalSrcTable)
if err != nil {
return err
}
if req.PublicationName == "" {
_, err = c.execWithLogging(ctx, fmt.Sprintf("ALTER PUBLICATION %s ADD TABLE %s",
utils.QuoteIdentifier(c.getDefaultPublicationName(req.FlowJobName)),
schemaTable.String()))
Expand All @@ -1277,6 +1261,8 @@ func (c *PostgresConnector) AddTablesToPublication(ctx context.Context, req *pro
c.logger.Info("added table to publication",
slog.String("publication", c.getDefaultPublicationName(req.FlowJobName)),
slog.String("table", additionalSrcTable))
} else {
c.logger.Info("custom publication " + req.PublicationName + " detected, skipping adding table to publication")
}
}

Expand Down

0 comments on commit 1bc875b

Please sign in to comment.