Skip to content

Commit

Permalink
Merge pull request #26 from arenadata/feature/ADS-1063
Browse files Browse the repository at this point in the history
Feature/ADS-1063
  • Loading branch information
Asmoday authored Jun 22, 2023
2 parents 95f7263 + 57a38e1 commit 5307093
Showing 1 changed file with 25 additions and 26 deletions.
51 changes: 25 additions & 26 deletions app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -219,20 +219,32 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
val groupTopicPartitionOffsetMap:Cache[(String, String, Int), OffsetAndMetadata] = Caffeine
.newBuilder()
.maximumSize(config.groupTopicPartitionOffsetMaxSize)
.expireAfterAccess(config.groupTopicPartitionOffsetExpireDays, TimeUnit.DAYS)
.expireAfterWrite(config.groupTopicPartitionOffsetExpireDays, TimeUnit.DAYS)
.removalListener(new RemovalListener[(String, String, Int), OffsetAndMetadata] {
override def onRemoval(key: (String, String, Int), value: OffsetAndMetadata, cause: RemovalCause): Unit = {
groupTopicPartitionOffsetSet.remove(key)
}
})
.build[(String, String, Int), OffsetAndMetadata]()
val topicConsumerSetMap = new TrieMap[String, mutable.Set[String]]()
val consumerTopicSetMap = new TrieMap[String, mutable.Set[String]]()

val topicConsumerSetMap: Cache[String, mutable.Set[String]] = Caffeine
.newBuilder()
.maximumSize(config.groupTopicPartitionOffsetMaxSize)
.expireAfterWrite(config.groupTopicPartitionOffsetExpireDays, TimeUnit.DAYS)
.build[String, mutable.Set[String]]()


val consumerTopicSetMap: Cache[String, mutable.Set[String]] = Caffeine
.newBuilder()
.maximumSize(config.groupTopicPartitionOffsetMaxSize)
.expireAfterWrite(config.groupTopicPartitionOffsetExpireDays, TimeUnit.DAYS)
.build[String, mutable.Set[String]]()

val groupTopicPartitionMemberSet: mutable.Set[(String, String, Int)] = KafkaManagedOffsetCache.createSet()
val groupTopicPartitionMemberMap: Cache[(String, String, Int), MemberMetadata] = Caffeine
.newBuilder()
.maximumSize(config.groupTopicPartitionOffsetMaxSize)
.expireAfterAccess(config.groupTopicPartitionOffsetExpireDays, TimeUnit.DAYS)
.expireAfterWrite(config.groupTopicPartitionOffsetExpireDays, TimeUnit.DAYS)
.removalListener(new RemovalListener[(String, String, Int), MemberMetadata] {
override def onRemoval(key: (String, String, Int), value: MemberMetadata, cause: RemovalCause): Unit = {
groupTopicPartitionMemberSet.remove(key)
Expand Down Expand Up @@ -366,26 +378,9 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
groupTopicPartitionOffsetSet.add(newKey)
val topic = key.topicPartition.topic
val group = key.group
val consumerSet = {
if (topicConsumerSetMap.contains(topic)) {
topicConsumerSetMap(topic)
} else {
val s = new mutable.TreeSet[String]()
topicConsumerSetMap += topic -> s
s
}
}
val consumerSet = topicConsumerSetMap.get(topic, _ => new mutable.TreeSet[String]())
consumerSet += group

val topicSet = {
if (consumerTopicSetMap.contains(group)) {
consumerTopicSetMap(group)
} else {
val s = new mutable.TreeSet[String]()
consumerTopicSetMap += group -> s
s
}
}
val topicSet = consumerTopicSetMap.get(group, _ => new mutable.TreeSet[String]())
topicSet += topic
case GroupMetadataKey(version, key) =>
val value: GroupMetadata = readGroupMessageValue(key, ByteBuffer.wrap(record.value()), Time.SYSTEM)
Expand Down Expand Up @@ -421,6 +416,10 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
groupTopicPartitionOffsetSet.clear()
groupTopicPartitionOffsetMap.invalidateAll()
groupTopicPartitionOffsetMap.cleanUp()
topicConsumerSetMap.invalidateAll();
topicConsumerSetMap.cleanUp();
consumerTopicSetMap.invalidateAll();
consumerTopicSetMap.cleanUp();
info(s"KafkaManagedOffsetCache shut down for cluster ${clusterContext.config.name}")
}

Expand All @@ -436,9 +435,9 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
Option(groupTopicPartitionMemberMap.getIfPresent((group, topic, part))).map(mm => s"${mm.memberId}:${mm.clientHost}")
}

def getConsumerTopics(group: String) : Set[String] = consumerTopicSetMap.get(group).map(_.toSet).getOrElse(Set.empty)
def getTopicConsumers(topic: String) : Set[String] = topicConsumerSetMap.get(topic).map(_.toSet).getOrElse(Set.empty)
def getConsumers : IndexedSeq[String] = consumerTopicSetMap.keys.toIndexedSeq
def getConsumerTopics(group: String) : Set[String] = consumerTopicSetMap.get(group, _ => new mutable.TreeSet[String]()).toSet
def getTopicConsumers(topic: String) : Set[String] = topicConsumerSetMap.get(topic, _ => new mutable.TreeSet[String]()).toSet
def getConsumers : IndexedSeq[String] = consumerTopicSetMap.asMap().keySet().asScala.toIndexedSeq
def getLastUpdateTimeMillis: Long = lastUpdateTimeMillis
}

Expand Down

0 comments on commit 5307093

Please sign in to comment.