Skip to content

Commit

Permalink
Handling thread sync when evenhandler is updated with a new queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Robban1980 committed Dec 5, 2024
1 parent 51b3b7d commit 16b8ccf
Showing 1 changed file with 34 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ public void start() {
}

@Override
public void stop() {
public synchronized void stop() {
LOGGER.info("Kafka consumer stopping for topic: {}", topic);
if (!running) {
LOGGER.warn("KafkaObservableQueue is already stopped for topic: {}", topic);
Expand Down Expand Up @@ -488,39 +488,56 @@ public void stop() {
}

private void retryCloseConsumer() {
int retries = 3;
while (retries > 0) {
int attempts = 3;
while (attempts > 0) {
try {
kafkaConsumer.unsubscribe();
kafkaConsumer.close();
LOGGER.info("Kafka consumer closed successfully on retry.");
return;
LOGGER.info("Kafka consumer stopped for topic: {}", topic);
return; // Exit if successful
} catch (Exception e) {
retries--;
LOGGER.warn(
"Retry failed to close Kafka consumer. Remaining attempts: {}", retries, e);
if (retries == 0) {
LOGGER.error("Exhausted retries for closing Kafka consumer.");
"Error stopping Kafka consumer for topic: {}, attempts remaining: {}",
topic,
attempts - 1,
e);
attempts--;
try {
Thread.sleep(1000); // Wait before retrying
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOGGER.error("Thread interrupted during Kafka consumer shutdown retries");
break;
}
}
}
LOGGER.error("Failed to stop Kafka consumer for topic: {} after retries", topic);
}

private void retryCloseProducer() {
int retries = 3;
while (retries > 0) {
int attempts = 3;
while (attempts > 0) {
try {
kafkaProducer.close();
LOGGER.info("Kafka producer closed successfully on retry.");
return;
LOGGER.info("Kafka producer stopped for topic: {}", topic);
return; // Exit if successful
} catch (Exception e) {
retries--;
LOGGER.warn(
"Retry failed to close Kafka producer. Remaining attempts: {}", retries, e);
if (retries == 0) {
LOGGER.error("Exhausted retries for closing Kafka producer.");
"Error stopping Kafka producer for topic: {}, attempts remaining: {}",
topic,
attempts - 1,
e);
attempts--;
try {
Thread.sleep(1000); // Wait before retrying
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOGGER.error("Thread interrupted during Kafka producer shutdown retries");
break;
}
}
}
LOGGER.error("Failed to stop Kafka producer for topic: {} after retries", topic);
}

@Override
Expand Down

0 comments on commit 16b8ccf

Please sign in to comment.