Skip to content

Commit

Permalink
Merge pull request #25 from arenadata/feature/add-error-msg-on-zk-errors
Browse files Browse the repository at this point in the history
Added zk error messages in KafkaManagerActor.
  • Loading branch information
Asmoday authored Mar 6, 2023
2 parents 176c1e7 + 92e4682 commit 95f7263
Showing 1 changed file with 19 additions and 4 deletions.
23 changes: 19 additions & 4 deletions app/kafka/manager/actor/KafkaManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,34 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
log.info(s"baseZkPath=$baseZkPath")

//create kafka manager base path
Try(curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(baseZkPath))
Try(curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(baseZkPath)) match {
case Success(zNode) => log.info(s"Kafka manager base path successfully created: $zNode")
case Failure(exception) => log.error(exception, s"Failed to create Kafka manager base path: $baseZkPath")
}
require(curator.checkExists().forPath(baseZkPath) != null,s"Kafka manager base path not found : $baseZkPath")

//create kafka manager base clusters path
Try(curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(baseClusterZkPath))
Try(curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(baseClusterZkPath)) match {
case Success(zNode) => log.info(s"Kafka manager base cluster path successfully created: $zNode")
case Failure(exception) => log.error(exception,
s"Failed to create Kafka manager base cluster path: $baseClusterZkPath")
}
require(curator.checkExists().forPath(baseClusterZkPath) != null,s"Kafka manager base clusters path not found : $baseClusterZkPath")

//create kafka manager delete clusters path
Try(curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(deleteClustersZkPath))
Try(curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(deleteClustersZkPath)) match {
case Success(zNode) => log.info(s"Kafka manager base delete cluster path successfully created: $zNode")
case Failure(exception) => log.error(exception,
s"Failed to create Kafka manager base delete cluster path: $deleteClustersZkPath")
}
require(curator.checkExists().forPath(deleteClustersZkPath) != null,s"Kafka manager delete clusters path not found : $deleteClustersZkPath")

//create kafka manager configs path
Try(curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(configsZkPath))
Try(curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(configsZkPath)) match {
case Success(zNode) => log.info(s"Kafka manager configs path successfully created: $zNode")
case Failure(exception) => log.error(exception,
s"Failed to create Kafka manager configs path: $configsZkPath")
}
require(curator.checkExists().forPath(configsZkPath) != null,s"Kafka manager configs path not found : $configsZkPath")

private[this] val longRunningExecutor = new ThreadPoolExecutor(
Expand Down

0 comments on commit 95f7263

Please sign in to comment.