Skip to content

Commit

Permalink
mysql
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 21, 2025
1 parent f9685e2 commit d3158e4
Show file tree
Hide file tree
Showing 34 changed files with 1,822 additions and 353 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ jobs:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
POSTGRES_INITDB_ARGS: --locale=C.UTF-8
mysql:
image: mysql:oracle
ports:
- 3306:3306
env:
MYSQL_ROOT_PASSWORD: cipass
#mariadb:
# image: mariadb:lts-ubi
# ports:
# - 3300:3306
# env:
# MARIADB_ROOT_PASSWORD: cipass
redpanda:
image: redpandadata/redpanda@sha256:7214ddaf8426d25936459cf77c1f905566a4483a97d2b13006120dcd98a5c846
ports:
Expand Down
5 changes: 4 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ func (a *FlowableActivity) CheckMetadataTables(
}
defer connectors.CloseConnector(ctx, conn)

needsSetup := conn.NeedsSetupMetadataTables(ctx)
needsSetup, err := conn.NeedsSetupMetadataTables(ctx)
if err != nil {
return nil, err
}

return &CheckMetadataTablesResult{
NeedsSetupMetadataTables: needsSetup,
Expand Down
38 changes: 23 additions & 15 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peerdb/flow/connectors"
"github.com/PeerDB-io/peerdb/flow/connectors/mysql"
connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres"
"github.com/PeerDB-io/peerdb/flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
Expand Down Expand Up @@ -133,23 +134,27 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
batchSize = 250_000
}

lastOffset, err := func() (int64, error) {
dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
if err != nil {
return 0, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)
lastOffset, err := func() (model.CdcCheckpoint, error) {
if myConn, isMy := any(srcConn).(*connmysql.MySqlConnector); isMy {
return myConn.GetLastOffset(ctx, config.FlowJobName)
} else {
dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
if err != nil {
return model.CdcCheckpoint{}, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

return dstConn.GetLastOffset(ctx, config.FlowJobName)
return dstConn.GetLastOffset(ctx, config.FlowJobName)
}
}()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}

logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))
logger.Info("pulling records...", slog.Any("LastOffset", lastOffset))
consumedOffset := atomic.Int64{}
consumedOffset.Store(lastOffset)
consumedOffset.Store(lastOffset.ID)

channelBufferSize, err := peerdbenv.PeerDBCDCChannelBufferSize(ctx, config.Env)
if err != nil {
Expand Down Expand Up @@ -224,7 +229,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return nil, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas)
}

var syncStartTime time.Time
var res *model.SyncResponse
var dstConnType string
errGroup.Go(func() error {
Expand Down Expand Up @@ -253,7 +257,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return err
}

syncStartTime = time.Now()
res, err = sync(dstConn, errCtx, &model.SyncRecordsRequest[Items]{
SyncBatchID: syncBatchID,
Records: recordBatchSync,
Expand All @@ -273,6 +276,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return nil
})

syncStartTime := time.Now()
if err := errGroup.Wait(); err != nil {
// don't log flow error for "replState changed" and "slot is already active"
if !(temporal.IsApplicationError(err) ||
Expand All @@ -282,14 +286,18 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
if temporal.IsApplicationError(err) {
return nil, err
} else {
return nil, fmt.Errorf("failed to pull records: %w", err)
return nil, fmt.Errorf("[cdc] failed to pull records: %w", err)
}
}
syncState.Store(shared.Ptr("bookkeeping"))

syncDuration := time.Since(syncStartTime)
lastCheckpoint := recordBatchSync.GetLastCheckpoint()
srcConn.UpdateReplStateLastOffset(lastCheckpoint)
logger.Info("batch synced", slog.Any("checkpoint", lastCheckpoint))
if err := srcConn.UpdateReplStateLastOffset(ctx, lastCheckpoint); err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}

if err := monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(res.NumRecordsSynced), lastCheckpoint,
Expand All @@ -298,7 +306,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return nil, err
}

if err := monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint); err != nil {
if err := monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint.ID); err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
Expand Down Expand Up @@ -451,7 +459,7 @@ func replicateQRepPartition[TRead any, TWrite StreamCloser, TSync connectors.QRe
tmp, err := pullRecords(srcConn, errCtx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to pull records: %w", err)
return fmt.Errorf("[qrep] failed to pull records: %w", err)
}
numRecords := int64(tmp)
if err := monitoring.UpdatePullEndTimeAndRowsForPartition(
Expand Down
5 changes: 5 additions & 0 deletions flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ func (h *FlowRequestHandler) ValidateCDCMirror(

sourcePeerConfig := sourcePeer.GetPostgresConfig()
if sourcePeerConfig == nil {
if sourcePeer.GetMysqlConfig() != nil {
// TODO: MySQL validation,
// e.g. disable JSON diff, only row-based replication supported, ...
return &protos.ValidateCDCMirrorResponse{}, nil
}
slog.Error("/validatecdc source peer config is not postgres", slog.String("peer", req.ConnectionConfigs.SourceName))
return nil, errors.New("source peer config is not postgres")
}
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ func (s *QRepAvroSyncMethod) SyncRecords(
}

return &model.SyncResponse{
LastSyncedCheckpointID: lastCP,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
LastSyncedCheckpoint: lastCP,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
}, nil
}

Expand Down
12 changes: 6 additions & 6 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
}

return &model.SyncResponse{
LastSyncedCheckpointID: req.Records.GetLastCheckpoint(),
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
LastSyncedCheckpoint: req.Records.GetLastCheckpoint(),
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
}, nil
}

Expand All @@ -111,7 +111,7 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
return nil, err
}

if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpointID); err != nil {
if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpoint); err != nil {
c.logger.Error("failed to increment id", slog.Any("error", err))
return nil, err
}
Expand Down
16 changes: 9 additions & 7 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type CDCPullConnectorCore interface {
ReplPing(context.Context) error

// Called when offset has been confirmed to destination
UpdateReplStateLastOffset(lastOffset int64)
UpdateReplStateLastOffset(ctx context.Context, lastOffset model.CdcCheckpoint) error

// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(ctx context.Context, jobName string) error
Expand Down Expand Up @@ -153,16 +153,16 @@ type CDCSyncConnectorCore interface {
Connector

// NeedsSetupMetadataTables checks if the metadata table [PEERDB_MIRROR_JOBS] needs to be created.
NeedsSetupMetadataTables(ctx context.Context) bool
NeedsSetupMetadataTables(ctx context.Context) (bool, error)

// SetupMetadataTables creates the metadata table [PEERDB_MIRROR_JOBS] if necessary.
SetupMetadataTables(ctx context.Context) error

// GetLastOffset gets the last offset from the metadata table on the destination
GetLastOffset(ctx context.Context, jobName string) (int64, error)
GetLastOffset(ctx context.Context, jobName string) (model.CdcCheckpoint, error)

// SetLastOffset updates the last offset on the metadata table on the destination
SetLastOffset(ctx context.Context, jobName string, lastOffset int64) error
SetLastOffset(ctx context.Context, jobName string, lastOffset model.CdcCheckpoint) error

// GetLastSyncBatchID gets the last batch synced to the destination from the metadata table
GetLastSyncBatchID(ctx context.Context, jobName string) (int64, error)
Expand Down Expand Up @@ -407,7 +407,7 @@ func GetConnector(ctx context.Context, env map[string]string, config *protos.Pee
case *protos.Peer_SqlserverConfig:
return connsqlserver.NewSQLServerConnector(ctx, inner.SqlserverConfig)
case *protos.Peer_MysqlConfig:
return connmysql.MySqlConnector{}, nil
return connmysql.NewMySqlConnector(ctx, inner.MysqlConfig)
case *protos.Peer_ClickhouseConfig:
return connclickhouse.NewClickHouseConnector(ctx, env, inner.ClickhouseConfig)
case *protos.Peer_KafkaConfig:
Expand Down Expand Up @@ -454,6 +454,7 @@ func CloseConnector(ctx context.Context, conn Connector) {
// compile-time interface checks: each assignment fails to compile if the connector does not implement the interface
var (
_ CDCPullConnector = &connpostgres.PostgresConnector{}
_ CDCPullConnector = &connmysql.MySqlConnector{}

_ CDCPullPgConnector = &connpostgres.PostgresConnector{}

Expand All @@ -475,6 +476,7 @@ var (
_ CDCNormalizeConnector = &connclickhouse.ClickHouseConnector{}

_ GetTableSchemaConnector = &connpostgres.PostgresConnector{}
_ GetTableSchemaConnector = &connmysql.MySqlConnector{}
_ GetTableSchemaConnector = &connsnowflake.SnowflakeConnector{}
_ GetTableSchemaConnector = &connclickhouse.ClickHouseConnector{}

Expand All @@ -487,6 +489,7 @@ var (
_ CreateTablesFromExistingConnector = &connsnowflake.SnowflakeConnector{}

_ QRepPullConnector = &connpostgres.PostgresConnector{}
_ QRepPullConnector = &connmysql.MySqlConnector{}
_ QRepPullConnector = &connsqlserver.SQLServerConnector{}

_ QRepPullPgConnector = &connpostgres.PostgresConnector{}
Expand Down Expand Up @@ -521,6 +524,5 @@ var (

_ GetVersionConnector = &connclickhouse.ClickHouseConnector{}
_ GetVersionConnector = &connpostgres.PostgresConnector{}

_ Connector = &connmysql.MySqlConnector{}
_ GetVersionConnector = &connmysql.MySqlConnector{}
)
12 changes: 6 additions & 6 deletions flow/connectors/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
case <-ticker.C:
lastSeen := lastSeenLSN.Load()
if lastSeen > req.ConsumedOffset.Load() {
if err := esc.SetLastOffset(ctx, req.FlowJobName, lastSeen); err != nil {
if err := esc.SetLastOffset(ctx, req.FlowJobName, model.CdcCheckpoint{ID: lastSeen}); err != nil {
esc.logger.Warn("[es] SetLastOffset error", slog.Any("error", err))
} else {
shared.AtomicInt64Max(req.ConsumedOffset, lastSeen)
Expand Down Expand Up @@ -297,10 +297,10 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
}

return &model.SyncResponse{
CurrentSyncBatchID: req.SyncBatchID,
LastSyncedCheckpointID: lastCheckpoint,
NumRecordsSynced: numRecords,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
CurrentSyncBatchID: req.SyncBatchID,
LastSyncedCheckpoint: lastCheckpoint,
NumRecordsSynced: numRecords,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
}, nil
}
20 changes: 6 additions & 14 deletions flow/connectors/eventhub/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,6 @@ func (c *EventHubConnector) ConnectionActive(ctx context.Context) error {
return nil
}

// NeedsSetupMetadataTables always reports false; the context argument is ignored.
func (c *EventHubConnector) NeedsSetupMetadataTables(_ context.Context) bool {
return false
}

// SetupMetadataTables is a no-op: it ignores the context argument and always returns nil.
func (c *EventHubConnector) SetupMetadataTables(_ context.Context) error {
return nil
}

func lvalueToEventData(ls *lua.LState, value lua.LValue) (ScopedEventhubData, error) {
var scoped ScopedEventhubData
switch v := value.(type) {
Expand Down Expand Up @@ -319,7 +311,7 @@ func (c *EventHubConnector) processBatch(
if err != nil {
return 0, err
} else if lastSeenLSN > req.ConsumedOffset.Load() {
if err := c.SetLastOffset(ctx, req.FlowJobName, lastSeenLSN); err != nil {
if err := c.SetLastOffset(ctx, req.FlowJobName, model.CdcCheckpoint{ID: lastSeenLSN}); err != nil {
c.logger.Warn("[eventhubs] SetLastOffset error", slog.Any("error", err))
} else {
shared.AtomicInt64Max(req.ConsumedOffset, lastSeenLSN)
Expand All @@ -345,11 +337,11 @@ func (c *EventHubConnector) SyncRecords(ctx context.Context, req *model.SyncReco
}

return &model.SyncResponse{
CurrentSyncBatchID: req.SyncBatchID,
LastSyncedCheckpointID: lastCheckpoint,
NumRecordsSynced: int64(numRecords),
TableNameRowsMapping: make(map[string]*model.RecordTypeCounts),
TableSchemaDeltas: req.Records.SchemaDeltas,
CurrentSyncBatchID: req.SyncBatchID,
LastSyncedCheckpoint: lastCheckpoint,
NumRecordsSynced: int64(numRecords),
TableNameRowsMapping: make(map[string]*model.RecordTypeCounts),
TableSchemaDeltas: req.Records.SchemaDeltas,
}, nil
}

Expand Down
Loading

0 comments on commit d3158e4

Please sign in to comment.