Skip to content

Commit

Permalink
fix create/replace logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 3, 2024
1 parent 87a529f commit f777f97
Showing 1 changed file with 18 additions and 20 deletions.
38 changes: 18 additions & 20 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,11 +635,24 @@ func (c *BigQueryConnector) SetupNormalizedTable(
// check if the table exists
existingMetadata, err := table.Metadata(ctx)
if err == nil {
// table exists, go to next table
c.logger.Info("[bigquery] table already exists, skipping",
slog.String("table", tableIdentifier),
slog.Any("existingMetadata", existingMetadata))
return true, nil
if config.IsResync {
c.logger.Info("[bigquery] deleting existing resync table",
slog.String("table", tableIdentifier))
deleteErr := table.Delete(ctx)
if deleteErr != nil {
return false, fmt.Errorf("failed to delete table %s: %w", tableIdentifier, deleteErr)
}
} else {
// table exists, go to next table
c.logger.Info("[bigquery] table already exists, skipping",
slog.String("table", tableIdentifier),
slog.Any("existingMetadata", existingMetadata))
return true, nil
}
}
if !strings.Contains(err.Error(), "notFound") {
return false, fmt.Errorf("error while checking metadata for BigQuery table existence %s: %w",
tableIdentifier, err)
}

// convert the column names and types to bigquery types
Expand Down Expand Up @@ -698,21 +711,6 @@ func (c *BigQueryConnector) SetupNormalizedTable(
TimePartitioning: timePartitioning,
}

if config.IsResync {
_, existsErr := table.Metadata(ctx)
if existsErr == nil {
c.logger.Info("[bigquery] deleting existing resync table",
slog.String("table", tableIdentifier))
deleteErr := table.Delete(ctx)
if deleteErr != nil {
return false, fmt.Errorf("failed to delete table %s: %w", tableIdentifier, deleteErr)
}
} else if !strings.Contains(existsErr.Error(), "notFound") {
return false, fmt.Errorf("error while checking metadata for BigQuery resynced table %s: %w",
tableIdentifier, existsErr)
}
}

c.logger.Info("[bigquery] creating table",
slog.String("table", tableIdentifier),
slog.Any("metadata", metadata))
Expand Down

0 comments on commit f777f97

Please sign in to comment.