Skip to content

Commit

Permalink
Fix CallbackStack.Node leak (#1318)
Browse files Browse the repository at this point in the history
Some context to start: Cats Effects has been having memory leaks in CallbackStack since version 3.4.3.
See for example: typelevel/cats-effect#3935

I've been facing this memory leak in an application using fs2-kafka, and found that I'm not the only one (typelevel/cats-effect#3973).

Using a simple consumer > produce stream application, I monitored the size of the CallbackStack using the following command:

```
while sleep 1; do jcmd <pid> GC.class_histogram | grep 'cats.effect.CallbackStack$Node' ; done
```

I found that swapping the `F.race(shutdown, fetch)` for `fetch` stops the memory leak. This should not be an issue because the Stream is anyway interrupted on `.interruptWhen(F.race(shutdown, stopReqs.get).void.attempt)`, but I'm not 100% convinced of this.

Co-authored-by: Adrien Bestel <[email protected]>
  • Loading branch information
abestel and Adrien Bestel authored Apr 17, 2024
1 parent ed3d15e commit fb082a0
Showing 1 changed file with 7 additions and 11 deletions.
18 changes: 7 additions & 11 deletions modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ object KafkaConsumer {
stopReqs <- Deferred[F, Unit]
} yield Stream
.eval {
def fetchPartition: F[Unit] = F
val fetchPartition: F[Unit] = F
.deferred[PartitionResult]
.flatMap { deferred =>
val callback: PartitionResult => F[Unit] =
Expand Down Expand Up @@ -202,19 +202,15 @@ object KafkaConsumer {

assigned.ifM(storeFetch, completeRevoked)
} >> deferred.get
F.race(shutdown, fetch)
.flatMap {
case Left(()) =>
stopReqs.complete(()).void

case Right((chunk, reason)) =>
val enqueueChunk = chunks.offer(Some(chunk)).unlessA(chunk.isEmpty)
fetch.flatMap { case (chunk, reason) =>
val enqueueChunk = chunks.offer(Some(chunk)).unlessA(chunk.isEmpty)

val completeRevoked =
stopReqs.complete(()).void.whenA(reason.topicPartitionRevoked)
val completeRevoked =
stopReqs.complete(()).void.whenA(reason.topicPartitionRevoked)

enqueueChunk >> completeRevoked
}
enqueueChunk >> completeRevoked
}
}

Stream
Expand Down

0 comments on commit fb082a0

Please sign in to comment.