Skip to content

Commit

Permalink
safe message passing sample (#681)
Browse files Browse the repository at this point in the history
Add safe message passing sample
  • Loading branch information
Quinn-With-Two-Ns authored Oct 30, 2024
1 parent 0da7716 commit 5c96864
Show file tree
Hide file tree
Showing 9 changed files with 938 additions and 0 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ See the README.md file in each main sample directory for cut/paste Gradle comman

- [**Sliding Window Batch**](/core/src/main/java/io/temporal/samples/batch/slidingwindow): A batch implementation that maintains a configured number of child workflows during processing.

- [**Safe Message Passing**](/core/src/main/java/io/temporal/samples/safemessagepassing): Safely handling concurrent updates and signals messages.

#### API demonstrations

- [**Async Untyped Child Workflow**](/core/src/main/java/io/temporal/samples/asyncuntypedchild): Demonstrates how to invoke an untyped child workflow async, that can complete after parent workflow is already completed.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.safemessagepassing;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import java.util.List;
import java.util.Set;

@ActivityInterface
public interface ClusterManagerActivities {

class AssignNodesToJobInput {
private final List<String> nodes;
private final String jobName;

@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
public AssignNodesToJobInput(
@JsonProperty("nodes_to_assign") Set<String> nodesToAssign,
@JsonProperty("job_name") String jobName) {
this.nodes = List.copyOf(nodesToAssign);
this.jobName = jobName;
}

@JsonProperty("nodes_to_assign")
public List<String> getNodes() {
return nodes;
}

@JsonProperty("job_name")
public String getJobName() {
return jobName;
}
}

@ActivityMethod
void assignNodesToJob(AssignNodesToJobInput input);

class UnassignNodesForJobInput {
private final List<String> nodes;
private final String jobName;

@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
public UnassignNodesForJobInput(
@JsonProperty("nodes") Set<String> nodes, @JsonProperty("job_name") String jobName) {
this.nodes = List.copyOf(nodes);
this.jobName = jobName;
}

@JsonProperty("nodes")
public List<String> getNodes() {
return nodes;
}

@JsonProperty("job_name")
public String getJobName() {
return jobName;
}
}

@ActivityMethod
void unassignNodesForJob(UnassignNodesForJobInput input);

class FindBadNodesInput {
private final Set<String> nodesToCheck;

@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
public FindBadNodesInput(@JsonProperty("assigned_nodes") Set<String> assignedNodes) {
this.nodesToCheck = assignedNodes;
}

@JsonProperty("assigned_nodes")
public Set<String> getNodesToCheck() {
return nodesToCheck;
}
}

@ActivityMethod
Set<String> findBadNodes(FindBadNodesInput input);

@ActivityMethod
void shutdown();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.safemessagepassing;

import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterManagerActivitiesImpl implements ClusterManagerActivities {
private static final Logger log = LoggerFactory.getLogger(ClusterManagerActivitiesImpl.class);

@Override
public void assignNodesToJob(AssignNodesToJobInput input) {
for (String node : input.getNodes()) {
log.info("Assigned node " + node + " to job " + input.getJobName());
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
public void unassignNodesForJob(UnassignNodesForJobInput input) {
for (String node : input.getNodes()) {
log.info("Unassigned node " + node + " from job " + input.getJobName());
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
public Set<String> findBadNodes(FindBadNodesInput input) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Set<String> badNodes =
input.getNodesToCheck().stream()
.filter(n -> Integer.parseInt(n) % 5 == 0)
.collect(Collectors.toSet());
if (!badNodes.isEmpty()) {
log.info("Found bad nodes: " + badNodes);
} else {
log.info("No bad nodes found");
}
return badNodes;
}

@Override
public void shutdown() {
log.info("Shutting down cluster");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.samples.safemessagepassing;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.UpdateMethod;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.*;

/**
* ClusterManagerWorkflow keeps track of the assignments of a cluster of nodes. Via signals, the
* cluster can be started and shutdown. Via updates, clients can also assign jobs to nodes and
* delete jobs. These updates must run atomically.
*/
@WorkflowInterface
public interface ClusterManagerWorkflow {

enum ClusterState {
NOT_STARTED,
STARTED,
SHUTTING_DOWN
}
// In workflows that continue-as-new, it's convenient to store all your state in one serializable
// structure to make it easier to pass between runs
class ClusterManagerState {
public ClusterState workflowState = ClusterState.NOT_STARTED;
public Map<String, Optional<String>> nodes = new HashMap<>();
public Set<String> jobAssigned = new HashSet<>();
}

class ClusterManagerInput {
private final Optional<ClusterManagerState> state;
private final boolean testContinueAsNew;

@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
public ClusterManagerInput(
@JsonProperty("state") Optional<ClusterManagerState> state,
@JsonProperty("test_continue_as_new") boolean testContinueAsNew) {
this.state = state;
this.testContinueAsNew = testContinueAsNew;
}

@JsonProperty("state")
public Optional<ClusterManagerState> getState() {
return state;
}

@JsonProperty("test_continue_as_new")
public boolean isTestContinueAsNew() {
return testContinueAsNew;
}
}

class ClusterManagerResult {
private final int numCurrentlyAssignedNodes;
private final int numBadNodes;

@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
public ClusterManagerResult(
@JsonProperty("num_currently_assigned_nodes") int numCurrentlyAssignedNodes,
@JsonProperty("num_bad_nodes") int numBadNodes) {
this.numCurrentlyAssignedNodes = numCurrentlyAssignedNodes;
this.numBadNodes = numBadNodes;
}

@JsonProperty("num_currently_assigned_nodes")
public int getNumCurrentlyAssignedNodes() {
return numCurrentlyAssignedNodes;
}

@JsonProperty("num_bad_nodes")
public int getNumBadNodes() {
return numBadNodes;
}
}

// Be in the habit of storing message inputs and outputs in serializable structures.
// This makes it easier to add more overtime in a backward-compatible way.
class ClusterManagerAssignNodesToJobInput {
// If larger or smaller than previous amounts, will resize the job.
private final int totalNumNodes;
private final String jobName;

@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
public ClusterManagerAssignNodesToJobInput(
@JsonProperty("total_num_nodes") int totalNumNodes,
@JsonProperty("job_name") String jobName) {
this.totalNumNodes = totalNumNodes;
this.jobName = jobName;
}

@JsonProperty("total_num_nodes")
public int getTotalNumNodes() {
return totalNumNodes;
}

@JsonProperty("job_name")
public String getJobName() {
return jobName;
}
}

class ClusterManagerDeleteJobInput {
private final String jobName;

@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
public ClusterManagerDeleteJobInput(@JsonProperty("job_name") String jobName) {
this.jobName = jobName;
}

@JsonProperty("job_name")
public String getJobName() {
return jobName;
}
}

class ClusterManagerAssignNodesToJobResult {
private final Set<String> nodesAssigned;

@JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
public ClusterManagerAssignNodesToJobResult(
@JsonProperty("assigned_nodes") Set<String> assignedNodes) {
this.nodesAssigned = assignedNodes;
}

@JsonProperty("assigned_nodes")
public Set<String> getNodesAssigned() {
return nodesAssigned;
}
}

@WorkflowMethod
ClusterManagerResult run(ClusterManagerInput input);

@SignalMethod
void startCluster();

@UpdateMethod
boolean stopCluster();

// This is an update as opposed to a signal because the client may want to wait for nodes to be
// allocated before sending work to those nodes. Returns the list of node names that were
// allocated to the job.
@UpdateMethod
ClusterManagerAssignNodesToJobResult assignNodesToJobs(ClusterManagerAssignNodesToJobInput input);

// Even though it returns nothing, this is an update because the client may want to track it, for
// example to wait for nodes to be unassigned before reassigning them.
@UpdateMethod
void deleteJob(ClusterManagerDeleteJobInput input);
}
Loading

0 comments on commit 5c96864

Please sign in to comment.