From db85f882203c79cef2f14a953ec6eb60bf0bd3c6 Mon Sep 17 00:00:00 2001 From: Velichko Anton Date: Tue, 29 Jun 2021 04:51:20 +0300 Subject: [PATCH] Initial functional streams support (#284) * feat: add fs2 support [reader, subscription, producer, committable producer] (cherry picked from commit 45a0ddc897a741e42c2a6c46d105eb44122a6a5c) * fix: avoid unsubscription by default after end of stream. Co-authored-by: Sam Sam --- README.md | 38 ++++++- build.sbt | 12 +++ .../com/sksamuel/pulsar4s/fs2/Streams.scala | 91 ++++++++++++++++ .../sksamuel/pulsar4s/fs2/StreamsTest.scala | 102 ++++++++++++++++++ 4 files changed, 242 insertions(+), 1 deletion(-) create mode 100644 pulsar4s-fs2/src/main/scala/com/sksamuel/pulsar4s/fs2/Streams.scala create mode 100644 pulsar4s-fs2/src/test/scala/com/sksamuel/pulsar4s/fs2/StreamsTest.scala diff --git a/README.md b/README.md index d14debe3..227bc29f 100644 --- a/README.md +++ b/README.md @@ -253,6 +253,39 @@ Thread.sleep(10000) control.close() ``` +## FS2 Support + +Pulsar4s integrates with the [fs2](https://fs2.io/) library - it provides both a source and a sink. +To use this, you need to add a dependency on the `pulsar4s-{effect}` + `pulsar4s-fs2` module. + +### Example + +```scala +import com.sksamuel.pulsar4s._ +import com.sksamuel.pulsar4s.cats.CatsAsyncHandler._ +import com.sksamuel.pulsar4s.fs2.Streams + +import org.apache.pulsar.client.api.Schema + +implicit val schema: Schema[Array[Byte]] = Schema.BYTES + +val client = PulsarClient("pulsar://localhost:6650") + +val intopic = Topic("persistent://sample/standalone/ns1/in") +val outtopic = Topic("persistent://sample/standalone/ns1/out") + +Streams.batch[IO, Array[Byte]](client.consumerAsync[Array[Byte], IO](ConsumerConfig( + subscriptionName = Subscription("mysub"), + topics = Seq(intopic), + subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest) +))) + .map(_.map(ProducerMessage(_.value))) + .through(Streams.committableSink(client.producerAsync[Array[Byte], IO](ProducerConfig(outtopic)))) + .compile + .drain + +``` + ## Example SBT Setup ```scala @@ -289,7 +322,10 @@ libraryDependencies ++= Seq( // if you want to use cats effects "com.sksamuel.pulsar4s" %% "pulsar4s-cats-effect" % pulsar4sVersion, - + + // if you want to use fs2 + "com.sksamuel.pulsar4s" %% "pulsar4s-fs2" % pulsar4sVersion, + // if you want to use zio "com.sksamuel.pulsar4s" %% "pulsar4s-zio" % pulsar4sVersion ) diff --git a/build.sbt b/build.sbt index 55f16b9f..274bd302 100644 --- a/build.sbt +++ b/build.sbt @@ -18,6 +18,7 @@ val MonixVersion = "3.1.0" val PlayJsonVersion = "2.7.4" // compatible with 2.7.x and 2.8.x val PulsarVersion = "2.7.2" val ReactiveStreamsVersion = "1.0.2" +val FunctionalStreamsVersion = "2.5.2" val Json4sVersion = "3.6.11" val Avro4sVersion = "3.1.0" val ScalaVersion = "2.13.5" @@ -172,6 +173,17 @@ lazy val cats_effect = Project("pulsar4s-cats-effect", file("pulsar4s-cats-effec "dev.zio" %% "zio-interop-cats" % ZIOInteropCatsVersion % Test )) +lazy val fs2 = Project("pulsar4s-fs2", file("pulsar4s-fs2")) + .dependsOn(core) + .dependsOn(cats_effect) + .settings(name := "pulsar4s-fs2") + .settings(allSettings) + .settings(libraryDependencies ++= Seq( + "org.typelevel" %% "cats-effect" % CatsEffectVersion, + "co.fs2" %% "fs2-core" % FunctionalStreamsVersion, + "co.fs2" %% "fs2-reactive-streams" % FunctionalStreamsVersion + )) + lazy val scalaz = Project("pulsar4s-scalaz", file("pulsar4s-scalaz")) .dependsOn(core) .settings(name := "pulsar4s-scalaz") diff --git a/pulsar4s-fs2/src/main/scala/com/sksamuel/pulsar4s/fs2/Streams.scala b/pulsar4s-fs2/src/main/scala/com/sksamuel/pulsar4s/fs2/Streams.scala new file mode 100644 index 00000000..1424e1ce --- /dev/null +++ b/pulsar4s-fs2/src/main/scala/com/sksamuel/pulsar4s/fs2/Streams.scala @@ -0,0 +1,91 @@ +package com.sksamuel.pulsar4s.fs2 + +import cats.Applicative +import cats.effect.{Bracket, BracketThrow, ExitCase, Resource} +import cats.implicits._ +import com.sksamuel.pulsar4s._ + +trait CommittableMessage[F[_], X] { + def ack: F[Unit] + def nack: F[Unit] + def data: X + + def map[Y](f: X => Y): CommittableMessage[F, Y] +} + +object Streams { + import _root_.fs2.{Pipe, Stream} + + private final class DelegateCommittableMessage[F[_] : AsyncHandler, T]( + message: ConsumerCommittableMessage[F, _], + payload: T + ) extends CommittableMessage[F, T] { + override def ack: F[Unit] = message.ack + override def nack: F[Unit] = message.nack + override def data: T = payload + override def map[Y](f: T => Y): CommittableMessage[F, Y] = new DelegateCommittableMessage(message, f(payload)) + } + + private final case class ConsumerCommittableMessage[F[_] : AsyncHandler, T]( + message: ConsumerMessage[T], + consumer: Consumer[T] + ) extends CommittableMessage[F, ConsumerMessage[T]] { + override def ack: F[Unit] = consumer.acknowledgeAsync(message.messageId) + override def nack: F[Unit] = consumer.negativeAcknowledgeAsync(message.messageId) + override def data: ConsumerMessage[T] = message + + override def map[Y](f: ConsumerMessage[T] => Y): CommittableMessage[F, Y] = + new DelegateCommittableMessage(this, f(message)) + } + + def batch[F[_] : Applicative : AsyncHandler, T]( + consumer: F[Consumer[T]] + ): Stream[F, CommittableMessage[F, ConsumerMessage[T]]] = + Stream.resource(Resource.make(consumer)(_.closeAsync)) + .flatMap { consumer => + Stream + .repeatEval(consumer.receiveBatchAsync[F]) + .flatMap(Stream.emits(_)) + .mapChunks(_.map(message => ConsumerCommittableMessage(message, consumer))) + } + + def single[F[_] : Applicative : AsyncHandler, T]( + consumer: F[Consumer[T]] + ): Stream[F, CommittableMessage[F, ConsumerMessage[T]]] = + Stream.resource(Resource.make(consumer)(_.closeAsync)) + .flatMap { consumer => + Stream + .repeatEval(consumer.receiveAsync[F]) + .mapChunks(_.map(message => ConsumerCommittableMessage(message, consumer))) + } + + def reader[F[_] : Applicative : AsyncHandler, T]( + reader: F[Reader[T]] + ): Stream[F, ConsumerMessage[T]] = + Stream.resource(Resource.make(reader)(_.closeAsync)) + .flatMap { reader => + Stream + .repeatEval(reader.nextAsync[F]) + } + + def sink[F[_] : Applicative : AsyncHandler, T]( + producer: F[Producer[T]] + ): Pipe[F, ProducerMessage[T], MessageId] = messages => + Stream.resource(Resource.make(producer)(_.closeAsync)) + .flatMap { producer => + messages.evalMap(producer.sendAsync(_)) + } + + def committableSink[F[_] : Applicative : BracketThrow : AsyncHandler , T]( + producer: F[Producer[T]] + ): Pipe[F, CommittableMessage[F, ProducerMessage[T]], MessageId] = messages => + Stream.resource(Resource.make(producer)(_.closeAsync)) + .flatMap { producer => + messages.evalMap { message => + Bracket[F, Throwable].guaranteeCase(producer.sendAsync(message.data)) { + case ExitCase.Completed => message.ack + case _ => message.nack + } + } + } +} \ No newline at end of file diff --git a/pulsar4s-fs2/src/test/scala/com/sksamuel/pulsar4s/fs2/StreamsTest.scala b/pulsar4s-fs2/src/test/scala/com/sksamuel/pulsar4s/fs2/StreamsTest.scala new file mode 100644 index 00000000..659ce776 --- /dev/null +++ b/pulsar4s-fs2/src/test/scala/com/sksamuel/pulsar4s/fs2/StreamsTest.scala @@ -0,0 +1,102 @@ +package com.sksamuel.pulsar4s.fs2 + +import cats.effect.{IO, Sync} +import cats.implicits._ +import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerConfig, Message, MessageId, ProducerConfig, ProducerMessage, PulsarAsyncClient, PulsarClient, ReaderConfig, Subscription, Topic} +import org.apache.pulsar.client.api.{Schema, SubscriptionInitialPosition} +import org.scalatest.BeforeAndAfterAll +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import java.util.UUID + +class StreamsTest extends AnyFunSuite with Matchers with BeforeAndAfterAll { + + implicit val schema: Schema[String] = Schema.STRING + + private val client = PulsarClient("pulsar://localhost:6650") + + override def afterAll(): Unit = { + client.close() + } + + private def publishMessages[F[_]: Sync: AsyncHandler](client: PulsarAsyncClient, t: Topic, messages: List[String]): F[Unit] = + for { + producer <- client.producerAsync[String, F](ProducerConfig( + topic = t + )) + _ <- messages + .map(producer.sendAsync(_)) + .sequence + } yield () + + test("able to read from topic") { + import com.sksamuel.pulsar4s.cats.CatsAsyncHandler._ + val topic = Topic("persistent://sample/standalone/ns1/" + UUID.randomUUID().toString) + val messages = (0 to 10).map(p => s"TestMessage_$p").toList + + (for { + _ <- publishMessages[IO](client, topic, messages) + read <- Streams.reader[IO, String](client.readerAsync[String, IO](ReaderConfig( + topic = topic, + startMessage = Message(MessageId.earliest) + ))).take(messages.size).map(_.value).compile.toList + } yield read shouldBe messages).unsafeRunSync() + } + + test("able to read from subscription [batch]") { + import com.sksamuel.pulsar4s.cats.CatsAsyncHandler._ + val topic = Topic("persistent://sample/standalone/ns1/" + UUID.randomUUID().toString) + val messages = (0 to 10).map(p => s"TestMessage_$p").toList + + (for { + _ <- publishMessages[IO](client, topic, messages) + batch <- Streams.batch[IO, String](client.consumerAsync[String, IO](ConsumerConfig( + subscriptionName = Subscription("fs2_subscription_batch"), + topics = Seq(topic), + subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest) + ))).take(messages.size).map(_.data.value).compile.toList + single <- Streams.single[IO, String](client.consumerAsync[String, IO](ConsumerConfig( + subscriptionName = Subscription("fs2_subscription_single"), + topics = Seq(topic), + subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest) + ))).take(messages.size).map(_.data.value).compile.toList + } yield { + batch shouldBe messages + single shouldBe messages + }).unsafeRunSync() + } + + test("able to connect with sink") { + import com.sksamuel.pulsar4s.cats.CatsAsyncHandler._ + + val topic = Topic("persistent://sample/standalone/ns1/" + UUID.randomUUID().toString) + val topic2 = Topic("persistent://sample/standalone/ns1/" + UUID.randomUUID().toString) + val messages = (0 to 10).map(p => s"TestMessage_$p").toList + + (for { + _ <- publishMessages[IO](client, topic, messages) + + _ <- Streams.batch[IO, String](client.consumerAsync[String, IO](ConsumerConfig( + subscriptionName = Subscription("fs2_subscription_batch"), + topics = Seq(topic), + subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest) + ))) + .take(messages.size) + .map(_.map { message => + ProducerMessage(message.value) + }) + .through(Streams.committableSink(client.producerAsync[String, IO](ProducerConfig(topic2)))) + .compile + .drain + + batch <- Streams.batch[IO, String](client.consumerAsync[String, IO](ConsumerConfig( + subscriptionName = Subscription("fs2_subscription_batch"), + topics = Seq(topic2), + subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest) + ))).take(messages.size).map(_.data.value).compile.toList + } yield { + batch shouldBe messages + }).unsafeRunSync() + } +}