Replies: 2 comments
-
val dbStream = db.getEvents
val consumerRecords = consumer.getRecords
val n = 3 // number of events expected on the database
val m = 2 // number of events expected on the topic
for {
_ <- produceEvents
a <- dbStream.take(n).compile.lastOrError.delayBy(2.seconds) // wait for events to reach db
b <- consumerRecords.take(m).compile.lastOrError
} yield {
assert(a, ...)
assert(b, ...)
} The original test also had some interrupt logic to stop a bunch of streams based on the completion status of the deferred, but as suggested by @SystemFw there's no need to use a
which are very specific use cases (tests use cases in particular), so definitely not worth adding a combinator to the library :) Nonetheless, It was a good exercise. |
Beta Was this translation helpful? Give feedback.
0 replies
-
Turns out the private def runStream[Z, O](zero: Z, run: (Z, A) => (Z, Option[O])): F[O] =
(stream
.mapAccumulate(zero)(run)
.map { case (_, maybeO) => maybeO } ++ Stream.sleep_(interval)).repeat.unNone.head.compile.lastOrError |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Quick look TL;DR
A simple api to await the n-th element of a stream. (can also await the first n elements. See code at the end)
The combinator will return the n-th element or the first n soon as they are available.
Context
While working on an integration test I noticed the test logic was a bit heavy so I decided to declutter it
Use case
IT test that publishes events through a kafka pipeline and stores events to a Postgres db.
A test consumer is subscribed to one of the topics so that we can inspect the behaviour of the pipeline and make
assertions
at the end of the testIssues
.delayBy(xyz.seconds)
to make sure the events have enough time to flow through the pipeline before ending up to the databaseAt a very high level the old test looks like this. We publish events to a topic, we read from 2 streams (finite or infinite) and we complete 2 deferred to make assertions on certain events. There are more components in the original test, removed for simplicity
The new version using the
awaitNth[N]
combinatorAdvantages
Deferred
) to focus on the business logic being testedStream.take(n)
is quite permissive in the sense that it will accept a number greater than the number of events that the stream can produce. If you set the wrong expectation, the test might still pass: not ideal in the context of a test. TheawaitNth
combinator will wait for the exact number of elements to be produced.Notes
Critiques
The code
Beta Was this translation helpful? Give feedback.
All reactions