Add config to emit empty chunks when fetch returns nothing #899

Draft · wants to merge 1 commit into base: series/2.x
2 changes: 2 additions & 0 deletions .gitignore
@@ -6,3 +6,5 @@
 target/
 .metals/
 .vscode/
+.bloop/
+metals.sbt
11 changes: 10 additions & 1 deletion modules/core/src/main/scala/fs2/kafka/ConsumerSettings.scala
@@ -379,6 +379,10 @@ sealed abstract class ConsumerSettings[F[_], K, V] {
     */
   def maxPrefetchBatches: Int
 
+  def emitEmptyChunks: Boolean
+
+  def withEmitEmptyChunks(emitEmptyChunks: Boolean): ConsumerSettings[F, K, V]
+
   /**
    * Creates a new [[ConsumerSettings]] with the specified value
    * for [[maxPrefetchBatches]]. Note that if a value lower than
@@ -405,6 +409,7 @@ object ConsumerSettings {
     override val pollTimeout: FiniteDuration,
     override val commitRecovery: CommitRecovery,
     override val recordMetadata: ConsumerRecord[K, V] => String,
+    override val emitEmptyChunks: Boolean,
     override val maxPrefetchBatches: Int
   ) extends ConsumerSettings[F, K, V] {
     override def withCustomBlockingContext(ec: ExecutionContext): ConsumerSettings[F, K, V] =
@@ -520,6 +525,9 @@ object ConsumerSettings {
     override def withMaxPrefetchBatches(maxPrefetchBatches: Int): ConsumerSettings[F, K, V] =
       copy(maxPrefetchBatches = Math.max(2, maxPrefetchBatches))
 
+    override def withEmitEmptyChunks(emitEmptyChunks: Boolean): ConsumerSettings[F, K, V] =
+      copy(emitEmptyChunks = emitEmptyChunks)
+
     override def withCredentials(
       credentialsStore: KafkaCredentialStore
     ): ConsumerSettings[F, K, V] =
@@ -547,7 +555,8 @@ object ConsumerSettings {
       pollTimeout = 50.millis,
       commitRecovery = CommitRecovery.Default,
       recordMetadata = _ => OffsetFetchResponse.NO_METADATA,
-      maxPrefetchBatches = 2
+      maxPrefetchBatches = 2,
+      emitEmptyChunks = false
     )
 
   def apply[F[_], K, V](
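By default the consumer silently drops a fetch that returns no records, so partition streams only ever see non-empty chunks. With the new setting enabled, such a fetch surfaces downstream as an empty chunk, which a stream can treat as a signal that it has (for the moment) caught up. A minimal sketch of wiring the setting up, assuming hypothetical bootstrap servers, group id, and topic name:

```scala
import cats.effect.IO
import fs2.kafka._

// Hypothetical configuration; withEmitEmptyChunks(true) is the setting this PR adds.
val settings: ConsumerSettings[IO, String, String] =
  ConsumerSettings[IO, String, String]
    .withBootstrapServers("localhost:9092")
    .withGroupId("example-group")
    .withAutoOffsetReset(AutoOffsetReset.Earliest)
    .withEmitEmptyChunks(true) // defaults to false, preserving existing behaviour

// Empty chunks now appear in the per-partition chunk streams.
val chunks =
  KafkaConsumer
    .stream(settings)
    .subscribeTo("example-topic")
    .flatMap(_.partitionedStream.flatMap(_.chunks))
```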
3 changes: 2 additions & 1 deletion modules/core/src/main/scala/fs2/kafka/KafkaConsumer.scala
@@ -173,7 +173,8 @@ object KafkaConsumer {
               stopReqs.complete(()).void
 
             case Right((chunk, reason)) =>
-              val enqueueChunk = chunks.offer(Some(chunk)).unlessA(chunk.isEmpty)
+              val enqueueChunk =
+                chunks.offer(Some(chunk)).unlessA(chunk.isEmpty && !settings.emitEmptyChunks)
 
               val completeRevoked =
                 stopReqs.complete(()).void.whenA(reason.topicPartitionRevoked)
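The behavioural change is confined to this guard: a fetched chunk is now enqueued unless it is empty and empty chunks are disabled. A minimal sketch of the condition in isolation, with hypothetical stand-ins for the queue offer and the settings flag:

```scala
import cats.effect.IO
import cats.syntax.all._

// Hypothetical stand-in for chunks.offer(Some(chunk)).
def offer(chunkSize: Int): IO[Unit] =
  IO.println(s"enqueued chunk of size $chunkSize")

def enqueueChunk(chunkSize: Int, emitEmptyChunks: Boolean): IO[Unit] =
  // unlessA skips the effect when its condition holds, so an empty
  // chunk is dropped only while emitEmptyChunks is false.
  offer(chunkSize).unlessA(chunkSize == 0 && !emitEmptyChunks)
```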
40 changes: 36 additions & 4 deletions modules/core/src/test/scala/fs2/kafka/KafkaConsumerSpec.scala
@@ -9,7 +9,11 @@ import cats.effect.unsafe.implicits.global
 import fs2.Stream
 import fs2.concurrent.SignallingRef
 import fs2.kafka.internal.converters.collection._
-import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor, NoOffsetForPartitionException}
+import org.apache.kafka.clients.consumer.{
+  ConsumerConfig,
+  CooperativeStickyAssignor,
+  NoOffsetForPartitionException
+}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.TimeoutException
 import org.scalatest.Assertion
@@ -62,6 +66,31 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
       }
     }
 
+    it("should produce an empty chunk when configured to do so") {
+      withTopic { topic =>
+        createCustomTopic(topic, partitions = 3)
+        val produced = (0 until 5).map(n => s"key-$n" -> s"value-$n")
+        publishToKafka(topic, produced)
+
+        val consumed =
+          KafkaConsumer
+            .stream(consumerSettings[IO].withEmitEmptyChunks(true))
+            .subscribeTo(topic)
+            .flatMap(
+              _.partitionedStream.flatMap(
+                _.map(committable => committable.record.key -> committable.record.value).chunks
+                  .takeWhile(_.nonEmpty)
+                  .unchunks
+              )
+            )
+            .compile
+            .toVector
+            .unsafeRunSync()
+
+        consumed should contain theSameElementsAs produced
+      }
+    }
+
     it("should consume all records with subscribing for several consumers") {
       withTopic { topic =>
         createCustomTopic(topic, partitions = 3)
@@ -559,7 +588,9 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
         .stream(
           consumerSettings[IO]
             .withProperties(
-              ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName
+              ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[
+                CooperativeStickyAssignor
+              ].getName
             )
         )
         .subscribeTo(topic)
@@ -801,9 +832,10 @@ final class KafkaConsumerSpec extends BaseKafkaSpec {
         .stream(
           consumerSettings[IO]
             .withProperties(
-              ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[CooperativeStickyAssignor].getName
+              ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> classOf[
+                CooperativeStickyAssignor
+              ].getName
             )
-
         )
         .subscribeTo(topic)
         .evalMap { consumer =>
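The termination trick in the new test generalises: once an empty chunk marks a fetch that returned nothing, `takeWhile(_.nonEmpty)` on the chunked stream turns "consumer caught up" into end-of-stream. A standalone sketch of that pattern, assuming a record stream whose chunk boundaries match the consumer's fetches:

```scala
import cats.effect.IO
import fs2.Stream

// Drain records until the first empty fetch, then stop the stream.
def untilCaughtUp[A](records: Stream[IO, A]): Stream[IO, A] =
  records.chunks
    .takeWhile(_.nonEmpty) // an empty chunk means a fetch returned nothing
    .unchunks              // flatten the surviving chunks back to records
```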