diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 40ebda31d6..35fab5bf8e 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -337,7 +337,8 @@ public String rerun(RerunWorkflowRequest request) { } /** - * @param workflowId the id of the workflow to be restarted + * @param workflowId the id of the workflow to be restarted + * @param useLatestDefinitions if true, use the latest workflow and task definitions upon restart * @throws ApplicationException in the following cases: * */ - public void rewind(String workflowId) { + public void rewind(String workflowId, boolean useLatestDefinitions) { Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, true); if (!workflow.getStatus().isTerminal()) { throw new ApplicationException(CONFLICT, "Workflow is still running. status=" + workflow.getStatus()); } - WorkflowDef workflowDef = Optional.ofNullable(workflow.getWorkflowDefinition()) - .orElseGet(() -> metadataDAO.get(workflow.getWorkflowName(), workflow.getWorkflowVersion()) - .orElseThrow(() -> new ApplicationException(NOT_FOUND, String.format("Unable to find definition for %s", workflowId))) - ); + WorkflowDef workflowDef; + if (useLatestDefinitions) { + workflowDef = metadataDAO.getLatest(workflow.getWorkflowName()) + .orElseThrow(() -> new ApplicationException(NOT_FOUND, String.format("Unable to find latest definition for %s", workflowId))); + workflow.setVersion(workflowDef.getVersion()); // setting this here to ensure backward compatibility and consistency for workflows without the embedded workflow definition + workflow.setWorkflowDefinition(workflowDef); + } else { + workflowDef = Optional.ofNullable(workflow.getWorkflowDefinition()) + .orElseGet(() -> metadataDAO.get(workflow.getWorkflowName(), workflow.getWorkflowVersion()) + .orElseThrow(() -> new ApplicationException(NOT_FOUND, String.format("Unable to find definition for %s", workflowId))) + ); + } if (!workflowDef.isRestartable() && workflow.getStatus().equals(WorkflowStatus.COMPLETED)) { // Can only restart non-completed workflows when the configuration is set to false throw new ApplicationException(CONFLICT, String.format("WorkflowId: %s is an instance of WorkflowDef: %s and version: %d and is non restartable", diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java index 26d5057283..ad5550755f 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java @@ -50,11 +50,11 @@ public void resumeWorkflow(List workflowIds) { } } - public void restart(List workflowIds) { + public void restart(List workflowIds, boolean useLatestDefinitions) { ServiceUtils.checkNotNullOrEmpty(workflowIds, "WorkflowIds list cannot be null."); ServiceUtils.checkArgument(workflowIds.size() < MAX_REQUEST_ITEMS, String.format("Cannot process more than %s workflows. Please use multiple requests", MAX_REQUEST_ITEMS)); for (String workflowId : workflowIds) { - workflowExecutor.rewind(workflowId); + workflowExecutor.rewind(workflowId, useLatestDefinitions); } } diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowService.java b/core/src/main/java/com/netflix/conductor/service/WorkflowService.java index 2811bc5622..1ce8fc8cac 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowService.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowService.java @@ -251,11 +251,13 @@ public String rerunWorkflow(String workflowId, RerunWorkflowRequest request) { /** * Restarts a completed workflow. - * @param workflowId WorkflowId of the workflow. + * + * @param workflowId WorkflowId of the workflow. + * @param useLatestDefinitions if true, use the latest workflow and task definitions upon restart */ - public void restartWorkflow(String workflowId) { - ServiceUtils.checkNotNullOrEmpty(workflowId,"WorkflowId cannot be null or empty."); - workflowExecutor.rewind(workflowId); + public void restartWorkflow(String workflowId, boolean useLatestDefinitions) { + ServiceUtils.checkNotNullOrEmpty(workflowId, "WorkflowId cannot be null or empty."); + workflowExecutor.rewind(workflowId, useLatestDefinitions); } /** diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index b11c7523da..b3682cc308 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -20,6 +20,7 @@ import com.netflix.conductor.common.metadata.tasks.PollData; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.Task.Status; +import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.workflow.TaskType; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.common.metadata.workflow.WorkflowTask; @@ -45,6 +46,7 @@ import com.netflix.conductor.dao.QueueDAO; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import java.util.ArrayList; import java.util.Arrays; @@ -69,7 +71,9 @@ import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -84,6 +88,7 @@ public class TestWorkflowExecutor { private MetadataDAO metadataDAO; private QueueDAO queueDAO; private WorkflowStatusListener workflowStatusListener; + private DeciderService deciderService; @Before public void init() { @@ -107,7 +112,7 @@ public void init() { taskMappers.put("EVENT", new EventTaskMapper(parametersUtils)); taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils)); - DeciderService deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers); + deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers); MetadataMapperService metadataMapperService = new MetadataMapperService(metadataDAO); workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService, workflowStatusListener, executionDAOFacade, config); } @@ -328,7 +333,73 @@ public void testTerminatedWorkflow() { workflowExecutor.completeWorkflow(workflow); verify(workflowStatusListener, times(1)).onWorkflowCompleted(any(Workflow.class)); } - + + @Test + public void testRestartWorkflow() { + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setName("test_task"); + workflowTask.setTaskReferenceName("task_ref"); + + WorkflowDef workflowDef = new WorkflowDef(); + workflowDef.setName("testDef"); + workflowDef.setVersion(1); + workflowDef.setRestartable(true); + workflowDef.getTasks().addAll(Collections.singletonList(workflowTask)); + + Task task_1 = new Task(); + task_1.setTaskId(UUID.randomUUID().toString()); + task_1.setSeq(1); + task_1.setStatus(Status.FAILED); + task_1.setTaskDefName(workflowTask.getName()); + task_1.setReferenceTaskName(workflowTask.getTaskReferenceName()); + + Task task_2 = new Task(); + task_2.setTaskId(UUID.randomUUID().toString()); + task_2.setSeq(2); + task_2.setStatus(Status.FAILED); + task_2.setTaskDefName(workflowTask.getName()); + task_2.setReferenceTaskName(workflowTask.getTaskReferenceName()); + + Workflow workflow = new Workflow(); + workflow.setWorkflowDefinition(workflowDef); + workflow.setWorkflowId("test-workflow-id"); + workflow.getTasks().addAll(Arrays.asList(task_1, task_2)); + workflow.setStatus(Workflow.WorkflowStatus.FAILED); + + when(executionDAOFacade.getWorkflowById(anyString(), anyBoolean())).thenReturn(workflow); + doNothing().when(executionDAOFacade).removeTask(any()); + when(metadataDAO.get(workflow.getWorkflowName(), workflow.getWorkflowVersion())).thenReturn(Optional.of(workflowDef)); + when(metadataDAO.getTaskDef(workflowTask.getName())).thenReturn(new TaskDef()); + when(executionDAOFacade.updateWorkflow(any())).thenReturn(""); + + workflowExecutor.rewind(workflow.getWorkflowId(), false); + assertEquals(Workflow.WorkflowStatus.RUNNING, workflow.getStatus()); + verify(metadataDAO, never()).getLatest(any()); + + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Workflow.class); + verify(executionDAOFacade, times(2)).updateWorkflow(argumentCaptor.capture()); + assertEquals(workflow.getWorkflowId(), argumentCaptor.getAllValues().get(1).getWorkflowId()); + assertEquals(workflow.getWorkflowDefinition(), argumentCaptor.getAllValues().get(1).getWorkflowDefinition()); + + // add a new version of the workflow definition and restart with latest + workflow.setStatus(Workflow.WorkflowStatus.COMPLETED); + workflowDef = new WorkflowDef(); + workflowDef.setName("testDef"); + workflowDef.setVersion(2); + workflowDef.setRestartable(true); + workflowDef.getTasks().addAll(Collections.singletonList(workflowTask)); + + when(metadataDAO.getLatest(workflow.getWorkflowName())).thenReturn(Optional.of(workflowDef)); + workflowExecutor.rewind(workflow.getWorkflowId(), true); + assertEquals(Workflow.WorkflowStatus.RUNNING, workflow.getStatus()); + verify(metadataDAO, times(1)).getLatest(anyString()); + + argumentCaptor = ArgumentCaptor.forClass(Workflow.class); + verify(executionDAOFacade, times(4)).updateWorkflow(argumentCaptor.capture()); + assertEquals(workflow.getWorkflowId(), argumentCaptor.getAllValues().get(3).getWorkflowId()); + assertEquals(workflowDef, argumentCaptor.getAllValues().get(3).getWorkflowDefinition()); + } + @Test public void testGetFailedTasksToRetry() { //setup diff --git a/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java b/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java index f0eea22335..2cd25bfd88 100644 --- a/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java +++ b/core/src/test/java/com/netflix/conductor/service/WorkflowServiceTest.java @@ -1,3 +1,18 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.netflix.conductor.service; import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest; @@ -251,8 +266,8 @@ public void testRerunWorkflowReturnWorkflowId() { @Test public void testRestartWorkflow() { - workflowService.restartWorkflow("w123"); - verify(mockWorkflowExecutor, times(1)).rewind(anyString()); + workflowService.restartWorkflow("w123", false); + verify(mockWorkflowExecutor, times(1)).rewind(anyString(), anyBoolean()); } @Test diff --git a/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java b/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java index 5d1778b489..e02a436a46 100644 --- a/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java +++ b/grpc-server/src/main/java/com/netflix/conductor/grpc/server/service/WorkflowServiceImpl.java @@ -1,3 +1,18 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.netflix.conductor.grpc.server.service; import com.netflix.conductor.common.metadata.workflow.SkipTaskRequest; @@ -215,7 +230,7 @@ public void rerunWorkflow(RerunWorkflowRequestPb.RerunWorkflowRequest req, Strea @Override public void restartWorkflow(WorkflowServicePb.RestartWorkflowRequest req, StreamObserver response) { try { - executor.rewind(req.getWorkflowId()); + executor.rewind(req.getWorkflowId(), req.getUseLatestDefinitions()); response.onNext(WorkflowServicePb.RestartWorkflowResponse.getDefaultInstance()); response.onCompleted(); } catch (Exception e) { diff --git a/grpc/src/main/proto/grpc/workflow_service.proto b/grpc/src/main/proto/grpc/workflow_service.proto index a1644cdbfe..85cda0f70d 100644 --- a/grpc/src/main/proto/grpc/workflow_service.proto +++ b/grpc/src/main/proto/grpc/workflow_service.proto @@ -137,6 +137,7 @@ message RerunWorkflowResponse { message RestartWorkflowRequest { string workflow_id = 1; + bool use_latest_definitions = 2; } message RestartWorkflowResponse {} diff --git a/jersey/src/main/java/com/netflix/conductor/server/resources/WorkflowBulkResource.java b/jersey/src/main/java/com/netflix/conductor/server/resources/WorkflowBulkResource.java index 42483453a7..d738c2c59f 100644 --- a/jersey/src/main/java/com/netflix/conductor/server/resources/WorkflowBulkResource.java +++ b/jersey/src/main/java/com/netflix/conductor/server/resources/WorkflowBulkResource.java @@ -30,6 +30,7 @@ import javax.inject.Singleton; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; @@ -117,22 +118,24 @@ public BulkResponse resumeWorkflow(List workflowIds) throws IllegalArgum /** * Restart the list of workflows. - * @param workflowIds - list of workflow Ids to perform restart operation on + * + * @param workflowIds - list of workflow Ids to perform restart operation on + * @param useLatestDefinitions if true, use latest workflow and task definitions upon restart * @return bulk response object containing a list of succeeded workflows and a list of failed ones with errors * @throws IllegalArgumentException - too many workflowIds in one batch request - * @throws NullPointerException workflowIds list is null + * @throws NullPointerException workflowIds list is null */ @POST @Path("/restart") @ApiOperation("Restart the list of completed workflow") - public BulkResponse restart(List workflowIds) throws IllegalArgumentException, NullPointerException { + public BulkResponse restart(List workflowIds, @QueryParam("useLatestDefinitions") @DefaultValue("false") boolean useLatestDefinitions) throws IllegalArgumentException, NullPointerException { Preconditions.checkNotNull(workflowIds, "workflowIds list cannot be null."); Preconditions.checkArgument(workflowIds.size() < MAX_REQUEST_ITEMS, "Cannot process more than %s workflows. Please use multiple requests", MAX_REQUEST_ITEMS); BulkResponse bulkResponse = new BulkResponse(); for (String workflowId : workflowIds) { try { - workflowExecutor.rewind(workflowId); + workflowExecutor.rewind(workflowId, useLatestDefinitions); bulkResponse.appendSuccessResponse(workflowId); } catch (Exception e) { LOGGER.error("bulk restart exception, workflowId {}, message: {} ",workflowId, e.getMessage(), e); diff --git a/jersey/src/main/java/com/netflix/conductor/server/resources/WorkflowResource.java b/jersey/src/main/java/com/netflix/conductor/server/resources/WorkflowResource.java index 5101aaa09b..b0dd8b10a0 100644 --- a/jersey/src/main/java/com/netflix/conductor/server/resources/WorkflowResource.java +++ b/jersey/src/main/java/com/netflix/conductor/server/resources/WorkflowResource.java @@ -50,7 +50,6 @@ /** * @author Viren - * */ @Api(value = "/workflow", produces = MediaType.APPLICATION_JSON, consumes = MediaType.APPLICATION_JSON, tags = "Workflow Management") @Path("/workflow") @@ -184,8 +183,8 @@ public String rerun(@PathParam("workflowId") String workflowId, @Path("/{workflowId}/restart") @ApiOperation("Restarts a completed workflow") @Consumes(MediaType.WILDCARD) - public void restart(@PathParam("workflowId") String workflowId) { - workflowService.restartWorkflow(workflowId); + public void restart(@PathParam("workflowId") String workflowId, @QueryParam("useLatestDefinitions") @DefaultValue("false") boolean useLatestDefinitions) { + workflowService.restartWorkflow(workflowId, useLatestDefinitions); } @POST diff --git a/jersey/src/test/java/com/netflix/conductor/server/resources/WorkflowResourceTest.java b/jersey/src/test/java/com/netflix/conductor/server/resources/WorkflowResourceTest.java index 1ac25eddbe..0a27f7eeb6 100644 --- a/jersey/src/test/java/com/netflix/conductor/server/resources/WorkflowResourceTest.java +++ b/jersey/src/test/java/com/netflix/conductor/server/resources/WorkflowResourceTest.java @@ -1,3 +1,18 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.netflix.conductor.server.resources; import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest; @@ -41,7 +56,7 @@ public void before() { } @Test - public void testStartWorkflow() throws Exception { + public void testStartWorkflow() { StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest(); startWorkflowRequest.setName("w123"); Map input = new HashMap<>(); @@ -53,7 +68,7 @@ public void testStartWorkflow() throws Exception { } @Test - public void testStartWorkflowParam() throws Exception { + public void testStartWorkflowParam() { Map input = new HashMap<>(); input.put("1", "abc"); String workflowID = "w112"; @@ -62,7 +77,7 @@ public void testStartWorkflowParam() throws Exception { } @Test - public void getWorkflows() throws Exception { + public void getWorkflows() { Workflow workflow = new Workflow(); workflow.setCorrelationId("123"); ArrayList listOfWorkflows = new ArrayList() {{ @@ -73,7 +88,7 @@ public void getWorkflows() throws Exception { } @Test - public void testGetWorklfowsMultipleCorrelationId() throws Exception { + public void testGetWorklfowsMultipleCorrelationId() { Workflow workflow = new Workflow(); workflow.setCorrelationId("c123"); @@ -95,7 +110,7 @@ public void testGetWorklfowsMultipleCorrelationId() throws Exception { } @Test - public void testGetExecutionStatus() throws Exception { + public void testGetExecutionStatus() { Workflow workflow = new Workflow(); workflow.setCorrelationId("c123"); @@ -104,13 +119,13 @@ public void testGetExecutionStatus() throws Exception { } @Test - public void testDelete() throws Exception { + public void testDelete() { workflowResource.delete("w123", true); verify(mockWorkflowService, times(1)).deleteWorkflow(anyString(), anyBoolean()); } @Test - public void testGetRunningWorkflow() throws Exception { + public void testGetRunningWorkflow() { List listOfWorklfows = new ArrayList() {{ add("w123"); }}; @@ -119,58 +134,58 @@ public void testGetRunningWorkflow() throws Exception { } @Test - public void testDecide() throws Exception { + public void testDecide() { workflowResource.decide("w123"); verify(mockWorkflowService, times(1)).decideWorkflow(anyString()); } @Test - public void testPauseWorkflow() throws Exception { + public void testPauseWorkflow() { workflowResource.pauseWorkflow("w123"); verify(mockWorkflowService, times(1)).pauseWorkflow(anyString()); } @Test - public void testResumeWorkflow() throws Exception { + public void testResumeWorkflow() { workflowResource.resumeWorkflow("test"); verify(mockWorkflowService, times(1)).resumeWorkflow(anyString()); } @Test - public void testSkipTaskFromWorkflow() throws Exception { + public void testSkipTaskFromWorkflow() { workflowResource.skipTaskFromWorkflow("test", "testTask", null); verify(mockWorkflowService, times(1)).skipTaskFromWorkflow(anyString(), anyString(), any(SkipTaskRequest.class)); } @Test - public void testRerun() throws Exception { + public void testRerun() { RerunWorkflowRequest request = new RerunWorkflowRequest(); workflowResource.rerun("test", request); verify(mockWorkflowService, times(1)).rerunWorkflow(anyString(), any(RerunWorkflowRequest.class)); } @Test - public void restart() throws Exception { - workflowResource.restart("w123"); - verify(mockWorkflowService, times(1)).restartWorkflow(anyString()); + public void restart() { + workflowResource.restart("w123", false); + verify(mockWorkflowService, times(1)).restartWorkflow(anyString(), anyBoolean()); } @Test - public void testRetry() throws Exception { + public void testRetry() { workflowResource.retry("w123"); verify(mockWorkflowService, times(1)).retryWorkflow(anyString()); } @Test - public void testResetWorkflow() throws Exception { + public void testResetWorkflow() { workflowResource.resetWorkflow("w123"); verify(mockWorkflowService, times(1)).resetWorkflow(anyString()); } @Test - public void testTerminate() throws Exception { + public void testTerminate() { workflowResource.terminate("w123", "test"); verify(mockWorkflowService, times(1)).terminateWorkflow(anyString(), anyString()); } diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java index 8e00556aa0..7f1fcbb293 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java @@ -1357,7 +1357,7 @@ public void testSimpleWorkflowFailureWithTerminalError() { boolean failed = false; try { - workflowExecutor.rewind(workflowInstanceId); + workflowExecutor.rewind(workflowInstanceId, false); } catch (ApplicationException ae) { failed = true; } @@ -1422,7 +1422,7 @@ public void testSimpleWorkflow() { boolean failed = false; try { - workflowExecutor.rewind(workflowInstanceId); + workflowExecutor.rewind(workflowInstanceId, false); } catch (ApplicationException ae) { failed = true; } @@ -1797,7 +1797,7 @@ public void testSimpleWorkflowWithAllTaskInOneDomain() { clearWorkflows(); createWorkflowDefForDomain(); - metadataService.getWorkflowDef(LINEAR_WORKFLOW_T1_T2_SW, 1); + WorkflowDef def = metadataService.getWorkflowDef(LINEAR_WORKFLOW_T1_T2_SW, 1); String correlationId = "unit_test_sw"; Map input = new HashMap(); @@ -2951,7 +2951,7 @@ public void testNonRestartartableWorkflows() { assertNotNull(workflow); assertEquals(WorkflowStatus.FAILED, workflow.getStatus()); - workflowExecutor.rewind(workflow.getWorkflowId()); + workflowExecutor.rewind(workflow.getWorkflowId(), false); // Polling for the first task should return the same task as before task = workflowExecutionService.poll("junit_task_1", "task1.junit.worker"); @@ -3003,7 +3003,7 @@ public void testNonRestartartableWorkflows() { expectedException.expect(ApplicationException.class); expectedException.expectMessage(String.format("is an instance of WorkflowDef: %s and version: %d and is non restartable", JUNIT_TEST_WF_NON_RESTARTABLE, 1)); - workflowExecutor.rewind(workflow.getWorkflowId()); + workflowExecutor.rewind(workflow.getWorkflowId(), false); } @@ -3014,51 +3014,103 @@ public void testRestart() { taskDef.setRetryCount(0); metadataService.updateTaskDef(taskDef); - WorkflowDef found = metadataService.getWorkflowDef(LINEAR_WORKFLOW_T1_T2, 1); - assertNotNull(found.getFailureWorkflow()); - assertFalse(StringUtils.isBlank(found.getFailureWorkflow())); + WorkflowDef workflowDef = metadataService.getWorkflowDef(LINEAR_WORKFLOW_T1_T2, 1); + assertNotNull(workflowDef.getFailureWorkflow()); + assertFalse(StringUtils.isBlank(workflowDef.getFailureWorkflow())); String correlationId = "unit_test_1" + UUID.randomUUID().toString(); - Map input = new HashMap(); + Map input = new HashMap<>(); String inputParam1 = "p1 value"; input.put("param1", inputParam1); input.put("param2", "p2 value"); - String wfid = startOrLoadWorkflowExecution(LINEAR_WORKFLOW_T1_T2, 1, correlationId, input, null, null); - assertNotNull(wfid); + String workflowId = startOrLoadWorkflowExecution(LINEAR_WORKFLOW_T1_T2, 1, correlationId, input, null, null); + assertNotNull(workflowId); Task task = getTask("junit_task_1"); task.setStatus(FAILED); workflowExecutionService.updateTask(task); // If we get the full workflow here then, last task should be completed and the next task should be scheduled - Workflow es = workflowExecutionService.getExecutionStatus(wfid, true); - assertNotNull(es); - assertEquals(WorkflowStatus.FAILED, es.getStatus()); + Workflow workflow = workflowExecutionService.getExecutionStatus(workflowId, true); + assertNotNull(workflow); + assertEquals(WorkflowStatus.FAILED, workflow.getStatus()); - workflowExecutor.rewind(es.getWorkflowId()); - es = workflowExecutionService.getExecutionStatus(wfid, true); - assertNotNull(es); - assertEquals(RUNNING, es.getStatus()); + workflowExecutor.rewind(workflow.getWorkflowId(), false); + workflow = workflowExecutionService.getExecutionStatus(workflowId, true); + assertNotNull(workflow); + assertEquals(RUNNING, workflow.getStatus()); task = getTask("junit_task_1"); assertNotNull(task); - assertEquals(wfid, task.getWorkflowInstanceId()); + assertEquals(workflowId, task.getWorkflowInstanceId()); task.setStatus(COMPLETED); workflowExecutionService.updateTask(task); - es = workflowExecutionService.getExecutionStatus(wfid, true); - assertNotNull(es); - assertEquals(RUNNING, es.getStatus()); + workflow = workflowExecutionService.getExecutionStatus(workflowId, true); + assertNotNull(workflow); + assertEquals(RUNNING, workflow.getStatus()); task = getTask("junit_task_2"); assertNotNull(task); - assertEquals(wfid, task.getWorkflowInstanceId()); + assertEquals(workflowId, task.getWorkflowInstanceId()); task.setStatus(COMPLETED); workflowExecutionService.updateTask(task); - es = workflowExecutionService.getExecutionStatus(wfid, true); - assertNotNull(es); - assertEquals(WorkflowStatus.COMPLETED, es.getStatus()); + workflow = workflowExecutionService.getExecutionStatus(workflowId, true); + assertNotNull(workflow); + assertEquals(WorkflowStatus.COMPLETED, workflow.getStatus()); + + // Add a new version of the definition with an additional task + WorkflowTask workflowTask = new WorkflowTask(); + workflowTask.setName("junit_task_20"); + workflowTask.setTaskReferenceName("task_added"); + workflowTask.setWorkflowTaskType(TaskType.SIMPLE); + + workflowDef.getTasks().add(workflowTask); + workflowDef.setVersion(2); + metadataService.updateWorkflowDef(workflowDef); + + // restart with the latest definition + workflowExecutor.rewind(workflow.getWorkflowId(), true); + + workflow = workflowExecutionService.getExecutionStatus(workflowId, true); + assertNotNull(workflow); + assertEquals(RUNNING, workflow.getStatus()); + + task = getTask("junit_task_1"); + assertNotNull(task); + assertEquals(workflowId, task.getWorkflowInstanceId()); + task.setStatus(COMPLETED); + workflowExecutionService.updateTask(task); + + workflow = workflowExecutionService.getExecutionStatus(workflowId, true); + assertNotNull(workflow); + assertEquals(RUNNING, workflow.getStatus()); + + task = getTask("junit_task_2"); + assertNotNull(task); + assertEquals(workflowId, task.getWorkflowInstanceId()); + task.setStatus(COMPLETED); + workflowExecutionService.updateTask(task); + + workflow = workflowExecutionService.getExecutionStatus(workflowId, true); + assertNotNull(workflow); + assertEquals(RUNNING, workflow.getStatus()); + + task = getTask("junit_task_20"); + assertNotNull(task); + assertEquals(workflowId, task.getWorkflowInstanceId()); + assertEquals("task_added", task.getReferenceTaskName()); + task.setStatus(COMPLETED); + workflowExecutionService.updateTask(task); + + workflow = workflowExecutionService.getExecutionStatus(workflowId, true); + assertNotNull(workflow); + assertEquals(WorkflowStatus.COMPLETED, workflow.getStatus()); + assertEquals(3, workflow.getTasks().size()); + + // cleanup + metadataService.unregisterWorkflowDef(workflowDef.getName(), 2); }