Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #870 from Netflix/restart_option_with_latest_defs
Browse files Browse the repository at this point in the history
added option to restart with latest definitions
  • Loading branch information
apanicker-nflx authored Nov 15, 2018
2 parents 85b7c1a + 035d6bb commit fc44576
Show file tree
Hide file tree
Showing 11 changed files with 250 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,24 +337,33 @@ public String rerun(RerunWorkflowRequest request) {
}

/**
* @param workflowId the id of the workflow to be restarted
* @param workflowId the id of the workflow to be restarted
* @param useLatestDefinitions if true, use the latest workflow and task definitions upon restart
* @throws ApplicationException in the following cases:
* <ul>
* <li>Workflow is not in a terminal state</li>
* <li>Workflow definition is not found</li>
* <li>Workflow is deemed non-restartable as per workflow definition</li>
* </ul>
*/
public void rewind(String workflowId) {
public void rewind(String workflowId, boolean useLatestDefinitions) {
Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, true);
if (!workflow.getStatus().isTerminal()) {
throw new ApplicationException(CONFLICT, "Workflow is still running. status=" + workflow.getStatus());
}

WorkflowDef workflowDef = Optional.ofNullable(workflow.getWorkflowDefinition())
.orElseGet(() -> metadataDAO.get(workflow.getWorkflowName(), workflow.getWorkflowVersion())
.orElseThrow(() -> new ApplicationException(NOT_FOUND, String.format("Unable to find definition for %s", workflowId)))
);
WorkflowDef workflowDef;
if (useLatestDefinitions) {
workflowDef = metadataDAO.getLatest(workflow.getWorkflowName())
.orElseThrow(() -> new ApplicationException(NOT_FOUND, String.format("Unable to find latest definition for %s", workflowId)));
workflow.setVersion(workflowDef.getVersion()); // setting this here to ensure backward compatibility and consistency for workflows without the embedded workflow definition
workflow.setWorkflowDefinition(workflowDef);
} else {
workflowDef = Optional.ofNullable(workflow.getWorkflowDefinition())
.orElseGet(() -> metadataDAO.get(workflow.getWorkflowName(), workflow.getWorkflowVersion())
.orElseThrow(() -> new ApplicationException(NOT_FOUND, String.format("Unable to find definition for %s", workflowId)))
);
}

if (!workflowDef.isRestartable() && workflow.getStatus().equals(WorkflowStatus.COMPLETED)) { // Can only restart non-completed workflows when the configuration is set to false
throw new ApplicationException(CONFLICT, String.format("WorkflowId: %s is an instance of WorkflowDef: %s and version: %d and is non restartable",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ public void resumeWorkflow(List<String> workflowIds) {
}
}

public void restart(List<String> workflowIds) {
public void restart(List<String> workflowIds, boolean useLatestDefinitions) {
ServiceUtils.checkNotNullOrEmpty(workflowIds, "WorkflowIds list cannot be null.");
ServiceUtils.checkArgument(workflowIds.size() < MAX_REQUEST_ITEMS, String.format("Cannot process more than %s workflows. Please use multiple requests", MAX_REQUEST_ITEMS));
for (String workflowId : workflowIds) {
workflowExecutor.rewind(workflowId);
workflowExecutor.rewind(workflowId, useLatestDefinitions);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,13 @@ public String rerunWorkflow(String workflowId, RerunWorkflowRequest request) {

/**
* Restarts a completed workflow.
* @param workflowId WorkflowId of the workflow.
*
* @param workflowId WorkflowId of the workflow.
* @param useLatestDefinitions if true, use the latest workflow and task definitions upon restart
*/
public void restartWorkflow(String workflowId) {
ServiceUtils.checkNotNullOrEmpty(workflowId,"WorkflowId cannot be null or empty.");
workflowExecutor.rewind(workflowId);
public void restartWorkflow(String workflowId, boolean useLatestDefinitions) {
ServiceUtils.checkNotNullOrEmpty(workflowId, "WorkflowId cannot be null or empty.");
workflowExecutor.rewind(workflowId, useLatestDefinitions);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.netflix.conductor.common.metadata.tasks.PollData;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.TaskType;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
Expand All @@ -45,6 +46,7 @@
import com.netflix.conductor.dao.QueueDAO;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -69,7 +71,9 @@
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -84,6 +88,7 @@ public class TestWorkflowExecutor {
private MetadataDAO metadataDAO;
private QueueDAO queueDAO;
private WorkflowStatusListener workflowStatusListener;
private DeciderService deciderService;

@Before
public void init() {
Expand All @@ -107,7 +112,7 @@ public void init() {
taskMappers.put("EVENT", new EventTaskMapper(parametersUtils));
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));

DeciderService deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers);
deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers);
MetadataMapperService metadataMapperService = new MetadataMapperService(metadataDAO);
workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService, workflowStatusListener, executionDAOFacade, config);
}
Expand Down Expand Up @@ -328,7 +333,73 @@ public void testTerminatedWorkflow() {
workflowExecutor.completeWorkflow(workflow);
verify(workflowStatusListener, times(1)).onWorkflowCompleted(any(Workflow.class));
}


@Test
public void testRestartWorkflow() {
WorkflowTask workflowTask = new WorkflowTask();
workflowTask.setName("test_task");
workflowTask.setTaskReferenceName("task_ref");

WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("testDef");
workflowDef.setVersion(1);
workflowDef.setRestartable(true);
workflowDef.getTasks().addAll(Collections.singletonList(workflowTask));

Task task_1 = new Task();
task_1.setTaskId(UUID.randomUUID().toString());
task_1.setSeq(1);
task_1.setStatus(Status.FAILED);
task_1.setTaskDefName(workflowTask.getName());
task_1.setReferenceTaskName(workflowTask.getTaskReferenceName());

Task task_2 = new Task();
task_2.setTaskId(UUID.randomUUID().toString());
task_2.setSeq(2);
task_2.setStatus(Status.FAILED);
task_2.setTaskDefName(workflowTask.getName());
task_2.setReferenceTaskName(workflowTask.getTaskReferenceName());

Workflow workflow = new Workflow();
workflow.setWorkflowDefinition(workflowDef);
workflow.setWorkflowId("test-workflow-id");
workflow.getTasks().addAll(Arrays.asList(task_1, task_2));
workflow.setStatus(Workflow.WorkflowStatus.FAILED);

when(executionDAOFacade.getWorkflowById(anyString(), anyBoolean())).thenReturn(workflow);
doNothing().when(executionDAOFacade).removeTask(any());
when(metadataDAO.get(workflow.getWorkflowName(), workflow.getWorkflowVersion())).thenReturn(Optional.of(workflowDef));
when(metadataDAO.getTaskDef(workflowTask.getName())).thenReturn(new TaskDef());
when(executionDAOFacade.updateWorkflow(any())).thenReturn("");

workflowExecutor.rewind(workflow.getWorkflowId(), false);
assertEquals(Workflow.WorkflowStatus.RUNNING, workflow.getStatus());
verify(metadataDAO, never()).getLatest(any());

ArgumentCaptor<Workflow> argumentCaptor = ArgumentCaptor.forClass(Workflow.class);
verify(executionDAOFacade, times(2)).updateWorkflow(argumentCaptor.capture());
assertEquals(workflow.getWorkflowId(), argumentCaptor.getAllValues().get(1).getWorkflowId());
assertEquals(workflow.getWorkflowDefinition(), argumentCaptor.getAllValues().get(1).getWorkflowDefinition());

// add a new version of the workflow definition and restart with latest
workflow.setStatus(Workflow.WorkflowStatus.COMPLETED);
workflowDef = new WorkflowDef();
workflowDef.setName("testDef");
workflowDef.setVersion(2);
workflowDef.setRestartable(true);
workflowDef.getTasks().addAll(Collections.singletonList(workflowTask));

when(metadataDAO.getLatest(workflow.getWorkflowName())).thenReturn(Optional.of(workflowDef));
workflowExecutor.rewind(workflow.getWorkflowId(), true);
assertEquals(Workflow.WorkflowStatus.RUNNING, workflow.getStatus());
verify(metadataDAO, times(1)).getLatest(anyString());

argumentCaptor = ArgumentCaptor.forClass(Workflow.class);
verify(executionDAOFacade, times(4)).updateWorkflow(argumentCaptor.capture());
assertEquals(workflow.getWorkflowId(), argumentCaptor.getAllValues().get(3).getWorkflowId());
assertEquals(workflowDef, argumentCaptor.getAllValues().get(3).getWorkflowDefinition());
}

@Test
public void testGetFailedTasksToRetry() {
//setup
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.conductor.service;

import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest;
Expand Down Expand Up @@ -251,8 +266,8 @@ public void testRerunWorkflowReturnWorkflowId() {

@Test
public void testRestartWorkflow() {
workflowService.restartWorkflow("w123");
verify(mockWorkflowExecutor, times(1)).rewind(anyString());
workflowService.restartWorkflow("w123", false);
verify(mockWorkflowExecutor, times(1)).rewind(anyString(), anyBoolean());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.conductor.grpc.server.service;

import com.netflix.conductor.common.metadata.workflow.SkipTaskRequest;
Expand Down Expand Up @@ -215,7 +230,7 @@ public void rerunWorkflow(RerunWorkflowRequestPb.RerunWorkflowRequest req, Strea
@Override
public void restartWorkflow(WorkflowServicePb.RestartWorkflowRequest req, StreamObserver<WorkflowServicePb.RestartWorkflowResponse> response) {
try {
executor.rewind(req.getWorkflowId());
executor.rewind(req.getWorkflowId(), req.getUseLatestDefinitions());
response.onNext(WorkflowServicePb.RestartWorkflowResponse.getDefaultInstance());
response.onCompleted();
} catch (Exception e) {
Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/proto/grpc/workflow_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ message RerunWorkflowResponse {

message RestartWorkflowRequest {
string workflow_id = 1;
bool use_latest_definitions = 2;
}

message RestartWorkflowResponse {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import javax.inject.Singleton;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
Expand Down Expand Up @@ -117,22 +118,24 @@ public BulkResponse resumeWorkflow(List<String> workflowIds) throws IllegalArgum

/**
* Restart the list of workflows.
* @param workflowIds - list of workflow Ids to perform restart operation on
*
* @param workflowIds - list of workflow Ids to perform restart operation on
* @param useLatestDefinitions if true, use latest workflow and task definitions upon restart
* @return bulk response object containing a list of succeeded workflows and a list of failed ones with errors
* @throws IllegalArgumentException - too many workflowIds in one batch request
* @throws NullPointerException workflowIds list is null
* @throws NullPointerException workflowIds list is null
*/
@POST
@Path("/restart")
@ApiOperation("Restart the list of completed workflow")
public BulkResponse restart(List<String> workflowIds) throws IllegalArgumentException, NullPointerException {
public BulkResponse restart(List<String> workflowIds, @QueryParam("useLatestDefinitions") @DefaultValue("false") boolean useLatestDefinitions) throws IllegalArgumentException, NullPointerException {
Preconditions.checkNotNull(workflowIds, "workflowIds list cannot be null.");
Preconditions.checkArgument(workflowIds.size() < MAX_REQUEST_ITEMS, "Cannot process more than %s workflows. Please use multiple requests", MAX_REQUEST_ITEMS);

BulkResponse bulkResponse = new BulkResponse();
for (String workflowId : workflowIds) {
try {
workflowExecutor.rewind(workflowId);
workflowExecutor.rewind(workflowId, useLatestDefinitions);
bulkResponse.appendSuccessResponse(workflowId);
} catch (Exception e) {
LOGGER.error("bulk restart exception, workflowId {}, message: {} ",workflowId, e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@

/**
* @author Viren
*
*/
@Api(value = "/workflow", produces = MediaType.APPLICATION_JSON, consumes = MediaType.APPLICATION_JSON, tags = "Workflow Management")
@Path("/workflow")
Expand Down Expand Up @@ -184,8 +183,8 @@ public String rerun(@PathParam("workflowId") String workflowId,
@Path("/{workflowId}/restart")
@ApiOperation("Restarts a completed workflow")
@Consumes(MediaType.WILDCARD)
public void restart(@PathParam("workflowId") String workflowId) {
workflowService.restartWorkflow(workflowId);
public void restart(@PathParam("workflowId") String workflowId, @QueryParam("useLatestDefinitions") @DefaultValue("false") boolean useLatestDefinitions) {
workflowService.restartWorkflow(workflowId, useLatestDefinitions);
}

@POST
Expand Down
Loading

0 comments on commit fc44576

Please sign in to comment.