Skip to content
This repository was archived by the owner on Dec 13, 2023. It is now read-only.

Commit c428a7d

Browse files
Merge pull request #824 from Netflix/cleanup
Refactor and fixes
2 parents 2592787 + 582c6cd commit c428a7d

File tree

20 files changed

+205
-108
lines changed

20 files changed

+205
-108
lines changed

client/src/test/java/com/netflix/conductor/client/http/MetadataClientTest.java

+5-11
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,16 @@
11
package com.netflix.conductor.client.http;
22

3-
import com.sun.jersey.api.client.Client;
3+
import org.junit.Before;
4+
import org.junit.Rule;
5+
import org.junit.Test;
6+
import org.junit.rules.ExpectedException;
7+
import org.mockito.Mockito;
48

59
import static org.mockito.Matchers.any;
610
import static org.mockito.Matchers.anyString;
7-
import static org.mockito.Mockito.doNothing;
811
import static org.mockito.Mockito.doThrow;
912
import static org.mockito.Mockito.times;
1013
import static org.mockito.Mockito.verify;
11-
import static org.mockito.Mockito.when;
12-
13-
import com.netflix.conductor.client.http.MetadataClient;
14-
import org.junit.Before;
15-
import org.junit.Rule;
16-
import org.junit.Test;
17-
import org.junit.rules.ExpectedException;
18-
import org.mockito.Mockito;
19-
import org.mockito.MockitoAnnotations;
2014

2115

2216
/**
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
/**
1+
/*
22
* Copyright 2016 Netflix, Inc.
3-
*
3+
* <p>
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
7+
* <p>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p>
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,46 +15,65 @@
1515
*/
1616
package com.netflix.conductor.client.metadata.workflow;
1717

18-
import static org.junit.Assert.*;
19-
18+
import com.fasterxml.jackson.databind.ObjectMapper;
19+
import com.netflix.conductor.common.metadata.tasks.Task;
2020
import com.netflix.conductor.common.metadata.workflow.TaskType;
21+
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
22+
import com.netflix.conductor.common.utils.JsonMapperProvider;
23+
import org.junit.Before;
2124
import org.junit.Test;
2225

23-
import com.fasterxml.jackson.databind.ObjectMapper;
24-
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
26+
import java.io.InputStream;
27+
import java.util.List;
28+
29+
import static org.junit.Assert.assertEquals;
30+
import static org.junit.Assert.assertNotNull;
2531

2632
/**
27-
*
2833
* @author Viren
29-
*
3034
*/
3135
public class TestWorkflowTask {
3236

33-
@Test
34-
public void test() throws Exception {
35-
ObjectMapper om = new ObjectMapper();
36-
WorkflowTask task = new WorkflowTask();
37-
task.setType("Hello");
38-
task.setName("name");
39-
40-
String json = om.writeValueAsString(task);
41-
42-
WorkflowTask read = om.readValue(json, WorkflowTask.class);
43-
assertNotNull(read);
44-
assertEquals(task.getName(), read.getName());
45-
assertEquals(task.getType(), read.getType());
46-
47-
task = new WorkflowTask();
48-
task.setWorkflowTaskType(TaskType.SUB_WORKFLOW);
49-
task.setName("name");
50-
51-
json = om.writeValueAsString(task);
52-
53-
read = om.readValue(json, WorkflowTask.class);
54-
assertNotNull(read);
55-
assertEquals(task.getName(), read.getName());
56-
assertEquals(task.getType(), read.getType());
57-
assertEquals(TaskType.SUB_WORKFLOW.name(), read.getType());
58-
}
37+
private ObjectMapper objectMapper;
38+
39+
@Before
40+
public void setup() {
41+
objectMapper = new JsonMapperProvider().get();
42+
}
43+
44+
@Test
45+
public void test() throws Exception {
46+
WorkflowTask task = new WorkflowTask();
47+
task.setType("Hello");
48+
task.setName("name");
49+
50+
String json = objectMapper.writeValueAsString(task);
51+
52+
WorkflowTask read = objectMapper.readValue(json, WorkflowTask.class);
53+
assertNotNull(read);
54+
assertEquals(task.getName(), read.getName());
55+
assertEquals(task.getType(), read.getType());
56+
57+
task = new WorkflowTask();
58+
task.setWorkflowTaskType(TaskType.SUB_WORKFLOW);
59+
task.setName("name");
60+
61+
json = objectMapper.writeValueAsString(task);
62+
63+
read = objectMapper.readValue(json, WorkflowTask.class);
64+
assertNotNull(read);
65+
assertEquals(task.getName(), read.getName());
66+
assertEquals(task.getType(), read.getType());
67+
assertEquals(TaskType.SUB_WORKFLOW.name(), read.getType());
68+
}
5969

70+
@SuppressWarnings("unchecked")
71+
@Test
72+
public void testObectMapper() throws Exception {
73+
try (InputStream stream = TestWorkflowTask.class.getResourceAsStream("/tasks.json")) {
74+
List<Task> tasks = objectMapper.readValue(stream, List.class);
75+
assertNotNull(tasks);
76+
assertEquals(1, tasks.size());
77+
}
78+
}
6079
}

client/src/test/resources/tasks.json

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
[
2+
{
3+
"taskType": "task_1",
4+
"status": "IN_PROGRESS",
5+
"inputData": {
6+
"mod": null,
7+
"oddEven": null
8+
},
9+
"referenceTaskName": "task_1",
10+
"retryCount": 0,
11+
"seq": 1,
12+
"pollCount": 1,
13+
"taskDefName": "task_1",
14+
"scheduledTime": 1539623183131,
15+
"startTime": 1539623436841,
16+
"endTime": 0,
17+
"updateTime": 1539623436841,
18+
"startDelayInSeconds": 0,
19+
"retried": false,
20+
"executed": false,
21+
"callbackFromWorker": true,
22+
"responseTimeoutSeconds": 0,
23+
"workflowInstanceId": "2d525ed8-d0e5-44c8-a2df-a110b25c09ac",
24+
"workflowType": "kitchensink",
25+
"taskId": "bc5d9deb-cf86-443d-a1f6-59c36d2464f7",
26+
"callbackAfterSeconds": 0,
27+
"workerId": "test",
28+
"workflowTask": {
29+
"name": "task_1",
30+
"taskReferenceName": "task_1",
31+
"inputParameters": {
32+
"mod": "${workflow.input.mod}",
33+
"oddEven": "${workflow.input.oddEven}"
34+
},
35+
"type": "SIMPLE",
36+
"startDelay": 0,
37+
"optional": false,
38+
"taskDefinition": {
39+
"ownerApp": "falguni-test",
40+
"createTime": 1534274994644,
41+
"createdBy": "CPEWORKFLOW",
42+
"name": "task_1",
43+
"description": "Test Task 01",
44+
"retryCount": 0,
45+
"timeoutSeconds": 5,
46+
"inputKeys": [
47+
"mod",
48+
"oddEven"
49+
],
50+
"outputKeys": [
51+
"someOutput"
52+
],
53+
"timeoutPolicy": "TIME_OUT_WF",
54+
"retryLogic": "FIXED",
55+
"retryDelaySeconds": 0,
56+
"responseTimeoutSeconds": 0,
57+
"concurrentExecLimit": 0,
58+
"rateLimitPerFrequency": 0,
59+
"rateLimitFrequencyInSeconds": 1
60+
}
61+
},
62+
"rateLimitPerFrequency": 0,
63+
"rateLimitFrequencyInSeconds": 0,
64+
"taskDefinition": {
65+
"present": true
66+
},
67+
"queueWaitTime": 253710,
68+
"taskStatus": "IN_PROGRESS"
69+
}
70+
]

common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Copyright 2016 Netflix, Inc.
33
* <p>
44
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -135,7 +135,7 @@ public boolean isRetriable() {
135135
private boolean callbackFromWorker = true;
136136

137137
@ProtoField(id = 19)
138-
private int responseTimeoutSeconds;
138+
private long responseTimeoutSeconds;
139139

140140
@ProtoField(id = 20)
141141
private String workflowInstanceId;
@@ -459,14 +459,14 @@ public void setTaskDefName(String taskDefName) {
459459
/**
460460
* @return the timeout for task to send response. After this timeout, the task will be re-queued
461461
*/
462-
public int getResponseTimeoutSeconds() {
462+
public long getResponseTimeoutSeconds() {
463463
return responseTimeoutSeconds;
464464
}
465465

466466
/**
467467
* @param responseTimeoutSeconds - timeout for task to send response. After this timeout, the task will be re-queued
468468
*/
469-
public void setResponseTimeoutSeconds(int responseTimeoutSeconds) {
469+
public void setResponseTimeoutSeconds(long responseTimeoutSeconds) {
470470
this.responseTimeoutSeconds = responseTimeoutSeconds;
471471
}
472472

common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public static enum RetryLogic {FIXED, EXPONENTIAL_BACKOFF}
7474
private int retryDelaySeconds = 60;
7575

7676
@ProtoField(id = 10)
77-
private int responseTimeoutSeconds = ONE_HOUR;
77+
private long responseTimeoutSeconds = ONE_HOUR;
7878

7979
@ProtoField(id = 11)
8080
private Integer concurrentExecLimit;
@@ -236,15 +236,15 @@ public int getRetryDelaySeconds() {
236236
*
237237
* @return the timeout for task to send response. After this timeout, the task will be re-queued
238238
*/
239-
public int getResponseTimeoutSeconds() {
239+
public long getResponseTimeoutSeconds() {
240240
return responseTimeoutSeconds;
241241
}
242242

243243
/**
244244
*
245245
* @param responseTimeoutSeconds - timeout for task to send response. After this timeout, the task will be re-queued
246246
*/
247-
public void setResponseTimeoutSeconds(int responseTimeoutSeconds) {
247+
public void setResponseTimeoutSeconds(long responseTimeoutSeconds) {
248248
this.responseTimeoutSeconds = responseTimeoutSeconds;
249249
}
250250

core/src/main/java/com/netflix/conductor/core/config/CoreModule.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,8 @@ public TaskMapper getWaitTaskMapper(ParametersUtils parametersUtils) {
144144
@StringMapKey(TASK_TYPE_SUB_WORKFLOW)
145145
@Singleton
146146
@Named(TASK_MAPPERS_QUALIFIER)
147-
public TaskMapper getSubWorkflowTaskMapper(ParametersUtils parametersUtils) {
148-
return new SubWorkflowTaskMapper(parametersUtils);
147+
public TaskMapper getSubWorkflowTaskMapper(ParametersUtils parametersUtils, MetadataDAO metadataDAO) {
148+
return new SubWorkflowTaskMapper(parametersUtils, metadataDAO);
149149
}
150150

151151
@ProvidesIntoMap

core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -448,9 +448,9 @@ void checkForTimeout(TaskDef taskDef, Task task) {
448448
return;
449449
}
450450

451-
long timeout = 1000 * taskDef.getTimeoutSeconds();
451+
long timeout = 1000L * taskDef.getTimeoutSeconds();
452452
long now = System.currentTimeMillis();
453-
long elapsedTime = now - (task.getStartTime() + (task.getStartDelayInSeconds() * 1000));
453+
long elapsedTime = now - (task.getStartTime() + ((long)task.getStartDelayInSeconds() * 1000L));
454454

455455
if (elapsedTime < timeout) {
456456
return;
@@ -494,7 +494,7 @@ boolean isResponseTimedOut(TaskDef taskDefinition, Task task) {
494494

495495
logger.debug("Evaluating responseTimeOut for Task: {}, with Task Definition: {} ", task, taskDefinition);
496496

497-
long responseTimeout = 1000 * taskDefinition.getResponseTimeoutSeconds();
497+
long responseTimeout = 1000L * taskDefinition.getResponseTimeoutSeconds();
498498
long now = System.currentTimeMillis();
499499
long noResponseTime = now - task.getUpdateTime();
500500

core/src/main/java/com/netflix/conductor/core/execution/mapper/DecisionTaskMapper.java

-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) {
8484
decisionTask.setWorkflowType(workflowInstance.getWorkflowName());
8585
decisionTask.setCorrelationId(workflowInstance.getCorrelationId());
8686
decisionTask.setScheduledTime(System.currentTimeMillis());
87-
decisionTask.setEndTime(System.currentTimeMillis());
8887
decisionTask.getInputData().put("case", caseValue);
8988
decisionTask.getOutputData().put("caseOutput", Collections.singletonList(caseValue));
9089
decisionTask.setTaskId(taskId);

core/src/main/java/com/netflix/conductor/core/execution/mapper/EventTaskMapper.java

-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) {
6161
eventTask.setWorkflowType(workflowInstance.getWorkflowName());
6262
eventTask.setCorrelationId(workflowInstance.getCorrelationId());
6363
eventTask.setScheduledTime(System.currentTimeMillis());
64-
eventTask.setEndTime(System.currentTimeMillis());
6564
eventTask.setInputData(eventTaskInput);
6665
eventTask.getInputData().put("sink", sink);
6766
eventTask.setTaskId(taskId);

0 commit comments

Comments
 (0)