diff --git a/caching/src/main/scala/sttp/client4/caching/Cache.scala b/caching/src/main/scala/sttp/client4/caching/Cache.scala
new file mode 100644
index 000000000..ef466a89c
--- /dev/null
+++ b/caching/src/main/scala/sttp/client4/caching/Cache.scala
@@ -0,0 +1,16 @@
+package sttp.client4.caching
+
+import scala.concurrent.duration.FiniteDuration
+
+/** A key-value store for serialized responses, to be used with [[CachingBackend]]. Keys and values are raw bytes.
+  *
+  * @tparam F
+  *   The effect type, [[sttp.shared.Identity]] for direct-style (synchronous). Must be the same as the one used by
+  *   the backend which is being wrapped.
+  */
+trait Cache[F[_]] {
+  def get(key: Array[Byte]): F[Option[Array[Byte]]] // `None` is treated by the backend as "not cached"
+  def delete(key: Array[Byte]): F[Unit] // used by the backend to drop entries which fail to deserialize
+  def set(key: Array[Byte], value: Array[Byte], ttl: FiniteDuration): F[Unit] // `ttl`: the caching duration
+  def close(): F[Unit] // invoked when the wrapping backend is closed
+}
diff --git a/caching/src/main/scala/sttp/client4/caching/CachedResponse.scala b/caching/src/main/scala/sttp/client4/caching/CachedResponse.scala
new file mode 100644
index 000000000..acdde5027
--- /dev/null
+++ b/caching/src/main/scala/sttp/client4/caching/CachedResponse.scala
@@ -0,0 +1,44 @@
+package sttp.client4.caching
+
+import sttp.model.StatusCode
+import sttp.model.Header
+import sttp.client4.Response
+import sttp.model.Method
+import java.util.Base64
+import sttp.model.RequestMetadata
+import sttp.model.Uri
+import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
+import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
+
+/** A serializable snapshot of a response; `body` is Base64 text and `requestUri` is the request URI as a string. */
+case class CachedResponse(
+    body: String,
+    code: StatusCode,
+    statusText: String,
+    headers: List[Header],
+    requestMethod: Method,
+    requestUri: String,
+    requestHeaders: List[Header]
+) {
+
+  /** Rebuilds the response: decodes the Base64 body and re-parses the request URI; the history is always empty. */
+  def toResponse: Response[Array[Byte]] = {
+    val decodedBody = Base64.getDecoder.decode(body)
+    val requestMetadata = RequestMetadata(requestMethod, Uri.unsafeParse(requestUri), requestHeaders)
+    Response(decodedBody, code, statusText, headers, Nil, requestMetadata)
+  }
+}
+
+object CachedResponse {
+
+  /** Captures `response` in a serializable form: the body is Base64-encoded, the request URI is stored as text. */
+  def apply(response: Response[Array[Byte]]): CachedResponse = {
+    val encodedBody = Base64.getEncoder.encodeToString(response.body)
+    val request = response.request
+    CachedResponse(encodedBody, response.code, response.statusText, response.headers.toList,
+      request.method, request.uri.toString, request.headers.toList)
+  }
+
+  /** The jsoniter-scala codec for [[CachedResponse]], generated by `JsonCodecMaker.make`. */
+  implicit val cachedResponseCodec: JsonValueCodec[CachedResponse] = JsonCodecMaker.make
+}
diff --git a/caching/src/main/scala/sttp/client4/caching/CachingBackend.scala b/caching/src/main/scala/sttp/client4/caching/CachingBackend.scala
new file mode 100644
index 000000000..f9f93c579
--- /dev/null
+++ b/caching/src/main/scala/sttp/client4/caching/CachingBackend.scala
@@ -0,0 +1,200 @@
+package sttp.client4.caching
+
+import org.slf4j.LoggerFactory
+import sttp.capabilities.Effect
+import sttp.client4._
+import sttp.client4.wrappers.DelegateBackend
+import sttp.model.ResponseMetadata
+import sttp.shared.Identity
+
+import java.io.ByteArrayInputStream
+import scala.util.Failure
+import scala.util.Success
+
+/** A backend wrapper which implements caching of HTTP responses.
+  *
+  * Caching happens when the response-as description is "cache-friendly". This excludes non-blocking streaming
+  * responses, file-based responses and WebSockets. Additionally, caching eligibility & duration are determined by the
+  * [[config]]. See [[CachingConfig.Default]] for the default behavior.
+  *
+  * For requests which might be cached, the response's body is read into a byte array. If the response is then
+  * determined to be cacheable, it is serialized and stored in the cache. After that, the response body is adjusted as
+  * specified by response-as and returned to the user.
+  *
+  * For details on how the cache key is created, and the responses are serialized/deserialized, see [[CachingConfig]].
+  *
+  * The cache will be closed (using [[Cache.close]]) when this backend is closed.
+  *
+  * @param config
+  *   The caching backend configuration.
+  * @param cache
+  *   The cache where responses will be stored. Must use the same effect type as the backend. If the backend and cache
+  *   are both synchronous, this should be [[sttp.shared.Identity]].
+  */
+class CachingBackend[F[_], P](delegate: GenericBackend[F, P], cache: Cache[F], config: CachingConfig)
+    extends DelegateBackend(delegate) {
+  private val log = LoggerFactory.getLogger(this.getClass())
+
+  import sttp.monad.syntax._
+
+  /** Serves the response from the cache if possible; otherwise sends the request and caches an eligible response.
+    * Cache entries which can't be deserialized or converted back into a response are deleted & the request is sent.
+    */
+  override def send[T](request: GenericRequest[T, P with Effect[F]]): F[Response[T]] = {
+    val cacheableFromConfig = config.eligibleForCaching(request)
+    // Only "cache-friendly" response-as descriptions can be cached: a cached response is a byte array, which can't be
+    // converted into a non-blocking stream, a file or a web socket. These are handled by the delegate directly.
+    val cacheableFromResponseAs = responseAsCacheFriendly(request.response.delegate)
+
+    if (cacheableFromConfig && cacheableFromResponseAs) {
+      val key = config.cacheKey(request)
+      cache.get(key).flatMap { cached =>
+        // `toResponse` can throw (e.g. on an invalid Base64 body), so it's run inside the `Try`: such entries are
+        // then handled like any other corrupted entry, instead of failing each request until the entry expires
+        cached.map(c => config.deserializeResponse(c).map(_.toResponse)) match {
+          case None => sendNotInCache(request, key)
+          case Some(Success(cachedResponse)) =>
+            log.debug(s"Found a cached response for ${request.showBasic}.")
+            monad.unit(adjustResponseReadFromCache(cachedResponse, request))
+          case Some(Failure(e)) =>
+            log.warn(s"Exception when deserializing response from cache for: ${request.showBasic}", e)
+            // clear the cache & send the request
+            cache.delete(key).flatMap(_ => sendNotInCache(request, key))
+        }
+      }
+    } else {
+      log.debug(s"Request ${request.showBasic} is not cacheable (${
+          if (!cacheableFromConfig) "due to config" else "due to response-as"
+        }).")
+      delegate.send(request) // we know that we won't be able to cache the response
+    }
+  }
+
+  override def close(): F[Unit] = super.close().ensure(cache.close()) // the cache is closed along with the delegate
+
+  /** Sends the request with the body read as a byte array, stores the response if the config yields a cache
+    * duration for it, and finally converts the body as described by the original response-as.
+    */
+  private def sendNotInCache[T](request: GenericRequest[T, P with Effect[F]], key: Array[Byte]): F[Response[T]] = {
+    // stores the raw response when a cache duration is defined; the response is passed through in both cases
+    def storeIfCacheable(rawResponse: Response[Array[Byte]]): F[Response[Array[Byte]]] =
+      config.cacheDuration(request, rawResponse) match {
+        case None => monad.unit(rawResponse)
+        case Some(ttl) =>
+          log.debug(s"Storing response for ${request.showBasic} in the cache.")
+          val serialized = config.serializeResponse(CachedResponse(rawResponse))
+          cache.set(key, serialized, ttl).map(_ => rawResponse)
+      }
+
+    // response-as is known to be cache-friendly here, so a T-body can be obtained from the byte array afterwards
+    delegate
+      .send(requestWithResponseAsByteArray(request))
+      .flatMap(rawResponse => storeIfCacheable(rawResponse))
+      .map(rawResponse => adjustResponseReadFromCache(rawResponse, request))
+  }
+
+  // Converts a byte-array response (fresh, or read from the cache) into one with the body described by response-as.
+  private def adjustResponseReadFromCache[T](
+      responseFromCache: Response[Array[Byte]],
+      request: GenericRequest[T, _]
+  ): Response[T] =
+    // It's assumed that response-as has been verified to be cache-friendly, so runResponseAs won't throw an UOE.
+    responseFromCache.copy(body =
+      runResponseAs[T](request.response.delegate, responseFromCache.body, responseFromCache))
+
+  /** Checks if the body described by `responseAs` can be computed from a byte array (and hence from a cached
+    * response). Non-blocking streams, files and web sockets can't; these can only be handled by a backend.
+    */
+  private def responseAsCacheFriendly(responseAs: GenericResponseAs[_, _]): Boolean =
+    responseAs match {
+      // computable from a byte array
+      case IgnoreResponse | ResponseAsByteArray | ResponseAsInputStream(_) | ResponseAsInputStreamUnsafe => true
+      // need the backend: non-blocking streams, files ...
+      case ResponseAsStream(_, _) | ResponseAsStreamUnsafe(_) | ResponseAsFile(_) => false
+      // ... and web sockets
+      case ResponseAsWebSocket(_) | ResponseAsWebSocketUnsafe() | ResponseAsWebSocketStream(_, _) => false
+      // composite descriptions are cache-friendly only if all of their components are
+      case MappedResponseAs(raw, _, _) => responseAsCacheFriendly(raw)
+      case ResponseAsBoth(l, r)        => responseAsCacheFriendly(l) && responseAsCacheFriendly(r)
+      case ResponseAsFromMetadata(conditions, default) =>
+        conditions.forall(c => responseAsCacheFriendly(c.responseAs)) && responseAsCacheFriendly(default)
+    }
+
+  /** Computes the body described by `responseAs` from the bytes of a response, without involving the backend.
+    *
+    * Throws an `UnsupportedOperationException` for descriptions which aren't cache-friendly (streams, files, web
+    * sockets); callers are expected to check [[responseAsCacheFriendly]] first.
+    */
+  private def runResponseAs[T](
+      responseAs: GenericResponseAs[T, _],
+      data: Array[Byte],
+      responseMetadata: ResponseMetadata
+  ): T =
+    responseAs match {
+      case IgnoreResponse              => ()
+      case ResponseAsByteArray         => data
+      case ResponseAsInputStream(f)    => f(new ByteArrayInputStream(data))
+      case ResponseAsInputStreamUnsafe => new ByteArrayInputStream(data)
+      case ResponseAsStream(_, _) | ResponseAsStreamUnsafe(_) | ResponseAsFile(_) | ResponseAsWebSocket(_) |
+          ResponseAsWebSocketUnsafe() | ResponseAsWebSocketStream(_, _) =>
+        // nothing is bound by the patterns, as nothing is needed; the message names the offending description
+        throw new UnsupportedOperationException(s"Response-as is not cache-friendly: $responseAs")
+      case ResponseAsFromMetadata(conditions, default) =>
+        runResponseAs(
+          conditions.find(_.condition(responseMetadata)).map(_.responseAs).getOrElse(default), data, responseMetadata)
+      case MappedResponseAs(raw, g, _) => g(runResponseAs(raw, data, responseMetadata), responseMetadata)
+      case ResponseAsBoth(l, r) =>
+        (runResponseAs(l, data, responseMetadata), Some(runResponseAs(r, data, responseMetadata)))
+    }
+
+  private def requestWithResponseAsByteArray[T]( // replaces the request's response-as with reading the body as bytes
+      request: GenericRequest[T, P with Effect[F]]
+  ): GenericRequest[Array[Byte], P with Effect[F]] =
+    request match {
+      case r: Request[T] @unchecked => r.response(asByteArrayAlways) // only a plain Request is handled
+      case _ => throw new IllegalStateException("WebSocket/streaming requests are not cacheable!")
+    }
+}
+
+/** Factories wrapping a backend with caching; the two-argument variants use [[CachingConfig.Default]]. */
+object CachingBackend {
+  def apply(backend: SyncBackend, cache: Cache[Identity]): SyncBackend =
+    apply(backend, cache, CachingConfig.Default)
+
+  def apply[F[_]](backend: Backend[F], cache: Cache[F]): Backend[F] =
+    apply(backend, cache, CachingConfig.Default)
+
+  def apply(backend: WebSocketSyncBackend, cache: Cache[Identity]): WebSocketSyncBackend =
+    apply(backend, cache, CachingConfig.Default)
+
+  def apply[F[_]](backend: WebSocketBackend[F], cache: Cache[F]): WebSocketBackend[F] =
+    apply(backend, cache, CachingConfig.Default)
+
+  def apply[F[_], S](backend: StreamBackend[F, S], cache: Cache[F]): StreamBackend[F, S] =
+    apply(backend, cache, CachingConfig.Default)
+
+  def apply[F[_], S](backend: WebSocketStreamBackend[F, S], cache: Cache[F]): WebSocketStreamBackend[F, S] =
+    apply(backend, cache, CachingConfig.Default)
+
+  // with config: the wrapper is created with the trait matching the delegate's type mixed in
+  def apply(backend: SyncBackend, cache: Cache[Identity], config: CachingConfig): SyncBackend =
+    new CachingBackend(backend, cache, config) with SyncBackend {}
+
+  def apply[F[_]](backend: Backend[F], cache: Cache[F], config: CachingConfig): Backend[F] =
+    new CachingBackend(backend, cache, config) with Backend[F] {}
+
+  def apply(backend: WebSocketSyncBackend, cache: Cache[Identity], config: CachingConfig): WebSocketSyncBackend =
+    new CachingBackend(backend, cache, config) with WebSocketSyncBackend {}
+
+  def apply[F[_]](backend: WebSocketBackend[F], cache: Cache[F], config: CachingConfig): WebSocketBackend[F] =
+    new CachingBackend(backend, cache, config) with WebSocketBackend[F] {}
+
+  def apply[F[_], S](backend: StreamBackend[F, S], cache: Cache[F], config: CachingConfig): StreamBackend[F, S] =
+    new CachingBackend(backend, cache, config) with StreamBackend[F, S] {}
+
+  def apply[F[_], S](
+      backend: WebSocketStreamBackend[F, S],
+      cache: Cache[F],
+      config: CachingConfig
+  ): WebSocketStreamBackend[F, S] =
+    new CachingBackend(backend, cache, config) with WebSocketStreamBackend[F, S] {}
+}
diff --git a/caching/src/main/scala/sttp/client4/caching/CachingConfig.scala b/caching/src/main/scala/sttp/client4/caching/CachingConfig.scala
new file mode 100644
index 000000000..2712b7ed6
--- /dev/null
+++ b/caching/src/main/scala/sttp/client4/caching/CachingConfig.scala
@@ -0,0 +1,80 @@
+package sttp.client4.caching
+
+import com.github.plokhotnyuk.jsoniter_scala.core._
+import sttp.model.HeaderNames
+import sttp.model.Method
+import sttp.model.RequestMetadata
+import sttp.model.ResponseMetadata
+import sttp.model.headers.CacheDirective
+
+import scala.concurrent.duration.FiniteDuration
+import scala.util.Try
+
+/** Configuration for [[CachingBackend]].
+  *
+  * @param eligibleForCaching
+  *   Checks if a request is eligible for caching, before it is sent. By default only GET and HEAD requests are
+  *   eligible.
+  * @param cacheDuration
+  *   Calculates the duration for which a response should be cached, based on the request and response. A `None` result
+  *   means that the response should not be cached. By default uses the max-age cache directive from the response's
+  *   `Cache-Control` header.
+  * @param cacheKey
+  *   Creates the cache key for a request. The default implementation includes the method, URI and the values of the
+  *   request headers which are listed in the request's `Vary` header.
+  * @param serializeResponse
+  *   The function used to serialize the response to be cached. By default uses JSON serialization using jsoniter-scala.
+  * @param deserializeResponse
+  *   The function used to deserialize the response from the cache; on a `Failure` the backend deletes the cache entry
+  *   and sends the request. By default uses JSON deserialization using jsoniter-scala.
+  */
+case class CachingConfig(
+    eligibleForCaching: RequestMetadata => Boolean = CachingConfig.EligibleWhenMethodIsGetOrHead,
+    cacheDuration: (RequestMetadata, ResponseMetadata) => Option[FiniteDuration] =
+      CachingConfig.CacheDurationFromCacheDirectives,
+    cacheKey: RequestMetadata => Array[Byte] = CachingConfig.DefaultCacheKey,
+    serializeResponse: CachedResponse => Array[Byte] = CachingConfig.SerializeResponseToJson,
+    deserializeResponse: Array[Byte] => Try[CachedResponse] = CachingConfig.DeserializeResponseFromJson
+)
+
+object CachingConfig {
+  val EligibleWhenMethodIsGetOrHead: RequestMetadata => Boolean = { request =>
+    request.method == Method.GET || request.method == Method.HEAD
+  }
+
+  /** The response's positive `max-age`, unless `Cache-Control` contains `no-store` or `no-cache` (the backend never
+    * revalidates). A zero `max-age` means "already stale", and is a TTL which some caches reject (e.g. Redis SETEX).
+    */
+  val CacheDurationFromCacheDirectives: (RequestMetadata, ResponseMetadata) => Option[FiniteDuration] = {
+    (_, response) =>
+      val directives: List[CacheDirective] =
+        response.header(HeaderNames.CacheControl).map(CacheDirective.parse).getOrElse(Nil).flatMap(_.toOption)
+      val reuseForbidden = directives.exists(d => d == CacheDirective.NoStore || d == CacheDirective.NoCache)
+      if (reuseForbidden) None else directives.collectFirst { case CacheDirective.MaxAge(d) if d.length > 0 => d }
+  }
+
+  /** The method, the URI and the values of the request headers listed in the request's `Vary` header, as UTF-8. */
+  val DefaultCacheKey: RequestMetadata => Array[Byte] = { request =>
+    val base = s"${request.method} ${request.uri}"
+    // the names of the headers to include in the key; blank names (e.g. from an empty `Vary` value) are skipped
+    val varyHeaders: List[String] =
+      request.header(HeaderNames.Vary).map(_.split(",").toList.map(_.trim).filter(_.nonEmpty)).getOrElse(Nil)
+    // an explicit charset, so that the key doesn't depend on the JVM's default encoding (e.g. with a shared cache)
+    varyHeaders
+      .foldLeft(base)((key, headerName) => key + s" ${headerName}=${request.header(headerName)}")
+      .getBytes(java.nio.charset.StandardCharsets.UTF_8)
+  }
+
+  val SerializeResponseToJson: CachedResponse => Array[Byte] = response => writeToArray(response)
+  val DeserializeResponseFromJson: Array[Byte] => Try[CachedResponse] = bytes =>
+    Try(readFromArray[CachedResponse](bytes))
+
+  /** Default caching config. Caching happens when:
+    *   - the request is a GET or HEAD request
+    *   - the response contains a Cache-Control header with a positive max-age directive, and without no-store or
+    *     no-cache; the response is cached for the duration specified in max-age
+    *
+    * Responses are stored in the cache, serialized to JSON using jsoniter-scala.
+    */
+  val Default: CachingConfig = CachingConfig()
+}
diff --git a/caching/src/test/scala/sttp/client4/caching/CachingBackendTest.scala b/caching/src/test/scala/sttp/client4/caching/CachingBackendTest.scala
new file mode 100644
index 000000000..0874362b5
--- /dev/null
+++ b/caching/src/test/scala/sttp/client4/caching/CachingBackendTest.scala
@@ -0,0 +1,168 @@
+package sttp.client4.caching
+
+import com.github.plokhotnyuk.jsoniter_scala.core._
+import com.github.plokhotnyuk.jsoniter_scala.macros._
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+import sttp.client4._
+import sttp.client4.jsoniter._
+import sttp.client4.testing.ResponseStub
+import sttp.model.Header
+import sttp.model.HeaderNames
+import sttp.model.headers.CacheDirective
+import sttp.shared.Identity
+
+import scala.concurrent.duration._
+
+class CachingBackendTest extends AnyFlatSpec with Matchers {
+
+  trait StubCache[F[_]] extends Cache[F] { // a cache for tests, with a manually-advanced clock
+    def timePassed(seconds: Int): Unit // advances the cache's notion of time by `seconds`
+  }
+
+  def newInMemoryCache: StubCache[Identity] = new StubCache[Identity] {
+    // keys are stored as lists, as arrays don't have structural equality; values are paired with their expiry time
+    private var storage = new collection.mutable.HashMap[List[Byte], (Array[Byte], Long)]()
+    private var now = 0L // how many seconds have passed since the dawn of time
+
+    def timePassed(seconds: Int): Unit = {
+      now += seconds
+      storage = storage.filter { case (_, (_, expiresAt)) => expiresAt > now } // a TTL of n is over after n seconds
+    }
+
+    override def get(key: Array[Byte]): Option[Array[Byte]] = storage.get(key.toList).map(_._1)
+
+    override def delete(key: Array[Byte]): Unit = {
+      val _ = storage.remove(key.toList)
+    }
+
+    override def set(key: Array[Byte], value: Array[Byte], ttl: FiniteDuration): Unit = {
+      val _ = storage.put(key.toList, (value, now + ttl.toSeconds))
+    }
+
+    override def close(): Unit = ()
+  }
+
+  it should "cache responses" in {
+    // given
+    val cache = newInMemoryCache
+    var invocationCounter = 0 // how many times the request was "sent" by the delegate backend
+    val delegate = DefaultSyncBackend.stub
+      .whenRequestMatches(_.uri.toString == "http://example1.org")
+      .thenRespond {
+        invocationCounter += 1
+        ResponseStub
+          .ok("response body 1")
+          .copy(headers = List(Header(HeaderNames.CacheControl, CacheDirective.MaxAge(5.seconds).toString)))
+      }
+      .whenRequestMatches(_.uri.toString == "http://example2.org")
+      .thenRespond {
+        invocationCounter += 1
+        ResponseStub.ok("response body 2") // no cache-control header
+      }
+    val cachingBackend = CachingBackend(delegate, cache)
+
+    val request1 = basicRequest.get(uri"http://example1.org").response(asString)
+    val request2 = basicRequest.get(uri"http://example2.org").response(asString)
+
+    // A: initial request
+    val responseA = cachingBackend.send(request1)
+    invocationCounter shouldBe 1
+
+    // B: request before timeout
+    cache.timePassed(2) // 2 of the 5 seconds of max-age have passed: the cached response should still be valid
+    val responseB = cachingBackend.send(request1)
+    invocationCounter shouldBe 1
+    responseA.body shouldBe responseB.body
+
+    // C: request after timeout
+    cache.timePassed(7) // 9 seconds in total: the cached response should have expired
+    val responseC = cachingBackend.send(request1)
+    invocationCounter shouldBe 2
+    responseC.body shouldBe responseA.body
+
+    // D: request to another endpoint
+    cachingBackend.send(request2)
+    invocationCounter shouldBe 3
+
+    // E: another request to another endpoint, which shouldn't be cached
+    cachingBackend.send(request2)
+    invocationCounter shouldBe 4
+  }
+
+  case class Data(v1: Int, v2: String, v3: Boolean) // the shape of the JSON body used in the test below
+  implicit val dataCodec: JsonValueCodec[Data] = JsonCodecMaker.make // resolved implicitly by asJson[Data]
+
+  it should "deserialize cached responses" in {
+    // given: a delegate which returns a JSON body, cacheable for 5 seconds
+    val cache = newInMemoryCache
+    var invocationCounter = 0 // how many times the request was "sent" by the delegate backend
+    val delegate = DefaultSyncBackend.stub
+      .whenRequestMatches(_.uri.toString == "http://example1.org")
+      .thenRespond {
+        invocationCounter += 1
+        ResponseStub
+          .ok("""{"v1": 42, "v2": "foo", "v3": true}""")
+          .copy(headers = List(Header(HeaderNames.CacheControl, CacheDirective.MaxAge(5.seconds).toString)))
+      }
+    val cachingBackend = CachingBackend(delegate, cache)
+
+    val request = basicRequest.get(uri"http://example1.org").response(asJson[Data]) // a mapped response-as
+
+    // A: initial request (sent by the delegate)
+    val responseA = cachingBackend.send(request)
+    invocationCounter shouldBe 1
+    responseA.body shouldBe Right(Data(42, "foo", true))
+
+    // B: repeated request (from cache): the delegate isn't invoked, the body is parsed from the cached bytes
+    val responseB = cachingBackend.send(request)
+    invocationCounter shouldBe 1
+    responseB.body shouldBe Right(Data(42, "foo", true))
+  }
+
+  it should "include the vary header values in the cache key" in {
+    // given: a delegate which echoes the X-Test request header in the body, cacheable for 5 seconds
+    val cache = newInMemoryCache
+    var invocationCounter = 0 // how many times the request was "sent" by the delegate backend
+    val delegate = DefaultSyncBackend.stub
+      .whenRequestMatches(_.uri.toString == "http://example1.org")
+      .thenRespondF { request =>
+        invocationCounter += 1
+        ResponseStub
+          .ok(s"response body: ${request.header("X-Test").getOrElse("no-x-test")}")
+          .copy(headers = List(Header(HeaderNames.CacheControl, CacheDirective.MaxAge(5.seconds).toString)))
+      }
+    val cachingBackend = CachingBackend(delegate, cache)
+
+    val request1 = // the Vary header is set on the request, as that's where the default cache key reads it from
+      basicRequest.get(uri"http://example1.org").header(HeaderNames.Vary, "X-Test").header("X-Test", "a-value")
+    val request2 = // same URI and Vary header, but a different value of the varying header
+      basicRequest.get(uri"http://example1.org").header(HeaderNames.Vary, "X-Test").header("X-Test", "b-value")
+
+    // A: request with vary headers, first variant
+    val responseA = cachingBackend.send(request1)
+    invocationCounter shouldBe 1
+    responseA.body shouldBe Right("response body: a-value")
+
+    // B: request with vary headers, second variant
+    val responseB = cachingBackend.send(request2)
+    invocationCounter shouldBe 2 // different vary header values
+    responseB.body shouldBe Right("response body: b-value")
+
+    // C: repeated first variant (from cache)
+    val responseC = cachingBackend.send(request1)
+    invocationCounter shouldBe 2
+    responseC.body shouldBe Right("response body: a-value")
+
+    // D: repeated second variant (from cache)
+    val responseD = cachingBackend.send(request2)
+    invocationCounter shouldBe 2
+    responseD.body shouldBe Right("response body: b-value")
+
+    // E: first variant, after the 5-second max-age has passed
+    cache.timePassed(10)
+    val responseE = cachingBackend.send(request1)
+    invocationCounter shouldBe 3
+    responseE.body shouldBe Right("response body: a-value")
+  }
+}
diff --git a/docs/backends/wrappers/cache.md b/docs/backends/wrappers/cache.md
new file mode 100644
index 000000000..f362a966f
--- /dev/null
+++ b/docs/backends/wrappers/cache.md
@@ -0,0 +1,38 @@
+# Caching backend
+
+To use the caching backend, add the following dependency:
+
+```
+"com.softwaremill.sttp.client4" %% "caching-backend" % "@VERSION@"
+```
+
+The backend caches responses to eligible requests, and returns them from the cache if a repeated request is made. A prerequisite for a request to be considered for caching is that its response-as description is "cache-friendly"; this excludes non-blocking streaming responses, file-based responses and WebSockets.
+
+An implementation of a `Cache` trait is required when creating the backend. The `Cache` allows storing cached values (with a TTL), and retrieving them.
+
+The caching backend is highly configurable, including:
+* determining if a request is eligible for caching (before it is sent)
+* computing the cache key
+* computing the caching duration (based on the request and the response)
+* serialization and deserialization of the response
+
+To use, wrap your backend (the below uses default configuration):
+
+```scala
+import sttp.client4.caching.CachingBackend
+
+CachingBackend(delegateBackend, myCacheImplementation)
+```
+
+## Default configuration
+
+Using `CachingConfig.Default`, caching happens if:
+
+* the request is a `GET` or `HEAD` request 
+* the response contains a `Cache-Control` header with a `max-age` directive (standard HTTP semantics); the response is cached for the duration specified in this directive
+
+The cache key is created using the request method, URI, and the values of the request headers which are listed in the request's `Vary` header.
+
+For requests which might be cached, the response's body is read into a byte array. If the response is determined to be cacheable, it is serialized to JSON (using jsoniter-scala) and stored in the cache.
+
+See [examples](../../examples.md) for an example usage of the caching backend, using Redis.
diff --git a/docs/index.md b/docs/index.md
index aa0bd0e5f..6a291a079 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -164,6 +164,7 @@ Third party projects:
    backends/wrappers/opentelemetry
    backends/wrappers/prometheus
    backends/wrappers/logging
+   backends/wrappers/cache
    backends/wrappers/custom
 
 .. toctree::
diff --git a/examples/src/main/scala/sttp/client4/examples/wrapper/redisCachingBackend.scala b/examples/src/main/scala/sttp/client4/examples/wrapper/redisCachingBackend.scala
new file mode 100644
index 000000000..76c4807fb
--- /dev/null
+++ b/examples/src/main/scala/sttp/client4/examples/wrapper/redisCachingBackend.scala
@@ -0,0 +1,52 @@
+// {cat=Backend wrapper; effects=Synchronous; backend=HttpClient}: Use the caching backend wrapper with Redis
+
+package sttp.client4.examples.wrapper
+
+//> using dep com.softwaremill.sttp.client4::core:4.0.0-M22
+// > using dep com.softwaremill.sttp.client4::caching:4.0.0-M22 // TODO: enable once the module is released
+//> using dep redis.clients:jedis:5.2.0
+//> using dep ch.qos.logback:logback-classic:1.5.15
+
+@main def redisCachingBackend(): Unit = () // a placeholder: the example itself is commented out below (see the TODO)
+
+// import redis.clients.jedis.UnifiedJedis
+// import sttp.client4.*
+// import sttp.client4.caching.Cache
+// import sttp.client4.caching.CachingBackend
+// import sttp.shared.Identity
+
+// import scala.concurrent.duration.FiniteDuration
+
+// // you'll need to start redis to run this demo, e.g. using Docker:
+// // docker run -d --name redis-stack -p 6379:6379 -p 8001:8001 redis/redis-stack:latest
+
+// class RedisCache(jedis: UnifiedJedis) extends Cache[Identity]:
+//   override def get(key: Array[Byte]): Option[Array[Byte]] = Option(jedis.get(key))
+
+//   override def delete(key: Array[Byte]): Unit =
+//     val _ = jedis.del(key)
+
+//   override def set(key: Array[Byte], value: Array[Byte], ttl: FiniteDuration): Unit =
+//     val _ = jedis.setex(key, ttl.toSeconds.toInt, value)
+
+//   override def close(): Unit = jedis.close()
+
+// @main def redisCachingBackend(): Unit =
+//   val backend: WebSocketSyncBackend =
+//     CachingBackend(DefaultSyncBackend(), new RedisCache(new UnifiedJedis("redis://localhost:6379")))
+
+//   // returns a response with a max-age of 3 seconds
+//   val request = basicRequest.get(uri"https://httpbin.org/cache/3")
+
+//   val response1 = request.send(backend) // the logs should contain information that the response is stored in the cache
+//   println(s"Response 1: (${response1.code})")
+//   println(response1.body)
+
+//   val response2 = request.send(backend) // an immediate subsequent request should be read from the cache
+//   println(s"Response 2: (${response2.code})")
+//   println(response2.body)
+
+//   Thread.sleep(5000)
+//   val response3 = request.send(backend) // after 5 seconds, the cache should be invalidated
+//   println(s"Response 3: (${response3.code})")
+//   println(response3.body)