[connector] Fluss sink supports ignore_delete. #272

Open · wants to merge 2 commits into main
FlinkConnectorOptions.java
@@ -17,6 +17,7 @@
package com.alibaba.fluss.connector.flink;

import com.alibaba.fluss.config.FlussConfigUtils;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;

import org.apache.flink.configuration.ConfigOption;
@@ -98,6 +99,15 @@ public class FlinkConnectorOptions {
+ "the new partitions for partitioned table while scanning."
+ " A non-positive value disables the partition discovery.");

public static final ConfigOption<DeleteStrategy> SINK_DELETE_STRATEGY =
ConfigOptions.key("sink.delete-strategy")
.enumType(DeleteStrategy.class)
.defaultValue(DeleteStrategy.CHANGELOG_STANDARD)
.withDescription(
"This field is used to decide what to do when data of type -D/-U is received. "
+ "`IGNORE_DELETE` means ignoring the `-D` and `-U` type message. "
+ "`CHANGELOG_STANDARD` means neither `-U` nor `-D` is ignored, they both cause the corresponding row in fluss to be deleted");
Member commented:
Personally, I prefer sink.ignore-delete=true/false. If there are only 2 values and CHANGELOG_STANDARD is the default, I think sink.ignore-delete=true/false is much simpler for users to understand and use.

It seems sink.delete-strategy comes from here, which contains the additional values NON_PK_FIELD_TO_NULL and DELETE_ROW_ON_PK. However, Fluss already handles partial updates and partial deletes by leveraging the INSERT INTO (columns) grammar, and I don't see this config extending to additional values in the near future.
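
For illustration, the suggested boolean option might look like the following sketch (a hypothetical alternative based on this review comment, not part of the PR):

    // Hypothetical alternative sketched from the review suggestion; not part of this PR.
    public static final ConfigOption<Boolean> SINK_IGNORE_DELETE =
            ConfigOptions.key("sink.ignore-delete")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "If true, the sink drops -U and -D records instead of deleting "
                                    + "the corresponding rows in Fluss.");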

Collaborator (author) replied:
> It seems sink.delete-strategy comes from here, which contains the additional values NON_PK_FIELD_TO_NULL and DELETE_ROW_ON_PK.

Yes. I'm not sure whether users might want a -D to trigger a partial update: not deleting the whole row, but not ignoring it either, just setting the non-PK columns to null.


// --------------------------------------------------------------------------------------------
// table storage specific options
// --------------------------------------------------------------------------------------------
@@ -130,6 +130,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
public DynamicTableSink createDynamicTableSink(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
ReadableConfig tableOptions = helper.getOptions();

boolean isStreamingMode =
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
@@ -139,10 +140,11 @@ public DynamicTableSink createDynamicTableSink(Context context) {

return new FlinkTableSink(
toFlussTablePath(context.getObjectIdentifier()),
toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
toFlussClientConfig(tableOptions, context.getConfiguration()),
rowType,
context.getPrimaryKeyIndexes(),
isStreamingMode);
isStreamingMode,
tableOptions.get(FlinkConnectorOptions.SINK_DELETE_STRATEGY));
}

@Override
@@ -166,6 +168,7 @@ public Set<ConfigOption<?>> optionalOptions() {
FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP,
FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL,
FlinkConnectorOptions.LOOKUP_ASYNC,
FlinkConnectorOptions.SINK_DELETE_STRATEGY,
LookupOptions.MAX_RETRIES,
LookupOptions.CACHE_TYPE,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
DeleteStrategy.java (new file)
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.connector.flink.options;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/** The strategy when fluss sink receives delete data. */
@PublicEvolving
public enum DeleteStrategy implements Serializable {

/**
* Ignore -U and -D. This is applicable for scenarios where users only need to insert or update
* data without the need to delete data.
*/
IGNORE_DELETE,

/**
* Operate normally based on PK + rowkind, suitable for scenarios that do not involve partial
* updates. The Flink framework operates according to the Flink SQL Changelog working
* principles, not ignoring delete operations, and executes update operations by first deleting
* data then inserting, to ensure data accuracy.
*/
CHANGELOG_STANDARD;
}
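
For reference, a minimal usage sketch of the new option from Flink SQL, mirroring the ITCase added later in this PR (the table name, columns, and other table options are illustrative):

    // Minimal usage sketch; table name, columns, and source view are illustrative.
    tEnv.executeSql(
            "CREATE TABLE log_sink (a INT NOT NULL, b BIGINT, c STRING) "
                    + "WITH ('bucket.num' = '3', 'sink.delete-strategy' = 'ignore_delete')");
    // With IGNORE_DELETE, incoming -U/-D records are silently dropped by the sink.
    tEnv.executeSql("INSERT INTO log_sink SELECT * FROM cdcSourceData").await();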
AppendSinkFunction.java
@@ -18,6 +18,7 @@

import com.alibaba.fluss.client.table.writer.AppendWriter;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;
@@ -35,8 +36,12 @@ class AppendSinkFunction extends FlinkSinkFunction {

private transient AppendWriter appendWriter;

AppendSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
super(tablePath, flussConfig, tableRowType);
AppendSinkFunction(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
DeleteStrategy deleteStrategy) {
super(tablePath, flussConfig, tableRowType, deleteStrategy);
}

@Override
FlinkSinkFunction.java
@@ -21,6 +21,7 @@
import com.alibaba.fluss.client.table.Table;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.metrics.FlinkMetricRegistry;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -74,20 +75,27 @@ abstract class FlinkSinkFunction extends RichSinkFunction<RowData>
private transient Counter numRecordsOutCounter;
private transient Counter numRecordsOutErrorsCounter;
private volatile Throwable asyncWriterException;
private final DeleteStrategy deleteStrategy;
Member commented:
nit: put the final variables together.


public FlinkSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
this(tablePath, flussConfig, tableRowType, null);
public FlinkSinkFunction(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
DeleteStrategy deleteStrategy) {
this(tablePath, flussConfig, tableRowType, null, deleteStrategy);
}

public FlinkSinkFunction(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
@Nullable int[] targetColumns) {
@Nullable int[] targetColumns,
DeleteStrategy deleteStrategy) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.targetColumnIndexes = targetColumns;
this.tableRowType = tableRowType;
this.deleteStrategy = deleteStrategy;
}

@Override
@@ -117,6 +125,12 @@ protected void initMetrics() {
@Override
public void invoke(RowData value, SinkFunction.Context context) throws IOException {
checkAsyncException();
if (DeleteStrategy.IGNORE_DELETE.equals(deleteStrategy)
&& (value.getRowKind() == RowKind.UPDATE_BEFORE
|| value.getRowKind() == RowKind.DELETE)) {
return;
}

InternalRow internalRow = dataConverter.toInternalRow(value);
CompletableFuture<Void> writeFuture = writeRow(value.getRowKind(), internalRow);
writeFuture.exceptionally(
FlinkTableSink.java
@@ -17,6 +17,7 @@
package com.alibaba.fluss.connector.flink.sink;

import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.FieldEqual;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
@@ -63,6 +64,7 @@ public class FlinkTableSink
private final RowType tableRowType;
private final int[] primaryKeyIndexes;
private final boolean streaming;
private final DeleteStrategy deleteStrategy;

private boolean appliedUpdates = false;
@Nullable private GenericRow deleteRow;
@@ -72,12 +74,14 @@ public FlinkTableSink(
Configuration flussConfig,
RowType tableRowType,
int[] primaryKeyIndexes,
boolean streaming) {
boolean streaming,
DeleteStrategy deleteStrategy) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
this.primaryKeyIndexes = primaryKeyIndexes;
this.streaming = streaming;
this.deleteStrategy = deleteStrategy;
}

@Override
@@ -95,8 +99,12 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
}
return builder.build();
} else {
// append only
return ChangelogMode.insertOnly();
// log table which supports ignore_delete can accept RowKind.DELETE.
if (DeleteStrategy.IGNORE_DELETE.equals(deleteStrategy)) {
return ChangelogMode.upsert();
} else {
return ChangelogMode.insertOnly();
}
Member commented on lines +102 to +107:
        if (!streaming) {
            return ChangelogMode.insertOnly();
        } else {
            if (primaryKeyIndexes.length > 0
                    || DeleteStrategy.IGNORE_DELETE.equals(deleteStrategy)) {
                // primary-key table or ignore_delete mode can accept RowKind.DELETE
                ChangelogMode.Builder builder = ChangelogMode.newBuilder();
                for (RowKind kind : requestedMode.getContainedKinds()) {
                    // optimize out the update_before messages
                    if (kind != RowKind.UPDATE_BEFORE) {
                        builder.addContainedKind(kind);
                    }
                }
                return builder.build();
            } else {
                return ChangelogMode.insertOnly();
            }
        }

Better to dynamically remove the update_before messages instead of hard-coding upsert.

}
}
}
@@ -147,8 +155,13 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
FlinkSinkFunction sinkFunction =
primaryKeyIndexes.length > 0
? new UpsertSinkFunction(
tablePath, flussConfig, tableRowType, targetColumnIndexes)
: new AppendSinkFunction(tablePath, flussConfig, tableRowType);
tablePath,
flussConfig,
tableRowType,
targetColumnIndexes,
deleteStrategy)
Member commented:
Add a test for primary key table as well.

: new AppendSinkFunction(
tablePath, flussConfig, tableRowType, deleteStrategy);

return SinkFunctionProvider.of(sinkFunction);
}
@@ -165,7 +178,12 @@ private List<String> columns(int[] columnIndexes) {
public DynamicTableSink copy() {
FlinkTableSink sink =
new FlinkTableSink(
tablePath, flussConfig, tableRowType, primaryKeyIndexes, streaming);
tablePath,
flussConfig,
tableRowType,
primaryKeyIndexes,
streaming,
deleteStrategy);
sink.appliedUpdates = appliedUpdates;
sink.deleteRow = deleteRow;
return sink;
UpsertSinkFunction.java
@@ -19,6 +19,7 @@
import com.alibaba.fluss.client.table.writer.UpsertWrite;
import com.alibaba.fluss.client.table.writer.UpsertWriter;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;
@@ -42,8 +43,9 @@ class UpsertSinkFunction extends FlinkSinkFunction {
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
@Nullable int[] targetColumnIndexes) {
super(tablePath, flussConfig, tableRowType, targetColumnIndexes);
@Nullable int[] targetColumnIndexes,
DeleteStrategy deleteStrategy) {
super(tablePath, flussConfig, tableRowType, targetColumnIndexes, deleteStrategy);
}

@Override
@@ -18,6 +18,7 @@

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.source.testutils.FlinkTestBase;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -67,7 +68,9 @@ void testSinkMetrics(String clientId) throws Exception {
Arrays.asList(
new RowType.RowField("id", DataTypes.INT().getLogicalType()),
new RowType.RowField("name", DataTypes.STRING().getLogicalType())));
FlinkSinkFunction flinkSinkFunction = new AppendSinkFunction(tablePath, flussConf, rowType);
FlinkSinkFunction flinkSinkFunction =
new AppendSinkFunction(
tablePath, flussConf, rowType, DeleteStrategy.CHANGELOG_STANDARD);
InterceptingOperatorMetricGroup interceptingOperatorMetricGroup =
new InterceptingOperatorMetricGroup();
MockStreamingRuntimeContext mockStreamingRuntimeContext =
@@ -39,6 +39,7 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -378,6 +379,28 @@ void testInsertWithoutSpecifiedCols() {
.isEqualTo(expectPlan);
}

@Test
void testIgnoreDelete() throws Exception {
org.apache.flink.table.api.Table cdcSourceData =
tEnv.fromChangelogStream(
env.fromData(
Row.ofKind(RowKind.INSERT, 1, 3501L, "Tim"),
Row.ofKind(RowKind.DELETE, 1, 3501L, "Tim"),
Row.ofKind(RowKind.INSERT, 2, 3502L, "Fabian"),
Row.ofKind(RowKind.UPDATE_BEFORE, 2, 3502L, "Fabian"),
Row.ofKind(RowKind.UPDATE_AFTER, 3, 3503L, "coco")));
tEnv.createTemporaryView("cdcSourceData", cdcSourceData);

tEnv.executeSql(
Collaborator @luoyuxia commented on Dec 27, 2024:
We should verify that a stream with -D/-U records can write to a log sink with ignoreDelete, but currently the source's changelog mode is +I only...

Btw, can we just use the public interface tableEnv.fromChangelogStream to do the testing? I'd rather not introduce the external test jar flink-table-planner, since the test jar may change over time and we would then need to adjust our test.

"create table sink_test (a int not null, b bigint, c string) with('bucket.num' = '3', 'sink.delete-strategy'='ignore_delete')");
tEnv.executeSql("INSERT INTO sink_test SELECT * FROM cdcSourceData").await();

CloseableIterator<Row> rowIter = tEnv.executeSql("select * from sink_test").collect();
List<String> expectedRows =
Arrays.asList("+I[1, 3501, Tim]", "+I[2, 3502, Fabian]", "+I[3, 3503, coco]");
assertResultsIgnoreOrder(rowIter, expectedRows, true);
}
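
In response to the earlier review request to also cover a primary-key table, a hedged sketch of such a test could look like the following (table name and data are illustrative, not part of this PR):

    @Test
    void testIgnoreDeleteOnPrimaryKeyTable() throws Exception {
        // Hypothetical follow-up test sketched from the review request; not part of this PR.
        tEnv.executeSql(
                "create table pk_sink_test (a int not null, b bigint, c string,"
                        + " primary key (a) not enforced) with("
                        + "'bucket.num' = '3', 'sink.delete-strategy'='ignore_delete')");

        org.apache.flink.table.api.Table cdcSourceData =
                tEnv.fromChangelogStream(
                        env.fromData(
                                Row.ofKind(RowKind.INSERT, 1, 3501L, "Tim"),
                                Row.ofKind(RowKind.DELETE, 1, 3501L, "Tim"),
                                Row.ofKind(RowKind.INSERT, 2, 3502L, "Fabian")));
        tEnv.createTemporaryView("pkCdcSourceData", cdcSourceData);
        tEnv.executeSql("INSERT INTO pk_sink_test SELECT * FROM pkCdcSourceData").await();

        // The -D record is dropped under IGNORE_DELETE, so both inserted rows remain.
        CloseableIterator<Row> rowIter = tEnv.executeSql("select * from pk_sink_test").collect();
        List<String> expectedRows = Arrays.asList("+I[1, 3501, Tim]", "+I[2, 3502, Fabian]");
        assertResultsIgnoreOrder(rowIter, expectedRows, true);
    }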

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWritePartitionedTable(boolean isPrimaryKeyTable) throws Exception {
@@ -20,6 +20,7 @@
import com.alibaba.fluss.client.ConnectionFactory;
import com.alibaba.fluss.client.admin.Admin;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.sink.FlinkTableSink;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.lakehouse.paimon.record.MultiplexCdcRecord;
@@ -87,7 +88,8 @@ public void invoke(MultiplexCdcRecord record, SinkFunction.Context context) thro
flussConfig,
FlinkConversions.toFlinkRowType(rowType),
tableDescriptor.getSchema().getPrimaryKeyIndexes(),
true);
true,
DeleteStrategy.CHANGELOG_STANDARD);

sinkFunction =
((SinkFunctionProvider)