Skip to content

Commit

Permalink
Caching backend + example (#2390)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Jan 8, 2025
1 parent 848ce13 commit 4d8f5ca
Show file tree
Hide file tree
Showing 8 changed files with 599 additions and 0 deletions.
16 changes: 16 additions & 0 deletions caching/src/main/scala/sttp/client4/caching/Cache.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package sttp.client4.caching

import scala.concurrent.duration.FiniteDuration

/** A cache interface to be used with [[CachingBackend]].
*
* @tparam f
* The effect type, [[sttp.shared.Identity]] for direct-style (synchronous). Must be the same as used by the backend,
* which is being wrapped.
*/
trait Cache[F[_]] {
def get(key: Array[Byte]): F[Option[Array[Byte]]]
def delete(key: Array[Byte]): F[Unit]
def set(key: Array[Byte], value: Array[Byte], ttl: FiniteDuration): F[Unit]
def close(): F[Unit]
}
44 changes: 44 additions & 0 deletions caching/src/main/scala/sttp/client4/caching/CachedResponse.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package sttp.client4.caching

import sttp.model.StatusCode
import sttp.model.Header
import sttp.client4.Response
import sttp.model.Method
import java.util.Base64
import sttp.model.RequestMetadata
import sttp.model.Uri
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker

case class CachedResponse(
body: String,
code: StatusCode,
statusText: String,
headers: List[Header],
requestMethod: Method,
requestUri: String,
requestHeaders: List[Header]
) {
def toResponse: Response[Array[Byte]] = Response(
Base64.getDecoder.decode(body),
code,
statusText,
headers,
Nil,
RequestMetadata(requestMethod, Uri.unsafeParse(requestUri), requestHeaders)
)
}

object CachedResponse {
def apply(response: Response[Array[Byte]]): CachedResponse = CachedResponse(
Base64.getEncoder.encodeToString(response.body),
response.code,
response.statusText,
response.headers.toList,
response.request.method,
response.request.uri.toString,
response.request.headers.toList
)

implicit val cachedResponseCodec: JsonValueCodec[CachedResponse] = JsonCodecMaker.make
}
200 changes: 200 additions & 0 deletions caching/src/main/scala/sttp/client4/caching/CachingBackend.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package sttp.client4.caching

import org.slf4j.LoggerFactory
import sttp.capabilities.Effect
import sttp.client4._
import sttp.client4.wrappers.DelegateBackend
import sttp.model.ResponseMetadata
import sttp.shared.Identity

import java.io.ByteArrayInputStream
import scala.util.Failure
import scala.util.Success

/** A backend wrapper which implements caching of HTTP responses.
*
* Caching happens when response-as description is "cache-friendly". This excludes non-blocking streaming responses,
* file-based responses and WebSockets. Additionally, caching eligibility & duration is determined by the [[config]].
* See [[CachingConfig.Default]] for the default behavior.
*
* For requests which might be cached, the response's body is read into a byte array. If the response is then
* determined to be cacheable, it is serialized and stored in the cache. After that, the response body is adjusted as
* specified by response-as and returned to the user.
*
* For details on how the cache key is created, and the responses are serialized/deserialized, see [[CachingConfig]].
*
* The cache will be closed (using [[Cache.close]]) when this backend is closed.
*
* @param config
* The caching backend configuration.
* @param cache
* The cache where responses will be stored. Must use the same effect type as the backend. If the backend and cache
* are both synchronous, this should be [[sttp.shared.Identity]].
*/
class CachingBackend[F[_], P](delegate: GenericBackend[F, P], cache: Cache[F], config: CachingConfig)
extends DelegateBackend(delegate) {
private val log = LoggerFactory.getLogger(this.getClass())

import sttp.monad.syntax._

override def send[T](request: GenericRequest[T, P with Effect[F]]): F[Response[T]] = {
val cacheableFromConfig = config.eligibleForCaching(request)

// Only requests with "cache-friendly" response-as descriptions can be cached, so that we can convert a cached
// response (as a byte array) into the desired type. This is not possible if we're requesting a non-blocking
// stream, storing the response to a file, or opening a web socket. These can only be handled by the backend
// directly.
val cacheableFromResponseAs = responseAsCacheFriendly(request.response.delegate)

if (cacheableFromConfig && cacheableFromResponseAs) {
// checking if the request is already cached
val key = config.cacheKey(request)
cache.get(key).flatMap { cached =>
cached.map(c => config.deserializeResponse(c)) match {
case None => sendNotInCache(request, key)
case Some(Success(cachedResponse)) =>
log.debug(s"Found a cached response for ${request.showBasic}.")
monad.unit(adjustResponseReadFromCache(cachedResponse.toResponse, request))
case Some(Failure(e)) =>
log.warn(s"Exception when deserializing response from cache for: ${request.showBasic}", e)
// clear the cache & send the request
cache.delete(key).flatMap(_ => sendNotInCache(request, key))
}
}
} else {
log.debug(s"Request ${request.showBasic} is not cacheable (${
if (!cacheableFromConfig) "due to config" else "due to response-as"
}).")
delegate.send(request) // we know that we won't be able to cache the response
}
}

override def close(): F[Unit] = super.close().ensure(cache.close())

private def sendNotInCache[T](request: GenericRequest[T, P with Effect[F]], key: Array[Byte]): F[Response[T]] = {
// Replacing the original response as with a byte array; we know that response-as is cache-friendly, so we'll be
// able to obtain a T-body later.
val byteArrayRequest = requestWithResponseAsByteArray(request)
delegate
.send(byteArrayRequest)
.flatMap { byteArrayResponse =>
config.cacheDuration(request, byteArrayResponse) match {
case Some(d) =>
log.debug(s"Storing response for ${request.showBasic} in the cache.")
cache
.set(key, config.serializeResponse(CachedResponse(byteArrayResponse)), d)
.map(_ => byteArrayResponse)
case None => monad.unit(byteArrayResponse)
}
}
.map { byteArrayResponse =>
adjustResponseReadFromCache(byteArrayResponse, request)
}
}

private def adjustResponseReadFromCache[T](
responseFromCache: Response[Array[Byte]],
request: GenericRequest[T, _]
): Response[T] = {
// We assume that it has been verified that responseAs is cache-friendly, so this won't throw an UOE.
val body: T = runResponseAs(request.response.delegate, responseFromCache.body, responseFromCache)
responseFromCache.copy(body = body)
}

private def responseAsCacheFriendly(responseAs: GenericResponseAs[_, _]): Boolean =
responseAs match {
case IgnoreResponse => true
case ResponseAsByteArray => true
case ResponseAsStream(_, _) => false
case ResponseAsStreamUnsafe(_) => false
case ResponseAsInputStream(_) => true
case ResponseAsInputStreamUnsafe => true
case ResponseAsFile(_) => false
case ResponseAsWebSocket(_) => false
case ResponseAsWebSocketUnsafe() => false
case ResponseAsWebSocketStream(_, _) => false
case ResponseAsFromMetadata(conditions, default) =>
conditions.forall(c => responseAsCacheFriendly(c.responseAs)) && responseAsCacheFriendly(default)
case MappedResponseAs(raw, _, _) => responseAsCacheFriendly(raw)
case ResponseAsBoth(l, r) => responseAsCacheFriendly(l) && responseAsCacheFriendly(r)
}

private def runResponseAs[T](
responseAs: GenericResponseAs[T, _],
data: Array[Byte],
responseMetadata: ResponseMetadata
): T =
responseAs match {
case IgnoreResponse => ()
case ResponseAsByteArray => data
case ResponseAsStream(_, _) => throw new UnsupportedOperationException()
case ResponseAsStreamUnsafe(s) => throw new UnsupportedOperationException()
case ResponseAsInputStream(f) => f(new ByteArrayInputStream(data))
case ResponseAsInputStreamUnsafe => new ByteArrayInputStream(data)
case ResponseAsFile(_) => throw new UnsupportedOperationException()
case ResponseAsWebSocket(_) => throw new UnsupportedOperationException()
case ResponseAsWebSocketUnsafe() => throw new UnsupportedOperationException()
case ResponseAsWebSocketStream(_, _) => throw new UnsupportedOperationException()
case ResponseAsFromMetadata(conditions, default) =>
runResponseAs(
conditions.find(_.condition(responseMetadata)).map(_.responseAs).getOrElse(default),
data,
responseMetadata
)
case MappedResponseAs(raw, g, _) => g(runResponseAs(raw, data, responseMetadata), responseMetadata)
case ResponseAsBoth(l, r) =>
(runResponseAs(l, data, responseMetadata), Some(runResponseAs(r, data, responseMetadata)))
}

private def requestWithResponseAsByteArray[T](
request: GenericRequest[T, P with Effect[F]]
): GenericRequest[Array[Byte], P with Effect[F]] =
request match {
case r: Request[T] @unchecked => r.response(asByteArrayAlways)
case _ => throw new IllegalStateException("WebSocket/streaming requests are not cacheable!")
}
}

object CachingBackend {
def apply(backend: SyncBackend, cache: Cache[Identity]): SyncBackend =
new CachingBackend(backend, cache, CachingConfig.Default) with SyncBackend {}

def apply[F[_]](backend: Backend[F], cache: Cache[F]): Backend[F] =
new CachingBackend(backend, cache, CachingConfig.Default) with Backend[F] {}

def apply(backend: WebSocketSyncBackend, cache: Cache[Identity]): WebSocketSyncBackend =
new CachingBackend(backend, cache, CachingConfig.Default) with WebSocketSyncBackend {}

def apply[F[_]](backend: WebSocketBackend[F], cache: Cache[F]): WebSocketBackend[F] =
new CachingBackend(backend, cache, CachingConfig.Default) with WebSocketBackend[F] {}

def apply[F[_], S](backend: StreamBackend[F, S], cache: Cache[F]): StreamBackend[F, S] =
new CachingBackend(backend, cache, CachingConfig.Default) with StreamBackend[F, S] {}

def apply[F[_], S](backend: WebSocketStreamBackend[F, S], cache: Cache[F]): WebSocketStreamBackend[F, S] =
new CachingBackend(backend, cache, CachingConfig.Default) with WebSocketStreamBackend[F, S] {}

// with config

def apply(backend: SyncBackend, cache: Cache[Identity], config: CachingConfig): SyncBackend =
new CachingBackend(backend, cache, config) with SyncBackend {}

def apply[F[_]](backend: Backend[F], cache: Cache[F], config: CachingConfig): Backend[F] =
new CachingBackend(backend, cache, config) with Backend[F] {}

def apply(backend: WebSocketSyncBackend, cache: Cache[Identity], config: CachingConfig): WebSocketSyncBackend =
new CachingBackend(backend, cache, config) with WebSocketSyncBackend {}

def apply[F[_]](backend: WebSocketBackend[F], cache: Cache[F], config: CachingConfig): WebSocketBackend[F] =
new CachingBackend(backend, cache, config) with WebSocketBackend[F] {}

def apply[F[_], S](backend: StreamBackend[F, S], cache: Cache[F], config: CachingConfig): StreamBackend[F, S] =
new CachingBackend(backend, cache, config) with StreamBackend[F, S] {}

def apply[F[_], S](
backend: WebSocketStreamBackend[F, S],
cache: Cache[F],
config: CachingConfig
): WebSocketStreamBackend[F, S] =
new CachingBackend(backend, cache, config) with WebSocketStreamBackend[F, S] {}
}
80 changes: 80 additions & 0 deletions caching/src/main/scala/sttp/client4/caching/CachingConfig.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package sttp.client4.caching

import com.github.plokhotnyuk.jsoniter_scala.core._
import sttp.model.HeaderNames
import sttp.model.Method
import sttp.model.RequestMetadata
import sttp.model.ResponseMetadata
import sttp.model.headers.CacheDirective

import scala.concurrent.duration.FiniteDuration
import scala.util.Try

/** Configuration for [[CachingBackend]].
*
* @param eligibleForCaching
* Checks if a request is eligible for caching, before it is sent. By default only GET and HEAD requests are
* eligible.
* @param cacheDuration
* Calculates the duration for which a response should be cached, based on the request and response. A `None` result
* means that the response should not be cached. By default uses the max-age cache directive from the `Cache-Control`
* header.
* @param cacheKey
* Creates the cache key for a request. The default implementation includes the method, URI and headers specified in
* the `Vary` header.
* @param serializeResponse
* The function used to serialize the response to be cached. By default uses JSON serialization using jsoniter-scala.
* @param deserializeResponse
* The function used to deserialize the response from the cache. By default uses JSON deserialization using
* jsoniter-scala.
*/
case class CachingConfig(
eligibleForCaching: RequestMetadata => Boolean = CachingConfig.EligibleWhenMethodIsGetOrHead,
cacheDuration: (RequestMetadata, ResponseMetadata) => Option[FiniteDuration] =
CachingConfig.CacheDurationFromCacheDirectives,
cacheKey: RequestMetadata => Array[Byte] = CachingConfig.DefaultCacheKey,
serializeResponse: CachedResponse => Array[Byte] = CachingConfig.SerializeResponseToJson,
deserializeResponse: Array[Byte] => Try[CachedResponse] = CachingConfig.DeserializeResponseFromJson
)

object CachingConfig {
val EligibleWhenMethodIsGetOrHead: RequestMetadata => Boolean = { request =>
request.method == Method.GET || request.method == Method.HEAD
}

val CacheDurationFromCacheDirectives: (RequestMetadata, ResponseMetadata) => Option[FiniteDuration] = {
(_, response) =>
val directives: List[CacheDirective] =
response.header(HeaderNames.CacheControl).map(CacheDirective.parse).getOrElse(Nil).flatMap(_.toOption)

directives.collectFirst { case CacheDirective.MaxAge(d) =>
d
}
}

val DefaultCacheKey: RequestMetadata => Array[Byte] = { request =>
val base = s"${request.method} ${request.uri}"
// the list of headers to include in the cache key, basing on the Vary header
val varyHeaders: List[String] = request
.header(HeaderNames.Vary)
.map(_.split(",").toList.map(_.trim))
.getOrElse(Nil)

varyHeaders
.foldLeft(base)((key, headerName) => key + s" ${headerName}=${request.header(headerName)}")
.getBytes()
}

val SerializeResponseToJson: CachedResponse => Array[Byte] = response => writeToArray(response)
val DeserializeResponseFromJson: Array[Byte] => Try[CachedResponse] = bytes =>
Try(readFromArray[CachedResponse](bytes))

/** Default caching config. Caching happens when:
* - the request is a GET or HEAD request
* - the response contains a Cache-Control header with a max-age directive; the response is cached for the duration
* specified in this directive
*
* Responses are stored in the cache, serialized to JSON using jsoniter-scala.
*/
val Default: CachingConfig = CachingConfig()
}
Loading

0 comments on commit 4d8f5ca

Please sign in to comment.