Skip to content

Commit 52d0d0a

Browse files
committed
Merge branch 'release-0.18.x'
2 parents f4f39f2 + 31be5f9 commit 52d0d0a

File tree

9 files changed

+102
-62
lines changed

9 files changed

+102
-62
lines changed

async-http-client/src/main/scala/org/http4s/client/asynchttpclient/AsyncHttpClient.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ object AsyncHttpClient {
3535
* Create an HTTP client based on the AsyncHttpClient library
3636
*
3737
* @param config configuration for the client
38-
* @param bufferSize body chunks to buffer when reading the body; defaults to 8
3938
* @param ec The ExecutionContext to run responses on
4039
*/
4140
def apply[F[_]](config: AsyncHttpClientConfig = defaultConfig)(
@@ -53,6 +52,19 @@ object AsyncHttpClient {
5352
)
5453
}
5554

55+
/**
56+
* Create a bracketed HTTP client based on the AsyncHttpClient library.
57+
*
58+
* @param config configuration for the client
59+
* @param ec The ExecutionContext to run responses on
60+
* @return a singleton stream of the client. The client will be
61+
* shutdown when the stream terminates.
62+
*/
63+
def stream[F[_]](config: AsyncHttpClientConfig = defaultConfig)(
64+
implicit F: Effect[F],
65+
ec: ExecutionContext): Stream[F, Client[F]] =
66+
Stream.bracket(F.delay(apply(config)))(c => Stream.emit(c), _.shutdown)
67+
5668
private def asyncHandler[F[_]](
5769
cb: Callback[DisposableResponse[F]])(implicit F: Effect[F], ec: ExecutionContext) =
5870
new StreamedAsyncHandler[Unit] {

blaze-core/src/main/scala/org/http4s/blazecore/util/Http1Writer.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,20 @@ import java.nio.ByteBuffer
77
import java.nio.charset.StandardCharsets
88
import org.http4s.syntax.async._
99
import org.http4s.util.StringWriter
10+
import org.log4s.getLogger
1011
import scala.concurrent._
1112

1213
private[http4s] trait Http1Writer[F[_]] extends EntityBodyWriter[F] {
1314
final def write(headerWriter: StringWriter, body: EntityBody[F]): F[Boolean] =
1415
F.fromFuture(writeHeaders(headerWriter)).attempt.flatMap {
15-
case Left(_) => body.drain.compile.drain.map(_ => true)
16-
case Right(_) => writeEntityBody(body)
16+
case Right(()) =>
17+
writeEntityBody(body)
18+
case Left(t) =>
19+
body.drain.compile.drain.handleError { t2 =>
20+
// Don't lose this error when sending the other
21+
// TODO implement with cats.effect.Bracket when we have it
22+
Http1Writer.logger.error(t2)("Error draining body")
23+
} *> F.raiseError(t)
1724
}
1825

1926
/* Writes the header. It is up to the writer whether to flush immediately or to
@@ -22,6 +29,8 @@ private[http4s] trait Http1Writer[F[_]] extends EntityBodyWriter[F] {
2229
}
2330

2431
private[util] object Http1Writer {
32+
private val logger = getLogger
33+
2534
def headersToByteBuffer(headers: String): ByteBuffer =
2635
ByteBuffer.wrap(headers.getBytes(StandardCharsets.ISO_8859_1))
2736
}

blaze-server/src/main/scala/org/http4s/server/blaze/Http1ServerStage.scala

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package org.http4s
22
package server
33
package blaze
44

5-
import cats.effect.{Effect, IO}
5+
import cats.data.OptionT
6+
import cats.effect.{Effect, IO, Sync}
67
import cats.implicits._
78
import fs2._
89
import java.nio.ByteBuffer
@@ -60,6 +61,7 @@ private[blaze] class Http1ServerStage[F[_]](
6061

6162
// micro-optimization: unwrap the service and call its .run directly
6263
private[this] val serviceFn = service.run
64+
private[this] val optionTSync = Sync[OptionT[F, ?]]
6365

6466
// both `parser` and `isClosed` are protected by synchronization on `parser`
6567
private[this] val parser = new Http1ServerParser[F](logger, maxRequestLineLen, maxHeadersLen)
@@ -138,19 +140,21 @@ private[blaze] class Http1ServerStage[F[_]](
138140
parser.collectMessage(body, requestAttrs) match {
139141
case Right(req) =>
140142
executionContext.execute(new Runnable {
141-
def run(): Unit =
142-
F.runAsync {
143-
try serviceFn(req)
144-
.getOrElse(Response.notFound)
145-
.handleErrorWith(serviceErrorHandler(req))
146-
catch serviceErrorHandler(req)
147-
} {
148-
case Right(resp) =>
149-
IO(renderResponse(req, resp, cleanup))
143+
def run(): Unit = {
144+
val action = optionTSync
145+
.suspend(serviceFn(req))
146+
.getOrElse(Response.notFound)
147+
.recoverWith(serviceErrorHandler(req))
148+
.flatMap(resp => F.delay(renderResponse(req, resp, cleanup)))
149+
150+
F.runAsync(action) {
151+
case Right(()) => IO.unit
150152
case Left(t) =>
151-
IO(internalServerError(s"Error running route: $req", t, req, cleanup))
153+
IO(logger.error(t)(s"Error running request: $req")).attempt *> IO(
154+
closeConnection())
152155
}
153156
.unsafeRunSync()
157+
}
154158
})
155159
case Left((e, protocol)) =>
156160
badMessage(e.details, new BadMessage(e.sanitized), Request[F]().withHttpVersion(protocol))

blaze-server/src/main/scala/org/http4s/server/blaze/Http2NodeStage.scala

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ package org.http4s
22
package server
33
package blaze
44

5-
import cats.effect.{Effect, IO}
5+
import cats.data.OptionT
6+
import cats.effect.{Effect, IO, Sync}
67
import cats.implicits._
78
import fs2._
89
import fs2.Stream._
910
import java.util.Locale
1011
import org.http4s.{Headers => HHeaders, Method => HMethod}
1112
import org.http4s.Header.Raw
12-
import org.http4s.Status._
1313
import org.http4s.blaze.http.{HeaderNames, Headers}
1414
import org.http4s.blaze.http.http2._
1515
import org.http4s.blaze.pipeline.{TailStage, Command => Cmd}
@@ -29,6 +29,10 @@ private class Http2NodeStage[F[_]](
2929
serviceErrorHandler: ServiceErrorHandler[F])(implicit F: Effect[F])
3030
extends TailStage[StreamFrame] {
3131

32+
// micro-optimization: unwrap the service and call its .run directly
33+
private[this] val serviceFn = service.run
34+
private[this] val optionTSync = Sync[OptionT[F, ?]]
35+
3236
override def name = "Http2NodeStage"
3337

3438
override protected def stageStartup(): Unit = {
@@ -183,21 +187,20 @@ private class Http2NodeStage[F[_]](
183187
val hs = HHeaders(headers.result())
184188
val req = Request(method, path, HttpVersion.`HTTP/2.0`, hs, body, attributes)
185189
executionContext.execute(new Runnable {
186-
def run(): Unit =
187-
F.runAsync {
188-
try service(req)
189-
.getOrElse(Response.notFound)
190-
.recoverWith(serviceErrorHandler(req))
191-
.handleError(_ => Response(InternalServerError, req.httpVersion))
192-
.flatMap(renderResponse)
193-
catch serviceErrorHandler(req)
194-
} {
195-
case Right(_) =>
196-
IO.unit
197-
case Left(t) =>
198-
IO(logger.error(t)("Error rendering response"))
199-
}
200-
.unsafeRunSync()
190+
def run(): Unit = {
191+
val action = optionTSync
192+
.suspend(serviceFn(req))
193+
.getOrElse(Response.notFound)
194+
.recoverWith(serviceErrorHandler(req))
195+
.flatMap(renderResponse)
196+
197+
F.runAsync(action) {
198+
case Right(()) => IO.unit
199+
case Left(t) =>
200+
IO(logger.error(t)(s"Error running request: $req")).attempt *> IO(
201+
shutdownWithCommand(Cmd.Disconnect))
202+
}
203+
}.unsafeRunSync()
201204
})
202205
}
203206
}

project/Http4sPlugin.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ object Http4sPlugin extends AutoPlugin {
283283
lazy val fs2ReactiveStreams = "com.github.zainab-ali" %% "fs2-reactive-streams" % "0.5.1"
284284
lazy val gatlingTest = "io.gatling" % "gatling-test-framework" % "2.3.1"
285285
lazy val gatlingHighCharts = "io.gatling.highcharts" % "gatling-charts-highcharts" % gatlingTest.revision
286-
lazy val http4sWebsocket = "org.http4s" %% "http4s-websocket" % "0.2.0"
286+
lazy val http4sWebsocket = "org.http4s" %% "http4s-websocket" % "0.2.1"
287287
lazy val javaxServletApi = "javax.servlet" % "javax.servlet-api" % "3.1.0"
288288
lazy val jawnJson4s = "org.spire-math" %% "jawn-json4s" % "0.11.1"
289289
lazy val jawnFs2 = "org.http4s" %% "jawn-fs2" % "0.12.2"
@@ -298,18 +298,18 @@ object Http4sPlugin extends AutoPlugin {
298298
lazy val macroCompat = "org.typelevel" %% "macro-compat" % "1.1.1"
299299
lazy val metricsCore = "io.dropwizard.metrics" % "metrics-core" % "4.0.2"
300300
lazy val metricsJson = "io.dropwizard.metrics" % "metrics-json" % metricsCore.revision
301-
lazy val prometheusClient = "io.prometheus" % "simpleclient_common" % "0.2.0"
301+
lazy val prometheusClient = "io.prometheus" % "simpleclient_common" % "0.3.0"
302302
lazy val prometheusHotspot = "io.prometheus" % "simpleclient_hotspot" % prometheusClient.revision
303303
lazy val parboiled = "org.http4s" %% "parboiled" % "1.0.0"
304304
lazy val quasiquotes = "org.scalamacros" %% "quasiquotes" % "2.1.0"
305305
lazy val scalacheck = "org.scalacheck" %% "scalacheck" % "1.13.5"
306306
def scalaCompiler(so: String, sv: String) = so % "scala-compiler" % sv
307307
def scalaReflect(so: String, sv: String) = so % "scala-reflect" % sv
308308
lazy val scalaXml = "org.scala-lang.modules" %% "scala-xml" % "1.1.0"
309-
lazy val specs2Core = "org.specs2" %% "specs2-core" % "4.0.3"
309+
lazy val specs2Core = "org.specs2" %% "specs2-core" % "4.0.4"
310310
lazy val specs2MatcherExtra = "org.specs2" %% "specs2-matcher-extra" % specs2Core.revision
311311
lazy val specs2Scalacheck = "org.specs2" %% "specs2-scalacheck" % specs2Core.revision
312-
lazy val tomcatCatalina = "org.apache.tomcat" % "tomcat-catalina" % "9.0.6"
312+
lazy val tomcatCatalina = "org.apache.tomcat" % "tomcat-catalina" % "9.0.7"
313313
lazy val tomcatCoyote = "org.apache.tomcat" % "tomcat-coyote" % tomcatCatalina.revision
314314
lazy val twirlApi = "com.typesafe.play" %% "twirl-api" % "1.3.15"
315315
}

prometheus-server-metrics/src/main/scala/org/http4s/server/prometheus/PrometheusExportService.scala

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,16 @@ import io.prometheus.client.hotspot._
1010
import org.http4s._
1111
import org.http4s.dsl.Http4sDsl
1212

13-
class PrometheusExportService[F[_]: Sync] private (
14-
s: HttpService[F],
15-
cr: CollectorRegistry
16-
) {
17-
def withCollectorRegistry(cr: CollectorRegistry): PrometheusExportService[F] =
18-
new PrometheusExportService[F](
19-
PrometheusExportService.service[F](cr),
20-
cr
21-
)
22-
23-
def service: HttpService[F] = s
24-
25-
def collectorRegistry: CollectorRegistry = cr
26-
}
13+
/*
14+
* PromethusExportService Contains an HttpService
15+
* ready to be scraped by Prometheus, paired
16+
* with the CollectorRegistry that it is creating
17+
* metrics for, allowing custom metric registration.
18+
*/
19+
final class PrometheusExportService[F[_]: Sync] private (
20+
val service: HttpService[F],
21+
val collectorRegistry: CollectorRegistry
22+
)
2723

2824
object PrometheusExportService {
2925

server/src/main/scala/org/http4s/server/package.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import cats.implicits._
77
import org.http4s.headers.{Connection, `Content-Length`}
88
import org.http4s.syntax.string._
99
import org.log4s.getLogger
10-
import scala.util.control.NonFatal
1110

1211
package object server {
1312

@@ -102,7 +101,7 @@ package object server {
102101
s"""Message failure handling request: ${req.method} ${req.pathInfo} from ${req.remoteAddr
103102
.getOrElse("<unknown>")}""")
104103
mf.toHttpResponse(req.httpVersion)
105-
case NonFatal(t) =>
104+
case t if !t.isInstanceOf[VirtualMachineError] =>
106105
serviceErrorLogger.error(t)(
107106
s"""Error servicing request: ${req.method} ${req.pathInfo} from ${req.remoteAddr.getOrElse(
108107
"<unknown>")}""")

servlet/src/main/scala/org/http4s/servlet/Http4sServlet.scala

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.http4s
22
package servlet
33

4+
import cats.data.OptionT
45
import cats.effect._
56
import cats.implicits.{catsSyntaxEither => _, _}
67
import fs2.async
@@ -29,6 +30,7 @@ class Http4sServlet[F[_]](
2930

3031
// micro-optimization: unwrap the service and call its .run directly
3132
private[this] val serviceFn = service.run
33+
private[this] val optionTSync = Sync[OptionT[F, ?]]
3234

3335
object ServletRequestKeys {
3436
val HttpSession: AttributeKey[Option[HttpSession]] = AttributeKey[Option[HttpSession]]
@@ -98,10 +100,11 @@ class Http4sServlet[F[_]](
98100
bodyWriter: BodyWriter[F]): F[Unit] = {
99101
ctx.addListener(new AsyncTimeoutHandler(request, bodyWriter))
100102
// Note: We're catching silly user errors in the lift => flatten.
101-
val response = Async.shift(executionContext) *> F
102-
.delay(serviceFn(request).getOrElse(Response.notFound))
103-
.flatten
104-
.handleErrorWith(serviceErrorHandler(request))
103+
val response = Async.shift(executionContext) *>
104+
optionTSync
105+
.suspend(serviceFn(request))
106+
.getOrElse(Response.notFound)
107+
.recoverWith(serviceErrorHandler(request))
105108

106109
val servletResponse = ctx.getResponse.asInstanceOf[HttpServletResponse]
107110
renderResponse(response, servletResponse, bodyWriter)
@@ -124,9 +127,9 @@ class Http4sServlet[F[_]](
124127
s"Async context timed out, but response was already committed: ${request.method} ${request.uri.path}")
125128
F.pure(())
126129
}
127-
} { _ =>
128-
ctx.complete()
129-
IO.unit
130+
} {
131+
case Right(()) => IO(ctx.complete())
132+
case Left(t) => IO(logger.error(t)("Error timing out async context")) *> IO(ctx.complete())
130133
}
131134
}
132135
}
@@ -138,10 +141,19 @@ class Http4sServlet[F[_]](
138141
response.flatMap { resp =>
139142
// Note: the servlet API gives us no undeprecated method to both set
140143
// a body and a status reason. We sacrifice the status reason.
141-
servletResponse.setStatus(resp.status.code)
142-
for (header <- resp.headers if header.isNot(`Transfer-Encoding`))
143-
servletResponse.addHeader(header.name.toString, header.value)
144-
bodyWriter(resp)
144+
F.delay {
145+
servletResponse.setStatus(resp.status.code)
146+
for (header <- resp.headers if header.isNot(`Transfer-Encoding`))
147+
servletResponse.addHeader(header.name.toString, header.value)
148+
}
149+
.attempt
150+
.flatMap {
151+
case Right(()) => bodyWriter(resp)
152+
case Left(t) =>
153+
resp.body.drain.compile.drain.handleError {
154+
case t2 => logger.error(t2)("Error draining body")
155+
} *> F.raiseError(t)
156+
}
145157
}
146158

147159
private def errorHandler(

website/src/hugo/content/changelog.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,11 @@ Maintenance branches are merged before each new release. This change log is
88
ordered chronologically, so each release contains all changes described below
99
it.
1010

11+
# v0.18.8 (2018-04-11)
12+
* Improved ScalaDoc for BlazeBuilder [#1775](https://github.com/http4s/http4s/pull/1775)
13+
* Added a stream constructor for async-http-client [#1776](https://github.com/http4s/http4s/pull/1776)
14+
* http4s-prometheus-server-metrics project created. Prometheus Metrics middleware implemented for metrics on http4s server. Exposes an HttpService ready to be scraped by Prometheus, as well pairing to a CollectorRegistry for custom metric registration. [#1778](https://github.com/http4s/http4s/pull/1778)
15+
1116
# v0.18.7 (2018-04-04)
1217
* Multipart parser defaults to fields interpreted as utf-8. [#1767](https://github.com/http4s/http4s/pull/1767)
1318

0 commit comments

Comments
 (0)