Skip to content

Commit

Permalink
add cause for context
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 24, 2024
1 parent c0d1d8a commit 6b17c70
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
8 changes: 4 additions & 4 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (a *FlowableActivity) StartNormalize(
if errors.Is(err, errors.ErrUnsupported) {
return nil, monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName, input.SyncBatchID)
} else if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get normalize connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

Expand All @@ -420,7 +420,7 @@ func (a *FlowableActivity) StartNormalize(

tableNameSchemaMapping, err := a.getTableNameSchemaMapping(ctx, input.FlowConnectionConfigs.FlowJobName)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get table name schema mapping: %w", err)
}

res, err := dstConn.NormalizeRecords(ctx, &model.NormalizeRecordsRequest{
Expand All @@ -438,13 +438,13 @@ func (a *FlowableActivity) StartNormalize(
}
dstType, err := connectors.LoadPeerType(ctx, a.CatalogPool, input.FlowConnectionConfigs.DestinationName)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get peer type: %w", err)
}
if dstType == protos.DBType_POSTGRES {
err = monitoring.UpdateEndTimeForCDCBatch(ctx, a.CatalogPool, input.FlowConnectionConfigs.FlowJobName,
input.SyncBatchID)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to update end time for cdc batch: %w", err)
}
}

Expand Down
3 changes: 3 additions & 0 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,9 @@ func (c *ClickHouseConnector) NormalizeRecords(
case queries <- insertIntoSelectQuery.String():
case <-errCtx.Done():
close(queries)
c.logger.Error("[clickhouse] context canceled while normalizing",
slog.Any("error", errCtx.Err()),
slog.Any("cause", context.Cause(errCtx)))
return nil, errCtx.Err()
}
}
Expand Down

0 comments on commit 6b17c70

Please sign in to comment.