
MySQL CDC Plugin #3014

Open · wants to merge 40 commits into main
Conversation

@le-vlad (Contributor) commented Nov 18, 2024

Adds support for MySQL CDC using the go-mysql canal library.

Features supported:

  • Binlog streaming
  • Initial snapshot streaming
  • Composite primary key support for snapshots
  • Storing the binlog position in a cache

Versions tested:

"8.0", "9.0", "9.1"

@le-vlad le-vlad marked this pull request as ready for review November 21, 2024 12:52
@rockwotj rockwotj self-requested a review November 21, 2024 13:43
@rockwotj (Collaborator) left a comment:

Initial pass - thanks!

Description("The key to store the last processed binlog position."),
service.NewStringField(fieldFlavor).
Description("The flavor of MySQL to connect to.").
Example("mysql"),
Collaborator:

What is the alternative? Should this be the default?

Contributor Author:

The alternative can be mariadb, since it's compatible with MySQL.
But if we release this connector for MySQL deployments only, I can remove this field.

Description("The flavor of MySQL to connect to.").
Example("mysql"),
service.NewBoolField(fieldMaxSnapshotParallelTables).
Description("Int specifies a number of tables to be streamed in parallel when taking a snapshot. If set to true, the connector will stream all tables in parallel. Otherwise, it will stream tables one by one.").
Collaborator:

This is declared as a bool field.

Comment on lines 53 to 54
service.NewStringField(fieldCheckpointKey).
	Description("The key to store the last processed binlog position."),
Collaborator:

Don't we also need a cache field? Then this would be the key within that cache.

Contributor Author:

I was following the CockroachDB code example. We had only one field for the checkpointer/cache.

Description("If set to true, the connector will query all the existing data as a part of snapshot procerss. Otherwise, it will start from the current binlog position."),
service.NewAutoRetryNacksToggleField(),
service.NewIntField(fieldCheckpointLimit).
Description("The maximum number of messages that can be processed at a given time. Increasing this limit enables parallel processing and batching at the output level. Any given LSN will not be acknowledged unless all messages under that offset are delivered in order to preserve at least once delivery guarantees.").
Collaborator:

LSN is a Postgres concept, not MySQL.

return nil, err
}

if streamInput.binLogCache, err = conf.FieldString(fieldCheckpointKey); err != nil {
Collaborator:

Based on the docs this isn't quite right.

Contributor Author:

What exactly? Again, I'm following the CockroachDB example.

Collaborator:

CheckpointKey should be the key used in the cache, not the cache itself. We should name this field checkpoint_cache or bin_log_position_cache. We can't hardcode the key to mysql_binlog_position like we do right now because it prevents someone from using the same cache for multiple streams, we need a config and override for the key and a better name for the actual cache.

}

// 2. Acquire global read lock (minimizing lock time)
if _, err := s.lockConn.ExecContext(ctx, "FLUSH TABLES WITH READ LOCK"); err != nil {
Collaborator:

Is this WITH READ LOCK held only for the duration of this statement, or for the whole connection?

Contributor Author:

Only for this statement. We execute UNLOCK TABLES a few lines down.

return nil, fmt.Errorf("failed to start consistent snapshot: %v", err)
}

// 2. Acquire global read lock (minimizing lock time)
Collaborator:

Can we add a comment explaining why we need to FLUSH TABLES?

return nil, fmt.Errorf("failed to start transaction: %v", err)
}

// Execute START TRANSACTION WITH CONSISTENT SNAPSHOT
Collaborator:

This is not a helpful comment. It would be better to explain why we need this transaction option.


func (s *Snapshot) getRowsCount(table string) (int, error) {
	var count int
	if err := s.tx.QueryRowContext(s.ctx, "SELECT COUNT(*) FROM "+table).Scan(&count); err != nil {
Collaborator:

Is this actually fast? Should we be querying the table stats instead? https://stackoverflow.com/a/61548683

Contributor Author:

From what I found, the table stats query is not accurate, meaning we may get a lower number of records and therefore miss some of the snapshot data.
Even the link you sent says: "Estimate but very performant."
So I'd stick with COUNT(*), but let me know your thoughts.

Collaborator:

Why do we need a count anyway? In Postgres we just query until we get no rows back, meaning we've reached the end of the table (because the PK is sorted). Can we do the same here? I think this will be slow and expensive for large tables.

Comment on lines 118 to 120
SELECT COLUMN_NAME
FROM information_schema.KEY_COLUMN_USAGE
WHERE TABLE_NAME = '%s' AND CONSTRAINT_NAME = 'PRIMARY';
Collaborator:

Will this yield them in the right order?

Contributor Author:

In my tests - yes.
But I'll add ORDER BY ORDINAL_POSITION just to be sure.

@le-vlad le-vlad requested a review from rockwotj November 28, 2024 16:12
@rockwotj (Collaborator) left a comment:

Round two! Looking better, thanks for your work here 😄

Collaborator:

Can you create an integration test using all data types, to prevent the issue we had with Postgres where some types were not handled correctly? I asked ChatGPT to generate a table with all types and got this:

CREATE TABLE all_data_types (
    -- Numeric Data Types
    tinyint_col TINYINT,
    smallint_col SMALLINT,
    mediumint_col MEDIUMINT,
    int_col INT,
    bigint_col BIGINT,
    decimal_col DECIMAL(10, 2),
    numeric_col NUMERIC(10, 2),
    float_col FLOAT,
    double_col DOUBLE,

    -- Date and Time Data Types
    date_col DATE,
    datetime_col DATETIME,
    timestamp_col TIMESTAMP,
    time_col TIME,
    year_col YEAR,

    -- String Data Types
    char_col CHAR(10),
    varchar_col VARCHAR(255),
    binary_col BINARY(10),
    varbinary_col VARBINARY(255),
    tinyblob_col TINYBLOB,
    blob_col BLOB,
    mediumblob_col MEDIUMBLOB,
    longblob_col LONGBLOB,
    tinytext_col TINYTEXT,
    text_col TEXT,
    mediumtext_col MEDIUMTEXT,
    longtext_col LONGTEXT,
    enum_col ENUM('option1', 'option2', 'option3'),
    set_col SET('a', 'b', 'c', 'd'),

    -- Spatial Data Types
    geometry_col GEOMETRY,
    point_col POINT,
    linestring_col LINESTRING,
    polygon_col POLYGON,
    multipoint_col MULTIPOINT,
    multilinestring_col MULTILINESTRING,
    multipolygon_col MULTIPOLYGON,
    geometrycollection_col GEOMETRYCOLLECTION
);

}
}

func (i *mysqlStreamInput) OnPosSynced(eh *replication.EventHeader, pos mysqlReplications.Position, gtid mysqlReplications.GTIDSet, synced bool) error {
Collaborator:

The synced bool here is force bool in the interface - can we rename it to match? I guess it doesn't matter for us because we always sync...

Collaborator:

Can you explain when this is called? Because right now we seem to blindly overwrite currentLogPosition in multiple places and I don't know if that's always OK to do.

Contributor Author:

So, I was just following the instructions from the comment:
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.

Basically, it should be executed when the canal lib is up to some position in the binlog and requires the consumer to sync that position.

I added a force check so the position is synced when requested.

Contributor Author:

Actually, I think we don't need to use this method. It is called every time we receive a message, to indicate the current position.

The message itself doesn't carry the binlog file name, only the position. But since we handle binlog rotation events, we can keep the file name updated correctly.

I tested it, and it looks like we are just fine without this method.

go.mod Outdated
@@ -63,6 +63,7 @@ require (
github.com/getsentry/sentry-go v0.28.1
github.com/go-faker/faker/v4 v4.4.2
github.com/go-jose/go-jose/v3 v3.0.3
github.com/go-mysql-org/go-mysql v1.9.1
Collaborator:

The latest is 1.10 - can we upgrade?

// canal stands for mysql binlog listener connection
canal *canal.Canal
mysqlConfig *mysql.Config
canal.DummyEventHandler
Collaborator:

Nit: embedded structs are generally listed first - can we do that here?

logger *service.Logger
res *service.Resources

streamCtx context.Context
Collaborator:

You should be using the shutSig contexts instead of this.


// ---- Redpanda Connect specific methods end----

// --- MySQL Canal handler methods ----
Collaborator:

The other handler methods - we don't need those? Can you explain why?

Contributor Author:

Yes. I think we don't need to implement some of these methods, since we don't use GTIDs to store the position; we rely on the binlog position instead (see the sketch after this list).

  • OnGTID is used to track transaction IDs.
  • OnRowsQueryEvent is used to track executed queries. We don't need this for CDC.
  • OnTableChanged - we don't propagate table changes to CDC consumers.
  • OnDDL - we don't propagate DDL changes.

return nil
}

if msgType, ok := lastMsg.MetaGet("type"); ok && msgType == "snapshot" {
Collaborator:

This is still incorrect.



}
}

func (s *Snapshot) prepareSnapshot(ctx context.Context) (*mysql.Position, error) {
Collaborator:

@toddfarmer do you have any concerns about how we take a snapshot of the DB?


func init() {
	err := service.RegisterBatchInput(
		"mysql_stream", mysqlStreamConfigSpec,
Collaborator:

Based on some internal conversations, we should rename this to make it more explicit that it's a CDC connector.

Suggested change:
-		"mysql_stream", mysqlStreamConfigSpec,
+		"mysql_cdc", mysqlStreamConfigSpec,

@rockwotj rockwotj self-requested a review December 20, 2024 21:04