Skip to content

Commit

Permalink
Support input streams as a response body (#2103)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Mar 14, 2024
1 parent a6330cc commit 8de46c1
Show file tree
Hide file tree
Showing 41 changed files with 150 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ class AkkaHttpClientHttpTest extends HttpTest[Future] {
override implicit val convertToFuture: ConvertToFuture[Future] = ConvertToFuture.future

override def supportsCancellation: Boolean = false
override def supportsResponseAsInputStream = false

override def timeoutToNone[T](t: Future[T], timeoutMillis: Int): Future[Option[T]] = t.map(Some(_))
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ class ArmeriaCatsHttpTest extends HttpTest[IO] with CatsTestBase {
override def supportsAutoDecompressionDisabling = false
override def supportsDeflateWrapperChecking = false // armeria hangs
override def supportsEmptyContentEncoding = false
override def supportsResponseAsInputStream = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ class ArmeriaCatsHttpTest extends HttpTest[IO] with CatsRetryTest with CatsTestB
override def supportsAutoDecompressionDisabling = false
override def supportsDeflateWrapperChecking = false // armeria hangs
override def supportsEmptyContentEncoding = false
override def supportsResponseAsInputStream = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ class ArmeriaFs2HttpTest extends HttpTest[IO] with CatsTestBase {
override def supportsCancellation = false
override def supportsAutoDecompressionDisabling = false
override def supportsDeflateWrapperChecking = false // armeria hangs

override def supportsEmptyContentEncoding = false
override def supportsResponseAsInputStream = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ class ArmeriaFs2HttpTest extends HttpTest[IO] with CatsTestBase with TestIODispa
override def supportsAutoDecompressionDisabling = false
override def supportsDeflateWrapperChecking = false // armeria hangs
override def supportsEmptyContentEncoding = false
override def supportsResponseAsInputStream = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class ArmeriaMonixHttpTest extends HttpTest[Task] {
override def supportsHostHeaderOverride = false
override def supportsAutoDecompressionDisabling = false
override def supportsDeflateWrapperChecking = false // armeria hangs

override def supportsEmptyContentEncoding = false

override def supportsResponseAsInputStream = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class ArmeriaScalazHttpTest extends HttpTest[Task] {
override def supportsAutoDecompressionDisabling = false
override def supportsDeflateWrapperChecking = false // armeria hangs
override def supportsEmptyContentEncoding = false
override def supportsResponseAsInputStream = false

override def timeoutToNone[T](t: Task[T], timeoutMillis: Int): Task[Option[T]] = t.map(Some(_))
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import sttp.client4.armeria.AbstractArmeriaBackend.{noopCanceler, RightUnit}
import sttp.client4.internal.toByteArray
import sttp.model._
import sttp.monad.syntax._
import sttp.monad.{Canceler, MonadAsyncError, MonadError}
import sttp.monad.{Canceler, MonadAsyncError}

abstract class AbstractArmeriaBackend[F[_], S <: Streams[S]](
client: WebClient = WebClient.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.linecorp.armeria.common.{CommonPools, HttpData}
import com.linecorp.armeria.common.stream.{StreamMessage, StreamMessages}
import io.netty.util.concurrent.EventExecutor

import java.io.File
import java.io.{File, InputStream}
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicReference
import sttp.capabilities.Streams
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class ArmeriaFutureHttpTest extends HttpTest[Future] {
override def supportsAutoDecompressionDisabling = false
override def supportsDeflateWrapperChecking = false // armeria hangs
override def supportsEmptyContentEncoding = false
override def supportsResponseAsInputStream = false

override def timeoutToNone[T](t: Future[T], timeoutMillis: Int): Future[Option[T]] = t.map(Some(_))
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class ArmeriaZioHttpTest extends HttpTest[Task] with ZioTestBase {
override def supportsAutoDecompressionDisabling = false
override def supportsDeflateWrapperChecking = false // armeria hangs
override def supportsEmptyContentEncoding = false
override def supportsResponseAsInputStream = false

"throw an exception instead of ZIO defect if the header value is invalid" in {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class ArmeriaZioHttpTest extends HttpTest[Task] with ZioTestBase {
override def supportsAutoDecompressionDisabling = false
override def supportsDeflateWrapperChecking = false // armeria hangs
override def supportsEmptyContentEncoding = false
override def supportsResponseAsInputStream = false

"throw an exception instead of ZIO defect if the header value is invalid" in {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ class AsyncHttpClientCatsHttpTest extends HttpTest[IO] with CatsTestBase {

override def throwsExceptionOnUnsupportedEncoding = false
override def supportsAutoDecompressionDisabling = false
override def supportsResponseAsInputStream = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ class AsyncHttpClientCatsHttpTest extends HttpTest[IO] with CatsRetryTest with C

override def throwsExceptionOnUnsupportedEncoding = false
override def supportsAutoDecompressionDisabling = false
override def supportsResponseAsInputStream = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ class AsyncHttpClientFs2HttpTest extends HttpTest[IO] with CatsTestBase {

override def throwsExceptionOnUnsupportedEncoding = false
override def supportsAutoDecompressionDisabling = false
override def supportsResponseAsInputStream = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ class AsyncHttpClientFs2HttpTest extends HttpTest[IO] with TestIODispatcher with
// for some unknown reason this single test fails using the fs2 implementation
override def supportsConnectionRefusedTest = false
override def supportsAutoDecompressionDisabling = false
override def supportsResponseAsInputStream = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ class AsyncHttpClientFutureHttpTest extends HttpTest[Future] {
override def supportsCancellation: Boolean = false
override def timeoutToNone[T](t: Future[T], timeoutMillis: Int): Future[Option[T]] = t.map(Some(_))
override def supportsAutoDecompressionDisabling = false
override def supportsResponseAsInputStream = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ class AsyncHttpClientMonixHttpTest extends HttpTest[Task] {

override def throwsExceptionOnUnsupportedEncoding = false
override def supportsAutoDecompressionDisabling = false
override def supportsResponseAsInputStream = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ class AsyncHttpClientScalazHttpTest extends HttpTest[Task] {
override def supportsCancellation: Boolean = false
override def timeoutToNone[T](t: Task[T], timeoutMillis: Int): Task[Option[T]] = t.map(Some(_))
override def supportsAutoDecompressionDisabling = false
override def supportsResponseAsInputStream = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class AsyncHttpClientZioHttpTest extends HttpTest[Task] with ZioTestBase {

override def throwsExceptionOnUnsupportedEncoding = false
override def supportsAutoDecompressionDisabling = false
override def supportsResponseAsInputStream = false

"throw an exception instead of ZIO defect if the header value is invalid" in {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class AsyncHttpClientZioHttpTest extends HttpTest[Task] with ZioTestBase {

override def throwsExceptionOnUnsupportedEncoding = false
override def supportsAutoDecompressionDisabling = false
override def supportsResponseAsInputStream = false

"throw an exception instead of ZIO defect if the header value is invalid" in {

Expand Down
8 changes: 8 additions & 0 deletions core/src/main/scala/sttp/client4/ResponseAs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import sttp.model.ResponseMetadata
import sttp.model.internal.Rfc3986
import sttp.ws.{WebSocket, WebSocketFrame}

import java.io.InputStream
import scala.collection.immutable.Seq
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -299,6 +300,13 @@ object ResponseAsStreamUnsafe {
def apply[S](s: Streams[S]): GenericResponseAs[s.BinaryStream, S] = new ResponseAsStreamUnsafe(s)
}

case class ResponseAsInputStream[T](f: InputStream => T) extends GenericResponseAs[T, Any] {
override def show: String = s"as input stream"
}
case object ResponseAsInputStreamUnsafe extends GenericResponseAs[InputStream, Any] {
override def show: String = s"as input stream unsafe"
}

case class ResponseAsFile(output: SttpFile) extends GenericResponseAs[SttpFile, Any] {
override def show: String = s"as file: ${output.name}"
}
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/scala/sttp/client4/internal/BodyFromResponseAs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import sttp.model.ResponseMetadata
import sttp.monad.MonadError
import sttp.monad.syntax._

import java.io.InputStream

abstract class BodyFromResponseAs[F[_], RegularResponse, WSResponse, Stream](implicit m: MonadError[F]) {
def apply[T](
responseAs: ResponseAsDelegate[T, _],
Expand Down Expand Up @@ -58,6 +60,13 @@ abstract class BodyFromResponseAs[F[_], RegularResponse, WSResponse, Stream](imp
(stream.asInstanceOf[T], nonReplayableBody)
}

case (ResponseAsInputStream(f), Left(regular)) =>
regularAsInputStream(regular)
.flatMap(w => m.eval(f(w)).ensure(m.eval(w.close())))
.map(t => (t, nonReplayableBody))
case (ResponseAsInputStreamUnsafe, Left(regular)) =>
regularAsInputStream(regular).map(w => (w, nonReplayableBody))

case (ResponseAsFile(file), Left(regular)) =>
regularAsFile(regular, file).map(f => (f, replayableBody(f)))

Expand All @@ -82,6 +91,8 @@ abstract class BodyFromResponseAs[F[_], RegularResponse, WSResponse, Stream](imp
protected def regularAsByteArray(response: RegularResponse): F[Array[Byte]]
protected def regularAsFile(response: RegularResponse, file: SttpFile): F[SttpFile]
protected def regularAsStream(response: RegularResponse): F[(Stream, () => F[Unit])]
protected def regularAsInputStream(response: RegularResponse): F[InputStream] =
throw new UnsupportedOperationException("Responses as a java.io.InputStream are not supported")
protected def handleWS[T](responseAs: GenericWebSocketResponseAs[T, _], meta: ResponseMetadata, ws: WSResponse): F[T]
protected def cleanupWhenNotAWebSocket(response: RegularResponse, e: NotAWebSocketException): F[Unit]
protected def cleanupWhenGotWebSocket(response: WSResponse, e: GotAWebSocketException): F[Unit]
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/sttp/client4/monad/MapEffect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ object MapEffect {
ResponseAsStream(s)((s, m) => fk(f.asInstanceOf[(Any, ResponseMetadata) => F[Any]](s, m)))
case rasu: ResponseAsStreamUnsafe[_, _] => rasu
case ResponseAsFile(output) => ResponseAsFile(output)
case ResponseAsInputStream(f) => ResponseAsInputStream(f)
case ResponseAsInputStreamUnsafe => ResponseAsInputStreamUnsafe
case ResponseAsWebSocket(f) =>
ResponseAsWebSocket((wg: WebSocket[G], m: ResponseMetadata) =>
fk(f.asInstanceOf[(WebSocket[F], ResponseMetadata) => F[Any]](apply[G, F](wg, gk, fm), m))
Expand Down
15 changes: 13 additions & 2 deletions core/src/main/scala/sttp/client4/testing/AbstractBackendStub.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sttp.client4.testing

import java.io.InputStream
import java.io.{ByteArrayInputStream, InputStream}
import sttp.capabilities.Effect
import sttp.client4.internal._
import sttp.client4.testing.AbstractBackendStub._
Expand Down Expand Up @@ -121,7 +121,15 @@ object AbstractBackendStub {
ra: GenericResponseAs[T, _],
b: U,
meta: ResponseMetadata
)(implicit monad: MonadError[F]): Option[F[T]] =
)(implicit monad: MonadError[F]): Option[F[T]] = {
def bAsInputStream = b match {
case s: String => Some(new ByteArrayInputStream(s.getBytes(Utf8)))
case a: Array[Byte] => Some(new ByteArrayInputStream(a))
case is: InputStream => Some(is)
case () => Some(new ByteArrayInputStream(new Array[Byte](0)))
case _ => None
}

ra match {
case IgnoreResponse => Some(().unit.asInstanceOf[F[T]])
case ResponseAsByteArray =>
Expand All @@ -142,6 +150,8 @@ object AbstractBackendStub {
case RawStream(s) => Some(s.unit.asInstanceOf[F[T]])
case _ => None
}
case ResponseAsInputStream(f) => bAsInputStream.map(f).map(_.unit.asInstanceOf[F[T]])
case ResponseAsInputStreamUnsafe => bAsInputStream.map(_.unit.asInstanceOf[F[T]])
case ResponseAsFile(_) =>
b match {
case f: SttpFile => Some(f.unit.asInstanceOf[F[T]])
Expand Down Expand Up @@ -172,4 +182,5 @@ object AbstractBackendStub {
}
}
}
}
}
37 changes: 35 additions & 2 deletions core/src/main/scalajvm/sttp/client4/SttpExtensions.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,45 @@
package sttp.client4

import java.io.File
import java.io.{File, InputStream}
import java.nio.file.Path

import sttp.client4.internal.SttpFile
import sttp.model.{Part, StatusCode}

trait SttpExtensions {

/** Specify that the body should be passed as an input stream to the given function `f`. After the function completes,
* the input stream is always closed.
*
* If the response code is not successful, the body is returned as a `String`.
*
* '''Warning:''' this type of responses is supported only by some backends on the JVM.
*/
def asInputStream[T](f: InputStream => T): ResponseAs[Either[String, T]] =
asEither(asStringAlways, asInputStreamAlways(f))

/** Specify that the body should be passed as an input stream to the given function `f`. After the function completes,
* the input stream is always closed.
*
* '''Warning:''' this type of responses is supported only by some backends on the JVM.
*/
def asInputStreamAlways[T](f: InputStream => T): ResponseAs[T] = new ResponseAs(ResponseAsInputStream(f))

/** Specify that the body should be returned as an input stream. It is the responsibility of the user to properly
* close the stream.
*
* If the response code is not successful, the body is returned as a `String`.
*
* '''Warning:''' this type of responses is supported only by some backends on the JVM.
*/
def asInputStreamUnsafe: ResponseAs[Either[String, InputStream]] = asEither(asStringAlways, asInputStreamAlwaysUnsafe)

/** Specify that the body should be returned as an input stream. It is the responsibility of the user to properly
* close the stream.
*
* '''Warning:''' this type of responses is supported only by some backends on the JVM.
*/
def asInputStreamAlwaysUnsafe: ResponseAs[InputStream] = new ResponseAs(ResponseAsInputStreamUnsafe)

def asFile(file: File): ResponseAs[Either[String, File]] = asEither(asStringAlways, asFileAlways(file))

def asFileAlways(file: File): ResponseAs[File] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ class HttpURLConnectionBackend private (
}
override protected def regularAsStream(response: InputStream): (Nothing, () => Identity[Unit]) =
throw new IllegalStateException()
override protected def regularAsInputStream(response: InputStream): Identity[InputStream] = response
override protected def handleWS[T](
responseAs: GenericWebSocketResponseAs[T, _],
meta: ResponseMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ private[client4] trait InputStreamBodyFromHttpClient[F[_], S] extends BodyFromHt
override protected def regularAsStream(response: InputStream): F[(streams.BinaryStream, () => F[Unit])] =
inputStreamToStream(response)

override protected def regularAsInputStream(response: InputStream): F[InputStream] = monad.unit(response)

override protected def handleWS[T](
responseAs: GenericWebSocketResponseAs[T, _],
meta: ResponseMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,33 @@ import sttp.client4.wrappers.{DigestAuthenticationBackend, FollowRedirectsBacken
import sttp.model.headers.CookieWithMeta

trait HttpTestExtensions[F[_]] extends AsyncFreeSpecLike { self: HttpTest[F] =>
protected def supportsResponseAsInputStream = true

"parse response" - {
if (supportsResponseAsInputStream) {
"as input stream" in {
postEcho.body(testBody).response(asInputStreamAlways(_.readAllBytes())).send(backend).toFuture().map {
response =>
val allBytes = response.body
val fc = new String(allBytes, "UTF-8")
fc should be(expectedPostEchoResponse)
}
}

"as input stream unsafe" in {
postEcho.body(testBody).response(asInputStreamAlwaysUnsafe).send(backend).toFuture().map { response =>
try {
val allBytes = response.body.readAllBytes()
val fc = new String(allBytes, "UTF-8")
fc should be(expectedPostEchoResponse)
} finally {
response.body.close()
}
}
}
}
}

"cookies" - {
"read response cookies" in {
basicRequest
Expand Down
Loading

0 comments on commit 8de46c1

Please sign in to comment.