Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15561 [1/N]: Introduce new subscribe api for RE2J regex #17897

Open
wants to merge 2 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ public interface Consumer<K, V> extends Closeable {
*/
void subscribe(Pattern pattern);

/**
* @see KafkaConsumer#subscribe(SubscriptionPattern, ConsumerRebalanceListener)
*/
void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback);

/**
* @see KafkaConsumer#subscribe(SubscriptionPattern)
*/
void subscribe(SubscriptionPattern pattern);

/**
* @see KafkaConsumer#unsubscribe()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidRegularExpression;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
Expand Down Expand Up @@ -755,6 +756,55 @@ public void subscribe(Pattern pattern) {
delegate.subscribe(pattern);
}

/**
* Subscribe to all topics matching the specified pattern, to get dynamically assigned partitions.
* The pattern matching will be done periodically against all topics. This is only supported under the
* CONSUMER group protocol (see {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG}).
* <p>
* If the provided pattern is not compatible with Google RE2/J, an {@link InvalidRegularExpression} will be
* eventually thrown on a call to {@link #poll(Duration)} following this call to subscribe.
* <p>
* See {@link #subscribe(Collection, ConsumerRebalanceListener)} for details on the
* use of the {@link ConsumerRebalanceListener}. Generally, rebalances are triggered when there
* is a change to the topics matching the provided pattern and when consumer group membership changes.
* Group rebalances only take place during an active call to {@link #poll(Duration)}.
*
* @param pattern Pattern to subscribe to, that must be compatible with Google RE2/J.
* @param listener Non-null listener instance to get notifications on partition assignment/revocation for the
* subscribed topics.
* @throws IllegalArgumentException If pattern is null or empty, or if the listener is null.
* @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()}).
*/
@Override
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener listener) {
delegate.subscribe(pattern, listener);
}

/**
* Subscribe to all topics matching the specified pattern, to get dynamically assigned partitions.
* The pattern matching will be done periodically against topics. This is only supported under the
* CONSUMER group protocol (see {@link ConsumerConfig#GROUP_PROTOCOL_CONFIG})
* <p>
* If the provided pattern is not compatible with Google RE2/J, an {@link InvalidRegularExpression} will be
* eventually thrown on a call to {@link #poll(Duration)} following this call to subscribe.
* <p>
* This is a short-hand for {@link #subscribe(Pattern, ConsumerRebalanceListener)}, which
* uses a no-op listener. If you need the ability to seek to particular offsets, you should prefer
* {@link #subscribe(Pattern, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
* to be reset. You should also provide your own listener if you are doing your own offset
* management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
*
* @param pattern Pattern to subscribe to, that must be compatible with Google RE2/J.
* @throws IllegalArgumentException If pattern is null or empty.
* @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()}).
*/
@Override
public void subscribe(SubscriptionPattern pattern) {
delegate.subscribe(pattern);
}

/**
* Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}.
* This also clears any partitions directly assigned through {@link #assign(Collection)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,16 @@ public synchronized void subscribe(Pattern pattern) {
subscribe(pattern, Optional.empty());
}

@Override
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: support in MockConsumer will come in follow-up PR

}

@Override
public void subscribe(SubscriptionPattern pattern) {
throw new UnsupportedOperationException("Subscribe to RE2/J regular expression not supported in MockConsumer yet");
}

@Override
public void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
if (listener == null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer;

/**
* Represents a regular expression compatible with Google RE2/J, used to subscribe to topics .
* This just keeps the String representation of the pattern, and all validations to ensure
* it is RE2/J compatible are delegated to the broker.
*/
public class SubscriptionPattern {

/**
* String representation the regular expression, compatible with RE2/J.
*/
private final String pattern;

public SubscriptionPattern(String pattern) {
this.pattern = pattern;
}

/**
* @return Regular expression pattern compatible with RE2/J.
*/
public String pattern() {
return this.pattern;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.events.AllTopicsMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
Expand Down Expand Up @@ -1793,6 +1794,16 @@ public void subscribe(Pattern pattern) {
subscribeInternal(pattern, Optional.empty());
}

@Override
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
subscribeToRegex(pattern, Optional.ofNullable(callback));
}

@Override
public void subscribe(SubscriptionPattern pattern) {
subscribeToRegex(pattern, Optional.empty());
}

@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
if (listener == null)
Expand Down Expand Up @@ -1855,6 +1866,29 @@ private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListen
}
}

/**
* Subscribe to the RE2/J pattern. This will generate an event to update the pattern in the
* subscription, so it's included in a next heartbeat request sent to the broker. No validation of the pattern is
* performed by the client.
*/
private void subscribeToRegex(SubscriptionPattern pattern,
Optional<ConsumerRebalanceListener> listener) {
maybeThrowInvalidGroupIdException();
throwIfNullOrEmpty(pattern);
log.info("Subscribing to regular expression {}", pattern);

// TODO: generate event to update subscribed regex so it's included in the next HB.
}

private void throwIfNullOrEmpty(SubscriptionPattern subscriptionPattern) {
if (subscriptionPattern == null) {
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
}
if (subscriptionPattern.pattern().isEmpty()) {
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be empty");
}
}

private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebalanceListener> listener) {
acquireAndEnsureOpen();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
Expand Down Expand Up @@ -518,6 +519,18 @@ public void subscribe(Pattern pattern) {
subscribeInternal(pattern, Optional.empty());
}

@Override
public void subscribe(SubscriptionPattern pattern, ConsumerRebalanceListener callback) {
throw new UnsupportedOperationException(String.format("Subscribe to RE2/J pattern is not supported when using" +
"the %s protocol defined in config %s", GroupProtocol.CLASSIC, ConsumerConfig.GROUP_PROTOCOL_CONFIG));
}

@Override
public void subscribe(SubscriptionPattern pattern) {
throw new UnsupportedOperationException(String.format("Subscribe to RE2/J pattern is not supported when using" +
"the %s protocol defined in config %s", GroupProtocol.CLASSIC, ConsumerConfig.GROUP_PROTOCOL_CONFIG));
}

/**
* Internal helper method for {@link #subscribe(Pattern)} and
* {@link #subscribe(Pattern, ConsumerRebalanceListener)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
Expand Down Expand Up @@ -3497,6 +3498,17 @@ public void testPollSendsRequestToJoin(GroupProtocol groupProtocol) throws Inter
"Expected " + (groupProtocol == GroupProtocol.CLASSIC ? "JoinGroup" : "Heartbeat") + " request");
}

@ParameterizedTest
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
public void testSubscribeToRe2jPatternNotSupportedForClassicConsumer(GroupProtocol groupProtocol) {
KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(groupProtocol, time, mock(NetworkClient.class), subscription,
mock(ConsumerMetadata.class));
assertThrows(UnsupportedOperationException.class, () ->
consumer.subscribe(new SubscriptionPattern("t*")));
assertThrows(UnsupportedOperationException.class, () ->
consumer.subscribe(new SubscriptionPattern("t*"), mock(ConsumerRebalanceListener.class)));
}

private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
return client.requests().stream().anyMatch(request -> request.requestBuilder().apiKey().equals(apiKey));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
Expand Down Expand Up @@ -1829,6 +1830,19 @@ public void testSeekToEnd() {
assertEquals(OffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy());
}

@Test
public void testSubscribeToRe2JPatternValidation() {
consumer = newConsumer();

Throwable t = assertThrows(IllegalArgumentException.class, () -> consumer.subscribe((SubscriptionPattern) null));
assertEquals("Topic pattern to subscribe to cannot be null", t.getMessage());

t = assertThrows(IllegalArgumentException.class, () -> consumer.subscribe(new SubscriptionPattern("")));
assertEquals("Topic pattern to subscribe to cannot be empty", t.getMessage());

assertDoesNotThrow(() -> consumer.subscribe(new SubscriptionPattern("t*")));
}

private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
Expand Down