Skip to content

Commit a2c0254

Browse files
committed
Use separate dispatchers for MemoryQueue, QueueManager, and Akka heartbeat
1 parent f59a45e commit a2c0254

File tree

8 files changed

+108
-19
lines changed

8 files changed

+108
-19
lines changed

common/scala/src/main/resources/application.conf

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,26 @@ akka.http {
4141
preview.enable-http2 = on
4242
parsing.illegal-header-warnings = off
4343
}
44+
45+
cluster {
46+
use-dispatcher = "dispatchers.heartbeat-dispatcher"
47+
failure-detector {
48+
# How often keep-alive heartbeat messages should be sent to each connection.
49+
heartbeat-interval = 1s
50+
51+
# Number of potentially lost/delayed heartbeats that will be
52+
# accepted before considering it to be an anomaly.
53+
# This margin is important to be able to survive sudden, occasional,
54+
# pauses in heartbeat arrivals, due to, for example, garbage collection or
55+
# network drop.
56+
acceptable-heartbeat-pause = 5s
57+
58+
# After the heartbeat request has been sent the first failure detection
59+
# will start after this period, even though no heartbeat message has
60+
# been received.
61+
expected-response-after = 1s
62+
}
63+
}
4464
}
4565

4666
#kamon related configuration
@@ -72,7 +92,7 @@ kamon {
7292
service = "openwhisk-statsd"
7393
}
7494
metric {
75-
tick-interval = 1 second
95+
tick-interval = 10 second
7696
}
7797

7898
statsd {

common/scala/src/main/resources/reference.conf

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,17 @@ dispatchers {
8686
type = PinnedDispatcher
8787
executor = "thread-pool-executor"
8888
}
89+
90+
# This dispatcher is for akka-cluster heartbeats. Since a heartbeat is a periodic, lightweight message,
91+
# a fork-join executor should be enough.
92+
heartbeat-dispatcher {
93+
type = Dispatcher
94+
executor = "fork-join-executor"
95+
fork-join-executor {
96+
parallelism-min = 2
97+
parallelism-factor = 2.0
98+
parallelism-max = 10
99+
}
100+
throughput = 100
101+
}
89102
}

common/scala/src/main/scala/org/apache/openwhisk/common/Scheduler.scala

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,11 @@
1717

1818
package org.apache.openwhisk.common
1919

20+
import akka.actor.{Actor, ActorSystem, Cancellable, Props}
21+
2022
import scala.concurrent.Future
2123
import scala.concurrent.duration._
22-
import scala.util.Failure
23-
import scala.util.Success
24-
import scala.util.Try
25-
26-
import akka.actor.Actor
27-
import akka.actor.ActorSystem
28-
import akka.actor.Cancellable
29-
import akka.actor.Props
24+
import scala.util.{Failure, Success, Try}
3025

3126
/**
3227
* Scheduler utility functions to execute tasks in a repetitive way with controllable behavior
@@ -122,4 +117,25 @@ object Scheduler {
122117
require(interval > Duration.Zero)
123118
system.actorOf(Props(new Worker(initialDelay, interval, true, name, f)))
124119
}
120+
121+
/**
122+
* Schedules a closure to run repeatedly, with at least the given interval between runs, on a worker actor
* that is assigned to the given dispatcher.
123+
* This waits until the Future of the closure has finished, ignores its result and then waits for the
124+
* given interval.
125+
*
126+
* @param interval the time to wait between two runs of the closure; must be greater than zero
127+
* @param initialDelay optionally delay the first scheduled iteration by given duration
* @param name a name handed to the worker actor (defaults to "Scheduler")
128+
* @param dispatcher the id of the dispatcher to handle this scheduled work, e.g. "dispatchers.monitoring-dispatcher"
129+
* @param f the function to run
* @return the reference to the worker actor created for this schedule
130+
*/
131+
def scheduleWaitAtLeastWith(interval: FiniteDuration,
132+
initialDelay: FiniteDuration = Duration.Zero,
133+
name: String = "Scheduler",
134+
dispatcher: String)(f: () => Future[Any])(
135+
implicit system: ActorSystem,
136+
logging: Logging,
137+
transid: TransactionId = TransactionId.unknown) = {
138+
require(interval > Duration.Zero)
139+
system.actorOf(Props(new Worker(initialDelay, interval, true, name, f)).withDispatcher(dispatcher))
140+
}
125141
}

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/FPCPoolBalancer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class FPCPoolBalancer(config: WhiskConfig,
6565

6666
private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
6767
// This value is given according to the total waiting time at QueueManager for a new queue to be created.
68-
private implicit val requestTimeout: Timeout = Timeout(8.seconds)
68+
private implicit val requestTimeout: Timeout = Timeout(1.seconds)
6969

7070
private val entityStore = WhiskEntityStore.datastore()
7171

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
dispatchers {
2+
# A custom dispatcher for the queue manager
3+
queue-manager-dispatcher {
4+
type = Dispatcher
5+
executor = "fork-join-executor"
6+
fork-join-executor {
7+
parallelism-min = 1
8+
parallelism-factor = 1
9+
parallelism-max = 15
10+
}
11+
throughput = 5
12+
}
13+
14+
# A custom dispatcher for memory queues.
15+
memory-queue-dispatcher {
16+
type = Dispatcher
17+
executor = "fork-join-executor"
18+
fork-join-executor {
19+
parallelism-min = 10
20+
parallelism-factor = 2
21+
parallelism-max = 60
22+
}
23+
throughput = 5
24+
}
25+
26+
# A custom dispatcher for monitoring actors of memory queues.
27+
monitoring-dispatcher {
28+
type = Dispatcher
29+
executor = "fork-join-executor"
30+
fork-join-executor {
31+
parallelism-min = 10
32+
parallelism-factor = 2
33+
parallelism-max = 60
34+
}
35+
throughput = 5
36+
}
37+
}

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/Scheduler.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
4747
import org.apache.openwhisk.grpc.ActivationServiceHandler
4848
import org.apache.openwhisk.http.BasicHttpService
4949
import org.apache.openwhisk.spi.SpiLoader
50-
import org.apache.openwhisk.utils.ExecutionContextFactory
5150
import pureconfig.generic.auto._
5251
import pureconfig.loadConfigOrThrow
5352
import spray.json.{DefaultJsonProtocol, _}
@@ -287,9 +286,8 @@ object Scheduler {
287286
}
288287

289288
def main(args: Array[String]): Unit = {
290-
implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
291-
implicit val actorSystem: ActorSystem =
292-
ActorSystem(name = "scheduler-actor-system", defaultExecutionContext = Some(ec))
289+
implicit val actorSystem: ActorSystem = ActorSystem("scheduler-actor-system")
290+
implicit val ec = actorSystem.dispatcher
293291

294292
implicit val logger = new AkkaLogging(akka.event.Logging.getLogger(actorSystem, this))
295293

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/MemoryQueue.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ class MemoryQueue(private val etcdClient: EtcdClient,
148148
extends FSM[MemoryQueueState, MemoryQueueData]
149149
with Stash {
150150

151-
private implicit val ec: ExecutionContextExecutor = context.dispatcher
151+
private implicit val ec: ExecutionContextExecutor =
152+
context.system.dispatchers.lookup("dispatchers.memory-queue-dispatcher")
152153
private implicit val actorSystem: ActorSystem = context.system
153154
private implicit val timeout = Timeout(5.seconds)
154155
private implicit val order: Ordering[BufferedRequest] = Ordering.by(_.containerId)
@@ -181,7 +182,7 @@ class MemoryQueue(private val etcdClient: EtcdClient,
181182
private[queue] var limit: Option[Int] = None
182183
private[queue] var initialized = false
183184

184-
private val logScheduler: Cancellable = context.system.scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) { () =>
185+
private val logScheduler: Cancellable = context.system.scheduler.scheduleWithFixedDelay(0.seconds, 10.seconds) { () =>
185186
MetricEmitter.emitGaugeMetric(
186187
LoggingMarkers
187188
.SCHEDULER_QUEUE_WAITING_ACTIVATION(invocationNamespace, action.asString, action.toStringWithoutVersion),
@@ -926,7 +927,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
926927
// since there is no initial delay, it will try to create a container at initialization time
927928
// these schedulers will run forever and stop when the memory queue stops
928929
private def startMonitoring(): (ActorRef, ActorRef) = {
929-
val droppingScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.dropInterval) { () =>
930+
val droppingScheduler = Scheduler.scheduleWaitAtLeastWith(
931+
schedulingConfig.dropInterval,
932+
dispatcher = "dispatchers.monitoring-dispatcher") { () =>
930933
checkToDropStaleActivation(
931934
clock,
932935
queue,
@@ -939,7 +942,9 @@ class MemoryQueue(private val etcdClient: EtcdClient,
939942
Future.successful(())
940943
}
941944

942-
val monitoringScheduler = Scheduler.scheduleWaitAtLeast(schedulingConfig.checkInterval) { () =>
945+
val monitoringScheduler = Scheduler.scheduleWaitAtLeastWith(
946+
schedulingConfig.checkInterval,
947+
dispatcher = "dispatchers.monitoring-dispatcher") { () =>
943948
// the average duration is updated every checkInterval
944949
if (averageDurationBuffer.nonEmpty) {
945950
averageDuration = Some(averageDurationBuffer.average)

core/scheduler/src/main/scala/org/apache/openwhisk/core/scheduler/queue/QueueManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class QueueManager(
9191
private val leaderElectionCallbacks = TrieMap[String, (Either[EtcdFollower, EtcdLeader], Boolean) => Unit]()
9292

9393
private implicit val askTimeout = Timeout(5.seconds)
94-
private implicit val ec = context.dispatcher
94+
private implicit val ec = context.system.dispatchers.lookup("dispatchers.queue-manager-dispatcher")
9595
private implicit val system = context.system
9696

9797
private val watcherName = "queue-manager"

0 commit comments

Comments
 (0)