Skip to content

Commit

Permalink
mysql
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 21, 2025
1 parent 67ba3d6 commit f40244b
Show file tree
Hide file tree
Showing 63 changed files with 2,436 additions and 891 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ jobs:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
POSTGRES_INITDB_ARGS: --locale=C.UTF-8
mysql:
image: mysql:oracle
ports:
- 3306:3306
env:
MYSQL_ROOT_PASSWORD: cipass
#mariadb:
# image: mariadb:lts-ubi
# ports:
# - 3300:3306
# env:
# MARIADB_ROOT_PASSWORD: cipass
redpanda:
image: redpandadata/redpanda@sha256:7214ddaf8426d25936459cf77c1f905566a4483a97d2b13006120dcd98a5c846
ports:
Expand Down
36 changes: 29 additions & 7 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log/slog"
"sync/atomic"
"time"

"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -80,7 +81,10 @@ func (a *FlowableActivity) CheckMetadataTables(
}
defer connectors.CloseConnector(ctx, conn)

needsSetup := conn.NeedsSetupMetadataTables(ctx)
needsSetup, err := conn.NeedsSetupMetadataTables(ctx)
if err != nil {
return nil, err
}

return &CheckMetadataTablesResult{
NeedsSetupMetadataTables: needsSetup,
Expand Down Expand Up @@ -831,12 +835,30 @@ func (a *FlowableActivity) QRepHasNewRows(ctx context.Context,

logger.Info(fmt.Sprintf("current last partition value is %v", last))

result, err := srcConn.CheckForUpdatedMaxValue(ctx, config, last)
maxValue, err := srcConn.GetMaxValue(ctx, config, last)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return false, fmt.Errorf("failed to check for new rows: %w", err)
}
return result, nil

if maxValue == nil || last == nil || last.Range == nil {
return maxValue != nil, nil
}

switch x := last.Range.Range.(type) {
case *protos.PartitionRange_IntRange:
if maxValue.(int64) > x.IntRange.End {
return true, nil
}
case *protos.PartitionRange_TimestampRange:
if maxValue.(time.Time).After(x.TimestampRange.End.AsTime()) {
return true, nil
}
default:
return false, fmt.Errorf("unknown range type: %v", x)
}

return false, nil
}

func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
Expand Down Expand Up @@ -1025,16 +1047,16 @@ func (a *FlowableActivity) RemoveTablesFromRawTable(
for _, table := range tablesToRemove {
tableNames = append(tableNames, table.DestinationTableIdentifier)
}
err = dstConn.RemoveTableEntriesFromRawTable(ctx, &protos.RemoveTablesFromRawTableInput{
if err := dstConn.RemoveTableEntriesFromRawTable(ctx, &protos.RemoveTablesFromRawTableInput{
FlowJobName: cfg.FlowJobName,
DestinationTableNames: tableNames,
SyncBatchId: syncBatchID,
NormalizeBatchId: normBatchID,
})
if err != nil {
}); err != nil {
a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err)
return err
}
return err
return nil
}

func (a *FlowableActivity) RemoveTablesFromCatalog(
Expand Down
38 changes: 23 additions & 15 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peerdb/flow/connectors"
"github.com/PeerDB-io/peerdb/flow/connectors/mysql"
connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres"
"github.com/PeerDB-io/peerdb/flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
Expand Down Expand Up @@ -133,23 +134,27 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
batchSize = 250_000
}

lastOffset, err := func() (int64, error) {
dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
if err != nil {
return 0, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)
lastOffset, err := func() (model.CdcCheckpoint, error) {
if myConn, isMy := any(srcConn).(*connmysql.MySqlConnector); isMy {
return myConn.GetLastOffset(ctx, config.FlowJobName)
} else {
dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
if err != nil {
return model.CdcCheckpoint{}, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

return dstConn.GetLastOffset(ctx, config.FlowJobName)
return dstConn.GetLastOffset(ctx, config.FlowJobName)
}
}()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}

logger.Info("pulling records...", slog.Int64("LastOffset", lastOffset))
logger.Info("pulling records...", slog.Any("LastOffset", lastOffset))
consumedOffset := atomic.Int64{}
consumedOffset.Store(lastOffset)
consumedOffset.Store(lastOffset.ID)

channelBufferSize, err := peerdbenv.PeerDBCDCChannelBufferSize(ctx, config.Env)
if err != nil {
Expand Down Expand Up @@ -224,7 +229,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return nil, a.applySchemaDeltas(ctx, config, options, recordBatchSync.SchemaDeltas)
}

var syncStartTime time.Time
var res *model.SyncResponse
errGroup.Go(func() error {
dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
Expand All @@ -251,7 +255,6 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return err
}

syncStartTime = time.Now()
res, err = sync(dstConn, errCtx, &model.SyncRecordsRequest[Items]{
SyncBatchID: syncBatchID,
Records: recordBatchSync,
Expand All @@ -271,6 +274,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return nil
})

syncStartTime := time.Now()
if err := errGroup.Wait(); err != nil {
// don't log flow error for "replState changed" and "slot is already active"
if !(temporal.IsApplicationError(err) ||
Expand All @@ -280,14 +284,18 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
if temporal.IsApplicationError(err) {
return nil, err
} else {
return nil, fmt.Errorf("failed to pull records: %w", err)
return nil, fmt.Errorf("[cdc] failed to pull records: %w", err)
}
}
syncState.Store(shared.Ptr("bookkeeping"))

syncDuration := time.Since(syncStartTime)
lastCheckpoint := recordBatchSync.GetLastCheckpoint()
srcConn.UpdateReplStateLastOffset(lastCheckpoint)
logger.Info("batch synced", slog.Any("checkpoint", lastCheckpoint))
if err := srcConn.UpdateReplStateLastOffset(ctx, lastCheckpoint); err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}

if err := monitoring.UpdateNumRowsAndEndLSNForCDCBatch(
ctx, a.CatalogPool, flowName, res.CurrentSyncBatchID, uint32(res.NumRecordsSynced), lastCheckpoint,
Expand All @@ -296,7 +304,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
return nil, err
}

if err := monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint); err != nil {
if err := monitoring.UpdateLatestLSNAtTargetForCDCFlow(ctx, a.CatalogPool, flowName, lastCheckpoint.ID); err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
return nil, err
}
Expand Down Expand Up @@ -453,7 +461,7 @@ func replicateQRepPartition[TRead any, TWrite StreamCloser, TSync connectors.QRe
tmp, err := pullRecords(srcConn, errCtx, config, partition, stream)
if err != nil {
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to pull records: %w", err)
return fmt.Errorf("[qrep] failed to pull records: %w", err)
}
numRecords := int64(tmp)
if err := monitoring.UpdatePullEndTimeAndRowsForPartition(
Expand Down
10 changes: 7 additions & 3 deletions flow/activities/snapshot_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,13 @@ func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, pee
}

a.SnapshotStatesMutex.Lock()
a.TxSnapshotStates[sessionID] = TxSnapshotState{
SnapshotName: exportSnapshotOutput.SnapshotName,
SupportsTIDScans: exportSnapshotOutput.SupportsTidScans,
if exportSnapshotOutput != nil {
a.TxSnapshotStates[sessionID] = TxSnapshotState{
SnapshotName: exportSnapshotOutput.SnapshotName,
SupportsTIDScans: exportSnapshotOutput.SupportsTidScans,
}
} else {
a.TxSnapshotStates[sessionID] = TxSnapshotState{}
}
a.SnapshotStatesMutex.Unlock()

Expand Down
5 changes: 5 additions & 0 deletions flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ func (h *FlowRequestHandler) ValidateCDCMirror(

sourcePeerConfig := sourcePeer.GetPostgresConfig()
if sourcePeerConfig == nil {
if sourcePeer.GetMysqlConfig() != nil {
// TODO: add MySQL-specific validation here,
// e.g. check that JSON diffs are disabled and that row-based replication is in use (the only supported mode).
return &protos.ValidateCDCMirrorResponse{}, nil
}
slog.Error("/validatecdc source peer config is not postgres", slog.String("peer", req.ConnectionConfigs.SourceName))
return nil, errors.New("source peer config is not postgres")
}
Expand Down
10 changes: 5 additions & 5 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ func (s *QRepAvroSyncMethod) SyncRecords(
}

return &model.SyncResponse{
LastSyncedCheckpointID: lastCP,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
LastSyncedCheckpoint: lastCP,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
}, nil
}

Expand Down
12 changes: 6 additions & 6 deletions flow/connectors/clickhouse/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
}

return &model.SyncResponse{
LastSyncedCheckpointID: req.Records.GetLastCheckpoint(),
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
LastSyncedCheckpoint: req.Records.GetLastCheckpoint(),
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
}, nil
}

Expand All @@ -111,7 +111,7 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
return nil, err
}

if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpointID); err != nil {
if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpoint); err != nil {
c.logger.Error("failed to increment id", slog.Any("error", err))
return nil, err
}
Expand Down
16 changes: 9 additions & 7 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type CDCPullConnectorCore interface {
ReplPing(context.Context) error

// UpdateReplStateLastOffset is called once the offset has been confirmed at the destination.
UpdateReplStateLastOffset(lastOffset int64)
UpdateReplStateLastOffset(ctx context.Context, lastOffset model.CdcCheckpoint) error

// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(ctx context.Context, jobName string) error
Expand Down Expand Up @@ -153,16 +153,16 @@ type CDCSyncConnectorCore interface {
Connector

// NeedsSetupMetadataTables checks if the metadata table [PEERDB_MIRROR_JOBS] needs to be created.
NeedsSetupMetadataTables(ctx context.Context) bool
NeedsSetupMetadataTables(ctx context.Context) (bool, error)

// SetupMetadataTables creates the metadata table [PEERDB_MIRROR_JOBS] if necessary.
SetupMetadataTables(ctx context.Context) error

// GetLastOffset gets the last offset from the metadata table on the destination
GetLastOffset(ctx context.Context, jobName string) (int64, error)
GetLastOffset(ctx context.Context, jobName string) (model.CdcCheckpoint, error)

// SetLastOffset updates the last offset on the metadata table on the destination
SetLastOffset(ctx context.Context, jobName string, lastOffset int64) error
SetLastOffset(ctx context.Context, jobName string, lastOffset model.CdcCheckpoint) error

// GetLastSyncBatchID gets the last batch synced to the destination from the metadata table
GetLastSyncBatchID(ctx context.Context, jobName string) (int64, error)
Expand Down Expand Up @@ -407,7 +407,7 @@ func GetConnector(ctx context.Context, env map[string]string, config *protos.Pee
case *protos.Peer_SqlserverConfig:
return connsqlserver.NewSQLServerConnector(ctx, inner.SqlserverConfig)
case *protos.Peer_MysqlConfig:
return connmysql.MySqlConnector{}, nil
return connmysql.NewMySqlConnector(ctx, inner.MysqlConfig)
case *protos.Peer_ClickhouseConfig:
return connclickhouse.NewClickHouseConnector(ctx, env, inner.ClickhouseConfig)
case *protos.Peer_KafkaConfig:
Expand Down Expand Up @@ -454,6 +454,7 @@ func CloseConnector(ctx context.Context, conn Connector) {
// Compile-time assertions: the build fails if a connector does not implement an interface listed for it.
var (
_ CDCPullConnector = &connpostgres.PostgresConnector{}
_ CDCPullConnector = &connmysql.MySqlConnector{}

_ CDCPullPgConnector = &connpostgres.PostgresConnector{}

Expand All @@ -475,6 +476,7 @@ var (
_ CDCNormalizeConnector = &connclickhouse.ClickHouseConnector{}

_ GetTableSchemaConnector = &connpostgres.PostgresConnector{}
_ GetTableSchemaConnector = &connmysql.MySqlConnector{}
_ GetTableSchemaConnector = &connsnowflake.SnowflakeConnector{}
_ GetTableSchemaConnector = &connclickhouse.ClickHouseConnector{}

Expand All @@ -487,6 +489,7 @@ var (
_ CreateTablesFromExistingConnector = &connsnowflake.SnowflakeConnector{}

_ QRepPullConnector = &connpostgres.PostgresConnector{}
_ QRepPullConnector = &connmysql.MySqlConnector{}
_ QRepPullConnector = &connsqlserver.SQLServerConnector{}

_ QRepPullPgConnector = &connpostgres.PostgresConnector{}
Expand Down Expand Up @@ -521,6 +524,5 @@ var (

_ GetVersionConnector = &connclickhouse.ClickHouseConnector{}
_ GetVersionConnector = &connpostgres.PostgresConnector{}

_ Connector = &connmysql.MySqlConnector{}
_ GetVersionConnector = &connmysql.MySqlConnector{}
)
12 changes: 6 additions & 6 deletions flow/connectors/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
case <-ticker.C:
lastSeen := lastSeenLSN.Load()
if lastSeen > req.ConsumedOffset.Load() {
if err := esc.SetLastOffset(ctx, req.FlowJobName, lastSeen); err != nil {
if err := esc.SetLastOffset(ctx, req.FlowJobName, model.CdcCheckpoint{ID: lastSeen}); err != nil {
esc.logger.Warn("[es] SetLastOffset error", slog.Any("error", err))
} else {
shared.AtomicInt64Max(req.ConsumedOffset, lastSeen)
Expand Down Expand Up @@ -297,10 +297,10 @@ func (esc *ElasticsearchConnector) SyncRecords(ctx context.Context,
}

return &model.SyncResponse{
CurrentSyncBatchID: req.SyncBatchID,
LastSyncedCheckpointID: lastCheckpoint,
NumRecordsSynced: numRecords,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
CurrentSyncBatchID: req.SyncBatchID,
LastSyncedCheckpoint: lastCheckpoint,
NumRecordsSynced: numRecords,
TableNameRowsMapping: tableNameRowsMapping,
TableSchemaDeltas: req.Records.SchemaDeltas,
}, nil
}
Loading

0 comments on commit f40244b

Please sign in to comment.