diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java index ed2bfb94..cf51f0b7 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java @@ -72,6 +72,8 @@ import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO_PK; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK; +import static com.alibaba.fluss.record.TestData.DATA3_SCHEMA_PK; +import static com.alibaba.fluss.record.TestData.DATA3_TABLE_PATH_PK; import static com.alibaba.fluss.testutils.DataTestUtils.assertRowValueEquals; import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow; import static com.alibaba.fluss.testutils.DataTestUtils.keyRow; @@ -884,7 +886,7 @@ void testFirstRowMergeEngine() throws Exception { TableDescriptor tableDescriptor = TableDescriptor.builder() .schema(DATA1_SCHEMA_PK) - .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.FIRST_ROW) + .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.FIRST_ROW) .build(); RowType rowType = DATA1_SCHEMA_PK.toRowType(); createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false); @@ -901,7 +903,6 @@ void testFirstRowMergeEngine() throws Exception { expectedRows.add(compactedRow(rowType, new Object[] {id, "value_0"})); } upsertWriter.flush(); - // now, get rows by lookup for (int id = 0; id < rows; id++) { InternalRow gotRow = @@ -910,17 +911,13 @@ void testFirstRowMergeEngine() throws Exception { .getRow(); assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(id)); } - - // check scan change log LogScanner logScanner = table.getLogScanner(new LogScan()); logScanner.subscribeFromBeginning(0); - List actualLogRecords = new ArrayList<>(0); while (actualLogRecords.size() < rows) { ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); scanRecords.forEach(actualLogRecords::add); } - assertThat(actualLogRecords).hasSize(rows); for (int i = 0; i < actualLogRecords.size(); i++) { ScanRecord scanRecord = actualLogRecords.get(i); @@ -931,4 +928,71 @@ void testFirstRowMergeEngine() throws Exception { } } } + + @Test + void testMergeEngineWithVersion() throws Exception { + // Create table. + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(DATA3_SCHEMA_PK) + .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.Type.VERSION) + .property(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN, "b") + .build(); + RowType rowType = DATA3_SCHEMA_PK.toRowType(); + createTable(DATA3_TABLE_PATH_PK, tableDescriptor, false); + + int rows = 3; + try (Table table = conn.getTable(DATA3_TABLE_PATH_PK)) { + // put rows. + UpsertWriter upsertWriter = table.getUpsertWriter(); + List expectedScanRecords = new ArrayList<>(rows); + // init rows. + for (int row = 0; row < rows; row++) { + upsertWriter.upsert(compactedRow(rowType, new Object[] {row, 1000L})); + expectedScanRecords.add( + new ScanRecord(compactedRow(rowType, new Object[] {row, 1000L}))); + } + // update row if id=0 and version < 1000L, will not update + upsertWriter.upsert(compactedRow(rowType, new Object[] {0, 999L})); + + // update if version> 1000L + upsertWriter.upsert(compactedRow(rowType, new Object[] {1, 1001L})); + // update_before record, don't care about offset/timestamp + expectedScanRecords.add( + new ScanRecord( + -1, + -1, + RowKind.UPDATE_BEFORE, + compactedRow(rowType, new Object[] {1, 1000L}))); + // update_after record + expectedScanRecords.add( + new ScanRecord( + -1, + -1, + RowKind.UPDATE_AFTER, + compactedRow(rowType, new Object[] {1, 1001L}))); + rows = rows + 2; + + upsertWriter.flush(); + + LogScanner logScanner = table.getLogScanner(new LogScan()); + logScanner.subscribeFromBeginning(0); + + List actualLogRecords = new ArrayList<>(rows); + while (actualLogRecords.size() < rows) { + ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1)); + scanRecords.forEach(actualLogRecords::add); + } + + assertThat(actualLogRecords).hasSize(rows); + for (int i = 0; i < rows; i++) { + ScanRecord actualScanRecord = actualLogRecords.get(i); + ScanRecord expectedRecord = expectedScanRecords.get(i); + assertThat(actualScanRecord.getRowKind()).isEqualTo(expectedRecord.getRowKind()); + assertThatRow(actualScanRecord.getRow()) + .withSchema(rowType) + .isEqualTo(expectedRecord.getRow()); + } + } + } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java index 888049d2..dc1b8a9d 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java @@ -969,12 +969,18 @@ public class ConfigOptions { + "When this option is set to ture and the datalake tiering service is up," + " the table will be tiered and compacted into datalake format stored on lakehouse storage."); - public static final ConfigOption TABLE_MERGE_ENGINE = + public static final ConfigOption TABLE_MERGE_ENGINE = key("table.merge-engine") - .enumType(MergeEngine.class) + .enumType(MergeEngine.Type.class) .noDefaultValue() .withDescription("The merge engine for the primary key table."); + public static final ConfigOption TABLE_MERGE_ENGINE_VERSION_COLUMN = + key("table.merge-engine.version.column") + .stringType() + .noDefaultValue() + .withDescription("The merge engine version column for the primary key table."); + // ------------------------------------------------------------------------ // ConfigOptions for Kv // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java index fe1cfdb1..e76b98b0 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/MergeEngine.java @@ -1,13 +1,11 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Copyright (c) 2024 Alibaba Group Holding Ltd. * - * http://www.apache.org/licenses/LICENSE-2.0 + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,22 +16,95 @@ package com.alibaba.fluss.metadata; +import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.config.Configuration; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.Objects; + /** * The merge engine for primary key table. * * @since 0.6 */ -public enum MergeEngine { - FIRST_ROW("first_row"); +public class MergeEngine { + + private final Type type; + + /** When merge engine type is version, column cannot be null. */ + @Nullable private final String column; + + private MergeEngine(Type type) { + this(type, null); + } + + private MergeEngine(Type type, String column) { + this.type = type; + this.column = column; + } + + public static MergeEngine create(Map properties) { + return create(Configuration.fromMap(properties)); + } + + private static MergeEngine create(Configuration options) { + MergeEngine.Type type = options.get(ConfigOptions.TABLE_MERGE_ENGINE); + if (type == null) { + return null; + } + switch (type) { + case FIRST_ROW: + return new MergeEngine(Type.FIRST_ROW); + case VERSION: + String column = options.get(ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN); + if (column == null) { + throw new IllegalArgumentException( + String.format( + "When the merge engine is set to version, the '%s' must be set.", + ConfigOptions.TABLE_MERGE_ENGINE_VERSION_COLUMN.key())); + } + return new MergeEngine(Type.VERSION, column); + default: + throw new UnsupportedOperationException("Unsupported merge engine: " + type); + } + } - private final String value; + public Type getType() { + return type; + } + + public String getColumn() { + return column; + } - MergeEngine(String value) { - this.value = value; + public enum Type { + FIRST_ROW("first_row"), + VERSION("version"); + private final String value; + + Type(String value) { + this.value = value; + } + + @Override + public String toString() { + return value; + } + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + MergeEngine that = (MergeEngine) o; + return type == that.type && Objects.equals(column, that.column); } @Override - public String toString() { - return value; + public int hashCode() { + return Objects.hash(type, column); } } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java index 9a346ce7..a850201e 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java @@ -281,7 +281,7 @@ public boolean isDataLakeEnabled() { } public @Nullable MergeEngine getMergeEngine() { - return configuration().get(ConfigOptions.TABLE_MERGE_ENGINE); + return MergeEngine.create(properties); } public TableDescriptor copy(Map newProperties) { diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java b/fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java index f9af6cee..64ae19fe 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/record/TestData.java @@ -187,4 +187,18 @@ public final class TestData { TableDescriptor.builder().schema(DATA2_SCHEMA).distributedBy(3, "a").build(), 1); // -------------------------------- data2 info end ------------------------------------ + + // ------------------- data3 and related table info begin ---------------------- + public static final Schema DATA3_SCHEMA_PK = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .withComment("a is first column") + .column("b", DataTypes.BIGINT()) + .withComment("b is second column") + .primaryKey("a") + .build(); + public static final TablePath DATA3_TABLE_PATH_PK = + TablePath.of("test_db_3", "test_pk_table_3"); + // ---------------------------- data3 table info end ------------------------------ + } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java index 7f08a476..1e0713bb 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java @@ -23,6 +23,7 @@ import com.alibaba.fluss.connector.flink.sink.FlinkTableSink; import com.alibaba.fluss.connector.flink.source.FlinkTableSource; import com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils; +import com.alibaba.fluss.metadata.MergeEngine; import com.alibaba.fluss.metadata.TablePath; import org.apache.flink.api.common.RuntimeExecutionMode; @@ -129,7 +130,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { cache, partitionDiscoveryIntervalMs, tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)), - tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE))); + MergeEngine.create(helper.getOptions().toMap())); } @Override @@ -150,7 +151,7 @@ public DynamicTableSink createDynamicTableSink(Context context) { rowType, context.getPrimaryKeyIndexes(), isStreamingMode, - tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE))); + MergeEngine.create(helper.getOptions().toMap())); } @Override diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java index d9a9b96d..c0a33a9e 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java @@ -116,19 +116,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { // is 0, when no column specified, it's not partial update // see FLINK-36000 && context.getTargetColumns().get().length != 0) { - // is partial update, check whether partial update is supported or not if (context.getTargetColumns().get().length != tableRowType.getFieldCount()) { if (primaryKeyIndexes.length == 0) { throw new ValidationException( "Fluss table sink does not support partial updates for table without primary key. Please make sure the " + "number of specified columns in INSERT INTO matches columns of the Fluss table."); - } else if (mergeEngine == MergeEngine.FIRST_ROW) { - throw new ValidationException( - String.format( - "Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the " - + "number of specified columns in INSERT INTO matches columns of the Fluss table.", - tablePath, MergeEngine.FIRST_ROW)); + } + if (mergeEngine != null) { + if (mergeEngine.getType() == MergeEngine.Type.FIRST_ROW + || mergeEngine.getType() == MergeEngine.Type.VERSION) { + throw new ValidationException( + String.format( + "Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the " + + "number of specified columns in INSERT INTO matches columns of the Fluss table.", + tablePath, mergeEngine.getType())); + } } } int[][] targetColumns = context.getTargetColumns().get(); @@ -298,12 +301,14 @@ private void validateUpdatableAndDeletable() { "Table %s is a Log Table. Log Table doesn't support DELETE and UPDATE statements.", tablePath)); } - - if (mergeEngine == MergeEngine.FIRST_ROW) { - throw new UnsupportedOperationException( - String.format( - "Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.", - tablePath, MergeEngine.FIRST_ROW)); + if (mergeEngine != null) { + if (mergeEngine.getType() == MergeEngine.Type.FIRST_ROW + || mergeEngine.getType() == MergeEngine.Type.VERSION) { + throw new UnsupportedOperationException( + String.format( + "Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.", + tablePath, mergeEngine.getType())); + } } } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java index 77484e15..930d05e5 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java @@ -164,7 +164,7 @@ public ChangelogMode getChangelogMode() { } else { if (hasPrimaryKey()) { // pk table - if (mergeEngine == MergeEngine.FIRST_ROW) { + if (mergeEngine != null && mergeEngine.getType() == MergeEngine.Type.FIRST_ROW) { return ChangelogMode.insertOnly(); } else { return ChangelogMode.all(); diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java index eda67565..6b28077c 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java @@ -787,6 +787,111 @@ void testUnsupportedStmtOnFirstRowMergeEngine() { tablePath); } + @Test + void testUnsupportedStmtOnVersionRowMergeEngine() { + String t1 = "versionMergeEngineTable"; + TablePath tablePath = TablePath.of(DEFAULT_DB, t1); + tBatchEnv.executeSql( + String.format( + "create table %s (" + + " a int not null," + + " b bigint null, " + + " c string null, " + + " primary key (a) not enforced" + + ") with ('table.merge-engine' = 'version', 'table.merge-engine.version.column' = 'b')", + t1)); + assertThatThrownBy(() -> tBatchEnv.executeSql("DELETE FROM " + t1 + " WHERE a = 1").await()) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage( + "Table %s uses the 'version' merge engine which does not support DELETE or UPDATE statements.", + tablePath); + + assertThatThrownBy( + () -> + tBatchEnv + .executeSql("UPDATE " + t1 + " SET b = 4004 WHERE a = 1") + .await()) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage( + "Table %s uses the 'version' merge engine which does not support DELETE or UPDATE statements.", + tablePath); + + assertThatThrownBy( + () -> + tBatchEnv + .executeSql("INSERT INTO " + t1 + "(a, c) VALUES(1, 'c1')") + .await()) + .isInstanceOf(ValidationException.class) + .hasMessage( + "Table %s uses the 'version' merge engine which does not support partial updates." + + " Please make sure the number of specified columns in INSERT INTO matches columns of the Fluss table.", + tablePath); + } + + @Test + void testVersionMergeEngineWithTypeBigint() throws Exception { + tEnv.executeSql( + "create table merge_engine_with_version (a int not null primary key not enforced," + + " b string, ts bigint) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')"); + tEnv.executeSql( + "create table log_sink (a int not null primary key not enforced, b string, ts bigint)"); + + JobClient insertJobClient = + tEnv.executeSql("insert into log_sink select * from merge_engine_with_version") + .getJobClient() + .get(); + + // insert once + tEnv.executeSql( + "insert into merge_engine_with_version (a, b, ts) VALUES (1, 'v1', 1000), (2, 'v2', 1000), (1, 'v11', 999), (3, 'v3', 1000)") + .await(); + + CloseableIterator rowIter = tEnv.executeSql("select * from log_sink").collect(); + + // id=1 not update + List expectedRows = + Arrays.asList("+I[1, v1, 1000]", "+I[2, v2, 1000]", "+I[3, v3, 1000]"); + + assertResultsIgnoreOrder(rowIter, expectedRows, false); + + // insert again, update id=3 + tEnv.executeSql( + "insert into merge_engine_with_version (a, b, ts) VALUES (3, 'v33', 1001), (4, 'v44', 1000)") + .await(); + expectedRows = Arrays.asList("-U[3, v3, 1000]", "+U[3, v33, 1001]", "+I[4, v44, 1000]"); + assertResultsIgnoreOrder(rowIter, expectedRows, false); + + insertJobClient.cancel().get(); + } + + @Test + void testVersionMergeEngineWithTypeTimestamp() throws Exception { + tEnv.executeSql( + "create table merge_engine_with_version (a int not null primary key not enforced," + + " b string, ts TIMESTAMP(3)) with('table.merge-engine' = 'version','table.merge-engine.version.column' = 'ts')"); + + // insert once + tEnv.executeSql( + "INSERT INTO merge_engine_with_version (a, b, ts) VALUES " + + "(1, 'v1', TIMESTAMP '2024-12-27 12:00:00.123'), " + + "(2, 'v2', TIMESTAMP '2024-12-27 12:00:00.123'), " + + "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123'), " + + "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123');") + .await(); + + CloseableIterator rowIter = + tEnv.executeSql("select * from merge_engine_with_version").collect(); + + // id=1 not update + List expectedRows = + Arrays.asList( + "+I[1, v1, 2024-12-27T12:00:00.123]", + "+I[2, v2, 2024-12-27T12:00:00.123]", + "+I[3, v3, 2024-12-27T12:00:00.123]"); + + assertResultsIgnoreOrder(rowIter, expectedRows, true); + } + private InsertAndExpectValues rowsToInsertInto(Collection partitions) { List insertValues = new ArrayList<>(); List expectedValues = new ArrayList<>(); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index abab7d7a..df568273 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -39,6 +39,9 @@ import com.alibaba.fluss.row.arrow.ArrowWriterProvider; import com.alibaba.fluss.row.encode.ValueDecoder; import com.alibaba.fluss.row.encode.ValueEncoder; +import com.alibaba.fluss.server.kv.mergeengine.FirstRowMergeEngine; +import com.alibaba.fluss.server.kv.mergeengine.RowMergeEngine; +import com.alibaba.fluss.server.kv.mergeengine.VersionRowMergeEngine; import com.alibaba.fluss.server.kv.partialupdate.PartialUpdater; import com.alibaba.fluss.server.kv.partialupdate.PartialUpdaterCache; import com.alibaba.fluss.server.kv.prewrite.KvPreWriteBuffer; @@ -267,7 +270,7 @@ public LogAppendInfo putAsLeader( KvRecordReadContext.createReadContext(kvFormat, fieldTypes); ValueDecoder valueDecoder = new ValueDecoder(readContext.getRowDecoder(schemaId)); - + RowMergeEngine rowMergeEngine = getRowMergeEngine(schema); int appendedRecordCount = 0; for (KvRecord kvRecord : kvRecords.records(readContext)) { byte[] keyBytes = BytesUtils.toArray(kvRecord.getKey()); @@ -281,12 +284,17 @@ public LogAppendInfo putAsLeader( "The specific key can't be found in kv tablet although the kv record is for deletion, " + "ignore it directly as it doesn't exist in the kv tablet yet."); } else { - if (mergeEngine == MergeEngine.FIRST_ROW) { - // if the merge engine is first row, skip the deletion - continue; - } + BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; BinaryRow newRow = deleteRow(oldRow, partialUpdater); + if (rowMergeEngine.shouldSkipDeletion(newRow)) { + continue; + } + newRow = rowMergeEngine.merge(oldRow, newRow); + if (newRow == null) { + continue; + } + // if newRow is null, it means the row should be deleted if (newRow == null) { walBuilder.append(RowKind.DELETE, oldRow); @@ -309,13 +317,13 @@ public LogAppendInfo putAsLeader( byte[] oldValue = getFromBufferOrKv(key); // it's update if (oldValue != null) { - if (mergeEngine == MergeEngine.FIRST_ROW) { - // if the merge engine is first row, skip the update - continue; - } BinaryRow oldRow = valueDecoder.decodeValue(oldValue).row; BinaryRow newRow = updateRow(oldRow, kvRecord.getRow(), partialUpdater); + newRow = rowMergeEngine.merge(oldRow, newRow); + if (newRow == null) { + continue; + } walBuilder.append(RowKind.UPDATE_BEFORE, oldRow); walBuilder.append(RowKind.UPDATE_AFTER, newRow); appendedRecordCount += 2; @@ -372,6 +380,23 @@ public LogAppendInfo putAsLeader( }); } + private RowMergeEngine getRowMergeEngine(Schema schema) { + if (mergeEngine != null) { + switch (mergeEngine.getType()) { + case VERSION: + return new VersionRowMergeEngine(schema, mergeEngine); + case FIRST_ROW: + return new FirstRowMergeEngine(); + } + } + return new RowMergeEngine() { + @Override + public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { + return newRow; + } + }; + } + private WalBuilder createWalBuilder(int schemaId, RowType rowType) throws Exception { switch (logFormat) { case INDEXED: diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java new file mode 100644 index 00000000..942d3eef --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/FirstRowMergeEngine.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.fluss.server.kv.mergeengine; + +import com.alibaba.fluss.row.BinaryRow; + +/** + * The first row merge engine for primary key table. Always retain the first row. + * + * @since 0.6 + */ +public class FirstRowMergeEngine implements RowMergeEngine { + @Override + public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { + return oldRow == null ? newRow : null; + } + + @Override + public boolean shouldSkipDeletion(BinaryRow newRow) { + return newRow == null; + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java new file mode 100644 index 00000000..56831cdf --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/RowMergeEngine.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.fluss.server.kv.mergeengine; + +import com.alibaba.fluss.row.BinaryRow; + +/** + * The row merge engine for primary key table. + * + * @since 0.6 + */ +public interface RowMergeEngine { + BinaryRow merge(BinaryRow oldRow, BinaryRow newRow); + + default boolean shouldSkipDeletion(BinaryRow newRow) { + return false; + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java new file mode 100644 index 00000000..478ef50a --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/mergeengine/VersionRowMergeEngine.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.fluss.server.kv.mergeengine; + +import com.alibaba.fluss.exception.FlussRuntimeException; +import com.alibaba.fluss.metadata.MergeEngine; +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.row.BinaryRow; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.row.TimestampLtz; +import com.alibaba.fluss.row.TimestampNtz; +import com.alibaba.fluss.types.BigIntType; +import com.alibaba.fluss.types.DataType; +import com.alibaba.fluss.types.IntType; +import com.alibaba.fluss.types.LocalZonedTimestampType; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.types.TimeType; +import com.alibaba.fluss.types.TimestampType; + +/** + * The version row merge engine for primary key table. The update will only occur if the new value + * of the specified version field is greater than the old value. + * + * @since 0.6 + */ +public class VersionRowMergeEngine implements RowMergeEngine { + + private final MergeEngine mergeEngine; + private final InternalRow.FieldGetter[] currentFieldGetters; + private final RowType rowType; + + public VersionRowMergeEngine(Schema schema, MergeEngine mergeEngine) { + this.mergeEngine = mergeEngine; + this.rowType = schema.toRowType(); + this.currentFieldGetters = new InternalRow.FieldGetter[rowType.getFieldCount()]; + for (int i = 0; i < rowType.getFieldCount(); i++) { + currentFieldGetters[i] = InternalRow.createFieldGetter(rowType.getTypeAt(i), i); + } + } + + @Override + public BinaryRow merge(BinaryRow oldRow, BinaryRow newRow) { + int fieldIndex = rowType.getFieldIndex(mergeEngine.getColumn()); + if (fieldIndex == -1) { + throw new IllegalArgumentException( + String.format( + "When the merge engine is set to version, the column %s does not exist.", + mergeEngine.getColumn())); + } + InternalRow.FieldGetter fieldGetter = currentFieldGetters[fieldIndex]; + Object oldValue = fieldGetter.getFieldOrNull(oldRow); + Object newValue = fieldGetter.getFieldOrNull(newRow); + // If the old value is null, simply overwrite it with the new value. + if (oldValue == null) return newRow; + // If the new value is empty, ignore it directly. + if (newValue == null) return null; + DataType dataType = rowType.getTypeAt(fieldIndex); + return getValueComparator(dataType).isGreaterThan(newValue, oldValue) ? newRow : null; + } + + @Override + public boolean shouldSkipDeletion(BinaryRow newRow) { + return true; + } + + private ValueComparator getValueComparator(DataType dataType) { + if (dataType instanceof BigIntType) { + return (left, right) -> (Long) left > (Long) right; + } + if (dataType instanceof IntType || dataType instanceof TimeType) { + return (left, right) -> (Integer) left > (Integer) right; + } + if (dataType instanceof TimestampType) { + return (left, right) -> + ((TimestampNtz) left).getMillisecond() + > ((TimestampNtz) right).getMillisecond(); + } + if (dataType instanceof LocalZonedTimestampType) { + return (left, right) -> + ((TimestampLtz) left).toEpochMicros() > ((TimestampLtz) right).toEpochMicros(); + } + throw new FlussRuntimeException("Unsupported data type: " + dataType.asSummaryString()); + } + + interface ValueComparator { + boolean isGreaterThan(Object left, Object right); + } +} diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java index 89674aaa..e6f99204 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/kv/KvTabletTest.java @@ -72,6 +72,7 @@ import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID; import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA_PK; import static com.alibaba.fluss.record.TestData.DATA2_SCHEMA; +import static com.alibaba.fluss.record.TestData.DATA3_SCHEMA_PK; import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID; import static com.alibaba.fluss.testutils.DataTestUtils.compactedRow; import static com.alibaba.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords; @@ -558,8 +559,11 @@ void testFirstRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir) LogTablet logTablet = createLogTablet(tempLogDir, 1L, tablePath); TableBucket tableBucket = logTablet.getTableBucket(); + Map config = new HashMap<>(); + config.put("table.merge-engine", "first_row"); KvTablet kvTablet = - createKvTablet(tablePath, tableBucket, logTablet, tmpKvDir, MergeEngine.FIRST_ROW); + createKvTablet( + tablePath, tableBucket, logTablet, tmpKvDir, MergeEngine.create(config)); List kvData1 = Arrays.asList( @@ -568,7 +572,6 @@ void testFirstRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir) kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, "v23"})); KvRecordBatch kvRecordBatch1 = kvRecordBatchFactory.ofRecords(kvData1); kvTablet.putAsLeader(kvRecordBatch1, null, DATA1_SCHEMA_PK); - long endOffset = logTablet.localLogEndOffset(); LogRecords actualLogRecords = readLogRecords(logTablet); List expectedLogs = @@ -600,6 +603,80 @@ void testFirstRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir) checkEqual(actualLogRecords, expectedLogs); } + @Test + void testVersionRowMergeEngine(@TempDir File tempLogDir, @TempDir File tmpKvDir) + throws Exception { + + KvRecordTestUtils.KvRecordFactory kvRecordFactory = + KvRecordTestUtils.KvRecordFactory.of(DATA3_SCHEMA_PK.toRowType()); + + PhysicalTablePath tablePath = + PhysicalTablePath.of(TablePath.of("testDb", "test_version_row")); + + LogTablet logTablet = createLogTablet(tempLogDir, 1L, tablePath); + TableBucket tableBucket = logTablet.getTableBucket(); + + Map config = new HashMap<>(); + config.put("table.merge-engine", "version"); + config.put("table.merge-engine.version.column", "b"); + MergeEngine mergeEngine = MergeEngine.create(config); + KvTablet kvTablet = + createKvTablet(tablePath, tableBucket, logTablet, tmpKvDir, mergeEngine); + + List kvData1 = + Arrays.asList( + kvRecordFactory.ofRecord("k1".getBytes(), new Object[] {1, 1000L}), + kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, 1000L}), + kvRecordFactory.ofRecord("k2".getBytes(), new Object[] {2, 1001L})); + KvRecordBatch kvRecordBatch1 = kvRecordBatchFactory.ofRecords(kvData1); + kvTablet.putAsLeader(kvRecordBatch1, null, DATA3_SCHEMA_PK); + + long endOffset = logTablet.localLogEndOffset(); + LogRecords actualLogRecords = readLogRecords(logTablet); + List expectedLogs = + Collections.singletonList( + logRecords( + DATA3_SCHEMA_PK.toRowType(), + 0, + Arrays.asList( + RowKind.INSERT, + RowKind.INSERT, + RowKind.UPDATE_BEFORE, + RowKind.UPDATE_AFTER), + Arrays.asList( + new Object[] {1, 1000L}, + new Object[] {2, 1000L}, + new Object[] {2, 1000L}, + new Object[] {2, 1001L}))); + checkEqual(actualLogRecords, expectedLogs, DATA3_SCHEMA_PK.toRowType()); + + List kvData2 = + Arrays.asList( + kvRecordFactory.ofRecord( + "k2".getBytes(), new Object[] {2, 1000L}), // not update + kvRecordFactory.ofRecord( + "k1".getBytes(), new Object[] {1, 1001L}), // -U , +U + kvRecordFactory.ofRecord("k3".getBytes(), new Object[] {3, 1000L})); // +I + KvRecordBatch kvRecordBatch2 = kvRecordBatchFactory.ofRecords(kvData2); + kvTablet.putAsLeader(kvRecordBatch2, null, DATA3_SCHEMA_PK); + + expectedLogs = + Collections.singletonList( + logRecords( + DATA3_SCHEMA_PK.toRowType(), + endOffset, + Arrays.asList( + RowKind.UPDATE_BEFORE, + RowKind.UPDATE_AFTER, + RowKind.INSERT), + Arrays.asList( + new Object[] {1, 1000L}, + new Object[] {1, 1001L}, + new Object[] {3, 1000L}))); + actualLogRecords = readLogRecords(logTablet, endOffset); + checkEqual(actualLogRecords, expectedLogs, DATA3_SCHEMA_PK.toRowType()); + } + private LogRecords readLogRecords() throws Exception { return readLogRecords(0L); }