From 7472550cd4d2b46db7af3bf06814b8971aaaa2a6 Mon Sep 17 00:00:00 2001
From: Ben Plommer
Date: Sun, 13 Mar 2022 12:24:30 +0000
Subject: [PATCH] Add config to emit empty chunks when fetch returns nothing

---
 .gitignore                                    |  2 ++
 .../scala/fs2/kafka/ConsumerSettings.scala    | 11 ++++-
 .../main/scala/fs2/kafka/KafkaConsumer.scala  |  3 +-
 .../scala/fs2/kafka/KafkaConsumerSpec.scala   | 39 +++++++++++++++++-
 4 files changed, 50 insertions(+), 5 deletions(-)

diff --git a/.gitignore b/.gitignore
index f7f4fa41e..8fc2c65b6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,5 @@
 target/
 .metals/
 .vscode/
+.bloop/
+metals.sbt
\ No newline at end of file
diff --git a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
index 6e1582f83..51612a3c5 100644
--- a/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
+++ b/modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
@@ -379,6 +379,10 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
     */
   def maxPrefetchBatches: Int
 
+  def emitEmptyChunks: Boolean
+
+  def withEmitEmptyChunks(emitEmptyChunks: Boolean): ConsumerSettings[F, K, V]
+
   /**
    * Creates a new [[ConsumerSettings]] with the specified value
    * for [[maxPrefetchBatches]]. Note that if a value lower than
@@ -405,6 +409,7 @@
     override val pollTimeout: FiniteDuration,
     override val commitRecovery: CommitRecovery,
     override val recordMetadata: ConsumerRecord[K, V] => String,
+    override val emitEmptyChunks: Boolean,
     override val maxPrefetchBatches: Int
   ) extends ConsumerSettings[F, K, V] {
     override def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V] =
@@ -520,6 +525,9 @@
     override def withMaxPrefetchBatches(maxPrefetchBatches: Int): ConsumerSettings[F, K, V] =
       copy(maxPrefetchBatches = Math.max(2, maxPrefetchBatches))
 
+    override def withEmitEmptyChunks(emitEmptyChunks: Boolean): ConsumerSettings[F, K, V] =
+      copy(emitEmptyChunks = emitEmptyChunks)
+
     override def withCredentials(
       credentialsStore: KafkaCredentialStore
     ): ConsumerSettings[F, K, V] =
@@ -547,7 +555,8 @@
       pollTimeout = 50.millis,
       commitRecovery = CommitRecovery.Default,
       recordMetadata = _ => OffsetFetchResponse.NO_METADATA,
-      maxPrefetchBatches = 2
+      maxPrefetchBatches = 2,
+      emitEmptyChunks = false
     )
 
   def apply[F[_], K, V](
diff --git a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
index 5f1895d85..ccb504b9b 100644
--- a/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
+++ b/modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
@@ -173,7 +173,8 @@ object KafkaConsumer {
                   stopReqs.complete(()).void
 
                 case Right((chunk, reason)) =>
-                  val enqueueChunk = chunks.offer(Some(chunk)).unlessA(chunk.isEmpty)
+                  val enqueueChunk =
+                    chunks.offer(Some(chunk)).unlessA(chunk.isEmpty && !settings.emitEmptyChunks)
 
                   val completeRevoked =
                     stopReqs.complete(()).void.whenA(reason.topicPartitionRevoked)
diff --git a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
index d1cbda793..225815bd2 100644
--- a/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
+++ b/modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
@@ -9,7 +9,11 @@ import cats.effect.unsafe.implicits.global
 import fs2.Stream
 import fs2.concurrent.SignallingRef
 import fs2.kafka.internal.converters.collection._
-import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor, NoOffsetForPartitionException}
+import org.apache.kafka.clients.consumer.{
+  ConsumerConfig,
+  CooperativeStickyAssignor,
+  NoOffsetForPartitionException
+}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.scalatest.Assertion
@@ -62,6 +66,31 @@
       }
     }
 
+    it("should produce empty chunk when configured to do so") {
+      withTopic { topic =>
+        createCustomTopic(topic, partitions = 3)
+        val produced = (0 until 5).map(n => s"key-$n" -> s"value->$n")
+        publishToKafka(topic, produced)
+
+        val consumed =
+          KafkaConsumer
+            .stream(consumerSettings[IO].withEmitEmptyChunks(true))
+            .subscribeTo(topic)
+            .flatMap(
+              _.partitionedStream.flatMap(
+                _.map(committable => committable.record.key -> committable.record.value).chunks
+                  .takeWhile(_.nonEmpty)
+                  .unchunks
+              )
+            )
+            .compile
+            .toVector
+            .unsafeRunSync()
+
+        consumed should contain theSameElementsAs produced
+      }
+    }
+
     it("should consume all records with subscribing for several consumers") {
       withTopic { topic =>
         createCustomTopic(topic, partitions = 3)
@@ -559,7 +588,9 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
          .stream(
            consumerSettings[IO]
              .withProperties(
-               ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName
+               ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[
+                 CooperativeStickyAssignor
+               ].getName
              )
          )
          .subscribeTo(topic)
@@ -801,9 +832,11 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
          .stream(
            consumerSettings[IO]
              .withProperties(
-               ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName
+               ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[
+                 CooperativeStickyAssignor
+               ].getName
              )
          )
          .subscribeTo(topic)
          .evalMap { consumer =>
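
Usage sketch (not part of the patch itself): the snippet below shows how application code could opt into the new setting, assuming fs2-kafka 2.x on cats-effect 3; the object name, bootstrap servers, group id and topic are placeholder values. With withEmitEmptyChunks(true), each per-partition stream emits an empty chunk whenever a fetch returns no records (the new test above observes this through .chunks), so downstream code can react to idle polls:

    import cats.effect.{IO, IOApp}
    import fs2.kafka.{AutoOffsetReset, ConsumerSettings, KafkaConsumer}

    object EmptyChunksExample extends IOApp.Simple {
      // Placeholder connection settings; withEmitEmptyChunks(true) opts into
      // the behaviour added by this patch (the default remains false).
      val settings: ConsumerSettings[IO, String, String] =
        ConsumerSettings[IO, String, String]
          .withBootstrapServers("localhost:9092")
          .withGroupId("example-group")
          .withAutoOffsetReset(AutoOffsetReset.Earliest)
          .withEmitEmptyChunks(true)

      def run: IO[Unit] =
        KafkaConsumer
          .stream(settings)
          .subscribeTo("example-topic")
          .flatMap(_.partitionedStream)
          .map { partition =>
            // .chunks exposes the chunk structure, so an empty fetch becomes
            // visible as an empty chunk instead of being filtered out.
            partition.chunks.evalMap { chunk =>
              if (chunk.isEmpty) IO.println("fetch returned no records")
              else IO.println(s"fetched ${chunk.size} records")
            }
          }
          .parJoinUnbounded
          .compile
          .drain
    }

Since emitEmptyChunks defaults to false, existing consumers keep the current behaviour of suppressing empty chunks; the setting is purely opt-in.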