Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[connector] Fluss sink supports ignore_delete. #272

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

loserwang1024
Copy link
Collaborator

Purpose

Linked issue: close #251

IGNORE_DELETE,

/**
* Operate normally based on PK + rowkind, suitable for scenarios that do not involve localized
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What localized updates means?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

partial update is better, I will update it.

Row.ofKind(RowKind.INSERT, 2, 3502L, "Fabian"),
Row.ofKind(RowKind.UPDATE_BEFORE, 2, 3502L, "Fabian"),
Row.ofKind(RowKind.UPDATE_AFTER, 3, 3503L, "coco")));
tEnv.executeSql(
Copy link
Collaborator

@luoyuxia luoyuxia Dec 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should verify a stream with -D/-U change mode can write to a log sink with ignoreDelete, but currently the source's changelog mode is +I only...

Btw, can we just use the public interface tableEnv.fromChangelogStream to do the testing? I'd like not to introduce an external test jar flink-table-planner since the test jar may change overtime and we may need to adjust our test then..

+ cdcSourceData
+ "')");
tEnv.executeSql(
"create table sink_test (a int not null, b bigint, c string) with('bucket.num' = '3', 'sink.delete-strategy'='IGNORE_DELETE')");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use ignore_delete since user likely to use lowercase in table option

Copy link
Collaborator

@luoyuxia luoyuxia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! May @wuchong check the table option 'sink.delete-strategy'='ignore_delete' used for ignore delete is fine to you?

Copy link
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @luoyuxia and @loserwang1024 , I left some comments about the config.

@@ -74,20 +75,27 @@ abstract class FlinkSinkFunction extends RichSinkFunction<RowData>
private transient Counter numRecordsOutCounter;
private transient Counter numRecordsOutErrorsCounter;
private volatile Throwable asyncWriterException;
private final DeleteStrategy deleteStrategy;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put the final variables together.

flussConfig,
tableRowType,
targetColumnIndexes,
deleteStrategy)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test for primary key table as well.

Comment on lines +102 to +107
// log table which supports ignore_delete can accept RowKind.DELETE.
if (DeleteStrategy.IGNORE_DELETE.equals(deleteStrategy)) {
return ChangelogMode.upsert();
} else {
return ChangelogMode.insertOnly();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        if (!streaming) {
            return ChangelogMode.insertOnly();
        } else {
            if (primaryKeyIndexes.length > 0
                    || DeleteStrategy.IGNORE_DELETE.equals(deleteStrategy)) {
                // primary-key table or ignore_delete mode can accept RowKind.DELETE
                ChangelogMode.Builder builder = ChangelogMode.newBuilder();
                for (RowKind kind : requestedMode.getContainedKinds()) {
                    // optimize out the update_before messages
                    if (kind != RowKind.UPDATE_BEFORE) {
                        builder.addContainedKind(kind);
                    }
                }
                return builder.build();
            } else {
                return ChangelogMode.insertOnly();
            }
        }

Better to dynamically remove update_before messages instead of hard-code upsert.

.withDescription(
"This field is used to decide what to do when data of type -D/-U is received. "
+ "`IGNORE_DELETE` means ignoring the `-D` and `-U` type message. "
+ "`CHANGELOG_STANDARD` means neither `-U` nor `-D` is ignored, they both cause the corresponding row in fluss to be deleted");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I prefer sink.ignore-delete=true/false. If there are only 2 values and CHANGELOG_STANDARD is the default, I think sink.ignore-delete=true/false is much simpler for users to understand and use.

It seems sink.delete-strategy comes from here that contains additional values NON_PK_FIELD_TO_NULL and DELETE_ROW_ON_PK. However, Fluss already handles partial-updates and partial-delete with leveraging the INSERT INTO (columns) grammar and I don't see this config will extend additional values in the near future.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature][connector] Support ignore delete for Log Table
3 participants