MINOR: Various cleanups in clients #17895

Status: Open. Wants to merge 1 commit into base: trunk.
6 changes: 3 additions & 3 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -294,7 +294,7 @@ public Map<String, Uuid> topicIds() {

public synchronized LeaderAndEpoch currentLeader(TopicPartition topicPartition) {
Optional<MetadataResponse.PartitionMetadata> maybeMetadata = partitionMetadataIfCurrent(topicPartition);
if (!maybeMetadata.isPresent())
if (maybeMetadata.isEmpty())
return new LeaderAndEpoch(Optional.empty(), Optional.ofNullable(lastSeenLeaderEpochs.get(topicPartition)));

MetadataResponse.PartitionMetadata partitionMetadata = maybeMetadata.get();
@@ -392,7 +392,7 @@ public synchronized Set<TopicPartition> updatePartitionLeadership(Map<TopicParti
TopicPartition partition = partitionLeader.getKey();
Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
Metadata.LeaderIdAndEpoch newLeader = partitionLeader.getValue();
if (!newLeader.epoch.isPresent() || !newLeader.leaderId.isPresent()) {
if (newLeader.epoch.isEmpty() || newLeader.leaderId.isEmpty()) {
log.debug("For {}, incoming leader information is incomplete {}", partition, newLeader);
continue;
}
Expand All @@ -404,7 +404,7 @@ public synchronized Set<TopicPartition> updatePartitionLeadership(Map<TopicParti
log.debug("For {}, incoming leader({}), the corresponding node information for node-id {} is missing, so ignoring.", partition, newLeader, newLeader.leaderId.get());
continue;
}
if (!this.metadataSnapshot.partitionMetadata(partition).isPresent()) {
if (this.metadataSnapshot.partitionMetadata(partition).isEmpty()) {
log.debug("For {}, incoming leader({}), partition metadata is no longer cached, ignoring.", partition, newLeader);
continue;
}
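
Note on the hunks above: Optional.isEmpty() has been available since Java 11 (OptionalInt, OptionalLong and OptionalDouble gained it too), so it can replace the negated isPresent() checks throughout. A minimal standalone sketch of the equivalence; the class name and values are made up for illustration, not taken from this PR:

    import java.util.Optional;

    public class OptionalCheckDemo {
        public static void main(String[] args) {
            Optional<String> leader = Optional.empty();

            // Pre-Java 11 style: negate isPresent()
            if (!leader.isPresent()) {
                System.out.println("no leader (negated isPresent)");
            }

            // Java 11+: isEmpty() states the intent directly
            if (leader.isEmpty()) {
                System.out.println("no leader (isEmpty)");
            }
        }
    }

Both branches print here; the two checks are equivalent, the second just avoids the negation.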
[next file]
@@ -133,7 +133,7 @@ public Cluster cluster() {
*/
public OptionalInt leaderEpochFor(TopicPartition tp) {
PartitionMetadata partitionMetadata = metadataByPartition.get(tp);
if (partitionMetadata == null || !partitionMetadata.leaderEpoch.isPresent()) {
if (partitionMetadata == null || partitionMetadata.leaderEpoch.isEmpty()) {
return OptionalInt.empty();
} else {
return OptionalInt.of(partitionMetadata.leaderEpoch.get());
[next file]
@@ -23,9 +23,9 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -84,8 +84,7 @@ public ConsumerGroupDescription(String groupId,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.members = members == null ? Collections.emptyList() :
Collections.unmodifiableList(new ArrayList<>(members));
this.members = members == null ? Collections.emptyList() : List.copyOf(members);
this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
this.type = type;
this.groupState = GroupState.parse(state.name());
@@ -122,8 +121,7 @@ public ConsumerGroupDescription(String groupId,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.members = members == null ? Collections.emptyList() :
Collections.unmodifiableList(new ArrayList<>(members));
this.members = members == null ? Collections.emptyList() : List.copyOf(members);
this.partitionAssignor = partitionAssignor == null ? "" : partitionAssignor;
this.type = type;
this.groupState = groupState;
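
Note on the two constructor hunks above: List.copyOf (Java 10+) produces an unmodifiable copy just like the old Collections.unmodifiableList(new ArrayList<>(...)) wrapping, with one behavioral caveat: copyOf throws NullPointerException if the source contains a null element, which the old pattern tolerated. A standalone sketch with made-up names, not code from this PR:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class CopyOfDemo {
        public static void main(String[] args) {
            List<String> members = new ArrayList<>(List.of("member-1", "member-2"));

            // Old pattern: copy, then wrap the copy in an unmodifiable view
            List<String> wrapped = Collections.unmodifiableList(new ArrayList<>(members));

            // New pattern: a single call that also yields an unmodifiable copy
            List<String> copied = List.copyOf(members);

            // Both are detached from later mutation of the source
            members.add("member-3");
            System.out.println(wrapped.size() + " " + copied.size()); // prints: 2 2

            // ...but copyOf rejects null elements
            members.set(0, null);
            try {
                List.copyOf(members);
            } catch (NullPointerException expected) {
                System.out.println("List.copyOf rejects null elements");
            }
        }
    }

The same trade-off applies to the List.copyOf and Set.copyOf changes elsewhere in this PR; assuming the collections passed in never contain null elements, the shorter form is behavior-preserving.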
[next file]
@@ -42,7 +42,7 @@ public class DeleteConsumerGroupsResult {
*/
public Map<String, KafkaFuture<Void>> deletedGroups() {
Map<String, KafkaFuture<Void>> deletedGroups = new HashMap<>(futures.size());
futures.forEach((key, future) -> deletedGroups.put(key, future));
deletedGroups.putAll(futures);
return deletedGroups;
}
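
Note on the hunk above: copying the map with putAll is equivalent to the removed forEach loop; both insert the same entries into the new HashMap. A small standalone sketch with hypothetical keys, not from this PR:

    import java.util.HashMap;
    import java.util.Map;

    public class PutAllDemo {
        public static void main(String[] args) {
            Map<String, Integer> futures = Map.of("group-a", 1, "group-b", 2);

            // Element-by-element copy (old style)
            Map<String, Integer> viaForEach = new HashMap<>(futures.size());
            futures.forEach((key, value) -> viaForEach.put(key, value));

            // Single bulk copy (new style), same result
            Map<String, Integer> viaPutAll = new HashMap<>(futures.size());
            viaPutAll.putAll(futures);

            System.out.println(viaForEach.equals(viaPutAll)); // prints: true
        }
    }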

[next file]
@@ -125,7 +125,7 @@ public KafkaFuture<UserScramCredentialsDescription> description(String userName)
// for users 1, 2, and 3 but this is looking for user 4), so explicitly take care of that case
Optional<DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult> optionalUserResult =
data.results().stream().filter(result -> result.user().equals(userName)).findFirst();
if (!optionalUserResult.isPresent()) {
if (optionalUserResult.isEmpty()) {
retval.completeExceptionally(new ResourceNotFoundException("No such user: " + userName));
} else {
DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult userResult = optionalUserResult.get();
[next file]
@@ -19,7 +19,6 @@
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -36,8 +35,7 @@ public class MemberAssignment {
* @param topicPartitions List of topic partitions
*/
public MemberAssignment(Set<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions == null ? Collections.emptySet() :
Collections.unmodifiableSet(new HashSet<>(topicPartitions));
this.topicPartitions = topicPartitions == null ? Collections.emptySet() : Set.copyOf(topicPartitions);
}

@Override
[next file]
@@ -17,8 +17,6 @@

package org.apache.kafka.clients.admin;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand All @@ -34,7 +32,7 @@ public class NewPartitionReassignment {
public NewPartitionReassignment(List<Integer> targetReplicas) {
if (targetReplicas == null || targetReplicas.isEmpty())
throw new IllegalArgumentException("Cannot create a new partition reassignment without any replicas");
this.targetReplicas = Collections.unmodifiableList(new ArrayList<>(targetReplicas));
this.targetReplicas = List.copyOf(targetReplicas);
}

public List<Integer> targetReplicas() {
[next file]
@@ -22,9 +22,9 @@
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -53,8 +53,7 @@ public ShareGroupDescription(String groupId,
Node coordinator,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.members = members == null ? Collections.emptyList() :
Collections.unmodifiableList(new ArrayList<>(members));
this.members = members == null ? Collections.emptyList() : List.copyOf(members);
this.groupState = groupState;
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
[next file]
@@ -17,8 +17,6 @@

package org.apache.kafka.clients.admin;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

@@ -61,7 +59,7 @@ public String toString() {
*/
public UserScramCredentialsDescription(String name, List<ScramCredentialInfo> credentialInfos) {
this.name = Objects.requireNonNull(name);
this.credentialInfos = Collections.unmodifiableList(new ArrayList<>(credentialInfos));
this.credentialInfos = List.copyOf(credentialInfos);
}

/**
[next file]
@@ -339,7 +339,7 @@ private <T extends ApiRequestScope> void collectRequests(
}

// Copy the keys to avoid exposing the underlying mutable set
Set<K> copyKeys = Collections.unmodifiableSet(new HashSet<>(keys));
Set<K> copyKeys = Set.copyOf(keys);

Collection<AdminApiHandler.RequestAndKeys<K>> newRequests = buildRequest.apply(copyKeys, scope);
if (newRequests.isEmpty()) {
[next file]
@@ -259,7 +259,7 @@ public boolean needsRebootstrap(long now, long rebootstrapTriggerMs) {
public void transitionToUpdatePending(long now) {
this.state = State.UPDATE_PENDING;
this.lastMetadataFetchAttemptMs = now;
if (!metadataAttemptStartMs.isPresent())
if (metadataAttemptStartMs.isEmpty())
metadataAttemptStartMs = Optional.of(now);
}

[next file]
@@ -195,7 +195,7 @@ public KafkaFutureImpl<Map<Integer, KafkaFutureImpl<V>>> all() {
}

private KafkaFutureImpl<V> futureOrThrow(BrokerKey key) {
if (!key.brokerId.isPresent()) {
if (key.brokerId.isEmpty()) {
throw new IllegalArgumentException("Attempt to complete with invalid key: " + key);
} else {
int brokerId = key.brokerId.getAsInt();
[next file]
@@ -124,7 +124,7 @@ private AllBrokersStrategy.BrokerKey requireSingleton(
}

AllBrokersStrategy.BrokerKey key = keys.iterator().next();
if (!key.brokerId.isPresent() || key.brokerId.getAsInt() != brokerId) {
if (key.brokerId.isEmpty() || key.brokerId.getAsInt() != brokerId) {
throw new IllegalArgumentException("Unexpected broker key: " + key);
}

[next file]
@@ -61,12 +61,7 @@ public class ConsumerConfig extends AbstractConfig {
// a list contains all the assignor names that only assign subscribed topics to consumer. Should be updated when new assignor added.
// This is to help optimize ConsumerCoordinator#performAssignment method
public static final List<String> ASSIGN_FROM_SUBSCRIBED_ASSIGNORS =
Collections.unmodifiableList(Arrays.asList(
RANGE_ASSIGNOR_NAME,
ROUNDROBIN_ASSIGNOR_NAME,
STICKY_ASSIGNOR_NAME,
COOPERATIVE_STICKY_ASSIGNOR_NAME
));
List.of(RANGE_ASSIGNOR_NAME, ROUNDROBIN_ASSIGNOR_NAME, STICKY_ASSIGNOR_NAME, COOPERATIVE_STICKY_ASSIGNOR_NAME);

/*
* NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
@@ -709,7 +704,7 @@ private void maybeOverrideEnableAutoCommit(Map<String, Object> configs) {
Optional<String> groupId = Optional.ofNullable(getString(CommonClientConfigs.GROUP_ID_CONFIG));
Map<String, Object> originals = originals();
boolean enableAutoCommit = originals.containsKey(ENABLE_AUTO_COMMIT_CONFIG) ? getBoolean(ENABLE_AUTO_COMMIT_CONFIG) : false;
if (!groupId.isPresent()) { // overwrite in case of default group id where the config is not explicitly provided
if (groupId.isEmpty()) { // overwrite in case of default group id where the config is not explicitly provided
if (!originals.containsKey(ENABLE_AUTO_COMMIT_CONFIG)) {
configs.put(ENABLE_AUTO_COMMIT_CONFIG, false);
} else if (enableAutoCommit) {
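
Note on the ASSIGN_FROM_SUBSCRIBED_ASSIGNORS hunk above: for a fixed constant list, List.of (Java 9+) builds an unmodifiable list directly, replacing the Collections.unmodifiableList(Arrays.asList(...)) chain. A standalone sketch with placeholder assignor names, not code from this PR:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class ConstantListDemo {
        // Old pattern: fixed-size Arrays.asList wrapped in an unmodifiable view
        static final List<String> OLD_STYLE =
            Collections.unmodifiableList(Arrays.asList("range", "roundrobin", "sticky"));

        // New pattern: unmodifiable from the start
        static final List<String> NEW_STYLE = List.of("range", "roundrobin", "sticky");

        public static void main(String[] args) {
            System.out.println(OLD_STYLE.equals(NEW_STYLE)); // prints: true

            // Both reject mutation attempts
            try {
                NEW_STYLE.add("cooperative-sticky");
            } catch (UnsupportedOperationException expected) {
                System.out.println("List.of result is unmodifiable");
            }
        }
    }

As with copyOf, List.of does not accept null elements, which is irrelevant for compile-time string constants like these.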
[next file]
@@ -115,7 +115,7 @@ public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K, V>>> i

@Override
public Iterator<ConsumerRecord<K, V>> iterator() {
return new AbstractIterator<ConsumerRecord<K, V>>() {
return new AbstractIterator<>() {
final Iterator<? extends Iterable<ConsumerRecord<K, V>>> iters = iterables.iterator();
Iterator<ConsumerRecord<K, V>> current;
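
Note on the AbstractIterator hunk above: since Java 9 the diamond operator works with anonymous inner classes, so repeating the type arguments on the right-hand side is redundant; the compiler infers them from the target type. A minimal sketch with a hypothetical generic interface (not one of Kafka's), not code from this PR:

    public class DiamondDemo {
        interface Listener<T> {
            void onSuccess(T value);
        }

        public static void main(String[] args) {
            // Pre-Java 9: type argument repeated on the anonymous class
            Listener<String> verbose = new Listener<String>() {
                @Override
                public void onSuccess(String value) {
                    System.out.println("verbose: " + value);
                }
            };

            // Java 9+: diamond is allowed, the compiler infers <String>
            Listener<String> concise = new Listener<>() {
                @Override
                public void onSuccess(String value) {
                    System.out.println("concise: " + value);
                }
            };

            verbose.onSuccess("a");
            concise.onSuccess("b");
        }
    }

The same simplification appears later in this PR for RequestFutureListener and RequestFutureAdapter.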

[next file]
@@ -543,7 +543,7 @@ public synchronized void scheduleNopPollTask() {
}

public synchronized Set<TopicPartition> paused() {
return Collections.unmodifiableSet(new HashSet<>(paused));
return Set.copyOf(paused);
}

private void ensureNotClosed() {
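
Note on the paused() hunk above: Set.copyOf keeps the defensive copy, so callers still get an unmodifiable snapshot that does not reflect later changes to the consumer's internal set; one extra property is that copyOf typically skips the copy when the input is already an unmodifiable set created by Set.of or Set.copyOf. A short standalone sketch under those assumptions (hypothetical class, not code from this PR):

    import java.util.HashSet;
    import java.util.Set;

    public class SnapshotDemo {
        private final Set<String> paused = new HashSet<>();

        public Set<String> paused() {
            // Unmodifiable snapshot, detached from the mutable field
            return Set.copyOf(paused);
        }

        public static void main(String[] args) {
            SnapshotDemo demo = new SnapshotDemo();
            demo.paused.add("topic-0");

            Set<String> snapshot = demo.paused();
            demo.paused.add("topic-1");

            // The snapshot does not see the later addition
            System.out.println(snapshot.size() + " " + demo.paused.size()); // prints: 1 2

            // On current JDKs, copying an already-unmodifiable set returns it as-is
            System.out.println(Set.copyOf(snapshot) == snapshot); // prints: true
        }
    }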
[next file]
@@ -20,7 +20,6 @@

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
Expand All @@ -40,7 +39,7 @@ public NoOffsetForPartitionException(TopicPartition partition) {

public NoOffsetForPartitionException(Collection<TopicPartition> partitions) {
super("Undefined offset with no reset policy for partitions: " + partitions);
this.partitions = Collections.unmodifiableSet(new HashSet<>(partitions));
this.partitions = Set.copyOf(partitions);
}

/**
[next file]
@@ -269,7 +269,7 @@ public TopicAssignmentState(String topic, List<PartitionInfo> partitionInfos, Li
boolean racksMatch(String consumer, TopicPartition tp) {
Optional<String> consumerRack = consumers.get(consumer);
Set<String> replicaRacks = partitionRacks.get(tp);
return !consumerRack.isPresent() || (replicaRacks != null && replicaRacks.contains(consumerRack.get()));
return consumerRack.isEmpty() || (replicaRacks != null && replicaRacks.contains(consumerRack.get()));
}

int maxAssignable(String consumer) {
[next file]
@@ -568,7 +568,7 @@ private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
if (lastRebalanceStartMs == -1L)
lastRebalanceStartMs = time.milliseconds();
joinFuture = sendJoinGroupRequest();
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
joinFuture.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(ByteBuffer value) {
// do nothing since all the handler logic are in SyncGroupResponseHandler already
@@ -1188,7 +1188,7 @@ public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) {
}

protected boolean isDynamicMember() {
return !rebalanceConfig.groupInstanceId.isPresent();
return rebalanceConfig.groupInstanceId.isEmpty();
}

private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> {
@@ -1528,7 +1528,7 @@ public void run() {
} else {
heartbeat.sentHeartbeat(now);
final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest();
heartbeatFuture.addListener(new RequestFutureListener<Void>() {
heartbeatFuture.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(Void value) {
synchronized (AbstractCoordinator.this) {
[next file]
@@ -416,7 +416,7 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests(

Optional<Node> leaderOpt = position.currentLeader.leader;

if (!leaderOpt.isPresent()) {
if (leaderOpt.isEmpty()) {
log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
metadata.requestUpdate(false);
continue;
[next file]
@@ -892,7 +892,7 @@ private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sorted

List<TopicPartition> unassignedPartitions = new ArrayList<>(totalPartitionsCount - sortedAssignedPartitions.size());

Collections.sort(sortedAssignedPartitions, Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
sortedAssignedPartitions.sort(Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));

boolean shouldAddDirectly = false;
Iterator<TopicPartition> sortedAssignedPartitionsIter = sortedAssignedPartitions.iterator();
@@ -991,7 +991,7 @@ private class GeneralAssignmentBuilder extends AbstractAssignmentBuilder {
currentPartitionConsumer.put(topicPartition, entry.getKey());

List<String> sortedAllTopics = new ArrayList<>(topic2AllPotentialConsumers.keySet());
Collections.sort(sortedAllTopics, new TopicComparator(topic2AllPotentialConsumers));
sortedAllTopics.sort(new TopicComparator(topic2AllPotentialConsumers));
sortedAllPartitions = getAllTopicPartitions(sortedAllTopics);

sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
@@ -1084,7 +1084,7 @@ private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sorted

List<TopicPartition> unassignedPartitions = new ArrayList<>();

Collections.sort(sortedAssignedPartitions, new PartitionComparator(topic2AllPotentialConsumers));
sortedAssignedPartitions.sort(new PartitionComparator(topic2AllPotentialConsumers));

boolean shouldAddDirectly = false;
Iterator<TopicPartition> sortedAssignedPartitionsIter = sortedAssignedPartitions.iterator();
@@ -1154,7 +1154,7 @@ private void prepopulateCurrentAssignments(Map<TopicPartition, ConsumerGeneratio
if (memberData.generation.isPresent() && memberData.generation.get() < maxGeneration) {
// if the current member's generation is lower than maxGeneration, put into prevAssignment if needed
updatePrevAssignment(prevAssignment, memberData.partitions, consumer, memberData.generation.get());
} else if (!memberData.generation.isPresent() && maxGeneration > DEFAULT_GENERATION) {
} else if (memberData.generation.isEmpty() && maxGeneration > DEFAULT_GENERATION) {
// if maxGeneration is larger than DEFAULT_GENERATION
// put all (no generation) partitions as DEFAULT_GENERATION into prevAssignment if needed
updatePrevAssignment(prevAssignment, memberData.partitions, consumer, DEFAULT_GENERATION);
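
Note on the Collections.sort changes in this file: Collections.sort(list, comparator) simply delegates to list.sort(comparator), a default method on List since Java 8, so calling sort on the list directly is behavior-preserving and drops one level of indirection. A standalone sketch with made-up data, not from this PR:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public class SortDemo {
        public static void main(String[] args) {
            List<String> a = new ArrayList<>(List.of("topic-b", "topic-a", "topic-c"));
            List<String> b = new ArrayList<>(a);

            // Older style: static helper
            Collections.sort(a, Comparator.naturalOrder());

            // Equivalent: instance method, same stable sort
            b.sort(Comparator.naturalOrder());

            System.out.println(a.equals(b)); // prints: true
        }
    }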
[next file]
@@ -37,7 +37,7 @@ public abstract class AsyncClient<T1, Req extends AbstractRequest, Resp extends
public RequestFuture<T2> sendAsyncRequest(Node node, T1 requestData) {
AbstractRequest.Builder<Req> requestBuilder = prepareRequest(node, requestData);

return client.send(node, requestBuilder).compose(new RequestFutureAdapter<ClientResponse, T2>() {
return client.send(node, requestBuilder).compose(new RequestFutureAdapter<>() {
@Override
@SuppressWarnings("unchecked")
public void onSuccess(ClientResponse value, RequestFuture<T2> future) {