Commit 1353756: improve config operations

wuchong committed Jan 5, 2025
1 parent 2873d6a

Showing 12 changed files with 85 additions and 71 deletions.
@@ -33,6 +33,7 @@
 import com.alibaba.fluss.config.MemorySize;
 import com.alibaba.fluss.metadata.KvFormat;
 import com.alibaba.fluss.metadata.LogFormat;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.metadata.Schema;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableDescriptor;
@@ -883,9 +884,7 @@ void testFirstRowMergeEngine() throws Exception {
         TableDescriptor tableDescriptor =
                 TableDescriptor.builder()
                         .schema(DATA1_SCHEMA_PK)
-                        .property(
-                                ConfigOptions.TABLE_MERGE_ENGINE,
-                                ConfigOptions.MergeEngine.FIRST_ROW)
+                        .property(ConfigOptions.TABLE_MERGE_ENGINE, MergeEngine.FIRST_ROW)
                         .build();
         RowType rowType = DATA1_SCHEMA_PK.toRowType();
         createTable(DATA1_TABLE_PATH_PK, tableDescriptor, false);
@@ -895,21 +894,21 @@
         // first, put rows
         UpsertWriter upsertWriter = table.getUpsertWriter();
         List<InternalRow> expectedRows = new ArrayList<>(rows);
-        for (int row = 0; row < rows; row++) {
+        for (int id = 0; id < rows; id++) {
             for (int num = 0; num < duplicateNum; num++) {
-                upsertWriter.upsert(compactedRow(rowType, new Object[] {row, "value_" + num}));
+                upsertWriter.upsert(compactedRow(rowType, new Object[] {id, "value_" + num}));
             }
-            expectedRows.add(compactedRow(rowType, new Object[] {row, "value_0"}));
+            expectedRows.add(compactedRow(rowType, new Object[] {id, "value_0"}));
         }
         upsertWriter.flush();

         // now, get rows by lookup
-        for (int row = 0; row < rows; row++) {
+        for (int id = 0; id < rows; id++) {
             InternalRow gotRow =
-                    table.lookup(keyRow(DATA1_SCHEMA_PK, new Object[] {row, "dumpy"}))
+                    table.lookup(keyRow(DATA1_SCHEMA_PK, new Object[] {id, "dumpy"}))
                             .get()
                             .getRow();
-            assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(row));
+            assertThatRow(gotRow).withSchema(rowType).isEqualTo(expectedRows.get(id));
         }

         // check scan change log
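The assertions in this test pin down the first-row contract: for a given key the first upsert wins, and the duplicateNum later writes are ignored. A condensed sketch of that contract, reusing this test's fixtures (compactedRow, keyRow, and DATA1_SCHEMA_PK are test helpers, not public API):

    UpsertWriter writer = table.getUpsertWriter();
    writer.upsert(compactedRow(rowType, new Object[] {1, "value_0"})); // first write: kept
    writer.upsert(compactedRow(rowType, new Object[] {1, "value_1"})); // duplicate key: ignored
    writer.flush();

    // lookup returns the first value written for key 1
    InternalRow got =
            table.lookup(keyRow(DATA1_SCHEMA_PK, new Object[] {1, "dumpy"})).get().getRow();
    // got == {1, "value_0"}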
@@ -20,6 +20,7 @@
 import com.alibaba.fluss.annotation.PublicEvolving;
 import com.alibaba.fluss.metadata.KvFormat;
 import com.alibaba.fluss.metadata.LogFormat;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.utils.ArrayUtils;

 import java.time.Duration;
@@ -1321,20 +1322,4 @@ public enum CompressionType {
         LZ4,
         ZSTD
     }
-
-    /** The merge engine for primary key table. */
-    public enum MergeEngine {
-        FIRST_ROW("first_row");
-
-        private final String value;
-
-        MergeEngine(String value) {
-            this.value = value;
-        }
-
-        @Override
-        public String toString() {
-            return value;
-        }
-    }
 }
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.metadata;
+
+/**
+ * The merge engine for primary key table.
+ *
+ * @since 0.6
+ */
+public enum MergeEngine {
+    FIRST_ROW("first_row");
+
+    private final String value;
+
+    MergeEngine(String value) {
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return value;
+    }
+}
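With MergeEngine promoted to a @since 0.6 public type in com.alibaba.fluss.metadata, the enum belongs to the table-metadata API rather than being nested inside the config class. Its toString() yields the lowercase config value ("first_row"); resolving a property string back to the constant would need a lookup. A hypothetical helper, not part of this commit:

    // Hypothetical (not in this commit): map a config string such as
    // "first_row" back to its MergeEngine constant.
    public static MergeEngine fromValue(String value) {
        for (MergeEngine engine : MergeEngine.values()) {
            if (engine.toString().equalsIgnoreCase(value)) {
                return engine;
            }
        }
        throw new IllegalArgumentException("Unknown merge engine: " + value);
    }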
@@ -280,7 +280,7 @@ public boolean isDataLakeEnabled() {
         return configuration().get(ConfigOptions.TABLE_DATALAKE_ENABLED);
     }

-    public @Nullable ConfigOptions.MergeEngine getMergeEngine() {
+    public @Nullable MergeEngine getMergeEngine() {
         return configuration().get(ConfigOptions.TABLE_MERGE_ENGINE);
     }
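Callers can now branch on the merge engine through table metadata alone, without reaching into the config namespace — a minimal sketch, assuming a TableDescriptor in hand:

    // null means no merge engine is configured (default upsert semantics).
    MergeEngine engine = tableDescriptor.getMergeEngine();
    if (engine == MergeEngine.FIRST_ROW) {
        // first write per key wins; later updates and deletes are ignored
    }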
@@ -52,16 +52,11 @@
 import java.util.Set;

 import static com.alibaba.fluss.connector.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
-import static org.apache.flink.configuration.ConfigOptions.key;
+import static com.alibaba.fluss.connector.flink.utils.FlinkConversions.toFlinkOption;

 /** Factory to create table source and table sink for Fluss. */
 public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

-    private static final ConfigOption<ConfigOptions.MergeEngine> MERGE_ENGINE_OPTION =
-            key(ConfigOptions.TABLE_MERGE_ENGINE.key())
-                    .enumType(ConfigOptions.MergeEngine.class)
-                    .noDefaultValue();
-
     private volatile LakeTableFactory lakeTableFactory;

     @Override
@@ -119,15 +114,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 tableOptions
                         .get(FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL)
                         .toMillis();
-        boolean isDatalakeEnabled =
-                tableOptions.get(
-                        key(ConfigOptions.TABLE_DATALAKE_ENABLED.key())
-                                .booleanType()
-                                .defaultValue(false));

         return new FlinkTableSource(
                 toFlussTablePath(context.getObjectIdentifier()),
-                toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
+                toFlussClientConfig(tableOptions, context.getConfiguration()),
                 tableOutputType,
                 primaryKeyIndexes,
                 bucketKeyIndexes,
@@ -138,7 +128,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 tableOptions.get(FlinkConnectorOptions.LOOKUP_ASYNC),
                 cache,
                 partitionDiscoveryIntervalMs,
-                isDatalakeEnabled);
+                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
+                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
     }

     @Override
@@ -151,14 +142,15 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                         == RuntimeExecutionMode.STREAMING;

         RowType rowType = (RowType) context.getPhysicalRowDataType().getLogicalType();
+        final ReadableConfig tableOptions = helper.getOptions();

         return new FlinkTableSink(
                 toFlussTablePath(context.getObjectIdentifier()),
-                toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
+                toFlussClientConfig(tableOptions, context.getConfiguration()),
                 rowType,
                 context.getPrimaryKeyIndexes(),
                 isStreamingMode,
-                helper.getOptions().get(MERGE_ENGINE_OPTION));
+                tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
     }

     @Override
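The removed MERGE_ENGINE_OPTION field and the inline key(...).booleanType().defaultValue(false) call had re-declared Fluss options on the Flink side, so key, type, and default were maintained in two places. With the generic toFlinkOption (see the FlinkConversions change below), the factory reads typed values straight from the single Fluss definition — a sketch of the new read path:

    // One definition in ConfigOptions, bridged to Flink's typed config on demand.
    ReadableConfig tableOptions = helper.getOptions();
    boolean datalakeEnabled =
            tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED));
    MergeEngine mergeEngine =
            tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)); // null if unset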
@@ -16,11 +16,11 @@

 package com.alibaba.fluss.connector.flink.sink;

-import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
 import com.alibaba.fluss.connector.flink.utils.PushdownUtils.FieldEqual;
 import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.row.GenericRow;
@@ -64,7 +64,7 @@ public class FlinkTableSink
     private final RowType tableRowType;
     private final int[] primaryKeyIndexes;
     private final boolean streaming;
-    @Nullable private final ConfigOptions.MergeEngine mergeEngine;
+    @Nullable private final MergeEngine mergeEngine;

     private boolean appliedUpdates = false;
     @Nullable private GenericRow deleteRow;
@@ -75,7 +75,7 @@ public FlinkTableSink(
             RowType tableRowType,
             int[] primaryKeyIndexes,
             boolean streaming,
-            @Nullable ConfigOptions.MergeEngine mergeEngine) {
+            @Nullable MergeEngine mergeEngine) {
         this.tablePath = tablePath;
         this.flussConfig = flussConfig;
         this.tableRowType = tableRowType;
@@ -123,12 +123,12 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
                 throw new ValidationException(
                         "Fluss table sink does not support partial updates for table without primary key. Please make sure the "
                                 + "number of specified columns in INSERT INTO matches columns of the Fluss table.");
-            } else if (mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
+            } else if (mergeEngine == MergeEngine.FIRST_ROW) {
                 throw new ValidationException(
                         String.format(
                                 "Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the "
                                         + "number of specified columns in INSERT INTO matches columns of the Fluss table.",
                                 tablePath, MergeEngine.FIRST_ROW));
             }
         }
         int[][] targetColumns = context.getTargetColumns().get();
@@ -299,11 +299,11 @@ private void validateUpdatableAndDeletable() {
                             tablePath));
         }

-        if (mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
+        if (mergeEngine == MergeEngine.FIRST_ROW) {
             throw new UnsupportedOperationException(
                     String.format(
                             "Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.",
-                            tablePath, ConfigOptions.MergeEngine.FIRST_ROW));
+                            tablePath, MergeEngine.FIRST_ROW));
         }
     }
@@ -16,7 +16,6 @@

 package com.alibaba.fluss.connector.flink.source;

-import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.connector.flink.FlinkConnectorOptions;
 import com.alibaba.fluss.connector.flink.source.enumerator.initializer.OffsetsInitializer;
@@ -27,6 +26,7 @@
 import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
 import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
 import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.types.RowType;
@@ -105,7 +105,7 @@ public class FlinkTableSource

     private final long scanPartitionDiscoveryIntervalMs;
     private final boolean isDataLakeEnabled;
-    @Nullable private final ConfigOptions.MergeEngine mergeEngine;
+    @Nullable private final MergeEngine mergeEngine;

     // output type after projection pushdown
     private LogicalType producedDataType;
@@ -137,7 +137,7 @@ public FlinkTableSource(
             @Nullable LookupCache cache,
             long scanPartitionDiscoveryIntervalMs,
             boolean isDataLakeEnabled,
-            @Nullable ConfigOptions.MergeEngine mergeEngine) {
+            @Nullable MergeEngine mergeEngine) {
         this.tablePath = tablePath;
         this.flussConfig = flussConfig;
         this.tableOutputType = tableOutputType;
@@ -164,7 +164,7 @@ public ChangelogMode getChangelogMode() {
         } else {
             if (hasPrimaryKey()) {
                 // pk table
-                if (mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
+                if (mergeEngine == MergeEngine.FIRST_ROW) {
                     return ChangelogMode.insertOnly();
                 } else {
                     return ChangelogMode.all();
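This branch is the user-visible payoff of FIRST_ROW on the read side: since a key's row never changes after its first insert, the source can advertise an insert-only changelog and downstream Flink operators skip retraction handling entirely. The decision reduces to:

    // Mirrors getChangelogMode() above: FIRST_ROW primary-key tables are
    // append-only; other primary-key tables emit a full changelog.
    ChangelogMode mode =
            (mergeEngine == MergeEngine.FIRST_ROW)
                    ? ChangelogMode.insertOnly()
                    : ChangelogMode.all();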
@@ -251,8 +251,9 @@ public static List<org.apache.flink.configuration.ConfigOption<?>> toFlinkOption
     }

     /** Convert Fluss's ConfigOption to Flink's ConfigOption. */
-    public static org.apache.flink.configuration.ConfigOption<?> toFlinkOption(
-            ConfigOption<?> flussOption) {
+    @SuppressWarnings("unchecked")
+    public static <T> org.apache.flink.configuration.ConfigOption<T> toFlinkOption(
+            ConfigOption<T> flussOption) {
         org.apache.flink.configuration.ConfigOptions.OptionBuilder builder =
                 org.apache.flink.configuration.ConfigOptions.key(flussOption.key());
         org.apache.flink.configuration.ConfigOption<?> option;
@@ -301,7 +302,7 @@ public static org.apache.flink.configuration.ConfigOption<?> toFlinkOption(
         }
         option.withDescription(flussOption.description());
         // TODO: support fallback keys in the future.
-        return option;
+        return (org.apache.flink.configuration.ConfigOption<T>) option;
     }

     private static Map<String, String> convertFlinkOptionsToFlussTableProperties(
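Making toFlinkOption generic is what lets call sites write tableOptions.get(toFlinkOption(option)) and receive a value of the right type; the builder still produces a wildcard option internally, so a single unchecked cast is confined to this method. A minimal sketch of the shape, showing only the boolean branch and deliberately named differently so it is not mistaken for the full implementation:

    // Sketch: the real method branches on the Fluss option's value type and
    // carries over defaults and descriptions; the final cast is safe as long
    // as the chosen branch matches T.
    @SuppressWarnings("unchecked")
    static <T> org.apache.flink.configuration.ConfigOption<T> toFlinkOptionSketch(
            ConfigOption<T> flussOption) {
        org.apache.flink.configuration.ConfigOption<?> option =
                org.apache.flink.configuration.ConfigOptions.key(flussOption.key())
                        .booleanType()
                        .noDefaultValue();
        return (org.apache.flink.configuration.ConfigOption<T>) option;
    }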
@@ -24,6 +24,7 @@
 import com.alibaba.fluss.memory.LazyMemorySegmentPool;
 import com.alibaba.fluss.memory.MemorySegmentPool;
 import com.alibaba.fluss.metadata.KvFormat;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.metadata.PhysicalTablePath;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableDescriptor;
@@ -149,7 +150,7 @@ public KvTablet getOrCreateKv(
             TableBucket tableBucket,
             LogTablet logTablet,
             KvFormat kvFormat,
-            @Nullable ConfigOptions.MergeEngine mergeEngine)
+            @Nullable MergeEngine mergeEngine)
             throws Exception {
         return inLock(
                 tabletCreationOrDeletionLock,
@@ -24,6 +24,7 @@
 import com.alibaba.fluss.memory.MemorySegmentPool;
 import com.alibaba.fluss.metadata.KvFormat;
 import com.alibaba.fluss.metadata.LogFormat;
+import com.alibaba.fluss.metadata.MergeEngine;
 import com.alibaba.fluss.metadata.PhysicalTablePath;
 import com.alibaba.fluss.metadata.Schema;
 import com.alibaba.fluss.metadata.TableBucket;
@@ -102,7 +103,7 @@ public final class KvTablet {
     private final ReadWriteLock kvLock = new ReentrantReadWriteLock();
     private final LogFormat logFormat;
     private final KvFormat kvFormat;
-    private final @Nullable ConfigOptions.MergeEngine mergeEngine;
+    private final @Nullable MergeEngine mergeEngine;

     /**
      * The kv data in pre-write buffer whose log offset is less than the flushedLogOffset has been
@@ -124,7 +125,7 @@ private KvTablet(
             BufferAllocator arrowBufferAllocator,
             MemorySegmentPool memorySegmentPool,
             KvFormat kvFormat,
-            @Nullable ConfigOptions.MergeEngine mergeEngine) {
+            @Nullable MergeEngine mergeEngine) {
         this.physicalPath = physicalPath;
         this.tableBucket = tableBucket;
         this.logTablet = logTablet;
@@ -148,7 +149,7 @@ public static KvTablet create(
             BufferAllocator arrowBufferAllocator,
             MemorySegmentPool memorySegmentPool,
             KvFormat kvFormat,
-            @Nullable ConfigOptions.MergeEngine mergeEngine)
+            @Nullable MergeEngine mergeEngine)
             throws IOException {
         Tuple2<PhysicalTablePath, TableBucket> tablePathAndBucket =
                 FlussPaths.parseTabletDir(kvTabletDir);
@@ -173,7 +174,7 @@ public static KvTablet create(
             BufferAllocator arrowBufferAllocator,
             MemorySegmentPool memorySegmentPool,
             KvFormat kvFormat,
-            @Nullable ConfigOptions.MergeEngine mergeEngine)
+            @Nullable MergeEngine mergeEngine)
             throws IOException {
         RocksDBKv kv = buildRocksDBKv(conf, kvTabletDir);
         return new KvTablet(
@@ -280,7 +281,7 @@ public LogAppendInfo putAsLeader(
                                     "The specific key can't be found in kv tablet although the kv record is for deletion, "
                                             + "ignore it directly as it doesn't exist in the kv tablet yet.");
                         } else {
-                            if (mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
+                            if (mergeEngine == MergeEngine.FIRST_ROW) {
                                 // if the merge engine is first row, skip the deletion
                                 continue;
                             }
@@ -308,7 +309,7 @@ public LogAppendInfo putAsLeader(
                             byte[] oldValue = getFromBufferOrKv(key);
                             // it's update
                             if (oldValue != null) {
-                                if (mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
+                                if (mergeEngine == MergeEngine.FIRST_ROW) {
                                     // if the merge engine is first row, skip the update
                                     continue;
                                 }
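Taken together, the two branches above define FIRST_ROW at the storage layer: a delete of an existing key is skipped, and an update of a key that already holds a value is skipped, so only the first write per key ever reaches the kv store or the changelog. A standalone sketch of that merge rule, with a plain map standing in for RocksDB and the pre-write buffer:

    import java.util.HashMap;
    import java.util.Map;

    // Standalone sketch of the FIRST_ROW merge rule: only the first non-null
    // value per key is applied; later updates and deletes are no-ops.
    final class FirstRowMergeSketch {
        private final Map<String, byte[]> store = new HashMap<>();

        /** Returns true if applied, false if FIRST_ROW dropped the write. */
        boolean put(String key, byte[] value) {
            if (value == null) {
                return false; // deletion: never applied under FIRST_ROW
            }
            if (store.containsKey(key)) {
                return false; // update of an existing key: skipped
            }
            store.put(key, value); // first write for this key wins
            return true;
        }
    }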