Skip to content

Commit

Permalink
Initial functional streams support (#284)
Browse files Browse the repository at this point in the history
* feat: add fs2 support [reader, subscription, producer, committable producer]

(cherry picked from commit 45a0ddc)

* fix: avoid unsubscription by default after end of stream.

Co-authored-by: Sam Sam <[email protected]>
  • Loading branch information
tonyvelichko and sksamuel authored Jun 29, 2021
1 parent 03c1540 commit db85f88
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 1 deletion.
38 changes: 37 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,39 @@ Thread.sleep(10000)
control.close()
```

## FS2 Support

Pulsar4s integrates with the [fs2](https://fs2.io/) library - it provides both a source and a sink.
To use this, you need to add a dependency on the `pulsar4s-{effect}` + `pulsar4s-fs2` module.

### Example

```scala
import com.sksamuel.pulsar4s._
import com.sksamuel.pulsar4s.cats.CatsAsyncHandler._
import com.sksamuel.pulsar4s.fs2.Streams

import org.apache.pulsar.client.api.Schema

implicit val schema: Schema[Array[Byte]] = Schema.BYTES

val client = PulsarClient("pulsar://localhost:6650")

val intopic = Topic("persistent://sample/standalone/ns1/in")
val outtopic = Topic("persistent://sample/standalone/ns1/out")

Streams.batch[IO, Array[Byte]](client.consumerAsync[Array[Byte], IO](ConsumerConfig(
subscriptionName = Subscription("mysub"),
topics = Seq(intopic),
subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest)
)))
.map(_.map(ProducerMessage(_.value)))
.through(Streams.committableSink(client.producerAsync[Array[Byte], IO](ProducerConfig(outtopic))))
.compile
.drain

```

## Example SBT Setup

```scala
Expand Down Expand Up @@ -289,7 +322,10 @@ libraryDependencies ++= Seq(

// if you want to use cats effects
"com.sksamuel.pulsar4s" %% "pulsar4s-cats-effect" % pulsar4sVersion,


// if you want to use fs2
"com.sksamuel.pulsar4s" %% "pulsar4s-fs2" % pulsar4sVersion,

// if you want to use zio
"com.sksamuel.pulsar4s" %% "pulsar4s-zio" % pulsar4sVersion
)
Expand Down
12 changes: 12 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ val MonixVersion = "3.1.0"
val PlayJsonVersion = "2.7.4" // compatible with 2.7.x and 2.8.x
val PulsarVersion = "2.7.2"
val ReactiveStreamsVersion = "1.0.2"
val FunctionalStreamsVersion = "2.5.2"
val Json4sVersion = "3.6.11"
val Avro4sVersion = "3.1.0"
val ScalaVersion = "2.13.5"
Expand Down Expand Up @@ -172,6 +173,17 @@ lazy val cats_effect = Project("pulsar4s-cats-effect", file("pulsar4s-cats-effec
"dev.zio" %% "zio-interop-cats" % ZIOInteropCatsVersion % Test
))

lazy val fs2 = Project("pulsar4s-fs2", file("pulsar4s-fs2"))
.dependsOn(core)
.dependsOn(cats_effect)
.settings(name := "pulsar4s-fs2")
.settings(allSettings)
.settings(libraryDependencies ++= Seq(
"org.typelevel" %% "cats-effect" % CatsEffectVersion,
"co.fs2" %% "fs2-core" % FunctionalStreamsVersion,
"co.fs2" %% "fs2-reactive-streams" % FunctionalStreamsVersion
))

lazy val scalaz = Project("pulsar4s-scalaz", file("pulsar4s-scalaz"))
.dependsOn(core)
.settings(name := "pulsar4s-scalaz")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.sksamuel.pulsar4s.fs2

import cats.Applicative
import cats.effect.{Bracket, BracketThrow, ExitCase, Resource}
import cats.implicits._
import com.sksamuel.pulsar4s._

trait CommittableMessage[F[_], X] {
def ack: F[Unit]
def nack: F[Unit]
def data: X

def map[Y](f: X => Y): CommittableMessage[F, Y]
}

object Streams {
import _root_.fs2.{Pipe, Stream}

private final class DelegateCommittableMessage[F[_] : AsyncHandler, T](
message: ConsumerCommittableMessage[F, _],
payload: T
) extends CommittableMessage[F, T] {
override def ack: F[Unit] = message.ack
override def nack: F[Unit] = message.nack
override def data: T = payload
override def map[Y](f: T => Y): CommittableMessage[F, Y] = new DelegateCommittableMessage(message, f(payload))
}

private final case class ConsumerCommittableMessage[F[_] : AsyncHandler, T](
message: ConsumerMessage[T],
consumer: Consumer[T]
) extends CommittableMessage[F, ConsumerMessage[T]] {
override def ack: F[Unit] = consumer.acknowledgeAsync(message.messageId)
override def nack: F[Unit] = consumer.negativeAcknowledgeAsync(message.messageId)
override def data: ConsumerMessage[T] = message

override def map[Y](f: ConsumerMessage[T] => Y): CommittableMessage[F, Y] =
new DelegateCommittableMessage(this, f(message))
}

def batch[F[_] : Applicative : AsyncHandler, T](
consumer: F[Consumer[T]]
): Stream[F, CommittableMessage[F, ConsumerMessage[T]]] =
Stream.resource(Resource.make(consumer)(_.closeAsync))
.flatMap { consumer =>
Stream
.repeatEval(consumer.receiveBatchAsync[F])
.flatMap(Stream.emits(_))
.mapChunks(_.map(message => ConsumerCommittableMessage(message, consumer)))
}

def single[F[_] : Applicative : AsyncHandler, T](
consumer: F[Consumer[T]]
): Stream[F, CommittableMessage[F, ConsumerMessage[T]]] =
Stream.resource(Resource.make(consumer)(_.closeAsync))
.flatMap { consumer =>
Stream
.repeatEval(consumer.receiveAsync[F])
.mapChunks(_.map(message => ConsumerCommittableMessage(message, consumer)))
}

def reader[F[_] : Applicative : AsyncHandler, T](
reader: F[Reader[T]]
): Stream[F, ConsumerMessage[T]] =
Stream.resource(Resource.make(reader)(_.closeAsync))
.flatMap { reader =>
Stream
.repeatEval(reader.nextAsync[F])
}

def sink[F[_] : Applicative : AsyncHandler, T](
producer: F[Producer[T]]
): Pipe[F, ProducerMessage[T], MessageId] = messages =>
Stream.resource(Resource.make(producer)(_.closeAsync))
.flatMap { producer =>
messages.evalMap(producer.sendAsync(_))
}

def committableSink[F[_] : Applicative : BracketThrow : AsyncHandler , T](
producer: F[Producer[T]]
): Pipe[F, CommittableMessage[F, ProducerMessage[T]], MessageId] = messages =>
Stream.resource(Resource.make(producer)(_.closeAsync))
.flatMap { producer =>
messages.evalMap { message =>
Bracket[F, Throwable].guaranteeCase(producer.sendAsync(message.data)) {
case ExitCase.Completed => message.ack
case _ => message.nack
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.sksamuel.pulsar4s.fs2

import cats.effect.{IO, Sync}
import cats.implicits._
import com.sksamuel.pulsar4s.{AsyncHandler, ConsumerConfig, Message, MessageId, ProducerConfig, ProducerMessage, PulsarAsyncClient, PulsarClient, ReaderConfig, Subscription, Topic}
import org.apache.pulsar.client.api.{Schema, SubscriptionInitialPosition}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

import java.util.UUID

class StreamsTest extends AnyFunSuite with Matchers with BeforeAndAfterAll {

implicit val schema: Schema[String] = Schema.STRING

private val client = PulsarClient("pulsar://localhost:6650")

override def afterAll(): Unit = {
client.close()
}

private def publishMessages[F[_]: Sync: AsyncHandler](client: PulsarAsyncClient, t: Topic, messages: List[String]): F[Unit] =
for {
producer <- client.producerAsync[String, F](ProducerConfig(
topic = t
))
_ <- messages
.map(producer.sendAsync(_))
.sequence
} yield ()

test("able to read from topic") {
import com.sksamuel.pulsar4s.cats.CatsAsyncHandler._
val topic = Topic("persistent://sample/standalone/ns1/" + UUID.randomUUID().toString)
val messages = (0 to 10).map(p => s"TestMessage_$p").toList

(for {
_ <- publishMessages[IO](client, topic, messages)
read <- Streams.reader[IO, String](client.readerAsync[String, IO](ReaderConfig(
topic = topic,
startMessage = Message(MessageId.earliest)
))).take(messages.size).map(_.value).compile.toList
} yield read shouldBe messages).unsafeRunSync()
}

test("able to read from subscription [batch]") {
import com.sksamuel.pulsar4s.cats.CatsAsyncHandler._
val topic = Topic("persistent://sample/standalone/ns1/" + UUID.randomUUID().toString)
val messages = (0 to 10).map(p => s"TestMessage_$p").toList

(for {
_ <- publishMessages[IO](client, topic, messages)
batch <- Streams.batch[IO, String](client.consumerAsync[String, IO](ConsumerConfig(
subscriptionName = Subscription("fs2_subscription_batch"),
topics = Seq(topic),
subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest)
))).take(messages.size).map(_.data.value).compile.toList
single <- Streams.single[IO, String](client.consumerAsync[String, IO](ConsumerConfig(
subscriptionName = Subscription("fs2_subscription_single"),
topics = Seq(topic),
subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest)
))).take(messages.size).map(_.data.value).compile.toList
} yield {
batch shouldBe messages
single shouldBe messages
}).unsafeRunSync()
}

test("able to connect with sink") {
import com.sksamuel.pulsar4s.cats.CatsAsyncHandler._

val topic = Topic("persistent://sample/standalone/ns1/" + UUID.randomUUID().toString)
val topic2 = Topic("persistent://sample/standalone/ns1/" + UUID.randomUUID().toString)
val messages = (0 to 10).map(p => s"TestMessage_$p").toList

(for {
_ <- publishMessages[IO](client, topic, messages)

_ <- Streams.batch[IO, String](client.consumerAsync[String, IO](ConsumerConfig(
subscriptionName = Subscription("fs2_subscription_batch"),
topics = Seq(topic),
subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest)
)))
.take(messages.size)
.map(_.map { message =>
ProducerMessage(message.value)
})
.through(Streams.committableSink(client.producerAsync[String, IO](ProducerConfig(topic2))))
.compile
.drain

batch <- Streams.batch[IO, String](client.consumerAsync[String, IO](ConsumerConfig(
subscriptionName = Subscription("fs2_subscription_batch"),
topics = Seq(topic2),
subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest)
))).take(messages.size).map(_.data.value).compile.toList
} yield {
batch shouldBe messages
}).unsafeRunSync()
}
}

0 comments on commit db85f88

Please sign in to comment.