Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[clickhouse] apply TableSchemaDeltas in normalize #2148

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

heavycrystal
Copy link
Contributor

TableSchemaDeltas are already stored in catalog for audit
instead of applying in SyncFlow like other connectors, apply them during normalize
as part of the philosophy of not connecting to CH during SyncFlow

@Amogh-Bharadwaj
Copy link
Contributor

Amogh-Bharadwaj commented Oct 17, 2024

Would be great if we could get in #2114 as part of landing this
I can fix the test in that PR up in the next couple of days

return nil, err
}

if err = c.replayTableSchemaDeltas(ctx, schemaDeltas); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if err = c.replayTableSchemaDeltas(ctx, schemaDeltas); err != nil {
if err := c.replayTableSchemaDeltas(ctx, schemaDeltas); err != nil {

@@ -122,8 +119,16 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
return res, nil
}

// For CH specifically, we don't want to replay during sync, but rather during the normalization phase.
// This a no-op for CH, private variant to be called during normalize.
func (c *ClickHouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJobName string,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should move this method to a separate connector interface, there's a few connectors that skip this already

@@ -172,6 +173,15 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
startTime := time.Now()
errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
pgMetadata := connmetadata.NewPostgresMetadataFromCatalog(logger, a.CatalogPool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Further down we have

syncBatchID, err := dstConn.GetLastSyncBatchID(errCtx, flowName)

Could lift that up to where we get lastOffset. Then use same value between two goroutines. Would make this logic work for connectors (pg:pg) which don't use catalog for metadata too

@heavycrystal heavycrystal force-pushed the ch-apply-schemadeltas-in-normalize branch from ed726eb to 2c645b1 Compare December 11, 2024 13:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants