diff --git a/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java b/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java index 73c4a444a0..cba8dc0d7f 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java @@ -117,15 +117,6 @@ public WorkflowDef getWorkflowDef(String name, Integer version) { return getForEntity("metadata/workflow/{name}", new Object[]{"version", version}, WorkflowDef.class, name); } - /** - * @deprecated This API is deprecated and will be removed in the next version - * This API can return 503 for a large number of workflow definitions because of no pagination. - */ - @Deprecated - public List getAllWorkflowDefs() { - return getForEntity("metadata/workflow", null, workflowDefList); - } - /** * Removes the workflow definition of a workflow from the conductor server. * It does not remove associated workflows. Use with caution. @@ -161,15 +152,6 @@ public void updateTaskDef(TaskDef taskDef) { put("metadata/taskdefs", null, taskDef); } - /** - * @deprecated This API is deprecated and will be removed in the next version - * This API can return 503 for a large number of workflow definitions because of no pagination. - */ - @Deprecated - public List getAllTaskDefs() { - return getForEntity("metadata/taskdefs", null, taskDefList); - } - /** * Retrieve the task definition of a given task type * diff --git a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java index 970de846c1..923119bc79 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java @@ -22,7 +22,6 @@ import com.netflix.conductor.client.task.WorkflowTaskMetrics; import com.netflix.conductor.common.metadata.tasks.PollData; import com.netflix.conductor.common.metadata.tasks.Task; -import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.tasks.TaskExecLog; import com.netflix.conductor.common.metadata.tasks.TaskResult; import com.netflix.conductor.common.run.SearchResult; @@ -53,9 +52,6 @@ public class TaskClient extends ClientBase { private static GenericType> taskList = new GenericType>() { }; - private static GenericType> taskDefList = new GenericType>() { - }; - private static GenericType> taskExecLogList = new GenericType>() { }; @@ -65,6 +61,9 @@ public class TaskClient extends ClientBase { private static GenericType> searchResultTaskSummary = new GenericType>() { }; + private static GenericType> queueSizeMap = new GenericType>() { + }; + private static final Logger logger = LoggerFactory.getLogger(TaskClient.class); /** @@ -130,15 +129,6 @@ public Task pollTask(String taskType, String workerId, String domain) { return task; } - /** - * @deprecated This API is deprecated and will be removed in the next version - * use {@link #batchPollTasksByTaskType(String, String, int, int)} instead - */ - @Deprecated - public List poll(String taskType, String workerId, int count, int timeoutInMillisecond) { - return batchPollTasksByTaskType(taskType, workerId, count, timeoutInMillisecond); - } - /** * Perform a batch poll for tasks by task type. Batch size is configurable by count. * @@ -159,15 +149,6 @@ public List batchPollTasksByTaskType(String taskType, String workerId, int return tasks; } - /** - * @deprecated This API is deprecated and will be removed in the next version - * use {@link #batchPollTasksInDomain(String, String, String, int, int)} instead - */ - @Deprecated - public List poll(String taskType, String domain, String workerId, int count, int timeoutInMillisecond) { - return batchPollTasksInDomain(taskType, domain, workerId, count, timeoutInMillisecond); - } - /** * Batch poll for tasks in a domain. Batch size is configurable by count. * @@ -202,15 +183,6 @@ private void populateTaskInput(Task task) { } } - /** - * @deprecated This API is deprecated and will be removed in the next version - * use {@link #getPendingTasksByType(String, String, Integer)} instead - */ - @Deprecated - public List getTasks(String taskType, String startKey, Integer count) { - return getPendingTasksByType(taskType, startKey, count); - } - /** * Retrieve pending tasks by type * @@ -294,15 +266,6 @@ public Boolean ack(String taskId, String workerId) { return Boolean.valueOf(response); } - /** - * @deprecated This API is deprecated and will be removed in the next version - * use {@link #logMessageForTask(String, String)} instead - */ - @Deprecated - public void log(String taskId, String logMessage) { - logMessageForTask(taskId, logMessage); - } - /** * Log execution messages for a task. * @@ -324,15 +287,6 @@ public List getTaskLogs(String taskId) { return getForEntity("tasks/{taskId}/log", null, taskExecLogList, taskId); } - /** - * @deprecated This API is deprecated and will be removed in the next version - * use {@link #getTaskDetails(String)} instead - */ - @Deprecated - public Task get(String taskId) { - return getTaskDetails(taskId); - } - /** * Retrieve information about the task * @@ -360,9 +314,9 @@ public void removeTaskFromQueue(String taskType, String taskId) { public int getQueueSizeForTask(String taskType) { Preconditions.checkArgument(StringUtils.isNotBlank(taskType), "Task type cannot be blank"); - Map queueSizeMap = getForEntity("tasks/queue/sizes", new Object[]{"taskType", taskType}, Map.class); - if (queueSizeMap.keySet().contains(taskType)) { - return queueSizeMap.get(taskType); + Map taskTypeToQueueSizeMap = getForEntity("tasks/queue/sizes", new Object[]{"taskType", taskType}, queueSizeMap); + if (taskTypeToQueueSizeMap.containsKey(taskType)) { + return taskTypeToQueueSizeMap.get(taskType); } return 0; } @@ -432,43 +386,4 @@ public SearchResult search(Integer start, Integer size, String sort Object[] params = new Object[]{"start", start, "size", size, "sort", sort, "freeText", freeText, "query", query}; return getForEntity("tasks/search", params, searchResultTaskSummary); } - - /** - * @deprecated This API is deprecated and will be removed in the next version - * use {@link MetadataClient#getAllTaskDefs()} instead - */ - @Deprecated - public List getTaskDef() { - return getForEntity("metadata/taskdefs", null, taskDefList); - } - - /** - * @deprecated This API is deprecated and will be removed in the next version - * use {@link MetadataClient#getTaskDef(String)} instead - */ - @Deprecated - public TaskDef getTaskDef(String taskType) { - Preconditions.checkArgument(StringUtils.isNotBlank(taskType), "Task type cannot be blank"); - return getForEntity("metadata/taskdefs/{tasktype}", null, TaskDef.class, taskType); - } - - /** - * @deprecated This API is deprecated and will be removed in the next version - * use {@link MetadataClient#unregisterTaskDef(String)} instead - */ - @Deprecated - public void unregisterTaskDef(String taskType) { - Preconditions.checkArgument(StringUtils.isNotBlank(taskType), "Task type cannot be blank"); - delete("metadata/taskdefs/{tasktype}", taskType); - } - - /** - * @deprecated This API is deprecated and will be removed in the next version - * use {@link MetadataClient#registerTaskDefs(List)} instead - */ - @Deprecated - public void registerTaskDefs(List taskDefs) { - Preconditions.checkNotNull(taskDefs, "Task defs cannot be null"); - postForEntityWithRequestOnly("metadata/taskdefs", taskDefs); - } } diff --git a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index 83cb51bf55..803c9403a1 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -22,7 +22,6 @@ import com.netflix.conductor.client.task.WorkflowTaskMetrics; import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest; import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; -import com.netflix.conductor.common.metadata.workflow.WorkflowDef; import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.common.run.WorkflowSummary; @@ -39,7 +38,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.List; -import java.util.Map; /** @@ -47,9 +45,6 @@ */ public class WorkflowClient extends ClientBase { - private static GenericType> workflowDefList = new GenericType>() { - }; - private static GenericType> searchResultWorkflowSummary = new GenericType>() { }; @@ -99,63 +94,6 @@ public WorkflowClient(ClientConfig config, ConductorClientConfiguration clientCo } } - - //Metadata Operations - - /** - * @deprecated This API is deprecated and will be removed in the next version - * use {@link MetadataClient#getAllWorkflowDefs()} instead - */ - @Deprecated - public List getAllWorkflowDefs() { - return getForEntity("metadata/workflow", null, workflowDefList); - } - - /** - * @deprecated This API is deprecated and will be removed in the next version - * use {@link MetadataClient#registerWorkflowDef(WorkflowDef)} instead - */ - @Deprecated - public void registerWorkflow(WorkflowDef workflowDef) { - Preconditions.checkNotNull(workflowDef, "Worfklow definition cannot be null"); - postForEntityWithRequestOnly("metadata/workflow", workflowDef); - } - - /** - * @deprecated This API is deprecated and will be removed in the next version - * use {@link MetadataClient#getWorkflowDef(String, Integer)} instead - */ - @Deprecated - public WorkflowDef getWorkflowDef(String name, Integer version) { - Preconditions.checkArgument(StringUtils.isNotBlank(name), "name cannot be blank"); - return getForEntity("metadata/workflow/{name}", new Object[]{"version", version}, WorkflowDef.class, name); - } - - - //Runtime Operations - - /** - * Starts a workflow identified by the name and version - * - * @param name the name of the workflow - * @param version the version of the workflow def - * @param correlationId the correlation id - * @param input the input to set in the workflow - * @return the id of the workflow instance that can be used for tracking - * @deprecated This API is deprecated and will be removed in the next version - * use {@link #startWorkflow(StartWorkflowRequest)} instead - */ - @Deprecated - public String startWorkflow(String name, Integer version, String correlationId, Map input) { - Preconditions.checkArgument(StringUtils.isNotBlank(name), "name cannot be blank"); - StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest(); - startWorkflowRequest.setName(name); - startWorkflowRequest.setVersion(version); - startWorkflowRequest.setCorrelationId(correlationId); - startWorkflowRequest.setInput(input); - return startWorkflow(startWorkflowRequest); - } - /** * Starts a workflow. * If the size of the workflow input payload is bigger than {@link ConductorClientConfiguration#getWorkflowInputPayloadThresholdKB()}, @@ -196,16 +134,6 @@ public String startWorkflow(StartWorkflowRequest startWorkflowRequest) { return postForEntity("workflow", startWorkflowRequest, null, String.class, startWorkflowRequest.getName()); } - /** - * @deprecated This API is deprecated and will be removed in the next version - * use {@link #getWorkflow(String, boolean)} instead - */ - @Deprecated - public Workflow getExecutionStatus(String workflowId, boolean includeTasks) { - Preconditions.checkArgument(StringUtils.isNotBlank(workflowId), "workflow id cannot be blank"); - return getWorkflow(workflowId, includeTasks); - } - /** * Retrieve a workflow by workflow id * diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/events/EventHandler.java b/common/src/main/java/com/netflix/conductor/common/metadata/events/EventHandler.java index 5392b59763..4ed7cb56d3 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/events/EventHandler.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/events/EventHandler.java @@ -132,22 +132,24 @@ public void setActive(boolean active) { public static class Action { @ProtoEnum - public enum Type { START_WORKFLOW, COMPLETE_TASK, FAIL_TASK } + public enum Type { + start_workflow, complete_task, fail_task + } @ProtoField(id = 1) private Type action; @ProtoField(id = 2) - private StartWorkflow startWorkflow; + private StartWorkflow start_workflow; @ProtoField(id = 3) - private TaskDetails completeTask; + private TaskDetails complete_task; @ProtoField(id = 4) - private TaskDetails failTask; + private TaskDetails fail_task; @ProtoField(id = 5) - private boolean expandInlineJson; + private boolean expandInlineJSON; /** * @return the action @@ -165,64 +167,64 @@ public void setAction(Type action) { } /** - * @return the startWorkflow + * @return the start_workflow */ - public StartWorkflow getStartWorkflow() { - return startWorkflow; + public StartWorkflow getStart_workflow() { + return start_workflow; } /** - * @param startWorkflow the startWorkflow to set + * @param start_workflow the start_workflow to set * */ - public void setStartWorkflow(StartWorkflow startWorkflow) { - this.startWorkflow = startWorkflow; + public void setStart_workflow(StartWorkflow start_workflow) { + this.start_workflow = start_workflow; } /** - * @return the completeTask + * @return the complete_task */ - public TaskDetails getCompleteTask() { - return completeTask; + public TaskDetails getComplete_task() { + return complete_task; } /** - * @param completeTask the completeTask to set + * @param complete_task the complete_task to set * */ - public void setCompleteTask(TaskDetails completeTask) { - this.completeTask = completeTask; + public void setComplete_task(TaskDetails complete_task) { + this.complete_task = complete_task; } /** - * @return the failTask + * @return the fail_task */ - public TaskDetails getFailTask() { - return failTask; + public TaskDetails getFail_task() { + return fail_task; } /** - * @param failTask the failTask to set + * @param fail_task the fail_task to set * */ - public void setFailTask(TaskDetails failTask) { - this.failTask = failTask; + public void setFail_task(TaskDetails fail_task) { + this.fail_task = fail_task; } /** * - * @param expandInlineJson when set to true, the in-lined JSON strings are expanded to a full json document + * @param expandInlineJSON when set to true, the in-lined JSON strings are expanded to a full json document */ - public void setExpandInlineJson(boolean expandInlineJson) { - this.expandInlineJson = expandInlineJson; + public void setExpandInlineJSON(boolean expandInlineJSON) { + this.expandInlineJSON = expandInlineJSON; } /** * * @return true if the json strings within the payload should be expanded. */ - public boolean isExpandInlineJson() { - return expandInlineJson; + public boolean isExpandInlineJSON() { + return expandInlineJSON; } } diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StartWorkflowRequest.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StartWorkflowRequest.java index 786415ee69..866a3a740f 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StartWorkflowRequest.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/StartWorkflowRequest.java @@ -1,3 +1,15 @@ +/* + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ package com.netflix.conductor.common.metadata.workflow; import com.github.vmg.protogen.annotations.ProtoField; @@ -66,10 +78,13 @@ public StartWorkflowRequest withCorrelationId(String correlationId) { public String getExternalInputPayloadStoragePath() { return externalInputPayloadStoragePath; } - public void setExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) { this.externalInputPayloadStoragePath = externalInputPayloadStoragePath; } + public StartWorkflowRequest withExternalInputPayloadStoragePath(String externalInputPayloadStoragePath) { + this.externalInputPayloadStoragePath = externalInputPayloadStoragePath; + return this; + } public Map getInput() { return input; @@ -96,11 +111,9 @@ public StartWorkflowRequest withTaskToDomain(Map taskToDomain) { public WorkflowDef getWorkflowDef() { return workflowDef; } - public void setWorkflowDef(WorkflowDef workflowDef) { this.workflowDef = workflowDef; } - public StartWorkflowRequest withWorkflowDef(WorkflowDef workflowDef) { this.workflowDef = workflowDef; return this; diff --git a/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java b/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java index f7940e75df..0010663a5c 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java +++ b/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java @@ -43,6 +43,7 @@ import com.netflix.conductor.core.execution.tasks.SubWorkflow; import com.netflix.conductor.core.execution.tasks.SystemTaskWorkerCoordinator; import com.netflix.conductor.core.execution.tasks.Wait; +import com.netflix.conductor.core.utils.JsonUtils; import com.netflix.conductor.dao.MetadataDAO; import com.netflix.conductor.dao.QueueDAO; @@ -83,6 +84,12 @@ public ParametersUtils getParameterUtils() { return new ParametersUtils(); } + @Provides + @Singleton + public JsonUtils getJsonUtils() { + return new JsonUtils(); + } + @ProvidesIntoMap @StringMapKey(CONDUCTOR_QUALIFIER) @Singleton diff --git a/core/src/main/java/com/netflix/conductor/core/events/ActionProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/ActionProcessor.java index c0f48f3633..82480b0330 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/ActionProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/ActionProcessor.java @@ -1,4 +1,4 @@ -/** +/* * Copyright 2017 Netflix, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,12 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -/** - * - */ package com.netflix.conductor.core.events; -import com.google.common.annotations.VisibleForTesting; import com.netflix.conductor.common.metadata.events.EventHandler.Action; import com.netflix.conductor.common.metadata.events.EventHandler.StartWorkflow; import com.netflix.conductor.common.metadata.events.EventHandler.TaskDetails; @@ -41,7 +37,6 @@ /** * @author Viren * Action Processor subscribes to the Event Actions queue and processes the actions (e.g. start workflow etc) - *

Warning: This is a work in progress and may be changed in future. Not ready for production yet. */ @Singleton public class ActionProcessor { @@ -49,12 +44,13 @@ public class ActionProcessor { private final WorkflowExecutor executor; private final ParametersUtils parametersUtils; - private final JsonUtils jsonUtils = new JsonUtils(); + private final JsonUtils jsonUtils; @Inject - public ActionProcessor(WorkflowExecutor executor, ParametersUtils parametersUtils) { + public ActionProcessor(WorkflowExecutor executor, ParametersUtils parametersUtils, JsonUtils jsonUtils) { this.executor = executor; this.parametersUtils = parametersUtils; + this.jsonUtils = jsonUtils; } public Map execute(Action action, Object payloadObject, String event, String messageId) { @@ -62,25 +58,24 @@ public Map execute(Action action, Object payloadObject, String e logger.debug("Executing action: {} for event: {} with messageId:{}", action.getAction(), event, messageId); Object jsonObject = payloadObject; - if (action.isExpandInlineJson()) { + if (action.isExpandInlineJSON()) { jsonObject = jsonUtils.expand(payloadObject); } switch (action.getAction()) { - case START_WORKFLOW: + case start_workflow: return startWorkflow(action, jsonObject, event, messageId); - case COMPLETE_TASK: - return completeTask(action, jsonObject, action.getCompleteTask(), Status.COMPLETED, event, messageId); - case FAIL_TASK: - return completeTask(action, jsonObject, action.getFailTask(), Status.FAILED, event, messageId); + case complete_task: + return completeTask(action, jsonObject, action.getComplete_task(), Status.COMPLETED, event, messageId); + case fail_task: + return completeTask(action, jsonObject, action.getFail_task(), Status.FAILED, event, messageId); default: break; } throw new UnsupportedOperationException("Action not supported " + action.getAction() + " for event " + event); } - @VisibleForTesting - Map completeTask(Action action, Object payload, TaskDetails taskDetails, Status status, String event, String messageId) { + private Map completeTask(Action action, Object payload, TaskDetails taskDetails, Status status, String event, String messageId) { Map input = new HashMap<>(); input.put("workflowId", taskDetails.getWorkflowId()); @@ -127,7 +122,7 @@ Map completeTask(Action action, Object payload, TaskDetails task } private Map startWorkflow(Action action, Object payload, String event, String messageId) { - StartWorkflow params = action.getStartWorkflow(); + StartWorkflow params = action.getStart_workflow(); Map output = new HashMap<>(); try { Map inputParams = params.getInput(); @@ -135,7 +130,7 @@ private Map startWorkflow(Action action, Object payload, String workflowInput.put("conductor.event.messageId", messageId); workflowInput.put("conductor.event.name", event); - String id = executor.startWorkflow(params.getName(), params.getVersion(), params.getCorrelationId(), workflowInput, event); + String id = executor.startWorkflow(params.getName(), params.getVersion(), params.getCorrelationId(), workflowInput, null, event); output.put("workflowId", id); } catch (RuntimeException e) { diff --git a/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java index 449a238047..050dfca16b 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java @@ -1,4 +1,4 @@ -/** +/* * Copyright 2017 Netflix, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,9 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -/** - * - */ package com.netflix.conductor.core.events; import com.fasterxml.jackson.databind.ObjectMapper; @@ -75,15 +72,16 @@ public class EventProcessor { private ExecutorService executorService; private final Map eventToQueueMap = new ConcurrentHashMap<>(); private final ObjectMapper objectMapper = new ObjectMapper(); - private final JsonUtils jsonUtils = new JsonUtils(); + private final JsonUtils jsonUtils; @Inject public EventProcessor(ExecutionService executionService, MetadataService metadataService, - ActionProcessor actionProcessor, EventQueues eventQueues, Configuration config) { + ActionProcessor actionProcessor, EventQueues eventQueues, JsonUtils jsonUtils, Configuration config) { this.executionService = executionService; this.metadataService = metadataService; this.actionProcessor = actionProcessor; this.eventQueues = eventQueues; + this.jsonUtils = jsonUtils; int executorThreadCount = config.getIntProperty("workflow.event.processor.thread.count", 2); if (executorThreadCount > 0) { diff --git a/core/src/main/java/com/netflix/conductor/core/execution/ParametersUtils.java b/core/src/main/java/com/netflix/conductor/core/execution/ParametersUtils.java index 74c4c8b8c5..b8cccd8eae 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/ParametersUtils.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/ParametersUtils.java @@ -159,7 +159,7 @@ public Object replace(String paramString) { private Map replace(Map input, DocumentContext documentContext, String taskId) { for (Entry e : input.entrySet()) { Object value = e.getValue(); - if (value instanceof String || value instanceof Number) { + if (value instanceof String) { Object replaced = replaceVariables(value.toString(), documentContext, taskId); e.setValue(replaced); } else if (value instanceof Map) { diff --git a/core/src/test/java/com/netflix/conductor/core/events/TestActionProcessor.java b/core/src/test/java/com/netflix/conductor/core/events/TestActionProcessor.java index bfa1751140..43551636e5 100644 --- a/core/src/test/java/com/netflix/conductor/core/events/TestActionProcessor.java +++ b/core/src/test/java/com/netflix/conductor/core/events/TestActionProcessor.java @@ -1,14 +1,20 @@ +/* + * Copyright 2017 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.netflix.conductor.core.events; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.conductor.common.metadata.events.EventHandler.Action; import com.netflix.conductor.common.metadata.events.EventHandler.Action.Type; @@ -21,11 +27,22 @@ import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.core.execution.ParametersUtils; import com.netflix.conductor.core.execution.WorkflowExecutor; -import java.util.Map; +import com.netflix.conductor.core.utils.JsonUtils; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + public class TestActionProcessor { private WorkflowExecutor workflowExecutor; private ActionProcessor actionProcessor; @@ -34,9 +51,10 @@ public class TestActionProcessor { public void setup() { workflowExecutor = mock(WorkflowExecutor.class); - actionProcessor = new ActionProcessor(workflowExecutor, new ParametersUtils()); + actionProcessor = new ActionProcessor(workflowExecutor, new ParametersUtils(), new JsonUtils()); } + @SuppressWarnings("unchecked") @Test public void testStartWorkflow() throws Exception { StartWorkflow startWorkflow = new StartWorkflow(); @@ -44,8 +62,8 @@ public void testStartWorkflow() throws Exception { startWorkflow.getInput().put("testInput", "${testId}"); Action action = new Action(); - action.setAction(Type.START_WORKFLOW); - action.setStartWorkflow(startWorkflow); + action.setAction(Type.start_workflow); + action.setStart_workflow(startWorkflow); Object payload = new ObjectMapper().readValue("{\"testId\":\"test_1\"}", Object.class); @@ -53,8 +71,8 @@ public void testStartWorkflow() throws Exception { workflowDef.setName("testWorkflow"); workflowDef.setVersion(1); - when(workflowExecutor.startWorkflow(eq("testWorkflow"), eq(null), any(), any(), eq("testEvent"))) - .thenReturn("workflow_1"); + when(workflowExecutor.startWorkflow(eq("testWorkflow"), eq(null), any(), any(), any(), eq("testEvent"))) + .thenReturn("workflow_1"); Map output = actionProcessor.execute(action, payload, "testEvent", "testMessage"); @@ -62,7 +80,7 @@ public void testStartWorkflow() throws Exception { assertEquals("workflow_1", output.get("workflowId")); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Map.class); - verify(workflowExecutor).startWorkflow(eq("testWorkflow"), eq(null), any(), argumentCaptor.capture(), eq("testEvent")); + verify(workflowExecutor).startWorkflow(eq("testWorkflow"), eq(null), any(), argumentCaptor.capture(), any(), eq("testEvent")); assertEquals("test_1", argumentCaptor.getValue().get("testInput")); assertEquals("testMessage", argumentCaptor.getValue().get("conductor.event.messageId")); assertEquals("testEvent", argumentCaptor.getValue().get("conductor.event.name")); @@ -75,8 +93,8 @@ public void testCompleteTask() throws Exception { taskDetails.setTaskRefName("testTask"); Action action = new Action(); - action.setAction(Type.COMPLETE_TASK); - action.setCompleteTask(taskDetails); + action.setAction(Type.complete_task); + action.setComplete_task(taskDetails); Object payload = new ObjectMapper().readValue("{\"workflowId\":\"workflow_1\"}", Object.class); @@ -105,8 +123,8 @@ public void testCompleteTaskByTaskId() throws Exception { taskDetails.setTaskId("${taskId}"); Action action = new Action(); - action.setAction(Type.COMPLETE_TASK); - action.setCompleteTask(taskDetails); + action.setAction(Type.complete_task); + action.setComplete_task(taskDetails); Object payload = new ObjectMapper().readValue("{\"workflowId\":\"workflow_1\", \"taskId\":\"task_1\"}", Object.class); diff --git a/core/src/test/java/com/netflix/conductor/core/events/TestEventProcessor.java b/core/src/test/java/com/netflix/conductor/core/events/TestEventProcessor.java index 2f7e78957d..dc97efdaf7 100644 --- a/core/src/test/java/com/netflix/conductor/core/events/TestEventProcessor.java +++ b/core/src/test/java/com/netflix/conductor/core/events/TestEventProcessor.java @@ -1,4 +1,4 @@ -/** +/* * Copyright 2017 Netflix, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,9 +13,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -/** - * - */ package com.netflix.conductor.core.events; import com.google.common.util.concurrent.Uninterruptibles; @@ -34,6 +31,7 @@ import com.netflix.conductor.core.execution.ParametersUtils; import com.netflix.conductor.core.execution.TestConfiguration; import com.netflix.conductor.core.execution.WorkflowExecutor; +import com.netflix.conductor.core.utils.JsonUtils; import com.netflix.conductor.service.ExecutionService; import com.netflix.conductor.service.MetadataService; import org.junit.Before; @@ -50,9 +48,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static junit.framework.Assert.assertNull; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.atMost; @@ -77,6 +75,7 @@ public class TestEventProcessor { private ActionProcessor actionProcessor; private EventQueues eventQueues; private ParametersUtils parametersUtils; + private JsonUtils jsonUtils; @Before public void setup() { @@ -88,6 +87,7 @@ public void setup() { workflowExecutor = mock(WorkflowExecutor.class); actionProcessor = mock(ActionProcessor.class); parametersUtils = new ParametersUtils(); + jsonUtils = new JsonUtils(); EventQueueProvider provider = mock(EventQueueProvider.class); queue = mock(ObservableQueue.class); @@ -114,18 +114,18 @@ public void testEventProcessor() { eventHandler.setActive(true); Action startWorkflowAction = new Action(); - startWorkflowAction.setAction(Type.START_WORKFLOW); - startWorkflowAction.setStartWorkflow(new StartWorkflow()); - startWorkflowAction.getStartWorkflow().setName("workflow_x"); - startWorkflowAction.getStartWorkflow().setVersion(1); + startWorkflowAction.setAction(Type.start_workflow); + startWorkflowAction.setStart_workflow(new StartWorkflow()); + startWorkflowAction.getStart_workflow().setName("workflow_x"); + startWorkflowAction.getStart_workflow().setVersion(1); eventHandler.getActions().add(startWorkflowAction); Action completeTaskAction = new Action(); - completeTaskAction.setAction(Type.COMPLETE_TASK); - completeTaskAction.setCompleteTask(new TaskDetails()); - completeTaskAction.getCompleteTask().setTaskRefName("task_x"); - completeTaskAction.getCompleteTask().setWorkflowId(UUID.randomUUID().toString()); - completeTaskAction.getCompleteTask().setOutput(new HashMap<>()); + completeTaskAction.setAction(Type.complete_task); + completeTaskAction.setComplete_task(new TaskDetails()); + completeTaskAction.getComplete_task().setTaskRefName("task_x"); + completeTaskAction.getComplete_task().setWorkflowId(UUID.randomUUID().toString()); + completeTaskAction.getComplete_task().setOutput(new HashMap<>()); eventHandler.getActions().add(completeTaskAction); eventHandler.setEvent(event); @@ -140,7 +140,7 @@ public void testEventProcessor() { doAnswer((Answer) invocation -> { started.set(true); return id; - }).when(workflowExecutor).startWorkflow(startWorkflowAction.getStartWorkflow().getName(), startWorkflowAction.getStartWorkflow().getVersion(), startWorkflowAction.getStartWorkflow().getCorrelationId(), startWorkflowAction.getStartWorkflow().getInput(), event); + }).when(workflowExecutor).startWorkflow(startWorkflowAction.getStart_workflow().getName(), startWorkflowAction.getStart_workflow().getVersion(), startWorkflowAction.getStart_workflow().getCorrelationId(), startWorkflowAction.getStart_workflow().getInput(), null, event); AtomicBoolean completed = new AtomicBoolean(false); doAnswer((Answer) invocation -> { @@ -149,19 +149,19 @@ public void testEventProcessor() { }).when(workflowExecutor).updateTask(any()); Task task = new Task(); - task.setReferenceTaskName(completeTaskAction.getCompleteTask().getTaskRefName()); + task.setReferenceTaskName(completeTaskAction.getComplete_task().getTaskRefName()); Workflow workflow = new Workflow(); workflow.setTasks(Collections.singletonList(task)); - when(workflowExecutor.getWorkflow(completeTaskAction.getCompleteTask().getWorkflowId(), true)).thenReturn(workflow); + when(workflowExecutor.getWorkflow(completeTaskAction.getComplete_task().getWorkflowId(), true)).thenReturn(workflow); WorkflowDef workflowDef = new WorkflowDef(); - workflowDef.setVersion(startWorkflowAction.getStartWorkflow().getVersion()); - workflowDef.setName(startWorkflowAction.getStartWorkflow().getName()); + workflowDef.setVersion(startWorkflowAction.getStart_workflow().getVersion()); + workflowDef.setName(startWorkflowAction.getStart_workflow().getName()); when(metadataService.getWorkflowDef(any(), any())).thenReturn(workflowDef); - ActionProcessor actionProcessor = new ActionProcessor(workflowExecutor, parametersUtils); + ActionProcessor actionProcessor = new ActionProcessor(workflowExecutor, parametersUtils, jsonUtils); - EventProcessor eventProcessor = new EventProcessor(executionService, metadataService, actionProcessor, eventQueues, new TestConfiguration()); + EventProcessor eventProcessor = new EventProcessor(executionService, metadataService, actionProcessor, eventQueues, jsonUtils, new TestConfiguration()); assertNotNull(eventProcessor.getQueues()); assertEquals(1, eventProcessor.getQueues().size()); @@ -192,12 +192,12 @@ public void testEventHandlerWithCondition() { startWorkflowInput.put("param2", "SQS-${MessageId}"); Action startWorkflowAction = new Action(); - startWorkflowAction.setAction(Type.START_WORKFLOW); - startWorkflowAction.setStartWorkflow(new StartWorkflow()); - startWorkflowAction.getStartWorkflow().setName("cms_artwork_automation"); - startWorkflowAction.getStartWorkflow().setVersion(1); - startWorkflowAction.getStartWorkflow().setInput(startWorkflowInput); - startWorkflowAction.setExpandInlineJson(true); + startWorkflowAction.setAction(Type.start_workflow); + startWorkflowAction.setStart_workflow(new StartWorkflow()); + startWorkflowAction.getStart_workflow().setName("cms_artwork_automation"); + startWorkflowAction.getStart_workflow().setVersion(1); + startWorkflowAction.getStart_workflow().setInput(startWorkflowInput); + startWorkflowAction.setExpandInlineJSON(true); eventHandler.getActions().add(startWorkflowAction); eventHandler.setEvent(event); @@ -212,15 +212,15 @@ public void testEventHandlerWithCondition() { doAnswer((Answer) invocation -> { started.set(true); return id; - }).when(workflowExecutor).startWorkflow(startWorkflowAction.getStartWorkflow().getName(), startWorkflowAction.getStartWorkflow().getVersion(), startWorkflowAction.getStartWorkflow().getCorrelationId(), startWorkflowAction.getStartWorkflow().getInput(), event); + }).when(workflowExecutor).startWorkflow(startWorkflowAction.getStart_workflow().getName(), startWorkflowAction.getStart_workflow().getVersion(), startWorkflowAction.getStart_workflow().getCorrelationId(), startWorkflowAction.getStart_workflow().getInput(), null, event); WorkflowDef workflowDef = new WorkflowDef(); - workflowDef.setName(startWorkflowAction.getStartWorkflow().getName()); + workflowDef.setName(startWorkflowAction.getStart_workflow().getName()); when(metadataService.getWorkflowDef(any(), any())).thenReturn(workflowDef); - ActionProcessor actionProcessor = new ActionProcessor(workflowExecutor, parametersUtils); + ActionProcessor actionProcessor = new ActionProcessor(workflowExecutor, parametersUtils, jsonUtils); - EventProcessor eventProcessor = new EventProcessor(executionService, metadataService, actionProcessor, eventQueues, new TestConfiguration()); + EventProcessor eventProcessor = new EventProcessor(executionService, metadataService, actionProcessor, eventQueues, jsonUtils, new TestConfiguration()); assertNotNull(eventProcessor.getQueues()); assertEquals(1, eventProcessor.getQueues().size()); @@ -236,11 +236,11 @@ public void testEventProcessorWithRetriableError() { eventHandler.setEvent(event); Action completeTaskAction = new Action(); - completeTaskAction.setAction(Type.COMPLETE_TASK); - completeTaskAction.setCompleteTask(new TaskDetails()); - completeTaskAction.getCompleteTask().setTaskRefName("task_x"); - completeTaskAction.getCompleteTask().setWorkflowId(UUID.randomUUID().toString()); - completeTaskAction.getCompleteTask().setOutput(new HashMap<>()); + completeTaskAction.setAction(Type.complete_task); + completeTaskAction.setComplete_task(new TaskDetails()); + completeTaskAction.getComplete_task().setTaskRefName("task_x"); + completeTaskAction.getComplete_task().setWorkflowId(UUID.randomUUID().toString()); + completeTaskAction.getComplete_task().setOutput(new HashMap<>()); eventHandler.getActions().add(completeTaskAction); when(queue.rePublishIfNoAck()).thenReturn(false); @@ -249,7 +249,7 @@ public void testEventProcessorWithRetriableError() { when(executionService.addEventExecution(any())).thenReturn(true); when(actionProcessor.execute(any(), any(), any(), any())).thenThrow(new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "some retriable error")); - EventProcessor eventProcessor = new EventProcessor(executionService, metadataService, actionProcessor, eventQueues, new TestConfiguration()); + EventProcessor eventProcessor = new EventProcessor(executionService, metadataService, actionProcessor, eventQueues, jsonUtils, new TestConfiguration()); assertNotNull(eventProcessor.getQueues()); assertEquals(1, eventProcessor.getQueues().size()); @@ -267,11 +267,11 @@ public void testEventProcessorWithNonRetriableError() { eventHandler.setEvent(event); Action completeTaskAction = new Action(); - completeTaskAction.setAction(Type.COMPLETE_TASK); - completeTaskAction.setCompleteTask(new TaskDetails()); - completeTaskAction.getCompleteTask().setTaskRefName("task_x"); - completeTaskAction.getCompleteTask().setWorkflowId(UUID.randomUUID().toString()); - completeTaskAction.getCompleteTask().setOutput(new HashMap<>()); + completeTaskAction.setAction(Type.complete_task); + completeTaskAction.setComplete_task(new TaskDetails()); + completeTaskAction.getComplete_task().setTaskRefName("task_x"); + completeTaskAction.getComplete_task().setWorkflowId(UUID.randomUUID().toString()); + completeTaskAction.getComplete_task().setOutput(new HashMap<>()); eventHandler.getActions().add(completeTaskAction); when(metadataService.getEventHandlers()).thenReturn(Collections.singletonList(eventHandler)); @@ -280,7 +280,7 @@ public void testEventProcessorWithNonRetriableError() { when(actionProcessor.execute(any(), any(), any(), any())).thenThrow(new ApplicationException(ApplicationException.Code.INVALID_INPUT, "some non-retriable error")); - EventProcessor eventProcessor = new EventProcessor(executionService, metadataService, actionProcessor, eventQueues, new TestConfiguration()); + EventProcessor eventProcessor = new EventProcessor(executionService, metadataService, actionProcessor, eventQueues, jsonUtils, new TestConfiguration()); assertNotNull(eventProcessor.getQueues()); assertEquals(1, eventProcessor.getQueues().size()); @@ -299,7 +299,7 @@ public void testExecuteInvalidAction() { throw new UnsupportedOperationException("error"); }).when(actionProcessor).execute(any(), any(), any(), any()); - EventProcessor eventProcessor = new EventProcessor(executionService, metadataService, actionProcessor, eventQueues, new TestConfiguration()); + EventProcessor eventProcessor = new EventProcessor(executionService, metadataService, actionProcessor, eventQueues, jsonUtils, new TestConfiguration()); EventExecution eventExecution = new EventExecution("id", "messageId"); eventExecution.setStatus(EventExecution.Status.IN_PROGRESS); eventExecution.setEvent("event"); @@ -319,12 +319,12 @@ public void testExecuteNonRetriableApplicationException() { throw new ApplicationException(ApplicationException.Code.INVALID_INPUT, "some non-retriable error"); }).when(actionProcessor).execute(any(), any(), any(), any()); - EventProcessor eventProcessor = new EventProcessor(executionService, metadataService, actionProcessor, eventQueues, new TestConfiguration()); + EventProcessor eventProcessor = new EventProcessor(executionService, metadataService, actionProcessor, eventQueues, jsonUtils, new TestConfiguration()); EventExecution eventExecution = new EventExecution("id", "messageId"); eventExecution.setStatus(EventExecution.Status.IN_PROGRESS); eventExecution.setEvent("event"); Action action = new Action(); - action.setAction(Type.START_WORKFLOW); + action.setAction(Type.start_workflow); eventProcessor.execute(eventExecution, action, "payload"); assertEquals(1, executeInvoked.get()); @@ -340,12 +340,12 @@ public void testExecuteRetriableApplicationException() { throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "some retriable error"); }).when(actionProcessor).execute(any(), any(), any(), any()); - EventProcessor eventProcessor = new EventProcessor(executionService, metadataService, actionProcessor, eventQueues, new TestConfiguration()); + EventProcessor eventProcessor = new EventProcessor(executionService, metadataService, actionProcessor, eventQueues, jsonUtils, new TestConfiguration()); EventExecution eventExecution = new EventExecution("id", "messageId"); eventExecution.setStatus(EventExecution.Status.IN_PROGRESS); eventExecution.setEvent("event"); Action action = new Action(); - action.setAction(Type.START_WORKFLOW); + action.setAction(Type.start_workflow); eventProcessor.execute(eventExecution, action, "payload"); assertEquals(3, executeInvoked.get()); diff --git a/docs/docs/domains/index.md b/docs/docs/domains/index.md index 036811d95f..6e6ec4498c 100644 --- a/docs/docs/domains/index.md +++ b/docs/docs/domains/index.md @@ -41,7 +41,7 @@ When starting the workflow, make sure the task to domain mapping is passes .withInput(input) .withTaskToDomain(taskToDomain); - wfclient.startWorkflow(swr); + wfclient.start_workflow(swr); ``` diff --git a/docs/docs/events/index.md b/docs/docs/events/index.md index 25351d468b..9b83552e2d 100644 --- a/docs/docs/events/index.md +++ b/docs/docs/events/index.md @@ -66,8 +66,8 @@ Given the following payload in the message: ```json { - "action": "startWorkflow", - "startWorkflow": { + "action": "start_workflow", + "start_workflow": { "name": "WORKFLOW_NAME", "version": "input": { @@ -81,15 +81,15 @@ Given the following payload in the message: ```json { - "action": "completeTask", - "completeTask": { + "action": "complete_task", + "complete_task": { "workflowId": "${source.externalId.workflowId}", "taskRefName": "task_1", "output": { "response": "${source.result}" } }, - "expandInlineJson": true + "expandInlineJSON": true } ``` @@ -97,21 +97,21 @@ Given the following payload in the message: ```json { - "action": "failTask", - "failTask": { + "action": "fail_task", + "fail_task": { "workflowId": "${source.externalId.workflowId}", "taskRefName": "task_1", "output": { "response": "${source.result}" } }, - "expandInlineJson": true + "expandInlineJSON": true } ``` Input for starting a workflow and output when completing / failing task follows the same [expressions](/metadata/#wiring-inputs-and-outputs) used for wiring workflow inputs. !!!info "Expanding stringified JSON elements in payload" - `expandInlineJson` property, when set to true will expand the inlined stringified JSON elements in the payload to JSON documents and replace the string value with JSON document. + `expandInlineJSON` property, when set to true will expand the inlined stringified JSON elements in the payload to JSON documents and replace the string value with JSON document. This feature allows such elements to be used with JSON path expressions. ## Extending diff --git a/docs/docs/runtime/index.md b/docs/docs/runtime/index.md index d14030e7d6..eef16b7669 100644 --- a/docs/docs/runtime/index.md +++ b/docs/docs/runtime/index.md @@ -45,7 +45,7 @@ JSON for start workflow request { "name": "myWorkflow", // Name of the workflow "version": 1, // Version - “correlatond”: “corr1” // correlation Id + “correlationId”: “corr1”, // correlation Id "input": { // Input map. }, diff --git a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java index 74fdeb3e49..978401accf 100644 --- a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java +++ b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java @@ -51,6 +51,55 @@ @Generated("com.github.vmg.protogen.ProtoGen") public abstract class AbstractProtoMapper { + public DynamicForkJoinTaskPb.DynamicForkJoinTask toProto(DynamicForkJoinTask from) { + DynamicForkJoinTaskPb.DynamicForkJoinTask.Builder to = DynamicForkJoinTaskPb.DynamicForkJoinTask.newBuilder(); + if (from.getTaskName() != null) { + to.setTaskName( from.getTaskName() ); + } + if (from.getWorkflowName() != null) { + to.setWorkflowName( from.getWorkflowName() ); + } + if (from.getReferenceName() != null) { + to.setReferenceName( from.getReferenceName() ); + } + for (Map.Entry pair : from.getInput().entrySet()) { + to.putInput( pair.getKey(), toProto( pair.getValue() ) ); + } + if (from.getType() != null) { + to.setType( from.getType() ); + } + return to.build(); + } + + public DynamicForkJoinTask fromProto(DynamicForkJoinTaskPb.DynamicForkJoinTask from) { + DynamicForkJoinTask to = new DynamicForkJoinTask(); + to.setTaskName( from.getTaskName() ); + to.setWorkflowName( from.getWorkflowName() ); + to.setReferenceName( from.getReferenceName() ); + Map inputMap = new HashMap(); + for (Map.Entry pair : from.getInputMap().entrySet()) { + inputMap.put( pair.getKey(), fromProto( pair.getValue() ) ); + } + to.setInput(inputMap); + to.setType( from.getType() ); + return to; + } + + public DynamicForkJoinTaskListPb.DynamicForkJoinTaskList toProto(DynamicForkJoinTaskList from) { + DynamicForkJoinTaskListPb.DynamicForkJoinTaskList.Builder to = DynamicForkJoinTaskListPb.DynamicForkJoinTaskList.newBuilder(); + for (DynamicForkJoinTask elem : from.getDynamicTasks()) { + to.addDynamicTasks( toProto(elem) ); + } + return to.build(); + } + + public DynamicForkJoinTaskList fromProto( + DynamicForkJoinTaskListPb.DynamicForkJoinTaskList from) { + DynamicForkJoinTaskList to = new DynamicForkJoinTaskList(); + to.setDynamicTasks( from.getDynamicTasksList().stream().map(this::fromProto).collect(Collectors.toCollection(ArrayList::new)) ); + return to; + } + public EventExecutionPb.EventExecution toProto(EventExecution from) { EventExecutionPb.EventExecution.Builder to = EventExecutionPb.EventExecution.newBuilder(); if (from.getId() != null) { @@ -224,16 +273,16 @@ public EventHandlerPb.EventHandler.Action toProto(EventHandler.Action from) { if (from.getAction() != null) { to.setAction( toProto( from.getAction() ) ); } - if (from.getStartWorkflow() != null) { - to.setStartWorkflow( toProto( from.getStartWorkflow() ) ); + if (from.getStart_workflow() != null) { + to.setStartWorkflow( toProto( from.getStart_workflow() ) ); } - if (from.getCompleteTask() != null) { - to.setCompleteTask( toProto( from.getCompleteTask() ) ); + if (from.getComplete_task() != null) { + to.setCompleteTask( toProto( from.getComplete_task() ) ); } - if (from.getFailTask() != null) { - to.setFailTask( toProto( from.getFailTask() ) ); + if (from.getFail_task() != null) { + to.setFailTask( toProto( from.getFail_task() ) ); } - to.setExpandInlineJson( from.isExpandInlineJson() ); + to.setExpandInlineJson( from.isExpandInlineJSON() ); return to.build(); } @@ -241,24 +290,24 @@ public EventHandler.Action fromProto(EventHandlerPb.EventHandler.Action from) { EventHandler.Action to = new EventHandler.Action(); to.setAction( fromProto( from.getAction() ) ); if (from.hasStartWorkflow()) { - to.setStartWorkflow( fromProto( from.getStartWorkflow() ) ); + to.setStart_workflow( fromProto( from.getStartWorkflow() ) ); } if (from.hasCompleteTask()) { - to.setCompleteTask( fromProto( from.getCompleteTask() ) ); + to.setComplete_task( fromProto( from.getCompleteTask() ) ); } if (from.hasFailTask()) { - to.setFailTask( fromProto( from.getFailTask() ) ); + to.setFail_task( fromProto( from.getFailTask() ) ); } - to.setExpandInlineJson( from.getExpandInlineJson() ); + to.setExpandInlineJSON( from.getExpandInlineJson() ); return to; } public EventHandlerPb.EventHandler.Action.Type toProto(EventHandler.Action.Type from) { EventHandlerPb.EventHandler.Action.Type to; switch (from) { - case START_WORKFLOW: to = EventHandlerPb.EventHandler.Action.Type.START_WORKFLOW; break; - case COMPLETE_TASK: to = EventHandlerPb.EventHandler.Action.Type.COMPLETE_TASK; break; - case FAIL_TASK: to = EventHandlerPb.EventHandler.Action.Type.FAIL_TASK; break; + case start_workflow: to = EventHandlerPb.EventHandler.Action.Type.START_WORKFLOW; break; + case complete_task: to = EventHandlerPb.EventHandler.Action.Type.COMPLETE_TASK; break; + case fail_task: to = EventHandlerPb.EventHandler.Action.Type.FAIL_TASK; break; default: throw new IllegalArgumentException("Unexpected enum constant: " + from); } return to; @@ -267,9 +316,9 @@ public EventHandlerPb.EventHandler.Action.Type toProto(EventHandler.Action.Type public EventHandler.Action.Type fromProto(EventHandlerPb.EventHandler.Action.Type from) { EventHandler.Action.Type to; switch (from) { - case START_WORKFLOW: to = EventHandler.Action.Type.START_WORKFLOW; break; - case COMPLETE_TASK: to = EventHandler.Action.Type.COMPLETE_TASK; break; - case FAIL_TASK: to = EventHandler.Action.Type.FAIL_TASK; break; + case START_WORKFLOW: to = EventHandler.Action.Type.start_workflow; break; + case COMPLETE_TASK: to = EventHandler.Action.Type.complete_task; break; + case FAIL_TASK: to = EventHandler.Action.Type.fail_task; break; default: throw new IllegalArgumentException("Unexpected enum constant: " + from); } return to; @@ -299,6 +348,125 @@ public PollData fromProto(PollDataPb.PollData from) { return to; } + public RerunWorkflowRequestPb.RerunWorkflowRequest toProto(RerunWorkflowRequest from) { + RerunWorkflowRequestPb.RerunWorkflowRequest.Builder to = RerunWorkflowRequestPb.RerunWorkflowRequest.newBuilder(); + if (from.getReRunFromWorkflowId() != null) { + to.setReRunFromWorkflowId( from.getReRunFromWorkflowId() ); + } + for (Map.Entry pair : from.getWorkflowInput().entrySet()) { + to.putWorkflowInput( pair.getKey(), toProto( pair.getValue() ) ); + } + if (from.getReRunFromTaskId() != null) { + to.setReRunFromTaskId( from.getReRunFromTaskId() ); + } + for (Map.Entry pair : from.getTaskInput().entrySet()) { + to.putTaskInput( pair.getKey(), toProto( pair.getValue() ) ); + } + if (from.getCorrelationId() != null) { + to.setCorrelationId( from.getCorrelationId() ); + } + return to.build(); + } + + public RerunWorkflowRequest fromProto(RerunWorkflowRequestPb.RerunWorkflowRequest from) { + RerunWorkflowRequest to = new RerunWorkflowRequest(); + to.setReRunFromWorkflowId( from.getReRunFromWorkflowId() ); + Map workflowInputMap = new HashMap(); + for (Map.Entry pair : from.getWorkflowInputMap().entrySet()) { + workflowInputMap.put( pair.getKey(), fromProto( pair.getValue() ) ); + } + to.setWorkflowInput(workflowInputMap); + to.setReRunFromTaskId( from.getReRunFromTaskId() ); + Map taskInputMap = new HashMap(); + for (Map.Entry pair : from.getTaskInputMap().entrySet()) { + taskInputMap.put( pair.getKey(), fromProto( pair.getValue() ) ); + } + to.setTaskInput(taskInputMap); + to.setCorrelationId( from.getCorrelationId() ); + return to; + } + + public SkipTaskRequest fromProto(SkipTaskRequestPb.SkipTaskRequest from) { + SkipTaskRequest to = new SkipTaskRequest(); + Map taskInputMap = new HashMap(); + for (Map.Entry pair : from.getTaskInputMap().entrySet()) { + taskInputMap.put( pair.getKey(), fromProto( pair.getValue() ) ); + } + to.setTaskInput(taskInputMap); + Map taskOutputMap = new HashMap(); + for (Map.Entry pair : from.getTaskOutputMap().entrySet()) { + taskOutputMap.put( pair.getKey(), fromProto( pair.getValue() ) ); + } + to.setTaskOutput(taskOutputMap); + if (from.hasTaskInputMessage()) { + to.setTaskInputMessage( fromProto( from.getTaskInputMessage() ) ); + } + if (from.hasTaskOutputMessage()) { + to.setTaskOutputMessage( fromProto( from.getTaskOutputMessage() ) ); + } + return to; + } + + public StartWorkflowRequestPb.StartWorkflowRequest toProto(StartWorkflowRequest from) { + StartWorkflowRequestPb.StartWorkflowRequest.Builder to = StartWorkflowRequestPb.StartWorkflowRequest.newBuilder(); + if (from.getName() != null) { + to.setName( from.getName() ); + } + if (from.getVersion() != null) { + to.setVersion( from.getVersion() ); + } + if (from.getCorrelationId() != null) { + to.setCorrelationId( from.getCorrelationId() ); + } + for (Map.Entry pair : from.getInput().entrySet()) { + to.putInput( pair.getKey(), toProto( pair.getValue() ) ); + } + to.putAllTaskToDomain( from.getTaskToDomain() ); + if (from.getWorkflowDef() != null) { + to.setWorkflowDef( toProto( from.getWorkflowDef() ) ); + } + if (from.getExternalInputPayloadStoragePath() != null) { + to.setExternalInputPayloadStoragePath( from.getExternalInputPayloadStoragePath() ); + } + return to.build(); + } + + public StartWorkflowRequest fromProto(StartWorkflowRequestPb.StartWorkflowRequest from) { + StartWorkflowRequest to = new StartWorkflowRequest(); + to.setName( from.getName() ); + to.setVersion( from.getVersion() ); + to.setCorrelationId( from.getCorrelationId() ); + Map inputMap = new HashMap(); + for (Map.Entry pair : from.getInputMap().entrySet()) { + inputMap.put( pair.getKey(), fromProto( pair.getValue() ) ); + } + to.setInput(inputMap); + to.setTaskToDomain( from.getTaskToDomainMap() ); + if (from.hasWorkflowDef()) { + to.setWorkflowDef( fromProto( from.getWorkflowDef() ) ); + } + to.setExternalInputPayloadStoragePath( from.getExternalInputPayloadStoragePath() ); + return to; + } + + public SubWorkflowParamsPb.SubWorkflowParams toProto(SubWorkflowParams from) { + SubWorkflowParamsPb.SubWorkflowParams.Builder to = SubWorkflowParamsPb.SubWorkflowParams.newBuilder(); + if (from.getName() != null) { + to.setName( from.getName() ); + } + if (from.getVersion() != null) { + to.setVersion( from.getVersion() ); + } + return to.build(); + } + + public SubWorkflowParams fromProto(SubWorkflowParamsPb.SubWorkflowParams from) { + SubWorkflowParams to = new SubWorkflowParams(); + to.setName( from.getName() ); + to.setVersion( from.getVersion() ); + return to; + } + public TaskPb.Task toProto(Task from) { TaskPb.Task.Builder to = TaskPb.Task.newBuilder(); if (from.getTaskType() != null) { @@ -657,357 +825,51 @@ public TaskResult.Status fromProto(TaskResultPb.TaskResult.Status from) { return to; } - public DynamicForkJoinTaskPb.DynamicForkJoinTask toProto(DynamicForkJoinTask from) { - DynamicForkJoinTaskPb.DynamicForkJoinTask.Builder to = DynamicForkJoinTaskPb.DynamicForkJoinTask.newBuilder(); - if (from.getTaskName() != null) { - to.setTaskName( from.getTaskName() ); + public TaskSummaryPb.TaskSummary toProto(TaskSummary from) { + TaskSummaryPb.TaskSummary.Builder to = TaskSummaryPb.TaskSummary.newBuilder(); + if (from.getWorkflowId() != null) { + to.setWorkflowId( from.getWorkflowId() ); } - if (from.getWorkflowName() != null) { - to.setWorkflowName( from.getWorkflowName() ); + if (from.getWorkflowType() != null) { + to.setWorkflowType( from.getWorkflowType() ); } - if (from.getReferenceName() != null) { - to.setReferenceName( from.getReferenceName() ); + if (from.getCorrelationId() != null) { + to.setCorrelationId( from.getCorrelationId() ); } - for (Map.Entry pair : from.getInput().entrySet()) { - to.putInput( pair.getKey(), toProto( pair.getValue() ) ); + if (from.getScheduledTime() != null) { + to.setScheduledTime( from.getScheduledTime() ); } - if (from.getType() != null) { - to.setType( from.getType() ); + if (from.getStartTime() != null) { + to.setStartTime( from.getStartTime() ); } - return to.build(); - } - - public DynamicForkJoinTask fromProto(DynamicForkJoinTaskPb.DynamicForkJoinTask from) { - DynamicForkJoinTask to = new DynamicForkJoinTask(); - to.setTaskName( from.getTaskName() ); - to.setWorkflowName( from.getWorkflowName() ); - to.setReferenceName( from.getReferenceName() ); - Map inputMap = new HashMap(); - for (Map.Entry pair : from.getInputMap().entrySet()) { - inputMap.put( pair.getKey(), fromProto( pair.getValue() ) ); + if (from.getUpdateTime() != null) { + to.setUpdateTime( from.getUpdateTime() ); } - to.setInput(inputMap); - to.setType( from.getType() ); - return to; - } - - public DynamicForkJoinTaskListPb.DynamicForkJoinTaskList toProto(DynamicForkJoinTaskList from) { - DynamicForkJoinTaskListPb.DynamicForkJoinTaskList.Builder to = DynamicForkJoinTaskListPb.DynamicForkJoinTaskList.newBuilder(); - for (DynamicForkJoinTask elem : from.getDynamicTasks()) { - to.addDynamicTasks( toProto(elem) ); + if (from.getEndTime() != null) { + to.setEndTime( from.getEndTime() ); } - return to.build(); - } - - public DynamicForkJoinTaskList fromProto( - DynamicForkJoinTaskListPb.DynamicForkJoinTaskList from) { - DynamicForkJoinTaskList to = new DynamicForkJoinTaskList(); - to.setDynamicTasks( from.getDynamicTasksList().stream().map(this::fromProto).collect(Collectors.toCollection(ArrayList::new)) ); - return to; - } - - public RerunWorkflowRequestPb.RerunWorkflowRequest toProto(RerunWorkflowRequest from) { - RerunWorkflowRequestPb.RerunWorkflowRequest.Builder to = RerunWorkflowRequestPb.RerunWorkflowRequest.newBuilder(); - if (from.getReRunFromWorkflowId() != null) { - to.setReRunFromWorkflowId( from.getReRunFromWorkflowId() ); + if (from.getStatus() != null) { + to.setStatus( toProto( from.getStatus() ) ); } - for (Map.Entry pair : from.getWorkflowInput().entrySet()) { - to.putWorkflowInput( pair.getKey(), toProto( pair.getValue() ) ); + if (from.getReasonForIncompletion() != null) { + to.setReasonForIncompletion( from.getReasonForIncompletion() ); } - if (from.getReRunFromTaskId() != null) { - to.setReRunFromTaskId( from.getReRunFromTaskId() ); + to.setExecutionTime( from.getExecutionTime() ); + to.setQueueWaitTime( from.getQueueWaitTime() ); + if (from.getTaskDefName() != null) { + to.setTaskDefName( from.getTaskDefName() ); } - for (Map.Entry pair : from.getTaskInput().entrySet()) { - to.putTaskInput( pair.getKey(), toProto( pair.getValue() ) ); + if (from.getTaskType() != null) { + to.setTaskType( from.getTaskType() ); } - if (from.getCorrelationId() != null) { - to.setCorrelationId( from.getCorrelationId() ); - } - return to.build(); - } - - public RerunWorkflowRequest fromProto(RerunWorkflowRequestPb.RerunWorkflowRequest from) { - RerunWorkflowRequest to = new RerunWorkflowRequest(); - to.setReRunFromWorkflowId( from.getReRunFromWorkflowId() ); - Map workflowInputMap = new HashMap(); - for (Map.Entry pair : from.getWorkflowInputMap().entrySet()) { - workflowInputMap.put( pair.getKey(), fromProto( pair.getValue() ) ); - } - to.setWorkflowInput(workflowInputMap); - to.setReRunFromTaskId( from.getReRunFromTaskId() ); - Map taskInputMap = new HashMap(); - for (Map.Entry pair : from.getTaskInputMap().entrySet()) { - taskInputMap.put( pair.getKey(), fromProto( pair.getValue() ) ); - } - to.setTaskInput(taskInputMap); - to.setCorrelationId( from.getCorrelationId() ); - return to; - } - - public SkipTaskRequest fromProto(SkipTaskRequestPb.SkipTaskRequest from) { - SkipTaskRequest to = new SkipTaskRequest(); - Map taskInputMap = new HashMap(); - for (Map.Entry pair : from.getTaskInputMap().entrySet()) { - taskInputMap.put( pair.getKey(), fromProto( pair.getValue() ) ); - } - to.setTaskInput(taskInputMap); - Map taskOutputMap = new HashMap(); - for (Map.Entry pair : from.getTaskOutputMap().entrySet()) { - taskOutputMap.put( pair.getKey(), fromProto( pair.getValue() ) ); - } - to.setTaskOutput(taskOutputMap); - if (from.hasTaskInputMessage()) { - to.setTaskInputMessage( fromProto( from.getTaskInputMessage() ) ); - } - if (from.hasTaskOutputMessage()) { - to.setTaskOutputMessage( fromProto( from.getTaskOutputMessage() ) ); - } - return to; - } - - public StartWorkflowRequestPb.StartWorkflowRequest toProto(StartWorkflowRequest from) { - StartWorkflowRequestPb.StartWorkflowRequest.Builder to = StartWorkflowRequestPb.StartWorkflowRequest.newBuilder(); - if (from.getName() != null) { - to.setName( from.getName() ); - } - if (from.getVersion() != null) { - to.setVersion( from.getVersion() ); - } - if (from.getCorrelationId() != null) { - to.setCorrelationId( from.getCorrelationId() ); - } - for (Map.Entry pair : from.getInput().entrySet()) { - to.putInput( pair.getKey(), toProto( pair.getValue() ) ); - } - to.putAllTaskToDomain( from.getTaskToDomain() ); - if (from.getWorkflowDef() != null) { - to.setWorkflowDef( toProto( from.getWorkflowDef() ) ); - } - if (from.getExternalInputPayloadStoragePath() != null) { - to.setExternalInputPayloadStoragePath( from.getExternalInputPayloadStoragePath() ); - } - return to.build(); - } - - public StartWorkflowRequest fromProto(StartWorkflowRequestPb.StartWorkflowRequest from) { - StartWorkflowRequest to = new StartWorkflowRequest(); - to.setName( from.getName() ); - to.setVersion( from.getVersion() ); - to.setCorrelationId( from.getCorrelationId() ); - Map inputMap = new HashMap(); - for (Map.Entry pair : from.getInputMap().entrySet()) { - inputMap.put( pair.getKey(), fromProto( pair.getValue() ) ); - } - to.setInput(inputMap); - to.setTaskToDomain( from.getTaskToDomainMap() ); - if (from.hasWorkflowDef()) { - to.setWorkflowDef( fromProto( from.getWorkflowDef() ) ); - } - to.setExternalInputPayloadStoragePath( from.getExternalInputPayloadStoragePath() ); - return to; - } - - public SubWorkflowParamsPb.SubWorkflowParams toProto(SubWorkflowParams from) { - SubWorkflowParamsPb.SubWorkflowParams.Builder to = SubWorkflowParamsPb.SubWorkflowParams.newBuilder(); - if (from.getName() != null) { - to.setName( from.getName() ); - } - if (from.getVersion() != null) { - to.setVersion( from.getVersion() ); - } - return to.build(); - } - - public SubWorkflowParams fromProto(SubWorkflowParamsPb.SubWorkflowParams from) { - SubWorkflowParams to = new SubWorkflowParams(); - to.setName( from.getName() ); - to.setVersion( from.getVersion() ); - return to; - } - - public WorkflowDefPb.WorkflowDef toProto(WorkflowDef from) { - WorkflowDefPb.WorkflowDef.Builder to = WorkflowDefPb.WorkflowDef.newBuilder(); - if (from.getName() != null) { - to.setName( from.getName() ); - } - if (from.getDescription() != null) { - to.setDescription( from.getDescription() ); - } - to.setVersion( from.getVersion() ); - for (WorkflowTask elem : from.getTasks()) { - to.addTasks( toProto(elem) ); - } - to.addAllInputParameters( from.getInputParameters() ); - for (Map.Entry pair : from.getOutputParameters().entrySet()) { - to.putOutputParameters( pair.getKey(), toProto( pair.getValue() ) ); - } - if (from.getFailureWorkflow() != null) { - to.setFailureWorkflow( from.getFailureWorkflow() ); - } - to.setSchemaVersion( from.getSchemaVersion() ); - to.setRestartable( from.isRestartable() ); - return to.build(); - } - - public WorkflowDef fromProto(WorkflowDefPb.WorkflowDef from) { - WorkflowDef to = new WorkflowDef(); - to.setName( from.getName() ); - to.setDescription( from.getDescription() ); - to.setVersion( from.getVersion() ); - to.setTasks( from.getTasksList().stream().map(this::fromProto).collect(Collectors.toCollection(ArrayList::new)) ); - to.setInputParameters( from.getInputParametersList().stream().collect(Collectors.toCollection(ArrayList::new)) ); - Map outputParametersMap = new HashMap(); - for (Map.Entry pair : from.getOutputParametersMap().entrySet()) { - outputParametersMap.put( pair.getKey(), fromProto( pair.getValue() ) ); - } - to.setOutputParameters(outputParametersMap); - to.setFailureWorkflow( from.getFailureWorkflow() ); - to.setSchemaVersion( from.getSchemaVersion() ); - to.setRestartable( from.getRestartable() ); - return to; - } - - public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) { - WorkflowTaskPb.WorkflowTask.Builder to = WorkflowTaskPb.WorkflowTask.newBuilder(); - if (from.getName() != null) { - to.setName( from.getName() ); - } - if (from.getTaskReferenceName() != null) { - to.setTaskReferenceName( from.getTaskReferenceName() ); - } - if (from.getDescription() != null) { - to.setDescription( from.getDescription() ); - } - for (Map.Entry pair : from.getInputParameters().entrySet()) { - to.putInputParameters( pair.getKey(), toProto( pair.getValue() ) ); - } - if (from.getType() != null) { - to.setType( from.getType() ); - } - if (from.getDynamicTaskNameParam() != null) { - to.setDynamicTaskNameParam( from.getDynamicTaskNameParam() ); - } - if (from.getCaseValueParam() != null) { - to.setCaseValueParam( from.getCaseValueParam() ); - } - if (from.getCaseExpression() != null) { - to.setCaseExpression( from.getCaseExpression() ); - } - for (Map.Entry> pair : from.getDecisionCases().entrySet()) { - to.putDecisionCases( pair.getKey(), toProto( pair.getValue() ) ); - } - if (from.getDynamicForkTasksParam() != null) { - to.setDynamicForkTasksParam( from.getDynamicForkTasksParam() ); - } - if (from.getDynamicForkTasksInputParamName() != null) { - to.setDynamicForkTasksInputParamName( from.getDynamicForkTasksInputParamName() ); - } - for (WorkflowTask elem : from.getDefaultCase()) { - to.addDefaultCase( toProto(elem) ); - } - for (List elem : from.getForkTasks()) { - to.addForkTasks( toProto(elem) ); - } - to.setStartDelay( from.getStartDelay() ); - if (from.getSubWorkflowParam() != null) { - to.setSubWorkflowParam( toProto( from.getSubWorkflowParam() ) ); - } - to.addAllJoinOn( from.getJoinOn() ); - if (from.getSink() != null) { - to.setSink( from.getSink() ); - } - to.setOptional( from.isOptional() ); - if (from.getTaskDefinition() != null) { - to.setTaskDefinition( toProto( from.getTaskDefinition() ) ); - } - if (from.isRateLimited() != null) { - to.setRateLimited( from.isRateLimited() ); - } - return to.build(); - } - - public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) { - WorkflowTask to = new WorkflowTask(); - to.setName( from.getName() ); - to.setTaskReferenceName( from.getTaskReferenceName() ); - to.setDescription( from.getDescription() ); - Map inputParametersMap = new HashMap(); - for (Map.Entry pair : from.getInputParametersMap().entrySet()) { - inputParametersMap.put( pair.getKey(), fromProto( pair.getValue() ) ); - } - to.setInputParameters(inputParametersMap); - to.setType( from.getType() ); - to.setDynamicTaskNameParam( from.getDynamicTaskNameParam() ); - to.setCaseValueParam( from.getCaseValueParam() ); - to.setCaseExpression( from.getCaseExpression() ); - Map> decisionCasesMap = new HashMap>(); - for (Map.Entry pair : from.getDecisionCasesMap().entrySet()) { - decisionCasesMap.put( pair.getKey(), fromProto( pair.getValue() ) ); - } - to.setDecisionCases(decisionCasesMap); - to.setDynamicForkTasksParam( from.getDynamicForkTasksParam() ); - to.setDynamicForkTasksInputParamName( from.getDynamicForkTasksInputParamName() ); - to.setDefaultCase( from.getDefaultCaseList().stream().map(this::fromProto).collect(Collectors.toCollection(ArrayList::new)) ); - to.setForkTasks( from.getForkTasksList().stream().map(this::fromProto).collect(Collectors.toCollection(ArrayList::new)) ); - to.setStartDelay( from.getStartDelay() ); - if (from.hasSubWorkflowParam()) { - to.setSubWorkflowParam( fromProto( from.getSubWorkflowParam() ) ); - } - to.setJoinOn( from.getJoinOnList().stream().collect(Collectors.toCollection(ArrayList::new)) ); - to.setSink( from.getSink() ); - to.setOptional( from.getOptional() ); - if (from.hasTaskDefinition()) { - to.setTaskDefinition( fromProto( from.getTaskDefinition() ) ); - } - to.setRateLimited( from.getRateLimited() ); - return to; - } - - public TaskSummaryPb.TaskSummary toProto(TaskSummary from) { - TaskSummaryPb.TaskSummary.Builder to = TaskSummaryPb.TaskSummary.newBuilder(); - if (from.getWorkflowId() != null) { - to.setWorkflowId( from.getWorkflowId() ); - } - if (from.getWorkflowType() != null) { - to.setWorkflowType( from.getWorkflowType() ); - } - if (from.getCorrelationId() != null) { - to.setCorrelationId( from.getCorrelationId() ); - } - if (from.getScheduledTime() != null) { - to.setScheduledTime( from.getScheduledTime() ); - } - if (from.getStartTime() != null) { - to.setStartTime( from.getStartTime() ); - } - if (from.getUpdateTime() != null) { - to.setUpdateTime( from.getUpdateTime() ); - } - if (from.getEndTime() != null) { - to.setEndTime( from.getEndTime() ); - } - if (from.getStatus() != null) { - to.setStatus( toProto( from.getStatus() ) ); - } - if (from.getReasonForIncompletion() != null) { - to.setReasonForIncompletion( from.getReasonForIncompletion() ); - } - to.setExecutionTime( from.getExecutionTime() ); - to.setQueueWaitTime( from.getQueueWaitTime() ); - if (from.getTaskDefName() != null) { - to.setTaskDefName( from.getTaskDefName() ); - } - if (from.getTaskType() != null) { - to.setTaskType( from.getTaskType() ); - } - if (from.getInput() != null) { - to.setInput( from.getInput() ); - } - if (from.getOutput() != null) { - to.setOutput( from.getOutput() ); - } - if (from.getTaskId() != null) { - to.setTaskId( from.getTaskId() ); + if (from.getInput() != null) { + to.setInput( from.getInput() ); + } + if (from.getOutput() != null) { + to.setOutput( from.getOutput() ); + } + if (from.getTaskId() != null) { + to.setTaskId( from.getTaskId() ); } return to.build(); } @@ -1130,6 +992,48 @@ public Workflow.WorkflowStatus fromProto(WorkflowPb.Workflow.WorkflowStatus from return to; } + public WorkflowDefPb.WorkflowDef toProto(WorkflowDef from) { + WorkflowDefPb.WorkflowDef.Builder to = WorkflowDefPb.WorkflowDef.newBuilder(); + if (from.getName() != null) { + to.setName( from.getName() ); + } + if (from.getDescription() != null) { + to.setDescription( from.getDescription() ); + } + to.setVersion( from.getVersion() ); + for (WorkflowTask elem : from.getTasks()) { + to.addTasks( toProto(elem) ); + } + to.addAllInputParameters( from.getInputParameters() ); + for (Map.Entry pair : from.getOutputParameters().entrySet()) { + to.putOutputParameters( pair.getKey(), toProto( pair.getValue() ) ); + } + if (from.getFailureWorkflow() != null) { + to.setFailureWorkflow( from.getFailureWorkflow() ); + } + to.setSchemaVersion( from.getSchemaVersion() ); + to.setRestartable( from.isRestartable() ); + return to.build(); + } + + public WorkflowDef fromProto(WorkflowDefPb.WorkflowDef from) { + WorkflowDef to = new WorkflowDef(); + to.setName( from.getName() ); + to.setDescription( from.getDescription() ); + to.setVersion( from.getVersion() ); + to.setTasks( from.getTasksList().stream().map(this::fromProto).collect(Collectors.toCollection(ArrayList::new)) ); + to.setInputParameters( from.getInputParametersList().stream().collect(Collectors.toCollection(ArrayList::new)) ); + Map outputParametersMap = new HashMap(); + for (Map.Entry pair : from.getOutputParametersMap().entrySet()) { + outputParametersMap.put( pair.getKey(), fromProto( pair.getValue() ) ); + } + to.setOutputParameters(outputParametersMap); + to.setFailureWorkflow( from.getFailureWorkflow() ); + to.setSchemaVersion( from.getSchemaVersion() ); + to.setRestartable( from.getRestartable() ); + return to; + } + public WorkflowSummaryPb.WorkflowSummary toProto(WorkflowSummary from) { WorkflowSummaryPb.WorkflowSummary.Builder to = WorkflowSummaryPb.WorkflowSummary.newBuilder(); if (from.getWorkflowType() != null) { @@ -1192,6 +1096,102 @@ public WorkflowSummary fromProto(WorkflowSummaryPb.WorkflowSummary from) { return to; } + public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) { + WorkflowTaskPb.WorkflowTask.Builder to = WorkflowTaskPb.WorkflowTask.newBuilder(); + if (from.getName() != null) { + to.setName( from.getName() ); + } + if (from.getTaskReferenceName() != null) { + to.setTaskReferenceName( from.getTaskReferenceName() ); + } + if (from.getDescription() != null) { + to.setDescription( from.getDescription() ); + } + for (Map.Entry pair : from.getInputParameters().entrySet()) { + to.putInputParameters( pair.getKey(), toProto( pair.getValue() ) ); + } + if (from.getType() != null) { + to.setType( from.getType() ); + } + if (from.getDynamicTaskNameParam() != null) { + to.setDynamicTaskNameParam( from.getDynamicTaskNameParam() ); + } + if (from.getCaseValueParam() != null) { + to.setCaseValueParam( from.getCaseValueParam() ); + } + if (from.getCaseExpression() != null) { + to.setCaseExpression( from.getCaseExpression() ); + } + for (Map.Entry> pair : from.getDecisionCases().entrySet()) { + to.putDecisionCases( pair.getKey(), toProto( pair.getValue() ) ); + } + if (from.getDynamicForkTasksParam() != null) { + to.setDynamicForkTasksParam( from.getDynamicForkTasksParam() ); + } + if (from.getDynamicForkTasksInputParamName() != null) { + to.setDynamicForkTasksInputParamName( from.getDynamicForkTasksInputParamName() ); + } + for (WorkflowTask elem : from.getDefaultCase()) { + to.addDefaultCase( toProto(elem) ); + } + for (List elem : from.getForkTasks()) { + to.addForkTasks( toProto(elem) ); + } + to.setStartDelay( from.getStartDelay() ); + if (from.getSubWorkflowParam() != null) { + to.setSubWorkflowParam( toProto( from.getSubWorkflowParam() ) ); + } + to.addAllJoinOn( from.getJoinOn() ); + if (from.getSink() != null) { + to.setSink( from.getSink() ); + } + to.setOptional( from.isOptional() ); + if (from.getTaskDefinition() != null) { + to.setTaskDefinition( toProto( from.getTaskDefinition() ) ); + } + if (from.isRateLimited() != null) { + to.setRateLimited( from.isRateLimited() ); + } + return to.build(); + } + + public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) { + WorkflowTask to = new WorkflowTask(); + to.setName( from.getName() ); + to.setTaskReferenceName( from.getTaskReferenceName() ); + to.setDescription( from.getDescription() ); + Map inputParametersMap = new HashMap(); + for (Map.Entry pair : from.getInputParametersMap().entrySet()) { + inputParametersMap.put( pair.getKey(), fromProto( pair.getValue() ) ); + } + to.setInputParameters(inputParametersMap); + to.setType( from.getType() ); + to.setDynamicTaskNameParam( from.getDynamicTaskNameParam() ); + to.setCaseValueParam( from.getCaseValueParam() ); + to.setCaseExpression( from.getCaseExpression() ); + Map> decisionCasesMap = new HashMap>(); + for (Map.Entry pair : from.getDecisionCasesMap().entrySet()) { + decisionCasesMap.put( pair.getKey(), fromProto( pair.getValue() ) ); + } + to.setDecisionCases(decisionCasesMap); + to.setDynamicForkTasksParam( from.getDynamicForkTasksParam() ); + to.setDynamicForkTasksInputParamName( from.getDynamicForkTasksInputParamName() ); + to.setDefaultCase( from.getDefaultCaseList().stream().map(this::fromProto).collect(Collectors.toCollection(ArrayList::new)) ); + to.setForkTasks( from.getForkTasksList().stream().map(this::fromProto).collect(Collectors.toCollection(ArrayList::new)) ); + to.setStartDelay( from.getStartDelay() ); + if (from.hasSubWorkflowParam()) { + to.setSubWorkflowParam( fromProto( from.getSubWorkflowParam() ) ); + } + to.setJoinOn( from.getJoinOnList().stream().collect(Collectors.toCollection(ArrayList::new)) ); + to.setSink( from.getSink() ); + to.setOptional( from.getOptional() ); + if (from.hasTaskDefinition()) { + to.setTaskDefinition( fromProto( from.getTaskDefinition() ) ); + } + to.setRateLimited( from.getRateLimited() ); + return to; + } + public abstract WorkflowTaskPb.WorkflowTask.WorkflowTaskList toProto(List in); public abstract List fromProto(WorkflowTaskPb.WorkflowTask.WorkflowTaskList in); diff --git a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLMetadataDAOTest.java b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLMetadataDAOTest.java index 96c3ef48f5..15ba845da1 100644 --- a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLMetadataDAOTest.java +++ b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLMetadataDAOTest.java @@ -190,9 +190,9 @@ public void testEventHandlers() { eh.setName(UUID.randomUUID().toString()); eh.setActive(false); EventHandler.Action action = new EventHandler.Action(); - action.setAction(EventHandler.Action.Type.START_WORKFLOW); - action.setStartWorkflow(new EventHandler.StartWorkflow()); - action.getStartWorkflow().setName("workflow_x"); + action.setAction(EventHandler.Action.Type.start_workflow); + action.setStart_workflow(new EventHandler.StartWorkflow()); + action.getStart_workflow().setName("workflow_x"); eh.getActions().add(action); eh.setEvent(event1); diff --git a/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisMetadataDAOTest.java b/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisMetadataDAOTest.java index b8d70b26cb..22b5cadb4e 100644 --- a/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisMetadataDAOTest.java +++ b/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisMetadataDAOTest.java @@ -237,9 +237,9 @@ public void testEventHandlers() { eh.setName(UUID.randomUUID().toString()); eh.setActive(false); Action action = new Action(); - action.setAction(Type.START_WORKFLOW); - action.setStartWorkflow(new StartWorkflow()); - action.getStartWorkflow().setName("workflow_x"); + action.setAction(Type.start_workflow); + action.setStart_workflow(new StartWorkflow()); + action.getStart_workflow().setName("workflow_x"); eh.getActions().add(action); eh.setEvent(event1); diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java index 02a4b219a9..a345712688 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/AbstractWorkflowServiceTest.java @@ -3488,6 +3488,9 @@ public void testSubWorkflow() { assertNotNull(task); assertNotNull(task.getOutputData()); assertNotNull("Output: " + task.getOutputData().toString() + ", status: " + task.getStatus(), task.getOutputData().get("subWorkflowId")); + assertNotNull(task.getInputData()); + assertTrue(task.getInputData().containsKey("workflowInput")); + assertEquals(42, ((Map)task.getInputData().get("workflowInput")).get("param2")); String subWorkflowId = task.getOutputData().get("subWorkflowId").toString(); es = workflowExecutionService.getExecutionStatus(subWorkflowId, true); @@ -4153,6 +4156,7 @@ private void createSubWorkflow() { Map ip2 = new HashMap<>(); ip2.put("test", "test value"); ip2.put("param1", "sub workflow input param1"); + ip2.put("param2", 42); wft2.setInputParameters(ip2); wft2.setTaskReferenceName("a2"); diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java index 23625f7051..c17c7c727f 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java @@ -1,4 +1,4 @@ -/** +/* * Copyright 2016 Netflix, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with @@ -10,9 +10,6 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -/** - * - */ package com.netflix.conductor.tests.integration; import com.google.inject.Guice; @@ -47,7 +44,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -124,13 +120,7 @@ protected void registerTaskDefinitions(List taskDefinitionList) { @Test public void testAll() throws Exception { - List definitions = createAndRegisterTaskDefinitions("t", 5); - - List found = taskClient.getTaskDef().stream() - .filter(taskDefinition -> taskDefinition.getName().startsWith("t")) - .collect(Collectors.toList()); - assertNotNull(found); - assertEquals(definitions.size(), found.size()); + createAndRegisterTaskDefinitions("t", 5); WorkflowDef def = new WorkflowDef(); def.setName("test"); @@ -144,7 +134,6 @@ public void testAll() throws Exception { t1.setWorkflowTaskType(TaskType.SIMPLE); t1.setTaskReferenceName("t1"); - def.getTasks().add(t0); def.getTasks().add(t1); @@ -161,20 +150,23 @@ public void testAll() throws Exception { String workflowId = workflowClient.startWorkflow(startWorkflowRequest); assertNotNull(workflowId); - Workflow wf = workflowClient.getWorkflow(workflowId, false); - assertEquals(0, wf.getTasks().size()); - assertEquals(workflowId, wf.getWorkflowId()); + Workflow workflow = workflowClient.getWorkflow(workflowId, false); + assertEquals(0, workflow.getTasks().size()); + assertEquals(workflowId, workflow.getWorkflowId()); List workflowList = workflowClient.getWorkflows(def.getName(), correlationId, false, false); assertEquals(1, workflowList.size()); assertEquals(workflowId, workflowList.get(0).getWorkflowId()); - wf = workflowClient.getWorkflow(workflowId, true); - assertNotNull(wf); - assertEquals(WorkflowStatus.RUNNING, wf.getStatus()); - assertEquals(1, wf.getTasks().size()); - assertEquals(t0.getTaskReferenceName(), wf.getTasks().get(0).getReferenceTaskName()); - assertEquals(workflowId, wf.getWorkflowId()); + workflow = workflowClient.getWorkflow(workflowId, true); + assertNotNull(workflow); + assertEquals(WorkflowStatus.RUNNING, workflow.getStatus()); + assertEquals(1, workflow.getTasks().size()); + assertEquals(t0.getTaskReferenceName(), workflow.getTasks().get(0).getReferenceTaskName()); + assertEquals(workflowId, workflow.getWorkflowId()); + + int queueSize = taskClient.getQueueSizeForTask(workflow.getTasks().get(0).getTaskType()); + assertEquals(1, queueSize); List runningIds = workflowClient.getRunningWorkflow(def.getName(), def.getVersion()); assertNotNull(runningIds); @@ -203,30 +195,30 @@ public void testAll() throws Exception { assertNotNull(polled); assertTrue(polled.toString(), polled.isEmpty()); - wf = workflowClient.getWorkflow(workflowId, true); - assertNotNull(wf); - assertEquals(WorkflowStatus.RUNNING, wf.getStatus()); - assertEquals(2, wf.getTasks().size()); - assertEquals(t0.getTaskReferenceName(), wf.getTasks().get(0).getReferenceTaskName()); - assertEquals(t1.getTaskReferenceName(), wf.getTasks().get(1).getReferenceTaskName()); - assertEquals(Task.Status.COMPLETED, wf.getTasks().get(0).getStatus()); - assertEquals(Task.Status.SCHEDULED, wf.getTasks().get(1).getStatus()); + workflow = workflowClient.getWorkflow(workflowId, true); + assertNotNull(workflow); + assertEquals(WorkflowStatus.RUNNING, workflow.getStatus()); + assertEquals(2, workflow.getTasks().size()); + assertEquals(t0.getTaskReferenceName(), workflow.getTasks().get(0).getReferenceTaskName()); + assertEquals(t1.getTaskReferenceName(), workflow.getTasks().get(1).getReferenceTaskName()); + assertEquals(Task.Status.COMPLETED, workflow.getTasks().get(0).getStatus()); + assertEquals(Task.Status.SCHEDULED, workflow.getTasks().get(1).getStatus()); Task taskById = taskClient.getTaskDetails(task.getTaskId()); assertNotNull(taskById); assertEquals(task.getTaskId(), taskById.getTaskId()); + queueSize = taskClient.getQueueSizeForTask(workflow.getTasks().get(1).getTaskType()); + assertEquals(1, queueSize); List getTasks = taskClient.getPendingTasksByType(t0.getName(), null, 1); assertNotNull(getTasks); assertEquals(0, getTasks.size()); //getTasks only gives pending tasks - getTasks = taskClient.getPendingTasksByType(t1.getName(), null, 1); assertNotNull(getTasks); assertEquals(1, getTasks.size()); - Task pending = taskClient.getPendingTaskForWorkflow(workflowId, t1.getTaskReferenceName()); assertNotNull(pending); assertEquals(t1.getTaskReferenceName(), pending.getReferenceTaskName()); @@ -238,15 +230,15 @@ public void testAll() throws Exception { assertEquals(1, searchResult.getTotalHits()); workflowClient.terminateWorkflow(workflowId, "terminate reason"); - wf = workflowClient.getWorkflow(workflowId, true); - assertNotNull(wf); - assertEquals(WorkflowStatus.TERMINATED, wf.getStatus()); + workflow = workflowClient.getWorkflow(workflowId, true); + assertNotNull(workflow); + assertEquals(WorkflowStatus.TERMINATED, workflow.getStatus()); workflowClient.restart(workflowId); - wf = workflowClient.getWorkflow(workflowId, true); - assertNotNull(wf); - assertEquals(WorkflowStatus.RUNNING, wf.getStatus()); - assertEquals(1, wf.getTasks().size()); + workflow = workflowClient.getWorkflow(workflowId, true); + assertNotNull(workflow); + assertEquals(WorkflowStatus.RUNNING, workflow.getStatus()); + assertEquals(1, workflow.getTasks().size()); } @Test @@ -318,31 +310,32 @@ public void testUpdateWorkflow() { } } - @Test public void testStartWorkflow() { StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest(); - try{ + try { workflowClient.startWorkflow(startWorkflowRequest); } catch (IllegalArgumentException e) { assertEquals("Workflow name cannot be null or empty", e.getMessage()); } } + @Test public void testUpdateTask() { TaskResult taskResult = new TaskResult(); - try{ + try { taskClient.updateTask(taskResult, "taskTest"); - } catch (ConductorClientException e){ + } catch (ConductorClientException e) { int statuCode = e.getStatus(); assertEquals(400, statuCode); assertEquals("Workflow Id cannot be null or empty", e.getMessage()); assertFalse(e.isRetryable()); } } + @Test public void testGetWorfklowNotFound() { - try{ + try { workflowClient.getWorkflow("w123", true); } catch (ConductorClientException e) { assertEquals(404, e.getStatus()); @@ -350,30 +343,33 @@ public void testGetWorfklowNotFound() { assertFalse(e.isRetryable()); } } + @Test public void testEmptyCreateWorkflowDef() { - try{ + try { WorkflowDef workflowDef = new WorkflowDef(); metadataClient.registerWorkflowDef(workflowDef); - } catch (ConductorClientException e){ + } catch (ConductorClientException e) { assertEquals(400, e.getStatus()); assertEquals("Workflow name cannot be null or empty", e.getMessage()); assertFalse(e.isRetryable()); } } + @Test public void testUpdateWorkflowDef() { - try{ + try { WorkflowDef workflowDef = new WorkflowDef(); List workflowDefList = new ArrayList<>(); workflowDefList.add(workflowDef); metadataClient.updateWorkflowDefs(workflowDefList); - } catch (ConductorClientException e){ + } catch (ConductorClientException e) { assertEquals(400, e.getStatus()); assertEquals("WorkflowDef name cannot be null", e.getMessage()); assertFalse(e.isRetryable()); } } + @Test public void testGetTaskInProgress() { taskClient.getPendingTaskForWorkflow("test", "t1"); diff --git a/versionsOfDependencies.gradle b/versionsOfDependencies.gradle index 387c4dc9b3..41bfda0028 100644 --- a/versionsOfDependencies.gradle +++ b/versionsOfDependencies.gradle @@ -43,7 +43,7 @@ ext { revOauthSignature = '1.19.4' revProtoBuf = '3.5.1' revProtogenAnnotations = '1.0.0' - revProtogenCodegen = '1.2.0' + revProtogenCodegen = '1.4.0' revRarefiedRedis = '0.0.17' revServo = '0.12.17' revServletApi = '3.1.0'