From cc55253e537a343e99f5c31d46056d1d9efd5170 Mon Sep 17 00:00:00 2001 From: Guy Engelhard Date: Sun, 16 Jul 2017 13:24:08 +0300 Subject: [PATCH 01/14] 1. Made the thread a daemon so that the entire process exits on Ctrl+C 2. Added a sleep(1) so that the waiting thread does not hog cpu resources. --- client/python/conductor/ConductorWorker.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/client/python/conductor/ConductorWorker.py b/client/python/conductor/ConductorWorker.py index 4e64ff3879..89b715b284 100644 --- a/client/python/conductor/ConductorWorker.py +++ b/client/python/conductor/ConductorWorker.py @@ -55,11 +55,12 @@ def poll_and_execute(self, taskType, exec_function): def start(self, taskType, exec_function, wait): print('Polling for task ' + taskType + ' at a ' + str(self.polling_interval) + ' ms interval with ' + str(self.thread_count) + ' threads for task execution, with worker id as ' + hostname) for x in range(0, int(self.thread_count)): - thread = Thread(target = self.poll_and_execute, args = (taskType, exec_function, )) + thread = Thread(target=self.poll_and_execute, args=(taskType, exec_function, )) + thread.daemon = True thread.start() - if(wait): + if wait: while 1: - pass + time.sleep(1) def exc(taskType, inputData, startTime, retryCount, status, callbackAfterSeconds, pollCount): print('Executing the function') From 576a6f3ee4a987c050ee2b04514a1fb9b2b0c0c5 Mon Sep 17 00:00:00 2001 From: Nicolas BRAQUART Date: Tue, 18 Jul 2017 17:13:46 +0200 Subject: [PATCH 02/14] Add an optional description in WorkflowTask model --- .../common/metadata/workflow/WorkflowTask.java | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java index 1c00b455fa..029d6f7456 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowTask.java @@ -59,7 +59,9 @@ public static boolean is(String name) { private String name; private String taskReferenceName; - + + private String description; + //Key: Name of the input parameter. MUST be one of the keys defined in TaskDef (e.g. fileName) //Value: mapping of the parameter from another task (e.g. task1.someOutputParameterAsFileName) private Map inputParameters = new HashMap(); @@ -124,6 +126,20 @@ public void setTaskReferenceName(String taskReferenceName) { this.taskReferenceName = taskReferenceName; } + /** + * @return the description + */ + public String getDescription() { + return description; + } + + /** + * @param description the description to set + */ + public void setDescription(String description) { + this.description = description; + } + /** * @return the inputParameters */ From a24b059097b1ab6bb3eebe483876478ab4916c6f Mon Sep 17 00:00:00 2001 From: Nicolas BRAQUART Date: Tue, 18 Jul 2017 17:33:05 +0200 Subject: [PATCH 03/14] Documentation update --- docs/docs/metadata/index.md | 401 ++++++++++++++++++------------------ 1 file changed, 201 insertions(+), 200 deletions(-) diff --git a/docs/docs/metadata/index.md b/docs/docs/metadata/index.md index 76f6ec39dd..5d272bdd50 100644 --- a/docs/docs/metadata/index.md +++ b/docs/docs/metadata/index.md @@ -1,200 +1,201 @@ -# Task Definition -Conductor maintains a registry of worker task types. A task type MUST be registered before using in a workflow. 
- -**Example** -``` json -{ - "name": "encode_task", - "retryCount": 3, - "timeoutSeconds": 1200, - "inputKeys": [ - "sourceRequestId", - "qcElementType" - ], - "outputKeys": [ - "state", - "skipped", - "result" - ], - "timeoutPolicy": "TIME_OUT_WF", - "retryLogic": "FIXED", - "retryDelaySeconds": 600, - "responseTimeoutSeconds": 3600 -} -``` - -|field|description|Notes| -|---|---|---| -|name|Task Type|Unique| -|retryCount|No. of retries to attempt when a task is marked as failure|| -|retryLogic|Mechanism for the retries|see possible values below| -|timeoutSeconds|Time in milliseconds, after which the task is marked as TIMED_OUT if not completed after transiting to ```IN_PROGRESS``` status|No timeouts if set to 0| -|timeoutPolicy|Task's timeout policy|see possible values below| -|responseTimeoutSeconds|if greater than 0, the task is rescheduled if not updated with a status after this time. Useful when the worker polls for the task but fails to complete due to errors/network failure. -|| -|outputKeys|Set of keys of task's output. Used for documenting task's output|| - -**Retry Logic** - -* FIXED : Reschedule the task after the ```retryDelaySeconds``` -* EXPONENTIAL_BACKOFF : reschedule after ```retryDelaySeconds * attempNo``` - -**Timeout Policy** - -* RETRY : Retries the task again -* TIME_OUT_WF : Workflow is marked as TIMED_OUT and terminated -* ALERT_ONLY : Registers a counter (task_timeout) - -# Workflow Definition -Workflows are defined using a JSON based DSL. - -**Example** -```json -{ - "name": "encode_and_deploy", - "description": "Encodes a file and deploys to CDN", - "version": 1, - "tasks": [ - { - "name": "encode", - "taskReferenceName": "encode", - "type": "SIMPLE", - "inputParameters": { - "fileLocation": "${workflow.input.fileLocation}" - } - }, - { - "name": "deploy", - "taskReferenceName": "d1", - "type": "SIMPLE", - "inputParameters": { - "fileLocation": "${encode.output.encodeLocation}" - } - - } - ], - "outputParameters": { - "cdn_url": "${d1.output.location}" - }, - "schemaVersion": 2 -} -``` - -|field|description|Notes| -|:-----|:---|:---| -|name|Name of the workflow|| -|description|Descriptive name of the workflow|| -|version|Numeric field used to identify the version of the schema. Use incrementing numbers|When starting a workflow execution, if not specified, the definition with highest version is used| -|tasks|An array of task definitions as described below.|| -|outputParameters|JSON template used to generate the output of the workflow|If not specified, the output is defined as the output of the _last_ executed task| -|inputParameters|List of input parameters. Used for documenting the required inputs to workflow|optional| - -## Tasks within Workflow -```tasks``` property in a workflow defines an array of tasks to be executed in that order. -Below are the mandatory minimum parameters required for each task: - -|field|description|Notes| -|:-----|:---|:---| -|name|Name of the task. MUST be registered as a task type with Conductor before starting workflow|| -|taskReferenceName|Alias used to refer the task within the workflow. MUST be unique.|| -|type|Type of task. SIMPLE for tasks executed by remote workers, or one of the system task types|| -|optional|true or false. When set to true - workflow continues even if the task fails. 
The status of the task is reflected as `COMPLETED_WITH_ERRORS`|Defaults to `false`| -|inputParameters|JSON template that defines the input given to the task|See "wiring inputs and outputs" for details| - -In addition to these parameters, additional parameters specific to the task type are required as documented [here](/metadata/systask/) - -# Wiring Inputs and Outputs - -Workflows are supplied inputs by client when a new execution is triggered. -Workflow input is a JSON payload that is available via ```${workflow.input...}``` expressions. - -Each task in the workflow is given input based on the ```inputParameters``` template configured in workflow definition. ```inputParameters``` is a JSON fragment with value containing parameters for mapping values from input or output of a workflow or another task during the execution. - -Syntax for mapping the values follows the pattern as: - -__${SOURCE.input/output.JSONPath}__ - -|-|-| -|------|---| -|SOURCE|can be either "workflow" or reference name of any of the task| -|input/output|refers to either the input or output of the source| -|JSONPath|JSON path expression to extract JSON fragment from source's input/output| - - -!!! note "JSON Path Support" - Conductor supports [JSONPath](http://goessner.net/articles/JsonPath/) specification and uses Java implementation from [here](https://github.com/jayway/JsonPath). - -**Example** - -Consider a task with input configured to use input/output parameters from workflow and a task named __loc_task__. - -```json -{ - "inputParameters": { - "movieId": "${workflow.input.movieId}", - "url": "${workflow.input.fileLocation}", - "lang": "${loc_task.output.languages[0]}", - "http_request": { - "method": "POST", - "url": "http://example.com/${loc_task.output.fileId}/encode", - "body": { - "recipe": "${workflow.input.recipe}", - "params": { - "width": 100, - "height": 100 - } - }, - "headers": { - "Accept": "application/json", - "Content-Type": "application/json" - } - } - } -} -``` - -Consider the following as the _workflow input_ - -```json -{ - "movieId": "movie_123", - "fileLocation":"s3://moviebucket/file123", - "recipe":"png" -} -``` -And the output of the _loc_task_ as the following; - -```json -{ - "fileId": "file_xxx_yyy_zzz", - "languages": ["en","ja","es"] -} -``` - -When scheduling the task, Conductor will merge the values from workflow input and loc_task's output and create the input to the task as follows: - -```json -{ - "movieId": "movie_123", - "url": "s3://moviebucket/file123", - "lang": "en", - "http_request": { - "method": "POST", - "url": "http://example.com/file_xxx_yyy_zzz/encode", - "body": { - "recipe": "png", - "params": { - "width": 100, - "height": 100 - } - }, - "headers": { - "Accept": "application/json", - "Content-Type": "application/json" - } - } -} -``` - - - +# Task Definition +Conductor maintains a registry of worker task types. A task type MUST be registered before using in a workflow. + +**Example** +``` json +{ + "name": "encode_task", + "retryCount": 3, + "timeoutSeconds": 1200, + "inputKeys": [ + "sourceRequestId", + "qcElementType" + ], + "outputKeys": [ + "state", + "skipped", + "result" + ], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 600, + "responseTimeoutSeconds": 3600 +} +``` + +|field|description|Notes| +|---|---|---| +|name|Task Type|Unique| +|retryCount|No. 
of retries to attempt when a task is marked as failure|| +|retryLogic|Mechanism for the retries|see possible values below| +|timeoutSeconds|Time in milliseconds, after which the task is marked as TIMED_OUT if not completed after transiting to ```IN_PROGRESS``` status|No timeouts if set to 0| +|timeoutPolicy|Task's timeout policy|see possible values below| +|responseTimeoutSeconds|if greater than 0, the task is rescheduled if not updated with a status after this time. Useful when the worker polls for the task but fails to complete due to errors/network failure. +|| +|outputKeys|Set of keys of task's output. Used for documenting task's output|| + +**Retry Logic** + +* FIXED : Reschedule the task after the ```retryDelaySeconds``` +* EXPONENTIAL_BACKOFF : reschedule after ```retryDelaySeconds * attempNo``` + +**Timeout Policy** + +* RETRY : Retries the task again +* TIME_OUT_WF : Workflow is marked as TIMED_OUT and terminated +* ALERT_ONLY : Registers a counter (task_timeout) + +# Workflow Definition +Workflows are defined using a JSON based DSL. + +**Example** +```json +{ + "name": "encode_and_deploy", + "description": "Encodes a file and deploys to CDN", + "version": 1, + "tasks": [ + { + "name": "encode", + "taskReferenceName": "encode", + "type": "SIMPLE", + "inputParameters": { + "fileLocation": "${workflow.input.fileLocation}" + } + }, + { + "name": "deploy", + "taskReferenceName": "d1", + "type": "SIMPLE", + "inputParameters": { + "fileLocation": "${encode.output.encodeLocation}" + } + + } + ], + "outputParameters": { + "cdn_url": "${d1.output.location}" + }, + "schemaVersion": 2 +} +``` + +|field|description|Notes| +|:-----|:---|:---| +|name|Name of the workflow|| +|description|Descriptive name of the workflow|| +|version|Numeric field used to identify the version of the schema. Use incrementing numbers|When starting a workflow execution, if not specified, the definition with highest version is used| +|tasks|An array of task definitions as described below.|| +|outputParameters|JSON template used to generate the output of the workflow|If not specified, the output is defined as the output of the _last_ executed task| +|inputParameters|List of input parameters. Used for documenting the required inputs to workflow|optional| + +## Tasks within Workflow +```tasks``` property in a workflow defines an array of tasks to be executed in that order. +Below are the mandatory minimum parameters required for each task: + +|field|description|Notes| +|:-----|:---|:---| +|name|Name of the task. MUST be registered as a task type with Conductor before starting workflow|| +|taskReferenceName|Alias used to refer the task within the workflow. MUST be unique.|| +|type|Type of task. SIMPLE for tasks executed by remote workers, or one of the system task types|| +|description|Optional description of the task.|| +|optional|true or false. When set to true - workflow continues even if the task fails. The status of the task is reflected as `COMPLETED_WITH_ERRORS`|Defaults to `false`| +|inputParameters|JSON template that defines the input given to the task|See "wiring inputs and outputs" for details| + +In addition to these parameters, additional parameters specific to the task type are required as documented [here](/metadata/systask/) + +# Wiring Inputs and Outputs + +Workflows are supplied inputs by client when a new execution is triggered. +Workflow input is a JSON payload that is available via ```${workflow.input...}``` expressions. 
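+
+As a minimal illustration (reusing the values from the worked example further below), a workflow started with the input
+
+```json
+{
+  "movieId": "movie_123"
+}
+```
+
+exposes that value to any task's ```inputParameters``` as ```${workflow.input.movieId}```.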
+ +Each task in the workflow is given input based on the ```inputParameters``` template configured in workflow definition. ```inputParameters``` is a JSON fragment with value containing parameters for mapping values from input or output of a workflow or another task during the execution. + +Syntax for mapping the values follows the pattern as: + +__${SOURCE.input/output.JSONPath}__ + +|-|-| +|------|---| +|SOURCE|can be either "workflow" or reference name of any of the task| +|input/output|refers to either the input or output of the source| +|JSONPath|JSON path expression to extract JSON fragment from source's input/output| + + +!!! note "JSON Path Support" + Conductor supports [JSONPath](http://goessner.net/articles/JsonPath/) specification and uses Java implementation from [here](https://github.com/jayway/JsonPath). + +**Example** + +Consider a task with input configured to use input/output parameters from workflow and a task named __loc_task__. + +```json +{ + "inputParameters": { + "movieId": "${workflow.input.movieId}", + "url": "${workflow.input.fileLocation}", + "lang": "${loc_task.output.languages[0]}", + "http_request": { + "method": "POST", + "url": "http://example.com/${loc_task.output.fileId}/encode", + "body": { + "recipe": "${workflow.input.recipe}", + "params": { + "width": 100, + "height": 100 + } + }, + "headers": { + "Accept": "application/json", + "Content-Type": "application/json" + } + } + } +} +``` + +Consider the following as the _workflow input_ + +```json +{ + "movieId": "movie_123", + "fileLocation":"s3://moviebucket/file123", + "recipe":"png" +} +``` +And the output of the _loc_task_ as the following; + +```json +{ + "fileId": "file_xxx_yyy_zzz", + "languages": ["en","ja","es"] +} +``` + +When scheduling the task, Conductor will merge the values from workflow input and loc_task's output and create the input to the task as follows: + +```json +{ + "movieId": "movie_123", + "url": "s3://moviebucket/file123", + "lang": "en", + "http_request": { + "method": "POST", + "url": "http://example.com/file_xxx_yyy_zzz/encode", + "body": { + "recipe": "png", + "params": { + "width": 100, + "height": 100 + } + }, + "headers": { + "Accept": "application/json", + "Content-Type": "application/json" + } + } +} +``` + + + From df246035efc423f228b4fe74d0b1e51a8ff6cfde Mon Sep 17 00:00:00 2001 From: Nicolas BRAQUART Date: Tue, 18 Jul 2017 17:37:30 +0200 Subject: [PATCH 04/14] Update documentation --- docs/docs/metadata/index.md | 402 ++++++++++++++++++------------------ 1 file changed, 201 insertions(+), 201 deletions(-) diff --git a/docs/docs/metadata/index.md b/docs/docs/metadata/index.md index 5d272bdd50..0211360ad0 100644 --- a/docs/docs/metadata/index.md +++ b/docs/docs/metadata/index.md @@ -1,201 +1,201 @@ -# Task Definition -Conductor maintains a registry of worker task types. A task type MUST be registered before using in a workflow. - -**Example** -``` json -{ - "name": "encode_task", - "retryCount": 3, - "timeoutSeconds": 1200, - "inputKeys": [ - "sourceRequestId", - "qcElementType" - ], - "outputKeys": [ - "state", - "skipped", - "result" - ], - "timeoutPolicy": "TIME_OUT_WF", - "retryLogic": "FIXED", - "retryDelaySeconds": 600, - "responseTimeoutSeconds": 3600 -} -``` - -|field|description|Notes| -|---|---|---| -|name|Task Type|Unique| -|retryCount|No. 
of retries to attempt when a task is marked as failure|| -|retryLogic|Mechanism for the retries|see possible values below| -|timeoutSeconds|Time in milliseconds, after which the task is marked as TIMED_OUT if not completed after transiting to ```IN_PROGRESS``` status|No timeouts if set to 0| -|timeoutPolicy|Task's timeout policy|see possible values below| -|responseTimeoutSeconds|if greater than 0, the task is rescheduled if not updated with a status after this time. Useful when the worker polls for the task but fails to complete due to errors/network failure. -|| -|outputKeys|Set of keys of task's output. Used for documenting task's output|| - -**Retry Logic** - -* FIXED : Reschedule the task after the ```retryDelaySeconds``` -* EXPONENTIAL_BACKOFF : reschedule after ```retryDelaySeconds * attempNo``` - -**Timeout Policy** - -* RETRY : Retries the task again -* TIME_OUT_WF : Workflow is marked as TIMED_OUT and terminated -* ALERT_ONLY : Registers a counter (task_timeout) - -# Workflow Definition -Workflows are defined using a JSON based DSL. - -**Example** -```json -{ - "name": "encode_and_deploy", - "description": "Encodes a file and deploys to CDN", - "version": 1, - "tasks": [ - { - "name": "encode", - "taskReferenceName": "encode", - "type": "SIMPLE", - "inputParameters": { - "fileLocation": "${workflow.input.fileLocation}" - } - }, - { - "name": "deploy", - "taskReferenceName": "d1", - "type": "SIMPLE", - "inputParameters": { - "fileLocation": "${encode.output.encodeLocation}" - } - - } - ], - "outputParameters": { - "cdn_url": "${d1.output.location}" - }, - "schemaVersion": 2 -} -``` - -|field|description|Notes| -|:-----|:---|:---| -|name|Name of the workflow|| -|description|Descriptive name of the workflow|| -|version|Numeric field used to identify the version of the schema. Use incrementing numbers|When starting a workflow execution, if not specified, the definition with highest version is used| -|tasks|An array of task definitions as described below.|| -|outputParameters|JSON template used to generate the output of the workflow|If not specified, the output is defined as the output of the _last_ executed task| -|inputParameters|List of input parameters. Used for documenting the required inputs to workflow|optional| - -## Tasks within Workflow -```tasks``` property in a workflow defines an array of tasks to be executed in that order. -Below are the mandatory minimum parameters required for each task: - -|field|description|Notes| -|:-----|:---|:---| -|name|Name of the task. MUST be registered as a task type with Conductor before starting workflow|| -|taskReferenceName|Alias used to refer the task within the workflow. MUST be unique.|| -|type|Type of task. SIMPLE for tasks executed by remote workers, or one of the system task types|| -|description|Optional description of the task.|| -|optional|true or false. When set to true - workflow continues even if the task fails. The status of the task is reflected as `COMPLETED_WITH_ERRORS`|Defaults to `false`| -|inputParameters|JSON template that defines the input given to the task|See "wiring inputs and outputs" for details| - -In addition to these parameters, additional parameters specific to the task type are required as documented [here](/metadata/systask/) - -# Wiring Inputs and Outputs - -Workflows are supplied inputs by client when a new execution is triggered. -Workflow input is a JSON payload that is available via ```${workflow.input...}``` expressions. 
- -Each task in the workflow is given input based on the ```inputParameters``` template configured in workflow definition. ```inputParameters``` is a JSON fragment with value containing parameters for mapping values from input or output of a workflow or another task during the execution. - -Syntax for mapping the values follows the pattern as: - -__${SOURCE.input/output.JSONPath}__ - -|-|-| -|------|---| -|SOURCE|can be either "workflow" or reference name of any of the task| -|input/output|refers to either the input or output of the source| -|JSONPath|JSON path expression to extract JSON fragment from source's input/output| - - -!!! note "JSON Path Support" - Conductor supports [JSONPath](http://goessner.net/articles/JsonPath/) specification and uses Java implementation from [here](https://github.com/jayway/JsonPath). - -**Example** - -Consider a task with input configured to use input/output parameters from workflow and a task named __loc_task__. - -```json -{ - "inputParameters": { - "movieId": "${workflow.input.movieId}", - "url": "${workflow.input.fileLocation}", - "lang": "${loc_task.output.languages[0]}", - "http_request": { - "method": "POST", - "url": "http://example.com/${loc_task.output.fileId}/encode", - "body": { - "recipe": "${workflow.input.recipe}", - "params": { - "width": 100, - "height": 100 - } - }, - "headers": { - "Accept": "application/json", - "Content-Type": "application/json" - } - } - } -} -``` - -Consider the following as the _workflow input_ - -```json -{ - "movieId": "movie_123", - "fileLocation":"s3://moviebucket/file123", - "recipe":"png" -} -``` -And the output of the _loc_task_ as the following; - -```json -{ - "fileId": "file_xxx_yyy_zzz", - "languages": ["en","ja","es"] -} -``` - -When scheduling the task, Conductor will merge the values from workflow input and loc_task's output and create the input to the task as follows: - -```json -{ - "movieId": "movie_123", - "url": "s3://moviebucket/file123", - "lang": "en", - "http_request": { - "method": "POST", - "url": "http://example.com/file_xxx_yyy_zzz/encode", - "body": { - "recipe": "png", - "params": { - "width": 100, - "height": 100 - } - }, - "headers": { - "Accept": "application/json", - "Content-Type": "application/json" - } - } -} -``` - - - +# Task Definition +Conductor maintains a registry of worker task types. A task type MUST be registered before using in a workflow. + +**Example** +``` json +{ + "name": "encode_task", + "retryCount": 3, + "timeoutSeconds": 1200, + "inputKeys": [ + "sourceRequestId", + "qcElementType" + ], + "outputKeys": [ + "state", + "skipped", + "result" + ], + "timeoutPolicy": "TIME_OUT_WF", + "retryLogic": "FIXED", + "retryDelaySeconds": 600, + "responseTimeoutSeconds": 3600 +} +``` + +|field|description|Notes| +|---|---|---| +|name|Task Type|Unique| +|retryCount|No. of retries to attempt when a task is marked as failure|| +|retryLogic|Mechanism for the retries|see possible values below| +|timeoutSeconds|Time in milliseconds, after which the task is marked as TIMED_OUT if not completed after transiting to ```IN_PROGRESS``` status|No timeouts if set to 0| +|timeoutPolicy|Task's timeout policy|see possible values below| +|responseTimeoutSeconds|if greater than 0, the task is rescheduled if not updated with a status after this time. Useful when the worker polls for the task but fails to complete due to errors/network failure. +|| +|outputKeys|Set of keys of task's output. 
Used for documenting task's output|| + +**Retry Logic** + +* FIXED : Reschedule the task after the ```retryDelaySeconds``` +* EXPONENTIAL_BACKOFF : reschedule after ```retryDelaySeconds * attempNo``` + +**Timeout Policy** + +* RETRY : Retries the task again +* TIME_OUT_WF : Workflow is marked as TIMED_OUT and terminated +* ALERT_ONLY : Registers a counter (task_timeout) + +# Workflow Definition +Workflows are defined using a JSON based DSL. + +**Example** +```json +{ + "name": "encode_and_deploy", + "description": "Encodes a file and deploys to CDN", + "version": 1, + "tasks": [ + { + "name": "encode", + "taskReferenceName": "encode", + "type": "SIMPLE", + "inputParameters": { + "fileLocation": "${workflow.input.fileLocation}" + } + }, + { + "name": "deploy", + "taskReferenceName": "d1", + "type": "SIMPLE", + "inputParameters": { + "fileLocation": "${encode.output.encodeLocation}" + } + + } + ], + "outputParameters": { + "cdn_url": "${d1.output.location}" + }, + "schemaVersion": 2 +} +``` + +|field|description|Notes| +|:-----|:---|:---| +|name|Name of the workflow|| +|description|Descriptive name of the workflow|| +|version|Numeric field used to identify the version of the schema. Use incrementing numbers|When starting a workflow execution, if not specified, the definition with highest version is used| +|tasks|An array of task definitions as described below.|| +|outputParameters|JSON template used to generate the output of the workflow|If not specified, the output is defined as the output of the _last_ executed task| +|inputParameters|List of input parameters. Used for documenting the required inputs to workflow|optional| + +## Tasks within Workflow +```tasks``` property in a workflow defines an array of tasks to be executed in that order. +Below are the mandatory minimum parameters required for each task: + +|field|description|Notes| +|:-----|:---|:---| +|name|Name of the task. MUST be registered as a task type with Conductor before starting workflow|| +|taskReferenceName|Alias used to refer the task within the workflow. MUST be unique.|| +|type|Type of task. SIMPLE for tasks executed by remote workers, or one of the system task types|| +|description|Description of the task|optional| +|optional|true or false. When set to true - workflow continues even if the task fails. The status of the task is reflected as `COMPLETED_WITH_ERRORS`|Defaults to `false`| +|inputParameters|JSON template that defines the input given to the task|See "wiring inputs and outputs" for details| + +In addition to these parameters, additional parameters specific to the task type are required as documented [here](/metadata/systask/) + +# Wiring Inputs and Outputs + +Workflows are supplied inputs by client when a new execution is triggered. +Workflow input is a JSON payload that is available via ```${workflow.input...}``` expressions. + +Each task in the workflow is given input based on the ```inputParameters``` template configured in workflow definition. ```inputParameters``` is a JSON fragment with value containing parameters for mapping values from input or output of a workflow or another task during the execution. + +Syntax for mapping the values follows the pattern as: + +__${SOURCE.input/output.JSONPath}__ + +|-|-| +|------|---| +|SOURCE|can be either "workflow" or reference name of any of the task| +|input/output|refers to either the input or output of the source| +|JSONPath|JSON path expression to extract JSON fragment from source's input/output| + + +!!! 
note "JSON Path Support" + Conductor supports [JSONPath](http://goessner.net/articles/JsonPath/) specification and uses Java implementation from [here](https://github.com/jayway/JsonPath). + +**Example** + +Consider a task with input configured to use input/output parameters from workflow and a task named __loc_task__. + +```json +{ + "inputParameters": { + "movieId": "${workflow.input.movieId}", + "url": "${workflow.input.fileLocation}", + "lang": "${loc_task.output.languages[0]}", + "http_request": { + "method": "POST", + "url": "http://example.com/${loc_task.output.fileId}/encode", + "body": { + "recipe": "${workflow.input.recipe}", + "params": { + "width": 100, + "height": 100 + } + }, + "headers": { + "Accept": "application/json", + "Content-Type": "application/json" + } + } + } +} +``` + +Consider the following as the _workflow input_ + +```json +{ + "movieId": "movie_123", + "fileLocation":"s3://moviebucket/file123", + "recipe":"png" +} +``` +And the output of the _loc_task_ as the following; + +```json +{ + "fileId": "file_xxx_yyy_zzz", + "languages": ["en","ja","es"] +} +``` + +When scheduling the task, Conductor will merge the values from workflow input and loc_task's output and create the input to the task as follows: + +```json +{ + "movieId": "movie_123", + "url": "s3://moviebucket/file123", + "lang": "en", + "http_request": { + "method": "POST", + "url": "http://example.com/file_xxx_yyy_zzz/encode", + "body": { + "recipe": "png", + "params": { + "width": 100, + "height": 100 + } + }, + "headers": { + "Accept": "application/json", + "Content-Type": "application/json" + } + } +} +``` + + + From fea0831cb5958aff464da3ee69844ad789880d61 Mon Sep 17 00:00:00 2001 From: Mark Sakauye Date: Wed, 19 Jul 2017 10:58:16 -0400 Subject: [PATCH 05/14] use relative import to support both python 2 and 3 --- client/python/conductor/ConductorWorker.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/client/python/conductor/ConductorWorker.py b/client/python/conductor/ConductorWorker.py index 4e64ff3879..da96c9ab7f 100644 --- a/client/python/conductor/ConductorWorker.py +++ b/client/python/conductor/ConductorWorker.py @@ -16,8 +16,7 @@ import sys import time import subprocess -import conductor -from conductor.conductor import WFClientMgr +from .conductor import WFClientMgr from threading import Thread import socket @@ -25,7 +24,7 @@ class ConductorWorker: def __init__(self, server_url, thread_count, polling_interval): - wfcMgr = conductor.conductor.WFClientMgr(server_url) + wfcMgr = WFClientMgr(server_url) self.workflowClient = wfcMgr.workflowClient self.taskClient = wfcMgr.taskClient self.thread_count = thread_count From 43301ee201249ff1f1f4cf0ee988e09d045394e9 Mon Sep 17 00:00:00 2001 From: Mark Sakauye Date: Wed, 19 Jul 2017 11:15:34 -0400 Subject: [PATCH 06/14] more canonical way of imports that works in python 2 and 3 --- client/python/conductor/ConductorWorker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/client/python/conductor/ConductorWorker.py b/client/python/conductor/ConductorWorker.py index da96c9ab7f..85c06aeced 100644 --- a/client/python/conductor/ConductorWorker.py +++ b/client/python/conductor/ConductorWorker.py @@ -12,11 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from __future__ import print_function +from __future__ import print_function, absolute_import import sys import time import subprocess -from .conductor import WFClientMgr +import conductor +from conductor.conductor import WFClientMgr from threading import Thread import socket From 53759e856d6ec75877a21c70f1c448301a31c2c8 Mon Sep 17 00:00:00 2001 From: Akhilesh Manjunath Date: Thu, 20 Jul 2017 11:31:15 -0700 Subject: [PATCH 07/14] Added option to specify worker name prefix --- .../netflix/conductor/client/task/WorkflowTaskCoordinator.java | 2 +- .../netflix/conductor/client/worker/TestPropertyFactory.java | 2 +- client/src/test/resources/config.properties | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java b/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java index d9e401a4fc..8bf215f7ff 100644 --- a/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java +++ b/client/src/main/java/com/netflix/conductor/client/task/WorkflowTaskCoordinator.java @@ -243,7 +243,7 @@ public synchronized void init() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); - t.setName("workflow-worker-" + count.getAndIncrement()); + t.setName(PropertyFactory.getString("", "workerNamePrefix", "workflow-worker-") + count.getAndIncrement()); return t; } }); diff --git a/client/src/test/java/com/netflix/conductor/client/worker/TestPropertyFactory.java b/client/src/test/java/com/netflix/conductor/client/worker/TestPropertyFactory.java index 47352dc5c2..e86b7131fe 100644 --- a/client/src/test/java/com/netflix/conductor/client/worker/TestPropertyFactory.java +++ b/client/src/test/java/com/netflix/conductor/client/worker/TestPropertyFactory.java @@ -62,7 +62,7 @@ public void test() { assertEquals("domainB", PropertyFactory.getString("workerB", "domain", null)); assertEquals(null, PropertyFactory.getString("workerC", "domain", null)); // Non Existent - + assertEquals("test-group-", PropertyFactory.getString("", "workerNamePrefix", "workflow-worker-")); } @Test diff --git a/client/src/test/resources/config.properties b/client/src/test/resources/config.properties index fec67a82c8..9e87f14c49 100644 --- a/client/src/test/resources/config.properties +++ b/client/src/test/resources/config.properties @@ -6,3 +6,4 @@ conductor.worker.workerB.batchSize=84 conductor.worker.workerB.domain=domainB conductor.worker.Test.paused=true conductor.worker.domainTestTask2.domain=visinghDomain +conductor.worker.workerNamePrefix=test-group- \ No newline at end of file From b3b6d370dc59cda43fe3f1bbceade9ca1dc40753 Mon Sep 17 00:00:00 2001 From: Akhilesh Manjunath Date: Thu, 20 Jul 2017 11:49:57 -0700 Subject: [PATCH 08/14] Added support for AWS Elasticache Redis - cluster mode enabled --- .../netflix/conductor/server/ConductorServer.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/com/netflix/conductor/server/ConductorServer.java b/server/src/main/java/com/netflix/conductor/server/ConductorServer.java index 20df3c6306..b465d6cde6 100644 --- a/server/src/main/java/com/netflix/conductor/server/ConductorServer.java +++ b/server/src/main/java/com/netflix/conductor/server/ConductorServer.java @@ -29,6 +29,7 @@ import javax.servlet.DispatcherType; import javax.ws.rs.core.MediaType; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.eclipse.jetty.server.Server; import 
org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ServletContextHandler; @@ -51,6 +52,8 @@ import com.netflix.dyno.jedis.DynoJedisClient; import com.sun.jersey.api.client.Client; +import redis.clients.jedis.HostAndPort; +import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisCommands; /** @@ -62,7 +65,7 @@ public class ConductorServer { private static Logger logger = LoggerFactory.getLogger(ConductorServer.class); private enum DB { - redis, dynomite, memory + redis, dynomite, memory, redis_cluster } private ServerModule sm; @@ -167,6 +170,15 @@ public HostToken getTokenForHost(Host host, Set activeHosts) { } logger.info("Starting conductor server using in memory data store"); break; + + case redis_cluster: + Host host = dynoHosts.get(0); + GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); + poolConfig.setMinIdle(5); + poolConfig.setMaxTotal(1000); + jedis = new JedisCluster(new HostAndPort(host.getHostName(), host.getPort()), poolConfig); + logger.info("Starting conductor server using redis_cluster " + dynoClusterName); + break; } this.sm = new ServerModule(jedis, hs, cc); From 7b833d40960c70731016752c4af63f1b3b431ad6 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Tue, 1 Aug 2017 13:08:34 -0700 Subject: [PATCH 09/14] Fix: Trying to read queue name on a null task will result in NPE. --- .../java/com/netflix/conductor/service/ExecutionService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index 1cd8cc8f42..cf7ecba151 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -176,9 +176,9 @@ public Task getPendingTaskForWorkflow(String taskReferenceName, String workflowI public boolean ackTaskRecieved(String taskId, String consumerId) throws Exception { Task task = getTask(taskId); - String queueName = QueueUtils.getQueueName(task); if (task != null) { + String queueName = QueueUtils.getQueueName(task); if(task.getResponseTimeoutSeconds() > 0) { logger.debug("Adding task " + queueName + "/" + taskId + " to be requeued if no response received " + task.getResponseTimeoutSeconds()); return queue.setUnackTimeout(queueName, task.getTaskId(), 1000 * task.getResponseTimeoutSeconds()); //Value is in millisecond From a00ee8fff4f9f714c916d018c51fb9029c944572 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Thu, 3 Aug 2017 09:15:36 -0700 Subject: [PATCH 10/14] Add error handling --- .../com/netflix/conductor/core/execution/DeciderService.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java index 22b54206de..86bfa0a640 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java @@ -549,6 +549,9 @@ private Task getDynamicTasks(WorkflowDef def, Workflow workflow, WorkflowTask ta Map input = getTaskInput(taskToSchedule.getInputParameters(), workflow, null, null); Object paramValue = input.get(paramName); DynamicForkJoinTaskList dynForkTasks0 = om.convertValue(paramValue, DynamicForkJoinTaskList.class); + if(dynForkTasks0 == null) { + throw new TerminateWorkflow("Dynamic tasks could not be created. 
The value of " + paramName + " from task's input " + input + " has no dynamic tasks to be scheduled"); + } for( DynamicForkJoinTask dt : dynForkTasks0.getDynamicTasks()) { WorkflowTask wft = new WorkflowTask(); wft.setTaskReferenceName(dt.getReferenceName()); From e753bf3e22e5c95c3cb3e7d9a905ee92baa6efb4 Mon Sep 17 00:00:00 2001 From: toddschilling Date: Thu, 3 Aug 2017 09:45:03 -0700 Subject: [PATCH 11/14] migrated server and ui to docker alpine --- docker/server/Dockerfile | 38 ++++++-------------------- docker/server/Dockerfile.build | 13 +++++++++ docker/server/README.md | 7 ++++- docker/server/bin/startup.sh | 4 +-- docker/ui/Dockerfile | 50 ++++++++++++++++++---------------- docker/ui/README.md | 5 +++- docker/ui/bin/startup.sh | 4 +-- 7 files changed, 63 insertions(+), 58 deletions(-) create mode 100644 docker/server/Dockerfile.build diff --git a/docker/server/Dockerfile b/docker/server/Dockerfile index faff583ed6..ab6e8313c3 100644 --- a/docker/server/Dockerfile +++ b/docker/server/Dockerfile @@ -1,42 +1,22 @@ # # conductor:server - Netflix conductor server # -FROM java:8-jdk +FROM java:8-jre-alpine MAINTAINER Netflix OSS # Make app folders RUN mkdir -p /app/config /app/logs /app/libs -# Startup script(s) -COPY ./bin /app - -# Configs -COPY ./config /app/config - -# Get all the dependencies -RUN apt-get update -y \ - && apt-get -y install git \ - - # Chmod scripts - && chmod +x /app/startup.sh - -# Get and install conductor -RUN git clone https://github.com/Netflix/conductor.git \ - && cd conductor \ - && ./gradlew build -x test \ - - # Get Server Jar - && mv ./server/build/libs/conductor-server-*-all.jar /app/libs/ \ - - # Go back to root - && cd / \ - - # Clean up - && rm -rf conductor +# Copy the project directly onto the image +COPY ./docker/server/bin /app +COPY ./docker/server/config /app/config +COPY ./server/build/libs/conductor-server-*-all.jar /app/libs +# Copy the files for the server into the app folders +RUN chmod +x /app/startup.sh EXPOSE 8080 -CMD ["/app/startup.sh"] -ENTRYPOINT ["/bin/bash"] +CMD [ "/app/startup.sh" ] +ENTRYPOINT [ "/bin/sh"] diff --git a/docker/server/Dockerfile.build b/docker/server/Dockerfile.build new file mode 100644 index 0000000000..ea82dc149c --- /dev/null +++ b/docker/server/Dockerfile.build @@ -0,0 +1,13 @@ +# +# conductor:server - Netflix conductor server +# +FROM java:8-jdk + +MAINTAINER Netflix OSS + +# Copy the project directly onto the image +COPY . /conductor +WORKDIR /conductor + +# Build the server on run +ENTRYPOINT ./gradlew build -x test diff --git a/docker/server/README.md b/docker/server/README.md index 92efcfa593..872b61b8e0 100644 --- a/docker/server/README.md +++ b/docker/server/README.md @@ -3,7 +3,12 @@ This Dockerfile create the conductor:server image ## Building the image -`docker build -t conductor:server .` + +Run the following commands from the project root. 
+ +`docker build -f docker/server/Dockerfile.build -t conductor:server-build .` +`docker run -v $(pwd):/conductor conductor:server-build` +`docker build -f docker/server/Dockerfile -t conductor:server .` ## Running the conductor server - Standalone server (interal DB): `docker run -p 8080:8080 -d -t conductor:server` diff --git a/docker/server/bin/startup.sh b/docker/server/bin/startup.sh index 2749b1606c..1b382d16dc 100755 --- a/docker/server/bin/startup.sh +++ b/docker/server/bin/startup.sh @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/sh # startup.sh - startup script for the server docker image echo "Starting Conductor server" @@ -18,4 +18,4 @@ if [ -z "$CONFIG_PROP" ]; export config_file=/app/config/$CONFIG_PROP fi -nohup java -jar conductor-server-*-all.jar $config_file 1>&2 > /app/logs/server.log +java -jar conductor-server-*-all.jar $config_file diff --git a/docker/ui/Dockerfile b/docker/ui/Dockerfile index 7efca851ac..170f92328a 100644 --- a/docker/ui/Dockerfile +++ b/docker/ui/Dockerfile @@ -1,38 +1,42 @@ # # conductor:ui - Netflix conductor UI # -FROM node - +FROM node:alpine MAINTAINER Netflix OSS -# Make app folders -RUN mkdir -p /app/config /app/logs /app/libs +# Install the required packages for the node build +# to run on alpine +RUN apk update && apk add \ + autoconf \ + automake \ + libtool \ + build-base \ + libstdc++ \ + gcc \ + abuild \ + binutils \ + nasm \ + libpng \ + libpng-dev \ + libjpeg-turbo \ + libjpeg-turbo-dev -# Startup script(s) -COPY ./bin /app +# Make app folders +RUN mkdir -p /app/ui -# Get all the dependencies -RUN apt-get update -y \ - && apt-get -y install git \ +# Copy the ui files onto the image +COPY ./docker/ui/bin /app +COPY ./ui /app/ui - # Chmod scripts - && chmod +x /app/startup.sh +# Copy the files for the server into the app folders +RUN chmod +x /app/startup.sh # Get and install conductor UI -RUN git clone https://github.com/netflix/conductor.git \ - - # Get UI project - && mv /conductor/ui /app \ - - # Remove the conductor project - && rm -rf conductor \ - - # Install UI packages - && cd /app/ui \ +RUN cd /app/ui \ && npm install \ && npm run build --server EXPOSE 5000 -CMD ["/app/startup.sh"] -ENTRYPOINT ["/bin/bash"] +CMD [ "/app/startup.sh" ] +ENTRYPOINT ["/bin/sh"] diff --git a/docker/ui/README.md b/docker/ui/README.md index 31c6306b34..52956e387c 100644 --- a/docker/ui/README.md +++ b/docker/ui/README.md @@ -3,7 +3,10 @@ This Dockerfile create the conductor:ui image ## Building the image -`docker build -t conductor:ui .` + +Run the following commands from the project root. 
+ +`docker build -f docker/ui/Dockerfile -t conductor:ui .` ## Running the conductor server - With localhost conductor server: `docker run -p 5000:5000 -d -t conductor:ui` diff --git a/docker/ui/bin/startup.sh b/docker/ui/bin/startup.sh index 6290e991a8..49b6beb243 100755 --- a/docker/ui/bin/startup.sh +++ b/docker/ui/bin/startup.sh @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/sh # startup.sh - startup script for the UI docker image echo "Starting Conductor UI" @@ -12,4 +12,4 @@ if [ -z "$WF_SERVER" ]; echo "using Conductor API server from '$WF_SERVER'" fi -nohup node server.js 1>&2 > /app/logs/ui.log \ No newline at end of file +node server.js \ No newline at end of file From 76b19646a77aaee5c88a7125c7819f9709646a21 Mon Sep 17 00:00:00 2001 From: Vikram Singh Date: Fri, 4 Aug 2017 14:20:16 -0700 Subject: [PATCH 12/14] Fix the rerun bug when sub workflows are involved --- .../core/execution/WorkflowExecutor.java | 171 ++++++++------- .../integration/WorkflowServiceTest.java | 199 +++++++++++++++++- 2 files changed, 296 insertions(+), 74 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 536e4c814d..dc9ce35f6c 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -155,78 +155,14 @@ public String startWorkflow(String name, int version, Map input, } public String rerun(RerunWorkflowRequest request) throws Exception { - - Workflow reRunFromWorkflow = edao.getWorkflow(request.getReRunFromWorkflowId()); - - String workflowId = IDGenerator.generate(); - - // Persist the workflow and task First - Workflow wf = new Workflow(); - wf.setWorkflowId(workflowId); - wf.setCorrelationId((request.getCorrelationId() == null) ? reRunFromWorkflow.getCorrelationId() : request.getCorrelationId()); - wf.setWorkflowType(reRunFromWorkflow.getWorkflowType()); - wf.setVersion(reRunFromWorkflow.getVersion()); - wf.setInput((request.getWorkflowInput() == null) ? reRunFromWorkflow.getInput() : request.getWorkflowInput()); - wf.setReRunFromWorkflowId(request.getReRunFromWorkflowId()); - wf.setStatus(WorkflowStatus.RUNNING); - wf.setOwnerApp(WorkflowContext.get().getClientApp()); - wf.setCreateTime(System.currentTimeMillis()); - wf.setUpdatedBy(null); - wf.setUpdateTime(null); - - // If the "reRunFromTaskId" is not given in the RerunWorkflowRequest, - // then the whole - // workflow has to rerun - if (request.getReRunFromTaskId() != null) { - // We need to go thru the workflowDef and create tasks for - // all tasks before request.getReRunFromTaskId() and marked them - // skipped - List newTasks = new LinkedList<>(); - Map refNameToTask = new HashMap(); - reRunFromWorkflow.getTasks().forEach(task -> refNameToTask.put(task.getReferenceTaskName(), task)); - WorkflowDef wd = metadata.get(reRunFromWorkflow.getWorkflowType(), reRunFromWorkflow.getVersion()); - Iterator it = wd.getTasks().iterator(); - int seq = wf.getTasks().size(); - while (it.hasNext()) { - WorkflowTask wt = it.next(); - Task previousTask = refNameToTask.get(wt.getTaskReferenceName()); - if (previousTask.getTaskId().equals(request.getReRunFromTaskId())) { - Task theTask = new Task(); - theTask.setTaskId(IDGenerator.generate()); - theTask.setReferenceTaskName(previousTask.getReferenceTaskName()); - theTask.setInputData((request.getTaskInput() == null) ? 
previousTask.getInputData() : request.getTaskInput()); - theTask.setWorkflowInstanceId(workflowId); - theTask.setStatus(Status.READY_FOR_RERUN); - theTask.setTaskType(previousTask.getTaskType()); - theTask.setCorrelationId(wf.getCorrelationId()); - theTask.setSeq(seq++); - theTask.setRetryCount(previousTask.getRetryCount() + 1); - newTasks.add(theTask); - break; - } else { // Create with Skipped status - Task theTask = new Task(); - theTask.setTaskId(IDGenerator.generate()); - theTask.setReferenceTaskName(previousTask.getReferenceTaskName()); - theTask.setWorkflowInstanceId(workflowId); - theTask.setStatus(Status.SKIPPED); - theTask.setTaskType(previousTask.getTaskType()); - theTask.setCorrelationId(wf.getCorrelationId()); - theTask.setInputData(previousTask.getInputData()); - theTask.setOutputData(previousTask.getOutputData()); - theTask.setRetryCount(previousTask.getRetryCount() + 1); - theTask.setSeq(seq++); - newTasks.add(theTask); - } - } - - edao.createTasks(newTasks); + Preconditions.checkNotNull(request.getReRunFromWorkflowId(), "reRunFromWorkflowId is missing"); + if(!rerunWF(request.getReRunFromWorkflowId(), request.getReRunFromTaskId(), request.getTaskInput(), + request.getWorkflowInput(), request.getCorrelationId())){ + throw new ApplicationException(Code.INVALID_INPUT, "Task " + request.getReRunFromTaskId() + " not found"); } - - edao.createWorkflow(wf); - decide(workflowId); - return workflowId; + return request.getReRunFromWorkflowId(); } - + public void rewind(String workflowId) throws Exception { Workflow workflow = edao.getWorkflow(workflowId, true); if (!workflow.getStatus().isTerminal()) { @@ -762,10 +698,19 @@ boolean scheduleTask(Workflow workflow, List tasks) throws Exception { if (tasks == null || tasks.isEmpty()) { return false; } - int count = workflow.getTasks().size(); + int count = 0; + + // Get the highest seq number + for(Task t: workflow.getTasks()){ + if(t.getSeq() > count){ + count = t.getSeq(); + } + } for (Task task : tasks) { - task.setSeq(++count); + if(task.getSeq() == 0){ // Set only if the seq was not set + task.setSeq(++count); + } } List created = edao.createTasks(tasks); @@ -817,5 +762,87 @@ private void terminate(final WorkflowDef def, final Workflow workflow, Terminate terminateWorkflow(workflow, tw.getMessage(), failureWorkflow); } + private boolean rerunWF(String workflowId, String taskId, Map taskInput, + Map workflowInput, String correlationId) throws Exception{ + + // Get the workflow + Workflow workflow = edao.getWorkflow(workflowId); + + // If the task Id is null it implies that the entire workflow has to be rerun + if(taskId == null){ + // remove all tasks + workflow.getTasks().forEach(t -> edao.removeTask(t.getTaskId())); + // Set workflow as RUNNING + workflow.setStatus(WorkflowStatus.RUNNING); + if(correlationId != null){ + workflow.setCorrelationId(correlationId); + } + if(workflowInput != null){ + workflow.setInput(workflowInput); + } + + edao.updateWorkflow(workflow); + + decide(workflowId); + return true; + } + + // Now iterate thru the tasks and find the "specific" task + Task theTask = null; + for(Task t: workflow.getTasks()){ + if(t.getTaskId().equals(taskId)){ + theTask = t; + break; + } else { + // If not found look into sub workflows + if(t.getTaskType().equalsIgnoreCase("SUB_WORKFLOW")){ + String subWorkflowId = t.getInputData().get("subWorkflowId").toString(); + if(rerunWF(subWorkflowId, taskId, taskInput, null, null)){ + theTask = t; + break; + } + } + } + } + + + if(theTask != null){ + // Remove all later tasks from the 
"theTask" + for(Task t: workflow.getTasks()){ + if(t.getSeq() > theTask.getSeq()){ + edao.removeTask(t.getTaskId()); + } + } + if(theTask.getTaskType().equalsIgnoreCase("SUB_WORKFLOW")){ + // if task is sub workflow set task as IN_PROGRESS + theTask.setStatus(Status.IN_PROGRESS); + edao.updateTask(theTask); + } else { + // Set the task to rerun + theTask.setStatus(Status.SCHEDULED); + if(taskInput != null){ + theTask.setInputData(taskInput); + } + theTask.setRetried(false); + edao.updateTask(theTask); + addTaskToQueue(theTask); + } + // and workflow as RUNNING + workflow.setStatus(WorkflowStatus.RUNNING); + if(correlationId != null){ + workflow.setCorrelationId(correlationId); + } + if(workflowInput != null){ + workflow.setInput(workflowInput); + } + + edao.updateWorkflow(workflow); + + decide(workflowId); + return true; + } + + return false; + } } diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java index 9b65595b12..5492f71b87 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java @@ -337,7 +337,6 @@ public void testForkJoin() throws Exception { Map input = new HashMap(); String wfid = provider.startWorkflow(FORK_JOIN_WF, 1, "fanouttest", input ); System.out.println("testForkJoin.wfid=" + wfid); - Task t1 = ess.poll("junit_task_1", "test"); assertTrue(ess.ackTaskRecieved(t1.getTaskId(), "test")); @@ -517,6 +516,7 @@ public void testForkJoinNested() throws Exception { wf = ess.getExecutionStatus(wfid, true); assertNotNull(wf); assertEquals(WorkflowStatus.COMPLETED, wf.getStatus()); + } @Test @@ -1123,6 +1123,104 @@ public void testSimpleWorkflow() throws Exception { } + @Test + public void testWorkflowRerunWithSubWorkflows() throws Exception { + // Execute a workflow + String wfid = this.runWorkflowWithSubworkflow(); + // Check it completed + Workflow wf = ess.getExecutionStatus(wfid, true); + assertNotNull(wf); + assertEquals(WorkflowStatus.COMPLETED, wf.getStatus()); + assertEquals(2, wf.getTasks().size()); + + // Now lets pickup the first task in the sub workflow and rerun it from there + String subWorkflowId = null; + for(Task t: wf.getTasks()){ + if(t.getTaskType().equalsIgnoreCase("SUB_WORKFLOW")){ + subWorkflowId = t.getOutputData().get("subWorkflowId").toString(); + } + } + assertNotNull(subWorkflowId); + Workflow subWorkflow = ess.getExecutionStatus(subWorkflowId, true); + Task swT1 = null; + for(Task t: subWorkflow.getTasks()){ + if(t.getTaskDefName().equalsIgnoreCase("junit_task_1")){ + swT1 = t; + } + } + assertNotNull(swT1); + + RerunWorkflowRequest request = new RerunWorkflowRequest(); + request.setReRunFromTaskId(swT1.getTaskId()); + + Map newInput = new HashMap(); + newInput.put("p1", "1"); + newInput.put("p2", "2"); + request.setTaskInput(newInput); + + String correlationId = "unit_test_sw_new"; + Map input = new HashMap(); + input.put("param1", "New p1 value"); + input.put("param2", "New p2 value"); + request.setCorrelationId(correlationId); + request.setWorkflowInput(input); + + request.setReRunFromWorkflowId(wfid); + request.setReRunFromTaskId(swT1.getTaskId()); + // Rerun + provider.rerun(request); + // The main WF and the sub WF should be in RUNNING state + wf = ess.getExecutionStatus(wfid, true); + assertNotNull(wf); + assertEquals(WorkflowStatus.RUNNING, wf.getStatus()); + 
assertEquals(2, wf.getTasks().size()); + assertEquals(correlationId, wf.getCorrelationId()); + assertEquals("New p1 value", wf.getInput().get("param1")); + assertEquals("New p2 value", wf.getInput().get("param2")); + + subWorkflow = ess.getExecutionStatus(subWorkflowId, true); + assertNotNull(subWorkflow); + assertEquals(WorkflowStatus.RUNNING, subWorkflow.getStatus()); + // Since we are re running from the sub workflow task, there + // should be only 1 task that is SCHEDULED + assertEquals(1, subWorkflow.getTasks().size()); + assertEquals(Status.SCHEDULED, subWorkflow.getTasks().get(0).getStatus()); + + // Now execute the task + Task task = ess.poll("junit_task_1", "task1.junit.worker"); + assertNotNull(task); + assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task1.junit.worker")); + assertEquals(task.getInputData().get("p1").toString(), "1"); + assertEquals(task.getInputData().get("p2").toString(), "2"); + task.getOutputData().put("op", "junit_task_1.done"); + task.setStatus(Status.COMPLETED); + ess.updateTask(task); + + subWorkflow = ess.getExecutionStatus(subWorkflowId, true); + assertNotNull(subWorkflow); + assertEquals(WorkflowStatus.RUNNING, subWorkflow.getStatus()); + assertEquals(2, subWorkflow.getTasks().size()); + + // Poll for second task of the sub workflow and execute it + task = ess.poll("junit_task_2", "task2.junit.worker"); + assertNotNull(task); + assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task2.junit.worker")); + task.getOutputData().put("op", "junit_task_2.done"); + task.setStatus(Status.COMPLETED); + ess.updateTask(task); + + // Now the sub workflow and the main workflow must have finished + subWorkflow = ess.getExecutionStatus(subWorkflowId, true); + assertNotNull(subWorkflow); + assertEquals(WorkflowStatus.COMPLETED, subWorkflow.getStatus()); + assertEquals(2, subWorkflow.getTasks().size()); + + wf = ess.getExecutionStatus(wfid, true); + assertNotNull(wf); + assertEquals(WorkflowStatus.COMPLETED, wf.getStatus()); + assertEquals(2, wf.getTasks().size()); + } + @Test public void testSimpleWorkflowWithTaskSpecificDomain() throws Exception { @@ -2474,7 +2572,7 @@ public void testReruns() throws Exception { // Check the tasks, at this time there should be 2 tasks // first one is skipped and the second one is scheduled assertEquals(esRR.getTasks().toString(), 2, esRR.getTasks().size()); - assertEquals(Status.SKIPPED, esRR.getTasks().get(0).getStatus()); + assertEquals(Status.COMPLETED, esRR.getTasks().get(0).getStatus()); Task tRR = esRR.getTasks().get(1); assertEquals(esRR.getTasks().toString(), Status.SCHEDULED, tRR.getStatus()); assertEquals(tRR.getTaskType(), "junit_task_2"); @@ -3140,4 +3238,101 @@ private void createWorkflowDefForDomain(){ ms.updateWorkflowDef(defSW); } catch (Exception e) {} } + + private String runWorkflowWithSubworkflow() throws Exception{ + clearWorkflows(); + createWorkflowDefForDomain(); + + WorkflowDef found = ms.getWorkflowDef(LINEAR_WORKFLOW_T1_T2_SW, 1); + assertNotNull(found); + + String correlationId = "unit_test_sw"; + Map input = new HashMap(); + String inputParam1 = "p1 value"; + input.put("param1", inputParam1); + input.put("param2", "p2 value"); + + String wfid = provider.startWorkflow(LINEAR_WORKFLOW_T1_T2_SW, 1, correlationId , input, null); + System.out.println("testSimpleWorkflow.wfid=" + wfid); + assertNotNull(wfid); + Workflow wf = provider.getWorkflow(wfid, false); + assertNotNull(wf); + + Workflow es = ess.getExecutionStatus(wfid, true); + assertNotNull(es); + assertEquals(es.getReasonForIncompletion(), 
WorkflowStatus.RUNNING, es.getStatus()); + + + es = ess.getExecutionStatus(wfid, true); + assertNotNull(es); + assertEquals(WorkflowStatus.RUNNING, es.getStatus()); + assertEquals(1, es.getTasks().size()); //The very first task is the one that should be scheduled. + + // Poll for first task and execute it + Task task = ess.poll("junit_task_3", "task3.junit.worker"); + assertNotNull(task); + assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task3.junit.worker")); + task.getOutputData().put("op", "junit_task_3.done"); + task.setStatus(Status.COMPLETED); + ess.updateTask(task); + + es = ess.getExecutionStatus(wfid, true); + assertNotNull(es); + assertEquals(WorkflowStatus.RUNNING, es.getStatus()); + assertEquals(2, es.getTasks().size()); + + // Get the sub workflow id + String subWorkflowId = null; + for(Task t: es.getTasks()){ + if(t.getTaskType().equalsIgnoreCase("SUB_WORKFLOW")){ + subWorkflowId = t.getOutputData().get("subWorkflowId").toString(); + } + } + assertNotNull(subWorkflowId); + + Workflow subWorkflow = ess.getExecutionStatus(subWorkflowId, true); + assertNotNull(subWorkflow); + assertEquals(WorkflowStatus.RUNNING, subWorkflow.getStatus()); + assertEquals(1, subWorkflow.getTasks().size()); + + // Now the Sub workflow is triggers + // Poll for first task of the sub workflow and execute it + task = ess.poll("junit_task_1", "task1.junit.worker"); + assertNotNull(task); + assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task1.junit.worker")); + task.getOutputData().put("op", "junit_task_1.done"); + task.setStatus(Status.COMPLETED); + ess.updateTask(task); + + subWorkflow = ess.getExecutionStatus(subWorkflowId, true); + assertNotNull(subWorkflow); + assertEquals(WorkflowStatus.RUNNING, subWorkflow.getStatus()); + assertEquals(2, subWorkflow.getTasks().size()); + + es = ess.getExecutionStatus(wfid, true); + assertNotNull(es); + assertEquals(WorkflowStatus.RUNNING, es.getStatus()); + assertEquals(2, es.getTasks().size()); + + // Poll for second task of the sub workflow and execute it + task = ess.poll("junit_task_2", "task2.junit.worker"); + assertNotNull(task); + assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task2.junit.worker")); + task.getOutputData().put("op", "junit_task_2.done"); + task.setStatus(Status.COMPLETED); + ess.updateTask(task); + + // Now the sub workflow and the main workflow must have finished + subWorkflow = ess.getExecutionStatus(subWorkflowId, true); + assertNotNull(subWorkflow); + assertEquals(WorkflowStatus.COMPLETED, subWorkflow.getStatus()); + assertEquals(2, subWorkflow.getTasks().size()); + + es = ess.getExecutionStatus(wfid, true); + assertNotNull(es); + assertEquals(WorkflowStatus.COMPLETED, es.getStatus()); + assertEquals(2, es.getTasks().size()); + + return wfid; + } } From a77cbc4561523da4b7f8f05f8caaf718601adbae Mon Sep 17 00:00:00 2001 From: Vikram Singh Date: Tue, 8 Aug 2017 13:24:12 -0700 Subject: [PATCH 13/14] Fix for retry with fork join workflows --- .../core/execution/WorkflowExecutor.java | 50 +++- .../integration/WorkflowServiceTest.java | 226 +++++++++++++++++- 2 files changed, 266 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index dc9ce35f6c..9edb23cca8 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -18,6 +18,7 @@ */ package 
com.netflix.conductor.core.execution; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -189,13 +190,23 @@ public void retry(String workflowId) throws Exception { if (workflow.getTasks().isEmpty()) { throw new ApplicationException(Code.CONFLICT, "Workflow has not started yet"); } - int lastIndex = workflow.getTasks().size() - 1; - Task last = workflow.getTasks().get(lastIndex); - if (!last.getStatus().isTerminal()) { + + // First get the failed task and the cancelled task + Task failedTask = null; + List cancelledTasks = new ArrayList(); + for(Task t: workflow.getTasks()) { + if(t.getStatus().equals(Status.FAILED)){ + failedTask = t; + } else if(t.getStatus().equals(Status.CANCELED)){ + cancelledTasks.add(t); + + } + }; + if (failedTask != null && !failedTask.getStatus().isTerminal()) { throw new ApplicationException(Code.CONFLICT, "The last task is still not completed! I can only retry the last failed task. Use restart if you want to attempt entire workflow execution again."); } - if (last.getStatus().isSuccessful()) { + if (failedTask != null && failedTask.getStatus().isSuccessful()) { throw new ApplicationException(Code.CONFLICT, "The last task has not failed! I can only retry the last failed task. Use restart if you want to attempt entire workflow execution again."); } @@ -207,13 +218,34 @@ public void retry(String workflowId) throws Exception { update.forEach(task -> task.setRetried(true)); edao.updateTasks(update); - Task retried = last.copy(); + List rescheduledTasks = new ArrayList(); + // Now reschedule the failed task + Task retried = failedTask.copy(); retried.setTaskId(IDGenerator.generate()); - retried.setRetriedTaskId(last.getTaskId()); + retried.setRetriedTaskId(failedTask.getTaskId()); retried.setStatus(Status.SCHEDULED); - retried.setRetryCount(last.getRetryCount() + 1); - scheduleTask(workflow, Arrays.asList(retried)); - + retried.setRetryCount(failedTask.getRetryCount() + 1); + rescheduledTasks.add(retried); + + // Reschedule the cancelled task but if the join is cancelled set that to in progress + cancelledTasks.forEach(t -> { + if(t.getTaskType().equalsIgnoreCase(WorkflowTask.Type.JOIN.toString())){ + t.setStatus(Status.IN_PROGRESS); + t.setRetried(false); + edao.updateTask(t); + } else { + //edao.removeTask(t.getTaskId()); + Task copy = t.copy(); + copy.setTaskId(IDGenerator.generate()); + copy.setRetriedTaskId(t.getTaskId()); + copy.setStatus(Status.SCHEDULED); + copy.setRetryCount(t.getRetryCount() + 1); + rescheduledTasks.add(copy); + } + }); + + scheduleTask(workflow, rescheduledTasks); + workflow.setStatus(WorkflowStatus.RUNNING); edao.updateWorkflow(workflow); diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java index 5492f71b87..6994d4c8da 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.assertTrue; import java.util.Arrays; +import java.util.Date; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -147,6 +148,20 @@ public void init() throws Exception { ms.registerTaskDef(Arrays.asList(task)); } + for(int i = 0; i < 5; i++){ + + String name = "junit_task_0_RT_" + i; + if(ms.getTaskDef(name) != null){ + continue; + } + + TaskDef 
task = new TaskDef(); + task.setName(name); + task.setTimeoutSeconds(120); + task.setRetryCount(0); + ms.registerTaskDef(Arrays.asList(task)); + } + TaskDef task = new TaskDef(); task.setName("short_time_out"); task.setTimeoutSeconds(5); @@ -182,7 +197,7 @@ public void init() throws Exception { ip2.put("tp2", "${t1.output.op}"); wft2.setInputParameters(ip2); wft2.setTaskReferenceName("t2"); - + wftasks.add(wft1); wftasks.add(wft2); def.setTasks(wftasks); @@ -337,6 +352,8 @@ public void testForkJoin() throws Exception { Map input = new HashMap(); String wfid = provider.startWorkflow(FORK_JOIN_WF, 1, "fanouttest", input ); System.out.println("testForkJoin.wfid=" + wfid); + printTaskStatuses(wfid, "initiated"); + Task t1 = ess.poll("junit_task_1", "test"); assertTrue(ess.ackTaskRecieved(t1.getTaskId(), "test")); @@ -355,6 +372,7 @@ public void testForkJoin() throws Exception { Workflow wf = ess.getExecutionStatus(wfid, true); assertNotNull(wf); assertEquals("Found " + wf.getTasks(), WorkflowStatus.RUNNING, wf.getStatus()); + printTaskStatuses(wf, "T1 completed"); t3 = ess.poll("junit_task_3", "test"); assertNotNull(t3); @@ -385,6 +403,7 @@ public void testForkJoin() throws Exception { wf = ess.getExecutionStatus(wfid, true); assertNotNull(wf); + printTaskStatuses(wf, "T2 T3 completed"); assertEquals("Found " + wf.getTasks(), WorkflowStatus.RUNNING, wf.getStatus()); if (!wf.getTasks().stream().anyMatch(t -> t.getReferenceTaskName().equals("t3"))) { provider.decide(wfid); @@ -418,6 +437,7 @@ public void testForkJoin() throws Exception { wf = ess.getExecutionStatus(wfid, true); assertNotNull(wf); assertEquals("Found " + wf.getTasks(), WorkflowStatus.COMPLETED, wf.getStatus()); + printTaskStatuses(wf, "All completed"); } @Test @@ -745,6 +765,8 @@ public void testDynamicForkJoin() throws Exception { taskDef.setRetryCount(retryCount); taskDef.setRetryDelaySeconds(1); ms.updateTaskDef(taskDef); + + } private void createForkJoinWorkflow() throws Exception { @@ -800,6 +822,59 @@ private void createForkJoinWorkflow() throws Exception { } + + private void createForkJoinWorkflowWithZeroRetry() throws Exception { + + WorkflowDef def = new WorkflowDef(); + def.setName(FORK_JOIN_WF + "_2"); + def.setDescription(def.getName()); + def.setVersion(1); + def.setInputParameters(Arrays.asList("param1", "param2")); + + WorkflowTask fanout = new WorkflowTask(); + fanout.setType(Type.FORK_JOIN.name()); + fanout.setTaskReferenceName("fanouttask"); + + WorkflowTask wft1 = new WorkflowTask(); + wft1.setName("junit_task_0_RT_1"); + Map ip1 = new HashMap<>(); + ip1.put("p1", "workflow.input.param1"); + ip1.put("p2", "workflow.input.param2"); + wft1.setInputParameters(ip1); + wft1.setTaskReferenceName("t1"); + + WorkflowTask wft3 = new WorkflowTask(); + wft3.setName("junit_task_0_RT_3"); + wft3.setInputParameters(ip1); + wft3.setTaskReferenceName("t3"); + + WorkflowTask wft2 = new WorkflowTask(); + wft2.setName("junit_task_0_RT_2"); + Map ip2 = new HashMap<>(); + ip2.put("tp1", "workflow.input.param1"); + wft2.setInputParameters(ip2); + wft2.setTaskReferenceName("t2"); + + WorkflowTask wft4 = new WorkflowTask(); + wft4.setName("junit_task_0_RT_4"); + wft4.setInputParameters(ip2); + wft4.setTaskReferenceName("t4"); + + fanout.getForkTasks().add(Arrays.asList(wft1, wft3)); + fanout.getForkTasks().add(Arrays.asList(wft2)); + + def.getTasks().add(fanout); + + WorkflowTask join = new WorkflowTask(); + join.setType(Type.JOIN.name()); + join.setTaskReferenceName("fanouttask_join"); + join.setJoinOn(Arrays.asList("t3","t2")); + + 
def.getTasks().add(join); + def.getTasks().add(wft4); + ms.updateWorkflowDef(def); + + } private void createForkJoinNestedWorkflow() throws Exception { WorkflowDef def = new WorkflowDef(); @@ -2268,6 +2343,65 @@ public void testFailures() throws Exception { } + @Test + public void testRetryWithForkJoin() throws Exception { + String wfid = this.runAFailedForkJoinWF(); + + provider.retry(wfid); + + Workflow wf = ess.getExecutionStatus(wfid, true); + assertNotNull(wf); + + printTaskStatuses(wf, "After retry called"); + + Task t2 = ess.poll("junit_task_0_RT_2", "test"); + assertTrue(ess.ackTaskRecieved(t2.getTaskId(), "test")); + + Task t3 = ess.poll("junit_task_0_RT_3", "test"); + assertNotNull(t3); + assertTrue(ess.ackTaskRecieved(t3.getTaskId(), "test")); + + t2.setStatus(Status.COMPLETED); + t3.setStatus(Status.COMPLETED); + + ExecutorService es = Executors.newFixedThreadPool(2); + Future future1 = es.submit(()->{ + try { + ess.updateTask(t2); + } catch (Exception e) { + throw new RuntimeException(e); + } + + }); + final Task _t3 = t3; + Future future2 = es.submit(()->{ + try { + ess.updateTask(_t3); + } catch (Exception e) { + throw new RuntimeException(e); + } + + }); + future1.get(); + future2.get(); + + provider.decide(wfid); + provider.decide(wfid); + + wf = ess.getExecutionStatus(wfid, true); + assertNotNull(wf); + + printTaskStatuses(wf, "T2, T3 complete"); + provider.decide(wfid); + + Task t4 = ess.poll("junit_task_0_RT_4", "test"); + assertNotNull(t4); + t4.setStatus(Status.COMPLETED); + ess.updateTask(t4); + + printTaskStatuses(wfid, "After complete"); + } + @Test public void testRetry() throws Exception { WorkflowDef errorWorkflow = ms.getWorkflowDef(FORK_JOIN_WF, 1); @@ -2293,6 +2427,7 @@ public void testRetry() throws Exception { input.put("param2", "p2 value"); String wfid = provider.startWorkflow(LINEAR_WORKFLOW_T1_T2, 1, correlationId , input); assertNotNull(wfid); + printTaskStatuses(wfid, "initial"); Task task = getTask("junit_task_1"); assertNotNull(task); @@ -2312,8 +2447,11 @@ public void testRetry() throws Exception { assertNotNull(es); assertEquals(WorkflowStatus.FAILED, es.getStatus()); + printTaskStatuses(wfid, "before retry"); + provider.retry(wfid); + printTaskStatuses(wfid, "after retry"); es = ess.getExecutionStatus(wfid, true); assertNotNull(es); assertEquals(WorkflowStatus.RUNNING, es.getStatus()); @@ -2343,6 +2481,8 @@ public void testRetry() throws Exception { taskDef.setRetryCount(retryCount); taskDef.setRetryDelaySeconds(retryDelay); ms.updateTaskDef(taskDef); + + printTaskStatuses(wfid, "final"); } @@ -3239,6 +3379,8 @@ private void createWorkflowDefForDomain(){ } catch (Exception e) {} } + + private String runWorkflowWithSubworkflow() throws Exception{ clearWorkflows(); createWorkflowDefForDomain(); @@ -3335,4 +3477,86 @@ private String runWorkflowWithSubworkflow() throws Exception{ return wfid; } + + private String runAFailedForkJoinWF() throws Exception { + try{ + this.createForkJoinWorkflowWithZeroRetry(); + }catch(Exception e){} + + Map input = new HashMap(); + String wfid = provider.startWorkflow(FORK_JOIN_WF +"_2", 1, "fanouttest", input ); + System.out.println("testForkJoin.wfid=" + wfid); + Task t1 = ess.poll("junit_task_0_RT_1", "test"); + assertTrue(ess.ackTaskRecieved(t1.getTaskId(), "test")); + + Task t2 = ess.poll("junit_task_0_RT_2", "test"); + assertTrue(ess.ackTaskRecieved(t2.getTaskId(), "test")); + assertNotNull(t1); + assertNotNull(t2); + + t1.setStatus(Status.COMPLETED); + ess.updateTask(t1); + + Workflow wf = 
ess.getExecutionStatus(wfid, true); + assertNotNull(wf); + assertEquals("Found " + wf.getTasks(), WorkflowStatus.RUNNING, wf.getStatus()); + printTaskStatuses(wf, "Initial"); + + t2.setStatus(Status.FAILED); + + ExecutorService es = Executors.newFixedThreadPool(2); + Future future1 = es.submit(()->{ + try { + ess.updateTask(t2); + } catch (Exception e) { + throw new RuntimeException(e); + } + + }); + future1.get(); + + wf = ess.getExecutionStatus(wfid, true); + assertNotNull(wf); + if (!wf.getTasks().stream().anyMatch(t -> t.getReferenceTaskName().equals("t3"))) { + provider.decide(wfid); + wf = ess.getExecutionStatus(wfid, true); + assertNotNull(wf); + }else { + provider.decide(wfid); + } + assertTrue("Found " + wf.getTasks().stream().map(t -> t.getTaskType()).collect(Collectors.toList()), wf.getTasks().stream().anyMatch(t -> t.getReferenceTaskName().equals("t3"))); + + wf = ess.getExecutionStatus(wfid, true); + assertNotNull(wf); + + provider.decide(wfid); + provider.decide(wfid); + + wf = ess.getExecutionStatus(wfid, true); + assertNotNull(wf); + + wf = ess.getExecutionStatus(wfid, true); + assertNotNull(wf); + provider.decide(wfid); + printTaskStatuses(wfid, "After failed"); + + return wfid; + } + + private void printTaskStatuses(String wfid, String message) throws Exception{ + Workflow wf = ess.getExecutionStatus(wfid, true); + assertNotNull(wf); + printTaskStatuses(wf, message); + } + + private boolean printWFTaskDetails = false; + private void printTaskStatuses(Workflow wf, String message) throws Exception{ + if(printWFTaskDetails){ + System.out.println(message + " >>> Workflow status " + wf.getStatus().name()); + wf.getTasks().forEach(t -> { + System.out.println("Task " + String.format("%-15s",t.getTaskType()) + "\t" + String.format("%-15s",t.getReferenceTaskName()) + "\t" + String.format("%-15s",t.getWorkflowTask().getType()) + "\t" + t.getSeq() + "\t" + t.getStatus() + "\t" + t.getTaskId()); + }); + System.out.println(); + } + } } From 5c49584586c61b1b832f0e9fe8b723ab13dd464b Mon Sep 17 00:00:00 2001 From: Nagendra Kamath Date: Tue, 8 Aug 2017 17:18:15 -0700 Subject: [PATCH 14/14] Add search with all parameters in java client --- .../com/netflix/conductor/client/http/WorkflowClient.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index f498fdec7b..9566abf3cb 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -161,5 +161,9 @@ public SearchResult search(String query) { return result; } + public SearchResult search(Integer start, Integer size, String sort, String freeText, String query) { + Object[] params = new Object[]{"start", start, "size", size, "sort", sort, "freeText", freeText, "query", query}; + return getForEntity("workflow/search", params, new GenericType>() {}); + } }
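
For reference, a minimal usage sketch of the new paginated search overload added in the last patch. This is a hypothetical caller, not part of the patch itself: the root URI, paging values, sort key, free-text term, and query expression below are illustrative placeholders, assuming the client is pointed at a running Conductor API server.

```java
import com.netflix.conductor.client.http.WorkflowClient;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.WorkflowSummary;

public class WorkflowSearchExample {

    public static void main(String[] args) {
        // Assumed: a Conductor server reachable at this (illustrative) endpoint
        WorkflowClient client = new WorkflowClient();
        client.setRootURI("http://localhost:8080/api/");

        // Fetch the first 20 matches, newest first, using the new overload
        // (start, size, sort, freeText, query) instead of the query-only search(String)
        SearchResult<WorkflowSummary> page = client.search(
                0,                       // start offset
                20,                      // page size
                "startTime:DESC",        // sort field and direction
                "payments",              // free-text term (illustrative)
                "status IN (FAILED)");   // query expression (illustrative)

        System.out.println("Total hits: " + page.getTotalHits());
        page.getResults().forEach(w ->
                System.out.println(w.getWorkflowId() + " -> " + w.getStatus()));
    }
}
```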