diff --git a/client/src/main/java/com/netflix/conductor/client/config/ConductorClientConfiguration.java b/client/src/main/java/com/netflix/conductor/client/config/ConductorClientConfiguration.java
new file mode 100644
index 0000000000..55597736cb
--- /dev/null
+++ b/client/src/main/java/com/netflix/conductor/client/config/ConductorClientConfiguration.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2018 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netflix.conductor.client.config;
+
+public interface ConductorClientConfiguration {
+
+ /**
+ * @return the workflow input payload size threshold in KB,
+ * beyond which the payload will be processed based on {@link ConductorClientConfiguration#isExternalPayloadStorageEnabled()}.
+ */
+ int getWorkflowInputPayloadThresholdKB();
+
+ /**
+ * @return the max value of workflow input payload size threshold in KB,
+ * beyond which the payload will be rejected, regardless of whether external payload storage is enabled.
+ */
+ int getWorkflowInputMaxPayloadThresholdKB();
+
+ /**
+ * @return the task output payload size threshold in KB,
+ * beyond which the payload will be processed based on {@link ConductorClientConfiguration#isExternalPayloadStorageEnabled()}.
+ */
+ int getTaskOutputPayloadThresholdKB();
+
+ /**
+ * @return the max value of task output payload size threshold in KB,
+ * beyond which the payload will be rejected, regardless of whether external payload storage is enabled.
+ */
+ int getTaskOutputMaxPayloadThresholdKB();
+
+ /**
+ * @return the flag which controls the use of external storage for storing workflow/task
+ * input and output JSON payloads with size greater than threshold.
+ * If it is set to true, the payload is stored in external location.
+ * If it is set to false, the payload is rejected and the task/workflow execution fails.
+ */
+ boolean isExternalPayloadStorageEnabled();
+}
diff --git a/client/src/main/java/com/netflix/conductor/client/config/DefaultConductorClientConfiguration.java b/client/src/main/java/com/netflix/conductor/client/config/DefaultConductorClientConfiguration.java
new file mode 100644
index 0000000000..5ea2c12435
--- /dev/null
+++ b/client/src/main/java/com/netflix/conductor/client/config/DefaultConductorClientConfiguration.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2018 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netflix.conductor.client.config;
+
+/**
+ * A default implementation of {@link ConductorClientConfiguration}
+ * where external payload storage is disabled.
+ */
+public class DefaultConductorClientConfiguration implements ConductorClientConfiguration {
+
+ @Override
+ public int getWorkflowInputPayloadThresholdKB() {
+ return 5120;
+ }
+
+ @Override
+ public int getWorkflowInputMaxPayloadThresholdKB() {
+ return 10240;
+ }
+
+ @Override
+ public int getTaskOutputPayloadThresholdKB() {
+ return 3072;
+ }
+
+ @Override
+ public int getTaskOutputMaxPayloadThresholdKB() {
+ return 10240;
+ }
+
+ @Override
+ public boolean isExternalPayloadStorageEnabled() {
+ return false;
+ }
+}
diff --git a/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java b/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java
index a72eb678ad..90684d47a8 100644
--- a/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java
+++ b/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,17 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-/**
- *
- */
+
package com.netflix.conductor.client.http;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
+import com.google.common.base.Preconditions;
+import com.netflix.conductor.client.config.ConductorClientConfiguration;
+import com.netflix.conductor.client.config.DefaultConductorClientConfiguration;
import com.netflix.conductor.client.exceptions.ConductorClientException;
import com.netflix.conductor.client.exceptions.ErrorResponse;
+import com.netflix.conductor.common.run.ExternalStorageLocation;
+import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandler;
import com.sun.jersey.api.client.ClientHandlerException;
@@ -33,14 +36,18 @@
import com.sun.jersey.api.client.WebResource.Builder;
import com.sun.jersey.api.client.config.ClientConfig;
import com.sun.jersey.api.client.config.DefaultClientConfig;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriBuilder;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.util.Collection;
+import java.util.Map;
import java.util.function.Function;
/**
@@ -56,15 +63,23 @@ public abstract class ClientBase {
protected ObjectMapper objectMapper;
+ protected PayloadStorage payloadStorage;
+
+ protected ConductorClientConfiguration conductorClientConfiguration;
+
protected ClientBase() {
- this(new DefaultClientConfig(), null);
+ this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null);
+ }
+
+ protected ClientBase(ClientConfig config) {
+ this(config, new DefaultConductorClientConfiguration(), null);
}
- protected ClientBase(ClientConfig clientConfig) {
- this(clientConfig, null);
+ protected ClientBase(ClientConfig config, ClientHandler handler) {
+ this(config, new DefaultConductorClientConfiguration(), handler);
}
- protected ClientBase(ClientConfig clientConfig, ClientHandler handler) {
+ protected ClientBase(ClientConfig config, ConductorClientConfiguration clientConfiguration, ClientHandler handler) {
objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
@@ -73,13 +88,16 @@ protected ClientBase(ClientConfig clientConfig, ClientHandler handler) {
objectMapper.setSerializationInclusion(Include.NON_EMPTY);
JacksonJsonProvider provider = new JacksonJsonProvider(objectMapper);
- clientConfig.getSingletons().add(provider);
+ config.getSingletons().add(provider);
if (handler == null) {
- this.client = Client.create(clientConfig);
+ this.client = Client.create(config);
} else {
- this.client = new Client(handler, clientConfig);
+ this.client = new Client(handler, config);
}
+
+ conductorClientConfiguration = clientConfiguration;
+ payloadStorage = new PayloadStorage(this);
}
public void setRootURI(String root) {
@@ -92,7 +110,6 @@ protected void delete(String url, Object... uriVariables) {
protected void delete(Object[] queryParams, String url, Object... uriVariables) {
URI uri = null;
- ClientResponse clientResponse = null;
try {
uri = getURIBuilder(root + url, queryParams).build(uriVariables);
client.resource(uri).delete();
@@ -111,28 +128,11 @@ protected void put(String url, Object[] queryParams, Object request, Object... u
}
}
- /**
- * @deprecated replaced by {@link #postForEntityWithRequestOnly(String, Object)} ()}
- */
- @Deprecated
- protected void postForEntity(String url, Object request) {
- postForEntityWithRequestOnly(url, request);
- }
-
-
protected void postForEntityWithRequestOnly(String url, Object request) {
         Class<?> type = null;
postForEntity(url, request, null, type);
}
- /**
- * @deprecated replaced by {@link #postForEntityWithUriVariablesOnly(String, Object...)} ()}
- */
- @Deprecated
- protected void postForEntity1(String url, Object... uriVariables) {
- postForEntityWithUriVariablesOnly(url, uriVariables);
- }
-
protected void postForEntityWithUriVariablesOnly(String url, Object... uriVariables) {
         Class<?> type = null;
postForEntity(url, null, null, type, uriVariables);
@@ -165,26 +165,27 @@ private <T> T postForEntity(String url, Object request, Object[] queryParams, Ob
return null;
}
-    protected <T> T getForEntity(String url, Object[] queryParams, Class<T> responseType, Object... uriVariables) {
+    <T> T getForEntity(String url, Object[] queryParams, Class<T> responseType, Object... uriVariables) {
return getForEntity(url, queryParams, response -> response.getEntity(responseType), uriVariables);
}
-    protected <T> T getForEntity(String url, Object[] queryParams, GenericType<T> responseType, Object... uriVariables) {
+    <T> T getForEntity(String url, Object[] queryParams, GenericType<T> responseType, Object... uriVariables) {
return getForEntity(url, queryParams, response -> response.getEntity(responseType), uriVariables);
}
-    private <T> T getForEntity(String url, Object[] queryParams, Function<ClientResponse, T> entityPvoider, Object... uriVariables) {
+    private <T> T getForEntity(String url, Object[] queryParams, Function<ClientResponse, T> entityProvider, Object... uriVariables) {
URI uri = null;
- ClientResponse clientResponse = null;
+ ClientResponse clientResponse;
try {
uri = getURIBuilder(root + url, queryParams).build(uriVariables);
clientResponse = client.resource(uri)
.accept(MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
if (clientResponse.getStatus() < 300) {
- return entityPvoider.apply(clientResponse);
+ return entityProvider.apply(clientResponse);
+
} else {
- throw new UniformInterfaceException(clientResponse); // let handleUniformInterfaceException to handle unexpected response consistently
+ throw new UniformInterfaceException(clientResponse);
}
} catch (UniformInterfaceException e) {
handleUniformInterfaceException(e, uri);
@@ -194,11 +195,49 @@ private <T> T getForEntity(String url, Object[] queryParams, Function<ClientResponse, T> entityProvider, Object... uriVariables) {
+    /**
+     * Uses the {@link PayloadStorage} for uploading large payloads.
+     * Gets the uri for storing the payload from the server and then uploads to this location.
+     *
+     * @param payloadType the {@link ExternalPayloadStorage.PayloadType} to be uploaded
+     * @param payloadBytes the byte array containing the payload
+     * @param payloadSize the size of the payload in bytes
+     * @return the path at which the payload is stored in external storage
+     */
+    protected String uploadToExternalPayloadStorage(ExternalPayloadStorage.PayloadType payloadType, byte[] payloadBytes, long payloadSize) {
+        ExternalStorageLocation externalStorageLocation = payloadStorage.getLocation(ExternalPayloadStorage.Operation.WRITE, payloadType, "");
+        payloadStorage.upload(externalStorageLocation.getUri(), new ByteArrayInputStream(payloadBytes), payloadSize);
+        return externalStorageLocation.getPath();
+    }
+
+    /**
+     * Uses the {@link PayloadStorage} for downloading large payloads to be used by the client.
+     *
+     * @param payloadType the {@link ExternalPayloadStorage.PayloadType} to be downloaded
+     * @param path the relative path of the payload in external storage
+     * @return the payload object downloaded from external storage
+     */
+    protected Map<String, Object> downloadFromExternalStorage(ExternalPayloadStorage.PayloadType payloadType, String path) {
+ Preconditions.checkArgument(StringUtils.isNotBlank(path), "uri cannot be blank");
+ ExternalStorageLocation externalStorageLocation = payloadStorage.getLocation(ExternalPayloadStorage.Operation.READ, payloadType, path);
+ try (InputStream inputStream = payloadStorage.download(externalStorageLocation.getUri())) {
+ return objectMapper.readValue(inputStream, Map.class);
+ } catch (IOException e) {
+            String errorMsg = String.format("Unable to download payload from external storage location: %s", path);
+ logger.error(errorMsg, e);
+ throw new ConductorClientException(errorMsg, e);
+ }
+ }
+
private Builder getWebResourceBuilder(URI URI, Object entity) {
return client.resource(URI).type(MediaType.APPLICATION_JSON).entity(entity).accept(MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON);
}
- private void handleClientHandlerException(ClientHandlerException exception, URI uri){
+ private void handleClientHandlerException(ClientHandlerException exception, URI uri) {
String errorMessage = String.format("Unable to invoke Conductor API with uri: %s, failure to process request or response", uri);
logger.error(errorMessage, exception);
throw new ConductorClientException(errorMessage, exception);
@@ -221,7 +260,7 @@ private void handleUniformInterfaceException(UniformInterfaceException exception
}
String errorMessage = clientResponse.getEntity(String.class);
logger.error("Unable to invoke Conductor API with uri: {}, unexpected response from server: {}", uri, clientResponseToString(exception.getResponse()), exception);
- ErrorResponse errorResponse = null;
+ ErrorResponse errorResponse;
try {
errorResponse = objectMapper.readValue(errorMessage, ErrorResponse.class);
} catch (IOException e) {
@@ -243,7 +282,7 @@ private void handleException(URI uri, RuntimeException e) {
if (e instanceof UniformInterfaceException) {
handleUniformInterfaceException(((UniformInterfaceException) e), uri);
} else if (e instanceof ClientHandlerException) {
- handleClientHandlerException((ClientHandlerException)e, uri);
+ handleClientHandlerException((ClientHandlerException) e, uri);
} else {
handleRuntimeException(e, uri);
}
@@ -296,5 +335,4 @@ private UriBuilder getURIBuilder(String path, Object[] queryParams) {
}
return builder;
}
-
}
diff --git a/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java b/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java
index f227c770a7..73c4a444a0 100644
--- a/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java
+++ b/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java
@@ -17,16 +17,17 @@
package com.netflix.conductor.client.http;
import com.google.common.base.Preconditions;
+import com.netflix.conductor.client.config.ConductorClientConfiguration;
+import com.netflix.conductor.client.config.DefaultConductorClientConfiguration;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.sun.jersey.api.client.ClientHandler;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
import org.apache.commons.lang.StringUtils;
-import javax.ws.rs.QueryParam;
-import java.util.Arrays;
import java.util.List;
public class MetadataClient extends ClientBase {
@@ -41,14 +42,14 @@ public class MetadataClient extends ClientBase {
* Creates a default metadata client
*/
public MetadataClient() {
- super();
+ this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null);
}
/**
* @param clientConfig REST Client configuration
*/
public MetadataClient(ClientConfig clientConfig) {
- super(clientConfig);
+ this(clientConfig, new DefaultConductorClientConfiguration(), null);
}
/**
@@ -56,7 +57,7 @@ public MetadataClient(ClientConfig clientConfig) {
* @param clientHandler Jersey client handler. Useful when plugging in various http client interaction modules (e.g. ribbon)
*/
public MetadataClient(ClientConfig clientConfig, ClientHandler clientHandler) {
- super(clientConfig, clientHandler);
+ this(clientConfig, new DefaultConductorClientConfiguration(), clientHandler);
}
/**
@@ -65,7 +66,17 @@ public MetadataClient(ClientConfig clientConfig, ClientHandler clientHandler) {
* @param filters Chain of client side filters to be applied per request
*/
public MetadataClient(ClientConfig config, ClientHandler handler, ClientFilter... filters) {
- super(config, handler);
+ this(config, new DefaultConductorClientConfiguration(), handler, filters);
+ }
+
+ /**
+ * @param config REST Client configuration
+ * @param clientConfiguration Specific properties configured for the client, see {@link ConductorClientConfiguration}
+ * @param handler Jersey client handler. Useful when plugging in various http client interaction modules (e.g. ribbon)
+ * @param filters Chain of client side filters to be applied per request
+ */
+ public MetadataClient(ClientConfig config, ConductorClientConfiguration clientConfiguration, ClientHandler handler, ClientFilter... filters) {
+ super(config, clientConfiguration, handler);
for (ClientFilter filter : filters) {
super.client.addFilter(filter);
}
@@ -81,7 +92,7 @@ public MetadataClient(ClientConfig config, ClientHandler handler, ClientFilter..
*/
public void registerWorkflowDef(WorkflowDef workflowDef) {
Preconditions.checkNotNull(workflowDef, "Worfklow definition cannot be null");
- postForEntity("metadata/workflow", workflowDef);
+ postForEntityWithRequestOnly("metadata/workflow", workflowDef);
}
/**
@@ -119,13 +130,13 @@ public List<WorkflowDef> getAllWorkflowDefs() {
* Removes the workflow definition of a workflow from the conductor server.
* It does not remove associated workflows. Use with caution.
*
- * @param name Name of the workflow to be unregistered.
+ * @param name Name of the workflow to be unregistered.
* @param version Version of the workflow definition to be unregistered.
*/
public void unregisterWorkflowDef(String name, Integer version) {
Preconditions.checkArgument(StringUtils.isNotBlank(name), "Workflow name cannot be blank");
Preconditions.checkNotNull(version, "Version cannot be null");
- delete("metadata/workflow/{name}/{version}", name, version);
+ delete("metadata/workflow/{name}/{version}", name, version);
}
// Task Metadata Operations
@@ -137,7 +148,7 @@ public void unregisterWorkflowDef(String name, Integer version) {
*/
     public void registerTaskDefs(List<TaskDef> taskDefs) {
Preconditions.checkNotNull(taskDefs, "Task defs list cannot be null");
- postForEntity("metadata/taskdefs", taskDefs);
+ postForEntityWithRequestOnly("metadata/taskdefs", taskDefs);
}
/**
diff --git a/client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java b/client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java
new file mode 100644
index 0000000000..47f74026ac
--- /dev/null
+++ b/client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2018 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netflix.conductor.client.http;
+
+import com.amazonaws.util.IOUtils;
+import com.netflix.conductor.client.exceptions.ConductorClientException;
+import com.netflix.conductor.common.run.ExternalStorageLocation;
+import com.netflix.conductor.common.utils.ExternalPayloadStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+
+/**
+ * An implementation of {@link ExternalPayloadStorage} for storing large JSON payload data.
+ */
+class PayloadStorage implements ExternalPayloadStorage {
+ private static final Logger logger = LoggerFactory.getLogger(PayloadStorage.class);
+
+ private final ClientBase clientBase;
+
+ PayloadStorage(ClientBase clientBase) {
+ this.clientBase = clientBase;
+ }
+
+ /**
+ * This method is not intended to be used in the client.
+ * The client makes a request to the server to get the {@link ExternalStorageLocation}
+ */
+ @Override
+ public ExternalStorageLocation getLocation(Operation operation, PayloadType payloadType, String path) {
+ String uri;
+ switch (payloadType) {
+ case WORKFLOW_INPUT:
+ case WORKFLOW_OUTPUT:
+ uri = "workflow";
+ break;
+ case TASK_INPUT:
+ case TASK_OUTPUT:
+ uri = "tasks";
+ break;
+ default:
+ throw new ConductorClientException(String.format("Invalid payload type: %s for operation: %s", payloadType.toString(), operation.toString()));
+ }
+ return clientBase.getForEntity(String.format("%s/externalstoragelocation", uri), new Object[]{"path", path}, ExternalStorageLocation.class);
+ }
+
+ /**
+ * Uploads the payload to the uri specified.
+ *
+ * @param uri the location to which the object is to be uploaded
+ * @param payload an {@link InputStream} containing the json payload which is to be uploaded
+ * @param payloadSize the size of the json payload in bytes
+ * @throws ConductorClientException if the upload fails due to an invalid path or an error from external storage
+ */
+ @Override
+ public void upload(String uri, InputStream payload, long payloadSize) {
+ HttpURLConnection connection = null;
+ try {
+ URL url = new URI(uri).toURL();
+
+ connection = (HttpURLConnection) url.openConnection();
+ connection.setDoOutput(true);
+ connection.setRequestMethod("PUT");
+
+ try (BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(connection.getOutputStream())) {
+ IOUtils.copy(payload, bufferedOutputStream);
+
+ // Check the HTTP response code
+ int responseCode = connection.getResponseCode();
+ logger.debug("Upload completed with HTTP response code: {}", responseCode);
+ }
+ } catch (URISyntaxException | MalformedURLException e) {
+ String errorMsg = String.format("Invalid path specified: %s", uri);
+ logger.error(errorMsg, e);
+ throw new ConductorClientException(errorMsg, e);
+ } catch (IOException e) {
+ String errorMsg = String.format("Error uploading to path: %s", uri);
+ logger.error(errorMsg, e);
+ throw new ConductorClientException(errorMsg, e);
+ } finally {
+ if (connection != null) {
+ connection.disconnect();
+ }
+ }
+ }
+
+ /**
+ * Downloads the payload from the given uri.
+ *
+ * @param uri the location from where the object is to be downloaded
+ * @return an inputstream of the payload in the external storage
+ * @throws ConductorClientException if the download fails due to an invalid path or an error from external storage
+ */
+ @Override
+ public InputStream download(String uri) {
+ HttpURLConnection connection = null;
+ String errorMsg;
+ try {
+ URL url = new URI(uri).toURL();
+ connection = (HttpURLConnection) url.openConnection();
+ connection.setDoOutput(false);
+
+ // Check the HTTP response code
+ int responseCode = connection.getResponseCode();
+ if (responseCode == HttpURLConnection.HTTP_OK) {
+ logger.debug("Download completed with HTTP response code: {}", connection.getResponseCode());
+ return connection.getInputStream();
+ }
+ errorMsg = String.format("Unable to download. Response code: %d", responseCode);
+ logger.error(errorMsg);
+ throw new ConductorClientException(errorMsg);
+ } catch (URISyntaxException | MalformedURLException e) {
+ errorMsg = String.format("Invalid uri specified: %s", uri);
+ logger.error(errorMsg, e);
+ throw new ConductorClientException(errorMsg, e);
+ } catch (IOException e) {
+ errorMsg = String.format("Error downloading from uri: %s", uri);
+ logger.error(errorMsg, e);
+ throw new ConductorClientException(errorMsg, e);
+ } finally {
+ if (connection != null) {
+ connection.disconnect();
+ }
+ }
+ }
+}
diff --git a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
index 8aeb49e14d..970de846c1 100644
--- a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
+++ b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java
@@ -16,7 +16,9 @@
package com.netflix.conductor.client.http;
import com.google.common.base.Preconditions;
-import com.google.common.io.CountingOutputStream;
+import com.netflix.conductor.client.config.ConductorClientConfiguration;
+import com.netflix.conductor.client.config.DefaultConductorClientConfiguration;
+import com.netflix.conductor.client.exceptions.ConductorClientException;
import com.netflix.conductor.client.task.WorkflowTaskMetrics;
import com.netflix.conductor.common.metadata.tasks.PollData;
import com.netflix.conductor.common.metadata.tasks.Task;
@@ -25,9 +27,11 @@
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
+import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.sun.jersey.api.client.ClientHandler;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -67,14 +71,14 @@ public class TaskClient extends ClientBase {
* Creates a default task client
*/
public TaskClient() {
- super();
+ this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null);
}
/**
* @param config REST Client configuration
*/
public TaskClient(ClientConfig config) {
- super(config);
+ this(config, new DefaultConductorClientConfiguration(), null);
}
/**
@@ -82,16 +86,26 @@ public TaskClient(ClientConfig config) {
* @param handler Jersey client handler. Useful when plugging in various http client interaction modules (e.g. ribbon)
*/
public TaskClient(ClientConfig config, ClientHandler handler) {
- super(config, handler);
+ this(config, new DefaultConductorClientConfiguration(), handler);
}
/**
- * @param config config REST Client configuration
- * @param handler handler Jersey client handler. Useful when plugging in various http client interaction modules (e.g. ribbon)
+ * @param config REST Client configuration
+ * @param handler Jersey client handler. Useful when plugging in various http client interaction modules (e.g. ribbon)
* @param filters Chain of client side filters to be applied per request
*/
public TaskClient(ClientConfig config, ClientHandler handler, ClientFilter... filters) {
- super(config, handler);
+ this(config, new DefaultConductorClientConfiguration(), handler, filters);
+ }
+
+ /**
+ * @param config REST Client configuration
+ * @param clientConfiguration Specific properties configured for the client, see {@link ConductorClientConfiguration}
+ * @param handler Jersey client handler. Useful when plugging in various http client interaction modules (e.g. ribbon)
+ * @param filters Chain of client side filters to be applied per request
+ */
+ public TaskClient(ClientConfig config, ConductorClientConfiguration clientConfiguration, ClientHandler handler, ClientFilter... filters) {
+ super(config, clientConfiguration, handler);
for (ClientFilter filter : filters) {
super.client.addFilter(filter);
}
@@ -111,7 +125,9 @@ public Task pollTask(String taskType, String workerId, String domain) {
Preconditions.checkArgument(StringUtils.isNotBlank(workerId), "Worker id cannot be blank");
Object[] params = new Object[]{"workerid", workerId, "domain", domain};
- return getForEntity("tasks/poll/{taskType}", params, Task.class, taskType);
+ Task task = getForEntity("tasks/poll/{taskType}", params, Task.class, taskType);
+ populateTaskInput(task);
+ return task;
}
/**
@@ -138,7 +154,9 @@ public List<Task> batchPollTasksByTaskType(String taskType, String workerId, int
Preconditions.checkArgument(count > 0, "Count must be greater than 0");
Object[] params = new Object[]{"workerid", workerId, "count", count, "timeout", timeoutInMillisecond};
- return getForEntity("tasks/poll/batch/{taskType}", params, taskList, taskType);
+        List<Task> tasks = getForEntity("tasks/poll/batch/{taskType}", params, taskList, taskType);
+ tasks.forEach(this::populateTaskInput);
+ return tasks;
}
/**
@@ -166,7 +184,22 @@ public List<Task> batchPollTasksInDomain(String taskType, String domain, String
Preconditions.checkArgument(count > 0, "Count must be greater than 0");
Object[] params = new Object[]{"workerid", workerId, "count", count, "timeout", timeoutInMillisecond, "domain", domain};
- return getForEntity("tasks/poll/batch/{taskType}", params, taskList, taskType);
+        List<Task> tasks = getForEntity("tasks/poll/batch/{taskType}", params, taskList, taskType);
+ tasks.forEach(this::populateTaskInput);
+ return tasks;
+ }
+
+ /**
+ * Populates the task input from external payload storage if the external storage path is specified.
+ *
+ * @param task the task for which the input is to be populated.
+ */
+ private void populateTaskInput(Task task) {
+ if (StringUtils.isNotBlank(task.getExternalInputPayloadStoragePath())) {
+ WorkflowTaskMetrics.incrementExternalPayloadUsedCount(task.getTaskDefName(), ExternalPayloadStorage.Operation.READ.name(), ExternalPayloadStorage.PayloadType.TASK_INPUT.name());
+ task.setInputData(downloadFromExternalStorage(ExternalPayloadStorage.PayloadType.TASK_INPUT, task.getExternalInputPayloadStoragePath()));
+ task.setExternalInputPayloadStoragePath(null);
+ }
}
/**
@@ -209,26 +242,40 @@ public Task getPendingTaskForWorkflow(String workflowId, String taskReferenceNam
/**
* Updates the result of a task execution.
+ * If the size of the task output payload is bigger than {@link ConductorClientConfiguration#getTaskOutputPayloadThresholdKB()},
+ * it is uploaded to {@link ExternalPayloadStorage}, if enabled, else the task is marked as FAILED_WITH_TERMINAL_ERROR.
*
- * @param taskResult TaskResults to be updated.
- * @param taskType
+ * @param taskResult the {@link TaskResult} of the executed task to be updated.
+ * @param taskType the type of the task
*/
public void updateTask(TaskResult taskResult, String taskType) {
Preconditions.checkNotNull(taskResult, "Task result cannot be null");
+ Preconditions.checkArgument(StringUtils.isBlank(taskResult.getExternalOutputPayloadStoragePath()), "External Storage Path must not be set");
- try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- CountingOutputStream countingOutputStream = new CountingOutputStream(byteArrayOutputStream)) {
- objectMapper.writeValue(countingOutputStream, taskResult);
- long taskResultSize = countingOutputStream.getCount();
+ try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
+ objectMapper.writeValue(byteArrayOutputStream, taskResult.getOutputData());
+ byte[] taskOutputBytes = byteArrayOutputStream.toByteArray();
+ long taskResultSize = taskOutputBytes.length;
WorkflowTaskMetrics.recordTaskResultPayloadSize(taskType, taskResultSize);
- if (taskResultSize > (3 * 1024 * 1024)) { //There is hard coded since there is no easy way to pass a config in here
- taskResult.setReasonForIncompletion(String.format("The TaskResult payload: %d is greater than the permissible 3MB", taskResultSize));
- taskResult.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
- taskResult.setOutputData(null);
+
+ long payloadSizeThreshold = conductorClientConfiguration.getTaskOutputPayloadThresholdKB() * 1024;
+ if (taskResultSize > payloadSizeThreshold) {
+ if (!conductorClientConfiguration.isExternalPayloadStorageEnabled()
+ || taskResultSize > conductorClientConfiguration.getTaskOutputMaxPayloadThresholdKB() * 1024) {
+                taskResult.setReasonForIncompletion(String.format("The TaskResult payload size: %d is greater than the permissible %d bytes", taskResultSize, payloadSizeThreshold));
+ taskResult.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
+ taskResult.setOutputData(null);
+ } else {
+ WorkflowTaskMetrics.incrementExternalPayloadUsedCount(taskType, ExternalPayloadStorage.Operation.WRITE.name(), ExternalPayloadStorage.PayloadType.TASK_OUTPUT.name());
+ String externalStoragePath = uploadToExternalPayloadStorage(ExternalPayloadStorage.PayloadType.TASK_OUTPUT, taskOutputBytes, taskResultSize);
+ taskResult.setExternalOutputPayloadStoragePath(externalStoragePath);
+ taskResult.setOutputData(null);
+ }
}
} catch (IOException e) {
- logger.error("Unable to update task with task result: {}", taskResult, e);
- throw new RuntimeException("Unable to update task", e);
+ String errorMsg = String.format("Unable to update task: %s with task result", taskResult.getTaskId());
+ logger.error(errorMsg, e);
+ throw new ConductorClientException(errorMsg, e);
}
postForEntityWithRequestOnly("tasks", taskResult);
}
@@ -264,7 +311,7 @@ public void log(String taskId, String logMessage) {
*/
public void logMessageForTask(String taskId, String logMessage) {
Preconditions.checkArgument(StringUtils.isNotBlank(taskId), "Task id cannot be blank");
- postForEntity("tasks/" + taskId + "/log", logMessage);
+ postForEntityWithRequestOnly("tasks/" + taskId + "/log", logMessage);
}
/**
@@ -422,6 +469,6 @@ public void unregisterTaskDef(String taskType) {
@Deprecated
public void registerTaskDefs(List taskDefs) {
Preconditions.checkNotNull(taskDefs, "Task defs cannot be null");
- postForEntity("metadata/taskdefs", taskDefs);
+ postForEntityWithRequestOnly("metadata/taskdefs", taskDefs);
}
}
diff --git a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java
index f91a04ad41..096d901c91 100644
--- a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java
+++ b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java
@@ -16,18 +16,28 @@
package com.netflix.conductor.client.http;
import com.google.common.base.Preconditions;
+import com.netflix.conductor.client.config.ConductorClientConfiguration;
+import com.netflix.conductor.client.config.DefaultConductorClientConfiguration;
+import com.netflix.conductor.client.exceptions.ConductorClientException;
+import com.netflix.conductor.client.task.WorkflowTaskMetrics;
import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest;
import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;
+import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.sun.jersey.api.client.ClientHandler;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.api.client.filter.ClientFilter;
import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -43,18 +53,20 @@ public class WorkflowClient extends ClientBase {
private static GenericType> searchResultWorkflowSummary = new GenericType>() {
};
+ private static final Logger logger = LoggerFactory.getLogger(WorkflowClient.class);
+
/**
* Creates a default task client
*/
public WorkflowClient() {
- super();
+ this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null);
}
/**
* @param config REST Client configuration
*/
public WorkflowClient(ClientConfig config) {
- super(config);
+ this(config, new DefaultConductorClientConfiguration(), null);
}
/**
@@ -62,16 +74,26 @@ public WorkflowClient(ClientConfig config) {
* @param handler Jersey client handler. Useful when plugging in various http client interaction modules (e.g. ribbon)
*/
public WorkflowClient(ClientConfig config, ClientHandler handler) {
- super(config, handler);
+ this(config, new DefaultConductorClientConfiguration(), handler);
}
/**
- * @param config config REST Client configuration
- * @param handler handler Jersey client handler. Useful when plugging in various http client interaction modules (e.g. ribbon)
+ * @param config REST Client configuration
+ * @param handler Jersey client handler. Useful when plugging in various http client interaction modules (e.g. ribbon)
* @param filters Chain of client side filters to be applied per request
*/
public WorkflowClient(ClientConfig config, ClientHandler handler, ClientFilter... filters) {
- super(config, handler);
+ this(config, new DefaultConductorClientConfiguration(), handler, filters);
+ }
+
+ /**
+ * @param config REST Client configuration
+ * @param clientConfiguration Specific properties configured for the client, see {@link ConductorClientConfiguration}
+ * @param handler Jersey client handler. Useful when plugging in various http client interaction modules (e.g. ribbon)
+ * @param filters Chain of client side filters to be applied per request
+ */
+ public WorkflowClient(ClientConfig config, ConductorClientConfiguration clientConfiguration, ClientHandler handler, ClientFilter... filters) {
+ super(config, clientConfiguration, handler);
for (ClientFilter filter : filters) {
super.client.addFilter(filter);
}
@@ -96,7 +118,7 @@ public List getAllWorkflowDefs() {
@Deprecated
public void registerWorkflow(WorkflowDef workflowDef) {
Preconditions.checkNotNull(workflowDef, "Worfklow definition cannot be null");
- postForEntity("metadata/workflow", workflowDef);
+ postForEntityWithRequestOnly("metadata/workflow", workflowDef);
}
/**
@@ -120,22 +142,57 @@ public WorkflowDef getWorkflowDef(String name, Integer version) {
* @param correlationId the correlation id
* @param input the input to set in the workflow
* @return the id of the workflow instance that can be used for tracking
+ * @deprecated This API is deprecated and will be removed in the next version
+ * use {@link #startWorkflow(StartWorkflowRequest)} instead
*/
+ @Deprecated
public String startWorkflow(String name, Integer version, String correlationId, Map input) {
Preconditions.checkArgument(StringUtils.isNotBlank(name), "name cannot be blank");
-
- Object[] params = new Object[]{"version", version, "correlationId", correlationId};
- return postForEntity("workflow/{name}", input, params, String.class, name);
+ StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest();
+ startWorkflowRequest.setName(name);
+ startWorkflowRequest.setVersion(version);
+ startWorkflowRequest.setCorrelationId(correlationId);
+ startWorkflowRequest.setInput(input);
+ return startWorkflow(startWorkflowRequest);
}
/**
- * Starts a workflow
+ * Starts a workflow.
+ * If the size of the workflow input payload is bigger than {@link ConductorClientConfiguration#getWorkflowInputPayloadThresholdKB()},
+ * it is uploaded to {@link ExternalPayloadStorage}, if enabled, else the workflow is rejected.
*
* @param startWorkflowRequest the {@link StartWorkflowRequest} object to start the workflow
* @return the id of the workflow instance that can be used for tracking
+ * @throws ConductorClientException if the input payload exceeds {@link ConductorClientConfiguration#getWorkflowInputPayloadThresholdKB()} while {@link ExternalPayloadStorage} is disabled, or if it exceeds {@link ConductorClientConfiguration#getWorkflowInputMaxPayloadThresholdKB()}
*/
public String startWorkflow(StartWorkflowRequest startWorkflowRequest) {
Preconditions.checkNotNull(startWorkflowRequest, "StartWorkflowRequest cannot be null");
+ Preconditions.checkArgument(StringUtils.isNotBlank(startWorkflowRequest.getName()), "Workflow name cannot be null or empty");
+ Preconditions.checkArgument(StringUtils.isBlank(startWorkflowRequest.getExternalInputPayloadStoragePath()), "External Storage Path must not be set");
+
+ String version = startWorkflowRequest.getVersion() != null ? startWorkflowRequest.getVersion().toString() : "latest";
+ try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
+ objectMapper.writeValue(byteArrayOutputStream, startWorkflowRequest.getInput());
+ byte[] workflowInputBytes = byteArrayOutputStream.toByteArray();
+ long workflowInputSize = workflowInputBytes.length;
+ WorkflowTaskMetrics.recordWorkflowInputPayloadSize(startWorkflowRequest.getName(), version, workflowInputSize);
+ if (workflowInputSize > conductorClientConfiguration.getWorkflowInputPayloadThresholdKB() * 1024L) {
+ if (!conductorClientConfiguration.isExternalPayloadStorageEnabled() ||
+ (workflowInputSize > conductorClientConfiguration.getWorkflowInputMaxPayloadThresholdKB() * 1024L)) {
+ String errorMsg = String.format("Input payload larger than the allowed threshold of: %d KB", conductorClientConfiguration.getWorkflowInputPayloadThresholdKB());
+ throw new ConductorClientException(errorMsg);
+ } else {
+ WorkflowTaskMetrics.incrementExternalPayloadUsedCount(startWorkflowRequest.getName(), ExternalPayloadStorage.Operation.WRITE.name(), ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT.name());
+ String externalStoragePath = uploadToExternalPayloadStorage(ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT, workflowInputBytes, workflowInputSize);
+ startWorkflowRequest.setExternalInputPayloadStoragePath(externalStoragePath);
+ startWorkflowRequest.setInput(null);
+ }
+ }
+ } catch (IOException e) {
+ String errorMsg = String.format("Unable to start workflow:%s, version:%s", startWorkflowRequest.getName(), version);
+ logger.error(errorMsg, e);
+ throw new ConductorClientException(errorMsg, e);
+ }
return postForEntity("workflow", startWorkflowRequest, null, String.class, startWorkflowRequest.getName());
}
@@ -158,7 +215,9 @@ public Workflow getExecutionStatus(String workflowId, boolean includeTasks) {
*/
public Workflow getWorkflow(String workflowId, boolean includeTasks) {
Preconditions.checkArgument(StringUtils.isNotBlank(workflowId), "workflow id cannot be blank");
- return getForEntity("workflow/{workflowId}", new Object[]{"includeTasks", includeTasks}, Workflow.class, workflowId);
+ Workflow workflow = getForEntity("workflow/{workflowId}", new Object[]{"includeTasks", includeTasks}, Workflow.class, workflowId);
+ populateWorkflowOutput(workflow);
+ return workflow;
}
/**
@@ -175,8 +234,22 @@ public List getWorkflows(String name, String correlationId, boolean in
Preconditions.checkArgument(StringUtils.isNotBlank(correlationId), "correlationId cannot be blank");
Object[] params = new Object[]{"includeClosed", includeClosed, "includeTasks", includeTasks};
- return getForEntity("workflow/{name}/correlated/{correlationId}", params, new GenericType>() {
+ List workflows = getForEntity("workflow/{name}/correlated/{correlationId}", params, new GenericType>() {
}, name, correlationId);
+ workflows.forEach(this::populateWorkflowOutput);
+ return workflows;
+ }
+
+ /**
+ * Populates the workflow output from external payload storage if the external storage path is specified.
+ *
+ * @param workflow the workflow for which the output is to be populated.
+ */
+ private void populateWorkflowOutput(Workflow workflow) {
+ if (StringUtils.isNotBlank(workflow.getExternalOutputPayloadStoragePath())) {
+ WorkflowTaskMetrics.incrementExternalPayloadUsedCount(workflow.getWorkflowType(), ExternalPayloadStorage.Operation.READ.name(), ExternalPayloadStorage.PayloadType.WORKFLOW_OUTPUT.name());
+ workflow.setOutput(downloadFromExternalStorage(ExternalPayloadStorage.PayloadType.WORKFLOW_OUTPUT, workflow.getExternalOutputPayloadStoragePath()));
+ }
}
/**
@@ -201,7 +274,8 @@ public void deleteWorkflow(String workflowId, boolean archiveWorkflow) {
*/
public List getRunningWorkflow(String workflowName, Integer version) {
Preconditions.checkArgument(StringUtils.isNotBlank(workflowName), "Workflow name cannot be blank");
- return getForEntity("workflow/running/{name}", new Object[]{"version", version}, new GenericType>() {}, workflowName);
+ return getForEntity("workflow/running/{name}", new Object[]{"version", version}, new GenericType>() {
+ }, workflowName);
}
/**
@@ -219,7 +293,8 @@ public List getWorkflowsByTimePeriod(String workflowName, int version, L
Preconditions.checkNotNull(endTime, "End time cannot be null");
Object[] params = new Object[]{"version", version, "startTime", startTime, "endTime", endTime};
- return getForEntity("workflow/running/{name}", params, new GenericType>() {}, workflowName);
+ return getForEntity("workflow/running/{name}", params, new GenericType>() {
+ }, workflowName);
}
/**
@@ -286,7 +361,7 @@ public String rerunWorkflow(String workflowId, RerunWorkflowRequest rerunWorkflo
*/
public void restart(String workflowId) {
Preconditions.checkArgument(StringUtils.isNotBlank(workflowId), "workflow id cannot be blank");
- postForEntity1("workflow/{workflowId}/restart", workflowId);
+ postForEntityWithUriVariablesOnly("workflow/{workflowId}/restart", workflowId);
}
/**
@@ -296,7 +371,7 @@ public void restart(String workflowId) {
*/
public void retryLastFailedTask(String workflowId) {
Preconditions.checkArgument(StringUtils.isNotBlank(workflowId), "workflow id cannot be blank");
- postForEntity1("workflow/{workflowId}/retry", workflowId);
+ postForEntityWithUriVariablesOnly("workflow/{workflowId}/retry", workflowId);
}
/**
@@ -306,7 +381,7 @@ public void retryLastFailedTask(String workflowId) {
*/
public void resetCallbacksForInProgressTasks(String workflowId) {
Preconditions.checkArgument(StringUtils.isNotBlank(workflowId), "workflow id cannot be blank");
- postForEntity1("workflow/{workflowId}/resetcallbacks", workflowId);
+ postForEntityWithUriVariablesOnly("workflow/{workflowId}/resetcallbacks", workflowId);
}
/**
diff --git a/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java b/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java
index d3ed42467e..c72f5feb2e 100644
--- a/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java
+++ b/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -273,15 +273,15 @@ public synchronized void init() {
});
}
- public void shutdown() {
- this.scheduledExecutorService.shutdown();
- this.executorService.shutdown();
+ public void shutdown() {
+ this.scheduledExecutorService.shutdown();
+ this.executorService.shutdown();
- shutdownExecutorService(this.scheduledExecutorService, SHUTDOWN_WAIT_TIME_IN_SEC);
- shutdownExecutorService(this.executorService, SHUTDOWN_WAIT_TIME_IN_SEC);
- }
+ shutdownExecutorService(this.scheduledExecutorService, SHUTDOWN_WAIT_TIME_IN_SEC);
+ shutdownExecutorService(this.executorService, SHUTDOWN_WAIT_TIME_IN_SEC);
+ }
- private void shutdownExecutorService(ExecutorService executorService, long timeout) {
+ private void shutdownExecutorService(ExecutorService executorService, long timeout) {
try {
if (executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
logger.debug("tasks completed, shutting down");
diff --git a/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskMetrics.java b/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskMetrics.java
index d7bed563b7..39961d5afa 100644
--- a/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskMetrics.java
+++ b/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskMetrics.java
@@ -42,7 +42,12 @@
public class WorkflowTaskMetrics {
private static final String TASK_TYPE = "taskType";
+ private static final String WORFLOW_TYPE = "workflowType";
+ private static final String WORKFLOW_VERSION = "version";
private static final String EXCEPTION = "exception";
+ private static final String NAME = "name";
+ private static final String OPERATION = "operation";
+ private static final String PAYLOAD_TYPE = "payload_type";
private static final String TASK_EXECUTION_QUEUE_FULL = "task_execution_queue_full";
private static final String TASK_POLL_ERROR = "task_poll_error";
@@ -54,7 +59,9 @@ public class WorkflowTaskMetrics {
private static final String TASK_POLL_COUNTER = "task_poll_counter";
private static final String TASK_EXECUTE_TIME = "task_execute_time";
private static final String TASK_POLL_TIME = "task_poll_time";
- public static final String TASK_RESULT_SIZE = "task_result_size";
+ private static final String TASK_RESULT_SIZE = "task_result_size";
+ private static final String WORKFLOW_INPUT_SIZE = "workflow_input_size";
+ private static final String EXTERNAL_PAYLOAD_USED = "external_payload_used";
private static Registry registry = Spectator.globalRegistry();
@@ -158,5 +165,11 @@ public static void incrementTaskPollCount(String taskType, int taskCount) {
getCounter(TASK_POLL_COUNTER, TASK_TYPE, taskType).increment(taskCount);
}
+ public static void recordWorkflowInputPayloadSize(String workflowType, String version, long payloadSize) {
+ getGauge(WORKFLOW_INPUT_SIZE, WORFLOW_TYPE, workflowType, WORKFLOW_VERSION, version).getAndSet(payloadSize);
+ }
+ public static void incrementExternalPayloadUsedCount(String name, String operation, String payloadType) {
+ incrementCount(EXTERNAL_PAYLOAD_USED, NAME, name, OPERATION, operation, PAYLOAD_TYPE, payloadType);
+ }
}
diff --git a/client/src/test/java/com/netflix/conductor/client/worker/TestPropertyFactory.java b/client/src/test/java/com/netflix/conductor/client/worker/TestPropertyFactory.java
index c8b4b94604..b037778dc4 100644
--- a/client/src/test/java/com/netflix/conductor/client/worker/TestPropertyFactory.java
+++ b/client/src/test/java/com/netflix/conductor/client/worker/TestPropertyFactory.java
@@ -13,63 +13,55 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-/**
- *
- */
+
package com.netflix.conductor.client.worker;
+import com.netflix.conductor.common.metadata.tasks.TaskResult;
+import org.junit.Test;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import org.junit.Test;
-
-import com.netflix.conductor.common.metadata.tasks.Task;
-import com.netflix.conductor.common.metadata.tasks.TaskResult;
-
/**
* @author Viren
*
*/
public class TestPropertyFactory {
-
+
@Test
public void testIdentity(){
- Worker worker = Worker.create("Test2", (Task task)->{
- return new TaskResult(task);
- });
+ Worker worker = Worker.create("Test2", TaskResult::new);
assertNotNull(worker.getIdentity());
boolean paused = worker.paused();
assertFalse("Paused? " + paused, paused);
}
-
+
@Test
public void test() {
-
- int val = PropertyFactory.getInteger("workerB", "pollingInterval", 100).intValue();
+
+ int val = PropertyFactory.getInteger("workerB", "pollingInterval", 100);
assertEquals("got: " + val, 2, val);
assertEquals(100, PropertyFactory.getInteger("workerB", "propWithoutValue", 100).intValue());
-
+
assertFalse(PropertyFactory.getBoolean("workerB", "paused", true)); //Global value set to 'false'
assertTrue(PropertyFactory.getBoolean("workerA", "paused", false)); //WorkerA value set to 'true'
-
-
+
+
assertEquals(42, PropertyFactory.getInteger("workerA", "batchSize", 42).intValue()); //No global value set, so will return the default value supplied
assertEquals(84, PropertyFactory.getInteger("workerB", "batchSize", 42).intValue()); //WorkerB's value set to 84
- assertEquals("domainA", PropertyFactory.getString("workerA", "domain", null));
- assertEquals("domainB", PropertyFactory.getString("workerB", "domain", null));
- assertEquals(null, PropertyFactory.getString("workerC", "domain", null)); // Non Existent
+ assertEquals("domainA", PropertyFactory.getString("workerA", "domain", null));
+ assertEquals("domainB", PropertyFactory.getString("workerB", "domain", null));
+ assertNull(PropertyFactory.getString("workerC", "domain", null)); // Non Existent
}
-
+
@Test
public void testProperty() {
- Worker worker = Worker.create("Test", (Task task)->{
- return new TaskResult(task);
- });
+ Worker worker = Worker.create("Test", TaskResult::new);
boolean paused = worker.paused();
assertTrue("Paused? " + paused, paused);
}
-
}
diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
index 20b64bf381..7455398254 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
@@ -132,8 +132,11 @@ public boolean isRetriable() {
private int rateLimitFrequencyInSeconds;
- public Task() {
+ private String externalInputPayloadStoragePath;
+
+ private String externalOutputPayloadStoragePath;
+ public Task() {
}
/**
@@ -438,7 +441,8 @@ public String getWorkflowType() {
/**
- * @param workflowType workflow type
+ * @param workflowType the name of the workflow
+ * @return the task object with the workflow type set
*/
public Task setWorkflowType(String workflowType) {
this.workflowType = workflowType;
@@ -559,8 +563,35 @@ public void setRateLimitFrequencyInSeconds(int rateLimitFrequencyInSeconds) {
this.rateLimitFrequencyInSeconds = rateLimitFrequencyInSeconds;
}
- public Task copy() {
+ /**
+ * @return the external storage path for the task input payload
+ */
+ public String getExternalInputPayloadStoragePath() {
+ return externalInputPayloadStoragePath;
+ }
+
+ /**
+ * @param externalInputPayloadStoragePath the external storage path where the task input payload is stored
+ */
+ public void setExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) {
+ this.externalInputPayloadStoragePath = externalInputPayloadStoragePath;
+ }
+
+ /**
+ * @return the external storage path for the task output payload
+ */
+ public String getExternalOutputPayloadStoragePath() {
+ return externalOutputPayloadStoragePath;
+ }
+
+ /**
+ * @param externalOutputPayloadStoragePath the external storage path where the task output payload is stored
+ */
+ public void setExternalOutputPayloadStoragePath(String externalOutputPayloadStoragePath) {
+ this.externalOutputPayloadStoragePath = externalOutputPayloadStoragePath;
+ }
+ public Task copy() {
Task copy = new Task();
copy.setCallbackAfterSeconds(callbackAfterSeconds);
copy.setCallbackFromWorker(callbackFromWorker);
@@ -583,6 +614,8 @@ public Task copy() {
copy.setDomain(domain);
copy.setRateLimitPerFrequency(rateLimitPerFrequency);
copy.setRateLimitFrequencyInSeconds(rateLimitFrequencyInSeconds);
+ copy.setExternalInputPayloadStoragePath(externalInputPayloadStoragePath);
+ copy.setExternalOutputPayloadStoragePath(externalOutputPayloadStoragePath);
return copy;
}
@@ -606,11 +639,11 @@ public String toString() {
", startDelayInSeconds=" + startDelayInSeconds +
", retriedTaskId='" + retriedTaskId + '\'' +
", retried=" + retried +
+ ", executed=" + executed +
", callbackFromWorker=" + callbackFromWorker +
- ", rateLimitFrequencyInSeconds=" + rateLimitFrequencyInSeconds +
- ", rateLimitPerFrequency=" + rateLimitPerFrequency +
", responseTimeoutSeconds=" + responseTimeoutSeconds +
", workflowInstanceId='" + workflowInstanceId + '\'' +
+ ", workflowType='" + workflowType + '\'' +
", taskId='" + taskId + '\'' +
", reasonForIncompletion='" + reasonForIncompletion + '\'' +
", callbackAfterSeconds=" + callbackAfterSeconds +
@@ -618,6 +651,10 @@ public String toString() {
", outputData=" + outputData +
", workflowTask=" + workflowTask +
", domain='" + domain + '\'' +
+ ", rateLimitPerFrequency=" + rateLimitPerFrequency +
+ ", rateLimitFrequencyInSeconds=" + rateLimitFrequencyInSeconds +
+ ", externalInputPayloadStoragePath='" + externalInputPayloadStoragePath + '\'' +
+ ", externalOutputPayloadStoragePath='" + externalOutputPayloadStoragePath + '\'' +
'}';
}
}
diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java
index 1dfb6ec1fb..aecc801e8e 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskResult.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,9 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-/**
- *
- */
+
package com.netflix.conductor.common.metadata.tasks;
import java.util.HashMap;
@@ -50,6 +48,8 @@ public enum Status {
private List logs = new CopyOnWriteArrayList<>();
+ private String externalOutputPayloadStoragePath;
+
public TaskResult(Task task) {
this.workflowInstanceId = task.getWorkflowInstanceId();
this.taskId = task.getTaskId();
@@ -58,6 +58,7 @@ public TaskResult(Task task) {
this.status = Status.valueOf(task.getStatus().name());
this.workerId = task.getWorkerId();
this.outputData = task.getOutputData();
+ this.externalOutputPayloadStoragePath = task.getExternalOutputPayloadStoragePath();
}
public TaskResult() {
@@ -191,6 +192,22 @@ public TaskResult log(String log) {
return this;
}
+ /**
+ *
+ * @return the path where the task output is stored in external storage
+ */
+ public String getExternalOutputPayloadStoragePath() {
+ return externalOutputPayloadStoragePath;
+ }
+
+ /**
+ *
+ * @param externalOutputPayloadStoragePath path in the external storage where the task output is stored
+ */
+ public void setExternalOutputPayloadStoragePath(String externalOutputPayloadStoragePath) {
+ this.externalOutputPayloadStoragePath = externalOutputPayloadStoragePath;
+ }
+
@Override
public String toString() {
return "TaskResult{" +
@@ -202,6 +219,7 @@ public String toString() {
", status=" + status +
", outputData=" + outputData +
", logs=" + logs +
+ ", externalOutputPayloadStoragePath='" + externalOutputPayloadStoragePath + '\'' +
'}';
}
diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StartWorkflowRequest.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StartWorkflowRequest.java
index 595094b23d..9157aa3127 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StartWorkflowRequest.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StartWorkflowRequest.java
@@ -7,6 +7,7 @@ public class StartWorkflowRequest {
private String name;
private Integer version;
private String correlationId;
+ private String externalInputPayloadStoragePath;
private Map input = new HashMap<>();
private Map taskToDomain = new HashMap<>();
@@ -19,7 +20,8 @@ public void setName(String name) {
public StartWorkflowRequest withName(String name) {
this.name = name;
return this;
- }
+ }
+
public Integer getVersion() {
return version;
}
@@ -30,6 +32,7 @@ public StartWorkflowRequest withVersion(Integer version) {
this.version = version;
return this;
}
+
public String getCorrelationId() {
return correlationId;
}
@@ -40,6 +43,15 @@ public StartWorkflowRequest withCorrelationId(String correlationId) {
this.correlationId = correlationId;
return this;
}
+
+ public String getExternalInputPayloadStoragePath() {
+ return externalInputPayloadStoragePath;
+ }
+
+ public void setExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) {
+ this.externalInputPayloadStoragePath = externalInputPayloadStoragePath;
+ }
+
public Map getInput() {
return input;
}
@@ -50,6 +62,7 @@ public StartWorkflowRequest withInput(Map input) {
this.input = input;
return this;
}
+
public Map getTaskToDomain() {
return taskToDomain;
}
diff --git a/common/src/main/java/com/netflix/conductor/common/run/ExternalStorageLocation.java b/common/src/main/java/com/netflix/conductor/common/run/ExternalStorageLocation.java
new file mode 100644
index 0000000000..3bf3faca94
--- /dev/null
+++ b/common/src/main/java/com/netflix/conductor/common/run/ExternalStorageLocation.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netflix.conductor.common.run;
+
+/**
+ * Describes the location where the JSON payload is stored in external storage.
+ * <p>
+ * The location is described using the following fields:
+ * <ul>
+ * <li>uri: The uri of the json file in external storage</li>
+ * <li>path: The relative path of the file in external storage</li>
+ * </ul>
+ *
+ */
+public class ExternalStorageLocation {
+
+ private String uri;
+ private String path;
+
+ public String getUri() {
+ return uri;
+ }
+
+ public void setUri(String uri) {
+ this.uri = uri;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ @Override
+ public String toString() {
+ return "ExternalStorageLocation{" +
+ "uri='" + uri + '\'' +
+ ", path='" + path + '\'' +
+ '}';
+ }
+}
diff --git a/common/src/main/java/com/netflix/conductor/common/run/Workflow.java b/common/src/main/java/com/netflix/conductor/common/run/Workflow.java
index 783954452b..e70624ccc3 100644
--- a/common/src/main/java/com/netflix/conductor/common/run/Workflow.java
+++ b/common/src/main/java/com/netflix/conductor/common/run/Workflow.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,76 +15,80 @@
*/
package com.netflix.conductor.common.run;
+import com.netflix.conductor.common.metadata.Auditable;
+import com.netflix.conductor.common.metadata.tasks.Task;
+
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
-import com.netflix.conductor.common.metadata.Auditable;
-import com.netflix.conductor.common.metadata.tasks.Task;
-
+import java.util.stream.Collectors;
public class Workflow extends Auditable{
-
+
public enum WorkflowStatus {
RUNNING(false, false), COMPLETED(true, true), FAILED(true, false), TIMED_OUT(true, false), TERMINATED(true, false), PAUSED(false, true);
-
+
private boolean terminal;
-
+
private boolean successful;
-
+
WorkflowStatus(boolean terminal, boolean successful){
this.terminal = terminal;
this.successful = successful;
}
-
+
public boolean isTerminal(){
return terminal;
}
-
+
public boolean isSuccessful(){
return successful;
}
}
-
+
private WorkflowStatus status = WorkflowStatus.RUNNING;
-
+
private long endTime;
private String workflowId;
-
+
private String parentWorkflowId;
private String parentWorkflowTaskId;
private List tasks = new LinkedList<>();
-
+
private Map input = new HashMap<>();
-
+
private Map output = new HashMap<>();;
-
+
private String workflowType;
-
+
private int version;
-
+
private String correlationId;
-
+
private String reRunFromWorkflowId;
-
+
private String reasonForIncompletion;
-
+
private int schemaVersion;
-
+
private String event;
private Map taskToDomain = new HashMap<>();
private Set failedReferenceTaskNames = new HashSet<>();
+ private String externalInputPayloadStoragePath;
+
+ private String externalOutputPayloadStoragePath;
+
public Workflow(){
-
+
}
/**
* @return the status
@@ -152,7 +156,7 @@ public List getTasks() {
public void setTasks(List tasks) {
this.tasks = tasks;
}
-
+
/**
* @return the input
*/
@@ -189,40 +193,40 @@ public Map getOutput() {
public void setOutput(Map output) {
this.output = output;
}
-
+
/**
- *
+ *
* @return The correlation id used when starting the workflow
*/
public String getCorrelationId() {
return correlationId;
}
-
+
/**
- *
+ *
* @param correlationId the correlation id
*/
public void setCorrelationId(String correlationId) {
this.correlationId = correlationId;
}
-
+
/**
- *
+ *
* @return Workflow Type / Definition
*/
public String getWorkflowType() {
return workflowType;
}
-
+
/**
- *
+ *
* @param workflowType Workflow type
*/
public void setWorkflowType(String workflowType) {
this.workflowType = workflowType;
}
-
-
+
+
/**
* @return the version
*/
@@ -235,23 +239,23 @@ public int getVersion() {
public void setVersion(int version) {
this.version = version;
}
-
+
public String getReRunFromWorkflowId() {
return reRunFromWorkflowId;
}
-
+
public void setReRunFromWorkflowId(String reRunFromWorkflowId) {
this.reRunFromWorkflowId = reRunFromWorkflowId;
}
-
+
public String getReasonForIncompletion() {
return reasonForIncompletion;
}
-
+
public void setReasonForIncompletion(String reasonForIncompletion) {
this.reasonForIncompletion = reasonForIncompletion;
}
-
+
/**
* @return the parentWorkflowId
*/
@@ -264,7 +268,7 @@ public String getParentWorkflowId() {
public void setParentWorkflowId(String parentWorkflowId) {
this.parentWorkflowId = parentWorkflowId;
}
-
+
/**
* @return the parentWorkflowTaskId
*/
@@ -289,17 +293,17 @@ public int getSchemaVersion() {
public void setSchemaVersion(int schemaVersion) {
this.schemaVersion = schemaVersion;
}
-
+
/**
- *
+ *
* @return Name of the event that started the workflow
*/
public String getEvent() {
return event;
}
-
+
/**
- *
+ *
* @param event Name of the event that started the workflow
*/
public void setEvent(String event) {
@@ -314,16 +318,39 @@ public void setFailedReferenceTaskNames(Set failedReferenceTaskNames) {
this.failedReferenceTaskNames = failedReferenceTaskNames;
}
- @Override
- public String toString() {
- return workflowType + "." + version + "/" + workflowId + "." + status;
+ /**
+ * @return the external storage path of the workflow input payload
+ */
+ public String getExternalInputPayloadStoragePath() {
+ return externalInputPayloadStoragePath;
+ }
+
+ /**
+ * @param externalInputPayloadStoragePath the external storage path where the workflow input payload is stored
+ */
+ public void setExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) {
+ this.externalInputPayloadStoragePath = externalInputPayloadStoragePath;
}
-
+
+ /**
+ * @return the external storage path of the workflow output payload
+ */
+ public String getExternalOutputPayloadStoragePath() {
+ return externalOutputPayloadStoragePath;
+ }
+
+ /**
+ * @param externalOutputPayloadStoragePath the external storage path where the workflow output payload is stored
+ */
+ public void setExternalOutputPayloadStoragePath(String externalOutputPayloadStoragePath) {
+ this.externalOutputPayloadStoragePath = externalOutputPayloadStoragePath;
+ }
+
public Task getTaskByRefName(String refName) {
if (refName == null) {
throw new RuntimeException("refName passed is null. Check the workflow execution. For dynamic tasks, make sure referenceTaskName is set to a not null value");
}
- LinkedList found = new LinkedList();
+ LinkedList found = new LinkedList<>();
for (Task t : tasks) {
if (t.getReferenceTaskName() == null) {
throw new RuntimeException("Task " + t.getTaskDefName() + ", seq=" + t.getSeq() + " does not have reference name specified.");
@@ -337,5 +364,41 @@ public Task getTaskByRefName(String refName) {
}
return found.getLast();
}
-
-}
\ No newline at end of file
+
+ /**
+ * @return a deep copy of the workflow instance
+ * Note: This does not copy the following fields:
+ *
+ * - endTime
+ * - taskToDomain
+ * - failedReferenceTaskNames
+ * - externalInputPayloadStoragePath
+ * - externalOutputPayloadStoragePath
+ *
+ */
+ public Workflow copy() {
+ Workflow copy = new Workflow();
+ copy.setInput(input);
+ copy.setOutput(output);
+ copy.setStatus(status);
+ copy.setWorkflowId(workflowId);
+ copy.setParentWorkflowId(parentWorkflowId);
+ copy.setParentWorkflowTaskId(parentWorkflowTaskId);
+ copy.setReRunFromWorkflowId(reRunFromWorkflowId);
+ copy.setWorkflowType(workflowType);
+ copy.setVersion(version);
+ copy.setCorrelationId(correlationId);
+ copy.setEvent(event);
+ copy.setReasonForIncompletion(reasonForIncompletion);
+ copy.setSchemaVersion(schemaVersion);
+ copy.setTasks(tasks.stream()
+ .map(Task::copy)
+ .collect(Collectors.toList()));
+ return copy;
+ }
+
+ @Override
+ public String toString() {
+ return workflowType + "." + version + "/" + workflowId + "." + status;
+ }
+}
diff --git a/common/src/main/java/com/netflix/conductor/common/utils/ExternalPayloadStorage.java b/common/src/main/java/com/netflix/conductor/common/utils/ExternalPayloadStorage.java
new file mode 100644
index 0000000000..4497dbecc3
--- /dev/null
+++ b/common/src/main/java/com/netflix/conductor/common/utils/ExternalPayloadStorage.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.netflix.conductor.common.utils;
+
+import com.netflix.conductor.common.run.ExternalStorageLocation;
+
+import java.io.InputStream;
+
+/**
+ * Interface used to externalize the storage of large JSON payloads in workflow and task input/output
+ */
+public interface ExternalPayloadStorage {
+
+ enum Operation {READ, WRITE}
+
+ enum PayloadType {WORKFLOW_INPUT, WORKFLOW_OUTPUT, TASK_INPUT, TASK_OUTPUT}
+
+ /**
+ * Obtain a uri used to store/access a json payload in external storage.
+ *
+ * @param operation the type of {@link Operation} to be performed with the uri
+ * @param payloadType the {@link PayloadType} that is being accessed at the uri
+ * @param path (optional) the relative path for which the external storage location object is to be populated.
+ * If path is not specified, it will be computed and populated.
+ * @return a {@link ExternalStorageLocation} object which contains the uri and the path for the json payload
+ */
+ ExternalStorageLocation getLocation(Operation operation, PayloadType payloadType, String path);
+
+ /**
+ * Upload a json payload to the specified external storage location.
+ *
+ * @param path the location to which the object is to be uploaded
+ * @param payload an {@link InputStream} containing the json payload which is to be uploaded
+ * @param payloadSize the size of the json payload in bytes
+ */
+ void upload(String path, InputStream payload, long payloadSize);
+
+ /**
+ * Download the json payload from the specified external storage location.
+ *
+ * @param path the location from where the object is to be downloaded
+ * @return an {@link InputStream} of the json payload at the specified location
+ */
+ InputStream download(String path);
+}
diff --git a/common/src/test/java/com/netflix/conductor/common/workflow/TestWorkflowDef.java b/common/src/test/java/com/netflix/conductor/common/workflow/TestWorkflowDef.java
index 21903f39b6..7e77e5f01c 100644
--- a/common/src/test/java/com/netflix/conductor/common/workflow/TestWorkflowDef.java
+++ b/common/src/test/java/com/netflix/conductor/common/workflow/TestWorkflowDef.java
@@ -18,20 +18,20 @@
*/
package com.netflix.conductor.common.workflow;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
+import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
+import com.netflix.conductor.common.metadata.workflow.WorkflowTask.Type;
+import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.junit.Test;
-
-import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
-import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
-import com.netflix.conductor.common.metadata.workflow.WorkflowTask.Type;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
/**
* @author Viren
@@ -47,7 +47,7 @@ private WorkflowTask createTask(int c){
}
@Test
- public void test() throws Exception {
+ public void test() {
String COND_TASK_WF = "COND_TASK_WF";
List wfts = new ArrayList(10);
@@ -78,7 +78,7 @@ public void test() throws Exception {
caseTask.setTaskReferenceName("case");
Map> dc = new HashMap<>();
dc.put("c1", Arrays.asList(wfts.get(0), subCaseTask, wfts.get(1)));
- dc.put("c2", Arrays.asList(wfts.get(3)));
+ dc.put("c2", Collections.singletonList(wfts.get(3)));
caseTask.setDecisionCases(dc);
WorkflowTask finalTask = new WorkflowTask();
diff --git a/contribs/dependencies.lock b/contribs/dependencies.lock
index 35f95f4197..8683de789b 100644
--- a/contribs/dependencies.lock
+++ b/contribs/dependencies.lock
@@ -1,7 +1,13 @@
{
"compile": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.amazonaws:aws-java-sdk-sqs": {
- "locked": "1.11.380",
+ "locked": "1.11.389",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
@@ -112,8 +118,14 @@
}
},
"compileClasspath": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.amazonaws:aws-java-sdk-sqs": {
- "locked": "1.11.380",
+ "locked": "1.11.389",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
@@ -224,8 +236,14 @@
}
},
"default": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.amazonaws:aws-java-sdk-sqs": {
- "locked": "1.11.380",
+ "locked": "1.11.389",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
@@ -346,8 +364,14 @@
}
},
"runtime": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.amazonaws:aws-java-sdk-sqs": {
- "locked": "1.11.380",
+ "locked": "1.11.389",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
@@ -458,8 +482,14 @@
}
},
"runtimeClasspath": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.amazonaws:aws-java-sdk-sqs": {
- "locked": "1.11.380",
+ "locked": "1.11.389",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
@@ -570,8 +600,14 @@
}
},
"testCompile": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.amazonaws:aws-java-sdk-sqs": {
- "locked": "1.11.380",
+ "locked": "1.11.389",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
@@ -698,8 +734,14 @@
}
},
"testCompileClasspath": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.amazonaws:aws-java-sdk-sqs": {
- "locked": "1.11.380",
+ "locked": "1.11.389",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
@@ -826,8 +868,14 @@
}
},
"testRuntime": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.amazonaws:aws-java-sdk-sqs": {
- "locked": "1.11.380",
+ "locked": "1.11.389",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
@@ -954,8 +1002,14 @@
}
},
"testRuntimeClasspath": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.amazonaws:aws-java-sdk-sqs": {
- "locked": "1.11.380",
+ "locked": "1.11.389",
"requested": "latest.release"
},
"com.fasterxml.jackson.core:jackson-core": {
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/http/HttpTask.java b/contribs/src/main/java/com/netflix/conductor/contribs/http/HttpTask.java
index 9333f0baad..876c7292d1 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/http/HttpTask.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/http/HttpTask.java
@@ -160,9 +160,7 @@ protected HttpResponse httpCall(Input input) throws Exception {
if(input.body != null) {
builder.entity(input.body);
}
- input.headers.entrySet().forEach(e -> {
- builder.header(e.getKey(), e.getValue());
- });
+ input.headers.forEach(builder::header);
HttpResponse response = new HttpResponse();
try {
diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/http/TestHttpTask.java b/contribs/src/test/java/com/netflix/conductor/contribs/http/TestHttpTask.java
index 89eaa32ccf..1f5b604845 100644
--- a/contribs/src/test/java/com/netflix/conductor/contribs/http/TestHttpTask.java
+++ b/contribs/src/test/java/com/netflix/conductor/contribs/http/TestHttpTask.java
@@ -42,6 +42,7 @@
import com.netflix.conductor.core.execution.mapper.TaskMapper;
import com.netflix.conductor.core.execution.mapper.UserDefinedTaskMapper;
import com.netflix.conductor.core.execution.mapper.WaitTaskMapper;
+import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import org.eclipse.jetty.server.Request;
@@ -53,7 +54,6 @@
import org.junit.BeforeClass;
import org.junit.Test;
-import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
@@ -89,7 +89,9 @@ public class TestHttpTask {
private HttpTask httpTask;
- private WorkflowExecutor executor = mock(WorkflowExecutor.class);
+ private WorkflowExecutor workflowExecutor;
+
+ private Configuration config;
private Workflow workflow = new Workflow();
@@ -125,13 +127,14 @@ public static void cleanup() {
@Before
public void setup() {
RestClientManager rcm = new RestClientManager();
- Configuration config = mock(Configuration.class);
+ workflowExecutor = mock(WorkflowExecutor.class);
+ config = mock(Configuration.class);
when(config.getServerId()).thenReturn("test_server_id");
httpTask = new HttpTask(rcm, config);
}
@Test
- public void testPost() throws Exception {
+ public void testPost() {
Task task = new Task();
Input input = new Input();
@@ -143,7 +146,7 @@ public void testPost() throws Exception {
input.setMethod("POST");
task.getInputData().put(HttpTask.REQUEST_PARAMETER_NAME, input);
- httpTask.start(workflow, task, executor);
+ httpTask.start(workflow, task, workflowExecutor);
assertEquals(task.getReasonForIncompletion(), Task.Status.COMPLETED, task.getStatus());
Map hr = (Map) task.getOutputData().get("response");
Object response = hr.get("body");
@@ -158,7 +161,7 @@ public void testPost() throws Exception {
@Test
- public void testPostNoContent() throws Exception {
+ public void testPostNoContent() {
Task task = new Task();
Input input = new Input();
@@ -170,7 +173,7 @@ public void testPostNoContent() throws Exception {
input.setMethod("POST");
task.getInputData().put(HttpTask.REQUEST_PARAMETER_NAME, input);
- httpTask.start(workflow, task, executor);
+ httpTask.start(workflow, task, workflowExecutor);
assertEquals(task.getReasonForIncompletion(), Task.Status.COMPLETED, task.getStatus());
Map hr = (Map) task.getOutputData().get("response");
Object response = hr.get("body");
@@ -179,7 +182,7 @@ public void testPostNoContent() throws Exception {
}
@Test
- public void testFailure() throws Exception {
+ public void testFailure() {
Task task = new Task();
Input input = new Input();
@@ -187,19 +190,19 @@ public void testFailure() throws Exception {
input.setMethod("GET");
task.getInputData().put(HttpTask.REQUEST_PARAMETER_NAME, input);
- httpTask.start(workflow, task, executor);
+ httpTask.start(workflow, task, workflowExecutor);
assertEquals("Task output: " + task.getOutputData(), Task.Status.FAILED, task.getStatus());
assertEquals(ERROR_RESPONSE, task.getReasonForIncompletion());
task.setStatus(Status.SCHEDULED);
task.getInputData().remove(HttpTask.REQUEST_PARAMETER_NAME);
- httpTask.start(workflow, task, executor);
+ httpTask.start(workflow, task, workflowExecutor);
assertEquals(Task.Status.FAILED, task.getStatus());
assertEquals(HttpTask.MISSING_REQUEST, task.getReasonForIncompletion());
}
@Test
- public void testTextGET() throws Exception {
+ public void testTextGET() {
Task task = new Task();
Input input = new Input();
@@ -207,7 +210,7 @@ public void testTextGET() throws Exception {
input.setMethod("GET");
task.getInputData().put(HttpTask.REQUEST_PARAMETER_NAME, input);
- httpTask.start(workflow, task, executor);
+ httpTask.start(workflow, task, workflowExecutor);
Map hr = (Map) task.getOutputData().get("response");
Object response = hr.get("body");
assertEquals(Task.Status.COMPLETED, task.getStatus());
@@ -215,7 +218,7 @@ public void testTextGET() throws Exception {
}
@Test
- public void testNumberGET() throws Exception {
+ public void testNumberGET() {
Task task = new Task();
Input input = new Input();
@@ -223,7 +226,7 @@ public void testNumberGET() throws Exception {
input.setMethod("GET");
task.getInputData().put(HttpTask.REQUEST_PARAMETER_NAME, input);
- httpTask.start(workflow, task, executor);
+ httpTask.start(workflow, task, workflowExecutor);
Map hr = (Map) task.getOutputData().get("response");
Object response = hr.get("body");
assertEquals(Task.Status.COMPLETED, task.getStatus());
@@ -240,7 +243,7 @@ public void testJsonGET() throws Exception {
input.setMethod("GET");
task.getInputData().put(HttpTask.REQUEST_PARAMETER_NAME, input);
- httpTask.start(workflow, task, executor);
+ httpTask.start(workflow, task, workflowExecutor);
Map hr = (Map) task.getOutputData().get("response");
Object response = hr.get("body");
assertEquals(Task.Status.COMPLETED, task.getStatus());
@@ -250,7 +253,7 @@ public void testJsonGET() throws Exception {
}
@Test
- public void testExecute() throws Exception {
+ public void testExecute() {
Task task = new Task();
Input input = new Input();
@@ -259,20 +262,19 @@ public void testExecute() throws Exception {
task.getInputData().put(HttpTask.REQUEST_PARAMETER_NAME, input);
task.setStatus(Status.SCHEDULED);
task.setScheduledTime(0);
- boolean executed = httpTask.execute(workflow, task, executor);
+ boolean executed = httpTask.execute(workflow, task, workflowExecutor);
assertFalse(executed);
-
}
@Test
- public void testOptional() throws Exception {
+ public void testOptional() {
Task task = new Task();
Input input = new Input();
input.setUri("http://localhost:7009/failure");
input.setMethod("GET");
task.getInputData().put(HttpTask.REQUEST_PARAMETER_NAME, input);
- httpTask.start(workflow, task, executor);
+ httpTask.start(workflow, task, workflowExecutor);
assertEquals("Task output: " + task.getOutputData(), Task.Status.FAILED, task.getStatus());
assertEquals(ERROR_RESPONSE, task.getReasonForIncompletion());
assertTrue(!task.getStatus().isSuccessful());
@@ -280,7 +282,7 @@ public void testOptional() throws Exception {
task.setStatus(Status.SCHEDULED);
task.getInputData().remove(HttpTask.REQUEST_PARAMETER_NAME);
task.setReferenceTaskName("t1");
- httpTask.start(workflow, task, executor);
+ httpTask.start(workflow, task, workflowExecutor);
assertEquals(Task.Status.FAILED, task.getStatus());
assertEquals(HttpTask.MISSING_REQUEST, task.getReasonForIncompletion());
assertTrue(!task.getStatus().isSuccessful());
@@ -297,7 +299,8 @@ public void testOptional() throws Exception {
def.getTasks().add(wft);
MetadataDAO metadataDAO = mock(MetadataDAO.class);
QueueDAO queueDAO = mock(QueueDAO.class);
- ParametersUtils parametersUtils = new ParametersUtils();
+ ExternalPayloadStorageUtils externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class);
+ ParametersUtils parametersUtils = mock(ParametersUtils.class);
Map taskMappers = new HashMap<>();
taskMappers.put("DECISION", new DecisionTaskMapper());
taskMappers.put("DYNAMIC", new DynamicTaskMapper(parametersUtils, metadataDAO));
@@ -309,14 +312,14 @@ public void testOptional() throws Exception {
taskMappers.put("SUB_WORKFLOW", new SubWorkflowTaskMapper(parametersUtils, metadataDAO));
taskMappers.put("EVENT", new EventTaskMapper(parametersUtils));
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
- new DeciderService(metadataDAO, queueDAO, taskMappers).decide(workflow, def);
+ new DeciderService(metadataDAO, parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers).decide(workflow, def);
System.out.println(workflow.getTasks());
System.out.println(workflow.getStatus());
}
@Test
- public void testOAuth() throws Exception {
+ public void testOAuth() {
Task task = new Task();
Input input = new Input();
input.setUri("http://localhost:7009/oauth");
@@ -325,7 +328,7 @@ public void testOAuth() throws Exception {
input.setOauthConsumerSecret("someSecret");
task.getInputData().put(HttpTask.REQUEST_PARAMETER_NAME, input);
- httpTask.start(workflow, task, executor);
+ httpTask.start(workflow, task, workflowExecutor);
Map response = (Map) task.getOutputData().get("response");
Map body = (Map) response.get("body");
@@ -346,7 +349,7 @@ private static class EchoHandler extends AbstractHandler {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
- throws IOException, ServletException {
+ throws IOException {
if(request.getMethod().equals("GET") && request.getRequestURI().equals("/text")) {
PrintWriter writer = response.getWriter();
writer.print(TEXT_RESPONSE);
diff --git a/core/build.gradle b/core/build.gradle
index 07cdcd4eca..17a764f97a 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -8,13 +8,13 @@ dependencies {
compile "com.netflix.servo:servo-core:${revServo}"
compile "com.netflix.spectator:spectator-api:${revSpectator}"
+
compile "com.fasterxml.jackson.core:jackson-databind:${revJacksonDatabind}"
compile "com.fasterxml.jackson.core:jackson-core:${revJacksonCore}"
compile "com.jayway.jsonpath:json-path:${revJsonPath}"
-
compile "org.apache.commons:commons-lang3:${revCommonsLang3}"
-
compile "com.spotify:completable-futures:${revSpotifyCompletableFutures}"
+ compile "com.amazonaws:aws-java-sdk-s3:${revAwsSdk}"
testCompile "org.slf4j:slf4j-log4j12:${revSlf4jlog4j}"
}
diff --git a/core/dependencies.lock b/core/dependencies.lock
index 355f6fb4d0..b4a2b8ae79 100644
--- a/core/dependencies.lock
+++ b/core/dependencies.lock
@@ -1,5 +1,9 @@
{
"compile": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "locked": "1.11.86",
+ "requested": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"locked": "2.7.5",
"requested": "2.7.5"
@@ -57,6 +61,10 @@
}
},
"compileClasspath": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "locked": "1.11.86",
+ "requested": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"locked": "2.7.5",
"requested": "2.7.5"
@@ -114,6 +122,10 @@
}
},
"default": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "locked": "1.11.86",
+ "requested": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"locked": "2.7.5",
"requested": "2.7.5"
@@ -171,6 +183,10 @@
}
},
"runtime": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "locked": "1.11.86",
+ "requested": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"locked": "2.7.5",
"requested": "2.7.5"
@@ -228,6 +244,10 @@
}
},
"runtimeClasspath": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "locked": "1.11.86",
+ "requested": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"locked": "2.7.5",
"requested": "2.7.5"
@@ -285,6 +305,10 @@
}
},
"testCompile": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "locked": "1.11.86",
+ "requested": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"locked": "2.7.5",
"requested": "2.7.5"
@@ -354,6 +378,10 @@
}
},
"testCompileClasspath": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "locked": "1.11.86",
+ "requested": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"locked": "2.7.5",
"requested": "2.7.5"
@@ -423,6 +451,10 @@
}
},
"testRuntime": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "locked": "1.11.86",
+ "requested": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"locked": "2.7.5",
"requested": "2.7.5"
@@ -492,6 +524,10 @@
}
},
"testRuntimeClasspath": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "locked": "1.11.86",
+ "requested": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"locked": "2.7.5",
"requested": "2.7.5"
diff --git a/core/src/main/java/com/netflix/conductor/core/config/Configuration.java b/core/src/main/java/com/netflix/conductor/core/config/Configuration.java
index bb1a188fa8..ab2c795e62 100644
--- a/core/src/main/java/com/netflix/conductor/core/config/Configuration.java
+++ b/core/src/main/java/com/netflix/conductor/core/config/Configuration.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,16 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-/**
- *
- */
+
package com.netflix.conductor.core.config;
+import com.google.inject.AbstractModule;
+
import java.util.List;
import java.util.Map;
-import com.google.inject.AbstractModule;
-
/**
* @author Viren
*
@@ -33,13 +31,13 @@ public interface Configuration {
*
* @return time frequency in seconds, at which the workflow sweeper should run to evaluate running workflows.
*/
- public int getSweepFrequency();
+ int getSweepFrequency();
/**
*
* @return when set to true, the sweep is disabled
*/
- public boolean disableSweep();
+ boolean disableSweep();
/**
@@ -47,43 +45,91 @@ public interface Configuration {
* @return when set to true, the background task workers executing async system tasks (eg HTTP) are disabled
*
*/
- public boolean disableAsyncWorkers();
+ boolean disableAsyncWorkers();
/**
*
* @return ID of the server. Can be host name, IP address or any other meaningful identifier. Used for logging
*/
- public String getServerId();
+ String getServerId();
/**
*
* @return Current environment. e.g. test, prod
*/
- public String getEnvironment();
+ String getEnvironment();
/**
*
* @return name of the stack under which the app is running. e.g. devint, testintg, staging, prod etc.
*/
- public String getStack();
+ String getStack();
/**
*
* @return APP ID. Used for logging
*/
- public String getAppId();
+ String getAppId();
/**
*
* @return Data center region. if hosting on Amazon the value is something like us-east-1, us-west-2 etc.
*/
- public String getRegion();
+ String getRegion();
/**
*
* @return Availability zone / rack. for AWS deployments, the value is something like us-east-1a, etc.
*/
- public String getAvailabilityZone();
+ String getAvailabilityZone();
+
+ /**
+ *
+ * @return The threshold of the workflow input payload size in KB beyond which the payload will be stored in {@link com.netflix.conductor.common.utils.ExternalPayloadStorage}
+ */
+ Long getWorkflowInputPayloadSizeThresholdKB();
+
+ /**
+ *
+ * @return The maximum threshold of the workflow input payload size in KB beyond which input will be rejected and the workflow will be marked as FAILED
+ */
+ Long getMaxWorkflowInputPayloadSizeThresholdKB();
+
+ /**
+ *
+ * @return The threshold of the workflow output payload size in KB beyond which the payload will be stored in {@link com.netflix.conductor.common.utils.ExternalPayloadStorage}
+ */
+ Long getWorkflowOutputPayloadSizeThresholdKB();
+
+ /**
+ *
+ * @return The maximum threshold of the workflow output payload size in KB beyond which output will be rejected and the workflow will be marked as FAILED
+ */
+ Long getMaxWorkflowOutputPayloadSizeThresholdKB();
+
+ /**
+ *
+ * @return The threshold of the task input payload size in KB beyond which the payload will be stored in {@link com.netflix.conductor.common.utils.ExternalPayloadStorage}
+ */
+ Long getTaskInputPayloadSizeThresholdKB();
+
+ /**
+ *
+ * @return The maximum threshold of the task input payload size in KB beyond which the task input will be rejected and the task will be marked as FAILED_WITH_TERMINAL_ERROR
+ */
+ Long getMaxTaskInputPayloadSizeThresholdKB();
+
+ /**
+ *
+ * @return The threshold of the task output payload size in KB beyond which the payload will be stored in {@link com.netflix.conductor.common.utils.ExternalPayloadStorage}
+ */
+ Long getTaskOutputPayloadSizeThresholdKB();
+
+ /**
+ *
+ * @return The maximum threshold of the task output payload size in KB beyond which the task output will be rejected and the task will be marked as FAILED_WITH_TERMINAL_ERROR
+ */
+ Long getMaxTaskOutputPayloadSizeThresholdKB();
/**
*
@@ -91,7 +137,7 @@ public interface Configuration {
* @param defaultValue Default value when not specified
* @return User defined integer property.
*/
- public int getIntProperty(String name, int defaultValue);
+ int getIntProperty(String name, int defaultValue);
/**
@@ -100,7 +146,7 @@ public interface Configuration {
* @param defaultValue Default value when not specified
* @return User defined Long property.
*/
- public long getLongProperty(String name, long defaultValue);
+ long getLongProperty(String name, long defaultValue);
/**
*
@@ -108,14 +154,14 @@ public interface Configuration {
* @param defaultValue Default value when not specified
* @return User defined string property.
*/
- public String getProperty(String name, String defaultValue);
+ String getProperty(String name, String defaultValue);
/**
*
* @return Returns all the configurations in a map.
*/
- public Map getAll();
+ Map getAll();
/**
*
@@ -123,7 +169,7 @@ public interface Configuration {
* Use this to inject additional modules that should be loaded as part of the Conductor server initialization
* If you are creating custom tasks (com.netflix.conductor.core.execution.tasks.WorkflowSystemTask) then initialize them as part of the custom modules.
*/
- public default List getAdditionalModules() {
+ default List getAdditionalModules() {
return null;
}
diff --git a/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java b/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java
index 7c504e1450..30297331d7 100644
--- a/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java
+++ b/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -29,7 +29,6 @@
import com.netflix.conductor.core.events.ActionProcessor;
import com.netflix.conductor.core.events.EventProcessor;
import com.netflix.conductor.core.events.EventQueueProvider;
-import com.netflix.conductor.core.events.EventQueues;
import com.netflix.conductor.core.events.queue.dyno.DynoEventQueueProvider;
import com.netflix.conductor.core.execution.ParametersUtils;
import com.netflix.conductor.core.execution.mapper.DecisionTaskMapper;
@@ -50,7 +49,6 @@
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
-
/**
* @author Viren
*/
@@ -59,7 +57,6 @@ public class CoreModule extends AbstractModule {
@Override
protected void configure() {
install(MultibindingsScanner.asModule());
- requestStaticInjection(EventQueues.class);
bind(ActionProcessor.class).asEagerSingleton();
bind(EventProcessor.class).asEagerSingleton();
bind(SystemTaskWorkerCoordinator.class).asEagerSingleton();
@@ -74,7 +71,6 @@ public ParametersUtils getParameterUtils() {
return new ParametersUtils();
}
-
@ProvidesIntoMap
@StringMapKey("conductor")
@Singleton
@@ -108,7 +104,6 @@ public TaskMapper getJoinTaskMapper() {
}
-
@ProvidesIntoMap
@StringMapKey("FORK_JOIN_DYNAMIC")
@Singleton
@@ -164,6 +159,4 @@ public TaskMapper getUserDefinedTaskMapper(ParametersUtils parametersUtils, Meta
public TaskMapper getSimpleTaskMapper(ParametersUtils parametersUtils, MetadataDAO metadataDAO) {
return new SimpleTaskMapper(parametersUtils, metadataDAO);
}
-
-
}
diff --git a/core/src/main/java/com/netflix/conductor/core/events/ActionProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/ActionProcessor.java
index c9ba401fe8..7aaeffdb17 100644
--- a/core/src/main/java/com/netflix/conductor/core/events/ActionProcessor.java
+++ b/core/src/main/java/com/netflix/conductor/core/events/ActionProcessor.java
@@ -50,13 +50,15 @@ public class ActionProcessor {
private final WorkflowExecutor executor;
private final MetadataService metadataService;
- private final ParametersUtils parametersUtils = new ParametersUtils();
+ private final ParametersUtils parametersUtils;
+
private final JsonUtils jsonUtils = new JsonUtils();
@Inject
- public ActionProcessor(WorkflowExecutor executor, MetadataService metadataService) {
+ public ActionProcessor(WorkflowExecutor executor, MetadataService metadataService, ParametersUtils parametersUtils) {
this.executor = executor;
this.metadataService = metadataService;
+ this.parametersUtils = parametersUtils;
}
public Map execute(Action action, Object payloadObject, String event, String messageId) {
diff --git a/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java
index 155fa452e0..528380f856 100644
--- a/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java
+++ b/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java
@@ -70,6 +70,7 @@ public class EventProcessor {
private final MetadataService metadataService;
private final ExecutionService executionService;
private final ActionProcessor actionProcessor;
+ private final EventQueues eventQueues;
private ExecutorService executorService;
private final Map eventToQueueMap = new ConcurrentHashMap<>();
@@ -78,10 +79,11 @@ public class EventProcessor {
@Inject
public EventProcessor(ExecutionService executionService, MetadataService metadataService,
- ActionProcessor actionProcessor, Configuration config) {
+ ActionProcessor actionProcessor, EventQueues eventQueues, Configuration config) {
this.executionService = executionService;
this.metadataService = metadataService;
this.actionProcessor = actionProcessor;
+ this.eventQueues = eventQueues;
int executorThreadCount = config.getIntProperty("workflow.event.processor.thread.count", 2);
if (executorThreadCount > 0) {
@@ -120,7 +122,7 @@ private void refresh() {
List createdQueues = new LinkedList<>();
events.forEach(event -> eventToQueueMap.computeIfAbsent(event, s -> {
- ObservableQueue q = EventQueues.getQueue(event);
+ ObservableQueue q = eventQueues.getQueue(event);
createdQueues.add(q);
return q;
}
@@ -141,7 +143,7 @@ private void listen(ObservableQueue queue) {
queue.observe().subscribe((Message msg) -> handle(queue, msg));
}
- @SuppressWarnings({"unchecked", "ToArrayCallWithZeroLengthArrayArgument"})
+ @SuppressWarnings({"unchecked"})
private void handle(ObservableQueue queue, Message msg) {
try {
executionService.addMessage(queue.getName(), msg);
diff --git a/core/src/main/java/com/netflix/conductor/core/events/EventQueues.java b/core/src/main/java/com/netflix/conductor/core/events/EventQueues.java
index 9aabc40e5a..0c865fd97b 100644
--- a/core/src/main/java/com/netflix/conductor/core/events/EventQueues.java
+++ b/core/src/main/java/com/netflix/conductor/core/events/EventQueues.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
/**
- *
+ *
*/
package com.netflix.conductor.core.events;
@@ -25,34 +25,37 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.inject.Singleton;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author Viren
- * Static holders for internal event queues
+ * Holders for internal event queues
*/
+@Singleton
public class EventQueues {
-
- private static Logger logger = LoggerFactory.getLogger(EventQueues.class);
- private static ParametersUtils parametersUtils = new ParametersUtils();
+ private static final Logger logger = LoggerFactory.getLogger(EventQueues.class);
- @Inject
- @Named("EventQueueProviders")
- public static Map providers; //TODO this is a leaky abstraction, when the static injection is moved to singleton this will be fixed
+ private final ParametersUtils parametersUtils;
- private EventQueues() {
+ private final Map providers;
+
+ @Inject
+ public EventQueues(@Named("EventQueueProviders") Map providers, ParametersUtils parametersUtils) {
+ this.providers = providers;
+ this.parametersUtils = parametersUtils;
}
- public static List providers() {
+ public List getProviders() {
return providers.values().stream()
.map(p -> p.getClass().getName())
.collect(Collectors.toList());
}
- public static ObservableQueue getQueue(String eventType) {
+ public ObservableQueue getQueue(String eventType) {
String event = parametersUtils.replace(eventType).toString();
int index = event.indexOf(':');
if (index == -1) {
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/Code.java b/core/src/main/java/com/netflix/conductor/core/execution/Code.java
index 808efce7f9..18422aad0a 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/Code.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/Code.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package com.netflix.conductor.core.execution;
import java.util.HashMap;
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
index dd73264fcc..b6bfbc9a61 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,9 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-/**
- *
- */
package com.netflix.conductor.core.execution;
import com.google.common.annotations.VisibleForTesting;
@@ -27,16 +24,20 @@
import com.netflix.conductor.common.metadata.workflow.WorkflowTask.Type;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.Workflow.WorkflowStatus;
+import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.core.execution.mapper.TaskMapper;
import com.netflix.conductor.core.execution.mapper.TaskMapperContext;
+import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import java.util.Collections;
@@ -48,6 +49,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.COMPLETED_WITH_ERRORS;
@@ -71,15 +73,25 @@ public class DeciderService {
private final QueueDAO queueDAO;
- private ParametersUtils parametersUtils = new ParametersUtils();
+ private final ParametersUtils parametersUtils;
private final Map taskMappers;
+ private final ExternalPayloadStorageUtils externalPayloadStorageUtils;
+
+
+ @SuppressWarnings("ConstantConditions")
+ private final Predicate isNonPendingTask = task -> !task.isRetried() && !task.getStatus().equals(SKIPPED) && !task.isExecuted() || SystemTaskType.isBuiltIn(task.getTaskType());
+
@Inject
- public DeciderService(MetadataDAO metadataDAO, QueueDAO queueDAO, @Named("TaskMappers") Map taskMappers) {
+ public DeciderService(MetadataDAO metadataDAO, ParametersUtils parametersUtils, QueueDAO queueDAO,
+ ExternalPayloadStorageUtils externalPayloadStorageUtils,
+ @Named("TaskMappers") Map taskMappers) {
this.metadataDAO = metadataDAO;
this.queueDAO = queueDAO;
+ this.parametersUtils = parametersUtils;
this.taskMappers = taskMappers;
+ this.externalPayloadStorageUtils = externalPayloadStorageUtils;
}
//QQ public method validation of the input params
@@ -90,7 +102,7 @@ public DeciderOutcome decide(Workflow workflow, WorkflowDef workflowDef) throws
final List tasks = workflow.getTasks();
//In case of a new workflow the list of executedTasks will also be empty
List executedTasks = tasks.stream()
- .filter(t -> !t.getStatus().equals(SKIPPED) && !t.getStatus().equals(READY_FOR_RERUN))
+ .filter(t -> !t.getStatus().equals(SKIPPED) && !t.getStatus().equals(READY_FOR_RERUN) && !t.isExecuted())
.collect(Collectors.toList());
List tasksToBeScheduled = new LinkedList<>();
@@ -119,19 +131,19 @@ private DeciderOutcome decide(final WorkflowDef workflowDef, final Workflow work
return outcome;
}
- // Filter the list of tasks and include only tasks that are not retried,
+ // Filter the list of tasks and include only tasks that are not retried, not executed,
// marked to be skipped and not part of System tasks that is DECISION, FORK, JOIN
// This list will be empty for a new workflow being started
List pendingTasks = workflow.getTasks()
.stream()
- .filter(task -> (!task.isRetried() && !task.getStatus().equals(SKIPPED)) || SystemTaskType.isBuiltIn(task.getTaskType()))
+ .filter(isNonPendingTask)
.collect(Collectors.toList());
// Get all the tasks that are ready to rerun or not marked to be skipped
// This list will be empty for a new workflow
Set executedTaskRefNames = workflow.getTasks()
.stream()
- .filter(task -> !task.getStatus().equals(SKIPPED) && !task.getStatus().equals(READY_FOR_RERUN))
+ .filter(Task::isExecuted)
.map(Task::getReferenceTaskName)
.collect(Collectors.toSet());
@@ -151,9 +163,9 @@ private DeciderOutcome decide(final WorkflowDef workflowDef, final Workflow work
TaskDef taskDefinition = metadataDAO.getTaskDef(pendingTask.getTaskDefName());
if (taskDefinition != null) {
checkForTimeout(taskDefinition, pendingTask);
- // If the task has not been updated for "responseTimeout" then mark task as TIMED_OUT
+ // If the task has not been updated for "responseTimeoutSeconds" then mark task as TIMED_OUT
if (isResponseTimedOut(taskDefinition, pendingTask)) {
- timeoutTask(pendingTask);
+ timeoutTask(taskDefinition, pendingTask);
}
}
@@ -177,10 +189,10 @@ private DeciderOutcome decide(final WorkflowDef workflowDef, final Workflow work
List nextTasks = getNextTask(workflowDef, workflow, pendingTask);
nextTasks.forEach(nextTask -> tasksToBeScheduled.putIfAbsent(nextTask.getReferenceTaskName(), nextTask));
outcome.tasksToBeUpdated.add(pendingTask);
- logger.debug("Scheduling Tasks from {}, next = {}", pendingTask.getTaskDefName(),
+ logger.debug("Scheduling Tasks from {}, next = {} for workflow: {}", pendingTask.getTaskDefName(),
nextTasks.stream()
.map(Task::getTaskDefName)
- .collect(Collectors.toList()));
+ .collect(Collectors.toList()), workflow.getWorkflowId());
}
}
@@ -189,12 +201,11 @@ private DeciderOutcome decide(final WorkflowDef workflowDef, final Workflow work
.filter(task -> !executedTaskRefNames.contains(task.getReferenceTaskName()))
.collect(Collectors.toList());
if (!unScheduledTasks.isEmpty()) {
- logger.debug("Scheduling Tasks {} ", unScheduledTasks.stream()
+ logger.debug("Scheduling Tasks {} for workflow: {}", unScheduledTasks.stream()
.map(Task::getTaskDefName)
- .collect(Collectors.toList()));
+ .collect(Collectors.toList()), workflow.getWorkflowId());
outcome.tasksToBeScheduled.addAll(unScheduledTasks);
}
- updateOutput(workflowDef, workflow);
if (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflowDef, workflow)) {
logger.debug("Marking workflow as complete. workflow=" + workflow.getWorkflowId() + ", tasks=" + workflow.getTasks());
outcome.isComplete = true;
@@ -203,26 +214,26 @@ private DeciderOutcome decide(final WorkflowDef workflowDef, final Workflow work
return outcome;
}
- private List startWorkflow(Workflow workflow, WorkflowDef def) throws TerminateWorkflowException {
+ private List startWorkflow(Workflow workflow, WorkflowDef workflowDef) throws TerminateWorkflowException {
- logger.debug("Starting workflow " + def.getName() + "/" + workflow.getWorkflowId());
+ logger.debug("Starting workflow " + workflowDef.getName() + "/" + workflow.getWorkflowId());
//The tasks will be empty in case of new workflow
List tasks = workflow.getTasks();
// Check if the workflow is a re-run case or if it is a new workflow execution
if (workflow.getReRunFromWorkflowId() == null || tasks.isEmpty()) {
- if (def.getTasks().isEmpty()) {
+ if (workflowDef.getTasks().isEmpty()) {
throw new TerminateWorkflowException("No tasks found to be executed", WorkflowStatus.COMPLETED);
}
- WorkflowTask taskToSchedule = def.getTasks().getFirst(); //Nothing is running yet - so schedule the first task
+ WorkflowTask taskToSchedule = workflowDef.getTasks().getFirst(); //Nothing is running yet - so schedule the first task
//Loop until a non-skipped task is found
while (isTaskSkipped(taskToSchedule, workflow)) {
- taskToSchedule = def.getNextTask(taskToSchedule.getTaskReferenceName());
+ taskToSchedule = workflowDef.getNextTask(taskToSchedule.getTaskReferenceName());
}
//In case of a new workflow the first non-skippable task will be scheduled
- return getTasksToBeScheduled(def, workflow, taskToSchedule, 0);
+ return getTasksToBeScheduled(workflowDef, workflow, taskToSchedule, 0);
}
// Get the first task to schedule
@@ -242,24 +253,36 @@ private List startWorkflow(Workflow workflow, WorkflowDef def) throws Term
});
return Collections.singletonList(rerunFromTask);
-
}
- private void updateOutput(final WorkflowDef def, final Workflow workflow) {
-
+ /**
+ * Updates the workflow output.
+ *
+ * @param workflow the workflow instance
+ * @param task if not null, the output of this task will be copied to workflow output if no output parameters are specified in the workflow definition;
+ * if null, the output of the last task in the workflow will be copied to workflow output if no output parameters are specified in the workflow definition
+ */
+ void updateWorkflowOutput(final Workflow workflow, @Nullable Task task) {
List allTasks = workflow.getTasks();
if (allTasks.isEmpty()) {
return;
}
- Task last;
- last = allTasks.get(allTasks.size() - 1);
- Map output = last.getOutputData();
-
+ Task last = Optional.ofNullable(task).orElse(allTasks.get(allTasks.size() - 1));
+ WorkflowDef def = metadataDAO.get(workflow.getWorkflowType(), workflow.getVersion());
+ Map output;
if (!def.getOutputParameters().isEmpty()) {
- output = parametersUtils.getTaskInput(def.getOutputParameters(), workflow, null, null);
+ Workflow workflowInstance = populateWorkflowAndTaskData(workflow);
+ output = parametersUtils.getTaskInput(def.getOutputParameters(), workflowInstance, null, null);
+ } else if (StringUtils.isNotBlank(last.getExternalOutputPayloadStoragePath())) {
+ output = externalPayloadStorageUtils.downloadPayload(last.getExternalOutputPayloadStoragePath());
+ Monitors.recordExternalPayloadStorageUsage(last.getTaskDefName(), ExternalPayloadStorage.Operation.READ.toString(), ExternalPayloadStorage.PayloadType.TASK_OUTPUT.toString());
+ } else {
+ output = last.getOutputData();
}
+
workflow.setOutput(output);
+ externalPayloadStorageUtils.verifyAndUpload(workflow, ExternalPayloadStorage.PayloadType.WORKFLOW_OUTPUT);
}
private boolean checkForWorkflowCompletion(final WorkflowDef def, final Workflow workflow) throws TerminateWorkflowException {
@@ -287,15 +310,10 @@ private boolean checkForWorkflowCompletion(final WorkflowDef def, final Workflow
return next != null && !taskStatusMap.containsKey(next);
}).collect(Collectors.toList()).isEmpty();
- if (allCompletedSuccessfully && noPendingTasks && noPendingSchedule) {
- return true;
- }
-
- return false;
+ return allCompletedSuccessfully && noPendingTasks && noPendingSchedule;
}
- @VisibleForTesting
- List getNextTask(WorkflowDef def, Workflow workflow, Task task) {
+ private List getNextTask(WorkflowDef def, Workflow workflow, Task task) {
// Get the following task after the last completed task
if (SystemTaskType.is(task.getTaskType()) && SystemTaskType.DECISION.name().equals(task.getTaskType())) {
@@ -358,15 +376,57 @@ Task retry(TaskDef taskDefinition, WorkflowTask workflowTask, Task task, Workflo
rescheduled.setStatus(SCHEDULED);
rescheduled.setPollCount(0);
rescheduled.setInputData(new HashMap<>());
- rescheduled.getInputData().putAll(task.getInputData());
+ if (StringUtils.isNotBlank(task.getExternalInputPayloadStoragePath())) {
+ rescheduled.setExternalInputPayloadStoragePath(task.getExternalInputPayloadStoragePath());
+ } else {
+ rescheduled.getInputData().putAll(task.getInputData());
+ }
if (workflowTask != null && workflow.getSchemaVersion() > 1) {
- Map taskInput = parametersUtils.getTaskInputV2(workflowTask.getInputParameters(), workflow, rescheduled.getTaskId(), taskDefinition);
+ Workflow workflowInstance = populateWorkflowAndTaskData(workflow);
+ Map taskInput = parametersUtils.getTaskInputV2(workflowTask.getInputParameters(), workflowInstance, rescheduled.getTaskId(), taskDefinition);
rescheduled.getInputData().putAll(taskInput);
}
+ externalPayloadStorageUtils.verifyAndUpload(rescheduled, ExternalPayloadStorage.PayloadType.TASK_INPUT);
//for the schema version 1, we do not have to recompute the inputs
return rescheduled;
}
+ /**
+ * Populates the workflow input data and the tasks input/output data if stored in external payload storage.
+ * This method creates a deep copy of the workflow instance where the payloads will be stored after downloading from external payload storage.
+ *
+ * @param workflow the workflow for which the data needs to be populated
+ * @return a copy of the workflow with the payload data populated
+ */
+ @VisibleForTesting
+ Workflow populateWorkflowAndTaskData(Workflow workflow) {
+ Workflow workflowInstance = workflow.copy();
+
+ if (StringUtils.isNotBlank(workflow.getExternalInputPayloadStoragePath())) {
+ // download the workflow input from external storage here and plug it into the workflow
+ Map workflowInputParams = externalPayloadStorageUtils.downloadPayload(workflow.getExternalInputPayloadStoragePath());
+ Monitors.recordExternalPayloadStorageUsage(workflow.getWorkflowType(), ExternalPayloadStorage.Operation.READ.toString(), ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT.toString());
+ workflowInstance.setInput(workflowInputParams);
+ workflowInstance.setExternalInputPayloadStoragePath(null);
+ }
+
+ workflowInstance.getTasks().stream()
+ .filter(task -> StringUtils.isNotBlank(task.getExternalInputPayloadStoragePath()) || StringUtils.isNotBlank(task.getExternalOutputPayloadStoragePath()))
+ .forEach(task -> {
+ if (StringUtils.isNotBlank(task.getExternalOutputPayloadStoragePath())) {
+ task.setOutputData(externalPayloadStorageUtils.downloadPayload(task.getExternalOutputPayloadStoragePath()));
+ Monitors.recordExternalPayloadStorageUsage(task.getTaskDefName(), ExternalPayloadStorage.Operation.READ.toString(), ExternalPayloadStorage.PayloadType.TASK_OUTPUT.toString());
+ task.setExternalOutputPayloadStoragePath(null);
+ }
+ if (StringUtils.isNotBlank(task.getExternalInputPayloadStoragePath())) {
+ task.setInputData(externalPayloadStorageUtils.downloadPayload(task.getExternalInputPayloadStoragePath()));
+ Monitors.recordExternalPayloadStorageUsage(task.getTaskDefName(), ExternalPayloadStorage.Operation.READ.toString(), ExternalPayloadStorage.PayloadType.TASK_INPUT.toString());
+ task.setExternalInputPayloadStoragePath(null);
+ }
+ });
+ return workflowInstance;
+ }
+
@VisibleForTesting
void checkForTimeout(TaskDef taskDef, Task task) {
@@ -435,21 +495,22 @@ boolean isResponseTimedOut(TaskDef taskDefinition, Task task) {
return true;
}
- private void timeoutTask(Task task) {
- String reason = "responseTimeout: " + task.getResponseTimeoutSeconds() + " exceeded for the taskId: " + task.getTaskId() + " with Task Definition: " + task.getTaskDefName();
+ private void timeoutTask(TaskDef taskDef, Task task) {
+ String reason = "responseTimeout: " + taskDef.getResponseTimeoutSeconds() + " exceeded for the taskId: " + task.getTaskId() + " with Task Definition: " + task.getTaskDefName();
logger.debug(reason);
task.setStatus(TIMED_OUT);
task.setReasonForIncompletion(reason);
}
- public List getTasksToBeScheduled(WorkflowDef def, Workflow workflow,
+ public List getTasksToBeScheduled(WorkflowDef workflowDef, Workflow workflow,
WorkflowTask taskToSchedule, int retryCount) {
- return getTasksToBeScheduled(def, workflow, taskToSchedule, retryCount, null);
+ return getTasksToBeScheduled(workflowDef, workflow, taskToSchedule, retryCount, null);
}
- public List getTasksToBeScheduled(WorkflowDef workflowDefinition, Workflow workflowInstance,
+ public List getTasksToBeScheduled(WorkflowDef workflowDefinition, Workflow workflow,
WorkflowTask taskToSchedule, int retryCount, String retriedTaskId) {
+ Workflow workflowInstance = populateWorkflowAndTaskData(workflow);
Map input = parametersUtils.getTaskInput(taskToSchedule.getInputParameters(),
workflowInstance, null, null);
@@ -461,7 +522,7 @@ public List getTasksToBeScheduled(WorkflowDef workflowDefinition, Workflow
// get in progress tasks for this workflow instance
List inProgressTasks = workflowInstance.getTasks().stream()
- .filter(pendingTask -> pendingTask.getStatus().equals(Status.IN_PROGRESS))
+ .filter(runningTask -> runningTask.getStatus().equals(Status.IN_PROGRESS))
.map(Task::getReferenceTaskName)
.collect(Collectors.toList());
@@ -484,12 +545,13 @@ public List getTasksToBeScheduled(WorkflowDef workflowDefinition, Workflow
// for static forks, each branch of the fork creates a join task upon completion
// for dynamic forks, a join task is created with the fork and also with each branch of the fork
// a new task must only be scheduled if a task with the same reference name is not in progress for this workflow instance
- return taskMappers.get(taskType.name()).getMappedTasks(taskMapperContext).stream()
+ List tasks = taskMappers.get(taskType.name()).getMappedTasks(taskMapperContext).stream()
.filter(task -> !inProgressTasks.contains(task.getReferenceTaskName()))
.collect(Collectors.toList());
+ tasks.forEach(task -> externalPayloadStorageUtils.verifyAndUpload(task, ExternalPayloadStorage.PayloadType.TASK_INPUT));
+ return tasks;
}
-
private boolean isTaskSkipped(WorkflowTask taskToSchedule, Workflow workflow) {
try {
boolean retval = false;
@@ -505,11 +567,9 @@ private boolean isTaskSkipped(WorkflowTask taskToSchedule, Workflow workflow) {
} catch (Exception e) {
throw new TerminateWorkflowException(e.getMessage());
}
-
}
-
- public static class DeciderOutcome {
+ static class DeciderOutcome {
List tasksToBeScheduled = new LinkedList<>();
@@ -521,6 +581,5 @@ public static class DeciderOutcome {
private DeciderOutcome() {
}
-
}
}
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/ParametersUtils.java b/core/src/main/java/com/netflix/conductor/core/execution/ParametersUtils.java
index 2530ef4fac..7326f501e6 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/ParametersUtils.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/ParametersUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -36,13 +36,11 @@
import java.util.Optional;
/**
- *
- *
- *
+ * Used to parse and resolve the JSONPath bindings in the workflow and task definitions.
*/
public class ParametersUtils {
- private ObjectMapper om = new ObjectMapper();
+ private ObjectMapper objectMapper = new ObjectMapper();
private TypeReference