Skip to content

Commit ed49eb8

Browse files
committed
[core] Supports skipping specified actions during writing.
1 parent 24c703a commit ed49eb8

File tree

4 files changed

+294
-3
lines changed

4 files changed

+294
-3
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

+6
Original file line numberDiff line numberDiff line change
@@ -1050,6 +1050,12 @@
10501050
<td>Integer</td>
10511051
<td>Write batch size for any file format if it supports.</td>
10521052
</tr>
1053+
<tr>
1054+
<td><h5>write.skip-actions</h5></td>
1055+
<td style="word-wrap: break-word;">(none)</td>
1056+
<td>String</td>
1057+
<td>This parameter only works when write-only is false. You can specify which actions to skip during the write process.<br />1. 'partition-expire': skipping partition expire.<br />2. 'snapshot-expire': skipping snapshot expire.<br />3. 'create-tag': skipping auto create tag.<br />All of them can be configured at the same time: 'partition-expire,snapshot-expire,create-tag'.</td>
1058+
</tr>
10531059
<tr>
10541060
<td><h5>zorder.var-length-contribution</h5></td>
10551061
<td style="word-wrap: break-word;">8</td>

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

+63
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.Arrays;
5050
import java.util.Collections;
5151
import java.util.HashMap;
52+
import java.util.HashSet;
5253
import java.util.List;
5354
import java.util.Locale;
5455
import java.util.Map;
@@ -442,6 +443,25 @@ public class CoreOptions implements Serializable {
442443
"If set to true, compactions and snapshot expiration will be skipped. "
443444
+ "This option is used along with dedicated compact jobs.");
444445

446+
public static final ConfigOption<String> WRITE_SKIP_ACTIONS =
447+
key("write.skip-actions")
448+
.stringType()
449+
.noDefaultValue()
450+
.withDescription(
451+
Description.builder()
452+
.text(
453+
"This parameter only works when write-only is false., You can specify which actions to skip during the write process.")
454+
.linebreak()
455+
.text("1. 'partition-expire': skipping partition expire.")
456+
.linebreak()
457+
.text("2. 'snapshot-expire': skipping snapshot expire.")
458+
.linebreak()
459+
.text("3. 'create-tag': skipping auto create tag.")
460+
.linebreak()
461+
.text(
462+
"Both can be configured at the same time: 'partition-expire,snapshot-expire,create-tag'.")
463+
.build());
464+
445465
public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
446466
key("source.split.target-size")
447467
.memoryType()
@@ -2216,6 +2236,27 @@ public boolean writeOnly() {
22162236
return options.get(WRITE_ONLY);
22172237
}
22182238

2239+
public HashSet<WriteAction> writeSkippingActions() {
2240+
String str = options.get(WRITE_SKIP_ACTIONS);
2241+
return StringUtils.isNullOrWhitespaceOnly(str)
2242+
? new HashSet<>(0)
2243+
: Arrays.stream(str.split(","))
2244+
.map(action -> WriteAction.valueOf(action.toUpperCase().replace('-', '_')))
2245+
.collect(Collectors.toCollection(HashSet::new));
2246+
}
2247+
2248+
public boolean skippingPartitionExpire(HashSet<WriteAction> skippingActions) {
2249+
return writeOnly() || skippingActions.contains(WriteAction.PARTITION_EXPIRE);
2250+
}
2251+
2252+
public boolean skippingSnapshotExpire(HashSet<WriteAction> skippingActions) {
2253+
return writeOnly() || skippingActions.contains(WriteAction.SNAPSHOT_EXPIRE);
2254+
}
2255+
2256+
public boolean skippingAutoCreateTag(HashSet<WriteAction> skippingActions) {
2257+
return writeOnly() || skippingActions.contains(WriteAction.CREATE_TAG);
2258+
}
2259+
22192260
public boolean streamingReadOverwrite() {
22202261
return options.get(STREAMING_READ_OVERWRITE);
22212262
}
@@ -3163,4 +3204,26 @@ public enum MaterializedTableRefreshStatus {
31633204
ACTIVATED,
31643205
SUSPENDED
31653206
}
3207+
3208+
/**
 * Actions performed during table writing.
 *
 * <p>Each constant carries a lower-case, hyphenated text form (for example {@code
 * partition-expire}) which is what {@link #toString()} returns.
 */
public enum WriteAction {

    // Actions during commit.
    PARTITION_EXPIRE("partition-expire"),
    SNAPSHOT_EXPIRE("snapshot-expire"),
    CREATE_TAG("create-tag");

    // TODO : Support skipping actions during write operations.

    /** Text form of this action, as returned by {@link #toString()}. */
    private final String text;

    WriteAction(String text) {
        this.text = text;
    }

    /** Returns the hyphenated text form of this action instead of the constant name. */
    @Override
    public String toString() {
        return this.text;
    }
}
31663229
}

paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import java.time.Duration;
7878
import java.util.ArrayList;
7979
import java.util.HashMap;
80+
import java.util.HashSet;
8081
import java.util.List;
8182
import java.util.Map;
8283
import java.util.Objects;
@@ -86,6 +87,7 @@
8687
import java.util.function.BiConsumer;
8788

8889
import static org.apache.paimon.CoreOptions.PATH;
90+
import static org.apache.paimon.CoreOptions.WriteAction;
8991
import static org.apache.paimon.utils.Preconditions.checkArgument;
9092

9193
/** Abstract {@link FileStoreTable}. */
@@ -432,7 +434,8 @@ public ExpireSnapshots newExpireChangelog() {
432434
public TableCommitImpl newCommit(String commitUser) {
433435
CoreOptions options = coreOptions();
434436
Runnable snapshotExpire = null;
435-
if (!options.writeOnly()) {
437+
HashSet<WriteAction> skippingActions = options.writeSkippingActions();
438+
if (!options.skippingSnapshotExpire(skippingActions)) {
436439
boolean changelogDecoupled = options.changelogLifecycleDecoupled();
437440
ExpireConfig expireConfig = options.expireConfig();
438441
ExpireSnapshots expireChangelog = newExpireChangelog().config(expireConfig);
@@ -449,8 +452,12 @@ public TableCommitImpl newCommit(String commitUser) {
449452
return new TableCommitImpl(
450453
store().newCommit(commitUser, createCommitCallbacks(commitUser)),
451454
snapshotExpire,
452-
options.writeOnly() ? null : store().newPartitionExpire(commitUser),
453-
options.writeOnly() ? null : store().newTagCreationManager(),
455+
options.skippingPartitionExpire(skippingActions)
456+
? null
457+
: store().newPartitionExpire(commitUser),
458+
options.skippingAutoCreateTag(skippingActions)
459+
? null
460+
: store().newTagCreationManager(),
454461
catalogEnvironment.lockFactory().create(),
455462
CoreOptions.fromMap(options()).consumerExpireTime(),
456463
new ConsumerManager(fileIO, path, snapshotManager().branch()),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.sink;
20+
21+
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.Snapshot;
23+
import org.apache.paimon.catalog.Catalog;
24+
import org.apache.paimon.flink.CatalogITCaseBase;
25+
import org.apache.paimon.table.FileStoreTable;
26+
import org.apache.paimon.utils.SnapshotManager;
27+
28+
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.api.Timeout;
30+
import org.junit.jupiter.params.ParameterizedTest;
31+
import org.junit.jupiter.params.provider.EnumSource;
32+
import org.junit.jupiter.params.provider.ValueSource;
33+
34+
import java.io.IOException;
35+
import java.util.HashMap;
36+
37+
import static org.assertj.core.api.Assertions.assertThat;
38+
39+
/** ITCase for Skipping actions during writing. */
40+
public class WriteSkippingActionsITCase extends CatalogITCaseBase {
41+
42+
private static final int TIMEOUT = 180;
43+
44+
@Timeout(value = TIMEOUT)
45+
@ParameterizedTest
46+
@EnumSource(CoreOptions.WriteAction.class)
47+
public void testSkippingCommitActions(CoreOptions.WriteAction skipAction) throws Exception {
48+
49+
HashMap<String, String> skipActionOptions = createOptions(skipAction.toString());
50+
51+
createTable("T", skipActionOptions);
52+
sql("INSERT INTO T VALUES ('HXH', '20250101')");
53+
54+
FileStoreTable table = paimonTable("T");
55+
SnapshotManager snapshotManager = table.snapshotManager();
56+
57+
switch (skipAction) {
58+
case PARTITION_EXPIRE:
59+
// Since partition expiration was skipped, there will be one less overwrite type
60+
// snapshot.
61+
expectTable(table, snapshotManager, 1, 2, 1, "20250101");
62+
// Snapshot 2 is COMPACT.
63+
assertThat(snapshotManager.snapshot(2).commitKind())
64+
.isEqualTo(Snapshot.CommitKind.COMPACT);
65+
break;
66+
case SNAPSHOT_EXPIRE:
67+
// Test case for skipping snapshot expire.
68+
// Because snapshot expiration is skipped, all snapshots are retained.
69+
expectTable(table, snapshotManager, 3, 3, 1, null);
70+
// Append write.
71+
assertThat(snapshotManager.snapshot(1).commitKind())
72+
.isEqualTo(Snapshot.CommitKind.APPEND);
73+
// Data compact.
74+
assertThat(snapshotManager.snapshot(2).commitKind())
75+
.isEqualTo(Snapshot.CommitKind.COMPACT);
76+
// Partition expired.
77+
assertThat(snapshotManager.snapshot(3).commitKind())
78+
.isEqualTo(Snapshot.CommitKind.OVERWRITE);
79+
break;
80+
case CREATE_TAG:
81+
// Test case for skipping auto create tag.
82+
// No tags are generated because the automatic tag creation action is skipped.
83+
expectTable(table, snapshotManager, 2, 3, 0, null);
84+
// Partition expired.
85+
assertThat(snapshotManager.snapshot(3).commitKind())
86+
.isEqualTo(Snapshot.CommitKind.OVERWRITE);
87+
}
88+
}
89+
90+
@Timeout(value = TIMEOUT)
91+
@ParameterizedTest
92+
@ValueSource(strings = {"do-all", "write-only", "skip-all"})
93+
public void testSkippingAllActionsAndWriteOnly(String action) throws Exception {
94+
95+
HashMap<String, String> options =
96+
createOptions(
97+
action.equals("do-all")
98+
? ""
99+
: "partition-expire,snapshot-expire,create-tag");
100+
101+
if (action.equals("write-only")) {
102+
options.put(CoreOptions.WRITE_ONLY.key(), "true");
103+
}
104+
105+
createTable("T", options);
106+
sql("INSERT INTO T VALUES ('HXH', '20250101')");
107+
108+
FileStoreTable table = paimonTable("T");
109+
SnapshotManager snapshotManager = table.snapshotManager();
110+
111+
switch (action) {
112+
case "do-all":
113+
// Test case for no actions being skipped. (write-only is false)
114+
// snapshot count is 2 (snapshot 1 has expired), last snapshot id is 3, auto create
115+
// tag,
116+
// partition expired.
117+
expectTable(table, snapshotManager, 2, 3, 1, null);
118+
// snapshot 2 is compact, snapshot 3 is overwrite because partition expired.
119+
assertThat(snapshotManager.snapshot(2).commitKind())
120+
.isEqualTo(Snapshot.CommitKind.COMPACT);
121+
assertThat(snapshotManager.snapshot(3).commitKind())
122+
.isEqualTo(Snapshot.CommitKind.OVERWRITE);
123+
break;
124+
case "write-only":
125+
// no compact, no expire, no tag.
126+
expectTable(table, snapshotManager, 1, 1, 0, "20250101");
127+
assertThat(snapshotManager.latestSnapshot().commitKind())
128+
.isEqualTo(Snapshot.CommitKind.APPEND);
129+
130+
break;
131+
case "skip-all":
132+
// All actions are skipped and only the compact is retained.
133+
// no expire, no tag, only compact.
134+
expectTable(table, snapshotManager, 2, 2, 0, "20250101");
135+
// Append write.
136+
assertThat(snapshotManager.snapshot(1).commitKind())
137+
.isEqualTo(Snapshot.CommitKind.APPEND);
138+
// Data compact.
139+
assertThat(snapshotManager.snapshot(2).commitKind())
140+
.isEqualTo(Snapshot.CommitKind.COMPACT);
141+
}
142+
}
143+
144+
@Test
145+
@Timeout(value = TIMEOUT)
146+
public void testSkipCreateTagWithBatchMode() throws Catalog.TableNotExistException {
147+
HashMap<String, String> options = createOptions("create-tag");
148+
149+
// Skipping tag creation will not take effect if the tag creation mode is batch.
150+
options.put(CoreOptions.TAG_AUTOMATIC_CREATION.key(), "batch");
151+
152+
createTable("T", options);
153+
sql("INSERT INTO T VALUES ('a', '20250101')");
154+
FileStoreTable table = paimonTable("T");
155+
assertThat(table.tagManager().tagCount()).isEqualTo(1);
156+
}
157+
158+
private HashMap<String, String> createOptions(String skippingActions) {
159+
HashMap<String, String> options = new HashMap<>();
160+
// Partition expiration will be triggered every time.
161+
options.put(CoreOptions.PARTITION_EXPIRATION_TIME.key(), "1 d");
162+
options.put(CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "0 s");
163+
options.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), "yyyyMMdd");
164+
// Only keep one snapshot.
165+
options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "1");
166+
options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
167+
options.put(CoreOptions.TAG_AUTOMATIC_CREATION.key(), "process-time");
168+
options.put(CoreOptions.TAG_CREATION_PERIOD.key(), "daily");
169+
// Compact will be triggered every time.
170+
options.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
171+
172+
// skipping actions .
173+
options.put(CoreOptions.WRITE_SKIP_ACTIONS.key(), skippingActions);
174+
175+
return options;
176+
}
177+
178+
private void expectTable(
179+
FileStoreTable table,
180+
SnapshotManager snapshotManager,
181+
long snapshotCount,
182+
long lastSnapshotId,
183+
long tagCount,
184+
String partition)
185+
throws IOException {
186+
assertThat(snapshotManager.snapshotCount()).isEqualTo(snapshotCount);
187+
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(lastSnapshotId);
188+
assertThat(table.tagManager().tagCount()).isEqualTo(tagCount);
189+
if (partition == null) {
190+
assertThat(table.newScan().listPartitions().size()).isEqualTo(0);
191+
} else {
192+
assertThat(table.newScan().listPartitions().get(0).getString(0).toString())
193+
.isEqualTo(partition);
194+
}
195+
}
196+
197+
private void createTable(String tableName, HashMap<String, String> hintOptions) {
198+
199+
StringBuilder sb = new StringBuilder();
200+
sb.append("'bucket' = '1'\n");
201+
hintOptions.forEach(
202+
(k, v) -> sb.append(",'").append(k).append("'='").append(v).append("'\n"));
203+
204+
sql(
205+
String.format(
206+
"CREATE TABLE %s ("
207+
+ " k STRING,"
208+
+ " dt STRING,"
209+
+ " PRIMARY KEY (k, dt) NOT ENFORCED"
210+
+ ") PARTITIONED BY (dt) WITH ("
211+
+ "%s"
212+
+ ")",
213+
tableName, sb));
214+
}
215+
}

0 commit comments

Comments
 (0)