diff --git a/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala b/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala index 3f1d572c25..271d3e33de 100644 --- a/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala +++ b/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala @@ -29,7 +29,6 @@ import java.util.concurrent.Flow.Publisher import java.{util => ju} import scala.collection.JavaConverters._ import sttp.client4.compression.Compressor -import sttp.client4.httpclient.fs2.compression.{DeflateFs2Compressor, GZipFs2Compressor} class HttpClientFs2Backend[F[_]: Async] private ( client: HttpClient, diff --git a/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/compression/fs2Compressor.scala b/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/fs2Compressor.scala similarity index 80% rename from effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/compression/fs2Compressor.scala rename to effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/fs2Compressor.scala index df65c17fda..94eefb4b61 100644 --- a/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/compression/fs2Compressor.scala +++ b/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/fs2Compressor.scala @@ -1,4 +1,4 @@ -package sttp.client4.httpclient.fs2.compression +package sttp.client4.httpclient.fs2 import sttp.client4._ import sttp.client4.GenericRequestBody @@ -17,12 +17,11 @@ trait Fs2Compressor[F[_], R <: Fs2Streams[F]] extends Compressor[R] { override abstract def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] = body match { - case InputStreamBody(b, defaultContentType) => + case InputStreamBody(b, _) => StreamBody(Fs2Streams[F])(compressStream(fs2.io.readInputStream(b.pure[F](fSync), 1024)(fSync))) - case StreamBody(b) => StreamBody(Fs2Streams[F])(compressStream(b.asInstanceOf[fs2.Stream[F, Byte]])) - case FileBody(f, defaultContentType) => - StreamBody(Fs2Streams[F])(compressStream(Files[F](fFiles).readAll(f.toPath, 1024))) - case _ => super.apply(body, encoding) + case StreamBody(b) => StreamBody(Fs2Streams[F])(compressStream(b.asInstanceOf[fs2.Stream[F, Byte]])) + case FileBody(f, _) => StreamBody(Fs2Streams[F])(compressStream(Files[F](fFiles).readAll(f.toPath, 1024))) + case _ => super.apply(body, encoding) } def compressStream(stream: fs2.Stream[F, Byte]): fs2.Stream[F, Byte] diff --git a/effects/zio/src/main/scalajvm/sttp/client4/httpclient/zio/HttpClientZioBackend.scala b/effects/zio/src/main/scalajvm/sttp/client4/httpclient/zio/HttpClientZioBackend.scala index 2bcc37f856..29c2eb558f 100644 --- a/effects/zio/src/main/scalajvm/sttp/client4/httpclient/zio/HttpClientZioBackend.scala +++ b/effects/zio/src/main/scalajvm/sttp/client4/httpclient/zio/HttpClientZioBackend.scala @@ -10,7 +10,6 @@ import sttp.client4.internal._ import sttp.client4.internal.httpclient.{BodyFromHttpClient, BodyToHttpClient, Sequencer} import sttp.client4.internal.ws.SimpleQueue import sttp.client4.testing.WebSocketStreamBackendStub -import sttp.client4.wrappers.FollowRedirectsBackend import sttp.client4.{wrappers, BackendOptions, GenericRequest, Response, WebSocketStreamBackend} import sttp.monad.MonadError import zio.Chunk.ByteArray @@ -25,6 +24,7 @@ import java.nio.ByteBuffer import java.util import java.util.concurrent.Flow.Publisher import java.{util => ju} +import sttp.client4.compression.Compressor class HttpClientZioBackend private ( client: HttpClient, @@ -58,10 +58,11 @@ class HttpClientZioBackend private ( ByteArray(a, 0, a.length) } - override protected val bodyToHttpClient: BodyToHttpClient[Task, ZioStreams] = - new BodyToHttpClient[Task, ZioStreams] { + override protected val bodyToHttpClient: BodyToHttpClient[Task, ZioStreams, R] = + new BodyToHttpClient[Task, ZioStreams, R] { override val streams: ZioStreams = ZioStreams override implicit def monad: MonadError[Task] = self.monad + override def compressors: List[Compressor[R]] = List(new GZipZioCompressor[R](), new DeflateZioCompressor[R]()) override def streamToPublisher(stream: ZStream[Any, Throwable, Byte]): Task[BodyPublisher] = { import _root_.zio.interop.reactivestreams.{streamToPublisher => zioStreamToPublisher} val publisher = stream.mapChunks(byteChunk => Chunk(ByteBuffer.wrap(byteChunk.toArray))).toPublisher @@ -88,7 +89,7 @@ class HttpClientZioBackend private ( override protected def standardEncoding: (ZStream[Any, Throwable, Byte], String) => ZStream[Any, Throwable, Byte] = { case (body, "gzip") => body.via(ZPipeline.gunzip()) case (body, "deflate") => - ZStream.scoped(body.peel(ZSink.take[Byte](1))).flatMap { case (chunk, stream) => + ZStream.scoped[Any](body.peel(ZSink.take[Byte](1))).flatMap { case (chunk, stream) => val wrapped = chunk.headOption.exists(byte => (byte & 0x0f) == 0x08) (ZStream.fromChunk(chunk) ++ stream).via(ZPipeline.inflate(noWrap = !wrapped)) } diff --git a/effects/zio/src/main/scalajvm/sttp/client4/httpclient/zio/zioCompressor.scala b/effects/zio/src/main/scalajvm/sttp/client4/httpclient/zio/zioCompressor.scala new file mode 100644 index 0000000000..780035cd55 --- /dev/null +++ b/effects/zio/src/main/scalajvm/sttp/client4/httpclient/zio/zioCompressor.scala @@ -0,0 +1,31 @@ +package sttp.client4.httpclient.zio + +import sttp.client4._ +import sttp.client4.compression.Compressor +import sttp.capabilities.zio.ZioStreams + +import zio.stream.Stream +import sttp.client4.compression.GZipDefaultCompressor +import sttp.client4.compression.DeflateDefaultCompressor +import zio.stream.ZPipeline +import zio.stream.ZStream + +trait ZioCompressor[R <: ZioStreams] extends Compressor[R] { + override abstract def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] = + body match { + case InputStreamBody(b, _) => StreamBody(ZioStreams)(compressStream(ZStream.fromInputStream(b))) + case StreamBody(b) => StreamBody(ZioStreams)(compressStream(b.asInstanceOf[Stream[Throwable, Byte]])) + case FileBody(f, _) => StreamBody(ZioStreams)(compressStream(ZStream.fromFile(f.toFile))) + case _ => super.apply(body, encoding) + } + + def compressStream(stream: Stream[Throwable, Byte]): Stream[Throwable, Byte] +} + +class GZipZioCompressor[R <: ZioStreams] extends GZipDefaultCompressor[R] with ZioCompressor[R] { + def compressStream(stream: Stream[Throwable, Byte]): Stream[Throwable, Byte] = stream.via(ZPipeline.gzip()) +} + +class DeflateZioCompressor[R <: ZioStreams] extends DeflateDefaultCompressor[R] with ZioCompressor[R] { + def compressStream(stream: Stream[Throwable, Byte]): Stream[Throwable, Byte] = stream.via(ZPipeline.deflate()) +}