Skip to content

Commit

Permalink
Merge pull request #23 from arenadata/feature/ADS-731
Browse files Browse the repository at this point in the history
ADS-731. Added kafka 2.6.0 support.
  • Loading branch information
Asmoday authored Jan 12, 2022
2 parents 0d16315 + 6a32fb7 commit 8357bc4
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 1 deletion.
4 changes: 4 additions & 0 deletions app/controllers/Logkafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana
LogkafkaNewConfigs.configMaps(Kafka_2_2_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_4_0_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_4_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_6_0_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_6_0).map{case(k,v) => LKConfig(k,Some(v))}.toList)
val kafka_2_8_1_Default = CreateLogkafka("","",
LogkafkaNewConfigs.configMaps(Kafka_2_8_1).map{case(k,v) => LKConfig(k,Some(v))}.toList)

Expand Down Expand Up @@ -163,6 +165,7 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana
case Kafka_2_1_1 => (defaultCreateForm.fill(kafka_2_1_1_Default), clusterContext)
case Kafka_2_2_0 => (defaultCreateForm.fill(kafka_2_2_0_Default), clusterContext)
case Kafka_2_4_0 => (defaultCreateForm.fill(kafka_2_4_0_Default), clusterContext)
case Kafka_2_6_0 => (defaultCreateForm.fill(kafka_2_6_0_Default), clusterContext)
case Kafka_2_8_1 => (defaultCreateForm.fill(kafka_2_8_1_Default), clusterContext)
}
}
Expand Down Expand Up @@ -269,6 +272,7 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana
case Kafka_2_1_1 => LogkafkaNewConfigs.configNames(Kafka_2_1_1).map(n => (n,LKConfig(n,None))).toMap
case Kafka_2_2_0 => LogkafkaNewConfigs.configNames(Kafka_2_2_0).map(n => (n,LKConfig(n,None))).toMap
case Kafka_2_4_0 => LogkafkaNewConfigs.configNames(Kafka_2_4_0).map(n => (n,LKConfig(n,None))).toMap
case Kafka_2_6_0 => LogkafkaNewConfigs.configNames(Kafka_2_6_0).map(n => (n,LKConfig(n,None))).toMap
case Kafka_2_8_1 => LogkafkaNewConfigs.configNames(Kafka_2_8_1).map(n => (n,LKConfig(n,None))).toMap
}
val identityOption = li.identityMap.get(log_path)
Expand Down
3 changes: 3 additions & 0 deletions app/controllers/Topic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager
val kafka_2_1_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_1_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_2_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_2_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_4_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_4_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_6_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_6_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)
val kafka_2_8_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_8_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList)

val defaultCreateForm = Form(
Expand Down Expand Up @@ -172,6 +173,7 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager
case Kafka_2_1_1 => (defaultCreateForm.fill(kafka_2_1_1_Default), clusterContext)
case Kafka_2_2_0 => (defaultCreateForm.fill(kafka_2_2_0_Default), clusterContext)
case Kafka_2_4_0 => (defaultCreateForm.fill(kafka_2_4_0_Default), clusterContext)
case Kafka_2_6_0 => (defaultCreateForm.fill(kafka_2_6_0_Default), clusterContext)
case Kafka_2_8_1 => (defaultCreateForm.fill(kafka_2_8_1_Default), clusterContext)
}
}
Expand Down Expand Up @@ -424,6 +426,7 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager
case Kafka_2_1_1 => TopicConfigs.configNamesAndDoc(Kafka_2_1_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_2_0 => TopicConfigs.configNamesAndDoc(Kafka_2_2_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_4_0 => TopicConfigs.configNamesAndDoc(Kafka_2_4_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_6_0 => TopicConfigs.configNamesAndDoc(Kafka_2_6_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
case Kafka_2_8_1 => TopicConfigs.configNamesAndDoc(Kafka_2_8_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) }
}
val updatedConfigMap = ti.config.toMap
Expand Down
2 changes: 1 addition & 1 deletion app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath


object KafkaManagedOffsetCache {
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_4_0, Kafka_2_8_1)
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_4_0, Kafka_2_6_0, Kafka_2_8_1)
val ConsumerOffsetTopic = "__consumer_offsets"

def isSupported(version: KafkaVersion) : Boolean = {
Expand Down
5 changes: 5 additions & 0 deletions app/kafka/manager/model/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ case object Kafka_2_4_0 extends KafkaVersion {
override def toString = "2.4.0"
}

case object Kafka_2_6_0 extends KafkaVersion {
override def toString = "2.6.0"
}

case object Kafka_2_8_1 extends KafkaVersion {
override def toString = "2.8.1"
}
Expand Down Expand Up @@ -134,6 +138,7 @@ object KafkaVersion {
"2.1.1" -> Kafka_2_1_1,
"2.2.0" -> Kafka_2_2_0,
"2.4.0" -> Kafka_2_4_0,
"2.6.0" -> Kafka_2_6_0,
"2.8.1" -> Kafka_2_8_1
)

Expand Down
1 change: 1 addition & 0 deletions app/kafka/manager/utils/LogkafkaNewConfigs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ object LogkafkaNewConfigs {
Kafka_2_1_1 -> logkafka82.LogConfig,
Kafka_2_2_0 -> logkafka82.LogConfig,
Kafka_2_4_0 -> logkafka82.LogConfig,
Kafka_2_6_0 -> logkafka82.LogConfig,
Kafka_2_8_1 -> logkafka82.LogConfig
)

Expand Down
1 change: 1 addition & 0 deletions app/kafka/manager/utils/TopicConfigs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ object TopicConfigs {
Kafka_2_1_1 -> two00.LogConfig,
Kafka_2_2_0 -> two00.LogConfig,
Kafka_2_4_0 -> two00.LogConfig,
Kafka_2_6_0 -> two00.LogConfig,
Kafka_2_8_1 -> two00.LogConfig
)

Expand Down
2 changes: 2 additions & 0 deletions test/kafka/manager/model/KafkaVersionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class KafkaVersionTest extends FunSuite {
"2.1.1" -> Kafka_2_1_1,
"2.2.0" -> Kafka_2_2_0,
"2.4.0" -> Kafka_2_4_0,
"2.6.0" -> Kafka_2_6_0,
"2.8.1" -> Kafka_2_8_1
)

Expand Down Expand Up @@ -79,6 +80,7 @@ class KafkaVersionTest extends FunSuite {
("2.1.1","2.1.1"),
("2.2.0","2.2.0"),
("2.4.0","2.4.0"),
("2.6.0","2.6.0"),
("2.8.1","2.8.1")
)
assertResult(expected)(KafkaVersion.formSelectList)
Expand Down
8 changes: 8 additions & 0 deletions test/kafka/manager/utils/TestClusterConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,14 @@ class TestClusterConfig extends FunSuite with Matchers {
assert(cc == deserialize.get)
}

test("serialize and deserialize 2.6.0") {
val cc = ClusterConfig("qa", "2.6.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah"))
val serialize: String = ClusterConfig.serialize(cc)
val deserialize = ClusterConfig.deserialize(serialize)
assert(deserialize.isSuccess === true)
assert(cc == deserialize.get)
}

test("serialize and deserialize 2.8.1") {
val cc = ClusterConfig("qa", "2.8.1", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah"))
val serialize: String = ClusterConfig.serialize(cc)
Expand Down

0 comments on commit 8357bc4

Please sign in to comment.