diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index d650670bf824..d99f279368c8 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1020,6 +1020,12 @@
Enum |
Type of the table.
Possible values:- "table": Normal Paimon table.
- "format-table": A file format table refers to a directory that contains multiple files of the same format.
- "materialized-table": A materialized table combines normal Paimon table and materialized SQL.
- "object-table": A object table combines normal Paimon table and object location.
|
+
+ write-actions |
+ "all" |
+ String |
+ This parameter is used to specify which actions will be performed during the writing process. This parameter is effective only when write-only is false and has no effect on action or procedure. 1. 'all': By default, all actions will be performed. 2. 'partition-expire': Perform partition expiration action. 3. 'snapshot-expire': Perform snapshot expiration action. 4. 'tag-automatic-creation': Perform automatic creation tag action. 5. 'full-compact': Perform full compaction action. 6. 'minor-compact': Perform minor compaction action. Both can be configured at the same time: 'partition-expire,snapshot-expire,tag-automatic-creation', if you want to skip all actions you can set this to ''. |
+
write-buffer-for-append |
false |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index afe4c50208f7..bbbd6db1196f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -443,6 +443,36 @@ public class CoreOptions implements Serializable {
"If set to true, compactions and snapshot expiration will be skipped. "
+ "This option is used along with dedicated compact jobs.");
+ public static final ConfigOption WRITE_ACTIONS =
+ key("write-actions")
+ .stringType()
+ .defaultValue(WriteAction.ALL.value)
+ .withDescription(
+ Description.builder()
+ .text(
+ "This parameter is used to specify which actions will be performed during the writing process. This parameter is effective only when write-only is false and has no effect on action or procedure.")
+ .linebreak()
+ .text("1. 'all': By default, all actions will be performed.")
+ .linebreak()
+ .text(
+ "2. 'partition-expire': Perform partition expiration action.")
+ .linebreak()
+ .text(
+ "3. 'snapshot-expire': Perform snapshot expiration action.")
+ .linebreak()
+ .text(
+ "4. 'tag-automatic-creation': Perform automatic creation tag action.")
+ .linebreak()
+ .text("5. 'full-compact': Perform full compaction action.")
+ .linebreak()
+ .text("6. 'minor-compact': Perform minor compaction action.")
+ .linebreak()
+ .text(
+ "Both can be configured at the same time: 'partition-expire,"
+ + "snapshot-expire,tag-automatic-creation', "
+ + "if you want to skip all actions you can set this to ''.")
+ .build());
+
public static final ConfigOption SOURCE_SPLIT_TARGET_SIZE =
key("source.split.target-size")
.memoryType()
@@ -2241,6 +2271,61 @@ public boolean writeOnly() {
return options.get(WRITE_ONLY);
}
+ public Set writeActions() {
+ if (writeOnly()) {
+ return new HashSet<>(0);
+ }
+ return writeActions(options.get(WRITE_ACTIONS));
+ }
+
+ public static Set writeActions(String str) {
+ if (StringUtils.isNullOrWhitespaceOnly(str)) {
+ return new HashSet<>(0);
+ }
+ return Arrays.stream(str.split(","))
+ .map(action -> WriteAction.valueOf(action.toUpperCase().replace('-', '_')))
+ .collect(Collectors.toCollection(HashSet::new));
+ }
+
+ public boolean doPartitionExpireAction(Set doWriteActions) {
+ return doAllWriteActions(doWriteActions)
+ || doWriteActions.contains(WriteAction.PARTITION_EXPIRE);
+ }
+
+ public boolean doSnapshotExpireAction(Set doWriteActions) {
+ return doAllWriteActions(doWriteActions)
+ || doWriteActions.contains(WriteAction.SNAPSHOT_EXPIRE);
+ }
+
+ public boolean doAutoCreateTagAction(Set doWriteActions) {
+ return doAllWriteActions(doWriteActions)
+ || doWriteActions.contains(WriteAction.TAG_AUTOMATIC_CREATION);
+ }
+
+ public boolean doFullCompactionAction(Set doWriteActions) {
+ return doAllWriteActions(doWriteActions)
+ || doWriteActions.contains(WriteAction.FULL_COMPACT);
+ }
+
+ public boolean doMinorCompactionAction(Set doWriteActions) {
+ return doAllWriteActions(doWriteActions)
+ || doWriteActions.contains(WriteAction.MINOR_COMPACT);
+ }
+
+ public boolean doCompact() {
+ return doCompact(writeActions());
+ }
+
+ public static boolean doCompact(Set doWriteActions) {
+ return doWriteActions.contains(WriteAction.ALL)
+ || doWriteActions.contains(WriteAction.FULL_COMPACT)
+ || doWriteActions.contains(WriteAction.MINOR_COMPACT);
+ }
+
+ public boolean doAllWriteActions(Set doWriteActions) {
+ return doWriteActions.contains(WriteAction.ALL);
+ }
+
public boolean streamingReadOverwrite() {
return options.get(STREAMING_READ_OVERWRITE);
}
@@ -3226,4 +3311,31 @@ public String toString() {
return value;
}
}
+
+ /** Actions performed during table writing. */
+ public enum WriteAction {
+
+ // All write actions will be performed, this is the default behavior.
+ ALL("all"),
+
+ // Actions during commit.
+ PARTITION_EXPIRE("partition-expire"),
+ SNAPSHOT_EXPIRE("snapshot-expire"),
+ TAG_AUTOMATIC_CREATION("tag-automatic-creation"),
+
+ // Actions during writing.
+ MINOR_COMPACT("minor-compact"),
+ FULL_COMPACT("full-compact");
+
+ private final String value;
+
+ WriteAction(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java
index c58bad9a9796..d0afdd381c8a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java
@@ -78,7 +78,7 @@ protected CompactManager getCompactManager(
List restoredFiles,
ExecutorService compactExecutor,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
- if (options.writeOnly()) {
+ if (!options.doCompact()) {
return new NoopCompactManager();
} else {
Function dvFactory =
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index d061e181618b..e40238d066f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -239,7 +239,7 @@ private CompactManager createCompactManager(
ExecutorService compactExecutor,
Levels levels,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
- if (options.writeOnly()) {
+ if (!options.doCompact()) {
return new NoopCompactManager();
} else {
Comparator keyComparator = keyComparatorSupplier.get();
@@ -288,7 +288,8 @@ private MergeTreeCompactRewriter createRewriter(
MergeEngine mergeEngine = options.mergeEngine();
ChangelogProducer changelogProducer = options.changelogProducer();
LookupStrategy lookupStrategy = options.lookupStrategy();
- if (changelogProducer.equals(FULL_COMPACTION)) {
+ if (changelogProducer.equals(FULL_COMPACTION)
+ && options.doFullCompactionAction(options.writeActions())) {
return new FullChangelogMergeTreeCompactRewriter(
maxLevel,
mergeEngine,
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index b23e50de19dc..d6c7fe532679 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -82,10 +82,12 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.Set;
import java.util.SortedMap;
import java.util.function.BiConsumer;
import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.WriteAction;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Abstract {@link FileStoreTable}. */
@@ -432,7 +434,8 @@ public ExpireSnapshots newExpireChangelog() {
public TableCommitImpl newCommit(String commitUser) {
CoreOptions options = coreOptions();
Runnable snapshotExpire = null;
- if (!options.writeOnly()) {
+ Set skippingActions = options.writeActions();
+ if (options.doSnapshotExpireAction(skippingActions)) {
boolean changelogDecoupled = options.changelogLifecycleDecoupled();
ExpireConfig expireConfig = options.expireConfig();
ExpireSnapshots expireChangelog = newExpireChangelog().config(expireConfig);
@@ -449,8 +452,12 @@ public TableCommitImpl newCommit(String commitUser) {
return new TableCommitImpl(
store().newCommit(commitUser, createCommitCallbacks(commitUser)),
snapshotExpire,
- options.writeOnly() ? null : store().newPartitionExpire(commitUser),
- options.writeOnly() ? null : store().newTagCreationManager(),
+ options.doPartitionExpireAction(skippingActions)
+ ? store().newPartitionExpire(commitUser)
+ : null,
+ options.doAutoCreateTagAction(skippingActions)
+ ? store().newTagCreationManager()
+ : null,
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path, snapshotManager().branch()),
options.snapshotExpireExecutionMode(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 73c96b2c4bb1..c0d13d255956 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -74,6 +74,7 @@ public CompactAction(
}
HashMap dynamicOptions = new HashMap<>(tableConf);
dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
+ dynamicOptions.put(CoreOptions.WRITE_ACTIONS.key(), CoreOptions.WriteAction.ALL.toString());
table = table.copy(dynamicOptions);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 4cd085883d33..4bc3ea5f941f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -46,6 +46,8 @@
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -79,6 +81,7 @@ public abstract class FlinkSink implements Serializable {
private static final String WRITER_NAME = "Writer";
private static final String WRITER_WRITE_ONLY_NAME = "Writer(write-only)";
private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
+ private static final Logger log = LoggerFactory.getLogger(FlinkSink.class);
protected final FileStoreTable table;
private final boolean ignorePreviousFiles;
@@ -108,35 +111,44 @@ private StoreSinkWrite.Provider createWriteProvider(
if (coreOptions.writeOnly()) {
waitCompaction = false;
} else {
+ Set writeActions = coreOptions.writeActions();
waitCompaction = coreOptions.prepareCommitWaitCompaction();
int deltaCommits = -1;
- if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
- deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
- } else if (options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
- long fullCompactionThresholdMs =
- options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL).toMillis();
- deltaCommits =
- (int)
- (fullCompactionThresholdMs
- / checkpointConfig.getCheckpointInterval());
- }
+ if (coreOptions.doFullCompactionAction(writeActions)) {
+ if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
+ deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
+ } else if (options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
+ long fullCompactionThresholdMs =
+ options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)
+ .toMillis();
+ deltaCommits =
+ (int)
+ (fullCompactionThresholdMs
+ / checkpointConfig.getCheckpointInterval());
+ }
- if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) {
- int finalDeltaCommits = Math.max(deltaCommits, 1);
- return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
- assertNoSinkMaterializer.run();
- return new GlobalFullCompactionSinkWrite(
- table,
- commitUser,
- state,
- ioManager,
- ignorePreviousFiles,
- waitCompaction,
- finalDeltaCommits,
- isStreaming,
- memoryPool,
- metricGroup);
- };
+ if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) {
+ int finalDeltaCommits = Math.max(deltaCommits, 1);
+ return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
+ assertNoSinkMaterializer.run();
+ return new GlobalFullCompactionSinkWrite(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ ignorePreviousFiles,
+ waitCompaction,
+ finalDeltaCommits,
+ isStreaming,
+ memoryPool,
+ metricGroup);
+ };
+ }
+ } else {
+ log.warn(
+ "According to the config {} = {}, the action of 'full-compact' will be skipped.",
+ CoreOptions.WRITE_ACTIONS.key(),
+ coreOptions.toConfiguration().get(CoreOptions.WRITE_ACTIONS));
}
}
@@ -213,10 +225,11 @@ public DataStream doWrite(
StreamExecutionEnvironment env = input.getExecutionEnvironment();
boolean isStreaming = isStreaming(input);
- boolean writeOnly = table.coreOptions().writeOnly();
SingleOutputStreamOperator written =
input.transform(
- (writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME)
+ (table.coreOptions().writeOnly()
+ ? WRITER_WRITE_ONLY_NAME
+ : WRITER_NAME)
+ " : "
+ table.name(),
new CommittableTypeInfo(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 02a7e6c1b3c8..aaadcacaf90d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -45,6 +45,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
@@ -149,9 +150,14 @@ public void processElement(StreamRecord element) throws Exception {
FileStoreTable table = getTable(tableId);
Preconditions.checkArgument(
- !table.coreOptions().writeOnly(),
- CoreOptions.WRITE_ONLY.key()
- + " should not be true for MultiTablesStoreCompactOperator.");
+ table.coreOptions().doCompact(),
+ String.format(
+ "%s should not be true or %s should be %s or contains %s/%s for MultiTablesStoreCompactOperator.",
+ CoreOptions.WRITE_ONLY.key(),
+ CoreOptions.WRITE_ACTIONS.key(),
+ CoreOptions.WriteAction.ALL,
+ CoreOptions.WriteAction.MINOR_COMPACT,
+ CoreOptions.WriteAction.FULL_COMPACT));
storeSinkWriteProvider =
createWriteProvider(table, checkpointConfig, isStreaming, ignorePreviousFiles);
@@ -259,13 +265,12 @@ private StoreSinkWrite.Provider createWriteProvider(
Options options = fileStoreTable.coreOptions().toConfiguration();
CoreOptions.ChangelogProducer changelogProducer =
fileStoreTable.coreOptions().changelogProducer();
- boolean waitCompaction;
CoreOptions coreOptions = fileStoreTable.coreOptions();
- if (coreOptions.writeOnly()) {
- waitCompaction = false;
- } else {
- waitCompaction = coreOptions.prepareCommitWaitCompaction();
- int deltaCommits = -1;
+ boolean waitCompaction = coreOptions.prepareCommitWaitCompaction();
+ Set writeActions = coreOptions.writeActions();
+ int deltaCommits = -1;
+
+ if (coreOptions.doFullCompactionAction(writeActions)) {
if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
} else if (options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 1870a0493c2f..b2bd132d5f3a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -70,9 +70,9 @@ private StoreCompactOperator(
String initialCommitUser,
boolean fullCompaction) {
super(parameters, Options.fromMap(table.options()));
- Preconditions.checkArgument(
- !table.coreOptions().writeOnly(),
- CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator.");
+
+ checkWriteActions(table.coreOptions());
+
this.table = table;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.initialCommitUser = initialCommitUser;
@@ -168,6 +168,18 @@ public void close() throws Exception {
write.close();
}
+ public static void checkWriteActions(CoreOptions coreOptions) {
+ Preconditions.checkArgument(
+ coreOptions.doCompact(),
+ String.format(
+ "%s should not be true or %s should be %s or contains %s/%s for StoreCompactOperator.",
+ CoreOptions.WRITE_ONLY.key(),
+ CoreOptions.WRITE_ACTIONS.key(),
+ CoreOptions.WriteAction.ALL,
+ CoreOptions.WriteAction.MINOR_COMPACT,
+ CoreOptions.WriteAction.FULL_COMPACT));
+ }
+
/** {@link StreamOperatorFactory} of {@link StoreCompactOperator}. */
public static class Factory extends PrepareCommitOperator.Factory {
private final FileStoreTable table;
@@ -181,9 +193,8 @@ public Factory(
String initialCommitUser,
boolean fullCompaction) {
super(Options.fromMap(table.options()));
- Preconditions.checkArgument(
- !table.coreOptions().writeOnly(),
- CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator.");
+ checkWriteActions(table.coreOptions());
+
this.table = table;
this.storeSinkWriteProvider = storeSinkWriteProvider;
this.initialCommitUser = initialCommitUser;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
index 487d0f268986..2fea587837e3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
@@ -87,7 +87,7 @@ public DataStream doWrite(
.setParallelism(written.getParallelism());
}
- boolean enableCompaction = !table.coreOptions().writeOnly();
+ boolean enableCompaction = table.coreOptions().doCompact();
boolean isStreamingMode =
input.getExecutionEnvironment()
.getConfiguration()
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriteActionsITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriteActionsITCase.java
new file mode 100644
index 000000000000..50505a821ef2
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriteActionsITCase.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for {@link CoreOptions.WriteAction }. */
+public class WriteActionsITCase extends CatalogITCaseBase {
+
+ private static final int TIMEOUT = 180;
+
+ @Timeout(value = TIMEOUT)
+ @ParameterizedTest
+ @ValueSource(strings = {"PARTITION-EXPIRE", "SNAPSHOT-EXPIRE", "TAG-AUTOMATIC-CREATION"})
+ public void testWriteActionsWhichExecutedDuringCommit(String val) throws Exception {
+
+ CoreOptions.WriteAction writeAction =
+ CoreOptions.WriteAction.valueOf(val.replace("-", "_"));
+
+ HashMap writeActionOptions =
+ createOptions(
+ String.format(
+ "%s,%s,%s",
+ writeAction,
+ CoreOptions.WriteAction.FULL_COMPACT,
+ CoreOptions.WriteAction.MINOR_COMPACT));
+
+ createPrimaryKeyTable("T", writeActionOptions);
+ sql("INSERT INTO T VALUES ('HXH', '20250101')");
+
+ FileStoreTable table = paimonTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ switch (writeAction) {
+ case PARTITION_EXPIRE:
+ // Only do partition expiration, so generate 3 snapshot, append,compact,overwrite.
+ expectTable(table, snapshotManager, 3, 3, 0, null);
+ // Snapshot 1 is APPEND, data write.
+ assertThat(snapshotManager.snapshot(1).commitKind())
+ .isEqualTo(Snapshot.CommitKind.APPEND);
+ // Snapshot 2 is COMPACT, full compact.
+ assertThat(snapshotManager.snapshot(2).commitKind())
+ .isEqualTo(Snapshot.CommitKind.COMPACT);
+ // Snapshot 3 is OVERWRITE, partition expired.
+ assertThat(snapshotManager.snapshot(3).commitKind())
+ .isEqualTo(Snapshot.CommitKind.OVERWRITE);
+ break;
+ case SNAPSHOT_EXPIRE:
+ // Only do snapshot expiration, so only generate 2 snapshot, snapshot 1 has expired
+ // and partition will be retained.
+ expectTable(table, snapshotManager, 1, 2, 0, "20250101");
+ // Snapshot expired. so snapshot 1 has expired and snapshot 2 is COMPACT.
+ assertThat(snapshotManager.snapshot(2).commitKind())
+ .isEqualTo(Snapshot.CommitKind.COMPACT);
+ break;
+ case TAG_AUTOMATIC_CREATION:
+ // Only do automatic tag creation, 1 tag will be created, 2 snapshot is data write
+ // and full-compact.
+ expectTable(table, snapshotManager, 2, 2, 1, "20250101");
+ // Snapshot 1 is APPEND, data write.
+ assertThat(snapshotManager.snapshot(1).commitKind())
+ .isEqualTo(Snapshot.CommitKind.APPEND);
+ // Snapshot 2 is Compact, full compact.
+ assertThat(snapshotManager.snapshot(2).commitKind())
+ .isEqualTo(Snapshot.CommitKind.COMPACT);
+ }
+ }
+
+ @Timeout(value = TIMEOUT)
+ @ParameterizedTest
+ @ValueSource(strings = {"write-only", "do-all", "skip-all"})
+ public void testSkipOrDoAllWriteActions(String action) throws Exception {
+
+ // If write-only is true, the option of write-actions should be ignored.
+ HashMap options =
+ createOptions(
+ action.equals("do-all") ? "all" : action.equals("skip-all") ? "" : "all");
+
+ if (action.equals("write-only")) {
+ options.put(CoreOptions.WRITE_ONLY.key(), "true");
+ }
+
+ createPrimaryKeyTable("T", options);
+ sql("INSERT INTO T VALUES ('HXH', '20250101')");
+
+ FileStoreTable table = paimonTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ switch (action) {
+ case "do-all":
+ // Test case for no actions being skipped. (write-only is false)
+ expectTable(table, snapshotManager, 2, 3, 1, null);
+ // snapshot 2 is compact.
+ assertThat(snapshotManager.snapshot(2).commitKind())
+ .isEqualTo(Snapshot.CommitKind.COMPACT);
+ // partition expired.
+ assertThat(snapshotManager.snapshot(3).commitKind())
+ .isEqualTo(Snapshot.CommitKind.OVERWRITE);
+ break;
+
+ case "write-only":
+ // no compact, no expire, no tag.
+ expectTable(table, snapshotManager, 1, 1, 0, "20250101");
+ // only data write.
+ assertThat(snapshotManager.latestSnapshot().commitKind())
+ .isEqualTo(Snapshot.CommitKind.APPEND);
+ break;
+
+ case "skip-all":
+ // Even though write-only is false, all actions still skipped.
+ expectTable(table, snapshotManager, 1, 1, 0, "20250101");
+ // only data write.
+ assertThat(snapshotManager.latestSnapshot().commitKind())
+ .isEqualTo(Snapshot.CommitKind.APPEND);
+ break;
+ }
+ }
+
+ @Timeout(value = TIMEOUT)
+ @ParameterizedTest
+ @ValueSource(strings = {"FULL-COMPACT", "MINOR-COMPACT"})
+ public void testAppendOnlyCompactActions(String val) throws Exception {
+
+ CoreOptions.WriteAction writeAction =
+ CoreOptions.WriteAction.valueOf(val.replace("-", "_"));
+
+ HashMap writeActionOptions = createOptions(writeAction.toString());
+ writeActionOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "4");
+
+ createAppendOnlyTable("T", writeActionOptions);
+ FileStoreTable table = paimonTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ sql("INSERT INTO T VALUES ('HXH', '20250101')");
+ sql("INSERT INTO T VALUES ('HXH', '20250101')");
+ sql("INSERT INTO T VALUES ('HXH', '20250101')");
+
+ switch (writeAction) {
+ case FULL_COMPACT:
+ // Trigger full compaction.
+ expectTable(table, snapshotManager, 4, 4, 0, "20250101");
+ break;
+ case MINOR_COMPACT:
+ // Will not trigger full compact because we skip it.
+ expectTable(table, snapshotManager, 3, 3, 0, "20250101");
+ // Trigger minor compact.
+ sql("INSERT INTO T VALUES ('HXH', '20250101')");
+ expectTable(table, snapshotManager, 5, 5, 0, "20250101");
+ }
+ }
+
+ @Timeout(value = TIMEOUT)
+ @ParameterizedTest
+ @ValueSource(strings = {"FULL-COMPACT", "MINOR-COMPACT"})
+ public void testPrimaryKeyTableCompactActions(String val) throws Exception {
+
+ CoreOptions.WriteAction writeAction =
+ CoreOptions.WriteAction.valueOf(val.replace("-", "_"));
+
+ HashMap writeActionOptions = createOptions(writeAction.toString());
+
+ // Ensure that a single data write does not trigger a minor compaction.
+ writeActionOptions.put(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER.key(), "2");
+
+ createPrimaryKeyTable("T", writeActionOptions);
+ sql("INSERT INTO T VALUES ('HXH', '20250101')");
+
+ FileStoreTable table = paimonTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ switch (writeAction) {
+ case FULL_COMPACT:
+ // A single write will trigger full compact.
+ expectTable(table, snapshotManager, 2, 2, 0, "20250101");
+ // Snapshot 1 is APPEND.
+ assertThat(snapshotManager.snapshot(1).commitKind())
+ .isEqualTo(Snapshot.CommitKind.APPEND);
+ // Snapshot 2 is COMPACT.
+ assertThat(snapshotManager.snapshot(2).commitKind())
+ .isEqualTo(Snapshot.CommitKind.COMPACT);
+ break;
+ case MINOR_COMPACT:
+ // A single write will not trigger a minor compaction.
+ expectTable(table, snapshotManager, 1, 1, 0, "20250101");
+ // Snapshot 1 is APPEND.
+ assertThat(snapshotManager.snapshot(1).commitKind())
+ .isEqualTo(Snapshot.CommitKind.APPEND);
+
+ // Second write to trigger minor compact.
+ sql("INSERT INTO T VALUES ('HXH2', '20250101')");
+ expectTable(table, snapshotManager, 3, 3, 0, "20250101");
+ // Snapshot 2 is APPEND.
+ assertThat(snapshotManager.snapshot(2).commitKind())
+ .isEqualTo(Snapshot.CommitKind.APPEND);
+ // Snapshot 3 is COMPACT.
+ assertThat(snapshotManager.snapshot(3).commitKind())
+ .isEqualTo(Snapshot.CommitKind.COMPACT);
+ }
+ }
+
+ @Test
+ @Timeout(value = TIMEOUT)
+ public void testSkipCreateTagWithBatchMode() throws Catalog.TableNotExistException {
+ // only do partition expire.
+ HashMap options = createOptions("partition-expire");
+
+ // Skipping tag creation will not take effect if the tag creation mode is batch.
+ options.put(CoreOptions.TAG_AUTOMATIC_CREATION.key(), "batch");
+
+ createPrimaryKeyTable("T", options);
+ sql("INSERT INTO T VALUES ('a', '20250101')");
+ FileStoreTable table = paimonTable("T");
+ assertThat(table.tagManager().tagCount()).isEqualTo(1);
+ }
+
+ @Test
+ public void testCompactProcedure() throws Catalog.TableNotExistException, IOException {
+ // skip all actions, the option will be ignored.
+ HashMap writeActionOptions = createOptions("");
+
+ createPrimaryKeyTable("T", writeActionOptions);
+ sql("INSERT INTO T VALUES ('HXH', '20250101')");
+
+ FileStoreTable table = paimonTable("T");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ // Even though write-only is false , all actions still skipped.
+ expectTable(table, snapshotManager, 1, 1, 0, "20250101");
+ assertThat(snapshotManager.latestSnapshot().commitKind())
+ .isEqualTo(Snapshot.CommitKind.APPEND);
+
+ tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+ sql("CALL sys.compact(`table` => 'default.T')");
+
+ expectTable(table, snapshotManager, 2, 3, 1, null);
+ // compact.
+ assertThat(snapshotManager.snapshot(2).commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+ // partition expired.
+ assertThat(snapshotManager.snapshot(3).commitKind())
+ .isEqualTo(Snapshot.CommitKind.OVERWRITE);
+ }
+
+ private HashMap createOptions(String writeActions) {
+ HashMap options = new HashMap<>();
+ // Partition expiration will be triggered every time.
+ options.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), "1 d");
+ options.put(CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "0 s");
+ options.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");
+ // Only keep one snapshot.
+ options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
+ options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
+ options.put(CoreOptions.TAG_AUTOMATIC_CREATION.key(), "process-time");
+ options.put(CoreOptions.TAG_CREATION_PERIOD.key(), "daily");
+ // full-compact will be triggered every time.
+ options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
+
+ // what actions will be executed.
+ if (writeActions != null) {
+ options.put(CoreOptions.WRITE_ACTIONS.key(), writeActions);
+ }
+
+ return options;
+ }
+
+ private void expectTable(
+ FileStoreTable table,
+ SnapshotManager snapshotManager,
+ long snapshotCount,
+ long lastSnapshotId,
+ long tagCount,
+ String partition)
+ throws IOException {
+ assertThat(snapshotManager.snapshotCount()).isEqualTo(snapshotCount);
+ assertThat(snapshotManager.latestSnapshotId()).isEqualTo(lastSnapshotId);
+ assertThat(table.tagManager().tagCount()).isEqualTo(tagCount);
+ if (partition == null) {
+ assertThat(table.newScan().listPartitions().size()).isEqualTo(0);
+ } else {
+ assertThat(table.newScan().listPartitions().get(0).getString(0).toString())
+ .isEqualTo(partition);
+ }
+ }
+
+ private void createPrimaryKeyTable(String tableName, HashMap hintOptions) {
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("'bucket' = '1'\n");
+ hintOptions.forEach(
+ (k, v) -> sb.append(",'").append(k).append("'='").append(v).append("'\n"));
+
+ sql(
+ String.format(
+ "CREATE TABLE %s ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt) WITH ("
+ + "%s"
+ + ")",
+ tableName, sb));
+ }
+
+ private void createAppendOnlyTable(String tableName, HashMap hintOptions) {
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("'bucket' = '1'\n");
+ sb.append(",'bucket-key' = 'k'\n");
+ hintOptions.forEach(
+ (k, v) -> sb.append(",'").append(k).append("'='").append(v).append("'\n"));
+ sql(
+ String.format(
+ "CREATE TABLE %s ("
+ + " k STRING,"
+ + " dt STRING"
+ + ") PARTITIONED BY (dt) WITH ("
+ + "%s"
+ + ")",
+ tableName, sb));
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 4a43e39c31ba..1846acfc84db 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -198,6 +198,10 @@ public InternalRow[] call(InternalRow args) {
Map dynamicOptions = new HashMap<>();
dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
+ dynamicOptions.put(
+ CoreOptions.WRITE_ACTIONS.key(),
+ CoreOptions.WriteAction.ALL.toString());
+
if (!StringUtils.isNullOrWhitespaceOnly(options)) {
dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
}