From 287017d737a35ead0e9cf02c606b16c3b70b207a Mon Sep 17 00:00:00 2001 From: adamw Date: Fri, 27 Dec 2024 19:17:17 +0100 Subject: [PATCH] Compression for fs2 --- .../internal/compression/Compressor.scala | 22 ++++----- .../GZIPCompressingInputStream.scala | 2 + .../httpclient/HttpClientBackend.scala | 2 +- .../httpclient/HttpClientFutureBackend.scala | 11 +++-- .../httpclient/HttpClientSyncBackend.scala | 4 +- .../HttpURLConnectionBackend.scala | 2 +- .../httpclient/BodyToHttpClient.scala | 6 +-- .../cats/HttpClientCatsBackend.scala | 2 +- .../httpclient/fs2/HttpClientFs2Backend.scala | 8 ++- .../fs2/compression/fs2Compressor.scala | 49 +++++++++++++++++++ 10 files changed, 82 insertions(+), 26 deletions(-) create mode 100644 effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/compression/fs2Compressor.scala diff --git a/core/src/main/scala/sttp/client4/internal/compression/Compressor.scala b/core/src/main/scala/sttp/client4/internal/compression/Compressor.scala index 6500f9d725..09b27548e0 100644 --- a/core/src/main/scala/sttp/client4/internal/compression/Compressor.scala +++ b/core/src/main/scala/sttp/client4/internal/compression/Compressor.scala @@ -10,15 +10,15 @@ import java.util.zip.DeflaterInputStream import java.util.zip.Deflater import java.io.ByteArrayOutputStream -private[client4] trait Compressor { +private[client4] trait Compressor[R] { def encoding: String - def apply[R](body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] + def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] } -private[client4] object GZipDefaultCompressor extends Compressor { +private[client4] class GZipDefaultCompressor[R] extends Compressor[R] { val encoding: String = Encodings.Gzip - def apply[R](body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] = + def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] = body match { case NoBody => NoBody case StringBody(s, encoding, defaultContentType) => @@ -27,10 +27,10 @@ private[client4] object GZipDefaultCompressor extends Compressor { case ByteBufferBody(b, defaultContentType) => ByteArrayBody(byteArray(byteBufferToArray(b)), defaultContentType) case InputStreamBody(b, defaultContentType) => - InputStreamBody(GZIPCompressingInputStream(b), defaultContentType) + InputStreamBody(new GZIPCompressingInputStream(b), defaultContentType) case StreamBody(b) => streamsNotSupported case FileBody(f, defaultContentType) => - InputStreamBody(GZIPCompressingInputStream(new FileInputStream(f.toFile)), defaultContentType) + InputStreamBody(new GZIPCompressingInputStream(new FileInputStream(f.toFile)), defaultContentType) case MultipartStreamBody(parts) => compressingMultipartBodiesNotSupported case BasicMultipartBody(parts) => compressingMultipartBodiesNotSupported } @@ -44,10 +44,10 @@ private[client4] object GZipDefaultCompressor extends Compressor { } } -private[client4] object DeflateDefaultCompressor extends Compressor { +private[client4] class DeflateDefaultCompressor[R] extends Compressor[R] { val encoding: String = Encodings.Deflate - def apply[R](body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] = + def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] = body match { case NoBody => NoBody case StringBody(s, encoding, defaultContentType) => @@ -56,10 +56,10 @@ private[client4] object DeflateDefaultCompressor extends Compressor { case ByteBufferBody(b, defaultContentType) => ByteArrayBody(byteArray(byteBufferToArray(b)), defaultContentType) case InputStreamBody(b, defaultContentType) => - InputStreamBody(DeflaterInputStream(b), defaultContentType) + InputStreamBody(new DeflaterInputStream(b), defaultContentType) case StreamBody(b) => streamsNotSupported case FileBody(f, defaultContentType) => - InputStreamBody(DeflaterInputStream(new FileInputStream(f.toFile)), defaultContentType) + InputStreamBody(new DeflaterInputStream(new FileInputStream(f.toFile)), defaultContentType) case MultipartStreamBody(parts) => compressingMultipartBodiesNotSupported case BasicMultipartBody(parts) => compressingMultipartBodiesNotSupported } @@ -93,7 +93,7 @@ private[client4] object Compressor { */ def compressIfNeeded[T, R]( request: GenericRequest[T, R], - compressors: List[Compressor] + compressors: List[Compressor[R]] ): (GenericRequestBody[R], Option[Long]) = request.options.compressRequestBody match { case Some(encoding) => diff --git a/core/src/main/scala/sttp/client4/internal/compression/GZIPCompressingInputStream.scala b/core/src/main/scala/sttp/client4/internal/compression/GZIPCompressingInputStream.scala index cbf08b8ac2..086f74fb3d 100644 --- a/core/src/main/scala/sttp/client4/internal/compression/GZIPCompressingInputStream.scala +++ b/core/src/main/scala/sttp/client4/internal/compression/GZIPCompressingInputStream.scala @@ -95,6 +95,8 @@ private[client4] class GZIPCompressingInputStream( bytesRead case DONE => -1 + + case _ => throw new IllegalArgumentException(s"Invalid state: $stage") } private def deflatePendingInput(readBuffer: Array[Byte], readOffset: Int, readLength: Int): Int = { diff --git a/core/src/main/scalajvm/sttp/client4/httpclient/HttpClientBackend.scala b/core/src/main/scalajvm/sttp/client4/httpclient/HttpClientBackend.scala index 9538f99dbe..412a6ee3eb 100644 --- a/core/src/main/scalajvm/sttp/client4/httpclient/HttpClientBackend.scala +++ b/core/src/main/scalajvm/sttp/client4/httpclient/HttpClientBackend.scala @@ -61,7 +61,7 @@ abstract class HttpClientBackend[F[_], S <: Streams[S], P, B]( SttpClientException.defaultExceptionToSttpClientException(request, _) ) - protected def bodyToHttpClient: BodyToHttpClient[F, S] + protected def bodyToHttpClient: BodyToHttpClient[F, S, R] protected def bodyFromHttpClient: BodyFromHttpClient[F, S, B] private[client4] def convertRequest[T](request: GenericRequest[T, R]): F[HttpRequest] = diff --git a/core/src/main/scalajvm/sttp/client4/httpclient/HttpClientFutureBackend.scala b/core/src/main/scalajvm/sttp/client4/httpclient/HttpClientFutureBackend.scala index fcf538c074..25100b5192 100644 --- a/core/src/main/scalajvm/sttp/client4/httpclient/HttpClientFutureBackend.scala +++ b/core/src/main/scalajvm/sttp/client4/httpclient/HttpClientFutureBackend.scala @@ -34,11 +34,12 @@ class HttpClientFutureBackend private ( override val streams: NoStreams = NoStreams - override protected val bodyToHttpClient: BodyToHttpClient[Future, Nothing] = new BodyToHttpClient[Future, Nothing] { - override val streams: NoStreams = NoStreams - override implicit val monad: MonadError[Future] = new FutureMonad - override def streamToPublisher(stream: Nothing): Future[BodyPublisher] = stream // nothing is everything - } + override protected val bodyToHttpClient: BodyToHttpClient[Future, Nothing, R] = + new BodyToHttpClient[Future, Nothing, R] { + override val streams: NoStreams = NoStreams + override implicit val monad: MonadError[Future] = new FutureMonad + override def streamToPublisher(stream: Nothing): Future[BodyPublisher] = stream // nothing is everything + } override protected val bodyFromHttpClient: BodyFromHttpClient[Future, Nothing, InputStream] = new InputStreamBodyFromHttpClient[Future, Nothing] { diff --git a/core/src/main/scalajvm/sttp/client4/httpclient/HttpClientSyncBackend.scala b/core/src/main/scalajvm/sttp/client4/httpclient/HttpClientSyncBackend.scala index 7463f06748..989f97b780 100644 --- a/core/src/main/scalajvm/sttp/client4/httpclient/HttpClientSyncBackend.scala +++ b/core/src/main/scalajvm/sttp/client4/httpclient/HttpClientSyncBackend.scala @@ -82,8 +82,8 @@ class HttpClientSyncBackend private ( responseCell.take().fold(throw _, f => f()) } - override protected val bodyToHttpClient: BodyToHttpClient[Identity, Nothing] = - new BodyToHttpClient[Identity, Nothing] { + override protected val bodyToHttpClient: BodyToHttpClient[Identity, Nothing, R] = + new BodyToHttpClient[Identity, Nothing, R] { override val streams: NoStreams = NoStreams override implicit val monad: MonadError[Identity] = IdentityMonad override def streamToPublisher(stream: Nothing): Identity[BodyPublisher] = stream // nothing is everything diff --git a/core/src/main/scalajvm/sttp/client4/httpurlconnection/HttpURLConnectionBackend.scala b/core/src/main/scalajvm/sttp/client4/httpurlconnection/HttpURLConnectionBackend.scala index 7b2bdc333d..8d1ea9e7b2 100644 --- a/core/src/main/scalajvm/sttp/client4/httpurlconnection/HttpURLConnectionBackend.scala +++ b/core/src/main/scalajvm/sttp/client4/httpurlconnection/HttpURLConnectionBackend.scala @@ -49,7 +49,7 @@ class HttpURLConnectionBackend private ( ) extends SyncBackend { type R = Any with Effect[Identity] - private val compressors: List[Compressor] = List(GZipDefaultCompressor, DeflateDefaultCompressor) + private val compressors: List[Compressor[R]] = List(new GZipDefaultCompressor(), new DeflateDefaultCompressor()) override def send[T](r: GenericRequest[T, R]): Response[T] = adjustExceptions(r) { diff --git a/core/src/main/scalajvm/sttp/client4/internal/httpclient/BodyToHttpClient.scala b/core/src/main/scalajvm/sttp/client4/internal/httpclient/BodyToHttpClient.scala index 301a1649dc..57f18d8eef 100644 --- a/core/src/main/scalajvm/sttp/client4/internal/httpclient/BodyToHttpClient.scala +++ b/core/src/main/scalajvm/sttp/client4/internal/httpclient/BodyToHttpClient.scala @@ -17,12 +17,12 @@ import java.util.concurrent.Flow import java.util.function.Supplier import scala.collection.JavaConverters._ -private[client4] trait BodyToHttpClient[F[_], S] { +private[client4] trait BodyToHttpClient[F[_], S, R] { val streams: Streams[S] implicit def monad: MonadError[F] def apply[T]( - request: GenericRequest[T, _], + request: GenericRequest[T, R], builder: HttpRequest.Builder, contentType: Option[String] ): F[BodyPublisher] = { @@ -52,7 +52,7 @@ private[client4] trait BodyToHttpClient[F[_], S] { def streamToPublisher(stream: streams.BinaryStream): F[BodyPublisher] - def compressors: List[Compressor] = List(GZipDefaultCompressor, DeflateDefaultCompressor) + def compressors: List[Compressor[R]] = List(new GZipDefaultCompressor(), new DeflateDefaultCompressor()) private def multipartBody[T](parts: Seq[Part[GenericRequestBody[_]]]) = { val multipartBuilder = new MultiPartBodyPublisher() diff --git a/effects/cats/src/main/scalajvm/sttp/client4/httpclient/cats/HttpClientCatsBackend.scala b/effects/cats/src/main/scalajvm/sttp/client4/httpclient/cats/HttpClientCatsBackend.scala index 77e7fadaaa..605bdccfd4 100644 --- a/effects/cats/src/main/scalajvm/sttp/client4/httpclient/cats/HttpClientCatsBackend.scala +++ b/effects/cats/src/main/scalajvm/sttp/client4/httpclient/cats/HttpClientCatsBackend.scala @@ -40,7 +40,7 @@ class HttpClientCatsBackend[F[_]: Async] private ( override protected def createSequencer: F[Sequencer[F]] = CatsSequencer.create - override protected val bodyToHttpClient: BodyToHttpClient[F, Nothing] = new BodyToHttpClient[F, Nothing] { + override protected val bodyToHttpClient: BodyToHttpClient[F, Nothing, R] = new BodyToHttpClient[F, Nothing, Ra] { override val streams: NoStreams = NoStreams override implicit val monad: MonadError[F] = self.monad diff --git a/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala b/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala index 38d967a350..546511c626 100644 --- a/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala +++ b/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/HttpClientFs2Backend.scala @@ -28,6 +28,8 @@ import java.net.http.HttpResponse.BodyHandlers import java.util.concurrent.Flow.Publisher import java.{util => ju} import scala.collection.JavaConverters._ +import sttp.client4.internal.compression.Compressor +import sttp.client4.httpclient.fs2.compression.{DeflateFs2Compressor, GZipFs2Compressor} class HttpClientFs2Backend[F[_]: Async] private ( client: HttpClient, @@ -46,10 +48,12 @@ class HttpClientFs2Backend[F[_]: Async] private ( override val streams: Fs2Streams[F] = Fs2Streams[F] - override protected val bodyToHttpClient: BodyToHttpClient[F, Fs2Streams[F]] = - new BodyToHttpClient[F, Fs2Streams[F]] { + override protected val bodyToHttpClient: BodyToHttpClient[F, Fs2Streams[F], R] = + new BodyToHttpClient[F, Fs2Streams[F], R] { override val streams: Fs2Streams[F] = Fs2Streams[F] override implicit def monad: MonadError[F] = self.monad + override def compressors: List[Compressor[R]] = + List(new GZipFs2Compressor[F, R](), new DeflateFs2Compressor[F, R]()) override def streamToPublisher(stream: Stream[F, Byte]): F[HttpRequest.BodyPublisher] = monad.eval( BodyPublishers.fromPublisher( diff --git a/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/compression/fs2Compressor.scala b/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/compression/fs2Compressor.scala new file mode 100644 index 0000000000..0db3458fac --- /dev/null +++ b/effects/fs2/src/main/scalajvm/sttp/client4/httpclient/fs2/compression/fs2Compressor.scala @@ -0,0 +1,49 @@ +package sttp.client4.httpclient.fs2.compression + +import sttp.client4._ +import sttp.client4.internal.compression._ +import sttp.client4.GenericRequestBody +import fs2._ +import fs2.compression.Compression +import cats.syntax.all._ +import fs2.io.file.Files +import cats.effect.Sync +import sttp.capabilities.fs2.Fs2Streams +import fs2.compression.DeflateParams + +trait Fs2Compressor[F[_], R <: Fs2Streams[F]] extends Compressor[R] { + protected val fSync: Sync[F] + protected val fFiles: Files[F] + + override abstract def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] = + body match { + case InputStreamBody(b, defaultContentType) => + StreamBody(Fs2Streams[F])(compressStream(fs2.io.readInputStream(b.pure[F](fSync), 1024)(fSync))) + case StreamBody(b) => StreamBody(Fs2Streams[F])(compressStream(b.asInstanceOf[fs2.Stream[F, Byte]])) + case FileBody(f, defaultContentType) => + StreamBody(Fs2Streams[F])(compressStream(Files[F](fFiles).readAll(f.toPath, 1024))) + case _ => super.apply(body, encoding) + } + + def compressStream(stream: fs2.Stream[F, Byte]): fs2.Stream[F, Byte] +} + +class GZipFs2Compressor[F[_]: Compression: Sync: Files, R <: Fs2Streams[F]] + extends GZipDefaultCompressor[R] + with Fs2Compressor[F, R] { + + override protected val fSync: Sync[F] = implicitly + override protected val fFiles: Files[F] = implicitly + + def compressStream(stream: Stream[F, Byte]): Stream[F, Byte] = stream.through(fs2.compression.Compression[F].gzip()) +} + +class DeflateFs2Compressor[F[_]: Compression: Sync: Files, R <: Fs2Streams[F]] + extends DeflateDefaultCompressor[R] + with Fs2Compressor[F, R] { + override protected val fSync: Sync[F] = implicitly + override protected val fFiles: Files[F] = implicitly + + def compressStream(stream: Stream[F, Byte]): Stream[F, Byte] = + stream.through(fs2.compression.Compression[F].deflate(DeflateParams())) +}