diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala index db0d18f7..3395f9ce 100644 --- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala @@ -219,20 +219,32 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext val groupTopicPartitionOffsetMap:Cache[(String, String, Int), OffsetAndMetadata] = Caffeine .newBuilder() .maximumSize(config.groupTopicPartitionOffsetMaxSize) - .expireAfterAccess(config.groupTopicPartitionOffsetExpireDays, TimeUnit.DAYS) + .expireAfterWrite(config.groupTopicPartitionOffsetExpireDays, TimeUnit.DAYS) .removalListener(new RemovalListener[(String, String, Int), OffsetAndMetadata] { override def onRemoval(key: (String, String, Int), value: OffsetAndMetadata, cause: RemovalCause): Unit = { groupTopicPartitionOffsetSet.remove(key) } }) .build[(String, String, Int), OffsetAndMetadata]() - val topicConsumerSetMap = new TrieMap[String, mutable.Set[String]]() - val consumerTopicSetMap = new TrieMap[String, mutable.Set[String]]() + + val topicConsumerSetMap: Cache[String, mutable.Set[String]] = Caffeine + .newBuilder() + .maximumSize(config.groupTopicPartitionOffsetMaxSize) + .expireAfterWrite(config.groupTopicPartitionOffsetExpireDays, TimeUnit.DAYS) + .build[String, mutable.Set[String]]() + + + val consumerTopicSetMap: Cache[String, mutable.Set[String]] = Caffeine + .newBuilder() + .maximumSize(config.groupTopicPartitionOffsetMaxSize) + .expireAfterWrite(config.groupTopicPartitionOffsetExpireDays, TimeUnit.DAYS) + .build[String, mutable.Set[String]]() + val groupTopicPartitionMemberSet: mutable.Set[(String, String, Int)] = KafkaManagedOffsetCache.createSet() val groupTopicPartitionMemberMap: Cache[(String, String, Int), MemberMetadata] = Caffeine .newBuilder() .maximumSize(config.groupTopicPartitionOffsetMaxSize) - .expireAfterAccess(config.groupTopicPartitionOffsetExpireDays, TimeUnit.DAYS) + .expireAfterWrite(config.groupTopicPartitionOffsetExpireDays, TimeUnit.DAYS) .removalListener(new RemovalListener[(String, String, Int), MemberMetadata] { override def onRemoval(key: (String, String, Int), value: MemberMetadata, cause: RemovalCause): Unit = { groupTopicPartitionMemberSet.remove(key) @@ -366,26 +378,9 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext groupTopicPartitionOffsetSet.add(newKey) val topic = key.topicPartition.topic val group = key.group - val consumerSet = { - if (topicConsumerSetMap.contains(topic)) { - topicConsumerSetMap(topic) - } else { - val s = new mutable.TreeSet[String]() - topicConsumerSetMap += topic -> s - s - } - } + val consumerSet = topicConsumerSetMap.get(topic, _ => new mutable.TreeSet[String]()) consumerSet += group - - val topicSet = { - if (consumerTopicSetMap.contains(group)) { - consumerTopicSetMap(group) - } else { - val s = new mutable.TreeSet[String]() - consumerTopicSetMap += group -> s - s - } - } + val topicSet = consumerTopicSetMap.get(group, _ => new mutable.TreeSet[String]()) topicSet += topic case GroupMetadataKey(version, key) => val value: GroupMetadata = readGroupMessageValue(key, ByteBuffer.wrap(record.value()), Time.SYSTEM) @@ -421,6 +416,10 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext groupTopicPartitionOffsetSet.clear() groupTopicPartitionOffsetMap.invalidateAll() groupTopicPartitionOffsetMap.cleanUp() + topicConsumerSetMap.invalidateAll(); + topicConsumerSetMap.cleanUp(); + consumerTopicSetMap.invalidateAll(); + consumerTopicSetMap.cleanUp(); info(s"KafkaManagedOffsetCache shut down for cluster ${clusterContext.config.name}") } @@ -436,9 +435,9 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext Option(groupTopicPartitionMemberMap.getIfPresent((group, topic, part))).map(mm => s"${mm.memberId}:${mm.clientHost}") } - def getConsumerTopics(group: String) : Set[String] = consumerTopicSetMap.get(group).map(_.toSet).getOrElse(Set.empty) - def getTopicConsumers(topic: String) : Set[String] = topicConsumerSetMap.get(topic).map(_.toSet).getOrElse(Set.empty) - def getConsumers : IndexedSeq[String] = consumerTopicSetMap.keys.toIndexedSeq + def getConsumerTopics(group: String) : Set[String] = consumerTopicSetMap.get(group, _ => new mutable.TreeSet[String]()).toSet + def getTopicConsumers(topic: String) : Set[String] = topicConsumerSetMap.get(topic, _ => new mutable.TreeSet[String]()).toSet + def getConsumers : IndexedSeq[String] = consumerTopicSetMap.asMap().keySet().asScala.toIndexedSeq def getLastUpdateTimeMillis: Long = lastUpdateTimeMillis }