Skip to content

Commit

Permalink
mysql
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 22, 2025
1 parent dbb5592 commit e67454b
Show file tree
Hide file tree
Showing 17 changed files with 1,501 additions and 21 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ jobs:
POSTGRES_PASSWORD: postgres
POSTGRES_DB: postgres
POSTGRES_INITDB_ARGS: --locale=C.UTF-8
mysql:
image: mysql:oracle
ports:
- 3306:3306
env:
MYSQL_ROOT_PASSWORD: cipass
#mariadb:
# image: mariadb:lts-ubi
# ports:
# - 3300:3306
# env:
# MARIADB_ROOT_PASSWORD: cipass
redpanda:
image: redpandadata/redpanda@sha256:7214ddaf8426d25936459cf77c1f905566a4483a97d2b13006120dcd98a5c846
ports:
Expand Down
17 changes: 11 additions & 6 deletions flow/activities/flowable_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peerdb/flow/connectors"
"github.com/PeerDB-io/peerdb/flow/connectors/mysql"
connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres"
"github.com/PeerDB-io/peerdb/flow/connectors/utils/monitoring"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
Expand Down Expand Up @@ -134,13 +135,17 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
}

lastOffset, err := func() (model.CdcCheckpoint, error) {
dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
if err != nil {
return model.CdcCheckpoint{}, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)
if myConn, isMy := any(srcConn).(*connmysql.MySqlConnector); isMy {
return myConn.GetLastOffset(ctx, config.FlowJobName)
} else {
dstConn, err := connectors.GetByNameAs[TSync](ctx, config.Env, a.CatalogPool, config.DestinationName)
if err != nil {
return model.CdcCheckpoint{}, fmt.Errorf("failed to get destination connector: %w", err)
}
defer connectors.CloseConnector(ctx, dstConn)

return dstConn.GetLastOffset(ctx, config.FlowJobName)
return dstConn.GetLastOffset(ctx, config.FlowJobName)
}
}()
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
Expand Down
5 changes: 5 additions & 0 deletions flow/cmd/validate_mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ func (h *FlowRequestHandler) ValidateCDCMirror(

sourcePeerConfig := sourcePeer.GetPostgresConfig()
if sourcePeerConfig == nil {
if sourcePeer.GetMysqlConfig() != nil {
// TODO mysql validation
// eg disable json diff, only row based replication supported, ...
return &protos.ValidateCDCMirrorResponse{}, nil
}
slog.Error("/validatecdc source peer config is not postgres", slog.String("peer", req.ConnectionConfigs.SourceName))
return nil, errors.New("source peer config is not postgres")
}
Expand Down
8 changes: 5 additions & 3 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func GetConnector(ctx context.Context, env map[string]string, config *protos.Pee
case *protos.Peer_SqlserverConfig:
return connsqlserver.NewSQLServerConnector(ctx, inner.SqlserverConfig)
case *protos.Peer_MysqlConfig:
return connmysql.MySqlConnector{}, nil
return connmysql.NewMySqlConnector(ctx, inner.MysqlConfig)
case *protos.Peer_ClickhouseConfig:
return connclickhouse.NewClickHouseConnector(ctx, env, inner.ClickhouseConfig)
case *protos.Peer_KafkaConfig:
Expand Down Expand Up @@ -454,6 +454,7 @@ func CloseConnector(ctx context.Context, conn Connector) {
// create type assertions to cause compile time error if connector interface not implemented
var (
_ CDCPullConnector = &connpostgres.PostgresConnector{}
_ CDCPullConnector = &connmysql.MySqlConnector{}

_ CDCPullPgConnector = &connpostgres.PostgresConnector{}

Expand All @@ -475,6 +476,7 @@ var (
_ CDCNormalizeConnector = &connclickhouse.ClickHouseConnector{}

_ GetTableSchemaConnector = &connpostgres.PostgresConnector{}
_ GetTableSchemaConnector = &connmysql.MySqlConnector{}
_ GetTableSchemaConnector = &connsnowflake.SnowflakeConnector{}
_ GetTableSchemaConnector = &connclickhouse.ClickHouseConnector{}

Expand All @@ -487,6 +489,7 @@ var (
_ CreateTablesFromExistingConnector = &connsnowflake.SnowflakeConnector{}

_ QRepPullConnector = &connpostgres.PostgresConnector{}
_ QRepPullConnector = &connmysql.MySqlConnector{}
_ QRepPullConnector = &connsqlserver.SQLServerConnector{}

_ QRepPullPgConnector = &connpostgres.PostgresConnector{}
Expand Down Expand Up @@ -521,6 +524,5 @@ var (

_ GetVersionConnector = &connclickhouse.ClickHouseConnector{}
_ GetVersionConnector = &connpostgres.PostgresConnector{}

_ Connector = &connmysql.MySqlConnector{}
_ GetVersionConnector = &connmysql.MySqlConnector{}
)
Loading

0 comments on commit e67454b

Please sign in to comment.