Skip to content

Commit

Permalink
KAFKA-17374 add bootstrap.controller to kafka-reassign-partitions.sh (#…
Browse files Browse the repository at this point in the history
…16964)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
m1a2st authored Oct 14, 2024
1 parent a51eec1 commit 203f323
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ public static void main(String[] args) {
Properties props = opts.options.has(opts.commandConfigOpt)
? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
: new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt));
if (opts.options.has(opts.bootstrapControllerOpt)) {
props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, opts.options.valueOf(opts.bootstrapControllerOpt));
} else {
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt));
}
props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "reassign-partitions-tool");
adminClient = Admin.create(props);
handleAction(adminClient, opts);
Expand Down Expand Up @@ -1405,9 +1409,13 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) {
}

OptionSpec<?> action = allActions.get(0);

if (opts.options.has(opts.bootstrapServerOpt) && opts.options.has(opts.bootstrapControllerOpt))
CommandLineUtils.printUsageAndExit(opts.parser, "Please don't specify both --bootstrap-server and --bootstrap-controller");
else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bootstrapControllerOpt))
CommandLineUtils.printUsageAndExit(opts.parser, "Please specify either --bootstrap-server or --bootstrap-controller");

if (!opts.options.has(opts.bootstrapServerOpt))
CommandLineUtils.printUsageAndExit(opts.parser, "Please specify --bootstrap-server");
boolean isBootstrapServer = opts.options.has(opts.bootstrapServerOpt);

// Make sure that we have all the required arguments for our action.
Map<OptionSpec<?>, List<OptionSpec<?>>> requiredArgs = new HashMap<>();
Expand Down Expand Up @@ -1451,13 +1459,13 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) {
opts.timeoutOpt
));
permittedArgs.put(opts.cancelOpt, Arrays.asList(
opts.bootstrapServerOpt,
isBootstrapServer ? opts.bootstrapServerOpt : opts.bootstrapControllerOpt,
opts.commandConfigOpt,
opts.preserveThrottlesOpt,
opts.timeoutOpt
));
permittedArgs.put(opts.listOpt, Arrays.asList(
opts.bootstrapServerOpt,
isBootstrapServer ? opts.bootstrapServerOpt : opts.bootstrapControllerOpt,
opts.commandConfigOpt
));

Expand All @@ -1469,7 +1477,6 @@ static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) {
String.format("Option \"%s\" can't be used with action \"%s\"", opt, action));
}
});

return opts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ReassignPartitionsCommandOptions extends CommandDefaultOptions {
final OptionSpec<String> reassignmentJsonFileOpt;
final OptionSpec<String> topicsToMoveJsonFileOpt;
final OptionSpec<String> brokerListOpt;
final OptionSpec<String> bootstrapControllerOpt;
final OptionSpec<?> disableRackAware;
final OptionSpec<Long> interBrokerThrottleOpt;
final OptionSpec<Long> replicaAlterLogDirsThrottleOpt;
Expand All @@ -54,8 +55,8 @@ public ReassignPartitionsCommandOptions(String[] args) {
listOpt = parser.accepts("list", "List all active partition reassignments.");

// Arguments
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: the server(s) to use for bootstrapping.")
.withRequiredArg()
bootstrapServerOpt = parser.accepts("bootstrap-server", "the server(s) to use for bootstrapping.")
.withOptionalArg()
.describedAs("Server(s) to use for bootstrapping")
.ofType(String.class);

Expand Down Expand Up @@ -83,6 +84,13 @@ public ReassignPartitionsCommandOptions(String[] args) {
.withRequiredArg()
.describedAs("brokerlist")
.ofType(String.class);

bootstrapControllerOpt = parser.accepts("bootstrap-controller", "The controller to use for reassignment. " +
"By default, the tool will get the quorum controller. This option supports the actions --cancel and --list.")
.withOptionalArg()
.describedAs("bootstrap controller to connect to")
.ofType(String.class);

disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment");
interBrokerThrottleOpt = parser.accepts("throttle", "The movement of partitions between brokers will be throttled to this value (bytes/sec). " +
"This option can be included with --execute when a reassignment is started, and it can be altered by resubmitting the current reassignment " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

@Timeout(60)
public class ReassignPartitionsCommandArgsTest {
public static final String MISSING_BOOTSTRAP_SERVER_MSG = "Please specify --bootstrap-server";
public static final String MISSING_BOOTSTRAP_SERVER_MSG = "Please specify either --bootstrap-server or --bootstrap-controller";

@BeforeEach
public void setUp() {
Expand Down Expand Up @@ -289,4 +289,34 @@ public void shouldNotAllowCancelWithoutReassignmentJsonFile() {
"--preserve-throttles"};
shouldFailWith("Missing required argument \"[reassignment-json-file]\"", args);
}

@Test
public void shouldAllowBootstrapControllerArg() {
String[] args = new String[] {
"--bootstrap-controller", "localhost:1234",
"--cancel",
"--reassignment-json-file", "myfile.json"};
ReassignPartitionsCommand.validateAndParseArgs(args);
}

@Test
public void shouldNotAllowBootstrapControllerArgWithUnsupportedAction() {
String[] args = new String[] {
"--bootstrap-controller", "localhost:1234",
"--generate",
"--broker-list", "101,102",
"--topics-to-move-json-file", "myfile.json"};
shouldFailWith("Option \"[bootstrap-controller]\" can't be used with action \"[generate]", args);
}

@Test
public void shouldNotAllowBootstrapControllerAndBootstrapServerArg() {
String[] args = new String[] {
"--bootstrap-server", "localhost:1234",
"--bootstrap-controller", "localhost:1234",
"--generate",
"--broker-list", "101,102",
"--topics-to-move-json-file", "myfile.json"};
shouldFailWith("Please don't specify both --bootstrap-server and --bootstrap-controller", args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
Expand Down Expand Up @@ -92,6 +93,7 @@
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.BROKER_LEVEL_THROTTLES;
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.cancelAssignment;
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.executeAssignment;
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.generateAssignment;
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.verifyAssignment;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -189,6 +191,24 @@ public void testHighWaterMarkAfterPartitionReassignment() throws Exception {
}, "Timeout for waiting offset");
}
}

@ClusterTest
public void testGenerateAssignmentWithBootstrapServer() throws Exception {
createTopics();
TopicPartition foo0 = new TopicPartition("foo", 0);
produceMessages(foo0.topic(), foo0.partition(), 100);

try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
String assignment = "{\"version\":1,\"partitions\":" +
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
"]}";
generateAssignment(admin, assignment, "1,2,3", false);
Map<TopicPartition, PartitionReassignmentState> finalAssignment = singletonMap(foo0,
new PartitionReassignmentState(asList(0, 1, 2), asList(3, 1, 2), true));
waitForVerifyAssignment(admin, assignment, false,
new VerifyAssignmentResult(finalAssignment));
}
}

@ClusterTest
public void testAlterReassignmentThrottle() throws Exception {
Expand Down Expand Up @@ -326,47 +346,13 @@ public void testProduceAndConsumeWithReassignmentInProgress() throws Exception {
* Test running a reassignment and then cancelling it.
*/
@ClusterTest
public void testCancellation() throws Exception {
createTopics();
TopicPartition foo0 = new TopicPartition("foo", 0);
TopicPartition baz1 = new TopicPartition("baz", 1);

produceMessages(foo0.topic(), foo0.partition(), 200);
produceMessages(baz1.topic(), baz1.partition(), 200);
String assignment = "{\"version\":1,\"partitions\":" +
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
"{\"topic\":\"baz\",\"partition\":1,\"replicas\":[0,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
"]}";
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
assertEquals(unthrottledBrokerConfigs,
describeBrokerLevelThrottles(admin, unthrottledBrokerConfigs.keySet()));
long interBrokerThrottle = 1L;
runExecuteAssignment(false, assignment, interBrokerThrottle, -1L);
waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle);

Map<TopicPartition, PartitionReassignmentState> partStates = new HashMap<>();

partStates.put(foo0, new PartitionReassignmentState(asList(0, 1, 3, 2), asList(0, 1, 3), false));
partStates.put(baz1, new PartitionReassignmentState(asList(0, 2, 3, 1), asList(0, 2, 3), false));
public void testCancellationWithBootstrapServer() throws Exception {
testCancellationAction(true);
}

// Verify that the reassignment is running. The very low throttle should keep it
// from completing before this runs.
waitForVerifyAssignment(admin, assignment, true,
new VerifyAssignmentResult(partStates, true, Collections.emptyMap(), false));
// Cancel the reassignment.
assertEquals(new AbstractMap.SimpleImmutableEntry<>(new HashSet<>(asList(foo0, baz1)), Collections.emptySet()), runCancelAssignment(assignment, true));
// Broker throttles are still active because we passed --preserve-throttles
waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle);
// Cancelling the reassignment again should reveal nothing to cancel.
assertEquals(new AbstractMap.SimpleImmutableEntry<>(Collections.emptySet(), Collections.emptySet()), runCancelAssignment(assignment, false));
// This time, the broker throttles were removed.
waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs);
// Verify that there are no ongoing reassignments.
assertFalse(runVerifyAssignment(admin, assignment, false).partsOngoing);
}
// Verify that the partition is removed from cancelled replicas
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3));
verifyReplicaDeleted(new TopicPartitionReplica(baz1.topic(), baz1.partition(), 3));
@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT})
public void testCancellationWithBootstrapController() throws Exception {
testCancellationAction(false);
}

@ClusterTest
Expand Down Expand Up @@ -400,7 +386,7 @@ public void testCancellationWithAddingReplicaInIsr() throws Exception {
}

// Now cancel the assignment and verify that the partition is removed from cancelled replicas
assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0), Collections.emptySet()), runCancelAssignment(assignment, true));
assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0), Collections.emptySet()), runCancelAssignment(assignment, true, true));
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3));
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4));
}
Expand Down Expand Up @@ -717,9 +703,16 @@ private void runExecuteAssignment(Boolean additional,

private Map.Entry<Set<TopicPartition>, Set<TopicPartitionReplica>> runCancelAssignment(
String jsonString,
Boolean preserveThrottles
Boolean preserveThrottles,
Boolean useBootstrapServer
) {
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
Map<String, Object> config;
if (useBootstrapServer) {
config = Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
} else {
config = Collections.singletonMap(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers());
}
try (Admin admin = Admin.create(config)) {
return cancelAssignment(admin, jsonString, preserveThrottles, 10000L, Time.SYSTEM);
} catch (ExecutionException | InterruptedException | JsonProcessingException | TerseException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -751,6 +744,49 @@ public BrokerDirs(DescribeLogDirsResult result, int brokerId) throws ExecutionEx
}
}

private void testCancellationAction(boolean useBootstrapServer) throws InterruptedException {
createTopics();
TopicPartition foo0 = new TopicPartition("foo", 0);
TopicPartition baz1 = new TopicPartition("baz", 1);

produceMessages(foo0.topic(), foo0.partition(), 200);
produceMessages(baz1.topic(), baz1.partition(), 200);
String assignment = "{\"version\":1,\"partitions\":" +
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," +
"{\"topic\":\"baz\",\"partition\":1,\"replicas\":[0,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}" +
"]}";
try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
assertEquals(unthrottledBrokerConfigs,
describeBrokerLevelThrottles(admin, unthrottledBrokerConfigs.keySet()));
long interBrokerThrottle = 1L;
runExecuteAssignment(false, assignment, interBrokerThrottle, -1L);
waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle);

Map<TopicPartition, PartitionReassignmentState> partStates = new HashMap<>();

partStates.put(foo0, new PartitionReassignmentState(asList(0, 1, 3, 2), asList(0, 1, 3), false));
partStates.put(baz1, new PartitionReassignmentState(asList(0, 2, 3, 1), asList(0, 2, 3), false));

// Verify that the reassignment is running. The very low throttle should keep it
// from completing before this runs.
waitForVerifyAssignment(admin, assignment, true,
new VerifyAssignmentResult(partStates, true, Collections.emptyMap(), false));
// Cancel the reassignment.
assertEquals(new AbstractMap.SimpleImmutableEntry<>(new HashSet<>(asList(foo0, baz1)), Collections.emptySet()), runCancelAssignment(assignment, true, useBootstrapServer));
// Broker throttles are still active because we passed --preserve-throttles
waitForInterBrokerThrottle(admin, asList(0, 1, 2, 3), interBrokerThrottle);
// Cancelling the reassignment again should reveal nothing to cancel.
assertEquals(new AbstractMap.SimpleImmutableEntry<>(Collections.emptySet(), Collections.emptySet()), runCancelAssignment(assignment, false, useBootstrapServer));
// This time, the broker throttles were removed.
waitForBrokerLevelThrottles(admin, unthrottledBrokerConfigs);
// Verify that there are no ongoing reassignments.
assertFalse(runVerifyAssignment(admin, assignment, false).partsOngoing);
}
// Verify that the partition is removed from cancelled replicas
verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3));
verifyReplicaDeleted(new TopicPartitionReplica(baz1.topic(), baz1.partition(), 3));
}

/**
* Remove a set of throttled partitions and reset the overall replication quota.
*/
Expand Down

0 comments on commit 203f323

Please sign in to comment.