Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
metrics, javadoc, refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Sep 9, 2018
1 parent b9a2a80 commit b1db3b5
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ public Task getPendingTaskForWorkflow(String workflowId, String taskReferenceNam

/**
* Updates the result of a task execution.
* If the size of the task output payload is bigger than {@link ConductorClientConfiguration#getTaskOutputPayloadThresholdKB()},
* it is uploaded to {@link ExternalPayloadStorage}, if enabled, else the task is marked as FAILED_WITH_TERMINAL_ERROR.
*
* @param taskResult the {@link TaskResult} of the executed task to be updated.
* @param taskType the type of the task
Expand All @@ -259,7 +261,7 @@ public void updateTask(TaskResult taskResult, String taskType) {
long payloadSizeThreshold = conductorClientConfiguration.getTaskOutputPayloadThresholdKB() * 1024;
if (taskResultSize > payloadSizeThreshold) {
if (!conductorClientConfiguration.isExternalPayloadStorageEnabled()
|| taskResultSize > (conductorClientConfiguration.getTaskOutputMaxPayloadThresholdKB() * 1024)) {
|| taskResultSize > conductorClientConfiguration.getTaskOutputMaxPayloadThresholdKB() * 1024) {
taskResult.setReasonForIncompletion(String.format("The TaskResult payload size: %d is greater than the permissible %d MB", taskResultSize, payloadSizeThreshold));
taskResult.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
taskResult.setOutputData(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,13 @@ public String startWorkflow(String name, Integer version, String correlationId,
}

/**
* Starts a workflow
* Starts a workflow.
* If the size of the workflow input payload is bigger than {@link ConductorClientConfiguration#getWorkflowInputPayloadThresholdKB()},
* it is uploaded to {@link ExternalPayloadStorage}, if enabled, else the workflow is rejected.
*
* @param startWorkflowRequest the {@link StartWorkflowRequest} object to start the workflow
* @return the id of the workflow instance that can be used for tracking
* @throws ConductorClientException if {@link ExternalPayloadStorage} is disabled or if the payload size is greater than {@link ConductorClientConfiguration#getWorkflowInputMaxPayloadThresholdKB()}
*/
public String startWorkflow(StartWorkflowRequest startWorkflowRequest) {
Preconditions.checkNotNull(startWorkflowRequest, "StartWorkflowRequest cannot be null");
Expand All @@ -174,14 +177,15 @@ public String startWorkflow(StartWorkflowRequest startWorkflowRequest) {
long workflowInputSize = workflowInputBytes.length;
WorkflowTaskMetrics.recordWorkflowInputPayloadSize(startWorkflowRequest.getName(), version, workflowInputSize);
if (workflowInputSize > conductorClientConfiguration.getWorkflowInputPayloadThresholdKB() * 1024) {
if (conductorClientConfiguration.isExternalPayloadStorageEnabled()) {
if (!conductorClientConfiguration.isExternalPayloadStorageEnabled() ||
(workflowInputSize > conductorClientConfiguration.getWorkflowInputMaxPayloadThresholdKB() * 1024)) {
String errorMsg = String.format("Input payload larger than the allowed threshold of: %d KB", conductorClientConfiguration.getWorkflowInputPayloadThresholdKB());
throw new ConductorClientException(errorMsg);
} else {
WorkflowTaskMetrics.incrementExternalPayloadUsedCount(startWorkflowRequest.getName(), ExternalPayloadStorage.Operation.WRITE.name(), ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT.name());
String externalStoragePath = uploadToExternalPayloadStorage(ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT, workflowInputBytes, workflowInputSize);
startWorkflowRequest.setExternalInputPayloadStoragePath(externalStoragePath);
startWorkflowRequest.setInput(null);
} else {
String errorMsg = String.format("Input payload larger than the allowed threshold of: %d KB", conductorClientConfiguration.getWorkflowInputPayloadThresholdKB());
throw new ConductorClientException(errorMsg);
}
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -26,7 +26,6 @@
import java.util.Set;
import java.util.stream.Collectors;


public class Workflow extends Auditable{

public enum WorkflowStatus {
Expand Down Expand Up @@ -402,4 +401,4 @@ public Workflow copy() {
public String toString() {
return workflowType + "." + version + "/" + workflowId + "." + status;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ private DeciderOutcome decide(final WorkflowDef workflowDef, final Workflow work
.collect(Collectors.toList()), workflow.getWorkflowId());
outcome.tasksToBeScheduled.addAll(unScheduledTasks);
}
updateOutput(workflowDef, workflow, null);
if (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflowDef, workflow)) {
logger.debug("Marking workflow as complete. workflow=" + workflow.getWorkflowId() + ", tasks=" + workflow.getTasks());
outcome.isComplete = true;
Expand Down Expand Up @@ -256,25 +255,28 @@ private List<Task> startWorkflow(Workflow workflow, WorkflowDef workflowDef) thr
return Collections.singletonList(rerunFromTask);
}

void updateOutput(final WorkflowDef def, final Workflow workflow, @Nullable Task task) {
/**
* Updates the workflow output.
*
* @param workflow the workflow instance
* @param task if not null, the output of this task will be copied to workflow output if no output parameters are specified in the workflow defintion
* if null, the output of the last task in the workflow will be copied to workflow output of no output parameters are specified in the workflow definition
*/
void updateWorkflowOutput(final Workflow workflow, @Nullable Task task) {
List<Task> allTasks = workflow.getTasks();
if (allTasks.isEmpty()) {
return;
}

Task last = Optional.ofNullable(task).orElse(allTasks.get(allTasks.size() - 1));
WorkflowDef def = metadataDAO.get(workflow.getWorkflowType(), workflow.getVersion());
Map<String, Object> output;
Task last;
if (task != null) {
last = task;
} else {
last = allTasks.get(allTasks.size() - 1);
}

if (!def.getOutputParameters().isEmpty()) {
Workflow workflowInstance = populateWorkflowAndTaskData(workflow);
output = parametersUtils.getTaskInput(def.getOutputParameters(), workflowInstance, null, null);
} else if (StringUtils.isNotBlank(last.getExternalOutputPayloadStoragePath())) {
output = externalPayloadStorageUtils.downloadPayload(last.getExternalOutputPayloadStoragePath());
Monitors.recordExternalPayloadStorageUsage(last.getTaskDefName(), ExternalPayloadStorage.Operation.READ.toString(), ExternalPayloadStorage.PayloadType.TASK_OUTPUT.toString());
} else {
output = last.getOutputData();
}
Expand Down Expand Up @@ -389,25 +391,36 @@ Task retry(TaskDef taskDefinition, WorkflowTask workflowTask, Task task, Workflo
return rescheduled;
}

/**
* Populates the workflow input data and the tasks input/output data if stored in external payload storage.
* This method creates a deep copy of the workflow instance where the payloads will be stored after downloading from external payload storage.
*
* @param workflow the workflow for which the data needs to be populated
* @return a copy of the workflow with the payload data populated
*/
@VisibleForTesting
Workflow populateWorkflowAndTaskData(Workflow workflow) {
Workflow workflowInstance = workflow.copy();

if (StringUtils.isNotBlank(workflow.getExternalInputPayloadStoragePath())) {
// download the workflow input from external storage here and plug it into the workflow
Map<String, Object> workflowInputParams = externalPayloadStorageUtils.downloadPayload(workflow.getExternalInputPayloadStoragePath());
Monitors.recordExternalPayloadStorageUsage(workflow.getWorkflowType(), ExternalPayloadStorage.Operation.READ.toString(), ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT.toString());
workflowInstance.setInput(workflowInputParams);
workflowInstance.setExternalInputPayloadStoragePath(null);
}

workflowInstance.getTasks().stream()
.filter(task -> StringUtils.isNotBlank(task.getExternalInputPayloadStoragePath()) || StringUtils.isNotBlank(task.getExternalOutputPayloadStoragePath()))
.forEach(task -> {
if (StringUtils.isNotBlank(task.getExternalOutputPayloadStoragePath())) {
task.setOutputData(externalPayloadStorageUtils.downloadPayload(task.getExternalOutputPayloadStoragePath()));
Monitors.recordExternalPayloadStorageUsage(task.getTaskDefName(), ExternalPayloadStorage.Operation.READ.toString(), ExternalPayloadStorage.PayloadType.TASK_OUTPUT.toString());
task.setExternalOutputPayloadStoragePath(null);
}
if (StringUtils.isNotBlank(task.getExternalInputPayloadStoragePath())) {
task.setInputData(externalPayloadStorageUtils.downloadPayload(task.getExternalInputPayloadStoragePath()));
Monitors.recordExternalPayloadStorageUsage(task.getTaskDefName(), ExternalPayloadStorage.Operation.READ.toString(), ExternalPayloadStorage.PayloadType.TASK_INPUT.toString());
task.setExternalInputPayloadStoragePath(null);
}
});
Expand Down Expand Up @@ -556,10 +569,6 @@ private boolean isTaskSkipped(WorkflowTask taskToSchedule, Workflow workflow) {
}
}

private void populateTaskData(Workflow workflow) {

}

static class DeciderOutcome {

List<Task> tasksToBeScheduled = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ void completeWorkflow(Workflow wf) {
throw new ApplicationException(CONFLICT, msg);
}

deciderService.updateWorkflowOutput(wf, null);
workflow.setStatus(WorkflowStatus.COMPLETED);
workflow.setOutput(wf.getOutput());
workflow.setExternalOutputPayloadStoragePath(wf.getExternalOutputPayloadStoragePath());
Expand Down Expand Up @@ -402,6 +403,8 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
workflow.setStatus(WorkflowStatus.TERMINATED);
}

deciderService.updateWorkflowOutput(workflow, null);

String workflowId = workflow.getWorkflowId();
workflow.setReasonForIncompletion(reason);
executionDAO.updateWorkflow(workflow);
Expand Down Expand Up @@ -522,8 +525,8 @@ public void updateTask(TaskResult taskResult) {
taskByRefName.setReasonForIncompletion(task.getReasonForIncompletion());
taskByRefName.setWorkerId(task.getWorkerId());
taskByRefName.setCallbackAfterSeconds(task.getCallbackAfterSeconds());
WorkflowDef workflowDef = metadataDAO.get(workflowInstance.getWorkflowType(), workflowInstance.getVersion());
deciderService.updateOutput(workflowDef, workflowInstance, task);
//WorkflowDef workflowDef = metadataDAO.get(workflowInstance.getWorkflowType(), workflowInstance.getVersion());
deciderService.updateWorkflowOutput(workflowInstance, task);
}
executionDAO.updateWorkflow(workflowInstance);
logger.debug("Task: {} has a {} status and the Workflow has been updated with failed task reference", task, task.getStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.execution.ApplicationException;
import com.netflix.conductor.core.execution.TerminateWorkflowException;
import com.netflix.conductor.metrics.Monitors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -115,34 +116,37 @@ public <T> void verifyAndUpload(T entity, PayloadType payloadType) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
objectMapper.writeValue(byteArrayOutputStream, payload);
byte[] payloadBytes = byteArrayOutputStream.toByteArray();
long payloadSize = payloadBytes.length / 1024;
if (payloadSize > threshold) {
if (payloadSize > maxThreshold) {
if (entity instanceof Task) {
String errorMsg = String.format("The payload size: %dKB of task: %s in workflow: %s is greater than the permissible limit: %dKB", payloadSize, ((Task) entity).getTaskId(), ((Task) entity).getWorkflowInstanceId(), maxThreshold);
failTask(((Task) entity), payloadType, errorMsg);
} else {
String errorMsg = String.format("The output payload size: %dKB of workflow: %s is greater than the permissible limit: %dKB", payloadSize, ((Workflow) entity).getWorkflowId(), maxThreshold);
failWorkflow(errorMsg);
}
}
long payloadSize = payloadBytes.length;

if (payloadSize > maxThreshold * 1024) {
if (entity instanceof Task) {
String errorMsg = String.format("The payload size: %dKB of task: %s in workflow: %s is greater than the permissible limit: %dKB", payloadSize, ((Task) entity).getTaskId(), ((Task) entity).getWorkflowInstanceId(), maxThreshold);
failTask(((Task) entity), payloadType, errorMsg);
} else {
String errorMsg = String.format("The output payload size: %dKB of workflow: %s is greater than the permissible limit: %dKB", payloadSize, ((Workflow) entity).getWorkflowId(), maxThreshold);
failWorkflow(errorMsg);
}
} else if (payloadSize > threshold * 1024) {
switch (payloadType) {
case TASK_INPUT:
((Task) entity).setInputData(null);
((Task) entity).setExternalInputPayloadStoragePath(uploadHelper(payloadBytes, payloadSize, PayloadType.TASK_INPUT));
Monitors.recordExternalPayloadStorageUsage(((Task) entity).getTaskDefName(), ExternalPayloadStorage.Operation.WRITE.toString(), PayloadType.TASK_INPUT.toString());
break;
case TASK_OUTPUT:
((Task) entity).setOutputData(null);
((Task) entity).setExternalOutputPayloadStoragePath(uploadHelper(payloadBytes, payloadSize, PayloadType.TASK_OUTPUT));
Monitors.recordExternalPayloadStorageUsage(((Task) entity).getTaskDefName(), ExternalPayloadStorage.Operation.WRITE.toString(), PayloadType.TASK_OUTPUT.toString());
break;
case WORKFLOW_INPUT:
((Workflow) entity).setInput(null);
((Workflow) entity).setExternalInputPayloadStoragePath(uploadHelper(payloadBytes, payloadSize, PayloadType.WORKFLOW_INPUT));
Monitors.recordExternalPayloadStorageUsage(((Workflow) entity).getWorkflowType(), ExternalPayloadStorage.Operation.WRITE.toString(), PayloadType.WORKFLOW_INPUT.toString());
break;
case WORKFLOW_OUTPUT:
((Workflow) entity).setOutput(null);
((Workflow) entity).setExternalOutputPayloadStoragePath(uploadHelper(payloadBytes, payloadSize, PayloadType.WORKFLOW_OUTPUT));
Monitors.recordExternalPayloadStorageUsage(((Workflow) entity).getWorkflowType(), ExternalPayloadStorage.Operation.WRITE.toString(), PayloadType.WORKFLOW_OUTPUT.toString());
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public class S3PayloadStorage implements ExternalPayloadStorage {
@Inject
public S3PayloadStorage(Configuration config) {
s3Client = AmazonS3ClientBuilder.standard().withRegion("us-east-1").build();
bucketName = config.getProperty("s3bucket", "");
expirationSec = config.getIntProperty("s3signedurlexpirationseconds", 5);
bucketName = config.getProperty("workflow.external.payload.storage.s3.bucket", "");
expirationSec = config.getIntProperty("workflow.external.payload.storage.s3.signedurlexpirationseconds", 5);
}

/**
Expand Down Expand Up @@ -104,6 +104,14 @@ public ExternalStorageLocation getLocation(Operation operation, PayloadType payl
}
}

/**
* Uploads the payload to the given s3 object key.
* It is expected that the caller retrieves the object key using {@link #getLocation(Operation, PayloadType, String)} before making this call.
*
* @param path the s3 key of the object to be uploaded
* @param payload an {@link InputStream} containing the json payload which is to be uploaded
* @param payloadSize the size of the json payload in bytes
*/
@Override
public void upload(String path, InputStream payload, long payloadSize) {
try {
Expand All @@ -120,6 +128,8 @@ public void upload(String path, InputStream payload, long payloadSize) {
}

/**
* Downloads the payload stored in the s3 object.
*
* @param path the S3 key of the object
* @return an input stream containing the contents of the object
* Caller is expected to close the input stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,4 +247,8 @@ public static void recordDaoPayloadSize(String dao, String action, int size) {
public static void recordDaoPayloadSize(String dao, String action, String taskType, String workflowType, int size) {
gauge(classQualifier, "dao_payload_size", size, "dao", dao, "action", action, "taskType", taskType, "workflowType", workflowType);
}

public static void recordExternalPayloadStorageUsage(String name, String operation, String payloadType) {
counter(classQualifier, "external_payload_storage_usage", "name", name, "operation", operation, "payloadType", payloadType);
}
}
Loading

0 comments on commit b1db3b5

Please sign in to comment.