diff --git a/.gitignore b/.gitignore
index a1bedb5ac6..eb2ac67732 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,7 @@ ui/dist
ui/package-lock.json
.gradle
.project
+bin
build
client/python/conductor.egg-info
*.pyc
@@ -19,3 +20,4 @@ out/
bin/
target/
.DS_Store
+target/
diff --git a/client/src/test/java/com/netflix/conductor/client/http/MetadataClientTest.java b/client/src/test/java/com/netflix/conductor/client/http/MetadataClientTest.java
index d4e9083413..4a5948b989 100644
--- a/client/src/test/java/com/netflix/conductor/client/http/MetadataClientTest.java
+++ b/client/src/test/java/com/netflix/conductor/client/http/MetadataClientTest.java
@@ -1,22 +1,16 @@
package com.netflix.conductor.client.http;
-import com.sun.jersey.api.client.Client;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.netflix.conductor.client.http.MetadataClient;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
/**
diff --git a/client/src/test/java/com/netflix/conductor/client/metadata/workflow/TestWorkflowTask.java b/client/src/test/java/com/netflix/conductor/client/metadata/workflow/TestWorkflowTask.java
index dbd4868036..e988aeabac 100644
--- a/client/src/test/java/com/netflix/conductor/client/metadata/workflow/TestWorkflowTask.java
+++ b/client/src/test/java/com/netflix/conductor/client/metadata/workflow/TestWorkflowTask.java
@@ -1,12 +1,12 @@
-/**
+/*
* Copyright 2016 Netflix, Inc.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,46 +15,65 @@
*/
package com.netflix.conductor.client.metadata.workflow;
-import static org.junit.Assert.*;
-
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.workflow.TaskType;
+import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
+import com.netflix.conductor.common.utils.JsonMapperProvider;
+import org.junit.Before;
import org.junit.Test;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
/**
- *
* @author Viren
- *
*/
public class TestWorkflowTask {
- @Test
- public void test() throws Exception {
- ObjectMapper om = new ObjectMapper();
- WorkflowTask task = new WorkflowTask();
- task.setType("Hello");
- task.setName("name");
-
- String json = om.writeValueAsString(task);
-
- WorkflowTask read = om.readValue(json, WorkflowTask.class);
- assertNotNull(read);
- assertEquals(task.getName(), read.getName());
- assertEquals(task.getType(), read.getType());
-
- task = new WorkflowTask();
- task.setWorkflowTaskType(TaskType.SUB_WORKFLOW);
- task.setName("name");
-
- json = om.writeValueAsString(task);
-
- read = om.readValue(json, WorkflowTask.class);
- assertNotNull(read);
- assertEquals(task.getName(), read.getName());
- assertEquals(task.getType(), read.getType());
- assertEquals(TaskType.SUB_WORKFLOW.name(), read.getType());
- }
+ private ObjectMapper objectMapper;
+
+ @Before
+ public void setup() {
+ objectMapper = new JsonMapperProvider().get();
+ }
+
+ @Test
+ public void test() throws Exception {
+ WorkflowTask task = new WorkflowTask();
+ task.setType("Hello");
+ task.setName("name");
+
+ String json = objectMapper.writeValueAsString(task);
+
+ WorkflowTask read = objectMapper.readValue(json, WorkflowTask.class);
+ assertNotNull(read);
+ assertEquals(task.getName(), read.getName());
+ assertEquals(task.getType(), read.getType());
+
+ task = new WorkflowTask();
+ task.setWorkflowTaskType(TaskType.SUB_WORKFLOW);
+ task.setName("name");
+
+ json = objectMapper.writeValueAsString(task);
+
+ read = objectMapper.readValue(json, WorkflowTask.class);
+ assertNotNull(read);
+ assertEquals(task.getName(), read.getName());
+ assertEquals(task.getType(), read.getType());
+ assertEquals(TaskType.SUB_WORKFLOW.name(), read.getType());
+ }
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testObjectMapper() throws Exception {
+ try (InputStream stream = TestWorkflowTask.class.getResourceAsStream("/tasks.json")) {
+ List<Task> tasks = objectMapper.readValue(stream, List.class);
+ assertNotNull(tasks);
+ assertEquals(1, tasks.size());
+ }
+ }
}
diff --git a/client/src/test/resources/tasks.json b/client/src/test/resources/tasks.json
new file mode 100644
index 0000000000..424b4880ec
--- /dev/null
+++ b/client/src/test/resources/tasks.json
@@ -0,0 +1,70 @@
+[
+ {
+ "taskType": "task_1",
+ "status": "IN_PROGRESS",
+ "inputData": {
+ "mod": null,
+ "oddEven": null
+ },
+ "referenceTaskName": "task_1",
+ "retryCount": 0,
+ "seq": 1,
+ "pollCount": 1,
+ "taskDefName": "task_1",
+ "scheduledTime": 1539623183131,
+ "startTime": 1539623436841,
+ "endTime": 0,
+ "updateTime": 1539623436841,
+ "startDelayInSeconds": 0,
+ "retried": false,
+ "executed": false,
+ "callbackFromWorker": true,
+ "responseTimeoutSeconds": 0,
+ "workflowInstanceId": "2d525ed8-d0e5-44c8-a2df-a110b25c09ac",
+ "workflowType": "kitchensink",
+ "taskId": "bc5d9deb-cf86-443d-a1f6-59c36d2464f7",
+ "callbackAfterSeconds": 0,
+ "workerId": "test",
+ "workflowTask": {
+ "name": "task_1",
+ "taskReferenceName": "task_1",
+ "inputParameters": {
+ "mod": "${workflow.input.mod}",
+ "oddEven": "${workflow.input.oddEven}"
+ },
+ "type": "SIMPLE",
+ "startDelay": 0,
+ "optional": false,
+ "taskDefinition": {
+ "ownerApp": "falguni-test",
+ "createTime": 1534274994644,
+ "createdBy": "CPEWORKFLOW",
+ "name": "task_1",
+ "description": "Test Task 01",
+ "retryCount": 0,
+ "timeoutSeconds": 5,
+ "inputKeys": [
+ "mod",
+ "oddEven"
+ ],
+ "outputKeys": [
+ "someOutput"
+ ],
+ "timeoutPolicy": "TIME_OUT_WF",
+ "retryLogic": "FIXED",
+ "retryDelaySeconds": 0,
+ "responseTimeoutSeconds": 0,
+ "concurrentExecLimit": 0,
+ "rateLimitPerFrequency": 0,
+ "rateLimitFrequencyInSeconds": 1
+ }
+ },
+ "rateLimitPerFrequency": 0,
+ "rateLimitFrequencyInSeconds": 0,
+ "taskDefinition": {
+ "present": true
+ },
+ "queueWaitTime": 253710,
+ "taskStatus": "IN_PROGRESS"
+ }
+]
\ No newline at end of file
diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
index 0aae4b857e..1210a732a5 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/Task.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -135,7 +135,7 @@ public boolean isRetriable() {
private boolean callbackFromWorker = true;
@ProtoField(id = 19)
- private int responseTimeoutSeconds;
+ private long responseTimeoutSeconds;
@ProtoField(id = 20)
private String workflowInstanceId;
@@ -459,14 +459,14 @@ public void setTaskDefName(String taskDefName) {
/**
* @return the timeout for task to send response. After this timeout, the task will be re-queued
*/
- public int getResponseTimeoutSeconds() {
+ public long getResponseTimeoutSeconds() {
return responseTimeoutSeconds;
}
/**
* @param responseTimeoutSeconds - timeout for task to send response. After this timeout, the task will be re-queued
*/
- public void setResponseTimeoutSeconds(int responseTimeoutSeconds) {
+ public void setResponseTimeoutSeconds(long responseTimeoutSeconds) {
this.responseTimeoutSeconds = responseTimeoutSeconds;
}
diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java
index 4a372a2898..7ede7b426b 100644
--- a/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java
+++ b/common/src/main/java/com/netflix/conductor/common/metadata/tasks/TaskDef.java
@@ -74,7 +74,7 @@ public static enum RetryLogic {FIXED, EXPONENTIAL_BACKOFF}
private int retryDelaySeconds = 60;
@ProtoField(id = 10)
- private int responseTimeoutSeconds = ONE_HOUR;
+ private long responseTimeoutSeconds = ONE_HOUR;
@ProtoField(id = 11)
private Integer concurrentExecLimit;
@@ -236,7 +236,7 @@ public int getRetryDelaySeconds() {
*
* @return the timeout for task to send response. After this timeout, the task will be re-queued
*/
- public int getResponseTimeoutSeconds() {
+ public long getResponseTimeoutSeconds() {
return responseTimeoutSeconds;
}
@@ -244,7 +244,7 @@ public int getResponseTimeoutSeconds() {
*
* @param responseTimeoutSeconds - timeout for task to send response. After this timeout, the task will be re-queued
*/
- public void setResponseTimeoutSeconds(int responseTimeoutSeconds) {
+ public void setResponseTimeoutSeconds(long responseTimeoutSeconds) {
this.responseTimeoutSeconds = responseTimeoutSeconds;
}
diff --git a/common/src/main/java/com/netflix/conductor/common/run/TaskSummary.java b/common/src/main/java/com/netflix/conductor/common/run/TaskSummary.java
index f0fc91361c..a61d13756b 100644
--- a/common/src/main/java/com/netflix/conductor/common/run/TaskSummary.java
+++ b/common/src/main/java/com/netflix/conductor/common/run/TaskSummary.java
@@ -86,6 +86,9 @@ public class TaskSummary {
@ProtoField(id = 16)
private String taskId;
+ public TaskSummary() {
+ }
+
public TaskSummary(Task task) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
diff --git a/common/src/test/java/com/netflix/conductor/common/run/TestTaskSummary.java b/common/src/test/java/com/netflix/conductor/common/run/TestTaskSummary.java
new file mode 100644
index 0000000000..5bba3a13c2
--- /dev/null
+++ b/common/src/test/java/com/netflix/conductor/common/run/TestTaskSummary.java
@@ -0,0 +1,23 @@
+package com.netflix.conductor.common.run;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.netflix.conductor.common.metadata.tasks.Task;
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+public class TestTaskSummary {
+
+ @Test
+ public void testJsonSerializing() throws Exception {
+ ObjectMapper om = new ObjectMapper();
+
+ Task task = new Task();
+ TaskSummary taskSummary = new TaskSummary(task);
+
+ String json = om.writeValueAsString(taskSummary);
+ TaskSummary read = om.readValue(json, TaskSummary.class);
+ assertNotNull(read);
+ }
+
+}
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/ContribsModule.java b/contribs/src/main/java/com/netflix/conductor/contribs/ContribsModule.java
index e5c1e8b1dc..a9345b68e8 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/ContribsModule.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/ContribsModule.java
@@ -58,8 +58,8 @@ protected void configure() {
@StringMapKey("sqs")
@Singleton
@Named(EVENT_QUEUE_PROVIDERS_QUALIFIER)
- public EventQueueProvider getSQSEventQueueProvider(AmazonSQSClient amazonSQSClient) {
- return new SQSEventQueueProvider(amazonSQSClient);
+ public EventQueueProvider getSQSEventQueueProvider(AmazonSQSClient amazonSQSClient, Configuration config) {
+ return new SQSEventQueueProvider(amazonSQSClient, config);
}
diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java
index fbedc8d0f9..9a348dc34b 100644
--- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java
+++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java
@@ -72,7 +72,7 @@ public class SQSObservableQueue implements ObservableQueue {
private String queueName;
- private int visibilityTimeout;
+ private int visibilityTimeoutInSeconds;
private int batchSize;
@@ -82,10 +82,10 @@ public class SQSObservableQueue implements ObservableQueue {
private String queueURL;
- private SQSObservableQueue(String queueName, AmazonSQSClient client, int visibilityTimeout, int batchSize, int pollTimeInMS, List<String> accountsToAuthorize) {
+ private SQSObservableQueue(String queueName, AmazonSQSClient client, int visibilityTimeoutInSeconds, int batchSize, int pollTimeInMS, List<String> accountsToAuthorize) {
this.queueName = queueName;
this.client = client;
- this.visibilityTimeout = visibilityTimeout;
+ this.visibilityTimeoutInSeconds = visibilityTimeoutInSeconds;
this.batchSize = batchSize;
this.pollTimeInMS = pollTimeInMS;
this.queueURL = getOrCreateQueue();
@@ -141,6 +141,18 @@ public String getURI() {
return queueURL;
}
+ public int getPollTimeInMS() {
+ return pollTimeInMS;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public int getVisibilityTimeoutInSeconds() {
+ return visibilityTimeoutInSeconds;
+ }
+
public static class Builder {
private String queueName;
@@ -271,7 +283,7 @@ List<Message> receiveMessages() {
try {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
.withQueueUrl(queueURL)
- .withVisibilityTimeout(visibilityTimeout)
+ .withVisibilityTimeout(visibilityTimeoutInSeconds)
.withMaxNumberOfMessages(batchSize);
ReceiveMessageResult result = client.receiveMessage(receiveMessageRequest);
diff --git a/contribs/src/main/java/com/netflix/conductor/core/events/sqs/SQSEventQueueProvider.java b/contribs/src/main/java/com/netflix/conductor/core/events/sqs/SQSEventQueueProvider.java
index afd4f9772e..77b421e1ba 100644
--- a/contribs/src/main/java/com/netflix/conductor/core/events/sqs/SQSEventQueueProvider.java
+++ b/contribs/src/main/java/com/netflix/conductor/core/events/sqs/SQSEventQueueProvider.java
@@ -21,6 +21,7 @@
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.netflix.conductor.contribs.queue.sqs.SQSObservableQueue;
import com.netflix.conductor.contribs.queue.sqs.SQSObservableQueue.Builder;
+import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.events.EventQueueProvider;
import com.netflix.conductor.core.events.queue.ObservableQueue;
@@ -38,21 +39,27 @@ public class SQSEventQueueProvider implements EventQueueProvider {
private final Map<String, ObservableQueue> queues = new ConcurrentHashMap<>();
private final AmazonSQSClient client;
+ private final int batchSize;
+ private final int pollTimeInMS;
+ private final int visibilityTimeoutInSeconds;
@Inject
- public SQSEventQueueProvider(AmazonSQSClient client) {
+ public SQSEventQueueProvider(AmazonSQSClient client, Configuration config) {
this.client = client;
+ this.batchSize = config.getIntProperty("workflow.event.queues.sqs.batchSize", 1);
+ this.pollTimeInMS = config.getIntProperty("workflow.event.queues.sqs.pollTimeInMS", 100);
+ this.visibilityTimeoutInSeconds = config.getIntProperty("workflow.event.queues.sqs.visibilityTimeoutInSeconds", 60);
}
@Override
public ObservableQueue getQueue(String queueURI) {
return queues.computeIfAbsent(queueURI, q -> {
Builder builder = new SQSObservableQueue.Builder();
- return builder.withBatchSize(1)
+ return builder.withBatchSize(this.batchSize)
.withClient(client)
- .withPollTimeInMS(100)
+ .withPollTimeInMS(this.pollTimeInMS)
.withQueueName(queueURI)
- .withVisibilityTimeout(60)
+ .withVisibilityTimeout(this.visibilityTimeoutInSeconds)
.build();
});
}
diff --git a/contribs/src/test/java/com/netflix/conductor/core/events/sqs/TestSQSEventQueueProvider.java b/contribs/src/test/java/com/netflix/conductor/core/events/sqs/TestSQSEventQueueProvider.java
new file mode 100644
index 0000000000..3763f563cb
--- /dev/null
+++ b/contribs/src/test/java/com/netflix/conductor/core/events/sqs/TestSQSEventQueueProvider.java
@@ -0,0 +1,64 @@
+package com.netflix.conductor.core.events.sqs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.ListQueuesRequest;
+import com.amazonaws.services.sqs.model.ListQueuesResult;
+import com.netflix.conductor.contribs.queue.sqs.SQSObservableQueue;
+import com.netflix.conductor.core.config.Configuration;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSQSEventQueueProvider {
+ private AmazonSQSClient amazonSQSClient;
+ private Configuration configuration;
+
+ @Before
+ public void setup() {
+ amazonSQSClient = mock(AmazonSQSClient.class);
+ configuration = mock(Configuration.class);
+ }
+
+ @Test
+ public void testGetQueueWithDefaultConfiguration() {
+ when(configuration.getIntProperty(anyString(), anyInt())).thenAnswer(invocation -> invocation.getArguments()[1]);
+
+ ListQueuesResult listQueuesResult = new ListQueuesResult().withQueueUrls("test_queue_1");
+ when(amazonSQSClient.listQueues(any(ListQueuesRequest.class))).thenReturn(listQueuesResult);
+
+ SQSEventQueueProvider sqsEventQueueProvider = new SQSEventQueueProvider(amazonSQSClient, configuration);
+ SQSObservableQueue sqsObservableQueue = (SQSObservableQueue) sqsEventQueueProvider.getQueue("test_queue_1");
+
+ assertNotNull(sqsObservableQueue);
+ assertEquals(1, sqsObservableQueue.getBatchSize());
+ assertEquals(100, sqsObservableQueue.getPollTimeInMS());
+ assertEquals(60, sqsObservableQueue.getVisibilityTimeoutInSeconds());
+ }
+
+ @Test
+ public void testGetQueueWithCustomConfiguration() {
+ when(configuration.getIntProperty(eq("workflow.event.queues.sqs.batchSize"), anyInt())).thenReturn(10);
+ when(configuration.getIntProperty(eq("workflow.event.queues.sqs.pollTimeInMS"), anyInt())).thenReturn(50);
+ when(configuration.getIntProperty(eq("workflow.event.queues.sqs.visibilityTimeoutInSeconds"), anyInt())).thenReturn(30);
+
+ ListQueuesResult listQueuesResult = new ListQueuesResult().withQueueUrls("test_queue_1");
+ when(amazonSQSClient.listQueues(any(ListQueuesRequest.class))).thenReturn(listQueuesResult);
+
+ SQSEventQueueProvider sqsEventQueueProvider = new SQSEventQueueProvider(amazonSQSClient, configuration);
+ SQSObservableQueue sqsObservableQueue = (SQSObservableQueue) sqsEventQueueProvider.getQueue("test_queue_1");
+
+ assertNotNull(sqsObservableQueue);
+ assertEquals(10, sqsObservableQueue.getBatchSize());
+ assertEquals(50, sqsObservableQueue.getPollTimeInMS());
+ assertEquals(30, sqsObservableQueue.getVisibilityTimeoutInSeconds());
+ }
+
+}
diff --git a/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java b/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java
index 891bd77bdc..f7940e75df 100644
--- a/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java
+++ b/core/src/main/java/com/netflix/conductor/core/config/CoreModule.java
@@ -144,8 +144,8 @@ public TaskMapper getWaitTaskMapper(ParametersUtils parametersUtils) {
@StringMapKey(TASK_TYPE_SUB_WORKFLOW)
@Singleton
@Named(TASK_MAPPERS_QUALIFIER)
- public TaskMapper getSubWorkflowTaskMapper(ParametersUtils parametersUtils) {
- return new SubWorkflowTaskMapper(parametersUtils);
+ public TaskMapper getSubWorkflowTaskMapper(ParametersUtils parametersUtils, MetadataDAO metadataDAO) {
+ return new SubWorkflowTaskMapper(parametersUtils, metadataDAO);
}
@ProvidesIntoMap
diff --git a/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java
index 528380f856..449a238047 100644
--- a/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java
+++ b/core/src/main/java/com/netflix/conductor/core/events/EventProcessor.java
@@ -90,6 +90,7 @@ public EventProcessor(ExecutionService executionService, MetadataService metadat
executorService = Executors.newFixedThreadPool(executorThreadCount);
refresh();
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::refresh, 60, 60, TimeUnit.SECONDS);
+ logger.info("Event Processing is ENABLED. executorThreadCount set to {}", executorThreadCount);
} else {
logger.warn("Event processing is DISABLED. executorThreadCount set to {}", executorThreadCount);
}
@@ -161,6 +162,8 @@ private void handle(ObservableQueue queue, Message msg) {
}
} catch (Exception e) {
logger.error("Error handling message: {} on queue:{}", msg, queue.getName(), e);
+ } finally {
+ Monitors.recordEventQueueMessagesHandled(queue.getType(), queue.getName());
}
}
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
index d7879dce27..91b4af9ca2 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java
@@ -448,9 +448,9 @@ void checkForTimeout(TaskDef taskDef, Task task) {
return;
}
- long timeout = 1000 * taskDef.getTimeoutSeconds();
+ long timeout = 1000L * taskDef.getTimeoutSeconds();
long now = System.currentTimeMillis();
- long elapsedTime = now - (task.getStartTime() + (task.getStartDelayInSeconds() * 1000));
+ long elapsedTime = now - (task.getStartTime() + ((long)task.getStartDelayInSeconds() * 1000L));
if (elapsedTime < timeout) {
return;
@@ -494,7 +494,7 @@ boolean isResponseTimedOut(TaskDef taskDefinition, Task task) {
logger.debug("Evaluating responseTimeOut for Task: {}, with Task Definition: {} ", task, taskDefinition);
- long responseTimeout = 1000 * taskDefinition.getResponseTimeoutSeconds();
+ long responseTimeout = 1000L * taskDefinition.getResponseTimeoutSeconds();
long now = System.currentTimeMillis();
long noResponseTime = now - task.getUpdateTime();
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/DecisionTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/DecisionTaskMapper.java
index 3ec5dd1c46..d9f793c769 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/DecisionTaskMapper.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/DecisionTaskMapper.java
@@ -84,7 +84,6 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) {
decisionTask.setWorkflowType(workflowInstance.getWorkflowName());
decisionTask.setCorrelationId(workflowInstance.getCorrelationId());
decisionTask.setScheduledTime(System.currentTimeMillis());
- decisionTask.setEndTime(System.currentTimeMillis());
decisionTask.getInputData().put("case", caseValue);
decisionTask.getOutputData().put("caseOutput", Collections.singletonList(caseValue));
decisionTask.setTaskId(taskId);
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/EventTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/EventTaskMapper.java
index 2a2cd3a808..9cf695a354 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/EventTaskMapper.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/EventTaskMapper.java
@@ -61,7 +61,6 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) {
eventTask.setWorkflowType(workflowInstance.getWorkflowName());
eventTask.setCorrelationId(workflowInstance.getCorrelationId());
eventTask.setScheduledTime(System.currentTimeMillis());
- eventTask.setEndTime(System.currentTimeMillis());
eventTask.setInputData(eventTaskInput);
eventTask.getInputData().put("sink", sink);
eventTask.setTaskId(taskId);
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java
index ea7b7b4b44..433ad7d2d3 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/ForkJoinDynamicTaskMapper.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -83,17 +83,16 @@ public ForkJoinDynamicTaskMapper(ParametersUtils parametersUtils, ObjectMapper o
*
A check is performed that the next following task in the {@link WorkflowDef} is a {@link TaskType#JOIN}
*
*
- *
* @param taskMapperContext: A wrapper class containing the {@link WorkflowTask}, {@link WorkflowDef}, {@link Workflow} and a string representation of the TaskId
* @throws TerminateWorkflowException In case of:
- *
- * -
- * When the task after {@link TaskType#FORK_JOIN_DYNAMIC} is not a {@link TaskType#JOIN}
- *
- * -
- * When the input parameters for the dynamic tasks are not of type {@link Map}
- *
- *
+ *
+ * -
+ * When the task after {@link TaskType#FORK_JOIN_DYNAMIC} is not a {@link TaskType#JOIN}
+ *
+ * -
+ * When the input parameters for the dynamic tasks are not of type {@link Map}
+ *
+ *
* @return: List of tasks in the following order:
*
* -
@@ -174,7 +173,7 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) throws TerminateWorkflowException {
* @param workflowInstance: A instance of the {@link Workflow} which represents the workflow being executed.
* @param taskId: The string representation of {@link java.util.UUID} which will be set as the taskId.
* @param dynForkTasks: The list of dynamic forked tasks, the reference names of these tasks will be added to the forkDynamicTask
- * @return: A new instance of {@link Task} representing a {@link SystemTaskType#FORK}
+ * @return A new instance of {@link Task} representing a {@link SystemTaskType#FORK}
*/
@VisibleForTesting
Task createDynamicForkTask(WorkflowTask taskToSchedule, Workflow workflowInstance, String taskId, List<WorkflowTask> dynForkTasks) {
@@ -204,7 +203,7 @@ Task createDynamicForkTask(WorkflowTask taskToSchedule, Workflow workflowInstanc
* @param workflowInstance: A instance of the {@link Workflow} which represents the workflow being executed.
* @param joinWorkflowTask: A instance of {@link WorkflowTask} which is of type {@link TaskType#JOIN}
* @param joinInput: The input which is set in the {@link Task#setInputData(Map)}
- * @return: a new instance of {@link Task} representing a {@link SystemTaskType#JOIN}
+ * @return a new instance of {@link Task} representing a {@link SystemTaskType#JOIN}
*/
@VisibleForTesting
Task createJoinTask(Workflow workflowInstance, WorkflowTask joinWorkflowTask, HashMap<String, Object> joinInput) {
@@ -216,7 +215,6 @@ Task createJoinTask(Workflow workflowInstance, WorkflowTask joinWorkflowTask, Ha
joinTask.setWorkflowType(workflowInstance.getWorkflowName());
joinTask.setCorrelationId(workflowInstance.getCorrelationId());
joinTask.setScheduledTime(System.currentTimeMillis());
- joinTask.setEndTime(System.currentTimeMillis());
joinTask.setInputData(joinInput);
joinTask.setTaskId(IDGenerator.generate());
joinTask.setStatus(Task.Status.IN_PROGRESS);
@@ -231,7 +229,7 @@ Task createJoinTask(Workflow workflowInstance, WorkflowTask joinWorkflowTask, Ha
* @param workflowInstance: The instance of the {@link Workflow} which represents the workflow being executed.
* @param dynamicForkTaskParam: The key representing the dynamic fork join json payload which is available in {@link WorkflowTask#getInputParameters()}
* @throws TerminateWorkflowException : In case of input parameters of the dynamic fork tasks not represented as {@link Map}
- * @return: a {@link Pair} representing the list of dynamic fork tasks in {@link Pair#getLeft()} and the input for the dynamic fork tasks in {@link Pair#getRight()}
+ * @return a {@link Pair} representing the list of dynamic fork tasks in {@link Pair#getLeft()} and the input for the dynamic fork tasks in {@link Pair#getRight()}
*/
@SuppressWarnings("unchecked")
@VisibleForTesting
@@ -261,7 +259,7 @@ Pair<List<WorkflowTask>, Map<String, Object>> getDynamicForkTasksAndInput(
* @param taskToSchedule: The Task of type FORK_JOIN_DYNAMIC that needs to scheduled, which has the input parameters
* @param workflowInstance: The instance of the {@link Workflow} which represents the workflow being executed.
* @throws TerminateWorkflowException : In case of the {@link WorkflowTask#getInputParameters()} does not have a payload that contains the list of the dynamic tasks
- * @return: {@link Pair} representing the list of dynamic fork tasks in {@link Pair#getLeft()} and the input for the dynamic fork tasks in {@link Pair#getRight()}
+ * @return {@link Pair} representing the list of dynamic fork tasks in {@link Pair#getLeft()} and the input for the dynamic fork tasks in {@link Pair#getRight()}
*/
@VisibleForTesting
Pair<List<WorkflowTask>, Map<String, Object>> getDynamicForkJoinTasksAndInput(WorkflowTask taskToSchedule, Workflow workflowInstance) throws TerminateWorkflowException {
@@ -294,8 +292,5 @@ Pair, Map>> getDynamicForkJoinTas
.collect(Collectors.toCollection(LinkedList::new));
return new ImmutablePair<>(dynamicForkJoinWorkflowTasks, dynamicForkJoinTasksInput);
-
}
-
-
}
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/JoinTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/JoinTaskMapper.java
index 991ec6d80d..d90977f7af 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/JoinTaskMapper.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/JoinTaskMapper.java
@@ -65,7 +65,6 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) {
joinTask.setCorrelationId(workflowInstance.getCorrelationId());
joinTask.setWorkflowType(workflowInstance.getWorkflowName());
joinTask.setScheduledTime(System.currentTimeMillis());
- joinTask.setEndTime(System.currentTimeMillis());
joinTask.setInputData(joinInput);
joinTask.setTaskId(taskId);
joinTask.setStatus(Task.Status.IN_PROGRESS);
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapper.java
index 84a9501f85..8b08fa42fe 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapper.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapper.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
@@ -10,38 +10,39 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
-
-
package com.netflix.conductor.core.execution.mapper;
import com.google.common.annotations.VisibleForTesting;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.workflow.SubWorkflowParams;
+import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.ParametersUtils;
import com.netflix.conductor.core.execution.TerminateWorkflowException;
import com.netflix.conductor.core.execution.tasks.SubWorkflow;
+import com.netflix.conductor.dao.MetadataDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
+import javax.inject.Inject;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import javax.inject.Inject;
-
public class SubWorkflowTaskMapper implements TaskMapper {
public static final Logger logger = LoggerFactory.getLogger(SubWorkflowTaskMapper.class);
- private ParametersUtils parametersUtils;
+ private final ParametersUtils parametersUtils;
+ private final MetadataDAO metadataDAO;
@Inject
- public SubWorkflowTaskMapper(ParametersUtils parametersUtils) {
+ public SubWorkflowTaskMapper(ParametersUtils parametersUtils, MetadataDAO metadataDAO) {
this.parametersUtils = parametersUtils;
+ this.metadataDAO = metadataDAO;
}
@Override
@@ -56,6 +57,7 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) {
Map resolvedParams = getSubWorkflowInputParameters(workflowInstance, subWorkflowParams);
String subWorkflowName = resolvedParams.get("name").toString();
+ Integer subWorkflowVersion = getSubWorkflowVersion(resolvedParams, subWorkflowName);
Task subWorkflowTask = new Task();
subWorkflowTask.setTaskType(SubWorkflow.NAME);
@@ -65,15 +67,14 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) {
subWorkflowTask.setWorkflowType(workflowInstance.getWorkflowName());
subWorkflowTask.setCorrelationId(workflowInstance.getCorrelationId());
subWorkflowTask.setScheduledTime(System.currentTimeMillis());
- subWorkflowTask.setEndTime(System.currentTimeMillis());
subWorkflowTask.getInputData().put("subWorkflowName", subWorkflowName);
- subWorkflowTask.getInputData().put("subWorkflowVersion", subWorkflowParams.getVersion());
+ subWorkflowTask.getInputData().put("subWorkflowVersion", subWorkflowVersion);
subWorkflowTask.getInputData().put("workflowInput", taskMapperContext.getTaskInput());
subWorkflowTask.setTaskId(taskId);
subWorkflowTask.setStatus(Task.Status.SCHEDULED);
subWorkflowTask.setWorkflowTask(taskToSchedule);
logger.debug("SubWorkflowTask {} created to be Scheduled", subWorkflowTask);
- return Arrays.asList(subWorkflowTask);
+ return Collections.singletonList(subWorkflowTask);
}
@VisibleForTesting
@@ -87,8 +88,7 @@ SubWorkflowParams getSubWorkflowParams(WorkflowTask taskToSchedule) {
});
}
- @VisibleForTesting
- Map getSubWorkflowInputParameters(Workflow workflowInstance, SubWorkflowParams subWorkflowParams) {
+ private Map getSubWorkflowInputParameters(Workflow workflowInstance, SubWorkflowParams subWorkflowParams) {
Map params = new HashMap<>();
params.put("name", subWorkflowParams.getName());
@@ -99,4 +99,17 @@ Map getSubWorkflowInputParameters(Workflow workflowInstance, Sub
return parametersUtils.getTaskInputV2(params, workflowInstance, null, null);
}
+ private Integer getSubWorkflowVersion(Map resolvedParams, String subWorkflowName) {
+ return Optional.ofNullable(resolvedParams.get("version"))
+ .map(Object::toString)
+ .map(Integer::parseInt)
+ .orElseGet(
+ () -> metadataDAO.getLatest(subWorkflowName)
+ .map(WorkflowDef::getVersion)
+ .orElseThrow(() -> {
+ String reason = String.format("The Task %s defined as a sub-workflow has no workflow definition available ", subWorkflowName);
+ logger.error(reason);
+ return new TerminateWorkflowException(reason);
+ }));
+ }
}
diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapper.java
index 5613a0ad9b..f0e58bde96 100644
--- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapper.java
+++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/WaitTaskMapper.java
@@ -65,7 +65,6 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) {
waitTask.setWorkflowType(workflowInstance.getWorkflowName());
waitTask.setCorrelationId(workflowInstance.getCorrelationId());
waitTask.setScheduledTime(System.currentTimeMillis());
- waitTask.setEndTime(System.currentTimeMillis());
waitTask.setInputData(waitTaskInput);
waitTask.setTaskId(taskId);
waitTask.setStatus(Task.Status.IN_PROGRESS);
diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java
index c77efc9076..6a6822633a 100644
--- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java
+++ b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java
@@ -232,6 +232,10 @@ public static void recordObservableQMessageReceivedErrors(String queueType) {
counter(classQualifier, "observable_queue_error", "queueType", queueType);
}
+ public static void recordEventQueueMessagesHandled(String queueType, String queueName) {
+ counter(classQualifier, "event_queue_messages_handled", "queueType", queueType, "queueName", queueName);
+ }
+
public static void recordDaoRequests(String dao, String action, String taskType, String workflowType) {
counter(classQualifier, "dao_requests", "dao", dao, "action", action, "taskType", taskType, "workflowType", workflowType);
}
diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java
index e56e618b38..d0b330a7d3 100644
--- a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java
+++ b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java
@@ -105,7 +105,7 @@ public void init() {
taskMappers.put("FORK_JOIN_DYNAMIC", new ForkJoinDynamicTaskMapper(parametersUtils, objectMapper, metadataDAO));
taskMappers.put("USER_DEFINED", new UserDefinedTaskMapper(parametersUtils, metadataDAO));
taskMappers.put("SIMPLE", new SimpleTaskMapper(parametersUtils));
- taskMappers.put("SUB_WORKFLOW", new SubWorkflowTaskMapper(parametersUtils));
+ taskMappers.put("SUB_WORKFLOW", new SubWorkflowTaskMapper(parametersUtils, metadataDAO));
taskMappers.put("EVENT", new EventTaskMapper(parametersUtils));
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderService.java b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderService.java
index 3f5d9f8cb0..4bfd1288c8 100644
--- a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderService.java
+++ b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderService.java
@@ -130,7 +130,7 @@ public void setup() {
taskMappers.put("FORK_JOIN_DYNAMIC", new ForkJoinDynamicTaskMapper(parametersUtils, objectMapper, metadataDAO));
taskMappers.put("USER_DEFINED", new UserDefinedTaskMapper(parametersUtils, metadataDAO));
taskMappers.put("SIMPLE", new SimpleTaskMapper(parametersUtils));
- taskMappers.put("SUB_WORKFLOW", new SubWorkflowTaskMapper(parametersUtils));
+ taskMappers.put("SUB_WORKFLOW", new SubWorkflowTaskMapper(parametersUtils, metadataDAO));
taskMappers.put("EVENT", new EventTaskMapper(parametersUtils));
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java
index 9bd791dcc8..04cc50f5ca 100644
--- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java
+++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java
@@ -97,7 +97,7 @@ public void init() {
taskMappers.put("FORK_JOIN_DYNAMIC", new ForkJoinDynamicTaskMapper(parametersUtils, objectMapper, metadataDAO));
taskMappers.put("USER_DEFINED", new UserDefinedTaskMapper(parametersUtils, metadataDAO));
taskMappers.put("SIMPLE", new SimpleTaskMapper(parametersUtils));
- taskMappers.put("SUB_WORKFLOW", new SubWorkflowTaskMapper(parametersUtils));
+ taskMappers.put("SUB_WORKFLOW", new SubWorkflowTaskMapper(parametersUtils, metadataDAO));
taskMappers.put("EVENT", new EventTaskMapper(parametersUtils));
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
@@ -177,7 +177,6 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
task2.setWorkflowInstanceId(workflow.getWorkflowId());
task2.setCorrelationId(workflow.getCorrelationId());
task2.setScheduledTime(System.currentTimeMillis());
- task2.setEndTime(System.currentTimeMillis());
task2.setInputData(new HashMap<>());
task2.setTaskId(IDGenerator.generate());
task2.setStatus(Status.IN_PROGRESS);
diff --git a/core/src/test/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapperTest.java b/core/src/test/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapperTest.java
index 8cfa5a993a..05aaf2e6bc 100644
--- a/core/src/test/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapperTest.java
+++ b/core/src/test/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapperTest.java
@@ -1,3 +1,15 @@
+/*
+ * Copyright 2018 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
package com.netflix.conductor.core.execution.mapper;
import com.netflix.conductor.common.metadata.tasks.Task;
@@ -42,8 +54,7 @@ public class SubWorkflowTaskMapperTest {
@Before
public void setUp() {
parametersUtils = mock(ParametersUtils.class);
- metadataDAO = mock(MetadataDAO.class);
- subWorkflowTaskMapper = new SubWorkflowTaskMapper(parametersUtils);
+ subWorkflowTaskMapper = new SubWorkflowTaskMapper(parametersUtils, metadataDAO);
deciderService = mock(DeciderService.class);
}
@@ -52,18 +63,18 @@ public void setUp() {
public void getMappedTasks() {
//Given
WorkflowDef workflowDef = new WorkflowDef();
- Workflow workflowInstance = new Workflow();
+ Workflow workflowInstance = new Workflow();
workflowInstance.setWorkflowDefinition(workflowDef);
WorkflowTask taskToSchedule = new WorkflowTask();
SubWorkflowParams subWorkflowParams = new SubWorkflowParams();
subWorkflowParams.setName("Foo");
subWorkflowParams.setVersion(2);
taskToSchedule.setSubWorkflowParam(subWorkflowParams);
- Map taskInput = new HashMap<>();
+ Map taskInput = new HashMap<>();
Map subWorkflowParamMap = new HashMap<>();
- subWorkflowParamMap.put("name","FooWorkFlow");
- subWorkflowParamMap.put("version",2);
+ subWorkflowParamMap.put("name", "FooWorkFlow");
+ subWorkflowParamMap.put("version", 2);
when(parametersUtils.getTaskInputV2(anyMap(), any(Workflow.class), anyString(), any(TaskDef.class)))
.thenReturn(subWorkflowParamMap);
diff --git a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/EmbeddedElasticSearchProvider.java b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/EmbeddedElasticSearchProvider.java
index 8f22fbd633..9327aaec95 100644
--- a/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/EmbeddedElasticSearchProvider.java
+++ b/es5-persistence/src/main/java/com/netflix/conductor/elasticsearch/EmbeddedElasticSearchProvider.java
@@ -1,8 +1,7 @@
package com.netflix.conductor.elasticsearch;
-import java.util.Optional;
-
import javax.inject.Provider;
+import java.util.Optional;
public interface EmbeddedElasticSearchProvider extends Provider> {
}
diff --git a/grpc/src/main/proto/model/task.proto b/grpc/src/main/proto/model/task.proto
index 3887f7d69b..4aae0ebfba 100644
--- a/grpc/src/main/proto/model/task.proto
+++ b/grpc/src/main/proto/model/task.proto
@@ -40,7 +40,7 @@ message Task {
bool retried = 16;
bool executed = 17;
bool callback_from_worker = 18;
- int32 response_timeout_seconds = 19;
+ int64 response_timeout_seconds = 19;
string workflow_instance_id = 20;
string workflow_type = 21;
string task_id = 22;
diff --git a/grpc/src/main/proto/model/taskdef.proto b/grpc/src/main/proto/model/taskdef.proto
index da9d13e311..5b8a636592 100644
--- a/grpc/src/main/proto/model/taskdef.proto
+++ b/grpc/src/main/proto/model/taskdef.proto
@@ -26,7 +26,7 @@ message TaskDef {
TaskDef.TimeoutPolicy timeout_policy = 7;
TaskDef.RetryLogic retry_logic = 8;
int32 retry_delay_seconds = 9;
- int32 response_timeout_seconds = 10;
+ int64 response_timeout_seconds = 10;
int32 concurrent_exec_limit = 11;
map input_template = 12;
int32 rate_limit_per_frequency = 14;
diff --git a/mysql-persistence/build.gradle b/mysql-persistence/build.gradle
index f7c5cdbe65..896a96a796 100644
--- a/mysql-persistence/build.gradle
+++ b/mysql-persistence/build.gradle
@@ -9,9 +9,6 @@ dependencies {
testCompile project(':conductor-core').sourceSets.test.output
testCompile "ch.vorburger.mariaDB4j:mariaDB4j:${revMariaDB4j}"
- //TODO Change the below deps to use the same version as one in versionsOfDependencies.gradle
- testCompile 'ch.qos.logback:logback-core:1.2.3'
- testCompile 'ch.qos.logback:logback-classic:1.2.3'
}
test {
diff --git a/mysql-persistence/dependencies.lock b/mysql-persistence/dependencies.lock
index 78687942d1..e4271a8cb5 100644
--- a/mysql-persistence/dependencies.lock
+++ b/mysql-persistence/dependencies.lock
@@ -1,13 +1,21 @@
{
"compile": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
},
"com.fasterxml.jackson.core:jackson-databind": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
@@ -18,6 +26,12 @@
],
"locked": "2.0.0"
},
+ "com.github.vmg.protogen:protogen-annotations": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1.0.0"
+ },
"com.google.inject.extensions:guice-multibindings": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -31,6 +45,12 @@
"locked": "4.1.0",
"requested": "4.1.0"
},
+ "com.google.protobuf:protobuf-java": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "3.5.1"
+ },
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -58,9 +78,15 @@
],
"locked": "0.68.0"
},
+ "com.spotify:completable-futures": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "0.3.1"
+ },
"com.zaxxer:HikariCP": {
- "locked": "2.6.3",
- "requested": "2.6.3"
+ "locked": "3.2.0",
+ "requested": "3.2.0"
},
"commons-io:commons-io": {
"locked": "2.4",
@@ -72,9 +98,15 @@
],
"locked": "1.2.2"
},
+ "javax.inject:javax.inject": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1"
+ },
"mysql:mysql-connector-java": {
- "locked": "5.1.43",
- "requested": "5.1.43"
+ "locked": "8.0.11",
+ "requested": "8.0.11"
},
"org.apache.commons:commons-lang3": {
"firstLevelTransitive": [
@@ -94,14 +126,22 @@
}
},
"compileClasspath": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
},
"com.fasterxml.jackson.core:jackson-databind": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
@@ -112,6 +152,12 @@
],
"locked": "2.0.0"
},
+ "com.github.vmg.protogen:protogen-annotations": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1.0.0"
+ },
"com.google.inject.extensions:guice-multibindings": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -125,6 +171,12 @@
"locked": "4.1.0",
"requested": "4.1.0"
},
+ "com.google.protobuf:protobuf-java": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "3.5.1"
+ },
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -152,9 +204,15 @@
],
"locked": "0.68.0"
},
+ "com.spotify:completable-futures": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "0.3.1"
+ },
"com.zaxxer:HikariCP": {
- "locked": "2.6.3",
- "requested": "2.6.3"
+ "locked": "3.2.0",
+ "requested": "3.2.0"
},
"commons-io:commons-io": {
"locked": "2.4",
@@ -166,9 +224,15 @@
],
"locked": "1.2.2"
},
+ "javax.inject:javax.inject": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1"
+ },
"mysql:mysql-connector-java": {
- "locked": "5.1.43",
- "requested": "5.1.43"
+ "locked": "8.0.11",
+ "requested": "8.0.11"
},
"org.apache.commons:commons-lang3": {
"firstLevelTransitive": [
@@ -188,14 +252,22 @@
}
},
"default": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
},
"com.fasterxml.jackson.core:jackson-databind": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
@@ -206,6 +278,12 @@
],
"locked": "2.0.0"
},
+ "com.github.vmg.protogen:protogen-annotations": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1.0.0"
+ },
"com.google.inject.extensions:guice-multibindings": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -219,6 +297,12 @@
"locked": "4.1.0",
"requested": "4.1.0"
},
+ "com.google.protobuf:protobuf-java": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "3.5.1"
+ },
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -246,9 +330,15 @@
],
"locked": "0.68.0"
},
+ "com.spotify:completable-futures": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "0.3.1"
+ },
"com.zaxxer:HikariCP": {
- "locked": "2.6.3",
- "requested": "2.6.3"
+ "locked": "3.2.0",
+ "requested": "3.2.0"
},
"commons-io:commons-io": {
"locked": "2.4",
@@ -260,9 +350,15 @@
],
"locked": "1.2.2"
},
+ "javax.inject:javax.inject": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1"
+ },
"mysql:mysql-connector-java": {
- "locked": "5.1.43",
- "requested": "5.1.43"
+ "locked": "8.0.11",
+ "requested": "8.0.11"
},
"org.apache.commons:commons-lang3": {
"firstLevelTransitive": [
@@ -281,15 +377,33 @@
"locked": "1.7.25"
}
},
+ "jacocoAgent": {
+ "org.jacoco:org.jacoco.agent": {
+ "locked": "0.8.1"
+ }
+ },
+ "jacocoAnt": {
+ "org.jacoco:org.jacoco.ant": {
+ "locked": "0.8.1"
+ }
+ },
"runtime": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
},
"com.fasterxml.jackson.core:jackson-databind": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
@@ -300,6 +414,12 @@
],
"locked": "2.0.0"
},
+ "com.github.vmg.protogen:protogen-annotations": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1.0.0"
+ },
"com.google.inject.extensions:guice-multibindings": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -313,6 +433,12 @@
"locked": "4.1.0",
"requested": "4.1.0"
},
+ "com.google.protobuf:protobuf-java": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "3.5.1"
+ },
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -340,9 +466,15 @@
],
"locked": "0.68.0"
},
+ "com.spotify:completable-futures": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "0.3.1"
+ },
"com.zaxxer:HikariCP": {
- "locked": "2.6.3",
- "requested": "2.6.3"
+ "locked": "3.2.0",
+ "requested": "3.2.0"
},
"commons-io:commons-io": {
"locked": "2.4",
@@ -354,9 +486,15 @@
],
"locked": "1.2.2"
},
+ "javax.inject:javax.inject": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1"
+ },
"mysql:mysql-connector-java": {
- "locked": "5.1.43",
- "requested": "5.1.43"
+ "locked": "8.0.11",
+ "requested": "8.0.11"
},
"org.apache.commons:commons-lang3": {
"firstLevelTransitive": [
@@ -376,14 +514,22 @@
}
},
"runtimeClasspath": {
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
},
"com.fasterxml.jackson.core:jackson-databind": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
@@ -394,6 +540,12 @@
],
"locked": "2.0.0"
},
+ "com.github.vmg.protogen:protogen-annotations": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1.0.0"
+ },
"com.google.inject.extensions:guice-multibindings": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -407,6 +559,12 @@
"locked": "4.1.0",
"requested": "4.1.0"
},
+ "com.google.protobuf:protobuf-java": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "3.5.1"
+ },
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -434,9 +592,15 @@
],
"locked": "0.68.0"
},
+ "com.spotify:completable-futures": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "0.3.1"
+ },
"com.zaxxer:HikariCP": {
- "locked": "2.6.3",
- "requested": "2.6.3"
+ "locked": "3.2.0",
+ "requested": "3.2.0"
},
"commons-io:commons-io": {
"locked": "2.4",
@@ -448,9 +612,15 @@
],
"locked": "1.2.2"
},
+ "javax.inject:javax.inject": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1"
+ },
"mysql:mysql-connector-java": {
- "locked": "5.1.43",
- "requested": "5.1.43"
+ "locked": "8.0.11",
+ "requested": "8.0.11"
},
"org.apache.commons:commons-lang3": {
"firstLevelTransitive": [
@@ -470,26 +640,26 @@
}
},
"testCompile": {
- "ch.qos.logback:logback-classic": {
- "locked": "1.2.3",
- "requested": "1.2.3"
- },
- "ch.qos.logback:logback-core": {
- "locked": "1.2.3",
- "requested": "1.2.3"
- },
"ch.vorburger.mariaDB4j:mariaDB4j": {
"locked": "2.2.3",
"requested": "2.2.3"
},
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
},
"com.fasterxml.jackson.core:jackson-databind": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
@@ -500,6 +670,12 @@
],
"locked": "2.0.0"
},
+ "com.github.vmg.protogen:protogen-annotations": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1.0.0"
+ },
"com.google.inject.extensions:guice-multibindings": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -513,6 +689,12 @@
"locked": "4.1.0",
"requested": "4.1.0"
},
+ "com.google.protobuf:protobuf-java": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "3.5.1"
+ },
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -540,9 +722,15 @@
],
"locked": "0.68.0"
},
+ "com.spotify:completable-futures": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "0.3.1"
+ },
"com.zaxxer:HikariCP": {
- "locked": "2.6.3",
- "requested": "2.6.3"
+ "locked": "3.2.0",
+ "requested": "3.2.0"
},
"commons-io:commons-io": {
"locked": "2.5",
@@ -554,13 +742,19 @@
],
"locked": "1.2.2"
},
- "junit:junit-dep": {
- "locked": "4.10",
- "requested": "4.10"
+ "javax.inject:javax.inject": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1"
+ },
+ "junit:junit": {
+ "locked": "4.12",
+ "requested": "4.12"
},
"mysql:mysql-connector-java": {
- "locked": "5.1.43",
- "requested": "5.1.43"
+ "locked": "8.0.11",
+ "requested": "8.0.11"
},
"org.apache.commons:commons-lang3": {
"firstLevelTransitive": [
@@ -572,9 +766,9 @@
"locked": "4.0.3",
"requested": "4.0.3"
},
- "org.mockito:mockito-all": {
- "locked": "1.10.0",
- "requested": "1.10.0"
+ "org.mockito:mockito-core": {
+ "locked": "1.10.19",
+ "requested": "1.10.19"
},
"org.slf4j:slf4j-api": {
"firstLevelTransitive": [
@@ -584,26 +778,26 @@
}
},
"testCompileClasspath": {
- "ch.qos.logback:logback-classic": {
- "locked": "1.2.3",
- "requested": "1.2.3"
- },
- "ch.qos.logback:logback-core": {
- "locked": "1.2.3",
- "requested": "1.2.3"
- },
"ch.vorburger.mariaDB4j:mariaDB4j": {
"locked": "2.2.3",
"requested": "2.2.3"
},
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
},
"com.fasterxml.jackson.core:jackson-databind": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
@@ -614,6 +808,12 @@
],
"locked": "2.0.0"
},
+ "com.github.vmg.protogen:protogen-annotations": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1.0.0"
+ },
"com.google.inject.extensions:guice-multibindings": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -627,6 +827,12 @@
"locked": "4.1.0",
"requested": "4.1.0"
},
+ "com.google.protobuf:protobuf-java": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "3.5.1"
+ },
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -654,9 +860,15 @@
],
"locked": "0.68.0"
},
+ "com.spotify:completable-futures": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "0.3.1"
+ },
"com.zaxxer:HikariCP": {
- "locked": "2.6.3",
- "requested": "2.6.3"
+ "locked": "3.2.0",
+ "requested": "3.2.0"
},
"commons-io:commons-io": {
"locked": "2.5",
@@ -668,13 +880,19 @@
],
"locked": "1.2.2"
},
- "junit:junit-dep": {
- "locked": "4.10",
- "requested": "4.10"
+ "javax.inject:javax.inject": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1"
+ },
+ "junit:junit": {
+ "locked": "4.12",
+ "requested": "4.12"
},
"mysql:mysql-connector-java": {
- "locked": "5.1.43",
- "requested": "5.1.43"
+ "locked": "8.0.11",
+ "requested": "8.0.11"
},
"org.apache.commons:commons-lang3": {
"firstLevelTransitive": [
@@ -686,9 +904,9 @@
"locked": "4.0.3",
"requested": "4.0.3"
},
- "org.mockito:mockito-all": {
- "locked": "1.10.0",
- "requested": "1.10.0"
+ "org.mockito:mockito-core": {
+ "locked": "1.10.19",
+ "requested": "1.10.19"
},
"org.slf4j:slf4j-api": {
"firstLevelTransitive": [
@@ -698,26 +916,26 @@
}
},
"testRuntime": {
- "ch.qos.logback:logback-classic": {
- "locked": "1.2.3",
- "requested": "1.2.3"
- },
- "ch.qos.logback:logback-core": {
- "locked": "1.2.3",
- "requested": "1.2.3"
- },
"ch.vorburger.mariaDB4j:mariaDB4j": {
"locked": "2.2.3",
"requested": "2.2.3"
},
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
},
"com.fasterxml.jackson.core:jackson-databind": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
@@ -728,6 +946,12 @@
],
"locked": "2.0.0"
},
+ "com.github.vmg.protogen:protogen-annotations": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1.0.0"
+ },
"com.google.inject.extensions:guice-multibindings": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -741,6 +965,12 @@
"locked": "4.1.0",
"requested": "4.1.0"
},
+ "com.google.protobuf:protobuf-java": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "3.5.1"
+ },
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -768,9 +998,15 @@
],
"locked": "0.68.0"
},
+ "com.spotify:completable-futures": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "0.3.1"
+ },
"com.zaxxer:HikariCP": {
- "locked": "2.6.3",
- "requested": "2.6.3"
+ "locked": "3.2.0",
+ "requested": "3.2.0"
},
"commons-io:commons-io": {
"locked": "2.5",
@@ -782,13 +1018,19 @@
],
"locked": "1.2.2"
},
- "junit:junit-dep": {
- "locked": "4.10",
- "requested": "4.10"
+ "javax.inject:javax.inject": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1"
+ },
+ "junit:junit": {
+ "locked": "4.12",
+ "requested": "4.12"
},
"mysql:mysql-connector-java": {
- "locked": "5.1.43",
- "requested": "5.1.43"
+ "locked": "8.0.11",
+ "requested": "8.0.11"
},
"org.apache.commons:commons-lang3": {
"firstLevelTransitive": [
@@ -800,9 +1042,9 @@
"locked": "4.0.3",
"requested": "4.0.3"
},
- "org.mockito:mockito-all": {
- "locked": "1.10.0",
- "requested": "1.10.0"
+ "org.mockito:mockito-core": {
+ "locked": "1.10.19",
+ "requested": "1.10.19"
},
"org.slf4j:slf4j-api": {
"firstLevelTransitive": [
@@ -812,26 +1054,26 @@
}
},
"testRuntimeClasspath": {
- "ch.qos.logback:logback-classic": {
- "locked": "1.2.3",
- "requested": "1.2.3"
- },
- "ch.qos.logback:logback-core": {
- "locked": "1.2.3",
- "requested": "1.2.3"
- },
"ch.vorburger.mariaDB4j:mariaDB4j": {
"locked": "2.2.3",
"requested": "2.2.3"
},
+ "com.amazonaws:aws-java-sdk-s3": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "1.11.86"
+ },
"com.fasterxml.jackson.core:jackson-core": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
},
"com.fasterxml.jackson.core:jackson-databind": {
"firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "2.7.5"
@@ -842,6 +1084,12 @@
],
"locked": "2.0.0"
},
+ "com.github.vmg.protogen:protogen-annotations": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1.0.0"
+ },
"com.google.inject.extensions:guice-multibindings": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -855,6 +1103,12 @@
"locked": "4.1.0",
"requested": "4.1.0"
},
+ "com.google.protobuf:protobuf-java": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "3.5.1"
+ },
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-core"
@@ -882,9 +1136,15 @@
],
"locked": "0.68.0"
},
+ "com.spotify:completable-futures": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-core"
+ ],
+ "locked": "0.3.1"
+ },
"com.zaxxer:HikariCP": {
- "locked": "2.6.3",
- "requested": "2.6.3"
+ "locked": "3.2.0",
+ "requested": "3.2.0"
},
"commons-io:commons-io": {
"locked": "2.5",
@@ -896,13 +1156,19 @@
],
"locked": "1.2.2"
},
- "junit:junit-dep": {
- "locked": "4.10",
- "requested": "4.10"
+ "javax.inject:javax.inject": {
+ "firstLevelTransitive": [
+ "com.netflix.conductor:conductor-common"
+ ],
+ "locked": "1"
+ },
+ "junit:junit": {
+ "locked": "4.12",
+ "requested": "4.12"
},
"mysql:mysql-connector-java": {
- "locked": "5.1.43",
- "requested": "5.1.43"
+ "locked": "8.0.11",
+ "requested": "8.0.11"
},
"org.apache.commons:commons-lang3": {
"firstLevelTransitive": [
@@ -914,9 +1180,9 @@
"locked": "4.0.3",
"requested": "4.0.3"
},
- "org.mockito:mockito-all": {
- "locked": "1.10.0",
- "requested": "1.10.0"
+ "org.mockito:mockito-core": {
+ "locked": "1.10.19",
+ "requested": "1.10.19"
},
"org.slf4j:slf4j-api": {
"firstLevelTransitive": [
diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLBaseDAO.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLBaseDAO.java
index 3cba07cd47..e223e380af 100644
--- a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLBaseDAO.java
+++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLBaseDAO.java
@@ -8,7 +8,10 @@
import com.netflix.conductor.sql.ExecuteFunction;
import com.netflix.conductor.sql.QueryFunction;
import com.netflix.conductor.sql.TransactionalFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.sql.DataSource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
@@ -17,14 +20,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
-import javax.sql.DataSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author mustafa
- */
public abstract class MySQLBaseDAO {
private static final List EXCLUDED_STACKTRACE_CLASS = ImmutableList.of(
MySQLBaseDAO.class.getName(),
@@ -115,6 +111,33 @@ protected R getWithTransaction(TransactionalFunction function) {
}
}
+ protected R getWithTransactionWithOutErrorPropagation(TransactionalFunction function) {
+ Instant start = Instant.now();
+ LazyToString callingMethod = getCallingMethod();
+ logger.trace("{} : starting transaction", callingMethod);
+
+ try(Connection tx = dataSource.getConnection()) {
+ boolean previousAutoCommitMode = tx.getAutoCommit();
+ tx.setAutoCommit(false);
+ try {
+ R result = function.apply(tx);
+ tx.commit();
+ return result;
+ } catch (Throwable th) {
+ tx.rollback();
+ logger.info(ApplicationException.Code.CONFLICT + " " + th.getMessage());
+ return null;
+ } finally {
+ tx.setAutoCommit(previousAutoCommitMode);
+ }
+ } catch (SQLException ex) {
+ throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, ex.getMessage(), ex);
+ } finally {
+ logger.trace("{} : took {}ms", callingMethod, Duration.between(start, Instant.now()).toMillis());
+ }
+ }
+
+
/**
* Wraps {@link #getWithTransaction(TransactionalFunction)} with no return value.
*
diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLExecutionDAO.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLExecutionDAO.java
index 437a63b299..e2a2571868 100644
--- a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLExecutionDAO.java
+++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLExecutionDAO.java
@@ -1,6 +1,7 @@
package com.netflix.conductor.dao.mysql;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.netflix.conductor.common.metadata.events.EventExecution;
@@ -16,6 +17,7 @@
import com.netflix.conductor.metrics.Monitors;
import javax.inject.Inject;
+import javax.inject.Singleton;
import javax.sql.DataSource;
import java.sql.Connection;
import java.text.SimpleDateFormat;
@@ -27,6 +29,7 @@
import java.util.Optional;
import java.util.stream.Collectors;
+@Singleton
public class MySQLExecutionDAO extends MySQLBaseDAO implements ExecutionDAO {
private static final String ARCHIVED_FIELD = "archived";
@@ -87,6 +90,10 @@ public List getTasks(String taskDefName, String startKey, int count) {
return tasks;
}
+ private static String taskKey(Task task) {
+ return task.getReferenceTaskName() + "_" + task.getRetryCount();
+ }
+
@Override
public List createTasks(List tasks) {
List created = Lists.newArrayListWithCapacity(tasks.size());
@@ -97,12 +104,12 @@ public List createTasks(List tasks) {
task.setScheduledTime(System.currentTimeMillis());
- String taskKey = task.getReferenceTaskName() + "" + task.getRetryCount();
+ final String taskKey = taskKey(task);
boolean scheduledTaskAdded = addScheduledTask(connection, task, taskKey);
if (!scheduledTaskAdded) {
- logger.info("Task already scheduled, skipping the run " + task.getTaskId() + ", ref="
+ logger.trace("Task already scheduled, skipping the run " + task.getTaskId() + ", ref="
+ task.getReferenceTaskName() + ", key=" + taskKey);
continue;
}
@@ -125,8 +132,10 @@ public void updateTask(Task task) {
}
/**
- * This is a dummy implementation and this feature is not for Mysql backed Conductor
- * @param task: which needs to be evaluated whether it is rateLimited or not
+ * This is a dummy implementation and this feature is not for Mysql backed
+ * Conductor
+ * @param task:
+ * which needs to be evaluated whether it is rateLimited or not
* @return
*/
@Override
@@ -193,7 +202,7 @@ public void removeTask(String taskId) {
return;
}
- String taskKey = task.getReferenceTaskName() + "_" + task.getRetryCount();
+ final String taskKey = taskKey(task);
withTransaction(connection -> {
removeScheduledTask(connection, task, taskKey);
@@ -257,8 +266,8 @@ public void removeWorkflow(String workflowId, boolean archiveWorkflow) {
if (archiveWorkflow) {
// Add to elasticsearch
- indexer.updateWorkflow(workflowId, new String[]{RAW_JSON_FIELD, ARCHIVED_FIELD},
- new Object[]{objectMapper.writeValueAsString(wf), true});
+ indexer.updateWorkflow(workflowId, new String[] { RAW_JSON_FIELD, ARCHIVED_FIELD },
+ new Object[] { objectMapper.writeValueAsString(wf), true });
} else {
// Not archiving, also remove workflowId from index
indexer.removeWorkflow(workflowId);
@@ -575,17 +584,12 @@ private void removeWorkflow(Connection connection, String workflowId) {
}
private void addPendingWorkflow(Connection connection, String workflowType, String workflowId) {
- String EXISTS_PENDING_WORKFLOW = "SELECT EXISTS(SELECT 1 FROM workflow_pending WHERE workflow_type = ? AND workflow_id = ?)";
- boolean exist = query(connection, EXISTS_PENDING_WORKFLOW,
- q -> q.addParameter(workflowType).addParameter(workflowId).exists());
+ String INSERT_PENDING_WORKFLOW = "INSERT IGNORE INTO workflow_pending (workflow_type, workflow_id) VALUES (?, ?)";
- if (!exist) {
- String INSERT_PENDING_WORKFLOW = "INSERT INTO workflow_pending (workflow_type, workflow_id) VALUES (?, ?)";
+ execute(connection, INSERT_PENDING_WORKFLOW,
+ q -> q.addParameter(workflowType).addParameter(workflowId).executeUpdate());
- execute(connection, INSERT_PENDING_WORKFLOW,
- q -> q.addParameter(workflowType).addParameter(workflowId).executeUpdate());
- }
}
private void removePendingWorkflow(Connection connection, String workflowType, String workflowId) {
@@ -608,17 +612,12 @@ private void removeTaskData(Connection connection, Task task) {
}
private void addWorkflowToTaskMapping(Connection connection, Task task) {
- String EXISTS_WORKFLOW_TO_TASK = "SELECT EXISTS(SELECT 1 FROM workflow_to_task WHERE workflow_id = ? AND task_id = ?)";
- boolean exist = query(connection, EXISTS_WORKFLOW_TO_TASK,
- q -> q.addParameter(task.getWorkflowInstanceId()).addParameter(task.getTaskId()).exists());
+ String INSERT_WORKFLOW_TO_TASK = "INSERT IGNORE INTO workflow_to_task (workflow_id, task_id) VALUES (?, ?)";
- if (!exist) {
- String INSERT_WORKFLOW_TO_TASK = "INSERT INTO workflow_to_task (workflow_id, task_id) VALUES (?, ?)";
+ execute(connection, INSERT_WORKFLOW_TO_TASK,
+ q -> q.addParameter(task.getWorkflowInstanceId()).addParameter(task.getTaskId()).executeUpdate());
- execute(connection, INSERT_WORKFLOW_TO_TASK,
- q -> q.addParameter(task.getWorkflowInstanceId()).addParameter(task.getTaskId()).executeUpdate());
- }
}
private void removeWorkflowToTaskMapping(Connection connection, Task task) {
@@ -644,21 +643,15 @@ private void removeWorkflowDefToWorkflowMapping(Connection connection, Workflow
.addParameter(workflow.getWorkflowId()).executeUpdate());
}
- private boolean addScheduledTask(Connection connection, Task task, String taskKey) {
- String EXISTS_SCHEDULED_TASK = "SELECT EXISTS(SELECT 1 FROM task_scheduled WHERE workflow_id = ? AND task_key = ?)";
- boolean exist = query(connection, EXISTS_SCHEDULED_TASK,
- q -> q.addParameter(task.getWorkflowInstanceId()).addParameter(taskKey).exists());
-
- if (!exist) {
- String INSERT_SCHEDULED_TASK = "INSERT INTO task_scheduled (workflow_id, task_key, task_id) VALUES (?, ?, ?)";
+ @VisibleForTesting
+ boolean addScheduledTask(Connection connection, Task task, String taskKey) {
- execute(connection, INSERT_SCHEDULED_TASK, q -> q.addParameter(task.getWorkflowInstanceId())
- .addParameter(taskKey).addParameter(task.getTaskId()).executeUpdate());
+ final String INSERT_IGNORE_SCHEDULED_TASK = "INSERT IGNORE INTO task_scheduled (workflow_id, task_key, task_id) VALUES (?, ?, ?)";
- return true;
- }
+ int count = query(connection, INSERT_IGNORE_SCHEDULED_TASK, q -> q.addParameter(task.getWorkflowInstanceId())
+ .addParameter(taskKey).addParameter(task.getTaskId()).executeUpdate());
+ return count > 0;
- return false;
}
private void removeScheduledTask(Connection connection, Task task, String taskKey) {
@@ -697,25 +690,14 @@ private void updateInProgressStatus(Connection connection, Task task, boolean in
}
private boolean insertEventExecution(Connection connection, EventExecution eventExecution) {
- // @formatter:off
- String EXISTS_EVENT_EXECUTION = "SELECT EXISTS(SELECT 1 FROM event_execution " + "WHERE event_handler_name = ? "
- + "AND event_name = ? " + "AND message_id = ? " + "AND execution_id = ?)";
- // @formatter:on
- boolean exist = query(connection, EXISTS_EVENT_EXECUTION,
+ String INSERT_EVENT_EXECUTION = "INSERT INTO event_execution (event_handler_name, event_name, message_id, execution_id, json_data) "
+ + "VALUES (?, ?, ?, ?, ?)";
+ int count = query(connection, INSERT_EVENT_EXECUTION,
q -> q.addParameter(eventExecution.getName()).addParameter(eventExecution.getEvent())
- .addParameter(eventExecution.getMessageId()).addParameter(eventExecution.getId()).exists());
-
- if (!exist) {
- String INSERT_EVENT_EXECUTION = "INSERT INTO event_execution (event_handler_name, event_name, message_id, execution_id, json_data) "
- + "VALUES (?, ?, ?, ?, ?)";
-
- execute(connection, INSERT_EVENT_EXECUTION,
- q -> q.addParameter(eventExecution.getName()).addParameter(eventExecution.getEvent())
- .addParameter(eventExecution.getMessageId()).addParameter(eventExecution.getId())
- .addJsonParameter(eventExecution).executeUpdate());
- }
- return false;
+ .addParameter(eventExecution.getMessageId()).addParameter(eventExecution.getId())
+ .addJsonParameter(eventExecution).executeUpdate());
+ return count > 0;
}
private void updateEventExecution(Connection connection, EventExecution eventExecution) {
@@ -737,7 +719,8 @@ private void removeEventExecution(Connection connection, EventExecution eventExe
execute(connection, REMOVE_EVENT_EXECUTION,
q -> q.addParameter(eventExecution.getName()).addParameter(eventExecution.getEvent())
- .addParameter(eventExecution.getMessageId()).addParameter(eventExecution.getId()).executeUpdate());
+ .addParameter(eventExecution.getMessageId()).addParameter(eventExecution.getId())
+ .executeUpdate());
}
private EventExecution readEventExecution(Connection connection, String eventHandlerName, String eventName,
diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLMetadataDAO.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLMetadataDAO.java
index eff70a2263..ef8aa6435f 100644
--- a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLMetadataDAO.java
+++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLMetadataDAO.java
@@ -11,6 +11,7 @@
import com.netflix.conductor.metrics.Monitors;
import javax.inject.Inject;
+import javax.inject.Singleton;
import javax.sql.DataSource;
import java.sql.Connection;
import java.util.ArrayList;
@@ -22,10 +23,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-
-/**
- * @author mustafa
- */
+@Singleton
public class MySQLMetadataDAO extends MySQLBaseDAO implements MetadataDAO {
public static final String PROP_TASKDEF_CACHE_REFRESH = "conductor.taskdef.cache.refresh.time.seconds";
public static final int DEFAULT_TASKDEF_CACHE_REFRESH_SECONDS = 60;
diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java
index cd83a9aa0e..b415465e5d 100644
--- a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java
+++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java
@@ -1,20 +1,5 @@
package com.netflix.conductor.dao.mysql;
-import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import javax.inject.Inject;
-import javax.sql.DataSource;
-
-import org.apache.commons.lang3.time.DateUtils;
-
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -23,11 +8,30 @@
import com.netflix.conductor.core.execution.ApplicationException;
import com.netflix.conductor.dao.QueueDAO;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@Singleton
public class MySQLQueueDAO extends MySQLBaseDAO implements QueueDAO {
+ private static final Long UNACK_SCHEDULE_MS = 60_000L;
@Inject
public MySQLQueueDAO(ObjectMapper om, DataSource ds) {
super(om, ds);
+
+ Executors.newSingleThreadScheduledExecutor()
+ .scheduleAtFixedRate(this::processAllUnacks,
+ UNACK_SCHEDULE_MS, UNACK_SCHEDULE_MS, TimeUnit.MILLISECONDS);
+ logger.debug(MySQLQueueDAO.class.getName() + " is ready to serve");
}
@Override
@@ -54,13 +58,16 @@ public boolean pushIfNotExists(String queueName, String messageId, long offsetTi
@Override
public List pop(String queueName, int count, int timeout) {
- List messages = getWithTransaction(tx -> popMessages(tx, queueName, count, timeout));
+ List messages = getWithTransactionWithOutErrorPropagation(tx -> popMessages(tx, queueName, count, timeout));
+ if(messages == null) return new ArrayList<>();
return messages.stream().map(Message::getId).collect(Collectors.toList());
}
@Override
public List pollMessages(String queueName, int count, int timeout) {
- return getWithTransaction(tx -> popMessages(tx, queueName, count, timeout));
+ List messages = getWithTransactionWithOutErrorPropagation(tx -> popMessages(tx, queueName, count, timeout));
+ if(messages == null) return new ArrayList<>();
+ return messages;
}
@Override
@@ -70,32 +77,20 @@ public void remove(String queueName, String messageId) {
@Override
public int getSize(String queueName) {
- String GET_QUEUE_SIZE = "SELECT COUNT(*) FROM queue_message WHERE queue_name = ?";
+ final String GET_QUEUE_SIZE = "SELECT COUNT(*) FROM queue_message WHERE queue_name = ?";
return queryWithTransaction(GET_QUEUE_SIZE, q -> ((Long) q.addParameter(queueName).executeCount()).intValue());
}
- public boolean ack1(String queueName, String messageId) {
- return getWithTransaction(tx -> {
- if (existsMessage(tx, queueName, messageId)) {
- removeMessage(tx, queueName, messageId);
- return true;
- } else {
- return false;
- }
- });
- }
-
@Override
public boolean ack(String queueName, String messageId) {
return getWithTransaction(tx -> removeMessage(tx, queueName, messageId));
-
}
@Override
public boolean setUnackTimeout(String queueName, String messageId, long unackTimeout) {
long updatedOffsetTimeInSecond = unackTimeout / 1000;
- String UPDATE_UNACK_TIMEOUT = "UPDATE queue_message SET offset_time_seconds = ?, deliver_on = TIMESTAMPADD(SECOND, ?, created_on) WHERE queue_name = ? AND message_id = ?";
+ final String UPDATE_UNACK_TIMEOUT = "UPDATE queue_message SET offset_time_seconds = ?, deliver_on = TIMESTAMPADD(SECOND, ?, CURRENT_TIMESTAMP) WHERE queue_name = ? AND message_id = ?";
return queryWithTransaction(UPDATE_UNACK_TIMEOUT,
q -> q.addParameter(updatedOffsetTimeInSecond).addParameter(updatedOffsetTimeInSecond)
@@ -104,15 +99,13 @@ public boolean setUnackTimeout(String queueName, String messageId, long unackTim
@Override
public void flush(String queueName) {
- String FLUSH_QUEUE = "DELETE FROM queue_message WHERE queue_name = ?";
+ final String FLUSH_QUEUE = "DELETE FROM queue_message WHERE queue_name = ?";
executeWithTransaction(FLUSH_QUEUE, q -> q.addParameter(queueName).executeDelete());
}
@Override
public Map queuesDetail() {
-
- String GET_QUEUES_DETAIL = "SELECT queue_name, (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size FROM queue q";
-
+ final String GET_QUEUES_DETAIL = "SELECT queue_name, (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size FROM queue q";
return queryWithTransaction(GET_QUEUES_DETAIL, q -> q.executeAndFetch(rs -> {
Map detail = Maps.newHashMap();
while (rs.next()) {
@@ -127,7 +120,7 @@ public Map queuesDetail() {
@Override
public Map>> queuesDetailVerbose() {
// @formatter:off
- String GET_QUEUES_DETAIL_VERBOSE = "SELECT queue_name, \n"
+ final String GET_QUEUES_DETAIL_VERBOSE = "SELECT queue_name, \n"
+ " (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size,\n"
+ " (SELECT count(*) FROM queue_message WHERE popped = true AND queue_name = q.queue_name) AS uacked \n"
+ "FROM queue q";
@@ -140,22 +133,36 @@ public Map>> queuesDetailVerbose() {
Long size = rs.getLong("size");
Long queueUnacked = rs.getLong("uacked");
result.put(queueName, ImmutableMap.of("a", ImmutableMap.of( // sharding not implemented, returning only
- // one shard with all the info
+ // one shard with all the info
"size", size, "uacked", queueUnacked)));
}
return result;
}));
}
+ /**
+ * Un-pop all un-acknowledged messages for all queues.
+ *
+ * @since 1.11.6
+ */
+ public void processAllUnacks() {
+
+ logger.trace("processAllUnacks started");
+
+
+ final String PROCESS_ALL_UNACKS = "UPDATE queue_message SET popped = false WHERE popped = true AND TIMESTAMPADD(SECOND,60,CURRENT_TIMESTAMP) > deliver_on";
+ executeWithTransaction(PROCESS_ALL_UNACKS, Query::executeUpdate);
+ }
+
@Override
public void processUnacks(String queueName) {
- String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND CURRENT_TIMESTAMP > deliver_on";
+ final String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND TIMESTAMPADD(SECOND,60,CURRENT_TIMESTAMP) > deliver_on";
executeWithTransaction(PROCESS_UNACKS, q -> q.addParameter(queueName).executeUpdate());
}
@Override
public boolean setOffsetTime(String queueName, String messageId, long offsetTimeInSecond) {
- String SET_OFFSET_TIME = "UPDATE queue_message SET offset_time_seconds = ?, deliver_on = TIMESTAMPADD(SECOND,?,created_on) \n"
+ final String SET_OFFSET_TIME = "UPDATE queue_message SET offset_time_seconds = ?, deliver_on = TIMESTAMPADD(SECOND,?,CURRENT_TIMESTAMP) \n"
+ "WHERE queue_name = ? AND message_id = ?";
return queryWithTransaction(SET_OFFSET_TIME, q -> q.addParameter(offsetTimeInSecond)
@@ -168,35 +175,24 @@ public boolean exists(String queueName, String messageId) {
}
private boolean existsMessage(Connection connection, String queueName, String messageId) {
- String EXISTS_MESSAGE = "SELECT EXISTS(SELECT 1 FROM queue_message WHERE queue_name = ? AND message_id = ?)";
+ final String EXISTS_MESSAGE = "SELECT EXISTS(SELECT 1 FROM queue_message WHERE queue_name = ? AND message_id = ?)";
return query(connection, EXISTS_MESSAGE, q -> q.addParameter(queueName).addParameter(messageId).exists());
}
private void pushMessage(Connection connection, String queueName, String messageId, String payload,
- long offsetTimeInSecond) {
- String PUSH_MESSAGE = "INSERT INTO queue_message (created_on, deliver_on, queue_name, message_id, offset_time_seconds, payload) VALUES (?, ?, ?, ?, ?, ?)";
- String UPDATE_MESSAGE = "UPDATE queue_message SET payload = ? WHERE queue_name = ? AND message_id = ?";
+ long offsetTimeInSecond) {
- createQueueIfNotExists(connection, queueName);
+ String PUSH_MESSAGE = "INSERT INTO queue_message (deliver_on, queue_name, message_id, offset_time_seconds, payload) VALUES (TIMESTAMPADD(SECOND,?,CURRENT_TIMESTAMP), ?, ?,?,?) ON DUPLICATE KEY UPDATE payload=VALUES(payload), deliver_on=VALUES(deliver_on)";
- Date now = DateUtils.truncate(new Date(), Calendar.SECOND);
- Date deliverTime = new Date(now.getTime() + (offsetTimeInSecond * 1_000));
- boolean exists = existsMessage(connection, queueName, messageId);
+ createQueueIfNotExists(connection, queueName);
- if (!exists) {
- execute(connection, PUSH_MESSAGE,
- q -> q.addTimestampParameter(now).addTimestampParameter(deliverTime).addParameter(queueName)
- .addParameter(messageId).addParameter(offsetTimeInSecond).addParameter(payload)
- .executeUpdate());
+ execute(connection, PUSH_MESSAGE, q -> q.addParameter(offsetTimeInSecond).addParameter(queueName)
+ .addParameter(messageId).addParameter(offsetTimeInSecond).addParameter(payload).executeUpdate());
- } else {
- execute(connection, UPDATE_MESSAGE,
- q -> q.addParameter(payload).addParameter(queueName).addParameter(messageId).executeUpdate());
- }
}
private boolean removeMessage(Connection connection, String queueName, String messageId) {
- String REMOVE_MESSAGE = "DELETE FROM queue_message WHERE queue_name = ? AND message_id = ?";
+ final String REMOVE_MESSAGE = "DELETE FROM queue_message WHERE queue_name = ? AND message_id = ?";
return query(connection, REMOVE_MESSAGE,
q -> q.addParameter(queueName).addParameter(messageId).executeDelete());
}
@@ -205,12 +201,10 @@ private List peekMessages(Connection connection, String queueName, int
if (count < 1)
return Collections.emptyList();
- final long peekTime = System.currentTimeMillis() + 1;
-
- String PEEK_MESSAGES = "SELECT message_id, payload FROM queue_message WHERE queue_name = ? AND popped = false AND deliver_on <= TIMESTAMP(?) ORDER BY deliver_on, created_on LIMIT ? FOR UPDATE";
+ final String PEEK_MESSAGES = "SELECT message_id, payload FROM queue_message use index(combo_queue_message) WHERE queue_name = ? AND popped = false AND deliver_on <= TIMESTAMPADD(MICROSECOND, 1000, CURRENT_TIMESTAMP) ORDER BY deliver_on, created_on LIMIT ?";
List messages = query(connection, PEEK_MESSAGES, p -> p.addParameter(queueName)
- .addTimestampParameter(peekTime).addParameter(count).executeAndFetch(rs -> {
+ .addParameter(count).executeAndFetch(rs -> {
List results = new ArrayList<>();
while (rs.next()) {
Message m = new Message();
@@ -222,11 +216,9 @@ private List peekMessages(Connection connection, String queueName, int
}));
return messages;
-
}
private List popMessages(Connection connection, String queueName, int count, int timeout) {
-
long start = System.currentTimeMillis();
List messages = peekMessages(connection, queueName, count);
@@ -247,19 +239,17 @@ private List popMessages(Connection connection, String queueName, int c
int result = query(connection, query, q -> q.addParameter(queueName).addParameters(Ids).executeUpdate());
if (result != messages.size()) {
- String message = String.format("could not pop all messages for given ids: %s (%d messages were popped)",
+ String message = String.format("Could not pop all messages for given ids: %s (%d messages were popped)",
Ids, result);
throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, message);
}
return messages;
}
- private void createQueueIfNotExists(Connection connection, String queueName) {
- logger.debug("creating new queue {}", queueName);
- String CREATE_QUEUE = "INSERT INTO queue (queue_name) VALUES (?) ON DUPLICATE KEY UPDATE queue_name=VALUES(queue_name)";
+ private void createQueueIfNotExists(Connection connection, String queueName) {
+ logger.trace("Creating new queue '{}'", queueName);
+ final String CREATE_QUEUE = "INSERT IGNORE INTO queue (queue_name) VALUES (?)";
execute(connection, CREATE_QUEUE, q -> q.addParameter(queueName).executeUpdate());
-
}
-
}
diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLWorkflowModule.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLWorkflowModule.java
new file mode 100644
index 0000000000..1ac836d93f
--- /dev/null
+++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLWorkflowModule.java
@@ -0,0 +1,74 @@
+package com.netflix.conductor.dao.mysql;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.netflix.conductor.core.config.Configuration;
+import com.netflix.conductor.dao.ExecutionDAO;
+import com.netflix.conductor.dao.MetadataDAO;
+import com.netflix.conductor.dao.QueueDAO;
+import com.zaxxer.hikari.HikariDataSource;
+
+import org.flywaydb.core.Flyway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+
+/**
+ * @author mustafa
+ */
+public class MySQLWorkflowModule extends AbstractModule {
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ @Provides
+ @Singleton
+ public DataSource getDataSource(Configuration config) {
+ HikariDataSource dataSource = new HikariDataSource();
+ dataSource.setJdbcUrl(config.getProperty("jdbc.url", "jdbc:mysql://localhost:3306/conductor"));
+ dataSource.setUsername(config.getProperty("jdbc.username", "conductor"));
+ dataSource.setPassword(config.getProperty("jdbc.password", "password"));
+ dataSource.setAutoCommit(false);
+
+ dataSource.setMaximumPoolSize(config.getIntProperty("jdbc.maxPoolSize", 20));
+ dataSource.setMinimumIdle(config.getIntProperty("jdbc.minIdleSize", 5));
+ dataSource.setIdleTimeout(config.getIntProperty("jdbc.idleTimeout", 1000*300));
+ dataSource.setTransactionIsolation(config.getProperty("jdbc.isolationLevel", "TRANSACTION_REPEATABLE_READ"));
+
+ flywayMigrate(config, dataSource);
+
+ return dataSource;
+ }
+
+ @Override
+ protected void configure() {
+ bind(MetadataDAO.class).to(MySQLMetadataDAO.class);
+ bind(ExecutionDAO.class).to(MySQLExecutionDAO.class);
+ bind(QueueDAO.class).to(MySQLQueueDAO.class);
+ }
+
+ private void flywayMigrate(Configuration config, DataSource dataSource) {
+ boolean enabled = getBool(config.getProperty("flyway.enabled", "true"), true);
+ if(!enabled) {
+ logger.debug("Flyway migrations are disabled");
+ return;
+ }
+
+ String migrationTable = config.getProperty("flyway.table", null);
+
+ Flyway flyway = new Flyway();
+ if(null != migrationTable) {
+ logger.debug("Using Flyway migration table '{}'", migrationTable);
+ flyway.setTable(migrationTable);
+ }
+
+ flyway.setDataSource(dataSource);
+ flyway.setPlaceholderReplacement(false);
+ flyway.migrate();
+ }
+
+ private boolean getBool(String value, boolean defaultValue) {
+ if(null == value || value.trim().length() == 0){ return defaultValue; }
+ return Boolean.valueOf(value.trim());
+ }
+}
\ No newline at end of file
diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/Query.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/Query.java
index 2eb43db6df..bd45e6d4ee 100644
--- a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/Query.java
+++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/Query.java
@@ -237,7 +237,7 @@ public boolean exists() {
public boolean executeDelete() {
int count = executeUpdate();
if (count > 1) {
- logger.debug("Removed {} row(s) for query {}", count, rawQuery);
+ logger.trace("Removed {} row(s) for query {}", count, rawQuery);
}
return count > 0;
diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/mysql/MySQLConfiguration.java b/mysql-persistence/src/main/java/com/netflix/conductor/mysql/MySQLConfiguration.java
index e0bfff664a..3776fdd5f2 100644
--- a/mysql-persistence/src/main/java/com/netflix/conductor/mysql/MySQLConfiguration.java
+++ b/mysql-persistence/src/main/java/com/netflix/conductor/mysql/MySQLConfiguration.java
@@ -1,7 +1,6 @@
package com.netflix.conductor.mysql;
import com.netflix.conductor.core.config.Configuration;
-import com.zaxxer.hikari.HikariConfig;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/mysql/MySQLDataSourceProvider.java b/mysql-persistence/src/main/java/com/netflix/conductor/mysql/MySQLDataSourceProvider.java
index 264334c538..6a85a329c2 100644
--- a/mysql-persistence/src/main/java/com/netflix/conductor/mysql/MySQLDataSourceProvider.java
+++ b/mysql-persistence/src/main/java/com/netflix/conductor/mysql/MySQLDataSourceProvider.java
@@ -1,19 +1,16 @@
package com.netflix.conductor.mysql;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
-
import org.flywaydb.core.Flyway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.ThreadFactory;
-
import javax.inject.Inject;
import javax.inject.Provider;
import javax.sql.DataSource;
+import java.util.concurrent.ThreadFactory;
public class MySQLDataSourceProvider implements Provider {
private static final Logger logger = LoggerFactory.getLogger(MySQLDataSourceProvider.class);
@@ -28,10 +25,6 @@ public MySQLDataSourceProvider(MySQLConfiguration configuration) {
@Override
public DataSource get() {
HikariDataSource dataSource = new HikariDataSource(createConfiguration());
- dataSource.setJdbcUrl(configuration.getJdbcUrl());
- dataSource.setUsername(configuration.getJdbcUserName());
- dataSource.setPassword(configuration.getJdbcPassword());
- dataSource.setAutoCommit(false);
flywayMigrate(dataSource);
return dataSource;
@@ -39,6 +32,10 @@ public DataSource get() {
private HikariConfig createConfiguration(){
HikariConfig cfg = new HikariConfig();
+ cfg.setJdbcUrl(configuration.getJdbcUrl());
+ cfg.setUsername(configuration.getJdbcUserName());
+ cfg.setPassword(configuration.getJdbcPassword());
+ cfg.setAutoCommit(false);
cfg.setMaximumPoolSize(configuration.getConnectionPoolMaxSize());
cfg.setMinimumIdle(configuration.getConnectionPoolMinIdle());
cfg.setMaxLifetime(configuration.getConnectionMaxLifetime());
diff --git a/mysql-persistence/src/main/resources/db/migration/V1__initial_schema.sql b/mysql-persistence/src/main/resources/db/migration/V1__initial_schema.sql
index 601964016d..246b55ecd7 100644
--- a/mysql-persistence/src/main/resources/db/migration/V1__initial_schema.sql
+++ b/mysql-persistence/src/main/resources/db/migration/V1__initial_schema.sql
@@ -168,5 +168,5 @@ CREATE TABLE queue_message (
payload mediumtext,
PRIMARY KEY (id),
UNIQUE KEY unique_queue_name_message_id (queue_name,message_id),
- KEY queue_name_index (queue_name)
+ KEY combo_queue_message (queue_name,popped,deliver_on,created_on)
);
diff --git a/mysql-persistence/src/test/java/com/netflix/conductor/config/TestConfiguration.java b/mysql-persistence/src/test/java/com/netflix/conductor/config/TestConfiguration.java
index 2fcfde56c9..b538f0689a 100644
--- a/mysql-persistence/src/test/java/com/netflix/conductor/config/TestConfiguration.java
+++ b/mysql-persistence/src/test/java/com/netflix/conductor/config/TestConfiguration.java
@@ -15,135 +15,172 @@
*/
package com.netflix.conductor.config;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-
import com.netflix.conductor.mysql.MySQLConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
import java.util.Map;
-
-import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
/**
* @author Viren
- *
*/
public class TestConfiguration implements MySQLConfiguration {
- private Map testProperties = Maps.newHashMap(ImmutableMap.of("test", "dummy"));
+ private static final Logger logger = LoggerFactory.getLogger(TestConfiguration.class);
+    private static final Map<String, String> testProperties = new HashMap<>();
@Override
public int getSweepFrequency() {
- return 1;
+ return getIntProperty("decider.sweep.frequency.seconds", 30);
}
@Override
public boolean disableSweep() {
- return false;
+        String disable = getProperty("decider.sweep.disable", "false");
+        return Boolean.parseBoolean(disable);
}
@Override
public boolean disableAsyncWorkers() {
- return false;
+        String disable = getProperty("conductor.disable.async.workers", "false");
+        return Boolean.parseBoolean(disable);
}
@Override
public String getServerId() {
- return "server_id";
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ return "unknown";
+ }
}
@Override
public String getEnvironment() {
- return "test";
+ return getProperty("environment", "test");
}
@Override
public String getStack() {
- return "junit";
+ return getProperty("STACK", "test");
}
@Override
public String getAppId() {
- return "workflow";
+ return getProperty("APP_ID", "conductor");
}
@Override
- public Long getWorkflowInputPayloadSizeThresholdKB() {
- return 5120L;
+ public String getRegion() {
+ return getProperty("EC2_REGION", "us-east-1");
}
@Override
- public Long getMaxWorkflowInputPayloadSizeThresholdKB() {
- return 10240L;
+ public String getAvailabilityZone() {
+ return getProperty("EC2_AVAILABILITY_ZONE", "us-east-1c");
+ }
+
+ public void setProperty(String key, String value) {
+ testProperties.put(key, value);
}
@Override
- public Long getWorkflowOutputPayloadSizeThresholdKB() {
- return 5120L;
+ public int getIntProperty(String key, int defaultValue) {
+ String val = getProperty(key, Integer.toString(defaultValue));
+ try {
+ defaultValue = Integer.parseInt(val);
+        } catch (NumberFormatException e) {
+            logger.error("Error parsing the Int value for Key:{} , returning a default value: {}", key, defaultValue); }
+ return defaultValue;
}
@Override
- public Long getMaxWorkflowOutputPayloadSizeThresholdKB() {
- return 10240L;
+ public long getLongProperty(String key, long defaultValue) {
+ String val = getProperty(key, Long.toString(defaultValue));
+ try {
+ defaultValue = Long.parseLong(val);
+ } catch (NumberFormatException e) {
+ logger.error("Error parsing the Long value for Key:{} , returning a default value: {}", key, defaultValue);
+ }
+ return defaultValue;
}
+ @SuppressWarnings("Duplicates")
@Override
- public Long getTaskInputPayloadSizeThresholdKB() {
- return 3072L;
+ public String getProperty(String key, String defaultValue) {
+ String val = null;
+ if (testProperties.containsKey(key)) {
+ return testProperties.get(key);
+ }
+
+ try {
+ val = System.getenv(key.replace('.', '_'));
+ if (val == null || val.isEmpty()) {
+ val = Optional.ofNullable(System.getProperty(key)).orElse(defaultValue);
+ }
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
+
+ return val;
}
@Override
- public Long getMaxTaskInputPayloadSizeThresholdKB() {
- return 10240L;
+    public Map<String, Object> getAll() {
+        Map<String, Object> map = new HashMap<>();
+ Properties props = System.getProperties();
+ props.entrySet().forEach(entry -> map.put(entry.getKey().toString(), entry.getValue()));
+ map.putAll(testProperties);
+ return map;
}
@Override
- public Long getTaskOutputPayloadSizeThresholdKB() {
- return 3072L;
+ public Long getWorkflowInputPayloadSizeThresholdKB() {
+ return 5120L;
}
@Override
- public Long getMaxTaskOutputPayloadSizeThresholdKB() {
+ public Long getMaxWorkflowInputPayloadSizeThresholdKB() {
return 10240L;
}
@Override
- public String getProperty(String string, String def) {
- String val = testProperties.get(string);
- return val != null ? val : def;
+ public Long getWorkflowOutputPayloadSizeThresholdKB() {
+ return 5120L;
}
-
@Override
public boolean getBooleanProperty(String name, boolean defaultValue) {
return false;
}
- public void setProperty(String key, String value) {
- testProperties.put(key, value);
- }
-
@Override
- public String getAvailabilityZone() {
- return "us-east-1a";
+ public Long getMaxWorkflowOutputPayloadSizeThresholdKB() {
+ return 10240L;
}
@Override
- public int getIntProperty(String string, int def) {
- return 100;
+ public Long getTaskInputPayloadSizeThresholdKB() {
+ return 3072L;
}
@Override
- public long getLongProperty(String name, long defaultValue) {
- return 0;
+ public Long getMaxTaskInputPayloadSizeThresholdKB() {
+ return 10240L;
}
@Override
- public String getRegion() {
- return "us-east-1";
+ public Long getTaskOutputPayloadSizeThresholdKB() {
+ return 3072L;
}
@Override
- public Map getAll() {
- return null;
+ public Long getMaxTaskOutputPayloadSizeThresholdKB() {
+ return 10240L;
}
}
+
diff --git a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/EmbeddedDatabase.java b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/EmbeddedDatabase.java
index 9ca5b7b264..a1d874c83c 100644
--- a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/EmbeddedDatabase.java
+++ b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/EmbeddedDatabase.java
@@ -1,5 +1,7 @@
package com.netflix.conductor.dao.mysql;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import ch.vorburger.mariadb4j.DBConfiguration;
import ch.vorburger.mariadb4j.DBConfigurationBuilder;
import org.slf4j.Logger;
@@ -12,7 +14,8 @@ public enum EmbeddedDatabase {
INSTANCE;
private final DB db;
- private final Logger logger = LoggerFactory.getLogger(getClass());
+ private final Logger logger = LoggerFactory.getLogger(EmbeddedDatabase.class);
+ private static final AtomicBoolean hasBeenMigrated = new AtomicBoolean(false);
public DB getDB() {
return db;
@@ -37,4 +40,13 @@ private DB startEmbeddedDatabase() {
logger.info("Starting embedded database");
db = startEmbeddedDatabase();
}
+
+ public static boolean hasBeenMigrated() {
+ return hasBeenMigrated.get();
+ }
+
+ public static void setHasBeenMigrated() {
+ hasBeenMigrated.getAndSet(true);
+ }
+
}
diff --git a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLBaseDAOTest.java b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLBaseDAOTest.java
new file mode 100644
index 0000000000..7bd5d80ac8
--- /dev/null
+++ b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLBaseDAOTest.java
@@ -0,0 +1,102 @@
+package com.netflix.conductor.dao.mysql;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.netflix.conductor.config.TestConfiguration;
+import com.netflix.conductor.core.config.Configuration;
+import com.zaxxer.hikari.HikariDataSource;
+import org.flywaydb.core.Flyway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+
+@SuppressWarnings("Duplicates")
+public class MySQLBaseDAOTest {
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+ protected final DataSource dataSource;
+ protected final TestConfiguration testConfiguration = new TestConfiguration();
+ protected final ObjectMapper objectMapper = createObjectMapper();
+ protected final EmbeddedDatabase DB = EmbeddedDatabase.INSTANCE;
+
+ MySQLBaseDAOTest() {
+ testConfiguration.setProperty("jdbc.url", "jdbc:mysql://localhost:33307/conductor?useSSL=false&useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC");
+ testConfiguration.setProperty("jdbc.username", "root");
+ testConfiguration.setProperty("jdbc.password", "");
+ this.dataSource = getDataSource(testConfiguration);
+ }
+
+ private DataSource getDataSource(Configuration config) {
+
+ HikariDataSource dataSource = new HikariDataSource();
+ dataSource.setJdbcUrl(config.getProperty("jdbc.url", "jdbc:mysql://localhost:33307/conductor"));
+ dataSource.setUsername(config.getProperty("jdbc.username", "conductor"));
+ dataSource.setPassword(config.getProperty("jdbc.password", "password"));
+ dataSource.setAutoCommit(false);
+ dataSource.setTransactionIsolation("TRANSACTION_READ_COMMITTED");
+
+ // Prevent DB from getting exhausted during rapid testing
+ dataSource.setMaximumPoolSize(8);
+
+ if (!EmbeddedDatabase.hasBeenMigrated()) {
+ synchronized (EmbeddedDatabase.class) {
+ flywayMigrate(dataSource);
+ EmbeddedDatabase.setHasBeenMigrated();
+ }
+ }
+
+ return dataSource;
+ }
+
+ private synchronized static void flywayMigrate(DataSource dataSource) {
+ if(EmbeddedDatabase.hasBeenMigrated()) {
+ return;
+ }
+
+ synchronized (MySQLBaseDAOTest.class) {
+ Flyway flyway = new Flyway();
+ flyway.setDataSource(dataSource);
+ flyway.setPlaceholderReplacement(false);
+ flyway.migrate();
+ }
+ }
+
+ private static ObjectMapper createObjectMapper() {
+ ObjectMapper om = new ObjectMapper();
+ om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ om.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
+ om.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false);
+ om.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+ om.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
+ return om;
+ }
+
+ protected void resetAllData() {
+ logger.info("Resetting data for test");
+ try (Connection connection = dataSource.getConnection()) {
+ try(ResultSet rs = connection.prepareStatement("SHOW TABLES").executeQuery();
+ PreparedStatement keysOn = connection.prepareStatement("SET FOREIGN_KEY_CHECKS=1")) {
+ try(PreparedStatement keysOff = connection.prepareStatement("SET FOREIGN_KEY_CHECKS=0")){
+ keysOff.execute();
+ while(rs.next()) {
+ String table = rs.getString(1);
+ try(PreparedStatement ps = connection.prepareStatement("TRUNCATE TABLE " + table)) {
+ ps.execute();
+ }
+ }
+ } finally {
+ keysOn.execute();
+ }
+ }
+ } catch (SQLException ex) {
+ logger.error(ex.getMessage(), ex);
+ throw new RuntimeException(ex);
+ }
+ }
+}
\ No newline at end of file
diff --git a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLDAOTestUtil.java b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLDAOTestUtil.java
index f80b9c48d4..e6a2104665 100644
--- a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLDAOTestUtil.java
+++ b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLDAOTestUtil.java
@@ -5,18 +5,16 @@
import com.netflix.conductor.config.TestConfiguration;
import com.netflix.conductor.core.config.Configuration;
import com.zaxxer.hikari.HikariDataSource;
-
import org.flywaydb.core.Flyway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import javax.sql.DataSource;
-
@SuppressWarnings("Duplicates")
public class MySQLDAOTestUtil {
@@ -26,7 +24,7 @@ public class MySQLDAOTestUtil {
private final ObjectMapper objectMapper = new JsonMapperProvider().get();
MySQLDAOTestUtil(String dbName) throws Exception {
- testConfiguration.setProperty("jdbc.url", "jdbc:mysql://localhost:33307/" + dbName);
+ testConfiguration.setProperty("jdbc.url", "jdbc:mysql://localhost:33307/" + dbName +"?useSSL=false&useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC");
testConfiguration.setProperty("jdbc.username", "root");
testConfiguration.setProperty("jdbc.password", "");
// Ensure the DB starts
diff --git a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLPushPopQueueDAOTest.java b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLPushPopQueueDAOTest.java
deleted file mode 100644
index e948038ffe..0000000000
--- a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLPushPopQueueDAOTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package com.netflix.conductor.dao.mysql;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("Duplicates")
-public class MySQLPushPopQueueDAOTest {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(MySQLPushPopQueueDAOTest.class);
-
- private MySQLDAOTestUtil testUtil;
- private MySQLQueueDAO dao;
-
- @Rule
- public TestName name = new TestName();
-
- @Before
- public void setup() throws Exception {
- testUtil = new MySQLDAOTestUtil(name.getMethodName());
- dao = new MySQLQueueDAO(testUtil.getObjectMapper(), testUtil.getDataSource());
- }
-
- @After
- public void teardown() throws Exception {
- testUtil.resetAllData();
- testUtil.getDataSource().close();
- }
-
- @Test
- public void testWith2THreads() throws Exception {
- testPollDataWithParallelThreads(2);
- }
-
- private void testPollDataWithParallelThreads(final int threadCount)
- throws Exception {
-
-        List<String> expectedList = Collections.synchronizedList(new ArrayList<String>(threadCount));
-
-        Callable<List<String>> task = new Callable<List<String>>() {
-            @Override
-            public List<String> call() throws Exception {
- String messageID1 = UUID.randomUUID().toString();
- String messageID2 = UUID.randomUUID().toString();
- expectedList.add(messageID1);
- expectedList.add(messageID2);
- dao.push("T1", messageID1, 0);
- dao.push("T1", messageID2, 0);
- Thread.sleep(10);
- return dao.pop("T1", 5, 10);
- }
- };
-        List<Callable<List<String>>> tasks = Collections.nCopies(threadCount, task);
-
- ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
-        List<Future<List<String>>> futures = executorService.invokeAll(tasks);
-        List<String> resultList = new ArrayList<String>(futures.size());
- // Check for exceptions
-        for (Future<List<String>> future : futures) {
-            // Throws an exception if an exception was thrown by the task.
-            List<String> list = future.get();
- resultList.addAll(list);
- }
- // Validate the IDs
- Assert.assertEquals(threadCount, futures.size());
-
- Collections.sort(expectedList);
- Collections.sort(resultList);
- Assert.assertEquals(expectedList, resultList);
- }
-
-}
diff --git a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java
index 1a8eaa8768..3f6043290f 100644
--- a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java
+++ b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java
@@ -175,7 +175,7 @@ public void updateTasks(List<Task> tasks) {
public void updateTask(Task task) {
task.setUpdateTime(System.currentTimeMillis());
- if (task.getStatus() != null && task.getStatus().isTerminal()) {
+ if (task.getStatus() != null && task.getStatus().isTerminal() && task.getEndTime() == 0) {
task.setEndTime(System.currentTimeMillis());
}
diff --git a/redis-persistence/src/main/java/com/netflix/conductor/jedis/InMemoryJedisProvider.java b/redis-persistence/src/main/java/com/netflix/conductor/jedis/InMemoryJedisProvider.java
index f8987db791..162767bc59 100644
--- a/redis-persistence/src/main/java/com/netflix/conductor/jedis/InMemoryJedisProvider.java
+++ b/redis-persistence/src/main/java/com/netflix/conductor/jedis/InMemoryJedisProvider.java
@@ -1,10 +1,10 @@
package com.netflix.conductor.jedis;
+import redis.clients.jedis.JedisCommands;
+
import javax.inject.Provider;
import javax.inject.Singleton;
-import redis.clients.jedis.JedisCommands;
-
@Singleton
public class InMemoryJedisProvider implements Provider<JedisCommands> {
private final JedisCommands mock = new JedisMock();
diff --git a/server/src/main/java/com/netflix/conductor/bootstrap/BootstrapModule.java b/server/src/main/java/com/netflix/conductor/bootstrap/BootstrapModule.java
index d68d77c8e3..2dcdbce278 100644
--- a/server/src/main/java/com/netflix/conductor/bootstrap/BootstrapModule.java
+++ b/server/src/main/java/com/netflix/conductor/bootstrap/BootstrapModule.java
@@ -1,8 +1,6 @@
package com.netflix.conductor.bootstrap;
import com.google.inject.AbstractModule;
-
-import com.netflix.conductor.common.utils.JsonMapperProvider;
import com.netflix.conductor.core.config.Configuration;
import com.netflix.conductor.core.config.SystemPropertiesConfiguration;
diff --git a/test-harness/build.gradle b/test-harness/build.gradle
index d2bb313705..6b98298e3f 100644
--- a/test-harness/build.gradle
+++ b/test-harness/build.gradle
@@ -8,6 +8,7 @@ dependencies {
testCompile project(':conductor-core')
testCompile project(':conductor-jersey')
testCompile project(':conductor-redis-persistence').sourceSets.test.output
+ testCompile project(':conductor-mysql-persistence').sourceSets.test.output
testCompile project(':conductor-client')
testCompile project(':conductor-server')
testCompile project(':conductor-grpc-client')
@@ -20,11 +21,14 @@ dependencies {
testCompile "com.google.inject.extensions:guice-servlet:${revGuiceServlet}"
testCompile "io.swagger:swagger-jersey-jaxrs:${revSwaggerJersey}"
+
+ testCompile "ch.vorburger.mariaDB4j:mariaDB4j:${revMariaDB4j}"
}
test {
- // Because tests in the module bind to ports they shouldn't be executed in parallel.
-// maxParallelForks = 1
+ testLogging {
+ exceptionFormat = 'full'
+ }
}
task server(type: JavaExec) {
diff --git a/test-harness/dependencies.lock b/test-harness/dependencies.lock
index 4988ec5c01..d85895eead 100644
--- a/test-harness/dependencies.lock
+++ b/test-harness/dependencies.lock
@@ -10,11 +10,15 @@
}
},
"testCompile": {
+ "ch.vorburger.mariaDB4j:mariaDB4j": {
+ "locked": "2.2.3",
+ "requested": "2.2.3"
+ },
"com.amazonaws:aws-java-sdk-core": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-client"
],
- "locked": "1.11.426"
+ "locked": "1.11.428"
},
"com.amazonaws:aws-java-sdk-s3": {
"firstLevelTransitive": [
@@ -26,7 +30,7 @@
"firstLevelTransitive": [
"com.netflix.conductor:conductor-contribs"
],
- "locked": "1.11.426"
+ "locked": "1.11.428"
},
"com.fasterxml.jackson.core:jackson-core": {
"firstLevelTransitive": [
@@ -271,7 +275,7 @@
"com.netflix.conductor:conductor-es5-persistence",
"com.netflix.conductor:conductor-mysql-persistence"
],
- "locked": "2.4"
+ "locked": "2.5"
},
"io.grpc:grpc-netty": {
"firstLevelTransitive": [
@@ -447,11 +451,15 @@
}
},
"testCompileClasspath": {
+ "ch.vorburger.mariaDB4j:mariaDB4j": {
+ "locked": "2.2.3",
+ "requested": "2.2.3"
+ },
"com.amazonaws:aws-java-sdk-core": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-client"
],
- "locked": "1.11.426"
+ "locked": "1.11.428"
},
"com.amazonaws:aws-java-sdk-s3": {
"firstLevelTransitive": [
@@ -463,7 +471,7 @@
"firstLevelTransitive": [
"com.netflix.conductor:conductor-contribs"
],
- "locked": "1.11.426"
+ "locked": "1.11.428"
},
"com.fasterxml.jackson.core:jackson-core": {
"firstLevelTransitive": [
@@ -708,7 +716,7 @@
"com.netflix.conductor:conductor-es5-persistence",
"com.netflix.conductor:conductor-mysql-persistence"
],
- "locked": "2.4"
+ "locked": "2.5"
},
"io.grpc:grpc-netty": {
"firstLevelTransitive": [
@@ -884,11 +892,15 @@
}
},
"testRuntime": {
+ "ch.vorburger.mariaDB4j:mariaDB4j": {
+ "locked": "2.2.3",
+ "requested": "2.2.3"
+ },
"com.amazonaws:aws-java-sdk-core": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-client"
],
- "locked": "1.11.426"
+ "locked": "1.11.428"
},
"com.amazonaws:aws-java-sdk-s3": {
"firstLevelTransitive": [
@@ -900,7 +912,7 @@
"firstLevelTransitive": [
"com.netflix.conductor:conductor-contribs"
],
- "locked": "1.11.426"
+ "locked": "1.11.428"
},
"com.fasterxml.jackson.core:jackson-core": {
"firstLevelTransitive": [
@@ -1145,7 +1157,7 @@
"com.netflix.conductor:conductor-es5-persistence",
"com.netflix.conductor:conductor-mysql-persistence"
],
- "locked": "2.4"
+ "locked": "2.5"
},
"io.grpc:grpc-netty": {
"firstLevelTransitive": [
@@ -1321,11 +1333,15 @@
}
},
"testRuntimeClasspath": {
+ "ch.vorburger.mariaDB4j:mariaDB4j": {
+ "locked": "2.2.3",
+ "requested": "2.2.3"
+ },
"com.amazonaws:aws-java-sdk-core": {
"firstLevelTransitive": [
"com.netflix.conductor:conductor-client"
],
- "locked": "1.11.426"
+ "locked": "1.11.428"
},
"com.amazonaws:aws-java-sdk-s3": {
"firstLevelTransitive": [
@@ -1337,7 +1353,7 @@
"firstLevelTransitive": [
"com.netflix.conductor:conductor-contribs"
],
- "locked": "1.11.426"
+ "locked": "1.11.428"
},
"com.fasterxml.jackson.core:jackson-core": {
"firstLevelTransitive": [
@@ -1582,7 +1598,7 @@
"com.netflix.conductor:conductor-es5-persistence",
"com.netflix.conductor:conductor-mysql-persistence"
],
- "locked": "2.4"
+ "locked": "2.5"
},
"io.grpc:grpc-netty": {
"firstLevelTransitive": [
diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java
index 9cd897cb4e..23625f7051 100644
--- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java
+++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java
@@ -57,6 +57,7 @@
/**
* @author Viren
*/
+
public class End2EndTests extends AbstractEndToEndTest {
private static TaskClient taskClient;
diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/MySQLWorkflowServiceTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/MySQLWorkflowServiceTest.java
new file mode 100644
index 0000000000..74a6deb640
--- /dev/null
+++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/MySQLWorkflowServiceTest.java
@@ -0,0 +1,13 @@
+
+package com.netflix.conductor.tests.integration;
+
+import java.util.Map;
+
+public class MySQLWorkflowServiceTest {
+
+
+    String startOrLoadWorkflowExecution(String snapshotResourceName, String workflowName, int version, String correlationId, Map<String, Object> input, String event, Map<String, String> taskToDomain) {
+ // return workflowExecutor.startWorkflow(workflowName, version, correlationId, input, null, event, taskToDomain);
+ return null;
+ }
+}
diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java
index 534f0a48a0..903e5619fb 100644
--- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java
+++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java
@@ -17,7 +17,6 @@
import com.netflix.conductor.tests.utils.TestRunner;
import org.junit.runner.RunWith;
-
import java.util.Map;
@RunWith(TestRunner.class)
diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java b/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java
new file mode 100644
index 0000000000..a623551bcd
--- /dev/null
+++ b/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestModule.java
@@ -0,0 +1,75 @@
+package com.netflix.conductor.tests.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import com.netflix.conductor.common.utils.ExternalPayloadStorage;
+import com.netflix.conductor.common.utils.JsonMapperProvider;
+import com.netflix.conductor.core.config.Configuration;
+import com.netflix.conductor.core.config.CoreModule;
+import com.netflix.conductor.dao.ExecutionDAO;
+import com.netflix.conductor.dao.IndexDAO;
+import com.netflix.conductor.dao.MetadataDAO;
+import com.netflix.conductor.dao.QueueDAO;
+import com.netflix.conductor.dao.mysql.EmbeddedDatabase;
+import com.netflix.conductor.dao.mysql.MySQLExecutionDAO;
+import com.netflix.conductor.dao.mysql.MySQLMetadataDAO;
+import com.netflix.conductor.dao.mysql.MySQLQueueDAO;
+import com.netflix.conductor.mysql.MySQLConfiguration;
+import com.netflix.conductor.mysql.MySQLDataSourceProvider;
+import com.netflix.conductor.mysql.SystemPropertiesMySQLConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @author jvemugunta
+ */
+public class MySQLTestModule extends AbstractModule {
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private int maxThreads = 50;
+
+ private ExecutorService executorService;
+ protected final EmbeddedDatabase DB = EmbeddedDatabase.INSTANCE;
+
+ @Override
+ protected void configure() {
+
+
+ bind(Configuration.class).to(SystemPropertiesMySQLConfiguration.class).in(Singleton.class);
+ bind(MySQLConfiguration.class).to(SystemPropertiesMySQLConfiguration.class).in(Singleton.class);
+
+ bind(DataSource.class).toProvider(MySQLDataSourceProvider.class).in(Scopes.SINGLETON);
+ bind(MetadataDAO.class).to(MySQLMetadataDAO.class);
+ bind(ExecutionDAO.class).to(MySQLExecutionDAO.class);
+ bind(QueueDAO.class).to(MySQLQueueDAO.class);
+ bind(IndexDAO.class).to(MockIndexDAO.class);
+
+ install(new CoreModule());
+ bind(UserTask.class).asEagerSingleton();
+ bind(ObjectMapper.class).toProvider(JsonMapperProvider.class);
+ bind(ExternalPayloadStorage.class).to(MockExternalPayloadStorage.class);
+
+ }
+
+
+ @Provides
+    public ExecutorService getExecutorService() {
+        if (this.executorService == null) { configureExecutorService(); } return this.executorService;
+ }
+
+ private void configureExecutorService() {
+ AtomicInteger count = new AtomicInteger(0);
+ this.executorService = java.util.concurrent.Executors.newFixedThreadPool(maxThreads, runnable -> {
+ Thread workflowWorkerThread = new Thread(runnable);
+ workflowWorkerThread.setName(String.format("workflow-worker-%d", count.getAndIncrement()));
+ return workflowWorkerThread;
+ });
+ }
+}
diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestRunner.java b/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestRunner.java
new file mode 100644
index 0000000000..72ca840320
--- /dev/null
+++ b/test-harness/src/test/java/com/netflix/conductor/tests/utils/MySQLTestRunner.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package com.netflix.conductor.tests.utils;
+
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import org.junit.runners.BlockJUnit4ClassRunner;
+
+/**
+ * @author Viren
+ *
+ */
+public class MySQLTestRunner extends BlockJUnit4ClassRunner {
+
+ private Injector injector;
+
+ static {
+ System.setProperty("EC2_REGION", "us-east-1");
+ System.setProperty("EC2_AVAILABILITY_ZONE", "us-east-1c");
+
+ System.setProperty("conductor.workflow.input.payload.threshold.kb", "10");
+ System.setProperty("conductor.max.workflow.input.payload.threshold.kb", "10240");
+ System.setProperty("conductor.workflow.output.payload.threshold.kb", "10");
+ System.setProperty("conductor.max.workflow.output.payload.threshold.kb", "10240");
+ System.setProperty("conductor.task.input.payload.threshold.kb", "1");
+ System.setProperty("conductor.max.task.input.payload.threshold.kb", "10240");
+ System.setProperty("conductor.task.output.payload.threshold.kb", "10");
+ System.setProperty("conductor.max.task.output.payload.threshold.kb", "10240");
+
+ // jdbc properties
+
+ System.setProperty("jdbc.url", "jdbc:mysql://localhost:33307/conductor?useSSL=false&useUnicode=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC");
+ System.setProperty("jdbc.username", "root");
+ System.setProperty("jdbc.password", "");
+ System.setProperty("conductor.mysql.connection.pool.size.min", "8");
+ System.setProperty("conductor.mysql.connection.pool.size.max", "8");
+ System.setProperty("conductor.mysql.connection.pool.idle.min", "300000");
+
+ }
+
+    public MySQLTestRunner(Class<?> klass) throws Exception {
+ super(klass);
+ System.setProperty("workflow.namespace.prefix", "conductor" + System.getProperty("user.name"));
+ injector = Guice.createInjector(new MySQLTestModule());
+ }
+
+ @Override
+ protected Object createTest() throws Exception {
+ Object test = super.createTest();
+ injector.injectMembers(test);
+ return test;
+ }
+}
diff --git a/test-harness/src/test/resources/log4j.properties b/test-harness/src/test/resources/log4j.properties
index 5e31e3c26f..e4eeb85c6a 100644
--- a/test-harness/src/test/resources/log4j.properties
+++ b/test-harness/src/test/resources/log4j.properties
@@ -6,4 +6,8 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.ch.vorburger=error
+log4j.logger.ch.vorburger.mariadb4j.DB=info
+log4j.logger.com.zaxxer.hikari=error