From cba8ce2aac8d21faec7de7019a02477b8142dbc7 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 17 Oct 2024 14:42:40 -0700 Subject: [PATCH] Respond to PR comments --- .../ClusterManagerActivities.java | 3 ++ .../ClusterManagerActivitiesImpl.java | 10 +++++ .../ClusterManagerWorkflow.java | 24 +++++----- .../ClusterManagerWorkflowImpl.java | 45 ++++++++++++------- 4 files changed, 54 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivities.java b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivities.java index 10a0ed02..267a9626 100644 --- a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivities.java +++ b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivities.java @@ -96,4 +96,7 @@ public Set getNodesToCheck() { @ActivityMethod Set findBadNodes(FindBadNodesInput input); + + @ActivityMethod + void shutdown(); } diff --git a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivitiesImpl.java b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivitiesImpl.java index f8b64239..827ea2e1 100644 --- a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivitiesImpl.java +++ b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivitiesImpl.java @@ -69,4 +69,14 @@ public Set findBadNodes(FindBadNodesInput input) { } return badNodes; } + + @Override + public void shutdown() { + log.info("Shutting down cluster"); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } diff --git a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflow.java b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflow.java index 99a385ef..e4d8a1de 100644 --- a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflow.java 
+++ b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflow.java @@ -34,12 +34,16 @@ */ @WorkflowInterface public interface ClusterManagerWorkflow { + + enum ClusterState { + NOT_STARTED, + STARTED, + SHUTTING_DOWN + } // In workflows that continue-as-new, it's convenient to store all your state in one serializable - // structure - // to make it easier to pass between runs + // structure to make it easier to pass between runs class ClusterManagerState { - public boolean clusterStarted; - public boolean clusterShutdown; + public ClusterState workflowState; public Map> nodes = new HashMap<>(); public Set jobAssigned = new HashSet<>(); } @@ -151,19 +155,17 @@ public Set getNodesAssigned() { @SignalMethod void startCluster(); - @SignalMethod - void stopCluster(); + @UpdateMethod + boolean stopCluster(); // This is an update as opposed to a signal because the client may want to wait for nodes to be - // allocated - // before sending work to those nodes. - // Returns the list of node names that were allocated to the job. + // allocated before sending work to those nodes. Returns the list of node names that were + // allocated to the job. @UpdateMethod ClusterManagerAssignNodesToJobResult assignNodesToJobs(ClusterManagerAssignNodesToJobInput input); // Even though it returns nothing, this is an update because the client may want to track it, for - // example - // to wait for nodes to be unassigned before reassigning them. + // example to wait for nodes to be unassigned before reassigning them. 
@UpdateMethod void deleteJob(ClusterManagerDeleteJobInput input); } diff --git a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowImpl.java b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowImpl.java index 93772964..6507baf1 100644 --- a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowImpl.java +++ b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowImpl.java @@ -24,13 +24,14 @@ import io.temporal.failure.ApplicationFailure; import io.temporal.workflow.Workflow; import io.temporal.workflow.WorkflowLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.Duration; import java.util.Collections; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ClusterManagerWorkflowImpl implements ClusterManagerWorkflow { @@ -59,14 +60,16 @@ public ClusterManagerResult run(ClusterManagerInput input) { maxHistoryLength = 120; sleepInterval = Duration.ofSeconds(1); } - Workflow.await(() -> state.clusterStarted); + Workflow.await(() -> state.workflowState != ClusterState.NOT_STARTED); // The cluster manager is a long-running "entity" workflow so we need to periodically checkpoint // its state and // continue-as-new. 
while (true) { performHealthChecks(); - if (!Workflow.await(sleepInterval, () -> state.clusterShutdown || shouldContinueAsNew())) { - } else if (state.clusterShutdown) { + if (!Workflow.await( + sleepInterval, + () -> state.workflowState == ClusterState.SHUTTING_DOWN || shouldContinueAsNew())) { + } else if (state.workflowState == ClusterState.SHUTTING_DOWN) { break; } else if (shouldContinueAsNew()) { // We don't want to leave any job assignment or deletion handlers half-finished when we @@ -84,7 +87,11 @@ public ClusterManagerResult run(ClusterManagerInput input) { @Override public void startCluster() { - state.clusterStarted = true; + if (state.workflowState != ClusterState.NOT_STARTED) { + logger.warn("Cannot start cluster in state {}", state.workflowState); + return; + } + state.workflowState = ClusterState.STARTED; for (int i = 0; i < 25; i++) { state.nodes.put(String.valueOf(i), Optional.empty()); } @@ -92,17 +99,23 @@ public void startCluster() { } @Override - public void stopCluster() { - Workflow.await(() -> state.clusterStarted); - state.clusterShutdown = true; + public boolean stopCluster() { + if (state.workflowState != ClusterState.STARTED) { + // Since this is used as an Update handler, we return an error to the caller.
+ throw ApplicationFailure.newFailure( + "Cannot shutdown cluster in state " + state.workflowState, "IllegalState"); + } + activities.shutdown(); + state.workflowState = ClusterState.SHUTTING_DOWN; logger.info("Cluster shut down"); + return true; } @Override public ClusterManagerAssignNodesToJobResult assignNodesToJobs( ClusterManagerAssignNodesToJobInput input) { - Workflow.await(() -> state.clusterStarted); - if (state.clusterShutdown) { + Workflow.await(() -> state.workflowState != ClusterState.NOT_STARTED); + if (state.workflowState == ClusterState.SHUTTING_DOWN) { throw ApplicationFailure.newFailure( "Cannot assign nodes to a job: Cluster is already shut down", "IllegalState"); } @@ -116,16 +129,14 @@ public ClusterManagerAssignNodesToJobResult assignNodesToJobs( if (unassignedNodes.size() < input.getTotalNumNodes()) { // If you want the client to receive a failure, either add an update validator and throw the // exception from there, or raise an ApplicationFailure. Other exceptions in the main - // handler - // will cause the workflow to keep retrying and get it stuck. + // handler will cause the workflow to keep retrying and get it stuck. throw ApplicationFailure.newFailure( "Cannot assign nodes to a job: Not enough nodes available", "IllegalState"); } Set nodesToAssign = unassignedNodes.stream().limit(input.getTotalNumNodes()).collect(Collectors.toSet()); // This call would be dangerous without nodesLock because it yields control and allows - // interleaving - // with deleteJob and performHealthChecks, which both touch this.state.nodes. + // interleaving with deleteJob and performHealthChecks, which both touch this.state.nodes. 
activities.assignNodesToJob( new ClusterManagerActivities.AssignNodesToJobInput(nodesToAssign, input.getJobName())); for (String node : nodesToAssign) { @@ -140,8 +151,8 @@ public ClusterManagerAssignNodesToJobResult assignNodesToJobs( @Override public void deleteJob(ClusterManagerDeleteJobInput input) { - Workflow.await(() -> state.clusterStarted); - if (state.clusterShutdown) { + Workflow.await(() -> state.workflowState != ClusterState.NOT_STARTED); + if (state.workflowState == ClusterState.SHUTTING_DOWN) { // If you want the client to receive a failure, either add an update validator and throw the // exception from there, or raise an ApplicationFailure. Other exceptions in the main handler // will cause the workflow to keep retrying and get it stuck.