Skip to content

Commit

Permalink
Use workflow init
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Oct 17, 2024
1 parent 8972e7c commit 52226ac
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ enum ClusterState {
// In workflows that continue-as-new, it's convenient to store all your state in one serializable
// structure to make it easier to pass between runs
class ClusterManagerState {
public ClusterState workflowState;
public ClusterState workflowState = ClusterState.NOT_STARTED;
public Map<String, Optional<String>> nodes = new HashMap<>();
public Set<String> jobAssigned = new HashSet<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,23 @@
import io.temporal.common.RetryOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInit;
import io.temporal.workflow.WorkflowLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterManagerWorkflowImpl implements ClusterManagerWorkflow {

private static final Logger logger = LoggerFactory.getLogger(ClusterManagerWorkflowImpl.class);
private ClusterManagerState state = new ClusterManagerState();
private WorkflowLock nodeLock = Workflow.newWorkflowLock();
private Duration sleepInterval = Duration.ofMinutes(10);
private int maxHistoryLength = 0;
private final ClusterManagerState state;
private final WorkflowLock nodeLock;
private final Duration sleepInterval;
private final int maxHistoryLength;

private ClusterManagerActivities activities =
Workflow.newActivityStub(
Expand All @@ -51,15 +51,25 @@ public class ClusterManagerWorkflowImpl implements ClusterManagerWorkflow {
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
.build()));

@Override
public ClusterManagerResult run(ClusterManagerInput input) {
@WorkflowInit
public ClusterManagerWorkflowImpl(ClusterManagerInput input) {
nodeLock = Workflow.newWorkflowLock();
if (input.getState().isPresent()) {
state = input.getState().get();
} else {
state = new ClusterManagerState();
}
if (input.isTestContinueAsNew()) {
maxHistoryLength = 120;
sleepInterval = Duration.ofSeconds(1);
} else {
sleepInterval = Duration.ofSeconds(10);
maxHistoryLength = 0;
}
}

@Override
public ClusterManagerResult run(ClusterManagerInput input) {
Workflow.await(() -> state.workflowState != ClusterState.NOT_STARTED);
// The cluster manager is a long-running "entity" workflow so we need to periodically checkpoint
// its state and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,6 @@ public static void main(String[] args) {
"Cluster shut down successfully. It had "
+ result.getNumCurrentlyAssignedNodes()
+ " nodes assigned at the end.");
System.exit(0);
}
}

0 comments on commit 52226ac

Please sign in to comment.