Make the broker round robin selection in task execution configurable #2223

Open
wants to merge 5 commits into
base: main

allenxwang
Contributor

@allenxwang allenxwang commented Nov 20, 2024

Summary

  1. Why:

We found that the rebalance operation slows down significantly in the latter half of the execution. This happens when most of the tasks in the proposal involve a single broker. Currently, the ExecutionTaskPlanner sorts the brokers according to the ReplicaMoveStrategy and keeps picking tasks from the first broker until that broker reaches its max concurrency. However, it skips a broker within the same round if that broker is already involved in a picked task. As a result, almost all tasks remaining toward the end of the execution involve the broker that has the most tasks, so the execution is bottlenecked by that single broker's max concurrency.

  2. What:

We will make this round-robin behavior configurable, with the default set to true. When it is set to false, the ExecutionTaskPlanner can keep picking tasks from the same highest-priority broker when it is asked to find the next set of tasks to execute. The ExecutionTaskPlanner compares brokers in the following order:

  • Use the ReplicaMoveStrategy to compare brokers by the priority of each broker's first set of tasks.
  • If that comparison is equal, compare the total number of tasks involving each broker. This is exactly what we need to prioritize the broker with the largest number of tasks.
  • Fall back to comparing broker IDs.
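
The three-level comparison above can be sketched as a chained comparator. This is only an illustration, not the code in this PR: `BrokerTasks`, `firstTaskPriority`, and `orderedBrokerIds` are hypothetical names, and an integer priority (lower value sorts first) stands in for the ReplicaMoveStrategy comparison.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class BrokerOrderingSketch {
  /** Hypothetical per-broker summary; not a Cruise Control class. */
  static final class BrokerTasks {
    final int brokerId;
    // Assumption: lower value means higher priority. Stand-in for the ReplicaMoveStrategy comparison.
    final int firstTaskPriority;
    final int totalTasks;

    BrokerTasks(int brokerId, int firstTaskPriority, int totalTasks) {
      this.brokerId = brokerId;
      this.firstTaskPriority = firstTaskPriority;
      this.totalTasks = totalTasks;
    }
  }

  // 1) strategy priority, 2) more tasks first, 3) broker id as the final tie-breaker.
  static final Comparator<BrokerTasks> BROKER_ORDER =
      Comparator.comparingInt((BrokerTasks b) -> b.firstTaskPriority)
          .thenComparing(Comparator.comparingInt((BrokerTasks b) -> b.totalTasks).reversed())
          .thenComparingInt(b -> b.brokerId);

  /** Returns broker ids in the order the sketch would visit them. */
  static List<Integer> orderedBrokerIds(List<BrokerTasks> brokers) {
    List<BrokerTasks> sorted = new ArrayList<>(brokers);
    sorted.sort(BROKER_ORDER);
    List<Integer> ids = new ArrayList<>();
    for (BrokerTasks b : sorted) {
      ids.add(b.brokerId);
    }
    return ids;
  }

  public static void main(String[] args) {
    List<BrokerTasks> brokers = Arrays.asList(
        new BrokerTasks(3, 1, 5),
        new BrokerTasks(1, 1, 40),
        new BrokerTasks(2, 0, 2),
        new BrokerTasks(4, 1, 5));
    System.out.println(orderedBrokerIds(brokers)); // prints [2, 1, 3, 4]
  }
}
```

Broker 2 wins on strategy priority alone; among the rest, broker 1 comes first because it has the most tasks, and brokers 3 and 4 tie on both criteria and fall back to broker id.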

Expected Behavior

Actual Behavior

Steps to Reproduce

Known Workarounds

Additional evidence

Categorization

  • documentation
  • bugfix
  • new feature
  • refactor
  • security/CVE
  • other

This PR resolves # if any.

@allenxwang allenxwang marked this pull request as ready for review November 26, 2024 01:36
Contributor

@CCisGG CCisGG left a comment

Thanks for the change @allenxwang ! Let's get this tested before merging it.

continue;
}
// Check the available balancing proposals of this broker to see if we can find one ready to execute.
- SortedSet<ExecutionTask> proposalsForBroker = _interPartMoveTasksByBrokerId.get(brokerId);
+ SortedSet<ExecutionTask> proposalsForBroker = new TreeSet<>(_interPartMoveTasksByBrokerId.get(brokerId));
Contributor

Do you need to use TreeSet to interpret the proposalsForBroker? I think that is only needed if you need to call TreeSet APIs.

Contributor Author

I need to create a copy of the set; otherwise it throws a ConcurrentModificationException. This did not happen before this code change.

Contributor Author

The API being called is in the next line, which iterates through the set. That's why I need to use TreeSet: it preserves the same order as before this code change.
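
For readers following this thread, here is a minimal sketch of the failure mode and of the copy-based fix. It uses plain integers instead of `ExecutionTask`, and the class and method names are hypothetical. It relies on the documented behavior that `new TreeSet<>(sortedSet)` keeps the ordering of the sorted set it copies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

final class SortedSetCopySketch {
  /** Removes from the live set while iterating it; returns true if that fails fast. */
  static boolean failsOnLiveSet(SortedSet<Integer> tasks) {
    try {
      for (Integer task : tasks) {
        tasks.remove(task); // structural change behind the iterator's back
      }
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  /** Iterates a snapshot, so removing from the live set is safe; returns the visit order. */
  static List<Integer> drainViaCopy(SortedSet<Integer> tasks) {
    List<Integer> visited = new ArrayList<>();
    // TreeSet(SortedSet) copies the elements and reuses the source set's ordering.
    for (Integer task : new TreeSet<>(tasks)) {
      visited.add(task);
      tasks.remove(task);
    }
    return visited;
  }

  public static void main(String[] args) {
    SortedSet<Integer> live = new TreeSet<>(Arrays.asList(1, 2, 3));
    System.out.println(failsOnLiveSet(live)); // prints true

    SortedSet<Integer> reversed = new TreeSet<>(Comparator.<Integer>reverseOrder());
    reversed.addAll(Arrays.asList(1, 2, 3));
    System.out.println(drainViaCopy(reversed)); // prints [3, 2, 1]
  }
}
```

A plain `ArrayList` copy would also avoid the exception and keep the iteration order; the `TreeSet` copy additionally keeps the variable typed as a `SortedSet`.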

@@ -398,8 +404,8 @@ public List<ExecutionTask> getInterBrokerReplicaMovementTasks(Map<Integer, Integ
int sourceBroker = task.proposal().oldLeader().brokerId();
Set<Integer> destinationBrokers = task.proposal().replicasToAdd().stream().mapToInt(ReplicaPlacementInfo::brokerId)
.boxed().collect(Collectors.toSet());
- if (brokerInvolved.contains(sourceBroker)
-     || KafkaCruiseControlUtils.containsAny(brokerInvolved, destinationBrokers)) {
+ if (_preferRoundRobin && (brokerInvolved.contains(sourceBroker)
Contributor

Question related to previous implementation:

Based on the previous implementation, in any round, one broker can only move at most one partition. Do I understand correctly?

Contributor Author

No, it can still move more than one partition of the same broker. The outer while loop goes through the same process to pick another broker's move tasks, which could involve brokers that have already been visited, but there is no guarantee. In practice, in the first few rounds, usually only 1 or 2 tasks are picked for the broker that has the most tasks.

Contributor

Ah I see. That makes sense.
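
To make the effect of the guarded check concrete, here is a heavily simplified, single-pass sketch. It is not the planner's actual loop: `Move`, `pickOnePass`, and `freeSlotsByBroker` are hypothetical, each move has a single destination, and the outer while loop discussed above is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class RoundRobinPickSketch {
  /** Hypothetical, simplified move task with one source and one destination broker. */
  static final class Move {
    final int id;
    final int source;
    final int destination;

    Move(int id, int source, int destination) {
      this.id = id;
      this.source = source;
      this.destination = destination;
    }
  }

  /** One pass over candidates in priority order; returns the ids of the picked moves. */
  static List<Integer> pickOnePass(List<Move> candidates, Map<Integer, Integer> freeSlotsByBroker,
                                   boolean preferRoundRobin) {
    Map<Integer, Integer> slots = new HashMap<>(freeSlotsByBroker); // do not mutate the caller's map
    Set<Integer> brokerInvolved = new HashSet<>();
    List<Integer> picked = new ArrayList<>();
    for (Move move : candidates) {
      // Concurrency limit: both ends of the move need a free slot.
      if (slots.getOrDefault(move.source, 0) <= 0 || slots.getOrDefault(move.destination, 0) <= 0) {
        continue;
      }
      // Round-robin guard: skip brokers already touched in this pass.
      if (preferRoundRobin
          && (brokerInvolved.contains(move.source) || brokerInvolved.contains(move.destination))) {
        continue;
      }
      picked.add(move.id);
      brokerInvolved.add(move.source);
      brokerInvolved.add(move.destination);
      slots.merge(move.source, -1, Integer::sum);
      slots.merge(move.destination, -1, Integer::sum);
    }
    return picked;
  }

  public static void main(String[] args) {
    List<Move> moves = Arrays.asList(
        new Move(0, 1, 2), new Move(1, 1, 3), new Move(2, 1, 4), new Move(3, 5, 6));
    Map<Integer, Integer> slots = new HashMap<>();
    for (int broker = 1; broker <= 6; broker++) {
      slots.put(broker, 2); // two free slots per broker
    }
    System.out.println(pickOnePass(moves, slots, true));  // prints [0, 3]
    System.out.println(pickOnePass(moves, slots, false)); // prints [0, 1, 3]
  }
}
```

With the guard enabled, broker 1 contributes a single move in this pass even though it has a free slot left; with the guard disabled, it is filled up to its concurrency limit.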

@@ -830,6 +831,22 @@ public synchronized void executeProposals(Collection<ExecutionProposal> proposal
requestedIntraBrokerPartitionMovementConcurrency, requestedClusterLeadershipMovementConcurrency,
requestedBrokerLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy,
isTriggeredByUserRequest, loadMonitor);
if (removedBrokers != null && !removedBrokers.isEmpty()) {
int count = 0;
Contributor

nit: maybe extract this snippet as a separate function.

}
}
}
LOG.info("User task {}: {} of partition move proposals are related to removed brokers.", uuid, ((float) count) / totalCount);
Contributor

You may want to actually log count and total count here, to observe whether this only happens for large scale rebalance or not.
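
One possible shape for both suggestions (extract the counting into a function, and report the raw numbers alongside the ratio) is sketched below. Everything here is hypothetical: the `Move` type, the helper names, and in particular the rule for what counts as "related to removed brokers", since the counting loop is not visible in this excerpt. The message is built with `String.format` so the sketch runs without a logging dependency.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

final class RemovedBrokerStatsSketch {
  /** Hypothetical stand-in for a partition move proposal. */
  static final class Move {
    final int source;
    final Set<Integer> destinations;

    Move(int source, Integer... destinations) {
      this.source = source;
      this.destinations = new HashSet<>(Arrays.asList(destinations));
    }
  }

  /** Assumption: a move is "related" if its source or any destination is a removed broker. */
  static int countMovesInvolving(Collection<Move> moves, Set<Integer> removedBrokers) {
    int count = 0;
    for (Move move : moves) {
      if (removedBrokers.contains(move.source)
          || !Collections.disjoint(removedBrokers, move.destinations)) {
        count++;
      }
    }
    return count;
  }

  /** Builds a message that carries count, total, and ratio. */
  static String summary(String uuid, int count, int totalCount) {
    double ratio = totalCount == 0 ? 0.0 : (double) count / totalCount;
    return String.format(Locale.ROOT,
        "User task %s: %d of %d partition move proposals (%.2f) are related to removed brokers.",
        uuid, count, totalCount, ratio);
  }

  public static void main(String[] args) {
    List<Move> moves = Arrays.asList(
        new Move(1, 2), new Move(3, 1), new Move(4, 5), new Move(1, 6));
    Set<Integer> removed = new HashSet<>(Arrays.asList(1));
    int count = countMovesInvolving(moves, removed);
    System.out.println(summary("abc", count, moves.size()));
    // prints: User task abc: 3 of 4 partition move proposals (0.75) are related to removed brokers.
  }
}
```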

LOG.info("User task {}: Execution succeeded: {}. ", _uuid, executionStatusString);
String status = "succeeded";
if (userTaskInfo != null && userTaskInfo.state() != COMPLETED) {
status = userTaskInfo.state().toString();
Contributor

In which scenario is it not treated as succeeded when executionException == null?

Contributor Author

This happens if the user requests to stop the execution.

Contributor

Got it. Let's add a comment there

3 participants