Skip to content

Commit

Permalink
Respond to PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Oct 17, 2024
1 parent 3010542 commit cba8ce2
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,7 @@ public Set<String> getNodesToCheck() {

@ActivityMethod
Set<String> findBadNodes(FindBadNodesInput input);

/**
 * Shuts down the cluster.
 *
 * <p>NOTE(review): appears to be the activity invoked by the workflow's stop-cluster handler
 * before it marks the cluster as shutting down — confirm against the workflow implementation.
 */
@ActivityMethod
void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,14 @@ public Set<String> findBadNodes(FindBadNodesInput input) {
}
return badNodes;
}

/**
 * Shuts down the cluster.
 *
 * <p>Logs the shutdown and then sleeps briefly (presumably a stand-in for real shutdown work in
 * this sample — confirm).
 *
 * @throws RuntimeException if the thread is interrupted while sleeping; the interrupt flag is
 *     restored before the exception is thrown and the original exception is kept as the cause
 */
@Override
public void shutdown() {
  log.info("Shutting down cluster");
  try {
    Thread.sleep(100);
  } catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still observe the
    // interruption; catching InterruptedException clears it.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Interrupted while shutting down cluster", e);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,16 @@
*/
@WorkflowInterface
public interface ClusterManagerWorkflow {

/** Lifecycle state of the cluster managed by this workflow. */
enum ClusterState {
// Initial state: the workflow's main loop and the assign/delete handlers wait until the state
// is no longer NOT_STARTED before proceeding.
NOT_STARTED,
// Set by startCluster; the only state in which stopCluster is accepted.
STARTED,
// Set by stopCluster after the shutdown activity has run; the main loop exits and node
// assignment is rejected once this state is reached.
SHUTTING_DOWN
}
// In workflows that continue-as-new, it's convenient to store all your state in one serializable
// structure
// to make it easier to pass between runs
// structure to make it easier to pass between runs
class ClusterManagerState {
public boolean clusterStarted;
public boolean clusterShutdown;
public ClusterState workflowState;
public Map<String, Optional<String>> nodes = new HashMap<>();
public Set<String> jobAssigned = new HashSet<>();
}
Expand Down Expand Up @@ -151,19 +155,17 @@ public Set<String> getNodesAssigned() {
@SignalMethod
void startCluster();

@SignalMethod
void stopCluster();
@UpdateMethod
boolean stopCluster();

// This is an update as opposed to a signal because the client may want to wait for nodes to be
// allocated
// before sending work to those nodes.
// Returns the list of node names that were allocated to the job.
// allocated before sending work to those nodes. Returns the list of node names that were
// allocated to the job.
@UpdateMethod
ClusterManagerAssignNodesToJobResult assignNodesToJobs(ClusterManagerAssignNodesToJobInput input);

// Even though it returns nothing, this is an update because the client may want to track it, for
// example
// to wait for nodes to be unassigned before reassigning them.
// example to wait for nodes to be unassigned before reassigning them.
@UpdateMethod
void deleteJob(ClusterManagerDeleteJobInput input);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterManagerWorkflowImpl implements ClusterManagerWorkflow {

Expand Down Expand Up @@ -59,14 +60,16 @@ public ClusterManagerResult run(ClusterManagerInput input) {
maxHistoryLength = 120;
sleepInterval = Duration.ofSeconds(1);
}
Workflow.await(() -> state.clusterStarted);
Workflow.await(() -> state.workflowState != ClusterState.NOT_STARTED);
// The cluster manager is a long-running "entity" workflow so we need to periodically checkpoint
// its state and
// continue-as-new.
while (true) {
performHealthChecks();
if (!Workflow.await(sleepInterval, () -> state.clusterShutdown || shouldContinueAsNew())) {
} else if (state.clusterShutdown) {
if (!Workflow.await(
sleepInterval,
() -> state.workflowState == ClusterState.SHUTTING_DOWN || shouldContinueAsNew())) {
} else if (state.workflowState == ClusterState.SHUTTING_DOWN) {
break;
} else if (shouldContinueAsNew()) {
// We don't want to leave any job assignment or deletion handlers half-finished when we
Expand All @@ -84,25 +87,35 @@ public ClusterManagerResult run(ClusterManagerInput input) {

@Override
public void startCluster() {
state.clusterStarted = true;
if (state.workflowState != ClusterState.NOT_STARTED) {
logger.warn("Cannot start cluster in state {}", state.workflowState);
return;
}
state.workflowState = ClusterState.STARTED;
for (int i = 0; i < 25; i++) {
state.nodes.put(String.valueOf(i), Optional.empty());
}
logger.info("Cluster started");
}

@Override
public void stopCluster() {
Workflow.await(() -> state.clusterStarted);
state.clusterShutdown = true;
public boolean stopCluster() {
if (state.workflowState != ClusterState.STARTED) {
// This is used as an Update handler, so we return an error to the caller.
throw ApplicationFailure.newFailure(
"Cannot shutdown cluster in state " + state.workflowState, "IllegalState");
}
activities.shutdown();
state.workflowState = ClusterState.SHUTTING_DOWN;
logger.info("Cluster shut down");
return true;
}

@Override
public ClusterManagerAssignNodesToJobResult assignNodesToJobs(
ClusterManagerAssignNodesToJobInput input) {
Workflow.await(() -> state.clusterStarted);
if (state.clusterShutdown) {
Workflow.await(() -> state.workflowState != ClusterState.NOT_STARTED);
if (state.workflowState == ClusterState.SHUTTING_DOWN) {
throw ApplicationFailure.newFailure(
"Cannot assign nodes to a job: Cluster is already shut down", "IllegalState");
}
Expand All @@ -116,16 +129,14 @@ public ClusterManagerAssignNodesToJobResult assignNodesToJobs(
if (unassignedNodes.size() < input.getTotalNumNodes()) {
// If you want the client to receive a failure, either add an update validator and throw the
// exception from there, or raise an ApplicationFailure. Other exceptions in the main
// handler
// will cause the workflow to keep retrying and get it stuck.
// handler will cause the workflow to keep retrying and get it stuck.
throw ApplicationFailure.newFailure(
"Cannot assign nodes to a job: Not enough nodes available", "IllegalState");
}
Set<String> nodesToAssign =
unassignedNodes.stream().limit(input.getTotalNumNodes()).collect(Collectors.toSet());
// This call would be dangerous without nodesLock because it yields control and allows
// interleaving
// with deleteJob and performHealthChecks, which both touch this.state.nodes.
// interleaving with deleteJob and performHealthChecks, which both touch this.state.nodes.
activities.assignNodesToJob(
new ClusterManagerActivities.AssignNodesToJobInput(nodesToAssign, input.getJobName()));
for (String node : nodesToAssign) {
Expand All @@ -140,8 +151,8 @@ public ClusterManagerAssignNodesToJobResult assignNodesToJobs(

@Override
public void deleteJob(ClusterManagerDeleteJobInput input) {
Workflow.await(() -> state.clusterStarted);
if (state.clusterShutdown) {
Workflow.await(() -> state.workflowState != ClusterState.NOT_STARTED);
if (state.workflowState == ClusterState.SHUTTING_DOWN) {
// If you want the client to receive a failure, either add an update validator and throw the
// exception from there, or raise an ApplicationFailure. Other exceptions in the main handler
// will cause the workflow to keep retrying and get it stuck.
Expand Down

0 comments on commit cba8ce2

Please sign in to comment.