Skip to content

Commit

Permalink
[core] Supports skipping specified actions during writing.
Browse files Browse the repository at this point in the history
  • Loading branch information
LinMingQiang committed Jan 12, 2025
1 parent 24c703a commit 9d3fcef
Show file tree
Hide file tree
Showing 4 changed files with 292 additions and 3 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,12 @@
<td>Integer</td>
<td>Write batch size for any file format if it supports.</td>
</tr>
<tr>
<td><h5>write.skip-actions</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>This parameter only works when write-only is false. You can specify which actions to skip during the write process.<br />1. 'partition-expire': skipping partition expire.<br />2. 'snapshot-expire': skipping snapshot expire.<br />3. 'create-tag': skipping auto create tag.<br />Multiple actions can be configured at the same time, separated by commas: 'partition-expire,snapshot-expire,create-tag'.</td>
</tr>
<tr>
<td><h5>zorder.var-length-contribution</h5></td>
<td style="word-wrap: break-word;">8</td>
Expand Down
63 changes: 63 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -442,6 +443,25 @@ public class CoreOptions implements Serializable {
"If set to true, compactions and snapshot expiration will be skipped. "
+ "This option is used along with dedicated compact jobs.");

/**
 * Option listing the write-time actions to skip, given as a comma-separated string such as
 * {@code 'partition-expire,snapshot-expire,create-tag'}. It has no default value.
 */
public static final ConfigOption<String> WRITE_SKIP_ACTIONS =
        key("write.skip-actions")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                // Fixed the stray "., " punctuation of the first sentence.
                                .text(
                                        "This parameter only works when write-only is false. You can specify which actions to skip during the write process.")
                                .linebreak()
                                .text("1. 'partition-expire': skipping partition expire.")
                                .linebreak()
                                .text("2. 'snapshot-expire': skipping snapshot expire.")
                                .linebreak()
                                .text("3. 'create-tag': skipping auto create tag.")
                                .linebreak()
                                // Three actions are listed, so "Both" was inaccurate.
                                .text(
                                        "Multiple actions can be configured at the same time, separated by commas: 'partition-expire,snapshot-expire,create-tag'.")
                                .build());

public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
key("source.split.target-size")
.memoryType()
Expand Down Expand Up @@ -2216,6 +2236,27 @@ public boolean writeOnly() {
return options.get(WRITE_ONLY);
}

/**
 * Parses the {@link #WRITE_SKIP_ACTIONS} option into the set of actions to skip while writing.
 *
 * <p>The option value is split on commas. Each entry is trimmed, upper-cased with {@link
 * Locale#ROOT} and has {@code '-'} replaced by {@code '_'} before being resolved to a {@link
 * WriteAction} constant, so {@code "partition-expire, Snapshot-Expire"} is accepted. Blank
 * entries (for example from a doubled or leading comma) are ignored.
 *
 * @return a new mutable set of the configured actions; empty if the option is unset or blank
 * @throws IllegalArgumentException if an entry does not correspond to any {@link WriteAction}
 */
public HashSet<WriteAction> writeSkippingActions() {
    String str = options.get(WRITE_SKIP_ACTIONS);
    HashSet<WriteAction> actions = new HashSet<>();
    if (StringUtils.isNullOrWhitespaceOnly(str)) {
        return actions;
    }
    for (String entry : str.split(",")) {
        String name = entry.trim();
        if (name.isEmpty()) {
            continue;
        }
        try {
            // Locale.ROOT keeps the mapping stable regardless of the JVM default locale
            // (e.g. Turkish would otherwise upper-case 'i' to a dotted capital I).
            actions.add(WriteAction.valueOf(name.toUpperCase(Locale.ROOT).replace('-', '_')));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    String.format(
                            "Unsupported action '%s' in option '%s'. Supported actions are: %s.",
                            name,
                            WRITE_SKIP_ACTIONS.key(),
                            Arrays.toString(WriteAction.values())),
                    e);
        }
    }
    return actions;
}

/**
 * Decides whether partition expiration is skipped.
 *
 * @param skippingActions the actions configured to be skipped
 * @return {@code true} if {@link #writeOnly()} is true or the set contains {@link
 *     WriteAction#PARTITION_EXPIRE}
 */
public boolean skippingPartitionExpire(HashSet<WriteAction> skippingActions) {
    if (writeOnly()) {
        return true;
    }
    return skippingActions.contains(WriteAction.PARTITION_EXPIRE);
}

/**
 * Decides whether snapshot expiration is skipped.
 *
 * @param skippingActions the actions configured to be skipped
 * @return {@code true} if {@link #writeOnly()} is true or the set contains {@link
 *     WriteAction#SNAPSHOT_EXPIRE}
 */
public boolean skippingSnapshotExpire(HashSet<WriteAction> skippingActions) {
    if (writeOnly()) {
        return true;
    }
    return skippingActions.contains(WriteAction.SNAPSHOT_EXPIRE);
}

/**
 * Decides whether automatic tag creation is skipped.
 *
 * @param skippingActions the actions configured to be skipped
 * @return {@code true} if {@link #writeOnly()} is true or the set contains {@link
 *     WriteAction#CREATE_TAG}
 */
public boolean skippingAutoCreateTag(HashSet<WriteAction> skippingActions) {
    if (writeOnly()) {
        return true;
    }
    return skippingActions.contains(WriteAction.CREATE_TAG);
}

/** Returns the value configured for the {@code STREAMING_READ_OVERWRITE} option. */
public boolean streamingReadOverwrite() {
    final boolean readOverwrite = options.get(STREAMING_READ_OVERWRITE);
    return readOverwrite;
}
Expand Down Expand Up @@ -3163,4 +3204,26 @@ public enum MaterializedTableRefreshStatus {
ACTIVATED,
SUSPENDED
}

/**
 * Actions performed during table writing.
 *
 * <p>Every constant carries a lower-case, hyphen-separated label which is what {@link
 * #toString()} returns.
 */
public enum WriteAction {

    // Actions during commit.
    PARTITION_EXPIRE("partition-expire"),
    SNAPSHOT_EXPIRE("snapshot-expire"),
    CREATE_TAG("create-tag");

    // TODO : Skipping more actions.

    /** Hyphenated label of this action. */
    private final String label;

    WriteAction(String label) {
        this.label = label;
    }

    /** Returns the hyphenated label instead of the constant name. */
    @Override
    public String toString() {
        return label;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -86,6 +87,7 @@
import java.util.function.BiConsumer;

import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.WriteAction;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Abstract {@link FileStoreTable}. */
Expand Down Expand Up @@ -432,7 +434,8 @@ public ExpireSnapshots newExpireChangelog() {
public TableCommitImpl newCommit(String commitUser) {
CoreOptions options = coreOptions();
Runnable snapshotExpire = null;
if (!options.writeOnly()) {
HashSet<WriteAction> skippingActions = options.writeSkippingActions();
if (!options.skippingSnapshotExpire(skippingActions)) {
boolean changelogDecoupled = options.changelogLifecycleDecoupled();
ExpireConfig expireConfig = options.expireConfig();
ExpireSnapshots expireChangelog = newExpireChangelog().config(expireConfig);
Expand All @@ -449,8 +452,12 @@ public TableCommitImpl newCommit(String commitUser) {
return new TableCommitImpl(
store().newCommit(commitUser, createCommitCallbacks(commitUser)),
snapshotExpire,
options.writeOnly() ? null : store().newPartitionExpire(commitUser),
options.writeOnly() ? null : store().newTagCreationManager(),
options.skippingPartitionExpire(skippingActions)
? null
: store().newPartitionExpire(commitUser),
options.skippingAutoCreateTag(skippingActions)
? null
: store().newTagCreationManager(),
catalogEnvironment.lockFactory().create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path, snapshotManager().branch()),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.CatalogITCaseBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.io.IOException;
import java.util.HashMap;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * ITCase for skipping actions during writing.
 *
 * <p>Each test creates partitioned primary-key tables with different {@code write.skip-actions}
 * values, inserts a single row, and then checks the resulting snapshots, tags and partitions.
 */
public class WriteSkippingActionsITCase extends CatalogITCaseBase {

    /** Per-test timeout passed to {@link Timeout}. */
    private static final int TIMEOUT = 180;

    @Test
    @Timeout(TIMEOUT)
    public void testSkippingCommitActions() throws Exception {
        // One table per skipped action.
        createTable("T_skip_partition_expire", createOptions("partition-expire"));
        createTable("T_skip_snapshot_expire", createOptions("snapshot-expire"));
        createTable("T_skip_create_tag", createOptions("create-tag"));

        sql("INSERT INTO T_skip_partition_expire VALUES ('HXH', '20250101')");
        sql("INSERT INTO T_skip_snapshot_expire VALUES ('HXH', '20250101')");
        sql("INSERT INTO T_skip_create_tag VALUES ('HXH', '20250101')");

        // Partition expire skipped: no OVERWRITE snapshot is produced, so the latest
        // snapshot is the COMPACT one and the partition is still listed.
        FileStoreTable noPartitionExpire = paimonTable("T_skip_partition_expire");
        SnapshotManager noPartitionExpireSnapshots = noPartitionExpire.snapshotManager();
        expectTable(noPartitionExpire, noPartitionExpireSnapshots, 1, 2, 1, "20250101");
        assertCommitKind(noPartitionExpireSnapshots, 2, Snapshot.CommitKind.COMPACT);

        // Snapshot expire skipped: all three snapshots are retained
        // (append write, data compact, partition expired).
        FileStoreTable noSnapshotExpire = paimonTable("T_skip_snapshot_expire");
        SnapshotManager noSnapshotExpireSnapshots = noSnapshotExpire.snapshotManager();
        expectTable(noSnapshotExpire, noSnapshotExpireSnapshots, 3, 3, 1, null);
        assertCommitKind(noSnapshotExpireSnapshots, 1, Snapshot.CommitKind.APPEND);
        assertCommitKind(noSnapshotExpireSnapshots, 2, Snapshot.CommitKind.COMPACT);
        assertCommitKind(noSnapshotExpireSnapshots, 3, Snapshot.CommitKind.OVERWRITE);

        // Auto tag creation skipped: no tag exists, the partition is still expired.
        FileStoreTable noCreateTag = paimonTable("T_skip_create_tag");
        SnapshotManager noCreateTagSnapshots = noCreateTag.snapshotManager();
        expectTable(noCreateTag, noCreateTagSnapshots, 2, 3, 0, null);
        assertThat(noCreateTag.snapshot(3).commitKind())
                .isEqualTo(Snapshot.CommitKind.OVERWRITE);
    }

    @Test
    @Timeout(TIMEOUT)
    public void testSkippingAllActionsAndWriteOnly() throws Exception {
        HashMap<String, String> writeOnlyOptions = new HashMap<>();
        writeOnlyOptions.put(CoreOptions.WRITE_ONLY.key(), "true");

        // Every skippable action is skipped; only the compact is retained.
        HashMap<String, String> skipAllOptions =
                createOptions("partition-expire,snapshot-expire,create-tag");

        createTable("T", createOptions("")); // nothing skipped
        createTable("T_write_only", writeOnlyOptions); // write-only
        createTable("T_skip_all", skipAllOptions); // everything skipped

        sql("INSERT INTO T VALUES ('HXH', '20250101')");
        sql("INSERT INTO T_write_only VALUES ('HXH', '20250101')");
        sql("INSERT INTO T_skip_all VALUES ('HXH', '20250101')");

        // Nothing skipped (write-only is false): snapshot 1 has expired so two snapshots
        // remain, the last id is 3, a tag was created and the partition expired.
        FileStoreTable plain = paimonTable("T");
        SnapshotManager plainSnapshots = plain.snapshotManager();
        expectTable(plain, plainSnapshots, 2, 3, 1, null);
        assertCommitKind(plainSnapshots, 2, Snapshot.CommitKind.COMPACT);
        assertCommitKind(plainSnapshots, 3, Snapshot.CommitKind.OVERWRITE);

        // write-only = true: no compact, no expire, no tag.
        FileStoreTable writeOnly = paimonTable("T_write_only");
        SnapshotManager writeOnlySnapshots = writeOnly.snapshotManager();
        expectTable(writeOnly, writeOnlySnapshots, 1, 1, 0, "20250101");
        assertThat(writeOnlySnapshots.latestSnapshot().commitKind())
                .isEqualTo(Snapshot.CommitKind.APPEND);

        // All actions skipped: no expire, no tag, only append followed by compact.
        FileStoreTable skipAll = paimonTable("T_skip_all");
        SnapshotManager skipAllSnapshots = skipAll.snapshotManager();
        expectTable(skipAll, skipAllSnapshots, 2, 2, 0, "20250101");
        assertCommitKind(skipAllSnapshots, 1, Snapshot.CommitKind.APPEND);
        assertCommitKind(skipAllSnapshots, 2, Snapshot.CommitKind.COMPACT);
    }

    @Test
    @Timeout(TIMEOUT)
    public void testSkipCreateTagWithBatchMode() throws Catalog.TableNotExistException {
        HashMap<String, String> batchTagOptions = createOptions("create-tag");

        // Skipping tag creation will not take effect if the tag creation mode is batch.
        batchTagOptions.put(CoreOptions.TAG_AUTOMATIC_CREATION.key(), "batch");

        createTable("T", batchTagOptions);
        sql("INSERT INTO T VALUES ('a', '20250101')");

        assertThat(paimonTable("T").tagManager().tagCount()).isEqualTo(1);
    }

    /**
     * Builds the table options shared by the tests and sets {@code write.skip-actions} to the
     * given value.
     *
     * @param skippingActions value stored under {@link CoreOptions#WRITE_SKIP_ACTIONS}
     */
    private HashMap<String, String> createOptions(String skippingActions) {
        HashMap<String, String> conf = new HashMap<>();

        // Partition expiration will be triggered every time.
        conf.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), "1 d");
        conf.put(CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "0 s");
        conf.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");

        // Only keep one snapshot.
        conf.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
        conf.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");

        // Process-time tags with a daily period.
        conf.put(CoreOptions.TAG_AUTOMATIC_CREATION.key(), "process-time");
        conf.put(CoreOptions.TAG_CREATION_PERIOD.key(), "daily");

        // Compact will be triggered every time.
        conf.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");

        // Actions to skip.
        conf.put(CoreOptions.WRITE_SKIP_ACTIONS.key(), skippingActions);

        return conf;
    }

    /** Asserts the commit kind of the snapshot with the given id. */
    private void assertCommitKind(
            SnapshotManager manager, long snapshotId, Snapshot.CommitKind expected) {
        assertThat(manager.snapshot(snapshotId).commitKind()).isEqualTo(expected);
    }

    /**
     * Asserts snapshot count, latest snapshot id, tag count and partition state of a table.
     *
     * @param partition expected string value of the first field of the first listed partition,
     *     or {@code null} to assert that no partition is listed
     */
    private void expectTable(
            FileStoreTable table,
            SnapshotManager snapshotManager,
            long snapshotCount,
            long lastSnapshotId,
            long tagCount,
            String partition)
            throws IOException {
        assertThat(snapshotManager.snapshotCount()).isEqualTo(snapshotCount);
        assertThat(snapshotManager.latestSnapshotId()).isEqualTo(lastSnapshotId);
        assertThat(table.tagManager().tagCount()).isEqualTo(tagCount);

        if (partition != null) {
            assertThat(table.newScan().listPartitions().get(0).getString(0).toString())
                    .isEqualTo(partition);
        } else {
            assertThat(table.newScan().listPartitions()).isEmpty();
        }
    }

    /**
     * Creates a table with columns {@code k} and {@code dt}, partitioned by {@code dt}, using a
     * single bucket plus the supplied options in its WITH clause.
     */
    private void createTable(String tableName, HashMap<String, String> hintOptions) {
        StringBuilder withClause = new StringBuilder("'bucket' = '1'\n");
        for (String key : hintOptions.keySet()) {
            withClause.append(String.format(",'%s'='%s'\n", key, hintOptions.get(key)));
        }

        String ddl =
                String.format(
                        "CREATE TABLE %s ("
                                + " k STRING,"
                                + " dt STRING,"
                                + " PRIMARY KEY (k, dt) NOT ENFORCED"
                                + ") PARTITIONED BY (dt) WITH ("
                                + "%s"
                                + ")",
                        tableName, withClause);
        sql(ddl);
    }
}

0 comments on commit 9d3fcef

Please sign in to comment.