Skip to content

Commit

Permalink
Compression for fs2
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Dec 27, 2024
1 parent 7f839fc commit 287017d
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import java.util.zip.DeflaterInputStream
import java.util.zip.Deflater
import java.io.ByteArrayOutputStream

private[client4] trait Compressor {
private[client4] trait Compressor[R] {
def encoding: String
def apply[R](body: GenericRequestBody[R], encoding: String): GenericRequestBody[R]
def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R]
}

private[client4] object GZipDefaultCompressor extends Compressor {
private[client4] class GZipDefaultCompressor[R] extends Compressor[R] {
val encoding: String = Encodings.Gzip

def apply[R](body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] =
def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] =
body match {
case NoBody => NoBody
case StringBody(s, encoding, defaultContentType) =>
Expand All @@ -27,10 +27,10 @@ private[client4] object GZipDefaultCompressor extends Compressor {
case ByteBufferBody(b, defaultContentType) =>
ByteArrayBody(byteArray(byteBufferToArray(b)), defaultContentType)
case InputStreamBody(b, defaultContentType) =>
InputStreamBody(GZIPCompressingInputStream(b), defaultContentType)
InputStreamBody(new GZIPCompressingInputStream(b), defaultContentType)
case StreamBody(b) => streamsNotSupported
case FileBody(f, defaultContentType) =>
InputStreamBody(GZIPCompressingInputStream(new FileInputStream(f.toFile)), defaultContentType)
InputStreamBody(new GZIPCompressingInputStream(new FileInputStream(f.toFile)), defaultContentType)
case MultipartStreamBody(parts) => compressingMultipartBodiesNotSupported
case BasicMultipartBody(parts) => compressingMultipartBodiesNotSupported
}
Expand All @@ -44,10 +44,10 @@ private[client4] object GZipDefaultCompressor extends Compressor {
}
}

private[client4] object DeflateDefaultCompressor extends Compressor {
private[client4] class DeflateDefaultCompressor[R] extends Compressor[R] {
val encoding: String = Encodings.Deflate

def apply[R](body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] =
def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] =
body match {
case NoBody => NoBody
case StringBody(s, encoding, defaultContentType) =>
Expand All @@ -56,10 +56,10 @@ private[client4] object DeflateDefaultCompressor extends Compressor {
case ByteBufferBody(b, defaultContentType) =>
ByteArrayBody(byteArray(byteBufferToArray(b)), defaultContentType)
case InputStreamBody(b, defaultContentType) =>
InputStreamBody(DeflaterInputStream(b), defaultContentType)
InputStreamBody(new DeflaterInputStream(b), defaultContentType)
case StreamBody(b) => streamsNotSupported
case FileBody(f, defaultContentType) =>
InputStreamBody(DeflaterInputStream(new FileInputStream(f.toFile)), defaultContentType)
InputStreamBody(new DeflaterInputStream(new FileInputStream(f.toFile)), defaultContentType)
case MultipartStreamBody(parts) => compressingMultipartBodiesNotSupported
case BasicMultipartBody(parts) => compressingMultipartBodiesNotSupported
}
Expand Down Expand Up @@ -93,7 +93,7 @@ private[client4] object Compressor {
*/
def compressIfNeeded[T, R](
request: GenericRequest[T, R],
compressors: List[Compressor]
compressors: List[Compressor[R]]
): (GenericRequestBody[R], Option[Long]) =
request.options.compressRequestBody match {
case Some(encoding) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ private[client4] class GZIPCompressingInputStream(
bytesRead

case DONE => -1

case _ => throw new IllegalArgumentException(s"Invalid state: $stage")
}

private def deflatePendingInput(readBuffer: Array[Byte], readOffset: Int, readLength: Int): Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ abstract class HttpClientBackend[F[_], S <: Streams[S], P, B](
SttpClientException.defaultExceptionToSttpClientException(request, _)
)

protected def bodyToHttpClient: BodyToHttpClient[F, S]
protected def bodyToHttpClient: BodyToHttpClient[F, S, R]
protected def bodyFromHttpClient: BodyFromHttpClient[F, S, B]

private[client4] def convertRequest[T](request: GenericRequest[T, R]): F[HttpRequest] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ class HttpClientFutureBackend private (

override val streams: NoStreams = NoStreams

override protected val bodyToHttpClient: BodyToHttpClient[Future, Nothing] = new BodyToHttpClient[Future, Nothing] {
override val streams: NoStreams = NoStreams
override implicit val monad: MonadError[Future] = new FutureMonad
override def streamToPublisher(stream: Nothing): Future[BodyPublisher] = stream // nothing is everything
}
override protected val bodyToHttpClient: BodyToHttpClient[Future, Nothing, R] =
new BodyToHttpClient[Future, Nothing, R] {
override val streams: NoStreams = NoStreams
override implicit val monad: MonadError[Future] = new FutureMonad
override def streamToPublisher(stream: Nothing): Future[BodyPublisher] = stream // nothing is everything
}

override protected val bodyFromHttpClient: BodyFromHttpClient[Future, Nothing, InputStream] =
new InputStreamBodyFromHttpClient[Future, Nothing] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ class HttpClientSyncBackend private (
responseCell.take().fold(throw _, f => f())
}

override protected val bodyToHttpClient: BodyToHttpClient[Identity, Nothing] =
new BodyToHttpClient[Identity, Nothing] {
override protected val bodyToHttpClient: BodyToHttpClient[Identity, Nothing, R] =
new BodyToHttpClient[Identity, Nothing, R] {
override val streams: NoStreams = NoStreams
override implicit val monad: MonadError[Identity] = IdentityMonad
override def streamToPublisher(stream: Nothing): Identity[BodyPublisher] = stream // nothing is everything
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class HttpURLConnectionBackend private (
) extends SyncBackend {
type R = Any with Effect[Identity]

private val compressors: List[Compressor] = List(GZipDefaultCompressor, DeflateDefaultCompressor)
private val compressors: List[Compressor[R]] = List(new GZipDefaultCompressor(), new DeflateDefaultCompressor())

override def send[T](r: GenericRequest[T, R]): Response[T] =
adjustExceptions(r) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ import java.util.concurrent.Flow
import java.util.function.Supplier
import scala.collection.JavaConverters._

private[client4] trait BodyToHttpClient[F[_], S] {
private[client4] trait BodyToHttpClient[F[_], S, R] {
val streams: Streams[S]
implicit def monad: MonadError[F]

def apply[T](
request: GenericRequest[T, _],
request: GenericRequest[T, R],
builder: HttpRequest.Builder,
contentType: Option[String]
): F[BodyPublisher] = {
Expand Down Expand Up @@ -52,7 +52,7 @@ private[client4] trait BodyToHttpClient[F[_], S] {

def streamToPublisher(stream: streams.BinaryStream): F[BodyPublisher]

def compressors: List[Compressor] = List(GZipDefaultCompressor, DeflateDefaultCompressor)
def compressors: List[Compressor[R]] = List(new GZipDefaultCompressor(), new DeflateDefaultCompressor())

private def multipartBody[T](parts: Seq[Part[GenericRequestBody[_]]]) = {
val multipartBuilder = new MultiPartBodyPublisher()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class HttpClientCatsBackend[F[_]: Async] private (

override protected def createSequencer: F[Sequencer[F]] = CatsSequencer.create

override protected val bodyToHttpClient: BodyToHttpClient[F, Nothing] = new BodyToHttpClient[F, Nothing] {
override protected val bodyToHttpClient: BodyToHttpClient[F, Nothing, R] = new BodyToHttpClient[F, Nothing, Ra] {
override val streams: NoStreams = NoStreams
override implicit val monad: MonadError[F] = self.monad

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import java.net.http.HttpResponse.BodyHandlers
import java.util.concurrent.Flow.Publisher
import java.{util => ju}
import scala.collection.JavaConverters._
import sttp.client4.internal.compression.Compressor
import sttp.client4.httpclient.fs2.compression.{DeflateFs2Compressor, GZipFs2Compressor}

class HttpClientFs2Backend[F[_]: Async] private (
client: HttpClient,
Expand All @@ -46,10 +48,12 @@ class HttpClientFs2Backend[F[_]: Async] private (

override val streams: Fs2Streams[F] = Fs2Streams[F]

override protected val bodyToHttpClient: BodyToHttpClient[F, Fs2Streams[F]] =
new BodyToHttpClient[F, Fs2Streams[F]] {
override protected val bodyToHttpClient: BodyToHttpClient[F, Fs2Streams[F], R] =
new BodyToHttpClient[F, Fs2Streams[F], R] {
override val streams: Fs2Streams[F] = Fs2Streams[F]
override implicit def monad: MonadError[F] = self.monad
override def compressors: List[Compressor[R]] =
List(new GZipFs2Compressor[F, R](), new DeflateFs2Compressor[F, R]())
override def streamToPublisher(stream: Stream[F, Byte]): F[HttpRequest.BodyPublisher] =
monad.eval(
BodyPublishers.fromPublisher(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package sttp.client4.httpclient.fs2.compression

import sttp.client4._
import sttp.client4.internal.compression._
import sttp.client4.GenericRequestBody
import fs2._
import fs2.compression.Compression
import cats.syntax.all._
import fs2.io.file.Files
import cats.effect.Sync
import sttp.capabilities.fs2.Fs2Streams
import fs2.compression.DeflateParams

trait Fs2Compressor[F[_], R <: Fs2Streams[F]] extends Compressor[R] {
protected val fSync: Sync[F]
protected val fFiles: Files[F]

override abstract def apply(body: GenericRequestBody[R], encoding: String): GenericRequestBody[R] =
body match {
case InputStreamBody(b, defaultContentType) =>
StreamBody(Fs2Streams[F])(compressStream(fs2.io.readInputStream(b.pure[F](fSync), 1024)(fSync)))
case StreamBody(b) => StreamBody(Fs2Streams[F])(compressStream(b.asInstanceOf[fs2.Stream[F, Byte]]))
case FileBody(f, defaultContentType) =>
StreamBody(Fs2Streams[F])(compressStream(Files[F](fFiles).readAll(f.toPath, 1024)))
case _ => super.apply(body, encoding)
}

def compressStream(stream: fs2.Stream[F, Byte]): fs2.Stream[F, Byte]
}

class GZipFs2Compressor[F[_]: Compression: Sync: Files, R <: Fs2Streams[F]]
extends GZipDefaultCompressor[R]
with Fs2Compressor[F, R] {

override protected val fSync: Sync[F] = implicitly
override protected val fFiles: Files[F] = implicitly

def compressStream(stream: Stream[F, Byte]): Stream[F, Byte] = stream.through(fs2.compression.Compression[F].gzip())
}

class DeflateFs2Compressor[F[_]: Compression: Sync: Files, R <: Fs2Streams[F]]
extends DeflateDefaultCompressor[R]
with Fs2Compressor[F, R] {
override protected val fSync: Sync[F] = implicitly
override protected val fFiles: Files[F] = implicitly

def compressStream(stream: Stream[F, Byte]): Stream[F, Byte] =
stream.through(fs2.compression.Compression[F].deflate(DeflateParams()))
}

0 comments on commit 287017d

Please sign in to comment.