Skip to content

Commit

Permalink
KAFKA-18062: use feature version to enable ELR (apache#17867)
Browse files Browse the repository at this point in the history
Replace the ELR static config with feature version.

Reviewers: Colin P. McCabe <[email protected]>
  • Loading branch information
CalvinConfluent authored Nov 26, 2024
1 parent f5d7123 commit 2b2b3cd
Show file tree
Hide file tree
Showing 19 changed files with 201 additions and 70 deletions.
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,7 @@ class ControllerServer(
setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs).
setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs).
setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs).
setInterBrokerListenerName(config.interBrokerListenerName.value()).
setEligibleLeaderReplicasEnabled(config.elrEnabled)
setInterBrokerListenerName(config.interBrokerListenerName.value())
}
controller = controllerBuilder.build()

Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val migrationEnabled: Boolean = getBoolean(KRaftConfigs.MIGRATION_ENABLED_CONFIG)
val migrationMetadataMinBatchSize: Int = getInt(KRaftConfigs.MIGRATION_METADATA_MIN_BATCH_SIZE_CONFIG)

val elrEnabled: Boolean = getBoolean(KRaftConfigs.ELR_ENABLED_CONFIG)

private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
case "broker" => ProcessRole.BrokerRole
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.queue.KafkaEventQueue
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory}
import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, MetadataVersion, TransactionVersion}
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.apache.kafka.server.util.timer.SystemTimer
Expand Down Expand Up @@ -376,6 +376,12 @@ abstract class QuorumTestHarness extends Logging {
} else TransactionVersion.TV_1.featureLevel()
formatter.setFeatureLevel(TransactionVersion.FEATURE_NAME, transactionVersion)

val elrVersion =
if (TestInfoUtils.isEligibleLeaderReplicasV1Enabled(testInfo)) {
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()
} else EligibleLeaderReplicasVersion.ELRV_0.featureLevel()
formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, elrVersion)

addFormatterSettings(formatter)
formatter.run()
val bootstrapMetadata = formatter.bootstrapMetadata()
Expand Down
8 changes: 8 additions & 0 deletions core/src/test/scala/kafka/utils/TestInfoUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,12 @@ object TestInfoUtils {
def isTransactionV2Enabled(testInfo: TestInfo): Boolean = {
!testInfo.getDisplayName.contains("isTV2Enabled=false")
}

/**
* Returns whether eligible leader replicas version 1 is enabled.
* When no parameter is provided, the default returned is false.
*/
def isEligibleLeaderReplicasV1Enabled(testInfo: TestInfo): Boolean = {
testInfo.getDisplayName.contains("isELRV1Enabled=true")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record.RecordVersion
import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.{GroupVersion, MetadataVersion, TransactionVersion}
import org.apache.kafka.server.common.{EligibleLeaderReplicasVersion, GroupVersion, MetadataVersion, TransactionVersion}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Tag
Expand Down Expand Up @@ -65,11 +65,11 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
apiVersion: Short = ApiKeys.API_VERSIONS.latestVersion
): Unit = {
if (apiVersion >= 3) {
assertEquals(3, apiVersionsResponse.data().finalizedFeatures().size())
assertEquals(4, apiVersionsResponse.data().finalizedFeatures().size())
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).minVersionLevel())
assertEquals(MetadataVersion.latestTesting().featureLevel(), apiVersionsResponse.data().finalizedFeatures().find(MetadataVersion.FEATURE_NAME).maxVersionLevel())

assertEquals(4, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(5, apiVersionsResponse.data().supportedFeatures().size())
assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(MetadataVersion.FEATURE_NAME).minVersion())
if (apiVersion < 4) {
assertEquals(1, apiVersionsResponse.data().supportedFeatures().find("kraft.version").minVersion())
Expand All @@ -83,6 +83,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {

assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).minVersion())
assertEquals(GroupVersion.GV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(GroupVersion.FEATURE_NAME).maxVersion())

assertEquals(0, apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).minVersion())
assertEquals(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), apiVersionsResponse.data().supportedFeatures().find(EligibleLeaderReplicasVersion.FEATURE_NAME).maxVersion())
}
val expectedApis = if (cluster.controllerListenerName().toScala.contains(listenerName)) {
ApiVersionsResponse.collectApis(
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ Found problem:
properties.putAll(defaultStaticQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
assertEquals("Unsupported feature: non.existent.feature. Supported features are: " +
"group.version, kraft.version, transaction.version",
"eligible.leader.replicas.version, group.version, kraft.version, transaction.version",
assertThrows(classOf[FormatterException], () =>
runFormatCommand(new ByteArrayOutputStream(), properties,
Seq("--feature", "non.existent.feature=20"))).getMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,15 @@ FinalizedControllerFeatures finalizedFeatures(long epoch) {
return new FinalizedControllerFeatures(features, epoch);
}

FinalizedControllerFeatures latestFinalizedFeatures() {
Map<String, Short> features = new HashMap<>();
features.put(MetadataVersion.FEATURE_NAME, metadataVersion.get().featureLevel());
for (Entry<String, Short> entry : finalizedVersions.entrySet()) {
features.put(entry.getKey(), entry.getValue());
}
return new FinalizedControllerFeatures(features, -1);
}

public void replay(FeatureLevelRecord record) {
VersionRange range = quorumFeatures.localSupportedFeature(record.name());
if (!range.contains(record.featureLevel())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ public static class Builder {
private Map<String, Object> staticConfig = Collections.emptyMap();
private BootstrapMetadata bootstrapMetadata = null;
private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
private boolean eligibleLeaderReplicasEnabled = false;
private DelegationTokenCache tokenCache;
private String tokenSecretKeyString;
private long delegationTokenMaxLifeMs;
Expand Down Expand Up @@ -337,11 +336,6 @@ public Builder setStaticConfig(Map<String, Object> staticConfig) {
return this;
}

public Builder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) {
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
return this;
}

public Builder setDelegationTokenCache(DelegationTokenCache tokenCache) {
this.tokenCache = tokenCache;
return this;
Expand Down Expand Up @@ -432,7 +426,6 @@ public QuorumController build() throws Exception {
delegationTokenMaxLifeMs,
delegationTokenExpiryTimeMs,
delegationTokenExpiryCheckIntervalMs,
eligibleLeaderReplicasEnabled,
uncleanLeaderElectionCheckIntervalMs,
interBrokerListenerName
);
Expand Down Expand Up @@ -1436,11 +1429,6 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, lon
*/
private final BootstrapMetadata bootstrapMetadata;

/**
* True if the KIP-966 eligible leader replicas feature is enabled.
*/
private final boolean eligibleLeaderReplicasEnabled;

/**
* The maximum number of records per batch to allow.
*/
Expand Down Expand Up @@ -1480,7 +1468,6 @@ private QuorumController(
long delegationTokenMaxLifeMs,
long delegationTokenExpiryTimeMs,
long delegationTokenExpiryCheckIntervalMs,
boolean eligibleLeaderReplicasEnabled,
long uncleanLeaderElectionCheckIntervalMs,
String interBrokerListenerName
) {
Expand Down Expand Up @@ -1549,7 +1536,6 @@ private QuorumController(
setLogContext(logContext).
setDefaultReplicationFactor(defaultReplicationFactor).
setDefaultNumPartitions(defaultNumPartitions).
setEligibleLeaderReplicasEnabled(eligibleLeaderReplicasEnabled).
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
setConfigurationControl(configurationControl).
setClusterControl(clusterControl).
Expand Down Expand Up @@ -1580,7 +1566,6 @@ private QuorumController(
this.metaLogListener = new QuorumMetaLogListener();
this.curClaimEpoch = -1;
this.recordRedactor = new RecordRedactor(configSchema);
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
if (maxIdleIntervalNs.isPresent()) {
registerWriteNoOpRecord(maxIdleIntervalNs.getAsLong());
}
Expand All @@ -1599,10 +1584,7 @@ private QuorumController(
setMetrics(controllerMetrics).
setTime(time).
build();

log.info("Creating new QuorumController with clusterId {}.{}",
clusterId,
eligibleLeaderReplicasEnabled ? " Eligible leader replicas enabled." : "");
log.info("Creating new QuorumController with clusterId {}", clusterId);

this.raftClient.register(metaLogListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.kafka.metadata.placement.TopicAssignment;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.TopicIdPartition;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.CreateTopicPolicy;
Expand Down Expand Up @@ -165,7 +166,6 @@ static class Builder {
private ClusterControlManager clusterControl = null;
private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
private FeatureControlManager featureControl = null;
private boolean eligibleLeaderReplicasEnabled = false;

Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
Expand All @@ -187,11 +187,6 @@ Builder setDefaultNumPartitions(int defaultNumPartitions) {
return this;
}

Builder setEligibleLeaderReplicasEnabled(boolean eligibleLeaderReplicasEnabled) {
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
return this;
}

Builder setMaxElectionsPerImbalance(int maxElectionsPerImbalance) {
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
return this;
Expand Down Expand Up @@ -233,7 +228,6 @@ ReplicationControlManager build() {
defaultReplicationFactor,
defaultNumPartitions,
maxElectionsPerImbalance,
eligibleLeaderReplicasEnabled,
configurationControl,
clusterControl,
createTopicPolicy,
Expand Down Expand Up @@ -305,11 +299,6 @@ static Map<String, String> translateCreationConfigs(CreatableTopicConfigCollecti
*/
private final int defaultNumPartitions;

/**
* True if eligible leader replicas is enabled.
*/
private final boolean eligibleLeaderReplicasEnabled;

/**
* Maximum number of leader elections to perform during one partition leader balancing operation.
*/
Expand Down Expand Up @@ -399,7 +388,6 @@ private ReplicationControlManager(
short defaultReplicationFactor,
int defaultNumPartitions,
int maxElectionsPerImbalance,
boolean eligibleLeaderReplicasEnabled,
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl,
Optional<CreateTopicPolicy> createTopicPolicy,
Expand All @@ -410,7 +398,6 @@ private ReplicationControlManager(
this.defaultReplicationFactor = defaultReplicationFactor;
this.defaultNumPartitions = defaultNumPartitions;
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
this.eligibleLeaderReplicasEnabled = eligibleLeaderReplicasEnabled;
this.configurationControl = configurationControl;
this.createTopicPolicy = createTopicPolicy;
this.featureControl = featureControl;
Expand Down Expand Up @@ -1029,7 +1016,8 @@ TimelineHashSet<TopicIdPartition> imbalancedPartitions() {
}

boolean isElrEnabled() {
return eligibleLeaderReplicasEnabled && featureControl.metadataVersion().isElrSupported();
return featureControl.metadataVersion().isElrSupported() && featureControl.latestFinalizedFeatures().
versionOrDefault(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 0) >= EligibleLeaderReplicasVersion.ELRV_1.featureLevel();
}

ControllerResult<AlterPartitionResponseData> alterPartition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;

import org.slf4j.Logger;
Expand Down Expand Up @@ -75,6 +76,32 @@ static BrokerRegistrationRequestData.FeatureCollection brokerFeatures(
return features;
}

/**
* Create a broker features collection for use in a registration request. MV and given features are included.
*
* @param minVersion The minimum supported MV.
* @param maxVersion The maximum supported MV.
* @param featureMaxVersions The features and their max supported versions.
*/
static BrokerRegistrationRequestData.FeatureCollection brokerFeaturesPlusFeatureVersions(
MetadataVersion minVersion,
MetadataVersion maxVersion,
Map<String, Short> featureMaxVersions
) {
BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
features.add(new BrokerRegistrationRequestData.Feature()
.setName(MetadataVersion.FEATURE_NAME)
.setMinSupportedVersion(minVersion.featureLevel())
.setMaxSupportedVersion(maxVersion.featureLevel()));
featureMaxVersions.entrySet().forEach(entry -> {
features.add(new BrokerRegistrationRequestData.Feature()
.setName(entry.getKey())
.setMaxSupportedVersion(entry.getValue())
.setMinSupportedVersion((short) 0));
});
return features;
}

/**
* Register the given number of brokers.
*
Expand All @@ -94,7 +121,8 @@ static Map<Integer, Long> registerBrokersAndUnfence(
.setBrokerId(brokerId)
.setRack(null)
.setClusterId(controller.clusterId())
.setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting()))
.setFeatures(brokerFeaturesPlusFeatureVersions(MetadataVersion.IBP_3_0_IV1, MetadataVersion.latestTesting(),
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME, EligibleLeaderReplicasVersion.ELRV_1.featureLevel())))
.setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
.setLogDirs(Collections.singletonList(
Uuid.fromString("TESTBROKER" + Integer.toString(100000 + brokerId).substring(1) + "DIRAAAA")
Expand Down
Loading

0 comments on commit 2b2b3cd

Please sign in to comment.