[connector] Fluss sink supports ignore_delete.
loserwang1024 committed Dec 26, 2024
1 parent 2700ef5 commit dbf95db
Showing 13 changed files with 155 additions and 17 deletions.
@@ -194,11 +194,15 @@ public RecordAppendResult append(
synchronized (dq) {
RecordAppendResult appendResult =
tryAppend(writeRecord, callback, dq, batchesToBuild);

// todo: 1. if the record can be appended to the current batch, return directly without creating a new batch
// todo: if appendResult is null, there is no previous batch or the previous batch's memory is already full
if (appendResult != null) {
return appendResult;
}
}

// if abortIfBatchFull is configured, do not write into a new batch either
// we don't have an in-progress record batch, try to allocate a new batch
if (abortIfBatchFull) {
// Return a result that will cause another call to append.
@@ -166,6 +166,7 @@ private void doSend(WriteRecord record, WriteCallback callback) {
accumulator.append(
record, callback, cluster, bucketId, bucketAssigner.abortIfBatchFull());

// if abortRecordForNewBatch is set and the batch for this bucketId is full, try writing to another batch (normally this path is not reached)
if (result.abortRecordForNewBatch) {
int prevBucketId = bucketId;
bucketAssigner.onNewBatch(cluster, prevBucketId);
@@ -180,6 +181,7 @@ private void doSend(WriteRecord record, WriteCallback callback) {
result = accumulator.append(record, callback, cluster, bucketId, false);
}

// log the batch switch (the batch is full or a new batch was created)
if (result.batchIsFull || result.newBatchCreated) {
LOG.trace(
"Waking up the sender since table {} bucket {} is either full or getting a new batch",
9 changes: 9 additions & 0 deletions fluss-connectors/fluss-connector-flink/pom.xml
@@ -108,6 +108,15 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>


<dependency>
<groupId>com.alibaba.fluss</groupId>
<artifactId>fluss-server</artifactId>
@@ -17,6 +17,7 @@
package com.alibaba.fluss.connector.flink;

import com.alibaba.fluss.config.FlussConfigUtils;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;

import org.apache.flink.configuration.ConfigOption;
@@ -98,6 +99,15 @@ public class FlinkConnectorOptions {
+ "the new partitions for partitioned table while scanning."
+ " A non-positive value disables the partition discovery.");

public static final ConfigOption<DeleteStrategy> SINK_DELETE_STRATEGY =
ConfigOptions.key("sink.delete-strategy")
.enumType(DeleteStrategy.class)
.defaultValue(DeleteStrategy.CHANGELOG_STANDARD)
.withDescription(
"This field is used to decide what to do when data of type -D/-U is received. "
+ "`IGNORE_DELETE` means ignoring the `-D` and `-U` type message. "
+ "`CHANGELOG_STANDARD` means neither `-U` nor `-D` is ignored, they both cause the corresponding row in fluss to be deleted");

// --------------------------------------------------------------------------------------------
// table storage specific options
// --------------------------------------------------------------------------------------------
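For reference, the new option is set in a table's WITH clause. A minimal Flink SQL sketch (hypothetical table and column names; it assumes a Fluss catalog is the current catalog, as in the ITCase added later in this commit):

CREATE TABLE orders_sink (
    order_id INT NOT NULL,
    amount BIGINT,
    buyer STRING
) WITH (
    'bucket.num' = '3',
    'sink.delete-strategy' = 'IGNORE_DELETE'
);

Under IGNORE_DELETE the sink silently drops -U and -D records; under the default CHANGELOG_STANDARD they delete the corresponding rows, as described in the option above.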
@@ -130,6 +130,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
public DynamicTableSink createDynamicTableSink(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
helper.validate();
ReadableConfig tableOptions = helper.getOptions();

boolean isStreamingMode =
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
@@ -139,10 +140,11 @@ public DynamicTableSink createDynamicTableSink(Context context) {

return new FlinkTableSink(
toFlussTablePath(context.getObjectIdentifier()),
toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
toFlussClientConfig(tableOptions, context.getConfiguration()),
rowType,
context.getPrimaryKeyIndexes(),
isStreamingMode);
isStreamingMode,
tableOptions.get(FlinkConnectorOptions.SINK_DELETE_STRATEGY));
}

@Override
@@ -166,6 +168,7 @@ public Set<ConfigOption<?>> optionalOptions() {
FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP,
FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL,
FlinkConnectorOptions.LOOKUP_ASYNC,
FlinkConnectorOptions.SINK_DELETE_STRATEGY,
LookupOptions.MAX_RETRIES,
LookupOptions.CACHE_TYPE,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.connector.flink.options;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/** The strategy to apply when the Fluss sink receives delete data. */
@PublicEvolving
public enum DeleteStrategy implements Serializable {

/**
* Ignore -U and -D. This is applicable for scenarios where users only need to insert or update
* data without the need to delete data.
*/
IGNORE_DELETE,

/**
* Operate normally based on the primary key and RowKind, suitable for scenarios that do not
* involve partial updates. The sink follows the Flink SQL changelog semantics: delete
* operations are not ignored, and an update is executed as a delete followed by an insert to
* ensure data correctness.
*/
CHANGELOG_STANDARD;
}
@@ -18,6 +18,7 @@

import com.alibaba.fluss.client.table.writer.AppendWriter;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;
@@ -35,8 +36,12 @@ class AppendSinkFunction extends FlinkSinkFunction {

private transient AppendWriter appendWriter;

AppendSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
super(tablePath, flussConfig, tableRowType);
AppendSinkFunction(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
DeleteStrategy deleteStrategy) {
super(tablePath, flussConfig, tableRowType, deleteStrategy);
}

@Override
@@ -21,6 +21,7 @@
import com.alibaba.fluss.client.table.Table;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.metrics.FlinkMetricRegistry;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -74,20 +75,27 @@ abstract class FlinkSinkFunction extends RichSinkFunction<RowData>
private transient Counter numRecordsOutCounter;
private transient Counter numRecordsOutErrorsCounter;
private volatile Throwable asyncWriterException;
private final DeleteStrategy deleteStrategy;

public FlinkSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
this(tablePath, flussConfig, tableRowType, null);
public FlinkSinkFunction(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
DeleteStrategy deleteStrategy) {
this(tablePath, flussConfig, tableRowType, null, deleteStrategy);
}

public FlinkSinkFunction(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
@Nullable int[] targetColumns) {
@Nullable int[] targetColumns,
DeleteStrategy deleteStrategy) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.targetColumnIndexes = targetColumns;
this.tableRowType = tableRowType;
this.deleteStrategy = deleteStrategy;
}

@Override
@@ -117,6 +125,12 @@ protected void initMetrics() {
@Override
public void invoke(RowData value, SinkFunction.Context context) throws IOException {
checkAsyncException();
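// with IGNORE_DELETE, drop UPDATE_BEFORE and DELETE records before conversion and writing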
if (DeleteStrategy.IGNORE_DELETE.equals(deleteStrategy)
&& (value.getRowKind() == RowKind.UPDATE_BEFORE
|| value.getRowKind() == RowKind.DELETE)) {
return;
}

InternalRow internalRow = dataConverter.toInternalRow(value);
CompletableFuture<Void> writeFuture = writeRow(value.getRowKind(), internalRow);
writeFuture.exceptionally(
@@ -17,6 +17,7 @@
package com.alibaba.fluss.connector.flink.sink;

import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.FieldEqual;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
@@ -63,6 +64,7 @@ public class FlinkTableSink
private final RowType tableRowType;
private final int[] primaryKeyIndexes;
private final boolean streaming;
private final DeleteStrategy deleteStrategy;

private boolean appliedUpdates = false;
@Nullable private GenericRow deleteRow;
@@ -72,12 +74,14 @@ public FlinkTableSink(
Configuration flussConfig,
RowType tableRowType,
int[] primaryKeyIndexes,
boolean streaming) {
boolean streaming,
DeleteStrategy deleteStrategy) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableRowType = tableRowType;
this.primaryKeyIndexes = primaryKeyIndexes;
this.streaming = streaming;
this.deleteStrategy = deleteStrategy;
}

@Override
@@ -95,8 +99,12 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
}
return builder.build();
} else {
// append only
return ChangelogMode.insertOnly();
// a log table with IGNORE_DELETE can accept RowKind.DELETE, so declare upsert mode
if (DeleteStrategy.IGNORE_DELETE.equals(deleteStrategy)) {
return ChangelogMode.upsert();
} else {
return ChangelogMode.insertOnly();
}
}
}
}
@@ -147,8 +155,13 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
FlinkSinkFunction sinkFunction =
primaryKeyIndexes.length > 0
? new UpsertSinkFunction(
tablePath, flussConfig, tableRowType, targetColumnIndexes)
: new AppendSinkFunction(tablePath, flussConfig, tableRowType);
tablePath,
flussConfig,
tableRowType,
targetColumnIndexes,
deleteStrategy)
: new AppendSinkFunction(
tablePath, flussConfig, tableRowType, deleteStrategy);

return SinkFunctionProvider.of(sinkFunction);
}
@@ -165,7 +178,12 @@ private List<String> columns(int[] columnIndexes) {
public DynamicTableSink copy() {
FlinkTableSink sink =
new FlinkTableSink(
tablePath, flussConfig, tableRowType, primaryKeyIndexes, streaming);
tablePath,
flussConfig,
tableRowType,
primaryKeyIndexes,
streaming,
deleteStrategy);
sink.appliedUpdates = appliedUpdates;
sink.deleteRow = deleteRow;
return sink;
@@ -19,6 +19,7 @@
import com.alibaba.fluss.client.table.writer.UpsertWrite;
import com.alibaba.fluss.client.table.writer.UpsertWriter;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;
@@ -42,8 +43,9 @@ class UpsertSinkFunction extends FlinkSinkFunction {
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
@Nullable int[] targetColumnIndexes) {
super(tablePath, flussConfig, tableRowType, targetColumnIndexes);
@Nullable int[] targetColumnIndexes,
DeleteStrategy deleteStrategy) {
super(tablePath, flussConfig, tableRowType, targetColumnIndexes, deleteStrategy);
}

@Override
@@ -18,6 +18,7 @@

import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.source.testutils.FlinkTestBase;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -67,7 +68,9 @@ void testSinkMetrics(String clientId) throws Exception {
Arrays.asList(
new RowType.RowField("id", DataTypes.INT().getLogicalType()),
new RowType.RowField("name", DataTypes.STRING().getLogicalType())));
FlinkSinkFunction flinkSinkFunction = new AppendSinkFunction(tablePath, flussConf, rowType);
FlinkSinkFunction flinkSinkFunction =
new AppendSinkFunction(
tablePath, flussConf, rowType, DeleteStrategy.CHANGELOG_STANDARD);
InterceptingOperatorMetricGroup interceptingOperatorMetricGroup =
new InterceptingOperatorMetricGroup();
MockStreamingRuntimeContext mockStreamingRuntimeContext =
@@ -38,7 +38,9 @@
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -378,6 +380,30 @@ void testInsertWithoutSpecifiedCols() {
.isEqualTo(expectPlan);
}

@Test
void testIgnoreDelete() throws Exception {
String cdcSourceData =
TestValuesTableFactory.registerData(
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1, 3501L, "Tim"),
Row.ofKind(RowKind.DELETE, 1, 3501L, "Tim"),
Row.ofKind(RowKind.INSERT, 2, 3502L, "Fabian"),
Row.ofKind(RowKind.UPDATE_BEFORE, 2, 3502L, "Fabian"),
Row.ofKind(RowKind.UPDATE_AFTER, 3, 3503L, "coco")));
tEnv.executeSql(
"create temporary table source_test (a int not null, b bigint, c string) with('connector'='values', 'bounded' = 'true', 'data-id'='"
+ cdcSourceData
+ "')");
tEnv.executeSql(
"create table sink_test (a int not null, b bigint, c string) with('bucket.num' = '3', 'sink.delete-strategy'='IGNORE_DELETE')");
tEnv.executeSql("INSERT INTO sink_test SELECT * FROM source_test").await();

CloseableIterator<Row> rowIter = tEnv.executeSql("select * from sink_test").collect();
List<String> expectedRows =
Arrays.asList("+I[1, 3501, Tim]", "+I[2, 3502, Fabian]", "+I[3, 3503, coco]");
assertResultsIgnoreOrder(rowIter, expectedRows, true);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testWritePartitionedTable(boolean isPrimaryKeyTable) throws Exception {
@@ -20,6 +20,7 @@
import com.alibaba.fluss.client.ConnectionFactory;
import com.alibaba.fluss.client.admin.Admin;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.options.DeleteStrategy;
import com.alibaba.fluss.connector.flink.sink.FlinkTableSink;
import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
import com.alibaba.fluss.lakehouse.paimon.record.MultiplexCdcRecord;
@@ -87,7 +88,8 @@ public void invoke(MultiplexCdcRecord record, SinkFunction.Context context) thro
flussConfig,
FlinkConversions.toFlinkRowType(rowType),
tableDescriptor.getSchema().getPrimaryKeyIndexes(),
true);
true,
DeleteStrategy.CHANGELOG_STANDARD);

sinkFunction =
((SinkFunctionProvider)