From 52226acbfcaf282e8e407ca874cab3d4b1201854 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Thu, 17 Oct 2024 14:50:42 -0700 Subject: [PATCH] Use workflow init --- .../ClusterManagerWorkflow.java | 2 +- .../ClusterManagerWorkflowImpl.java | 28 +++++++++++++------ .../ClusterManagerWorkflowStarter.java | 1 + 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflow.java b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflow.java index e4d8a1de..52ab2447 100644 --- a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflow.java +++ b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflow.java @@ -43,7 +43,7 @@ enum ClusterState { // In workflows that continue-as-new, it's convenient to store all your state in one serializable // structure to make it easier to pass between runs class ClusterManagerState { - public ClusterState workflowState; + public ClusterState workflowState = ClusterState.NOT_STARTED; public Map> nodes = new HashMap<>(); public Set jobAssigned = new HashSet<>(); } diff --git a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowImpl.java b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowImpl.java index 6507baf1..2abd2b0d 100644 --- a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowImpl.java +++ b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowImpl.java @@ -23,23 +23,23 @@ import io.temporal.common.RetryOptions; import io.temporal.failure.ApplicationFailure; import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInit; import io.temporal.workflow.WorkflowLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.time.Duration; import java.util.Collections; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClusterManagerWorkflowImpl implements ClusterManagerWorkflow { private static final Logger logger = LoggerFactory.getLogger(ClusterManagerWorkflowImpl.class); - private ClusterManagerState state = new ClusterManagerState(); - private WorkflowLock nodeLock = Workflow.newWorkflowLock(); - private Duration sleepInterval = Duration.ofMinutes(10); - private int maxHistoryLength = 0; + private final ClusterManagerState state; + private final WorkflowLock nodeLock; + private final Duration sleepInterval; + private final int maxHistoryLength; private ClusterManagerActivities activities = Workflow.newActivityStub( @@ -51,15 +51,25 @@ public class ClusterManagerWorkflowImpl implements ClusterManagerWorkflow { .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build()) .build())); - @Override - public ClusterManagerResult run(ClusterManagerInput input) { + @WorkflowInit + public ClusterManagerWorkflowImpl(ClusterManagerInput input) { + nodeLock = Workflow.newWorkflowLock(); if (input.getState().isPresent()) { state = input.getState().get(); + } else { + state = new ClusterManagerState(); } if (input.isTestContinueAsNew()) { maxHistoryLength = 120; sleepInterval = Duration.ofSeconds(1); + } else { + sleepInterval = Duration.ofSeconds(10); + maxHistoryLength = 0; } + } + + @Override + public ClusterManagerResult run(ClusterManagerInput input) { Workflow.await(() -> state.workflowState != ClusterState.NOT_STARTED); // The cluster manager is a long-running "entity" workflow so we need to periodically checkpoint // its state and diff --git a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowStarter.java b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowStarter.java index d3a59ca8..ea3aa443 100644 --- a/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowStarter.java +++ b/core/src/main/java/io/temporal/samples/safemessagepassing/ClusterManagerWorkflowStarter.java @@ -109,5 +109,6 @@ public static void main(String[] args) { "Cluster shut down successfully. It had " + result.getNumCurrentlyAssignedNodes() + " nodes assigned at the end."); + System.exit(0); } }