From 5c96864e2c6e45963bee2f9b13605d24670f8b1a Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Tue, 29 Oct 2024 18:56:45 -0700 Subject: [PATCH] safe message passing sample (#681) Add safe message passing sample --- README.md | 2 + .../ClusterManagerActivities.java | 102 ++++++++ .../ClusterManagerActivitiesImpl.java | 82 ++++++ .../ClusterManagerWorkflow.java | 171 ++++++++++++ .../ClusterManagerWorkflowImpl.java | 244 ++++++++++++++++++ .../ClusterManagerWorkflowStarter.java | 114 ++++++++ .../ClusterManagerWorkflowWorker.java | 44 ++++ .../samples/safemessagepassing/README.md | 22 ++ .../ClusterManagerWorkflowWorkerTest.java | 157 +++++++++++ 9 files changed, 938 insertions(+) create mode 100644 core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivities.java create mode 100644 core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivitiesImpl.java create mode 100644 core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflow.java create mode 100644 core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowImpl.java create mode 100644 core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowStarter.java create mode 100644 core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowWorker.java create mode 100644 core/src/main/java/io/temporal/samples/safemessagepassing/README.md create mode 100644 core/src/test/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowWorkerTest.java diff --git a/README.md b/README.md index 1df205ae..ebc55820 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,8 @@ See the README.md file in each main sample directory for cut/paste Gradle comman - [**Sliding Window Batch**](/core/src/main/java/io/temporal/samples/batch/slidingwindow): A batch implementation that maintains a configured number of child workflows during processing. +- [**Safe Message Passing**](/core/src/main/java/io/temporal/samples/safemessagepassing): Safely handling concurrent updates and signals messages. + #### API demonstrations - [**Async Untyped Child Workflow**](/core/src/main/java/io/temporal/samples/asyncuntypedchild): Demonstrates how to invoke an untyped child workflow async, that can complete after parent workflow is already completed. diff --git a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivities.java b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivities.java new file mode 100644 index 00000000..267a9626 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivities.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.safemessagepassing; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import java.util.List; +import java.util.Set; + +@ActivityInterface +public interface ClusterManagerActivities { + + class AssignNodesToJobInput { + private final List nodes; + private final String jobName; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public AssignNodesToJobInput( + @JsonProperty("nodes_to_assign") Set nodesToAssign, + @JsonProperty("job_name") String jobName) { + this.nodes = List.copyOf(nodesToAssign); + this.jobName = jobName; + } + + @JsonProperty("nodes_to_assign") + public List getNodes() { + return nodes; + } + + @JsonProperty("job_name") + public String getJobName() { + return jobName; + } + } + + @ActivityMethod + void assignNodesToJob(AssignNodesToJobInput input); + + class UnassignNodesForJobInput { + private final List nodes; + private final String jobName; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public UnassignNodesForJobInput( + @JsonProperty("nodes") Set nodes, @JsonProperty("job_name") String jobName) { + this.nodes = List.copyOf(nodes); + this.jobName = jobName; + } + + @JsonProperty("nodes") + public List getNodes() { + return nodes; + } + + @JsonProperty("job_name") + public String getJobName() { + return jobName; + } + } + + @ActivityMethod + void unassignNodesForJob(UnassignNodesForJobInput input); + + class FindBadNodesInput { + private final Set nodesToCheck; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public FindBadNodesInput(@JsonProperty("assigned_nodes") Set assignedNodes) { + this.nodesToCheck = assignedNodes; + } + + @JsonProperty("assigned_nodes") + public Set getNodesToCheck() { + return nodesToCheck; + } + } + + @ActivityMethod + Set findBadNodes(FindBadNodesInput input); + + @ActivityMethod + void shutdown(); +} diff --git a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivitiesImpl.java b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivitiesImpl.java new file mode 100644 index 00000000..827ea2e1 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerActivitiesImpl.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.safemessagepassing; + +import java.util.Set; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClusterManagerActivitiesImpl implements ClusterManagerActivities { + private static final Logger log = LoggerFactory.getLogger(ClusterManagerActivitiesImpl.class); + + @Override + public void assignNodesToJob(AssignNodesToJobInput input) { + for (String node : input.getNodes()) { + log.info("Assigned node " + node + " to job " + input.getJobName()); + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void unassignNodesForJob(UnassignNodesForJobInput input) { + for (String node : input.getNodes()) { + log.info("Unassigned node " + node + " from job " + input.getJobName()); + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public Set findBadNodes(FindBadNodesInput input) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Set badNodes = + input.getNodesToCheck().stream() + .filter(n -> Integer.parseInt(n) % 5 == 0) + .collect(Collectors.toSet()); + if (!badNodes.isEmpty()) { + log.info("Found bad nodes: " + badNodes); + } else { + log.info("No bad nodes found"); + } + return badNodes; + } + + @Override + public void shutdown() { + log.info("Shutting down cluster"); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflow.java b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflow.java new file mode 100644 index 00000000..52ab2447 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflow.java @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.safemessagepassing; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.UpdateMethod; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.util.*; + +/** + * ClusterManagerWorkflow keeps track of the assignments of a cluster of nodes. Via signals, the + * cluster can be started and shutdown. Via updates, clients can also assign jobs to nodes and + * delete jobs. These updates must run atomically. + */ +@WorkflowInterface +public interface ClusterManagerWorkflow { + + enum ClusterState { + NOT_STARTED, + STARTED, + SHUTTING_DOWN + } + // In workflows that continue-as-new, it's convenient to store all your state in one serializable + // structure to make it easier to pass between runs + class ClusterManagerState { + public ClusterState workflowState = ClusterState.NOT_STARTED; + public Map> nodes = new HashMap<>(); + public Set jobAssigned = new HashSet<>(); + } + + class ClusterManagerInput { + private final Optional state; + private final boolean testContinueAsNew; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public ClusterManagerInput( + @JsonProperty("state") Optional state, + @JsonProperty("test_continue_as_new") boolean testContinueAsNew) { + this.state = state; + this.testContinueAsNew = testContinueAsNew; + } + + @JsonProperty("state") + public Optional getState() { + return state; + } + + @JsonProperty("test_continue_as_new") + public boolean isTestContinueAsNew() { + return testContinueAsNew; + } + } + + class ClusterManagerResult { + private final int numCurrentlyAssignedNodes; + private final int numBadNodes; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public ClusterManagerResult( + @JsonProperty("num_currently_assigned_nodes") int numCurrentlyAssignedNodes, + @JsonProperty("num_bad_nodes") int numBadNodes) { + this.numCurrentlyAssignedNodes = numCurrentlyAssignedNodes; + this.numBadNodes = numBadNodes; + } + + @JsonProperty("num_currently_assigned_nodes") + public int getNumCurrentlyAssignedNodes() { + return numCurrentlyAssignedNodes; + } + + @JsonProperty("num_bad_nodes") + public int getNumBadNodes() { + return numBadNodes; + } + } + + // Be in the habit of storing message inputs and outputs in serializable structures. + // This makes it easier to add more overtime in a backward-compatible way. + class ClusterManagerAssignNodesToJobInput { + // If larger or smaller than previous amounts, will resize the job. + private final int totalNumNodes; + private final String jobName; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public ClusterManagerAssignNodesToJobInput( + @JsonProperty("total_num_nodes") int totalNumNodes, + @JsonProperty("job_name") String jobName) { + this.totalNumNodes = totalNumNodes; + this.jobName = jobName; + } + + @JsonProperty("total_num_nodes") + public int getTotalNumNodes() { + return totalNumNodes; + } + + @JsonProperty("job_name") + public String getJobName() { + return jobName; + } + } + + class ClusterManagerDeleteJobInput { + private final String jobName; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public ClusterManagerDeleteJobInput(@JsonProperty("job_name") String jobName) { + this.jobName = jobName; + } + + @JsonProperty("job_name") + public String getJobName() { + return jobName; + } + } + + class ClusterManagerAssignNodesToJobResult { + private final Set nodesAssigned; + + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public ClusterManagerAssignNodesToJobResult( + @JsonProperty("assigned_nodes") Set assignedNodes) { + this.nodesAssigned = assignedNodes; + } + + @JsonProperty("assigned_nodes") + public Set getNodesAssigned() { + return nodesAssigned; + } + } + + @WorkflowMethod + ClusterManagerResult run(ClusterManagerInput input); + + @SignalMethod + void startCluster(); + + @UpdateMethod + boolean stopCluster(); + + // This is an update as opposed to a signal because the client may want to wait for nodes to be + // allocated before sending work to those nodes. Returns the list of node names that were + // allocated to the job. + @UpdateMethod + ClusterManagerAssignNodesToJobResult assignNodesToJobs(ClusterManagerAssignNodesToJobInput input); + + // Even though it returns nothing, this is an update because the client may want to track it, for + // example to wait for nodes to be unassigned before reassigning them. + @UpdateMethod + void deleteJob(ClusterManagerDeleteJobInput input); +} diff --git a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowImpl.java b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowImpl.java new file mode 100644 index 00000000..d25453ec --- /dev/null +++ b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowImpl.java @@ -0,0 +1,244 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.safemessagepassing; + +import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; +import io.temporal.failure.ApplicationFailure; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInit; +import io.temporal.workflow.WorkflowLock; +import java.time.Duration; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClusterManagerWorkflowImpl implements ClusterManagerWorkflow { + + private static final Logger logger = LoggerFactory.getLogger(ClusterManagerWorkflowImpl.class); + private final ClusterManagerState state; + private final WorkflowLock nodeLock; + private final Duration sleepInterval; + private final int maxHistoryLength; + + private ClusterManagerActivities activities = + Workflow.newActivityStub( + ClusterManagerActivities.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build(), + Collections.singletonMap( + "findBadNodes", + ActivityOptions.newBuilder() + .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build()) + .build())); + + @WorkflowInit + public ClusterManagerWorkflowImpl(ClusterManagerInput input) { + nodeLock = Workflow.newWorkflowLock(); + if (input.getState().isPresent()) { + state = input.getState().get(); + } else { + state = new ClusterManagerState(); + } + if (input.isTestContinueAsNew()) { + maxHistoryLength = 120; + sleepInterval = Duration.ofSeconds(1); + } else { + sleepInterval = Duration.ofSeconds(10); + maxHistoryLength = 0; + } + } + + @Override + public ClusterManagerResult run(ClusterManagerInput input) { + Workflow.await(() -> state.workflowState != ClusterState.NOT_STARTED); + // The cluster manager is a long-running "entity" workflow so we need to periodically checkpoint + // its state and + // continue-as-new. + while (true) { + performHealthChecks(); + if (!Workflow.await( + sleepInterval, + () -> state.workflowState == ClusterState.SHUTTING_DOWN || shouldContinueAsNew())) { + } else if (state.workflowState == ClusterState.SHUTTING_DOWN) { + break; + } else if (shouldContinueAsNew()) { + // We don't want to leave any job assignment or deletion handlers half-finished when we + // continue as new. + Workflow.await(() -> Workflow.isEveryHandlerFinished()); + logger.info("Continuing as new"); + Workflow.continueAsNew( + new ClusterManagerInput(Optional.of(state), input.isTestContinueAsNew())); + } + } + // Make sure we finish off handlers such as deleting jobs before we complete the workflow. + Workflow.await(() -> Workflow.isEveryHandlerFinished()); + return new ClusterManagerResult(getAssignedNodes(null).size(), getBadNodes().size()); + } + + @Override + public void startCluster() { + if (state.workflowState != ClusterState.NOT_STARTED) { + logger.warn("Cannot start cluster in state {}", state.workflowState); + return; + } + state.workflowState = ClusterState.STARTED; + for (int i = 0; i < 25; i++) { + state.nodes.put(String.valueOf(i), Optional.empty()); + } + logger.info("Cluster started"); + } + + @Override + public boolean stopCluster() { + if (state.workflowState != ClusterState.STARTED) { + // This is used as an Update handler so that we can return an error to the caller. + throw ApplicationFailure.newFailure( + "Cannot shutdown cluster in state " + state.workflowState, "IllegalState"); + } + activities.shutdown(); + state.workflowState = ClusterState.SHUTTING_DOWN; + logger.info("Cluster shut down"); + return true; + } + + @Override + public ClusterManagerAssignNodesToJobResult assignNodesToJobs( + ClusterManagerAssignNodesToJobInput input) { + Workflow.await(() -> state.workflowState != ClusterState.NOT_STARTED); + if (state.workflowState == ClusterState.SHUTTING_DOWN) { + throw ApplicationFailure.newFailure( + "Cannot assign nodes to a job: Cluster is already shut down", "IllegalState"); + } + nodeLock.lock(); + try { + // Idempotency guard. + if (state.jobAssigned.contains(input.getJobName())) { + return new ClusterManagerAssignNodesToJobResult(getAssignedNodes(input.getJobName())); + } + Set unassignedNodes = getUnassignedNodes(); + if (unassignedNodes.size() < input.getTotalNumNodes()) { + // If you want the client to receive a failure, either add an update validator and throw the + // exception from there, or raise an ApplicationFailure. Other exceptions in the main + // handler will cause the workflow to keep retrying and get it stuck. + throw ApplicationFailure.newFailure( + "Cannot assign nodes to a job: Not enough nodes available", "IllegalState"); + } + Set nodesToAssign = + unassignedNodes.stream().limit(input.getTotalNumNodes()).collect(Collectors.toSet()); + // This call would be dangerous without nodesLock because it yields control and allows + // interleaving with deleteJob and performHealthChecks, which both touch this.state.nodes. + activities.assignNodesToJob( + new ClusterManagerActivities.AssignNodesToJobInput(nodesToAssign, input.getJobName())); + for (String node : nodesToAssign) { + state.nodes.put(node, Optional.of(input.getJobName())); + } + state.jobAssigned.add(input.getJobName()); + return new ClusterManagerAssignNodesToJobResult(nodesToAssign); + } finally { + nodeLock.unlock(); + } + } + + @Override + public void deleteJob(ClusterManagerDeleteJobInput input) { + Workflow.await(() -> state.workflowState != ClusterState.NOT_STARTED); + if (state.workflowState == ClusterState.SHUTTING_DOWN) { + // If you want the client to receive a failure, either add an update validator and throw the + // exception from there, or raise an ApplicationFailure. Other exceptions in the main handler + // will cause the workflow to keep retrying and get it stuck. + throw ApplicationFailure.newFailure( + "Cannot delete a job: Cluster is already shut down", "IllegalState"); + } + nodeLock.lock(); + try { + Set nodesToUnassign = getAssignedNodes(input.getJobName()); + // This call would be dangerous without nodesLock because it yields control and allows + // interleaving + // with assignNodesToJob and performHealthChecks, which all touch this.state.nodes. + activities.unassignNodesForJob( + new ClusterManagerActivities.UnassignNodesForJobInput( + nodesToUnassign, input.getJobName())); + for (String node : nodesToUnassign) { + state.nodes.put(node, Optional.empty()); + } + } finally { + nodeLock.unlock(); + } + } + + private Set getAssignedNodes(String jobName) { + if (jobName != null) { + return state.nodes.entrySet().stream() + .filter(e -> e.getValue().isPresent() && e.getValue().get().equals(jobName)) + .map(e -> e.getKey()) + .collect(Collectors.toSet()); + } else { + return state.nodes.entrySet().stream() + .filter(e -> e.getValue().isPresent() && !e.getValue().get().equals("BAD!")) + .map(e -> e.getKey()) + .collect(Collectors.toSet()); + } + } + + private Set getUnassignedNodes() { + return state.nodes.entrySet().stream() + .filter(e -> !e.getValue().isPresent()) + .map(e -> e.getKey()) + .collect(Collectors.toSet()); + } + + private Set getBadNodes() { + return state.nodes.entrySet().stream() + .filter(e -> e.getValue().isPresent() && e.getValue().get().equals("BAD!")) + .map(e -> e.getKey()) + .collect(Collectors.toSet()); + } + + private void performHealthChecks() { + nodeLock.lock(); + try { + Set assignedNodes = getAssignedNodes(null); + Set badNodes = + activities.findBadNodes(new ClusterManagerActivities.FindBadNodesInput(assignedNodes)); + for (String badNode : badNodes) { + state.nodes.put(badNode, Optional.of("BAD!")); + } + } catch (Exception e) { + logger.error("Health check failed", e); + } finally { + nodeLock.unlock(); + } + } + + private boolean shouldContinueAsNew() { + if (Workflow.getInfo().isContinueAsNewSuggested()) { + return true; + } + // This is just for ease-of-testing. In production, we trust temporal to tell us when to + // continue as new. + if (maxHistoryLength > 0 && Workflow.getInfo().getHistoryLength() > maxHistoryLength) { + return true; + } + return false; + } +} diff --git a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowStarter.java b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowStarter.java new file mode 100644 index 00000000..ea3aa443 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowStarter.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.safemessagepassing; + +import static io.temporal.samples.safemessagepassing.ClusterManagerWorkflowWorker.CLUSTER_MANAGER_WORKFLOW_ID; +import static io.temporal.samples.safemessagepassing.ClusterManagerWorkflowWorker.TASK_QUEUE; + +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowOptions; +import io.temporal.client.WorkflowStub; +import io.temporal.client.WorkflowUpdateStage; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClusterManagerWorkflowStarter { + + private static final Logger logger = LoggerFactory.getLogger(ClusterManagerWorkflowStarter.class); + + public static void main(String[] args) { + if (args.length > 1) { + System.err.println( + "Usage: java " + + ClusterManagerWorkflowStarter.class.getName() + + " "); + System.exit(1); + } + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = WorkflowClient.newInstance(service); + boolean shouldTestContinueAsNew = args.length > 0 ? Boolean.parseBoolean(args[0]) : false; + ClusterManagerWorkflow cluster = + client.newWorkflowStub( + ClusterManagerWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(TASK_QUEUE) + .setWorkflowId(CLUSTER_MANAGER_WORKFLOW_ID + "-" + UUID.randomUUID()) + .build()); + + logger.info("Starting cluster"); + WorkflowClient.start( + cluster::run, + new ClusterManagerWorkflow.ClusterManagerInput(Optional.empty(), shouldTestContinueAsNew)); + Duration delay = shouldTestContinueAsNew ? Duration.ofSeconds(10) : Duration.ofSeconds(1); + cluster.startCluster(); + logger.info("Assigning jobs to nodes..."); + List> + assignJobs = new ArrayList<>(); + for (int i = 0; i < 6; i++) { + assignJobs.add( + WorkflowStub.fromTyped(cluster) + .startUpdate( + "assignNodesToJobs", + WorkflowUpdateStage.ACCEPTED, + ClusterManagerWorkflow.ClusterManagerAssignNodesToJobResult.class, + new ClusterManagerWorkflow.ClusterManagerAssignNodesToJobInput(2, "job" + i)) + .getResultAsync()); + } + assignJobs.forEach(CompletableFuture::join); + + logger.info("Sleeping for " + delay.getSeconds() + " seconds"); + try { + Thread.sleep(delay.toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + logger.info("Deleting jobs..."); + List> deleteJobs = new ArrayList<>(); + for (int i = 0; i < 6; i++) { + deleteJobs.add( + WorkflowStub.fromTyped(cluster) + .startUpdate( + "deleteJob", + WorkflowUpdateStage.ACCEPTED, + Void.class, + new ClusterManagerWorkflow.ClusterManagerDeleteJobInput("job" + i)) + .getResultAsync()); + } + deleteJobs.forEach(CompletableFuture::join); + + logger.info("Stopping cluster..."); + cluster.stopCluster(); + + ClusterManagerWorkflow.ClusterManagerResult result = + cluster.run(new ClusterManagerWorkflow.ClusterManagerInput(Optional.empty(), false)); + logger.info( + "Cluster shut down successfully. It had " + + result.getNumCurrentlyAssignedNodes() + + " nodes assigned at the end."); + System.exit(0); + } +} diff --git a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowWorker.java b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowWorker.java new file mode 100644 index 00000000..ae98968e --- /dev/null +++ b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowWorker.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.safemessagepassing; + +import io.temporal.client.WorkflowClient; +import io.temporal.serviceclient.WorkflowServiceStubs; +import io.temporal.worker.Worker; +import io.temporal.worker.WorkerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClusterManagerWorkflowWorker { + private static final Logger logger = LoggerFactory.getLogger(ClusterManagerWorkflowWorker.class); + static final String TASK_QUEUE = "ClusterManagerWorkflowTaskQueue"; + static final String CLUSTER_MANAGER_WORKFLOW_ID = "ClusterManagerWorkflow"; + + public static void main(String[] args) { + WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs(); + WorkflowClient client = WorkflowClient.newInstance(service); + WorkerFactory factory = WorkerFactory.newInstance(client); + final Worker worker = factory.newWorker(TASK_QUEUE); + worker.registerWorkflowImplementationTypes(ClusterManagerWorkflowImpl.class); + worker.registerActivitiesImplementations(new ClusterManagerActivitiesImpl()); + factory.start(); + logger.info("Worker started for task queue: " + TASK_QUEUE); + } +} diff --git a/core/src/main/java/io/temporal/samples/safemessagepassing/README.md b/core/src/main/java/io/temporal/samples/safemessagepassing/README.md new file mode 100644 index 00000000..bb442f94 --- /dev/null +++ b/core/src/main/java/io/temporal/samples/safemessagepassing/README.md @@ -0,0 +1,22 @@ +# Safe Message Passing + +This sample shows off important techniques for handling signals and updates, aka messages. In particular, it illustrates how message handlers can interleave or not be completed before the workflow completes, and how you can manage that. + +* Here, using Workflow.await, signal and update handlers will only operate when the workflow is within a certain state--between clusterStarted and clusterShutdown. +* You can run start_workflow with an initializer signal that you want to run before anything else other than the workflow's constructor. This pattern is known as "signal-with-start." +* Message handlers can block and their actions can be interleaved with one another and with the main workflow. This can easily cause bugs, so you can use a lock to protect shared state from interleaved access. +* An "Entity" workflow, i.e. a long-lived workflow, periodically "continues as new". It must do this to prevent its history from growing too large, and it passes its state to the next workflow. You can check `Workflow.getInfo().isContinueAsNewSuggested()` to see when it's time. +* Most people want their message handlers to finish before the workflow run completes or continues as new. Use `Workflow.await(() -> Workflow.isEveryHandlerFinished())` to achieve this. +* Message handlers can be made idempotent. See update `ClusterManagerWorkflow.assignNodesToJobs`. + +First start the Worker: + +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.safemessagepassing.ClusterManagerWorkflowWorker +``` + +Then in a different terminal window start the Workflow Execution: + +```bash +./gradlew -q execute -PmainClass=io.temporal.samples.safemessagepassing.ClusterManagerWorkflowStarter +``` diff --git a/core/src/test/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowWorkerTest.java b/core/src/test/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowWorkerTest.java new file mode 100644 index 00000000..382ea110 --- /dev/null +++ b/core/src/test/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowWorkerTest.java @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved + * + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package io.temporal.samples.safemessagepassing; + +import io.temporal.client.*; +import io.temporal.failure.ApplicationFailure; +import io.temporal.testing.TestWorkflowRule; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class ClusterManagerWorkflowWorkerTest { + + @Rule + public TestWorkflowRule testWorkflowRule = + TestWorkflowRule.newBuilder() + .setWorkflowTypes(ClusterManagerWorkflowImpl.class) + .setActivityImplementations(new ClusterManagerActivitiesImpl()) + .build(); + + @Test + public void testSafeMessageHandler() throws ExecutionException, InterruptedException { + ClusterManagerWorkflow cluster = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub( + ClusterManagerWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build()); + CompletableFuture result = + WorkflowClient.execute( + cluster::run, new ClusterManagerWorkflow.ClusterManagerInput(Optional.empty(), false)); + + cluster.startCluster(); + + List> + assignJobs = new ArrayList<>(); + for (int i = 0; i < 6; i++) { + assignJobs.add( + WorkflowStub.fromTyped(cluster) + .startUpdate( + "assignNodesToJobs", + WorkflowUpdateStage.ACCEPTED, + ClusterManagerWorkflow.ClusterManagerAssignNodesToJobResult.class, + new ClusterManagerWorkflow.ClusterManagerAssignNodesToJobInput(2, "job-" + i)) + .getResultAsync()); + } + assignJobs.forEach( + (f) -> { + try { + Assert.assertEquals(2, f.get().getNodesAssigned().size()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + }); + + testWorkflowRule.getTestEnvironment().sleep(Duration.ofSeconds(1)); + + List> deleteJobs = new ArrayList<>(); + for (int i = 0; i < 6; i++) { + deleteJobs.add( + WorkflowStub.fromTyped(cluster) + .startUpdate( + "deleteJob", + WorkflowUpdateStage.ACCEPTED, + Void.class, + new ClusterManagerWorkflow.ClusterManagerDeleteJobInput("job-" + i)) + .getResultAsync()); + } + deleteJobs.forEach(CompletableFuture::join); + + cluster.stopCluster(); + Assert.assertEquals(0, result.get().getNumCurrentlyAssignedNodes()); + } + + @Test + public void testUpdateIdempotency() { + ClusterManagerWorkflow cluster = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub( + ClusterManagerWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build()); + CompletableFuture result = + WorkflowClient.execute( + cluster::run, new ClusterManagerWorkflow.ClusterManagerInput(Optional.empty(), false)); + + cluster.startCluster(); + + ClusterManagerWorkflow.ClusterManagerAssignNodesToJobResult result1 = + cluster.assignNodesToJobs( + new ClusterManagerWorkflow.ClusterManagerAssignNodesToJobInput(5, "test-job")); + + ClusterManagerWorkflow.ClusterManagerAssignNodesToJobResult result2 = + cluster.assignNodesToJobs( + new ClusterManagerWorkflow.ClusterManagerAssignNodesToJobInput(5, "test-job")); + + Assert.assertTrue(result1.getNodesAssigned().size() >= result2.getNodesAssigned().size()); + } + + @Test + public void testUpdateFailure() throws ExecutionException, InterruptedException { + ClusterManagerWorkflow cluster = + testWorkflowRule + .getWorkflowClient() + .newWorkflowStub( + ClusterManagerWorkflow.class, + WorkflowOptions.newBuilder().setTaskQueue(testWorkflowRule.getTaskQueue()).build()); + CompletableFuture result = + WorkflowClient.execute( + cluster::run, new ClusterManagerWorkflow.ClusterManagerInput(Optional.empty(), false)); + + cluster.startCluster(); + + cluster.assignNodesToJobs( + new ClusterManagerWorkflow.ClusterManagerAssignNodesToJobInput(24, "big-job")); + WorkflowUpdateException updateFailure = + Assert.assertThrows( + WorkflowUpdateException.class, + () -> + cluster.assignNodesToJobs( + new ClusterManagerWorkflow.ClusterManagerAssignNodesToJobInput( + 3, "little-job"))); + Assert.assertTrue(updateFailure.getCause() instanceof ApplicationFailure); + Assert.assertEquals( + "Cannot assign nodes to a job: Not enough nodes available", + ((ApplicationFailure) updateFailure.getCause()).getOriginalMessage()); + + cluster.stopCluster(); + Assert.assertEquals( + 24, result.get().getNumCurrentlyAssignedNodes() + result.get().getNumBadNodes()); + } +}