You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Our application runs on 2 Kubernetes pods and uses fs2-kafka for several consumers; most of the time we have 8 consumers in total per topic, 4 consumers on each pod, every consumer subscribed to its own partition.
When our pods are scaled down and up again, i.e. when our application is restarted, we often find that for one topic, some 'zombie' consumers are still hanging around on the broker after the restart. For this topic we see a mix of new consumers from both restarted pods, plus a few consumers from one pod from before the restart. At least, that's what the broker shows, in reality the old pod has long died and those consumers don't exist anymore. However since the broker is unaware of this, the stale consumers are still assigned to partitions, which means that new messages from those partitions are not consumed anymore and a lag builds up.
The topic where this happens is not always the same, also sometimes a restart doesn't give any problems. And the workaround is simply to restart our pods once more, usually the stale consumers are gone after that.
We suspect that the consumer might not have unsubscribed properly during JVM shutdown, but this is just a guess: we don't do any unsubscribing by ourselves, we completely rely on fs2-kafka for this.
We usually subscribe like this:
Even if FS2 is failing to unsubscribe, this is mostly a Kafka Broker issue.
Which value do you have set for session.timeout.ms?
Regardless of the lib used (FS2, raw KafkaConsumer, etc) the broker should free those partitions and reassign them to another instance live in the Consumer Group
We left session.timeout.ms at its default, so 45000 (45s).
We have reason to suspect that the problem is on our side, because other application that are using Akka Kafka clients and are on the same broker do not have this problem.
You can try to set org.apache.kafka logs to DEBUG level. Here you'll see if the lib is unsubscribing or not. But if it's not, I'd bet on some resource shutdown order issue in your app. We have multiple projects using FS2 at $work at these graceful shutdown correctly.
Our application runs on 2 Kubernetes pods and uses fs2-kafka for several consumers; most of the time we have 8 consumers in total per topic, 4 consumers on each pod, every consumer subscribed to its own partition.
When our pods are scaled down and up again, i.e. when our application is restarted, we often find that for one topic, some 'zombie' consumers are still hanging around on the broker after the restart. For this topic we see a mix of new consumers from both restarted pods, plus a few consumers from one pod from before the restart. At least, that's what the broker shows, in reality the old pod has long died and those consumers don't exist anymore. However since the broker is unaware of this, the stale consumers are still assigned to partitions, which means that new messages from those partitions are not consumed anymore and a lag builds up.
The topic where this happens is not always the same, also sometimes a restart doesn't give any problems. And the workaround is simply to restart our pods once more, usually the stale consumers are gone after that.
We suspect that the consumer might not have unsubscribed properly during JVM shutdown, but this is just a guess: we don't do any unsubscribing by ourselves, we completely rely on fs2-kafka for this.
We usually subscribe like this:
and we don't have any code to explicitly shutdown or unsubscribe upon JVM exit.
Could it be that we are not using fs2-kafka in the correct way, and that this is causing the random stale consumers? Or is this perhaps a known issue?
The text was updated successfully, but these errors were encountered: