Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Jun 12, 2018
2 parents 69b2901 + d3217ec commit 71e76c7
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 121 deletions.
6 changes: 1 addition & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ Conductor is an _orchestration_ engine that runs in the cloud.
## Get Conductor
Binaries are available from Maven Central and jcenter.

|Group|Artifact|Latest Stable Version|
|-----------|---------------|---------------------|
|com.netflix.conductor|conductor-*|1.5.+|

Below are the various artifacts published:

|Artifact|Description|
Expand Down Expand Up @@ -73,7 +69,7 @@ Conductor is maintained by Media Workflow Infrastructure team at Netflix. Use g

## LICENSE

Copyright (c) 2016 Netflix, Inc.
Copyright (c) 2018 Netflix, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -72,7 +71,7 @@
@Trace
public class WorkflowExecutor {

private static Logger logger = LoggerFactory.getLogger(WorkflowExecutor.class);
private static final Logger logger = LoggerFactory.getLogger(WorkflowExecutor.class);

private MetadataDAO metadataDAO;

Expand Down Expand Up @@ -127,7 +126,6 @@ public String startWorkflow(String workflowName, int workflowVersion, Map<String
String event, Map<String, String> taskToDomain) throws Exception {



try {
//Check if the input to the workflow is not null
//QQ When is the payload of the input validated
Expand Down Expand Up @@ -311,7 +309,9 @@ public Task getPendingTaskByWorkflow(String taskReferenceName, String workflowId
.orElse(null);
}

public void completeWorkflow(Workflow wf) throws Exception {
@VisibleForTesting
void completeWorkflow(Workflow wf) throws Exception {
logger.debug("Completing workflow execution for {}", wf.getWorkflowId());
Workflow workflow = executionDAO.getWorkflow(wf.getWorkflowId(), false);

if (workflow.getStatus().equals(WorkflowStatus.COMPLETED)) {
Expand All @@ -328,16 +328,19 @@ public void completeWorkflow(Workflow wf) throws Exception {
workflow.setStatus(WorkflowStatus.COMPLETED);
workflow.setOutput(wf.getOutput());
executionDAO.updateWorkflow(workflow);
logger.debug("Completed workflow execution for {}", wf.getWorkflowId());
executionDAO.updateTasks(wf.getTasks());

// If the following task, for some reason fails, the sweep will take
// care of this again!
if (workflow.getParentWorkflowId() != null) {
Workflow parent = executionDAO.getWorkflow(workflow.getParentWorkflowId(), false);
logger.debug("Completed sub-workflow {}, deciding parent workflow {}", wf.getWorkflowId(), wf.getParentWorkflowId());
decide(parent.getWorkflowId());
}
Monitors.recordWorkflowCompletion(workflow.getWorkflowType(), workflow.getEndTime() - workflow.getStartTime(), wf.getOwnerApp());
queueDAO.remove(deciderQueue, workflow.getWorkflowId()); //remove from the sweep queue
logger.debug("Removed workflow {} from decider queue", wf.getWorkflowId());
}

public void terminateWorkflow(String workflowId, String reason) throws Exception {
Expand Down Expand Up @@ -414,7 +417,6 @@ public void updateTask(TaskResult taskResult) throws Exception {

String workflowId = taskResult.getWorkflowInstanceId();
Workflow workflowInstance = executionDAO.getWorkflow(workflowId);
workflowInstance.getWorkflowType();
Task task = executionDAO.getTask(taskResult.getTaskId());

logger.debug("Task: {} belonging to Workflow {} being updated", task, workflowInstance);
Expand Down Expand Up @@ -467,7 +469,7 @@ public void updateTask(TaskResult taskResult) throws Exception {
workflowInstance.getFailedReferenceTaskNames().add(task.getReferenceTaskName());
//In case of a FAILED_WITH_TERMINAL_ERROR the workflow will be terminated and the output of the task is never copied
//ensuring the task output is copied to the workflow here
if(FAILED_WITH_TERMINAL_ERROR.equals(task.getStatus())) {
if (FAILED_WITH_TERMINAL_ERROR.equals(task.getStatus())) {
WorkflowDef workflowDef = metadataDAO.get(workflowInstance.getWorkflowType(), workflowInstance.getVersion());
Map<String, Object> outputData = task.getOutputData();
if (!workflowDef.getOutputParameters().isEmpty()) {
Expand All @@ -492,12 +494,12 @@ public void updateTask(TaskResult taskResult) throws Exception {
logger.debug("Task: {} removed from taskQueue: {} since the task status is {}", task, taskQueueName, task.getStatus().name());
break;
case IN_PROGRESS:
// put it back in queue based in callbackAfterSeconds
// put it back in queue based on callbackAfterSeconds
long callBack = taskResult.getCallbackAfterSeconds();
queueDAO.remove(taskQueueName, task.getTaskId());
logger.debug("Task: {} removed from taskQueue: {} since the task status is {}", task, taskQueueName, task.getStatus().name());
queueDAO.push(taskQueueName, task.getTaskId(), callBack); // Milliseconds
logger.debug("Task: {} pushed to taskQueue: {} since the task status is {} and callback: {}", task, taskQueueName, task.getStatus().name(), callBack);
logger.debug("Task: {} pushed back to taskQueue: {} since the task status is {} with callbackAfterSeconds: {}", task, taskQueueName, task.getStatus().name(), callBack);
break;
default:
break;
Expand Down Expand Up @@ -592,6 +594,9 @@ public boolean decide(String workflowId) throws Exception {
logger.debug(tw.getMessage(), tw);
terminate(def, workflow, tw);
return true;
} catch (Exception e) {
logger.error("Error deciding workflow: {}", workflowId, e);
throw e;
}
return false;
}
Expand Down Expand Up @@ -743,7 +748,7 @@ public void executeSystemTask(WorkflowSystemTask systemTask, String taskId, int
logger.info("Done Executing {}/{}-{} op={}", task.getTaskType(), task.getTaskId(), task.getStatus(), task.getOutputData().toString());

} catch (Exception e) {
logger.error(e.getMessage(), e);
logger.error("Error executing system task - {}, with id: {}", systemTask, taskId, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void init(WorkflowExecutor workflowExecutor) {
sweep(workflowIds, workflowExecutor);
} catch (Exception e) {
Monitors.error(className, "sweep");
logger.error(e.getMessage(), e);
logger.error("Error when sweeping workflow", e);
}
}, 500, 500, TimeUnit.MILLISECONDS);
}
Expand Down
Loading

0 comments on commit 71e76c7

Please sign in to comment.