Skip to content

Commit

Permalink
Renamed streams object in fs2
Browse files Browse the repository at this point in the history
  • Loading branch information
sksamuel committed Jun 29, 2021
1 parent db85f88 commit 58c3e2e
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ trait CommittableMessage[F[_], X] {
def map[Y](f: X => Y): CommittableMessage[F, Y]
}

object Streams {
object PulsarStreams {
import _root_.fs2.{Pipe, Stream}

private final class DelegateCommittableMessage[F[_] : AsyncHandler, T](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class StreamsTest extends AnyFunSuite with Matchers with BeforeAndAfterAll {

(for {
_ <- publishMessages[IO](client, topic, messages)
read <- Streams.reader[IO, String](client.readerAsync[String, IO](ReaderConfig(
read <- PulsarStreams.reader[IO, String](client.readerAsync[String, IO](ReaderConfig(
topic = topic,
startMessage = Message(MessageId.earliest)
))).take(messages.size).map(_.value).compile.toList
Expand All @@ -51,12 +51,12 @@ class StreamsTest extends AnyFunSuite with Matchers with BeforeAndAfterAll {

(for {
_ <- publishMessages[IO](client, topic, messages)
batch <- Streams.batch[IO, String](client.consumerAsync[String, IO](ConsumerConfig(
batch <- PulsarStreams.batch[IO, String](client.consumerAsync[String, IO](ConsumerConfig(
subscriptionName = Subscription("fs2_subscription_batch"),
topics = Seq(topic),
subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest)
))).take(messages.size).map(_.data.value).compile.toList
single <- Streams.single[IO, String](client.consumerAsync[String, IO](ConsumerConfig(
single <- PulsarStreams.single[IO, String](client.consumerAsync[String, IO](ConsumerConfig(
subscriptionName = Subscription("fs2_subscription_single"),
topics = Seq(topic),
subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest)
Expand All @@ -77,7 +77,7 @@ class StreamsTest extends AnyFunSuite with Matchers with BeforeAndAfterAll {
(for {
_ <- publishMessages[IO](client, topic, messages)

_ <- Streams.batch[IO, String](client.consumerAsync[String, IO](ConsumerConfig(
_ <- PulsarStreams.batch[IO, String](client.consumerAsync[String, IO](ConsumerConfig(
subscriptionName = Subscription("fs2_subscription_batch"),
topics = Seq(topic),
subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest)
Expand All @@ -86,11 +86,11 @@ class StreamsTest extends AnyFunSuite with Matchers with BeforeAndAfterAll {
.map(_.map { message =>
ProducerMessage(message.value)
})
.through(Streams.committableSink(client.producerAsync[String, IO](ProducerConfig(topic2))))
.through(PulsarStreams.committableSink(client.producerAsync[String, IO](ProducerConfig(topic2))))
.compile
.drain

batch <- Streams.batch[IO, String](client.consumerAsync[String, IO](ConsumerConfig(
batch <- PulsarStreams.batch[IO, String](client.consumerAsync[String, IO](ConsumerConfig(
subscriptionName = Subscription("fs2_subscription_batch"),
topics = Seq(topic2),
subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest)
Expand Down

0 comments on commit 58c3e2e

Please sign in to comment.