[connector] Fluss sink supports ignore_delete.
loserwang1024 committed Dec 26, 2024
1 parent 2700ef5 commit 2dc0437
Showing 11 changed files with 149 additions and 17 deletions.
9 changes: 9 additions & 0 deletions fluss-connectors/fluss-connector-flink/pom.xml
@@ -108,6 +108,15 @@
<scope>test</scope>
</dependency>

+<dependency>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+<version>${flink.version}</version>
+<type>test-jar</type>
+<scope>test</scope>
+</dependency>


<dependency>
<groupId>com.alibaba.fluss</groupId>
<artifactId>fluss-server</artifactId>
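(Presumably, the flink-table-planner test-jar added above supplies TestValuesTableFactory, which the new testIgnoreDelete ITCase below uses to register bounded changelog source data.)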
--- FlinkConnectorOptions.java ---

@@ -17,6 +17,7 @@
package com.alibaba.fluss.connector.flink;

import com.alibaba.fluss.config.FlussConfigUtils;
+import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;

import org.apache.flink.configuration.ConfigOption;
@@ -98,6 +99,15 @@ public class FlinkConnectorOptions {
+ "the new partitions for partitioned table while scanning."
+ " A non-positive value disables the partition discovery.");

+public static final ConfigOption<DeleteStrategy> SINK_DELETE_STRATEGY =
+ConfigOptions.key("sink.delete-strategy")
+.enumType(DeleteStrategy.class)
+.defaultValue(DeleteStrategy.CHANGELOG_STANDARD)
+.withDescription(
+"This option decides what to do when data of type -D/-U is received. "
+    + "`IGNORE_DELETE` means the `-D` and `-U` messages are ignored. "
+    + "`CHANGELOG_STANDARD` means neither `-U` nor `-D` is ignored; both cause the corresponding row in Fluss to be deleted.");

// --------------------------------------------------------------------------------------------
// table storage specific options
// --------------------------------------------------------------------------------------------
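For reference, the new option is set in a table's WITH clause. A minimal Flink SQL sketch (the table name and columns are illustrative, mirroring the testIgnoreDelete ITCase below):

    CREATE TABLE my_sink (
        a INT NOT NULL,
        b BIGINT,
        c STRING
    ) WITH (
        'bucket.num' = '3',
        'sink.delete-strategy' = 'IGNORE_DELETE'
    );

With IGNORE_DELETE, any -U/-D records reaching this sink are silently dropped; with the default CHANGELOG_STANDARD they are processed as ordinary changelog events.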
--- (next file) ---

@@ -130,6 +130,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
public DynamicTableSink createDynamicTableSink(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
+ReadableConfig tableOptions = helper.getOptions();

boolean isStreamingMode =
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
@@ -139,10 +140,11 @@ public DynamicTableSink createDynamicTableSink(Context context) {

return new FlinkTableSink(
toFlussTablePath(context.getObjectIdentifier()),
-toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
+toFlussClientConfig(tableOptions, context.getConfiguration()),
rowType,
context.getPrimaryKeyIndexes(),
-isStreamingMode);
+isStreamingMode,
+tableOptions.get(FlinkConnectorOptions.SINK_DELETE_STRATEGY));
}

@Override
@@ -166,6 +168,7 @@ public Set<ConfigOption<?>> optionalOptions() {
FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP,
FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL,
FlinkConnectorOptions.LOOKUP_ASYNC,
+FlinkConnectorOptions.SINK_DELETE_STRATEGY,
LookupOptions.MAX_RETRIES,
LookupOptions.CACHE_TYPE,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
--- DeleteStrategy.java (new file) ---
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.connector.flink.options;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/** The strategy applied when the Fluss sink receives delete data. */
@PublicEvolving
public enum DeleteStrategy implements Serializable {

/**
* Ignore -U and -D. This is applicable for scenarios where users only need to insert or update
* data without the need to delete data.
*/
IGNORE_DELETE,

/**
* Operate normally based on the primary key and RowKind, suitable for scenarios that do not
* involve partial updates. The sink follows Flink SQL changelog semantics: delete operations
* are not ignored, and update operations are executed as a delete followed by an insert to
* ensure data accuracy.
*/
CHANGELOG_STANDARD;
}
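In effect the strategy is a RowKind filter applied before records reach the writer (see FlinkSinkFunction.invoke below). A minimal sketch of the semantics, where shouldWrite is a hypothetical helper, not part of the commit:

    import org.apache.flink.types.RowKind;

    // Would a record of the given kind be written under the given strategy?
    static boolean shouldWrite(DeleteStrategy strategy, RowKind kind) {
        if (strategy == DeleteStrategy.IGNORE_DELETE) {
            // -U (UPDATE_BEFORE) and -D (DELETE) records are silently dropped.
            return kind != RowKind.UPDATE_BEFORE && kind != RowKind.DELETE;
        }
        // CHANGELOG_STANDARD: every record is applied, deletes included.
        return true;
    }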
--- AppendSinkFunction.java ---

@@ -18,6 +18,7 @@

import com.alibaba.fluss.client.table.writer.AppendWriter;
import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;
@@ -35,8 +36,12 @@ class AppendSinkFunction extends FlinkSinkFunction {

private transient AppendWriter appendWriter;

-AppendSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
-super(tablePath, flussConfig, tableRowType);
+AppendSinkFunction(
+TablePath tablePath,
+Configuration flussConfig,
+RowType tableRowType,
+DeleteStrategy deleteStrategy) {
+super(tablePath, flussConfig, tableRowType, deleteStrategy);
}

@Override
--- FlinkSinkFunction.java ---

@@ -21,6 +21,7 @@
import com.alibaba.fluss.client.table.Table;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.metrics.FlinkMetricRegistry;
+import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -74,20 +75,27 @@ abstract class FlinkSinkFunction extends RichSinkFunction<RowData>
private transient Counter numRecordsOutCounter;
private transient Counter numRecordsOutErrorsCounter;
private volatile Throwable asyncWriterException;
+private final DeleteStrategy deleteStrategy;

-public FlinkSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
-this(tablePath, flussConfig, tableRowType, null);
+public FlinkSinkFunction(
+TablePath tablePath,
+Configuration flussConfig,
+RowType tableRowType,
+DeleteStrategy deleteStrategy) {
+this(tablePath, flussConfig, tableRowType, null, deleteStrategy);
}

public FlinkSinkFunction(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
-@Nullable int[] targetColumns) {
+@Nullable int[] targetColumns,
+DeleteStrategy deleteStrategy) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.targetColumnIndexes = targetColumns;
this.tableRowType = tableRowType;
+this.deleteStrategy = deleteStrategy;
}

@Override
@@ -117,6 +125,12 @@ protected void initMetrics() {
@Override
public void invoke(RowData value, SinkFunction.Context context) throws IOException {
checkAsyncException();
+if (DeleteStrategy.IGNORE_DELETE.equals(deleteStrategy)
+&& (value.getRowKind() == RowKind.UPDATE_BEFORE
+|| value.getRowKind() == RowKind.DELETE)) {
+return;
+}

InternalRow internalRow = dataConverter.toInternalRow(value);
CompletableFuture<Void> writeFuture = writeRow(value.getRowKind(), internalRow);
writeFuture.exceptionally(
--- FlinkTableSink.java ---

@@ -17,6 +17,7 @@
package com.alibaba.fluss.connector.flink.sink;

import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.FieldEqual;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
@@ -63,6 +64,7 @@ public class FlinkTableSink
private final RowType tableRowType;
private final int[] primaryKeyIndexes;
private final boolean streaming;
+private final DeleteStrategy deleteStrategy;

private boolean appliedUpdates = false;
@Nullable private GenericRow deleteRow;
@@ -72,12 +74,14 @@ public FlinkTableSink(
Configuration flussConfig,
RowType tableRowType,
int[] primaryKeyIndexes,
-boolean streaming) {
+boolean streaming,
+DeleteStrategy deleteStrategy) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
this.primaryKeyIndexes = primaryKeyIndexes;
this.streaming = streaming;
+this.deleteStrategy = deleteStrategy;
}

@Override
@@ -95,8 +99,12 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
}
return builder.build();
} else {
-// append only
-return ChangelogMode.insertOnly();
+// A log table with IGNORE_DELETE can accept RowKind.DELETE; the sink drops such rows.
+if (DeleteStrategy.IGNORE_DELETE.equals(deleteStrategy)) {
+return ChangelogMode.upsert();
+} else {
+return ChangelogMode.insertOnly();
+}
}
}
}
@@ -147,8 +155,13 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
FlinkSinkFunction sinkFunction =
primaryKeyIndexes.length > 0
? new UpsertSinkFunction(
-tablePath, flussConfig, tableRowType, targetColumnIndexes)
-: new AppendSinkFunction(tablePath, flussConfig, tableRowType);
+tablePath,
+flussConfig,
+tableRowType,
+targetColumnIndexes,
+deleteStrategy)
+: new AppendSinkFunction(
+tablePath, flussConfig, tableRowType, deleteStrategy);

return SinkFunctionProvider.of(sinkFunction);
}
@@ -165,7 +178,12 @@ private List<String> columns(int[] columnIndexes) {
public DynamicTableSink copy() {
FlinkTableSink sink =
new FlinkTableSink(
-tablePath, flussConfig, tableRowType, primaryKeyIndexes, streaming);
+tablePath,
+flussConfig,
+tableRowType,
+primaryKeyIndexes,
+streaming,
+deleteStrategy);
sink.appliedUpdates = appliedUpdates;
sink.deleteRow = deleteRow;
return sink;
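A note on the getChangelogMode change above: a log table (no primary key) previously declared ChangelogMode.insertOnly(), so the planner rejected any query producing -U/-D rows before the sink ever saw them. Declaring ChangelogMode.upsert() under IGNORE_DELETE admits +I, +U and -D records (upsert mode excludes UPDATE_BEFORE), and the -D records are then dropped in FlinkSinkFunction.invoke. A sketch of what each declared mode admits, assuming Flink's ChangelogMode builder API; the class is illustrative only:

    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.types.RowKind;

    class ChangelogModeSketch {
        static final ChangelogMode INSERT_ONLY = ChangelogMode.insertOnly(); // admits +I only
        static final ChangelogMode UPSERT = ChangelogMode.upsert(); // admits +I, +U, -D

        // ChangelogMode.upsert() is equivalent to building the mode explicitly:
        static final ChangelogMode EXPLICIT_UPSERT =
                ChangelogMode.newBuilder()
                        .addContainedKind(RowKind.INSERT)
                        .addContainedKind(RowKind.UPDATE_AFTER)
                        .addContainedKind(RowKind.DELETE)
                        .build();
    }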
--- UpsertSinkFunction.java ---

@@ -19,6 +19,7 @@
import com.alibaba.fluss.client.table.writer.UpsertWrite;
import com.alibaba.fluss.client.table.writer.UpsertWriter;
import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;
@@ -42,8 +43,9 @@ class UpsertSinkFunction extends FlinkSinkFunction {
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
-@Nullable int[] targetColumnIndexes) {
-super(tablePath, flussConfig, tableRowType, targetColumnIndexes);
+@Nullable int[] targetColumnIndexes,
+DeleteStrategy deleteStrategy) {
+super(tablePath, flussConfig, tableRowType, targetColumnIndexes, deleteStrategy);
}

@Override
--- (next file) ---

@@ -18,6 +18,7 @@

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.source.testutils.FlinkTestBase;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -67,7 +68,9 @@ void testSinkMetrics(String clientId) throws Exception {
Arrays.asList(
new RowType.RowField("id", DataTypes.INT().getLogicalType()),
new RowType.RowField("name", DataTypes.STRING().getLogicalType())));
-FlinkSinkFunction flinkSinkFunction = new AppendSinkFunction(tablePath, flussConf, rowType);
+FlinkSinkFunction flinkSinkFunction =
+new AppendSinkFunction(
+tablePath, flussConf, rowType, DeleteStrategy.CHANGELOG_STANDARD);
InterceptingOperatorMetricGroup interceptingOperatorMetricGroup =
new InterceptingOperatorMetricGroup();
MockStreamingRuntimeContext mockStreamingRuntimeContext =
--- (next file) ---

@@ -38,7 +38,9 @@
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -378,6 +380,30 @@ void testInsertWithoutSpecifiedCols() {
.isEqualTo(expectPlan);
}

+@Test
+void testIgnoreDelete() throws Exception {
+String cdcSourceData =
+TestValuesTableFactory.registerData(
+Arrays.asList(
+Row.ofKind(RowKind.INSERT, 1, 3501L, "Tim"),
+Row.ofKind(RowKind.DELETE, 1, 3501L, "Tim"),
+Row.ofKind(RowKind.INSERT, 2, 3502L, "Fabian"),
+Row.ofKind(RowKind.UPDATE_BEFORE, 2, 3502L, "Fabian"),
+Row.ofKind(RowKind.UPDATE_AFTER, 3, 3503L, "coco")));
+tEnv.executeSql(
+"create temporary table source_test (a int not null, b bigint, c string) with('connector'='values', 'bounded' = 'true', 'data-id'='"
+    + cdcSourceData
+    + "')");
+tEnv.executeSql(
+"create table sink_test (a int not null, b bigint, c string) with('bucket.num' = '3', 'sink.delete-strategy'='IGNORE_DELETE')");
+tEnv.executeSql("INSERT INTO sink_test SELECT * FROM source_test").await();

+CloseableIterator<Row> rowIter = tEnv.executeSql("select * from sink_test").collect();
+List<String> expectedRows =
+Arrays.asList("+I[1, 3501, Tim]", "+I[2, 3502, Fabian]", "+I[3, 3503, coco]");
+assertResultsIgnoreOrder(rowIter, expectedRows, true);
+}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWritePartitionedTable(boolean isPrimaryKeyTable) throws Exception {
--- (next file) ---

@@ -20,6 +20,7 @@
import com.alibaba.fluss.client.ConnectionFactory;
import com.alibaba.fluss.client.admin.Admin;
import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.sink.FlinkTableSink;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.lakehouse.paimon.record.MultiplexCdcRecord;
@@ -87,7 +88,8 @@ public void invoke(MultiplexCdcRecord record, SinkFunction.Context context) throws
flussConfig,
FlinkConversions.toFlinkRowType(rowType),
tableDescriptor.getSchema().getPrimaryKeyIndexes(),
-true);
+true,
+DeleteStrategy.CHANGELOG_STANDARD);

sinkFunction =
((SinkFunctionProvider)