Skip to content

Commit

Permalink
Simplify shutdown logic on client disconnect in project-manager (#11712)
Browse files Browse the repository at this point in the history
* Drop soft-shutdown on last client disconnect

Suspend on Windows confuses the reconnection logic and triggers a full
shutdown. This change simply drop shutdown on last client disconnect and
expects and explicit command.

* Various cherry-picks

Minor cherry-picks from the debugging branch. Should reduce  the amount
of non-critical warnings.
  • Loading branch information
hubertp authored Dec 2, 2024
1 parent 1676545 commit 65010df
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ final class ContextRegistry(
sender() ! AccessDenied
}

case DestroyContextResponse(_) =>
// Initiated by *this* registry. Ignore

case PushContextRequest(client, contextId, stackItem) =>
if (store.hasContext(client.clientId, contextId)) {
val item = getRuntimeStackItem(stackItem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,12 @@ class JsonRpcServer(
}
.to(
Sink.actorRef[MessageHandler.WebMessage](
messageHandler,
MessageHandler.Disconnected(port),
{ _: Throwable =>
messageHandler, {
logger.trace("JSON sink stream finished with no failure")
MessageHandler.Disconnected(port)
},
{ e: Throwable =>
logger.trace("JSON sink stream finished with a failure", e)
MessageHandler.Disconnected(port)
}
)
Expand All @@ -100,7 +103,7 @@ class JsonRpcServer(
logger.trace(s"Sent text message ${textMessage.text}.")
}

Flow.fromSinkAndSource(incomingMessages, outgoingMessages)
Flow.fromSinkAndSourceCoupled(incomingMessages, outgoingMessages)
}

override protected def serverRoute(port: Int): Route = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,17 @@ class LanguageServerController(
* @param connectionInfo language server connection info
* @param serverProcessManager an actor that manages the lifecycle of the server process
* @param clients list of connected clients
* @param scheduledShutdown cancellable timeout of the hard shutdown event and a port number of the client that initiated it
* @param lastClientPort if no clients are connected denotes the last port number of a client or a project
* @return current supervising actor state
*/
private def supervising(
connectionInfo: LanguageServerConnectionInfo,
serverProcessManager: ActorRef,
clients: Set[UUID] = Set.empty,
scheduledShutdown: Option[(Cancellable, Int)] = None
clients: Set[UUID] = Set.empty,
lastClientPort: Option[Int] = None
): Receive =
LoggingReceive.withLabel("supervising") {
case StartServer(clientId, _, requestedEngineVersion, _, _) =>
scheduledShutdown.foreach(_._1.cancel())
if (requestedEngineVersion != engineVersion) {
sender() ! ServerBootFailed(
new IllegalStateException(
Expand Down Expand Up @@ -213,7 +212,6 @@ class LanguageServerController(
)
}
case Terminated(_) =>
scheduledShutdown.foreach(_._1.cancel())
logger.debug("Bootloader for {} terminated", project)

case StopServer(clientId, _) =>
Expand All @@ -224,18 +222,16 @@ class LanguageServerController(
clientId,
Some(sender()),
explicitShutdownRequested = true,
None,
scheduledShutdown
lastClientPort
)

case ScheduledShutdown(requester) =>
shutDownServer(requester)

case LanguageServerStatusRequest =>
sender() ! LanguageServerStatus(project.id, scheduledShutdown.isDefined)
sender() ! LanguageServerStatus(project.id, lastClientPort.isDefined)

case ShutDownServer =>
scheduledShutdown.foreach(_._1.cancel())
shutDownServer(None)

case ClientDisconnected(clientId, port) =>
Expand All @@ -246,13 +242,11 @@ class LanguageServerController(
clientId,
None,
explicitShutdownRequested = false,
atPort = Some(port),
scheduledShutdown
lastClientPort.orElse(Some(port))
)
case ClientConnected(clientId, clientPort) =>
scheduledShutdown match {
case Some((cancellable, port)) if clientPort == port =>
cancellable.cancel()
lastClientPort match {
case Some(port) if clientPort == port =>
context.become(
supervising(
connectionInfo,
Expand All @@ -265,7 +259,6 @@ class LanguageServerController(
}

case RenameProject(_, namespace, oldName, newName) =>
scheduledShutdown.foreach(_._1.cancel())
val socket = Socket(connectionInfo.interface, connectionInfo.rpcPort)
context.actorOf(
ProjectRenameAction
Expand All @@ -283,7 +276,6 @@ class LanguageServerController(
)

case ServerDied =>
scheduledShutdown.foreach(_._1.cancel())
logger.error("Language server died [{}]", connectionInfo)
context.stop(self)

Expand All @@ -296,36 +288,24 @@ class LanguageServerController(
clientId: UUID,
maybeRequester: Option[ActorRef],
explicitShutdownRequested: Boolean,
atPort: Option[Int],
shutdownTimeout: Option[(Cancellable, Int)]
atPort: Option[Int]
): Unit = {
val updatedClients = clients - clientId
if (updatedClients.isEmpty) {
if (!explicitShutdownRequested) {
logger.debug("Delaying shutdown for project {}", project.id)
val scheduledShutdown: Option[(Cancellable, Int)] =
shutdownTimeout.orElse(
Some(
(
context.system.scheduler.scheduleOnce(
timeoutConfig.delayedShutdownTimeout,
self,
ScheduledShutdown(maybeRequester)
),
atPort.getOrElse(0)
)
)
)
logger.debug(
"Last client disconnected for project [{}]. Awaiting re-connection or shutdown",
project.id
)
context.become(
supervising(
connectionInfo,
serverProcessManager,
Set.empty,
scheduledShutdown
atPort
)
)
} else {
shutdownTimeout.foreach(_._1.cancel())
shutDownServer(maybeRequester)
}
} else {
Expand All @@ -335,7 +315,7 @@ class LanguageServerController(
connectionInfo,
serverProcessManager,
updatedClients,
shutdownTimeout
None
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class ProjectShutdownSpec
deleteProject(projectId)(client2, implicitly[Position])
}

"ensure language server does eventually shutdown after last client disconnects" in {
"ensure language server does not shutdown after last client disconnects and can re-connect" in {
val client = new WsTestClient(address)
val projectId = createProject("Foo")(client, implicitly[Position])
val socket1 = openProject(projectId)(client, implicitly[Position])
Expand Down Expand Up @@ -154,7 +154,7 @@ class ProjectShutdownSpec
)
val client2 = new WsTestClient(address)
val socket2 = openProject(projectId)(client2, implicitly[Position])
socket2 shouldNot be(socket1)
socket2 shouldBe socket1

closeProject(projectId)(client2, implicitly[Position])
deleteProject(projectId)(client2, implicitly[Position])
Expand Down

0 comments on commit 65010df

Please sign in to comment.