From 27b56afccf391b6c10d4a58aa400d9d7916128e6 Mon Sep 17 00:00:00 2001 From: Tyson Norris Date: Fri, 28 Aug 2020 11:44:25 -0700 Subject: [PATCH 1/2] improve http graceful termination --- .../scala/src/main/resources/application.conf | 11 +++- .../openwhisk/http/BasicHttpService.scala | 59 +++++++++++++++---- .../openwhisk/http/BasicRasService.scala | 17 +++++- .../core/controller/Controller.scala | 5 +- .../openwhisk/core/invoker/Invoker.scala | 3 +- .../standalone/PlaygroundLauncher.scala | 8 ++- 6 files changed, 81 insertions(+), 22 deletions(-) diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index 22a9b575843..ba712b9de3c 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -97,8 +97,17 @@ kamon { } } } - +akka { + # required to allow longer shutdown-termination-limit below + coordinated-shutdown.default-phase-timeout = 61 s +} whisk { + http { + shutdown-unready-delay = 15 seconds # /ready will return 503 for this amount of time before connection draining starts + shutdown-termination-limit = 61 seconds # hard limit on how long (after unready delay) requests have to complete + } + + shared-packages-execute-only = false metrics { # Enable/disable Prometheus support. 
If enabled then metrics would be exposed at `/metrics` endpoint diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala index 27f1be9588a..74d2dc3c833 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala @@ -17,7 +17,8 @@ package org.apache.openwhisk.http -import akka.actor.ActorSystem +import akka.Done +import akka.actor.{ActorSystem, CoordinatedShutdown} import akka.event.Logging import akka.http.scaladsl.{Http, HttpConnectionContext} import akka.http.scaladsl.model.{HttpRequest, _} @@ -29,17 +30,23 @@ import kamon.metric.MeasurementUnit import spray.json._ import org.apache.openwhisk.common.Https.HttpsConfig import org.apache.openwhisk.common._ - +import akka.pattern.after +import org.apache.openwhisk.common.TransactionId.systemPrefix +import pureconfig._ +import pureconfig.generic.auto._ import scala.collection.immutable.Seq -import scala.concurrent.duration.DurationInt -import scala.concurrent.{Await, Future} +import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.concurrent.Future +import scala.util.{Failure, Success} + +case class BasicHttpServiceConfig(shutdownUnreadyDelay: FiniteDuration, shutdownTerminationLimit: FiniteDuration) /** * This trait extends the Akka Directives and Actor with logging and transaction counting * facilities common to all OpenWhisk REST services. */ trait BasicHttpService extends Directives { - + implicit val logging: Logging val OW_EXTRA_LOGGING_HEADER = "X-OW-EXTRA-LOGGING" /** @@ -156,24 +163,50 @@ trait BasicHttpService extends Directives { } object BasicHttpService { + implicit val tid = TransactionId(systemPrefix + "http_service") + //start with ready true + protected[http] var ready = true /** * Starts an HTTP(S) route handler on given port and registers a shutdown hook. 
*/ - def startHttpService(route: Route, port: Int, config: Option[HttpsConfig] = None, interface: String = "0.0.0.0")( + def startHttpService(route: Route, port: Int, httpsConfig: Option[HttpsConfig] = None, interface: String = "0.0.0.0")( implicit actorSystem: ActorSystem, - materializer: ActorMaterializer): Unit = { - val connectionContext = config.map(Https.connectionContext(_)).getOrElse(HttpConnectionContext) + materializer: ActorMaterializer, + logging: Logging): Unit = { + val connectionContext = httpsConfig.map(Https.connectionContext(_)).getOrElse(HttpConnectionContext) val httpBinding = Http().bindAndHandle(route, interface, port, connectionContext = connectionContext) + logging.info(this, "starting http service...") addShutdownHook(httpBinding) } - def addShutdownHook(binding: Future[Http.ServerBinding])(implicit actorSystem: ActorSystem, - materializer: ActorMaterializer): Unit = { + def addShutdownHook(binding: Future[Http.ServerBinding], + httpServiceConfig: BasicHttpServiceConfig = + loadConfigOrThrow[BasicHttpServiceConfig]("whisk.http"))(implicit actorSystem: ActorSystem, + materializer: ActorMaterializer, + logging: Logging): Unit = { implicit val executionContext = actorSystem.dispatcher - sys.addShutdownHook { - Await.result(binding.map(_.unbind()), 30.seconds) - Await.result(actorSystem.whenTerminated, 30.seconds) + + CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "http_unready") { () => + logging.info(this, "shutdown unready...") + //return 503 status at /ready endpoint for some time before actual termination begins + ready = false + after(httpServiceConfig.shutdownUnreadyDelay, actorSystem.scheduler) { + logging.info(this, "shutdown unready complete...") + Future.successful(Done) + } + } + CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseServiceUnbind, "http_termination") { () => + logging.info(this, "shutdown terminating...") + binding + .flatMap(_.terminate(hardDeadline = 
httpServiceConfig.shutdownTerminationLimit)) + .andThen { + case Success(_) => logging.info(this, "shutdown termination complete...") + case Failure(t) => logging.info(this, s"shutdown termination failed... ${t}") + } + .map { _ => + Done + } } } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala index cea06f431ad..d7a9e7a8cda 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala @@ -18,15 +18,16 @@ package org.apache.openwhisk.http import akka.event.Logging -import org.apache.openwhisk.common.{MetricsRoute, TransactionId} +import akka.http.scaladsl.model.StatusCodes +import org.apache.openwhisk.common.{Logging, MetricsRoute, TransactionId} /** * This trait extends the BasicHttpService with a standard "ping" endpoint which * responds to health queries, intended for monitoring. 
*/ -trait BasicRasService extends BasicHttpService { +class BasicRasService(implicit val logging: Logging) extends BasicHttpService { - override def routes(implicit transid: TransactionId) = ping ~ MetricsRoute() + override def routes(implicit transid: TransactionId) = ping ~ ready ~ MetricsRoute() override def loglevelForRoute(route: String): Logging.LogLevel = { if (route == "/ping" || route == "/metrics") { @@ -39,4 +40,14 @@ trait BasicRasService extends BasicHttpService { val ping = path("ping") { get { complete("pong") } } + val ready = path("ready") { + get { + if (BasicHttpService.ready) { + complete("ok") + } else { + logging.warn(this, "not ready...") + complete(StatusCodes.ServiceUnavailable, "notready") + } + } + } } diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala index 935219685ed..b796b15667b 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala @@ -77,7 +77,7 @@ class Controller(val instance: ControllerInstanceId, implicit val whiskConfig: WhiskConfig, implicit val actorSystem: ActorSystem, implicit val materializer: ActorMaterializer, - implicit val logging: Logging) + override implicit val logging: Logging) extends BasicRasService { TransactionId.controller.mark( @@ -288,7 +288,8 @@ object Controller { BasicHttpService.startHttpService(controller.route, port, httpsConfig, interface)( actorSystem, - controller.materializer) + controller.materializer, + logger) case Failure(t) => abort(s"Invalid runtimes manifest: $t") diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala index 9901f951a37..a3951e32bf4 100644 --- 
a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala @@ -194,7 +194,8 @@ object Invoker { val invokerServer = SpiLoader.get[InvokerServerProvider].instance(invoker) BasicHttpService.startHttpService(invokerServer.route, port, httpsConfig)( actorSystem, - ActorMaterializer.create(actorSystem)) + ActorMaterializer.create(actorSystem), + logger) } } diff --git a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala index bac8cc91557..0cdc5b39295 100644 --- a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala +++ b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala @@ -28,7 +28,7 @@ import akka.stream.scaladsl.{Sink, Source} import ch.megard.akka.http.cors.scaladsl.CorsDirectives._ import org.apache.commons.io.IOUtils import org.apache.commons.lang3.SystemUtils -import org.apache.openwhisk.common.{Logging, TransactionId} +import org.apache.openwhisk.common.{AkkaLogging, Logging, TransactionId} import org.apache.openwhisk.core.ExecManifestSupport import org.apache.openwhisk.http.BasicHttpService import pureconfig._ @@ -75,7 +75,10 @@ class PlaygroundLauncher(host: String, private val wsk = new Wsk(host, controllerPort, authKey) def run(): ServiceContainer = { - BasicHttpService.startHttpService(PlaygroundService.route, pgPort, None, interface)(actorSystem, materializer) + BasicHttpService.startHttpService(PlaygroundService.route, pgPort, None, interface)( + actorSystem, + materializer, + logging) ServiceContainer(pgPort, pgUrl, "Playground") } @@ -119,6 +122,7 @@ class PlaygroundLauncher(host: String, } object PlaygroundService extends BasicHttpService { + override implicit val logging = new AkkaLogging(actorSystem.log) override def routes(implicit transid: 
TransactionId): Route = path(PathEnd | Slash | pg) { redirect(s"/$pg/ui/index.html", StatusCodes.Found) } ~ cors() { From 8143f51e7f79d3b67e4134fc6c70f8ad65224df0 Mon Sep 17 00:00:00 2001 From: Tyson Norris Date: Wed, 2 Sep 2020 13:27:39 -0700 Subject: [PATCH 2/2] move readyState to class var instead of an object var --- .../openwhisk/http/BasicHttpService.scala | 25 +++++++++++-------- .../openwhisk/http/BasicRasService.scala | 2 +- .../core/controller/Controller.scala | 2 +- .../core/database/cosmosdb/cache/Main.scala | 2 +- .../openwhisk/core/invoker/Invoker.scala | 2 +- .../standalone/PlaygroundLauncher.scala | 5 +--- 6 files changed, 20 insertions(+), 18 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala index 74d2dc3c833..feeeb828526 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala @@ -47,6 +47,10 @@ case class BasicHttpServiceConfig(shutdownUnreadyDelay: FiniteDuration, shutdown */ trait BasicHttpService extends Directives { implicit val logging: Logging + + //start with ready true + protected var readyState = true + val OW_EXTRA_LOGGING_HEADER = "X-OW-EXTRA-LOGGING" /** @@ -164,23 +168,24 @@ trait BasicHttpService extends Directives { object BasicHttpService { implicit val tid = TransactionId(systemPrefix + "http_service") - //start with ready true - protected[http] var ready = true /** * Starts an HTTP(S) route handler on given port and registers a shutdown hook. 
*/ - def startHttpService(route: Route, port: Int, httpsConfig: Option[HttpsConfig] = None, interface: String = "0.0.0.0")( - implicit actorSystem: ActorSystem, - materializer: ActorMaterializer, - logging: Logging): Unit = { + def startHttpService(service: BasicHttpService, + port: Int, + httpsConfig: Option[HttpsConfig] = None, + interface: String = "0.0.0.0")(implicit actorSystem: ActorSystem, + materializer: ActorMaterializer, + logging: Logging): Unit = { val connectionContext = httpsConfig.map(Https.connectionContext(_)).getOrElse(HttpConnectionContext) - val httpBinding = Http().bindAndHandle(route, interface, port, connectionContext = connectionContext) + val httpBinding = Http().bindAndHandle(service.route, interface, port, connectionContext = connectionContext) logging.info(this, "starting http service...") - addShutdownHook(httpBinding) + addShutdownHook(service, httpBinding) } - def addShutdownHook(binding: Future[Http.ServerBinding], + def addShutdownHook(service: BasicHttpService, + binding: Future[Http.ServerBinding], httpServiceConfig: BasicHttpServiceConfig = loadConfigOrThrow[BasicHttpServiceConfig]("whisk.http"))(implicit actorSystem: ActorSystem, materializer: ActorMaterializer, @@ -190,7 +195,7 @@ object BasicHttpService { CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "http_unready") { () => logging.info(this, "shutdown unready...") //return 503 status at /ready endpoint for some time before actual termination begins - ready = false + service.readyState = false after(httpServiceConfig.shutdownUnreadyDelay, actorSystem.scheduler) { logging.info(this, "shutdown unready complete...") Future.successful(Done) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala index d7a9e7a8cda..5bb6f3c2569 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala +++ 
b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala @@ -42,7 +42,7 @@ class BasicRasService(implicit val logging: Logging) extends BasicHttpService { } val ready = path("ready") { get { - if (BasicHttpService.ready) { + if (readyState) { complete("ok") } else { logging.warn(this, "not ready...") diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala index b796b15667b..cb2b0c17bc4 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala @@ -286,7 +286,7 @@ object Controller { val httpsConfig = if (Controller.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None - BasicHttpService.startHttpService(controller.route, port, httpsConfig, interface)( + BasicHttpService.startHttpService(controller, port, httpsConfig, interface)( actorSystem, controller.materializer, logger) diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala index 62d58a1bee8..02d6dc695df 100644 --- a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala +++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala @@ -34,7 +34,7 @@ object Main { ConfigMXBean.register() Kamon.init() val port = CacheInvalidatorConfig(system.settings.config).invalidatorConfig.port - BasicHttpService.startHttpService(new BasicRasService {}.route, port, None) + BasicHttpService.startHttpService(new BasicRasService {}, port, None) val (start, finish) = new CacheInvalidator(system.settings.config).start() start .map(_ => 
log.info(this, s"Started the server at http://localhost:$port")) diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala index a3951e32bf4..87f792a1c27 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala @@ -192,7 +192,7 @@ object Invoker { if (Invoker.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.invoker.https")) else None val invokerServer = SpiLoader.get[InvokerServerProvider].instance(invoker) - BasicHttpService.startHttpService(invokerServer.route, port, httpsConfig)( + BasicHttpService.startHttpService(invokerServer, port, httpsConfig)( actorSystem, ActorMaterializer.create(actorSystem), logger) diff --git a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala index 0cdc5b39295..e718b5d650a 100644 --- a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala +++ b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala @@ -75,10 +75,7 @@ class PlaygroundLauncher(host: String, private val wsk = new Wsk(host, controllerPort, authKey) def run(): ServiceContainer = { - BasicHttpService.startHttpService(PlaygroundService.route, pgPort, None, interface)( - actorSystem, - materializer, - logging) + BasicHttpService.startHttpService(PlaygroundService, pgPort, None, interface)(actorSystem, materializer, logging) ServiceContainer(pgPort, pgUrl, "Playground") }