Skip to content

Commit

Permalink
http4s
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw committed Dec 28, 2024
1 parent 0e54409 commit aa8823b
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import java.util.concurrent.Flow.Publisher
import java.{util => ju}
import scala.collection.JavaConverters._
import sttp.client4.compression.Compressor
import sttp.client4.impl.fs2.{DeflateFs2Compressor, GZipFs2Compressor}

class HttpClientFs2Backend[F[_]: Async] private (
client: HttpClient,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sttp.client4.httpclient.fs2
package sttp.client4.impl.fs2

import sttp.client4._
import sttp.client4.GenericRequestBody
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import sttp.client4.testing.StreamBackendStub
import sttp.client4.ws.{GotAWebSocketException, NotAWebSocketException}
import sttp.client4._
import sttp.client4.wrappers.FollowRedirectsBackend
import sttp.client4.compression.Compressor
import sttp.client4.impl.fs2.GZipFs2Compressor
import sttp.client4.impl.fs2.DeflateFs2Compressor

// needs http4s using cats-effect
class Http4sBackend[F[_]: Async](
Expand All @@ -33,25 +36,39 @@ class Http4sBackend[F[_]: Async](
customEncodingHandler: EncodingHandler[F]
) extends StreamBackend[F, Fs2Streams[F]] {
type R = Fs2Streams[F] with sttp.capabilities.Effect[F]

private val compressors: List[Compressor[R]] = List(new GZipFs2Compressor[F, R](), new DeflateFs2Compressor[F, R]())

override def send[T](r: GenericRequest[T, R]): F[Response[T]] =
adjustExceptions(r) {
val (entity, extraHeaders) = bodyToHttp4s(r, r.body)
val (body, contentLength) = Compressor.compressIfNeeded(r, compressors)
val (entity, extraHeaders) = bodyToHttp4s(body, contentLength)
val headers =
http4s.Headers {
val nonClHeaders = r.headers
.filterNot(_.is(HeaderNames.ContentLength))
.map(h => http4s.Header.Raw(CIString(h.name), h.value))
.toList

val clHeader = contentLength
.map(cl => http4s.Header.Raw(CIString(HeaderNames.ContentLength), cl.toString))

nonClHeaders ++ clHeader
} ++ extraHeaders
val request = r.httpVersion match {
case Some(version) =>
Http4sRequest(
method = methodToHttp4s(r.method),
uri = http4s.Uri.unsafeFromString(r.uri.toString),
headers =
http4s.Headers(r.headers.map(h => http4s.Header.Raw(CIString(h.name), h.value)).toList) ++ extraHeaders,
headers = headers,
body = entity.body,
httpVersion = versionToHttp4s(version)
)
case None =>
Http4sRequest(
method = methodToHttp4s(r.method),
uri = http4s.Uri.unsafeFromString(r.uri.toString),
headers =
http4s.Headers(r.headers.map(h => http4s.Header.Raw(CIString(h.name), h.value)).toList) ++ extraHeaders,
headers = headers,
body = entity.body
)
}
Expand Down Expand Up @@ -138,19 +155,16 @@ class Http4sBackend[F[_]: Async](
}

private def bodyToHttp4s[R](
r: GenericRequest[_, R],
body: GenericRequestBody[R]
body: GenericRequestBody[R],
contentLength: Option[Long]
): (http4s.Entity[F], http4s.Headers) =
body match {
case NoBody => (http4s.Entity(http4s.EmptyBody: http4s.EntityBody[F]), http4s.Headers.empty)

case b: BasicBodyPart => (basicBodyToHttp4s(b), http4s.Headers.empty)

case StreamBody(s) =>
val cl = r.headers
.find(_.is(HeaderNames.ContentLength))
.map(_.value.toLong)
(http4s.Entity(s.asInstanceOf[Stream[F, Byte]], cl), http4s.Headers.empty)
(http4s.Entity(s.asInstanceOf[Stream[F, Byte]], contentLength), http4s.Headers.empty)

case m: MultipartBody[_] =>
val parts = m.parts.toVector.map(multipartToHttp4s)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import sttp.client4.testing.StreamBackendStub
import sttp.client4.ws.{GotAWebSocketException, NotAWebSocketException}
import sttp.client4._
import sttp.client4.wrappers.FollowRedirectsBackend
import sttp.client4.compression.{Compressor, DeflateDefaultCompressor, GZipDefaultCompressor}

import scala.concurrent.ExecutionContext

Expand All @@ -35,25 +36,40 @@ class Http4sBackend[F[_]: ConcurrentEffect: ContextShift](
customEncodingHandler: EncodingHandler[F]
) extends StreamBackend[F, Fs2Streams[F]] {
type R = Fs2Streams[F] with sttp.capabilities.Effect[F]

private val compressors: List[Compressor[R]] = List(new GZipDefaultCompressor[R](), new DeflateDefaultCompressor[R]())

override def send[T](r: GenericRequest[T, R]): F[Response[T]] =
adjustExceptions(r) {
val (entity, extraHeaders) = bodyToHttp4s(r, r.body)
val (body, contentLength) = Compressor.compressIfNeeded(r, compressors)
val (entity, extraHeaders) = bodyToHttp4s(body, contentLength)
val headers =
http4s.Headers {
val nonClHeaders = r.headers
.filterNot(_.is(HeaderNames.ContentLength))
.map(h => http4s.Header.Raw(CIString(h.name), h.value))
.toList

val clHeader = contentLength
.map(cl => http4s.Header.Raw(CIString(HeaderNames.ContentLength), cl.toString))

nonClHeaders ++ clHeader
} ++ extraHeaders

val request = r.httpVersion match {
case Some(version) =>
Http4sRequest(
method = methodToHttp4s(r.method),
uri = http4s.Uri.unsafeFromString(r.uri.toString),
headers =
http4s.Headers(r.headers.map(h => http4s.Header.Raw(CIString(h.name), h.value)).toList) ++ extraHeaders,
headers = headers,
body = entity.body,
httpVersion = versionToHttp4s(version)
)
case None =>
Http4sRequest(
method = methodToHttp4s(r.method),
uri = http4s.Uri.unsafeFromString(r.uri.toString),
headers =
http4s.Headers(r.headers.map(h => http4s.Header.Raw(CIString(h.name), h.value)).toList) ++ extraHeaders,
headers = headers,
body = entity.body
)
}
Expand Down Expand Up @@ -138,17 +154,17 @@ class Http4sBackend[F[_]: ConcurrentEffect: ContextShift](
http4s.EntityEncoder.fileEncoder(blocker).toEntity(b.toFile)
}

private def bodyToHttp4s(r: GenericRequest[_, R], body: GenericRequestBody[R]): (http4s.Entity[F], http4s.Headers) =
private def bodyToHttp4s(
body: GenericRequestBody[R],
contentLength: Option[Long]
): (http4s.Entity[F], http4s.Headers) =
body match {
case NoBody => (http4s.Entity(http4s.EmptyBody: http4s.EntityBody[F]), http4s.Headers.empty)

case b: BasicBodyPart => (basicBodyToHttp4s(b), http4s.Headers.empty)

case StreamBody(s) =>
val cl = r.headers
.find(_.is(HeaderNames.ContentLength))
.map(_.value.toLong)
(http4s.Entity(s.asInstanceOf[Stream[F, Byte]], cl), http4s.Headers.empty)
(http4s.Entity(s.asInstanceOf[Stream[F, Byte]], contentLength), http4s.Headers.empty)

case m: MultipartBody[_] =>
val parts = m.parts.toVector.map(multipartToHttp4s)
Expand Down

0 comments on commit aa8823b

Please sign in to comment.