Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce ClearConsumersProcedure to clear consumers #4893

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,31 @@ All available procedures are listed below.
</td>
<td>CALL sys.reset_consumer(`table` => 'default.T', consumer_id => 'myid', next_snapshot_id => cast(10 as bigint))</td>
</tr>
<tr>
<td>clear_consumers</td>
<td>
-- Use named argument<br/>
CALL [catalog.]sys.clear_consumers(`table` => 'identifier', including_consumers => 'includingConsumers', excluding_consumers => 'excludingConsumers') <br/><br/>
chenxinwei marked this conversation as resolved.
Show resolved Hide resolved
-- Use indexed argument<br/>
-- clear all consumers in the table<br/>
CALL [catalog.]sys.clear_consumers('identifier')<br/><br/>
-- clear some consumers in the table (accept regular expression)<br/>
CALL [catalog.]sys.clear_consumers('identifier', 'includingConsumers')<br/><br/>
-- exclude some consumers (accept regular expression)<br/>
CALL [catalog.]sys.clear_consumers('identifier', 'includingConsumers', 'excludingConsumers')
</td>
<td>
To clear consumers. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>includingConsumers: consumers to be cleared.</li>
<li>excludingConsumers: consumers which are not to be cleared.</li>
</td>
<td>CALL sys.clear_consumers(`table` => 'default.T')<br/><br/>
CALL sys.clear_consumers(`table` => 'default.T', including_consumers => 'myid.*')<br/><br/>
CALL sys.clear_consumers(`table` => 'default.T', including_consumers => '', excluding_consumers => 'myid1.*')<br/><br/>
CALL sys.clear_consumers(`table` => 'default.T', including_consumers => 'myid.*', excluding_consumers => 'myid1.*')
</td>
</tr>
<tr>
<td>rollback_to</td>
<td>
Expand Down
19 changes: 19 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,25 @@ This section introduce all available spark procedures about paimon.
-- delete consumer<br/>
CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid')
</td>
</tr>
<tr>
<td>clear_consumers</td>
<td>
To clear consumers. Arguments:
<li>identifier: the target table identifier. Cannot be empty.</li>
<li>includingConsumers: consumers to be cleared.</li>
<li>excludingConsumers: consumers which are not to be cleared.</li>
</td>
<td>
-- clear all consumers in the table<br/>
CALL sys.clear_consumers(table => 'default.T')<br/><br/>
-- clear some consumers in the table (accept regular expression)<br/>
CALL sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*')<br/><br/>
-- clear all consumers except excludingConsumers in the table (accept regular expression)<br/>
CALL sys.clear_consumers(table => 'default.T', includingConsumers => '', excludingConsumers => 'myid1.*')<br/><br/>
-- clear consumers matching includingConsumers except those matching excludingConsumers (accept regular expression)<br/>
CALL sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*', excludingConsumers => 'myid1.*')
</td>
</tr>
<tr>
<td>mark_partition_done</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
Expand Down Expand Up @@ -107,6 +108,34 @@ public void expire(LocalDateTime expireDateTime) {
}
}

/** Clear consumers. */
public void clearConsumers(Pattern includingPattern, Pattern excludingPattern) {
try {
listVersionedFileStatus(fileIO, consumerDirectory(), CONSUMER_PREFIX)
.forEach(
status -> {
chenxinwei marked this conversation as resolved.
Show resolved Hide resolved
String consumerName =
status.getPath()
.getName()
.substring(CONSUMER_PREFIX.length());
boolean shouldCompaction =
chenxinwei marked this conversation as resolved.
Show resolved Hide resolved
includingPattern.matcher(consumerName).matches();
if (excludingPattern != null) {
shouldCompaction =
shouldCompaction
&& !excludingPattern
.matcher(consumerName)
.matches();
}
if (shouldCompaction) {
fileIO.deleteQuietly(status.getPath());
}
});
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Get all consumer. */
public Map<String, Long> consumers() throws IOException {
Map<String, Long> consumers = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.table.procedure.ProcedureContext;

import java.util.regex.Pattern;

/**
* Clear consumers procedure. Usage:
*
* <pre><code>
* -- NOTE: use '' as placeholder for optional arguments
*
* -- clear all consumers in the table
* CALL sys.clear_consumers('tableId')
*
* -- clear some consumers in the table (accept regular expression)
* CALL sys.clear_consumers('tableId', 'includingConsumers')
*
* -- exclude some consumers (accept regular expression)
* CALL sys.clear_consumers('tableId', 'includingConsumers', 'excludingConsumers')
* </code></pre>
*/
public class ClearConsumersProcedure extends ProcedureBase {
chenxinwei marked this conversation as resolved.
Show resolved Hide resolved

public static final String IDENTIFIER = "clear_consumers";

public String[] call(
ProcedureContext procedureContext,
String tableId,
String includingConsumers,
String excludingConsumers)
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable =
(FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
ConsumerManager consumerManager =
new ConsumerManager(
fileStoreTable.fileIO(),
fileStoreTable.location(),
fileStoreTable.snapshotManager().branch());

includingConsumers = nullable(includingConsumers);
excludingConsumers = nullable(excludingConsumers);
Pattern includingPattern =
includingConsumers == null
? Pattern.compile(".*")
: Pattern.compile(includingConsumers);
Pattern excludingPattern =
excludingConsumers == null ? null : Pattern.compile(excludingConsumers);
consumerManager.clearConsumers(includingPattern, excludingPattern);

return new String[] {"Success"};
}

public String[] call(
ProcedureContext procedureContext, String tableId, String includingConsumers)
throws Catalog.TableNotExistException {
return call(procedureContext, tableId, includingConsumers, null);
}

public String[] call(ProcedureContext procedureContext, String tableId)
throws Catalog.TableNotExistException {
return call(procedureContext, tableId, null, null);
}

@Override
public String identifier() {
return IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.regex.Pattern;

/** Clear consumers action for Flink. */
public class ClearConsumerAction extends TableActionBase {

    /** Regular expression used when no including pattern is configured: matches every name. */
    private static final String MATCH_ALL = ".*";

    /** Regular expression of consumers to clear; null or blank selects every consumer. */
    private String includingConsumers;

    /** Regular expression of consumers to keep; null or blank excludes nothing. */
    private String excludingConsumers;

    protected ClearConsumerAction(
            String databaseName, String tableName, Map<String, String> catalogConfig) {
        super(databaseName, tableName, catalogConfig);
    }

    /** Set the regular expression of consumers to clear and return this action. */
    public ClearConsumerAction withIncludingConsumers(@Nullable String includingConsumers) {
        this.includingConsumers = includingConsumers;
        return this;
    }

    /** Set the regular expression of consumers to keep and return this action. */
    public ClearConsumerAction withExcludingConsumers(@Nullable String excludingConsumers) {
        this.excludingConsumers = excludingConsumers;
        return this;
    }

    /**
     * Clear the consumers of the table that match the including pattern and do not match the
     * excluding pattern.
     *
     * <p>The configured values are normalised into locals so that running the action does not
     * overwrite what was set through the {@code with...} methods.
     */
    @Override
    public void run() throws Exception {
        FileStoreTable dataTable = (FileStoreTable) table;
        ConsumerManager consumerManager =
                new ConsumerManager(
                        dataTable.fileIO(),
                        dataTable.location(),
                        dataTable.snapshotManager().branch());

        // Null, empty and whitespace-only values all mean "not specified".
        boolean hasIncluding = !StringUtils.isNullOrWhitespaceOnly(includingConsumers);
        boolean hasExcluding = !StringUtils.isNullOrWhitespaceOnly(excludingConsumers);

        Pattern includingPattern = Pattern.compile(hasIncluding ? includingConsumers : MATCH_ALL);
        Pattern excludingPattern = hasExcluding ? Pattern.compile(excludingConsumers) : null;
        consumerManager.clearConsumers(includingPattern, excludingPattern);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import java.util.Optional;

/** Factory to create {@link ClearConsumerAction}. */
public class ClearConsumerActionFactory implements ActionFactory {

    public static final String IDENTIFIER = "clear_consumers";

    private static final String INCLUDING_CONSUMERS = "including_consumers";
    private static final String EXCLUDING_CONSUMERS = "excluding_consumers";

    @Override
    public String identifier() {
        return IDENTIFIER;
    }

    /**
     * Build a {@link ClearConsumerAction} from the command-line parameters.
     *
     * <p>The database and table parameters are required. The including and excluding consumer
     * patterns are each optional and are applied independently of one another.
     *
     * @param params parsed command-line parameters
     * @return the configured action, always present
     */
    @Override
    public Optional<Action> create(MultipleParameterToolAdapter params) {
        ClearConsumerAction action =
                new ClearConsumerAction(
                        params.getRequired(DATABASE),
                        params.getRequired(TABLE),
                        catalogConfigMap(params));

        if (params.has(INCLUDING_CONSUMERS)) {
            action.withIncludingConsumers(params.get(INCLUDING_CONSUMERS));
        }

        if (params.has(EXCLUDING_CONSUMERS)) {
            action.withExcludingConsumers(params.get(EXCLUDING_CONSUMERS));
        }

        return Optional.of(action);
    }

    /** Print the usage of the {@code clear_consumers} action to standard output. */
    @Override
    public void printHelp() {
        System.out.println(
                "Action \"clear_consumers\" clears consumers with including consumers and excluding consumers.");
        System.out.println();

        System.out.println("Syntax:");
        // The two pattern options are bracketed separately because create() reads each one
        // independently; either may be given without the other.
        System.out.println(
                "  clear_consumers --warehouse <warehouse_path> --database <database_name> "
                        + "--table <table_name> [--including_consumers <including_pattern>] [--excluding_consumers <excluding_pattern>]");

        System.out.println();
        System.out.println("Note:");
        System.out.println(
                "  use '' as placeholder for including_consumers if you want to clear all consumers except excludingConsumers in the table.");
        System.out.println();
    }
}
Loading
Loading