Handling Updates on Partition Columns in Iceberg with Flink CDC #11573

Open
3 tasks
a8356555 opened this issue Nov 18, 2024 · 3 comments
Labels
bug Something isn't working

Comments

@a8356555

Apache Iceberg version

1.5.2

Query engine

Athena

Please describe the bug 🐞

Hi,

I'm using MySQL Flink CDC with Iceberg 1.5.2 and Flink 1.16. I have a table partitioned by the status column, but this column is subject to updates. When an update occurs, I encounter duplicate records in the Iceberg table, which is not the desired behavior.

Is there a way to properly handle updates on a partition column in Iceberg to avoid duplicates?

Here is the SQL for my Flink CDC job:

CREATE CATALOG glue_catalog WITH (
    'type'='iceberg', 
    'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
    'warehouse'='s3://my-bucket'
);

CREATE TABLE mysql_cdc_source
(
    id  INT,
    status INT,
    value  INT,
    ...,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql',
    'port' = '3306',
    'username' = 'XXX',
    'password' = 'XXX',
    'database-name' = 'XXX',
    'table-name' = 'XXX',
    'debezium.snapshot.mode' = 'when_needed',
    'server-id' = '1'
);

CREATE TABLE IF NOT EXISTS glue_catalog.my_db.my_table(
    `id` INT NOT NULL,
    `status` INT,
    `value`  INT,
    ...,
    PRIMARY KEY (id,status) NOT ENFORCED
) PARTITIONED BY (
    status
) WITH (
    'format-version'='2',
    'write.metadata.delete-after-commit.enabled'='true',
    'write.upsert.enabled'='true',
    'write.delete.mode'='merge-on-read',
    'write.merge.mode'='merge-on-read',
    'write.update.mode'='merge-on-read'
);

INSERT INTO glue_catalog.my_db.my_table
SELECT
    *
FROM mysql_cdc_source;

Data before updating MySQL:
[image]

Data after updating MySQL (a duplicated row showed up):
[screenshot, 2024-11-18 12:19:24]

I get the same results using Spark 3.4:
[image]

Querying glue_catalog.my_db.my_table.files using Spark 3.4:
[image]

Thanks!

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
a8356555 added the bug label Nov 18, 2024
@pvary
Contributor

pvary commented Nov 18, 2024

What are the records generated by the MySQL CDC connector?

You are using upsert mode in FlinkSink.

In upsert mode, when an update happens, Flink expects the primary key to be unchanged: it removes the old values for that primary key and inserts a new record.

When a record is updated in a way that changes the primary key, it is not really an update in upsert mode; it should be a delete and an insert instead. It is the responsibility of the input stream to generate the correct records.

You can set write.upsert.enabled to false if the MySQL connector is able to generate a retract stream.
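
To make the difference between the two modes concrete, here is a minimal Python sketch of a sink applying a changelog. It is a toy model, not the Iceberg or Flink implementation: the function name `apply_changelog`, the sample values, and the assumption that upsert mode ignores the update-before (`-U`) image are illustrative only. It also ignores partitioning entirely.

```python
from typing import Dict, List, Sequence, Tuple

Row = Dict[str, int]
Change = Tuple[str, Row]  # kind is one of "+I", "-U", "+U", "-D"


def apply_changelog(changes: Sequence[Change], key_cols: Sequence[str], upsert: bool) -> List[Row]:
    """Toy model of a sink applying a changelog (hypothetical, not the Iceberg code)."""
    table: List[Row] = []

    def key_of(row: Row) -> Tuple[int, ...]:
        return tuple(row[c] for c in key_cols)

    for kind, row in changes:
        if kind in ("+I", "+U"):
            if upsert:
                # Upsert: drop whatever currently holds this key, then insert.
                table = [r for r in table if key_of(r) != key_of(row)]
            table.append(dict(row))
        elif kind == "-U":
            # Assumption: upsert mode ignores the update-before image.
            if not upsert and row in table:
                # Retract: the update-before image removes the old row.
                table.remove(row)
        elif kind == "-D":
            if upsert:
                table = [r for r in table if key_of(r) != key_of(row)]
            elif row in table:
                table.remove(row)
    return table


# Hypothetical update that changes status from 1 to 2 for id = 1.
old = {"id": 1, "status": 1, "value": 10}
new = {"id": 1, "status": 2, "value": 10}
changes = [("+I", old), ("-U", old), ("+U", new)]

upsert_result = apply_changelog(changes, key_cols=("id", "status"), upsert=True)
retract_result = apply_changelog(changes, key_cols=("id", "status"), upsert=False)
print(upsert_result)   # both the old and the new row remain
print(retract_result)  # only the new row remains
```

In this model, the upsert run keeps the stale row because the delete-by-key for (1, 2) never matches the row stored under (1, 1), while the retract run removes it through the `-U` record.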

@a8356555
Author

> What are the records generated by the MySQL CDC connector?
>
> You are using upsert mode in FlinkSink.
>
> In upsert mode, when an update happens, Flink expects the primary key to be unchanged: it removes the old values for that primary key and inserts a new record.
>
> When a record is updated in a way that changes the primary key, it is not really an update in upsert mode; it should be a delete and an insert instead. It is the responsibility of the input stream to generate the correct records.
>
> You can set write.upsert.enabled to false if the MySQL connector is able to generate a retract stream.

But my use case requires upsert, so in this scenario, using status as the partition key is not suitable, right?

@pvary
Contributor

pvary commented Nov 21, 2024

> But my use case requires upsert, so in this scenario, using status as the partition key is not suitable, right?

The problem is not with the partitioning. The problem is that you added status to your PRIMARY KEY.

  • Original table:
CREATE TABLE mysql_cdc_source
(
[..]
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
  • New table:
CREATE TABLE IF NOT EXISTS glue_catalog.my_db.my_table(
[..]
PRIMARY KEY (id,status) NOT ENFORCED
) PARTITIONED BY (
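
The effect of the key choice can be sketched in a few lines of Python. This is a simplified model under stated assumptions: the `upsert` helper and the sample rows are hypothetical, and the model ignores partitioning and any validation the Iceberg sink applies to the key columns, so it shows only why a key that includes the changing column cannot match the old row.

```python
from typing import Dict, List, Sequence

Row = Dict[str, int]


def upsert(table: List[Row], row: Row, key_cols: Sequence[str]) -> List[Row]:
    """Return a new table where rows matching `row` on key_cols are replaced by `row`."""
    key = tuple(row[c] for c in key_cols)
    kept = [r for r in table if tuple(r[c] for c in key_cols) != key]
    return kept + [row]


# Hypothetical data: id = 1 moves from status 1 to status 2.
before = [{"id": 1, "status": 1, "value": 10}]
update = {"id": 1, "status": 2, "value": 10}

by_id = upsert(before, update, key_cols=("id",))
by_id_status = upsert(before, update, key_cols=("id", "status"))
print(len(by_id), len(by_id_status))
```

With `("id",)` as the key, the old row is replaced; with `("id", "status")`, the updated row has a different key, so the old row survives next to it.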
