Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Extract json node body from json_jq task output (#3207)
Browse files Browse the repository at this point in the history
  • Loading branch information
jxu-nflx authored Sep 2, 2022
1 parent f6dbe62 commit 037b8a6
Show file tree
Hide file tree
Showing 4 changed files with 194 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class JsonJqTransform extends WorkflowSystemTask {
private static final String OUTPUT_RESULT_LIST = "resultList";
private static final String OUTPUT_ERROR = "error";
private static final TypeReference<Map<String, Object>> mapType = new TypeReference<>() {};
private final TypeReference<List<Object>> listType = new TypeReference<>() {};
private final Scope rootScope;
private final ObjectMapper objectMapper;
private final LoadingCache<String, JsonQuery> queryCache = createQueryCache();
Expand Down Expand Up @@ -125,8 +126,18 @@ private String extractFirstValidMessage(final Exception e) {
private Object extractBody(JsonNode node) {
if (node.isObject()) {
return objectMapper.convertValue(node, mapType);
} else if (node.isArray()) {
return objectMapper.convertValue(node, listType);
} else if (node.isBoolean()) {
return node.asBoolean();
} else if (node.isNumber()) {
if (node.isIntegralNumber()) {
return node.asLong();
} else {
return node.asDouble();
}
} else {
return node;
return node.asText();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
*/
package com.netflix.conductor.tasks.json;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.Test;
Expand All @@ -22,6 +24,7 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import static org.junit.Assert.*;
Expand All @@ -46,7 +49,7 @@ public void dataShouldBeCorrectlySelected() {
jsonJqTransform.start(workflow, task, null);

assertNull(task.getOutputData().get("error"));
assertEquals("\"VALUE\"", task.getOutputData().get("result").toString());
assertEquals("VALUE", task.getOutputData().get("result").toString());
assertEquals("[\"VALUE\"]", task.getOutputData().get("resultList").toString());
}

Expand Down Expand Up @@ -124,4 +127,47 @@ public void mapResultShouldBeCorrectlyExtracted() {
assertEquals(
"{result: \"reply: \" + .response.body.message}", result.get("responseTransform"));
}

@Test
public void stringResultShouldBeCorrectlyExtracted() {
final JsonJqTransform jsonJqTransform = new JsonJqTransform(objectMapper);
final WorkflowModel workflow = new WorkflowModel();
final TaskModel task = new TaskModel();
final Map<String, Object> taskInput = new HashMap<>();
taskInput.put("data", new ArrayList<>());
taskInput.put(
"queryExpression", "if(.data | length >0) then \"EXISTS\" else \"CREATE\" end");

task.setInputData(taskInput);

jsonJqTransform.start(workflow, task, null);

assertNull(task.getOutputData().get("error"));
assertTrue(task.getOutputData().get("result") instanceof String);
String result = (String) task.getOutputData().get("result");
assertEquals("CREATE", result);
}

@Test
public void listResultShouldBeCorrectlyExtracted() throws JsonProcessingException {
final JsonJqTransform jsonJqTransform = new JsonJqTransform(objectMapper);
final WorkflowModel workflow = new WorkflowModel();
final TaskModel task = new TaskModel();
String json =
"{ \"request\": { \"transitions\": [ { \"name\": \"redeliver\" }, { \"name\": \"redeliver_from_validation_error\" }, { \"name\": \"redelivery\" } ] } }";
Map<String, Object> inputData = objectMapper.readValue(json, Map.class);

final Map<String, Object> taskInput = new HashMap<>();
taskInput.put("inputData", inputData);
taskInput.put("queryExpression", ".inputData.request.transitions | map(.name)");

task.setInputData(taskInput);

jsonJqTransform.start(workflow, task, null);

assertNull(task.getOutputData().get("error"));
assertTrue(task.getOutputData().get("result") instanceof List);
List<Object> result = (List<Object>) task.getOutputData().get("result");
assertEquals(3, result.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ class JsonJQTransformSpec extends AbstractSpecification {
@Shared
def SEQUENTIAL_JSON_JQ_TRANSFORM_WF = 'sequential_json_jq_transform_wf'

@Shared
def JSON_JQ_TRANSFORM_RESULT_WF = 'json_jq_transform_result_wf'

def setup() {
workflowTestUtil.registerWorkflows(
'simple_json_jq_transform_integration_test.json',
'sequential_json_jq_transform_integration_test.json'
'sequential_json_jq_transform_integration_test.json',
'json_jq_transform_result_integration_test.json'
)
}

Expand Down Expand Up @@ -187,4 +191,39 @@ class JsonJQTransformSpec extends AbstractSpecification {
result2.get("name") == "Beary Beariston you are the Brown Bear"
}
}

def "Test json jq transform task with different json object results succeeds"() {
given: "workflow input"
def workflowInput = new HashMap()
workflowInput["requestedAction"] = "redeliver"

when: "workflow which has the json jq transform task has started"
def workflowInstanceId = workflowExecutor.startWorkflow(JSON_JQ_TRANSFORM_RESULT_WF, 1,
'', workflowInput, null, null, null)

then: "verify that the workflow and task are completed with expected output"
with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) {
status == Workflow.WorkflowStatus.COMPLETED
tasks.size() == 4
tasks[0].status == Task.Status.COMPLETED
tasks[0].taskType == 'JSON_JQ_TRANSFORM'
tasks[0].outputData.containsKey("result") && tasks[0].outputData.containsKey("resultList")
assert tasks[0].outputData.get("result") == "CREATE"

tasks[1].status == Task.Status.COMPLETED
tasks[1].taskType == 'DECISION'
assert tasks[1].inputData.get("case") == "CREATE"

tasks[2].status == Task.Status.COMPLETED
tasks[2].taskType == 'JSON_JQ_TRANSFORM'
tasks[2].outputData.containsKey("result") && tasks[0].outputData.containsKey("resultList")
List<String> result = (List<String>) tasks[2].outputData.get("result")
assert result.size() == 3
assert result.indexOf("redeliver") >= 0

tasks[3].status == Task.Status.COMPLETED
tasks[3].taskType == 'DECISION'
assert tasks[3].inputData.get("case") == "true"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
{
"name": "json_jq_transform_result_wf",
"version": 1,
"tasks": [
{
"name": "json_jq_1",
"taskReferenceName": "json_jq_1",
"description": "json_jq_1",
"inputParameters": {
"data": [],
"queryExpression": "if(.data | length >0) then \"EXISTS\" else \"CREATE\" end"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "decide_1",
"taskReferenceName": "decide_1",
"inputParameters": {
"outcome": "${json_jq_1.output.result}"
},
"type": "DECISION",
"caseValueParam": "outcome",
"decisionCases": {
"CREATE": [
{
"name": "json_jq_2",
"taskReferenceName": "json_jq_2",
"description": "json_jq_2",
"inputParameters": {
"inputData": {
"request": {
"transitions": [
{
"name": "redeliver"
},
{
"name": "redeliver_from_validation_error"
},
{
"name": "redelivery"
}
]
}
},
"queryExpression": ".inputData.request.transitions | map(.name)"
},
"type": "JSON_JQ_TRANSFORM",
"startDelay": 0,
"optional": false,
"asyncComplete": false
},
{
"name": "decide_2",
"taskReferenceName": "decide_2",
"inputParameters": {
"requestedAction": "${workflow.input.requestedAction}",
"availableActions": "${json_jq_2.output.result}"
},
"type": "DECISION",
"caseExpression": "if ($.availableActions.indexOf($.requestedAction) >= 0) { \"true\" } else { \"false\" }",
"decisionCases": {
"false": [
{
"name": "get_population_data",
"taskReferenceName": "get_population_data",
"inputParameters": {
"http_request": {
"uri": "https://datausa.io/api/data?drilldowns=Nation&measures=Population",
"method": "GET"
}
},
"type": "HTTP",
"startDelay": 0,
"optional": false,
"asyncComplete": false
}
]
}
}
]
}
}
],
"inputParameters": [],
"outputParameters": {},
"schemaVersion": 2,
"restartable": true,
"workflowStatusListenerEnabled": false,
"timeoutPolicy": "ALERT_ONLY",
"timeoutSeconds": 0,
"ownerEmail": "[email protected]"
}

0 comments on commit 037b8a6

Please sign in to comment.