Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Support executing only specified actions during the writing process. #4884

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,12 @@
<td><p>Enum</p></td>
<td>Type of the table.<br /><br />Possible values:<ul><li>"table": Normal Paimon table.</li><li>"format-table": A file format table refers to a directory that contains multiple files of the same format.</li><li>"materialized-table": A materialized table combines normal Paimon table and materialized SQL.</li><li>"object-table": An object table combines normal Paimon table and object location.</li></ul></td>
</tr>
<tr>
<td><h5>write-actions</h5></td>
<td style="word-wrap: break-word;">"all"</td>
<td>String</td>
<td>This parameter is used to specify which actions will be performed during the writing process. This parameter is effective only when write-only is false and has no effect on action or procedure.<br />1. 'all': By default, all actions will be performed.<br />2. 'partition-expire': Perform partition expiration action.<br />3. 'snapshot-expire': Perform snapshot expiration action.<br />4. 'tag-automatic-creation': Perform automatic creation tag action.<br />5. 'full-compact': Perform full compaction action.<br />6. 'minor-compact': Perform minor compaction action.<br />Both can be configured at the same time: 'partition-expire,snapshot-expire,tag-automatic-creation', if you want to skip all actions you can set this to ''.</td>
</tr>
<tr>
<td><h5>write-buffer-for-append</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
112 changes: 112 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,36 @@ public class CoreOptions implements Serializable {
"If set to true, compactions and snapshot expiration will be skipped. "
+ "This option is used along with dedicated compact jobs.");

public static final ConfigOption<String> WRITE_ACTIONS =
key("write-actions")
.stringType()
LinMingQiang marked this conversation as resolved.
Show resolved Hide resolved
.defaultValue(WriteAction.ALL.value)
.withDescription(
Description.builder()
.text(
"This parameter is used to specify which actions will be performed during the writing process. This parameter is effective only when write-only is false and has no effect on action or procedure.")
.linebreak()
.text("1. 'all': By default, all actions will be performed.")
.linebreak()
.text(
"2. 'partition-expire': Perform partition expiration action.")
.linebreak()
.text(
"3. 'snapshot-expire': Perform snapshot expiration action.")
.linebreak()
.text(
"4. 'tag-automatic-creation': Perform automatic creation tag action.")
.linebreak()
.text("5. 'full-compact': Perform full compaction action.")
.linebreak()
.text("6. 'minor-compact': Perform minor compaction action.")
.linebreak()
.text(
"Both can be configured at the same time: 'partition-expire,"
+ "snapshot-expire,tag-automatic-creation', "
+ "if you want to skip all actions you can set this to ''.")
.build());

public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
key("source.split.target-size")
.memoryType()
Expand Down Expand Up @@ -2241,6 +2271,61 @@ public boolean writeOnly() {
return options.get(WRITE_ONLY);
}

/**
 * Returns the set of {@link WriteAction}s enabled for this table, parsed from the
 * 'write-actions' option. When 'write-only' is true, no actions run and the set is empty.
 */
public Set<WriteAction> writeActions() {
    return writeOnly() ? new HashSet<>(0) : writeActions(options.get(WRITE_ACTIONS));
}

/**
 * Parses a comma-separated action list (e.g. "partition-expire,snapshot-expire") into a set of
 * {@link WriteAction}s. Tokens are trimmed, so whitespace around commas is tolerated, and blank
 * tokens (stray commas) are ignored. A null, empty or blank string yields an empty set,
 * meaning every action is skipped.
 *
 * @param str raw option value, may be null or blank
 * @return mutable set of parsed actions, never null
 * @throws IllegalArgumentException if a token does not name a known action
 */
public static Set<WriteAction> writeActions(String str) {
    if (StringUtils.isNullOrWhitespaceOnly(str)) {
        return new HashSet<>(0);
    }
    return Arrays.stream(str.split(","))
            .map(String::trim)
            .filter(action -> !action.isEmpty())
            // Locale.ROOT keeps the upper-casing stable regardless of the JVM default
            // locale (e.g. the Turkish dotless-i would otherwise break valueOf).
            .map(
                    action ->
                            WriteAction.valueOf(
                                    action.toUpperCase(java.util.Locale.ROOT)
                                            .replace('-', '_')))
            .collect(Collectors.toCollection(HashSet::new));
}

/** Whether partition expiration should run, i.e. it is requested explicitly or via 'all'. */
public boolean doPartitionExpireAction(Set<WriteAction> doWriteActions) {
    return doWriteActions.contains(WriteAction.PARTITION_EXPIRE)
            || doAllWriteActions(doWriteActions);
}

/** Whether snapshot expiration should run, i.e. it is requested explicitly or via 'all'. */
public boolean doSnapshotExpireAction(Set<WriteAction> doWriteActions) {
    return doWriteActions.contains(WriteAction.SNAPSHOT_EXPIRE)
            || doAllWriteActions(doWriteActions);
}

/** Whether automatic tag creation should run, i.e. it is requested explicitly or via 'all'. */
public boolean doAutoCreateTagAction(Set<WriteAction> doWriteActions) {
    return doWriteActions.contains(WriteAction.TAG_AUTOMATIC_CREATION)
            || doAllWriteActions(doWriteActions);
}

/** Whether full compaction should run, i.e. it is requested explicitly or via 'all'. */
public boolean doFullCompactionAction(Set<WriteAction> doWriteActions) {
    return doWriteActions.contains(WriteAction.FULL_COMPACT)
            || doAllWriteActions(doWriteActions);
}

/** Whether minor compaction should run, i.e. it is requested explicitly or via 'all'. */
public boolean doMinorCompactionAction(Set<WriteAction> doWriteActions) {
    return doWriteActions.contains(WriteAction.MINOR_COMPACT)
            || doAllWriteActions(doWriteActions);
}

/** Whether any compaction (minor or full) is enabled for this table's configured actions. */
public boolean doCompact() {
    Set<WriteAction> configuredActions = writeActions();
    return doCompact(configuredActions);
}

/** Whether the given action set enables any compaction: 'full-compact', 'minor-compact' or 'all'. */
public static boolean doCompact(Set<WriteAction> doWriteActions) {
    return doWriteActions.contains(WriteAction.FULL_COMPACT)
            || doWriteActions.contains(WriteAction.MINOR_COMPACT)
            || doWriteActions.contains(WriteAction.ALL);
}

/** Whether the set contains the 'all' marker, meaning every write action should be performed. */
public boolean doAllWriteActions(Set<WriteAction> doWriteActions) {
    return doWriteActions.contains(WriteAction.ALL);
}

/** Value of the 'streaming-read-overwrite' option. */
public boolean streamingReadOverwrite() {
    return options.get(STREAMING_READ_OVERWRITE);
}
Expand Down Expand Up @@ -3226,4 +3311,31 @@ public String toString() {
return value;
}
}

/** Actions performed during table writing. */
/**
 * Maintenance actions that can be performed during table writing. Each constant's string form
 * (see {@link #toString()}) is the value users put in the 'write-actions' option.
 */
public enum WriteAction {

    /** All write actions will be performed; this is the default behavior. */
    ALL("all"),

    // Actions performed during commit.

    /** Expire partitions during commit. */
    PARTITION_EXPIRE("partition-expire"),

    /** Expire snapshots during commit. */
    SNAPSHOT_EXPIRE("snapshot-expire"),

    /** Automatically create tags during commit. */
    TAG_AUTOMATIC_CREATION("tag-automatic-creation"),

    // Actions performed during writing.

    /** Perform minor compaction during writing. */
    MINOR_COMPACT("minor-compact"),

    /** Perform full compaction during writing. */
    FULL_COMPACT("full-compact");

    // User-facing option token, e.g. "partition-expire"; the enum name is the same token
    // upper-cased with '-' replaced by '_'.
    private final String value;

    WriteAction(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return value;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected CompactManager getCompactManager(
List<DataFileMeta> restoredFiles,
ExecutorService compactExecutor,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
if (options.writeOnly()) {
if (!options.doCompact()) {
return new NoopCompactManager();
} else {
Function<String, DeletionVector> dvFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ private CompactManager createCompactManager(
ExecutorService compactExecutor,
Levels levels,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
if (options.writeOnly()) {
if (!options.doCompact()) {
return new NoopCompactManager();
} else {
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
Expand Down Expand Up @@ -288,7 +288,8 @@ private MergeTreeCompactRewriter createRewriter(
MergeEngine mergeEngine = options.mergeEngine();
ChangelogProducer changelogProducer = options.changelogProducer();
LookupStrategy lookupStrategy = options.lookupStrategy();
if (changelogProducer.equals(FULL_COMPACTION)) {
if (changelogProducer.equals(FULL_COMPACTION)
&& options.doFullCompactionAction(options.writeActions())) {
return new FullChangelogMergeTreeCompactRewriter(
maxLevel,
mergeEngine,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,12 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedMap;
import java.util.function.BiConsumer;

import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.WriteAction;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Abstract {@link FileStoreTable}. */
Expand Down Expand Up @@ -432,7 +434,8 @@ public ExpireSnapshots newExpireChangelog() {
public TableCommitImpl newCommit(String commitUser) {
CoreOptions options = coreOptions();
Runnable snapshotExpire = null;
if (!options.writeOnly()) {
Set<WriteAction> skippingActions = options.writeActions();
if (options.doSnapshotExpireAction(skippingActions)) {
boolean changelogDecoupled = options.changelogLifecycleDecoupled();
ExpireConfig expireConfig = options.expireConfig();
ExpireSnapshots expireChangelog = newExpireChangelog().config(expireConfig);
Expand All @@ -449,8 +452,12 @@ public TableCommitImpl newCommit(String commitUser) {
return new TableCommitImpl(
store().newCommit(commitUser, createCommitCallbacks(commitUser)),
snapshotExpire,
options.writeOnly() ? null : store().newPartitionExpire(commitUser),
options.writeOnly() ? null : store().newTagCreationManager(),
options.doPartitionExpireAction(skippingActions)
? store().newPartitionExpire(commitUser)
: null,
options.doAutoCreateTagAction(skippingActions)
? store().newTagCreationManager()
: null,
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path, snapshotManager().branch()),
options.snapshotExpireExecutionMode(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public CompactAction(
}
HashMap<String, String> dynamicOptions = new HashMap<>(tableConf);
dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
dynamicOptions.put(CoreOptions.WRITE_ACTIONS.key(), CoreOptions.WriteAction.ALL.toString());
table = table.copy(dynamicOptions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -79,6 +81,7 @@ public abstract class FlinkSink<T> implements Serializable {
private static final String WRITER_NAME = "Writer";
private static final String WRITER_WRITE_ONLY_NAME = "Writer(write-only)";
private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
private static final Logger log = LoggerFactory.getLogger(FlinkSink.class);

protected final FileStoreTable table;
private final boolean ignorePreviousFiles;
Expand Down Expand Up @@ -108,35 +111,44 @@ private StoreSinkWrite.Provider createWriteProvider(
if (coreOptions.writeOnly()) {
waitCompaction = false;
} else {
Set<CoreOptions.WriteAction> writeActions = coreOptions.writeActions();
waitCompaction = coreOptions.prepareCommitWaitCompaction();
int deltaCommits = -1;
if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
} else if (options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
long fullCompactionThresholdMs =
options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL).toMillis();
deltaCommits =
(int)
(fullCompactionThresholdMs
/ checkpointConfig.getCheckpointInterval());
}
if (coreOptions.doFullCompactionAction(writeActions)) {
if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
} else if (options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
long fullCompactionThresholdMs =
options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)
.toMillis();
deltaCommits =
(int)
(fullCompactionThresholdMs
/ checkpointConfig.getCheckpointInterval());
}

if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) {
int finalDeltaCommits = Math.max(deltaCommits, 1);
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
assertNoSinkMaterializer.run();
return new GlobalFullCompactionSinkWrite(
table,
commitUser,
state,
ioManager,
ignorePreviousFiles,
waitCompaction,
finalDeltaCommits,
isStreaming,
memoryPool,
metricGroup);
};
if (changelogProducer == ChangelogProducer.FULL_COMPACTION || deltaCommits >= 0) {
int finalDeltaCommits = Math.max(deltaCommits, 1);
return (table, commitUser, state, ioManager, memoryPool, metricGroup) -> {
assertNoSinkMaterializer.run();
return new GlobalFullCompactionSinkWrite(
table,
commitUser,
state,
ioManager,
ignorePreviousFiles,
waitCompaction,
finalDeltaCommits,
isStreaming,
memoryPool,
metricGroup);
};
}
} else {
log.warn(
"According to the config {} = {}, the action of 'full-compact' will be skipped.",
CoreOptions.WRITE_ACTIONS.key(),
coreOptions.toConfiguration().get(CoreOptions.WRITE_ACTIONS));
}
}

Expand Down Expand Up @@ -213,10 +225,11 @@ public DataStream<Committable> doWrite(
StreamExecutionEnvironment env = input.getExecutionEnvironment();
boolean isStreaming = isStreaming(input);

boolean writeOnly = table.coreOptions().writeOnly();
SingleOutputStreamOperator<Committable> written =
input.transform(
(writeOnly ? WRITER_WRITE_ONLY_NAME : WRITER_NAME)
(table.coreOptions().writeOnly()
? WRITER_WRITE_ONLY_NAME
: WRITER_NAME)
+ " : "
+ table.name(),
new CommittableTypeInfo(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
Expand Down Expand Up @@ -149,9 +150,14 @@ public void processElement(StreamRecord<RowData> element) throws Exception {
FileStoreTable table = getTable(tableId);

Preconditions.checkArgument(
!table.coreOptions().writeOnly(),
CoreOptions.WRITE_ONLY.key()
+ " should not be true for MultiTablesStoreCompactOperator.");
table.coreOptions().doCompact(),
String.format(
"%s should not be true or %s should be %s or contains %s/%s for MultiTablesStoreCompactOperator.",
CoreOptions.WRITE_ONLY.key(),
CoreOptions.WRITE_ACTIONS.key(),
CoreOptions.WriteAction.ALL,
CoreOptions.WriteAction.MINOR_COMPACT,
CoreOptions.WriteAction.FULL_COMPACT));

storeSinkWriteProvider =
createWriteProvider(table, checkpointConfig, isStreaming, ignorePreviousFiles);
Expand Down Expand Up @@ -259,13 +265,12 @@ private StoreSinkWrite.Provider createWriteProvider(
Options options = fileStoreTable.coreOptions().toConfiguration();
CoreOptions.ChangelogProducer changelogProducer =
fileStoreTable.coreOptions().changelogProducer();
boolean waitCompaction;
CoreOptions coreOptions = fileStoreTable.coreOptions();
if (coreOptions.writeOnly()) {
waitCompaction = false;
} else {
waitCompaction = coreOptions.prepareCommitWaitCompaction();
int deltaCommits = -1;
boolean waitCompaction = coreOptions.prepareCommitWaitCompaction();
Set<CoreOptions.WriteAction> writeActions = coreOptions.writeActions();
int deltaCommits = -1;

if (coreOptions.doFullCompactionAction(writeActions)) {
if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
deltaCommits = options.get(FULL_COMPACTION_DELTA_COMMITS);
} else if (options.contains(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)) {
Expand Down
Loading
Loading