Replies: 2 comments 2 replies
-
Or it could be named |
Beta Was this translation helpful? Give feedback.
0 replies
-
something like this? implicit class StreamOps[F[_], O](private val s: Stream[F, O]) extends AnyVal {
def groupWeighedWithin(timeout: FiniteDuration, limit: Long, weight: O => Long)(implicit
F: Temporal[F]
): Stream[F, Chunk[O]] =
s.pull
.timed { timedPull =>
def go(timedPull: Pull.Timed[F, O], buffer: Chunk[O], bufferSize: Long): Pull[F, O, Unit] =
timedPull.timeout(timeout) >>
timedPull.uncons.flatMap {
case Some((Right(elems), next)) =>
val size = elems.map(weight).sumAll
val nextSize = bufferSize + size
if (nextSize >= limit)
Pull.output(buffer) >> go(next, elems, size)
else
go(next, buffer ++ elems, nextSize)
case Some((Left(_), next)) =>
Pull.output(buffer) >> go(next, Chunk.empty, 0)
case None =>
Pull.output(buffer) >> Pull.done
}
go(timedPull, Chunk.empty, 0)
}
.stream
.chunks
} |
Beta Was this translation helpful? Give feedback.
2 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
This is a question about an idea: introducing
Stream.groupWeighted
. It would output Chunks limited by the weight of the elements. Weight would be provided by a weight function.Similar to Akka's
groupedWeightedWithin
, but without the time config.Or even a
Stream.groupWeightedWithin
method, similar to the already existingStream.groupWithin
.What do you think?
Beta Was this translation helpful? Give feedback.
All reactions