Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
use external payload storage in workflow execution
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Sep 9, 2018
1 parent 6750f55 commit b9a2a80
Show file tree
Hide file tree
Showing 44 changed files with 2,200 additions and 377 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,18 @@
import com.sun.jersey.api.client.WebResource.Builder;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriBuilder;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -76,7 +79,7 @@ protected ClientBase(ClientConfig config, ClientHandler handler) {
this(config, new DefaultConductorClientConfiguration(), handler);
}

protected ClientBase(ClientConfig config, ConductorClientConfiguration clientConfiguration, ClientHandler handler) {
protected ClientBase(ClientConfig config, ConductorClientConfiguration clientConfiguration, ClientHandler handler) {
objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
Expand All @@ -95,7 +98,6 @@ protected ClientBase(ClientConfig config, ConductorClientConfiguration clientCo

conductorClientConfiguration = clientConfiguration;
payloadStorage = new PayloadStorage(this);

}

public void setRootURI(String root) {
Expand All @@ -108,7 +110,6 @@ protected void delete(String url, Object... uriVariables) {

protected void delete(Object[] queryParams, String url, Object... uriVariables) {
URI uri = null;
ClientResponse clientResponse = null;
try {
uri = getURIBuilder(root + url, queryParams).build(uriVariables);
client.resource(uri).delete();
Expand All @@ -127,28 +128,11 @@ protected void put(String url, Object[] queryParams, Object request, Object... u
}
}

/**
* @deprecated replaced by {@link #postForEntityWithRequestOnly(String, Object)} ()}
*/
@Deprecated
protected void postForEntity(String url, Object request) {
postForEntityWithRequestOnly(url, request);
}


protected void postForEntityWithRequestOnly(String url, Object request) {
Class<?> type = null;
postForEntity(url, request, null, type);
}

/**
* @deprecated replaced by {@link #postForEntityWithUriVariablesOnly(String, Object...)} ()}
*/
@Deprecated
protected void postForEntity1(String url, Object... uriVariables) {
postForEntityWithUriVariablesOnly(url, uriVariables);
}

protected void postForEntityWithUriVariablesOnly(String url, Object... uriVariables) {
Class<?> type = null;
postForEntity(url, null, null, type, uriVariables);
Expand Down Expand Up @@ -215,19 +199,40 @@ private <T> T getForEntity(String url, Object[] queryParams, Function<ClientResp
* Uses the {@link PayloadStorage} for storing large payloads.
* Gets the uri for storing the payload from the server and then uploads to this location.
*
* @param payloadType the {@link com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType} to be uploaded
* @param payloadType the {@link com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType} to be uploaded
* @param payloadBytes the byte array containing the payload
* @param payloadSize the size of the payload
* @param payloadSize the size of the payload
* @return the path where the payload is stored in external storage
*/
protected String uploadToExternalPayloadStorage(ExternalPayloadStorage.PayloadType payloadType, byte[] payloadBytes, long payloadSize) {
Preconditions.checkArgument(payloadType.equals(ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT) || payloadType.equals(ExternalPayloadStorage.PayloadType.TASK_OUTPUT),
"Payload type must be workflow input or task output");
ExternalStorageLocation externalStorageLocation = payloadStorage.getLocation(ExternalPayloadStorage.Operation.WRITE, payloadType);
ExternalStorageLocation externalStorageLocation = payloadStorage.getLocation(ExternalPayloadStorage.Operation.WRITE, payloadType, "");
payloadStorage.upload(externalStorageLocation.getUri(), new ByteArrayInputStream(payloadBytes), payloadSize);
return externalStorageLocation.getPath();
}

/**
* Uses the {@link PayloadStorage} for downloading large payloads to be used by the client.
* Gets the uri of the payload fom the server and then downloads from this location.
*
* @param payloadType the {@link com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType} to be downloaded
* @param path the relative of the payload in external storage
* @return the payload object that is stored in external storage
*/
@SuppressWarnings("unchecked")
protected Map<String, Object> downloadFromExternalStorage(ExternalPayloadStorage.PayloadType payloadType, String path) {
Preconditions.checkArgument(StringUtils.isNotBlank(path), "uri cannot be blank");
ExternalStorageLocation externalStorageLocation = payloadStorage.getLocation(ExternalPayloadStorage.Operation.READ, payloadType, path);
try (InputStream inputStream = payloadStorage.download(externalStorageLocation.getUri())) {
return objectMapper.readValue(inputStream, Map.class);
} catch (IOException e) {
String errorMsg = String.format("Unable to download payload frome external storage location: %s", path);
logger.error(errorMsg, e);
throw new ConductorClientException(errorMsg, e);
}
}

private Builder getWebResourceBuilder(URI URI, Object entity) {
return client.resource(URI).type(MediaType.APPLICATION_JSON).entity(entity).accept(MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@
/**
* An implementation of {@link ExternalPayloadStorage} for storing large JSON payload data.
*/
public class PayloadStorage implements ExternalPayloadStorage {
class PayloadStorage implements ExternalPayloadStorage {
private static final Logger logger = LoggerFactory.getLogger(PayloadStorage.class);

private final ClientBase clientBase;

public PayloadStorage(ClientBase clientBase) {
PayloadStorage(ClientBase clientBase) {
this.clientBase = clientBase;
}

Expand All @@ -49,7 +49,7 @@ public PayloadStorage(ClientBase clientBase) {
* The client makes a request to the server to get the {@link ExternalStorageLocation}
*/
@Override
public ExternalStorageLocation getLocation(Operation operation, PayloadType payloadType) {
public ExternalStorageLocation getLocation(Operation operation, PayloadType payloadType, String path) {
String uri;
switch (payloadType) {
case WORKFLOW_INPUT:
Expand All @@ -63,7 +63,7 @@ public ExternalStorageLocation getLocation(Operation operation, PayloadType payl
default:
throw new ConductorClientException(String.format("Invalid payload type: %s for operation: %s", payloadType.toString(), operation.toString()));
}
return clientBase.getForEntity(String.format("%s/externalstoragelocation", uri), null, ExternalStorageLocation.class);
return clientBase.getForEntity(String.format("%s/externalstoragelocation", uri), new Object[]{"path", path}, ExternalStorageLocation.class);
}

/**
Expand Down Expand Up @@ -109,16 +109,16 @@ public void upload(String uri, InputStream payload, long payloadSize) {
/**
* Downloads the payload from the given uri.
*
* @param path the location from where the object is to be downloaded
* @param uri the location from where the object is to be downloaded
* @return an inputstream of the payload in the external storage
* @throws ConductorClientException if the download fails due to an invalid path or an error from external storage
*/
@Override
public InputStream download(String path) {
public InputStream download(String uri) {
HttpURLConnection connection = null;
String errorMsg;
try {
URL url = new URI(path).toURL();
URL url = new URI(uri).toURL();
connection = (HttpURLConnection) url.openConnection();
connection.setDoOutput(false);

Expand All @@ -132,11 +132,11 @@ public InputStream download(String path) {
logger.error(errorMsg);
throw new ConductorClientException(errorMsg);
} catch (URISyntaxException | MalformedURLException e) {
errorMsg = String.format("Invalid path specified: %s", path);
errorMsg = String.format("Invalid uri specified: %s", uri);
logger.error(errorMsg, e);
throw new ConductorClientException(errorMsg, e);
} catch (IOException e) {
errorMsg = String.format("Error downloading from path: %s", path);
errorMsg = String.format("Error downloading from uri: %s", uri);
logger.error(errorMsg, e);
throw new ConductorClientException(errorMsg, e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ public Task pollTask(String taskType, String workerId, String domain) {
Preconditions.checkArgument(StringUtils.isNotBlank(workerId), "Worker id cannot be blank");

Object[] params = new Object[]{"workerid", workerId, "domain", domain};
return getForEntity("tasks/poll/{taskType}", params, Task.class, taskType);
Task task = getForEntity("tasks/poll/{taskType}", params, Task.class, taskType);
populateTaskInput(task);
return task;
}

/**
Expand All @@ -152,7 +154,9 @@ public List<Task> batchPollTasksByTaskType(String taskType, String workerId, int
Preconditions.checkArgument(count > 0, "Count must be greater than 0");

Object[] params = new Object[]{"workerid", workerId, "count", count, "timeout", timeoutInMillisecond};
return getForEntity("tasks/poll/batch/{taskType}", params, taskList, taskType);
List<Task> tasks = getForEntity("tasks/poll/batch/{taskType}", params, taskList, taskType);
tasks.forEach(this::populateTaskInput);
return tasks;
}

/**
Expand Down Expand Up @@ -180,7 +184,22 @@ public List<Task> batchPollTasksInDomain(String taskType, String domain, String
Preconditions.checkArgument(count > 0, "Count must be greater than 0");

Object[] params = new Object[]{"workerid", workerId, "count", count, "timeout", timeoutInMillisecond, "domain", domain};
return getForEntity("tasks/poll/batch/{taskType}", params, taskList, taskType);
List<Task> tasks = getForEntity("tasks/poll/batch/{taskType}", params, taskList, taskType);
tasks.forEach(this::populateTaskInput);
return tasks;
}

/**
* Populates the task input from external payload storage if the external storage path is specified.
*
* @param task the task for which the input is to be populated.
*/
private void populateTaskInput(Task task) {
if (StringUtils.isNotBlank(task.getExternalInputPayloadStoragePath())) {
WorkflowTaskMetrics.incrementExternalPayloadUsedCount(task.getTaskDefName(), ExternalPayloadStorage.Operation.READ.name(), ExternalPayloadStorage.PayloadType.TASK_INPUT.name());
task.setInputData(downloadFromExternalStorage(ExternalPayloadStorage.PayloadType.TASK_INPUT, task.getExternalInputPayloadStoragePath()));
task.setExternalInputPayloadStoragePath(null);
}
}

/**
Expand Down Expand Up @@ -448,6 +467,6 @@ public void unregisterTaskDef(String taskType) {
@Deprecated
public void registerTaskDefs(List<TaskDef> taskDefs) {
Preconditions.checkNotNull(taskDefs, "Task defs cannot be null");
postForEntity("metadata/taskdefs", taskDefs);
postForEntityWithRequestOnly("metadata/taskdefs", taskDefs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public List<WorkflowDef> getAllWorkflowDefs() {
@Deprecated
public void registerWorkflow(WorkflowDef workflowDef) {
Preconditions.checkNotNull(workflowDef, "Worfklow definition cannot be null");
postForEntity("metadata/workflow", workflowDef);
postForEntityWithRequestOnly("metadata/workflow", workflowDef);
}

/**
Expand Down Expand Up @@ -211,7 +211,9 @@ public Workflow getExecutionStatus(String workflowId, boolean includeTasks) {
*/
public Workflow getWorkflow(String workflowId, boolean includeTasks) {
Preconditions.checkArgument(StringUtils.isNotBlank(workflowId), "workflow id cannot be blank");
return getForEntity("workflow/{workflowId}", new Object[]{"includeTasks", includeTasks}, Workflow.class, workflowId);
Workflow workflow = getForEntity("workflow/{workflowId}", new Object[]{"includeTasks", includeTasks}, Workflow.class, workflowId);
populateWorkflowOutput(workflow);
return workflow;
}

/**
Expand All @@ -228,8 +230,22 @@ public List<Workflow> getWorkflows(String name, String correlationId, boolean in
Preconditions.checkArgument(StringUtils.isNotBlank(correlationId), "correlationId cannot be blank");

Object[] params = new Object[]{"includeClosed", includeClosed, "includeTasks", includeTasks};
return getForEntity("workflow/{name}/correlated/{correlationId}", params, new GenericType<List<Workflow>>() {
List<Workflow> workflows = getForEntity("workflow/{name}/correlated/{correlationId}", params, new GenericType<List<Workflow>>() {
}, name, correlationId);
workflows.forEach(this::populateWorkflowOutput);
return workflows;
}

/**
* Populates the workflow output from external payload storage if the external storage path is specified.
*
* @param workflow the workflow for which the output is to be populated.
*/
private void populateWorkflowOutput(Workflow workflow) {
if (StringUtils.isNotBlank(workflow.getExternalOutputPayloadStoragePath())) {
WorkflowTaskMetrics.incrementExternalPayloadUsedCount(workflow.getWorkflowType(), ExternalPayloadStorage.Operation.READ.name(), ExternalPayloadStorage.PayloadType.WORKFLOW_OUTPUT.name());
workflow.setOutput(downloadFromExternalStorage(ExternalPayloadStorage.PayloadType.WORKFLOW_OUTPUT, workflow.getExternalOutputPayloadStoragePath()));
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -379,7 +379,6 @@ private void execute(Worker worker, Task task) {
TaskResult result = null;
try {
logger.debug("Executing task {} in worker {} at {}", task, worker.getClass().getSimpleName(), worker.getIdentity());
// TODO
result = worker.execute(task);
result.setWorkflowInstanceId(task.getWorkflowInstanceId());
result.setTaskId(task.getTaskId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ public Task copy() {
copy.setRateLimitPerFrequency(rateLimitPerFrequency);
copy.setRateLimitFrequencyInSeconds(rateLimitFrequencyInSeconds);
copy.setExternalInputPayloadStoragePath(externalInputPayloadStoragePath);
copy.setExternalOutputPayloadStoragePath(externalOutputPayloadStoragePath);
return copy;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@

/**
* Describes the location where the JSON payload is stored in external storage.
* <ul>
* <li>
* The location is described using the following fields:
* <ul>uri: The uri of the json file in external storage</ul>
* <ul>path: The relative path of the file in external storage</ul>
* </li>
* <li>uri: The uri of the json file in external storage</li>
* <li>path: The relative path of the file in external storage</li>
* </ul>
*/
public class ExternalStorageLocation {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ enum PayloadType {WORKFLOW_INPUT, WORKFLOW_OUTPUT, TASK_INPUT, TASK_OUTPUT}
*
* @param operation the type of {@link Operation} to be performed with the uri
* @param payloadType the {@link PayloadType} that is being accessed at the uri
* @param path (optional) the relative path for which the external storage location object is to be populated.
* If path is not specified, it will be computed and populated.
* @return a {@link ExternalStorageLocation} object which contains the uri and the path for the json payload
*/
ExternalStorageLocation getLocation(Operation operation, PayloadType payloadType);
ExternalStorageLocation getLocation(Operation operation, PayloadType payloadType, String path);

/**
* Upload a json payload to the specified external storage location.
Expand Down
Loading

0 comments on commit b9a2a80

Please sign in to comment.