BackOffPolicy does not work because error count is reset #7

Open
dwirtz opened this issue Apr 19, 2021 · 0 comments
dwirtz commented Apr 19, 2021

The AmqpConsumer applies a backoff policy whenever a channel error occurs, or when a problem such as backpressure fills the channel to capacity. In that case the consumer restarts and waits up to backoffTime before consuming data via AMQP again. The backoffTime is calculated from the counter numberOfConsecutiveConnectionErrors. Unfortunately, in this case (the channel gets full) the counter is reset on every retry. As a consequence, the counter is always equal to 1 and the backoffTime is therefore always 1000 ms.
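To make the effect concrete, here is a minimal sketch of a counter-based backoff. The formula is an assumption (exponential, 1000 ms base, 60 s cap); the only thing taken from the behaviour described above is that a counter of 1 yields 1000 ms. `BackoffSketch`, `backoffMillis`, `BASE_MILLIS` and `MAX_MILLIS` are hypothetical names, not part of the library.

```java
// Hypothetical sketch, not the library's actual backoff calculation.
final class BackoffSketch {
    static final long BASE_MILLIS = 1_000L;  // assumed base delay
    static final long MAX_MILLIS = 60_000L;  // assumed upper bound

    // Assumed formula: the delay doubles with each consecutive error, up to MAX_MILLIS.
    static long backoffMillis(int consecutiveErrors) {
        if (consecutiveErrors <= 0) {
            return 0L;
        }
        int shift = Math.min(consecutiveErrors - 1, 16); // keep the shift small to avoid overflow
        return Math.min(BASE_MILLIS << shift, MAX_MILLIS);
    }

    public static void main(String[] args) {
        int counter = 0;
        for (int attempt = 1; attempt <= 4; attempt++) {
            counter = 0; // reconnect succeeded, counter is reset
            counter++;   // channel fails again with an IOException
            System.out.println("attempt " + attempt + ": wait " + backoffMillis(counter) + " ms");
        }
    }
}
```

Because the counter is set back to 0 right before it is incremented, every attempt in this sketch waits the same base delay, no matter how many failures came before.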

See the consumeLoop method, which resets the counter on every loop iteration:

private void consumeLoop() throws InterruptedException {
    Thread currentThread = Thread.currentThread();
    Channel channel = null;
    Connection connection = null;
    int numberOfConsecutiveConnectionErrors = 0;
    while (!currentThread.isInterrupted()) {
        try {
            connection = createConnection();
            // successfully connected, reset counter
            numberOfConsecutiveConnectionErrors = 0;
            channel = createChannel(connection);
            declarationsForChannel(channel);
            QueueingConsumer consumer = new QueueingConsumer(channel);
            processDeliveriesForConsumer(currentThread, consumer);
        } catch (InterruptedException e) {
            // NOTE - we need to catch InterruptedException before Exception to ensure we see the shutdown signal
            LOG.info("Received interrupt, shutting down consumer");
            // NEED to set the interrupt status on the thread to ensure the loop terminates
            currentThread.interrupt();
        } catch (IOException e) {
            LOG.info("IOException caught. Closing connection and waiting to reconnect", e);
            numberOfConsecutiveConnectionErrors++;
        } catch (Exception e) {
            // NOTE - this should never happen, but we don't want the thread to die because of an uncaught exception
            LOG.error("Consumer encountered an exception while processing the events", e);
        }
        // we need to take the interrupt status BEFORE closing because the close methods can alter the INTERRUPT state of the thread
        boolean wasInterrupted = currentThread.isInterrupted();
        closeChannelSilently(channel);
        closeConnectionSilently(connection);
        if (wasInterrupted) {
            currentThread.interrupt();
        }
        if (!currentThread.isInterrupted()) {
            waitToRetry(Sleeper.THREAD_SLEEPER, numberOfConsecutiveConnectionErrors);
        }
    }
}

Either the counter must not be reset inside the while loop, or the backoff time calculation should be independent of that counter.
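One possible shape for the first option, as a rough sketch only: keep the counter across reconnects and reset it when a delivery has actually been processed, rather than as soon as the connection is established. Where exactly to reset is my assumption; `CounterResetSketch`, `ConsecutiveErrorCounter`, `onDeliveryProcessed` and `onIoFailure` are hypothetical names and do not exist in the library.

```java
// Hypothetical sketch of a counter that survives successful reconnects.
final class CounterResetSketch {

    static final class ConsecutiveErrorCounter {
        private int errors;

        // Assumed reset point: only real progress clears the error streak.
        void onDeliveryProcessed() {
            errors = 0;
        }

        // Called where consumeLoop currently catches IOException.
        void onIoFailure() {
            errors++;
        }

        int errors() {
            return errors;
        }
    }

    public static void main(String[] args) {
        ConsecutiveErrorCounter counter = new ConsecutiveErrorCounter();

        // Three reconnects in a row: each connection succeeds, but the channel
        // fails before any delivery is processed, so the counter keeps growing.
        for (int attempt = 1; attempt <= 3; attempt++) {
            counter.onIoFailure();
            System.out.println("after failure " + attempt + ": errors = " + counter.errors());
        }

        // Once a delivery goes through, the streak is cleared.
        counter.onDeliveryProcessed();
        System.out.println("after a processed delivery: errors = " + counter.errors());
    }
}
```

With a counter like this, the value passed to waitToRetry would grow across repeated channel failures, so a backoff calculation that depends on it could actually increase the wait.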
