diff --git a/.gitignore b/.gitignore index 8f787df660..396379543a 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ ui/.settings .settings dump.rdb .idea +out/ \ No newline at end of file diff --git a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index 9566abf3cb..a8efa16160 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -140,6 +140,10 @@ public void retryLastFailedTask(String workflowId) { postForEntity1("workflow/{workflowId}/retry", workflowId); } + public void resetCallbacksForInProgressTasks(String workflowId) { + postForEntity1("workflow/{workflowId}/resetcallbacks", workflowId); + } + public void terminateWorkflow(String workflowId, String reason) { delete(new Object[]{"reason", reason}, "workflow/{workflowId}", workflowId); } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 9edb23cca8..94e4356087 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -155,6 +155,26 @@ public String startWorkflow(String name, int version, Map<String, Object> input, } } + public String resetCallbacksForInProgressTasks(String workflowId) throws Exception { + Workflow workflow = edao.getWorkflow(workflowId, true); + if (workflow.getStatus().isTerminal()) { + throw new ApplicationException(Code.CONFLICT, "Workflow is completed. status=" + workflow.getStatus()); + } + + // Get tasks that are in progress and have callbackAfterSeconds > 0 + // and set the callbackAfterSeconds to 0 + for(Task t: workflow.getTasks()) { + if(t.getStatus().equals(Status.IN_PROGRESS) && + t.getCallbackAfterSeconds() > 0){ + if(queue.setOffsetTime(QueueUtils.getQueueName(t), t.getTaskId(), 0)){ + t.setCallbackAfterSeconds(0); + edao.updateTask(t); + } + } + } + return workflowId; + } + public String rerun(RerunWorkflowRequest request) throws Exception { Preconditions.checkNotNull(request.getReRunFromWorkflowId(), "reRunFromWorkflowId is missing"); if(!rerunWF(request.getReRunFromWorkflowId(), request.getReRunFromTaskId(), request.getTaskInput(), diff --git a/core/src/main/java/com/netflix/conductor/dao/QueueDAO.java b/core/src/main/java/com/netflix/conductor/dao/QueueDAO.java index b375117219..7863bb7c75 100644 --- a/core/src/main/java/com/netflix/conductor/dao/QueueDAO.java +++ b/core/src/main/java/com/netflix/conductor/dao/QueueDAO.java @@ -123,5 +123,14 @@ public default void processUnacks(String queueName) { } + /** + * Sets the offset time without pulling out the message from the queue + * @param queueName name of the queue + * @param id message id + * @param offsetTimeInSecond time in seconds, after which the message should be marked visible.
(for timed queues) + * @return true if the message is in queue and the change was successful else returns false + */ + public boolean setOffsetTime(String queueName, String id, long offsetTimeInSecond); + } \ No newline at end of file diff --git a/jersey/src/main/java/com/netflix/conductor/server/resources/WorkflowResource.java b/jersey/src/main/java/com/netflix/conductor/server/resources/WorkflowResource.java index 9b5ff2b9b1..20060935e8 100644 --- a/jersey/src/main/java/com/netflix/conductor/server/resources/WorkflowResource.java +++ b/jersey/src/main/java/com/netflix/conductor/server/resources/WorkflowResource.java @@ -207,6 +207,14 @@ public void restart(@PathParam("workflowId") String workflowId) throws Exception public void retry(@PathParam("workflowId") String workflowId) throws Exception { executor.retry(workflowId); } + + @POST + @Path("/{workflowId}/resetcallbacks") + @ApiOperation("Resets callback times of all in_progress tasks to 0") + @Consumes(MediaType.WILDCARD) + public void reset(@PathParam("workflowId") String workflowId) throws Exception { + executor.resetCallbacksForInProgressTasks(workflowId); + } @DELETE @Path("/{workflowId}") diff --git a/mysql-persistence/.gitignore b/mysql-persistence/.gitignore new file mode 100644 index 0000000000..466e24805a --- /dev/null +++ b/mysql-persistence/.gitignore @@ -0,0 +1 @@ +out/ \ No newline at end of file diff --git a/mysql-persistence/build.gradle b/mysql-persistence/build.gradle new file mode 100644 index 0000000000..9ffc16299b --- /dev/null +++ b/mysql-persistence/build.gradle @@ -0,0 +1,20 @@ +dependencies { + compile project(':conductor-core') + compile 'com.google.inject:guice:3.0' + compile 'org.elasticsearch:elasticsearch:2.+' + + compile 'org.sql2o:sql2o:1.5.4' + compile 'commons-io:commons-io:2.4+' + compile 'mysql:mysql-connector-java:5.1.43' + compile 'com.zaxxer:HikariCP:2.6.3' + compile 'org.flywaydb:flyway-core:4.2.0' + + testCompile 'ch.vorburger.mariaDB4j:mariaDB4j:2.2.3' + testCompile 'ch.qos.logback:logback-core:1.2.3' + testCompile 'ch.qos.logback:logback-classic:1.2.3' +} + +test { + //the MySQL unit tests must run within the same JVM to share the same embedded DB + maxParallelForks = 1 +} diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLBaseDAO.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLBaseDAO.java new file mode 100644 index 0000000000..910d3a8a42 --- /dev/null +++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLBaseDAO.java @@ -0,0 +1,91 @@ +package com.netflix.conductor.dao.mysql; + +import static java.sql.Connection.TRANSACTION_READ_COMMITTED; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sql2o.Connection; +import org.sql2o.Sql2o; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; + +abstract class MySQLBaseDAO { + + private static final List EXCLUDED_STACKTRACE_CLASS = ImmutableList.of("com.netflix.conductor.dao.mysql.MySQLBaseDAO", "java.lang.Thread"); + + protected final Sql2o sql2o; + protected final ObjectMapper om; + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + 
protected MySQLBaseDAO(ObjectMapper om, Sql2o sql2o) { + this.om = om; + this.sql2o = sql2o; + } + + protected String toJson(Object value) { + try { + return om.writeValueAsString(value); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + protected T readValue(String json, Class clazz) { + try { + return om.readValue(json, clazz); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + protected R getWithTransaction(Function function) { + Instant start = Instant.now(); + StackTraceElement caller = Arrays.stream(Thread.currentThread().getStackTrace()).filter(ste -> !EXCLUDED_STACKTRACE_CLASS.contains(ste.getClassName())).findFirst().get(); + logger.debug("{} : starting transaction", caller.getMethodName()); + try (Connection connection = sql2o.beginTransaction(TRANSACTION_READ_COMMITTED)) { + final R result = function.apply(connection); + connection.commit(); + return result; + } finally { + Instant end = Instant.now(); + logger.debug("{} : took {}ms", caller.getMethodName(), Duration.between(start, end).toMillis()); + } + } + + protected void withTransaction(Consumer consumer) { + getWithTransaction(connection -> { + consumer.accept(connection); + return null; + }); + } + + /** + * This will inject a series of p1, p2, ... placeholders in the given query template so it can then be used + * in conjunction with the withParams method on the Sql2o Query object. + * + * The withParams method in the Query class loops through each element in the given array and adds a prepared statement for each. + * For each element found in the array, a pN placeholder should exists in the query. + * + * This is useful for generating the IN clause since Sql2o does not support passing directly a list + * + * @param queryTemplate a query template with a %s placeholder where the variable size parameters placeholders should be injected + * @param numPlaceholders the number of placeholders to generated + * @return + */ + protected String generateQueryWithParametersListPlaceholders(String queryTemplate, int numPlaceholders) { + String paramsPlaceholders = String.join(",", IntStream.rangeClosed(1, numPlaceholders).mapToObj(paramNumber -> ":p" + paramNumber).collect(Collectors.toList())); + return String.format(queryTemplate, paramsPlaceholders); + } +} diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLExecutionDAO.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLExecutionDAO.java new file mode 100644 index 0000000000..b7cc234db0 --- /dev/null +++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLExecutionDAO.java @@ -0,0 +1,766 @@ +package com.netflix.conductor.dao.mysql; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Collectors; + +import javax.inject.Inject; + +import org.sql2o.Connection; +import org.sql2o.ResultSetHandler; +import org.sql2o.Sql2o; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.netflix.conductor.common.metadata.events.EventExecution; +import com.netflix.conductor.common.metadata.tasks.PollData; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.tasks.TaskExecLog; +import 
com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.core.events.queue.Message; +import com.netflix.conductor.core.execution.ApplicationException; +import com.netflix.conductor.dao.ExecutionDAO; +import com.netflix.conductor.dao.IndexDAO; +import com.netflix.conductor.dao.MetadataDAO; +import com.netflix.conductor.metrics.Monitors; + +class MySQLExecutionDAO extends MySQLBaseDAO implements ExecutionDAO { + + private static final String ARCHIVED_FIELD = "archived"; + private static final String RAW_JSON_FIELD = "rawJSON"; + + private IndexDAO indexer; + + private MetadataDAO metadata; + + @Inject + MySQLExecutionDAO(IndexDAO indexer, MetadataDAO metadata, ObjectMapper om, Sql2o sql2o) { + super(om, sql2o); + this.indexer = indexer; + this.metadata = metadata; + } + + @Override + public List getPendingTasksByWorkflow(String taskDefName, String workflowId) { + return getWithTransaction(connection -> { + + String GET_IN_PROGRESS_TASKS_FOR_WORKFLOW = + "SELECT json_data FROM task_in_progress tip INNER JOIN task t ON t.task_id = tip.task_id \n" + + " WHERE task_def_name = :taskDefName AND workflow_id = :workflowId"; + + ResultSetHandler resultSetHandler = resultSet -> readValue(resultSet.getString("json_data"), Task.class); + + return connection.createQuery(GET_IN_PROGRESS_TASKS_FOR_WORKFLOW) + .addParameter("taskDefName", taskDefName) + .addParameter("workflowId", workflowId) + .executeAndFetch(resultSetHandler); + }); + } + + @Override + public List getTasks(String taskDefName, String startKey, int count) { + List tasks = new ArrayList<>(count); + + List pendingTasks = getPendingTasksForTaskType(taskDefName); + boolean startKeyFound = startKey == null; + int found = 0; + for (Task pendingTask : pendingTasks) { + if (!startKeyFound) { + if (pendingTask.getTaskId().equals(startKey)) { + startKeyFound = true; + if (startKey != null) { + continue; + } + } + } + if (startKeyFound && found < count) { + tasks.add(pendingTask); + found++; + } + } + + return tasks; + } + + @Override + public List createTasks(List tasks) { + List created = Lists.newLinkedList(); + + withTransaction(connection -> { + for (Task task : tasks) { + validate(task); + + task.setScheduledTime(System.currentTimeMillis()); + + String taskKey = task.getReferenceTaskName() + "_" + task.getRetryCount(); + + boolean scheduledTaskAdded = addScheduledTask(connection, task, taskKey); + + if (!scheduledTaskAdded) { + logger.info("Task already scheduled, skipping the run " + task.getTaskId() + ", ref=" + task.getReferenceTaskName() + ", key=" + taskKey); + continue; + } + + insertOrUpdateTaskData(connection, task); + addWorkflowToTaskMapping(connection, task); + addTaskInProgress(connection, task); + updateTask(connection, task); + + created.add(task); + } + }); + + return created; + } + + @Override + public void updateTask(Task task) { + withTransaction(connection -> updateTask(connection, task)); + } + + @Override + public boolean exceedsInProgressLimit(Task task) { + TaskDef taskDef = metadata.getTaskDef(task.getTaskDefName()); + if (taskDef == null) return false; + + int limit = taskDef.concurrencyLimit(); + if (limit <= 0) return false; + + long current = getInProgressTaskCount(task.getTaskDefName()); + + if (current >= limit) { + Monitors.recordTaskRateLimited(task.getTaskDefName(), limit); + return true; + } + + logger.info("Task execution count for {}: limit={}, current={}", task.getTaskDefName(), limit, getInProgressTaskCount(task.getTaskDefName())); + + String taskId = task.getTaskId(); + + List 
tasksInProgressInOrderOfArrival = findAllTasksInProgressInOrderOfArrival(task, limit); + + boolean rateLimited = !tasksInProgressInOrderOfArrival.contains(taskId); + + if (rateLimited) { + logger.info("Task execution count limited. {}, limit {}, current {}", task.getTaskDefName(), limit, getInProgressTaskCount(task.getTaskDefName())); + Monitors.recordTaskRateLimited(task.getTaskDefName(), limit); + } + + return rateLimited; + } + + @Override + public void updateTasks(List tasks) { + withTransaction(connection -> tasks.forEach(task -> updateTask(connection, task))); + } + + @Override + public void addTaskExecLog(List log) { + indexer.add(log); + } + + @Override + public void removeTask(String taskId) { + Task task = getTask(taskId); + + if(task == null) { + logger.warn("No such Task by id {}", taskId); + return; + } + + String taskKey = task.getReferenceTaskName() + "" + task.getRetryCount(); + + withTransaction(connection -> { + removeScheduledTask(connection, task, taskKey); + removeWorkflowToTaskMapping(connection, task); + removeTaskInProgress(connection, task); + removeTaskData(connection, task); + }); + } + + @Override + public Task getTask(String taskId) { + String GET_TASK = "SELECT json_data FROM task WHERE task_id = :taskId"; + String taskJsonStr = getWithTransaction(c -> c.createQuery(GET_TASK).addParameter("taskId", taskId).executeScalar(String.class)); + return taskJsonStr != null ? readValue(taskJsonStr, Task.class) : null; + } + + @Override + public List getTasks(List taskIds) { + if (taskIds.isEmpty()) return Lists.newArrayList(); + return getWithTransaction(c -> getTasks(c, taskIds)); + } + + private List getTasks(Connection connection, List taskIds) { + if (taskIds.isEmpty()) return Lists.newArrayList(); + + String GET_TASKS_FOR_IDS = "SELECT json_data FROM task WHERE task_id IN (%s) AND json_data IS NOT NULL"; + String query = generateQueryWithParametersListPlaceholders(GET_TASKS_FOR_IDS, taskIds.size()); + + ResultSetHandler resultSetHandler = resultSet -> readValue(resultSet.getString("json_data"), Task.class); + + return connection.createQuery(query).withParams(taskIds.toArray()).executeAndFetch(resultSetHandler); + } + + @Override + public List getPendingTasksForTaskType(String taskName) { + Preconditions.checkNotNull(taskName, "task name cannot be null"); + return getWithTransaction(connection -> { + + String GET_IN_PROGRESS_TASKS_FOR_TYPE = + "SELECT json_data FROM task_in_progress tip INNER JOIN task t ON t.task_id = tip.task_id WHERE task_def_name = :taskDefName"; + + ResultSetHandler resultSetHandler = resultSet -> readValue(resultSet.getString("json_data"), Task.class); + + return connection.createQuery(GET_IN_PROGRESS_TASKS_FOR_TYPE).addParameter("taskDefName", taskName).executeAndFetch(resultSetHandler); + }); + } + + @Override + public List getTasksForWorkflow(String workflowId) { + return getWithTransaction(connection -> { + String GET_TASKS_FOR_WORKFLOW = "SELECT task_id FROM workflow_to_task WHERE workflow_id = :workflowId"; + List taskIds = connection.createQuery(GET_TASKS_FOR_WORKFLOW).addParameter("workflowId", workflowId).executeScalarList(String.class); + return getTasks(connection, taskIds); + }); + } + + @Override + public String createWorkflow(Workflow workflow) { + workflow.setCreateTime(System.currentTimeMillis()); + return insertOrUpdateWorkflow(workflow, false); + } + + @Override + public String updateWorkflow(Workflow workflow) { + workflow.setUpdateTime(System.currentTimeMillis()); + return insertOrUpdateWorkflow(workflow, true); + } + + 
@Override + public void removeWorkflow(String workflowId) { + try { + Workflow wf = getWorkflow(workflowId, true); + + //Add to elasticsearch + indexer.update(workflowId, new String[]{RAW_JSON_FIELD, ARCHIVED_FIELD}, new Object[]{om.writeValueAsString(wf), true}); + + withTransaction(connection -> { + removeWorkflowDefToWorkflowMapping(connection, wf); + removeWorkflow(connection, workflowId); + removePendingWorkflow(connection, wf.getWorkflowType(), workflowId); + }); + + for(Task task : wf.getTasks()) { + removeTask(task.getTaskId()); + } + + } catch(Exception e) { + throw new ApplicationException("Unable to remove workflow " + workflowId, e); + } + } + + @Override + public void removeFromPendingWorkflow(String workflowType, String workflowId) { + withTransaction(connection -> removePendingWorkflow(connection, workflowType, workflowId)); + } + + @Override + public Workflow getWorkflow(String workflowId) { + return getWorkflow(workflowId, true); + } + + @Override + public Workflow getWorkflow(String workflowId, boolean includeTasks) { + Workflow workflow = getWithTransaction(tx -> readWorkflow(tx, workflowId)); + + if (workflow != null) { + if (includeTasks) { + List tasks = getTasksForWorkflow(workflowId); + tasks.sort(Comparator.comparingLong(Task::getScheduledTime).thenComparingInt(Task::getSeq)); + workflow.setTasks(tasks); + } + return workflow; + } + + //try from the archive + workflow = readWorkflowFromArchive(workflowId); + + if(!includeTasks) { + workflow.getTasks().clear(); + } + + return workflow; + } + + @Override + public List getRunningWorkflowIds(String workflowName) { + Preconditions.checkNotNull(workflowName, "workflowName cannot be null"); + String GET_PENDING_WORKFLOW_IDS = "SELECT workflow_id FROM workflow_pending WHERE workflow_type = :workflowType"; + return getWithTransaction(tx -> tx.createQuery(GET_PENDING_WORKFLOW_IDS).addParameter("workflowType", workflowName).executeScalarList(String.class)); + } + + @Override + public List getPendingWorkflowsByType(String workflowName) { + Preconditions.checkNotNull(workflowName, "workflowName cannot be null"); + return getRunningWorkflowIds(workflowName).stream().map(this::getWorkflow).collect(Collectors.toList()); + } + + @Override + public long getPendingWorkflowCount(String workflowName) { + Preconditions.checkNotNull(workflowName, "workflowName cannot be null"); + String GET_PENDING_WORKFLOW_COUNT = "SELECT COUNT(*) FROM workflow_pending WHERE workflow_type = :workflowType"; + return getWithTransaction(tx -> tx.createQuery(GET_PENDING_WORKFLOW_COUNT).addParameter("workflowType", workflowName).executeScalar(Long.class)); + } + + @Override + public long getInProgressTaskCount(String taskDefName) { + String GET_IN_PROGRESS_TASK_COUNT = "SELECT COUNT(*) FROM task_in_progress WHERE task_def_name = :taskDefName AND in_progress_status = true"; + return getWithTransaction(c -> c.createQuery(GET_IN_PROGRESS_TASK_COUNT) + .addParameter("taskDefName", taskDefName) + .executeScalar(Long.class)); + } + + @Override + public List getWorkflowsByType(String workflowName, Long startTime, Long endTime) { + Preconditions.checkNotNull(workflowName, "workflowName cannot be null"); + Preconditions.checkNotNull(startTime, "startTime cannot be null"); + Preconditions.checkNotNull(endTime, "endTime cannot be null"); + + List workflows = new LinkedList(); + + withTransaction(tx -> { + String GET_ALL_WORKFLOWS_FOR_WORKFLOW_DEF = "SELECT workflow_id FROM workflow_def_to_workflow WHERE workflow_def = :workflowType AND date_str BETWEEN :start AND 
:end"; + List workflowIds = tx.createQuery(GET_ALL_WORKFLOWS_FOR_WORKFLOW_DEF) + .addParameter("workflowType", workflowName) + .addParameter("start", dateStr(startTime)) + .addParameter("end", dateStr(endTime)) + .executeScalarList(String.class); + + workflowIds.forEach(workflowId -> { + try { + Workflow wf = getWorkflow(workflowId); + if (wf.getCreateTime() >= startTime && wf.getCreateTime() <= endTime) { + workflows.add(wf); + } + } catch(Exception e) { + logger.error("Unable to load workflow id {} with name {}", workflowId, workflowName, e); + } + }); + }); + + return workflows; + } + + @Override + public List getWorkflowsByCorrelationId(String correlationId) { + Preconditions.checkNotNull(correlationId, "correlationId cannot be null"); + String GET_WORKFLOWS_BY_CORRELATION_ID = "SELECT workflow_id FROM workflow WHERE correlation_id = :correlationId"; + return getWithTransaction(tx -> tx.createQuery(GET_WORKFLOWS_BY_CORRELATION_ID) + .addParameter("correlationId", correlationId) + .executeScalarList(String.class)).stream() + .map(this::getWorkflow) + .collect(Collectors.toList()); + } + + @Override + public boolean addEventExecution(EventExecution eventExecution) { + try { + boolean added = getWithTransaction(tx -> insertEventExecution(tx, eventExecution)); + if (added) { + indexer.add(eventExecution); + return true; + } + return false; + } catch (Exception e) { + throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "Unable to add event execution " + eventExecution.getId(), e); + } + } + + @Override + public void updateEventExecution(EventExecution eventExecution) { + try { + withTransaction(tx -> updateEventExecution(tx, eventExecution)); + indexer.add(eventExecution); + } catch (Exception e) { + throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "Unable to update event execution " + eventExecution.getId(), e); + } + } + + @Override + public List getEventExecutions(String eventHandlerName, String eventName, String messageId, int max) { + try { + List executions = Lists.newLinkedList(); + withTransaction(tx -> { + for(int i = 0; i < max; i++) { + String executionId = messageId + "_" + i; //see EventProcessor.handle to understand how the execution id is set + EventExecution ee = readEventExecution(tx, eventHandlerName, eventName, messageId, executionId); + if (ee == null) break; + executions.add(ee); + } + }); + return executions; + } catch (Exception e) { + String message = String.format("Unable to get event executions for eventHandlerName=%s, eventName=%s, messageId=%s", eventHandlerName, eventName, messageId); + throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, message, e); + } + } + + @Override + public void addMessage(String queue, Message msg) { + indexer.addMessage(queue, msg); + } + + @Override + public void updateLastPoll(String taskDefName, String domain, String workerId) { + Preconditions.checkNotNull(taskDefName, "taskDefName name cannot be null"); + PollData pollData = new PollData(taskDefName, domain, workerId, System.currentTimeMillis()); + String effectiveDomain = (domain == null) ? "DEFAULT" : domain; + withTransaction(tx -> insertOrUpdatePollData(tx, pollData, effectiveDomain)); + } + + @Override + public PollData getPollData(String taskDefName, String domain) { + Preconditions.checkNotNull(taskDefName, "taskDefName name cannot be null"); + String effectiveDomain = (domain == null) ? 
"DEFAULT" : domain; + return getWithTransaction(tx -> readPollData(tx, taskDefName, effectiveDomain)); + } + + @Override + public List getPollData(String taskDefName) { + Preconditions.checkNotNull(taskDefName, "taskDefName name cannot be null"); + return readAllPollData(taskDefName); + } + + private String insertOrUpdateWorkflow(Workflow workflow, boolean update) { + Preconditions.checkNotNull(workflow, "workflow object cannot be null"); + + boolean terminal = workflow.getStatus().isTerminal(); + + if (terminal) workflow.setEndTime(System.currentTimeMillis()); + + List tasks = workflow.getTasks(); + workflow.setTasks(Lists.newLinkedList()); + + withTransaction(tx -> { + if (!update) { + addWorkflow(tx, workflow); + addWorkflowDefToWorkflowMapping(tx, workflow); + } else { + updateWorkflow(tx, workflow); + } + + if (terminal) { + removePendingWorkflow(tx, workflow.getWorkflowType(), workflow.getWorkflowId()); + } else { + addPendingWorkflow(tx, workflow.getWorkflowType(), workflow.getWorkflowId()); + } + }); + + workflow.setTasks(tasks); + indexer.index(workflow); + return workflow.getWorkflowId(); + } + + private void updateTask(Connection connection, Task task) { + task.setUpdateTime(System.currentTimeMillis()); + if (task.getStatus() != null && task.getStatus().isTerminal()) { + task.setEndTime(System.currentTimeMillis()); + } + + TaskDef taskDef = metadata.getTaskDef(task.getTaskDefName()); + + if (taskDef != null && taskDef.concurrencyLimit() > 0) { + boolean inProgress = task.getStatus() != null && task.getStatus().equals(Task.Status.IN_PROGRESS); + updateInProgressStatus(connection, task, inProgress); + } + + insertOrUpdateTaskData(connection, task); + + if (task.getStatus() != null && task.getStatus().isTerminal()) { + removeTaskInProgress(connection, task); + } + + indexer.index(task); + } + + private Workflow readWorkflow(Connection connection, String workflowId) { + String GET_WORKFLOW = "SELECT json_data FROM workflow WHERE workflow_id = :workflowId"; + String json = connection.createQuery(GET_WORKFLOW).addParameter("workflowId", workflowId).executeScalar(String.class); + return json != null ? 
readValue(json, Workflow.class) : null; + } + + private Workflow readWorkflowFromArchive(String workflowId) { + String json = indexer.get(workflowId, RAW_JSON_FIELD); + if (json != null) { + return readValue(json, Workflow.class); + } else { + throw new ApplicationException(ApplicationException.Code.NOT_FOUND, "No such workflow found by id: " + workflowId); + } + } + + private void addWorkflow(Connection connection, Workflow workflow) { + String INSERT_WORKFLOW = "INSERT INTO workflow (workflow_id, correlation_id, json_data) VALUES (:workflowId, :correlationId, :jsonData)"; + connection.createQuery(INSERT_WORKFLOW) + .addParameter("workflowId", workflow.getWorkflowId()) + .addParameter("correlationId", workflow.getCorrelationId()) + .addParameter("jsonData", toJson(workflow)) + .executeUpdate(); + } + + private void updateWorkflow(Connection connection, Workflow workflow) { + String UPDATE_WORKFLOW = "UPDATE workflow SET json_data = :jsonData, modified_on = CURRENT_TIMESTAMP WHERE workflow_id = :workflowId"; + connection.createQuery(UPDATE_WORKFLOW) + .addParameter("workflowId", workflow.getWorkflowId()) + .addParameter("jsonData", toJson(workflow)) + .executeUpdate(); + } + + private void removeWorkflow(Connection connection, String workflowId) { + String REMOVE_WORKFLOW = "DELETE FROM workflow WHERE workflow_id = :workflowId"; + connection.createQuery(REMOVE_WORKFLOW) + .addParameter("workflowId", workflowId) + .executeUpdate(); + } + + private void addPendingWorkflow(Connection connection, String workflowType, String workflowId) { + String EXISTS_PENDING_WORKFLOW = "SELECT EXISTS(SELECT 1 FROM workflow_pending WHERE workflow_type = :workflowType AND workflow_id = :workflowId)"; + boolean exist = connection.createQuery(EXISTS_PENDING_WORKFLOW) + .addParameter("workflowType", workflowType) + .addParameter("workflowId", workflowId) + .executeScalar(Boolean.class); + + if (!exist) { + String INSERT_PENDING_WORKFLOW = "INSERT INTO workflow_pending (workflow_type, workflow_id) VALUES (:workflowType, :workflowId)"; + connection.createQuery(INSERT_PENDING_WORKFLOW) + .addParameter("workflowType", workflowType) + .addParameter("workflowId", workflowId) + .executeUpdate(); + } + } + + private void removePendingWorkflow(Connection connection, String workflowType, String workflowId) { + String REMOVE_PENDING_WORKFLOW = "DELETE FROM workflow_pending WHERE workflow_type = :workflowType AND workflow_id = :workflowId"; + connection.createQuery(REMOVE_PENDING_WORKFLOW) + .addParameter("workflowType", workflowType) + .addParameter("workflowId", workflowId) + .executeUpdate(); + } + + private void insertOrUpdateTaskData(Connection connection, Task task) { + String UPDATE_TASK = "UPDATE task SET json_data = :jsonData, modified_on = CURRENT_TIMESTAMP WHERE task_id = :taskId"; + int result = connection.createQuery(UPDATE_TASK).addParameter("taskId", task.getTaskId()).addParameter("jsonData", toJson(task)).executeUpdate().getResult(); + if (result == 0) { + String INSERT_TASK = "INSERT INTO task (task_id, json_data) VALUES (:taskId, :jsonData)"; + connection.createQuery(INSERT_TASK).addParameter("taskId", task.getTaskId()).addParameter("jsonData", toJson(task)).executeUpdate().getResult(); + } + } + + private void removeTaskData(Connection connection, Task task) { + String REMOVE_TASK = "DELETE FROM task WHERE task_id = :taskId"; + connection.createQuery(REMOVE_TASK) + .addParameter("taskId", task.getTaskId()) + .executeUpdate(); + } + + private void addWorkflowToTaskMapping(Connection connection, Task task) 
{ + String EXISTS_WORKFLOW_TO_TASK = "SELECT EXISTS(SELECT 1 FROM workflow_to_task WHERE workflow_id = :workflowId AND task_id = :taskId)"; + boolean exist = connection.createQuery(EXISTS_WORKFLOW_TO_TASK) + .addParameter("workflowId", task.getWorkflowInstanceId()) + .addParameter("taskId", task.getTaskId()) + .executeScalar(Boolean.class); + + if (!exist) { + String INSERT_WORKFLOW_TO_TASK = "INSERT INTO workflow_to_task (workflow_id, task_id) VALUES (:workflowId, :taskId)"; + connection.createQuery(INSERT_WORKFLOW_TO_TASK) + .addParameter("workflowId", task.getWorkflowInstanceId()) + .addParameter("taskId", task.getTaskId()) + .executeUpdate(); + } + } + + private void removeWorkflowToTaskMapping(Connection connection, Task task) { + String REMOVE_WORKFLOW_TO_TASK = "DELETE FROM workflow_to_task WHERE workflow_id = :workflowId AND task_id = :taskId"; + connection.createQuery(REMOVE_WORKFLOW_TO_TASK) + .addParameter("workflowId", task.getWorkflowInstanceId()) + .addParameter("taskId", task.getTaskId()) + .executeUpdate(); + } + + private void addWorkflowDefToWorkflowMapping(Connection connection, Workflow workflow) { + String INSERT_WORKFLOW_DEF_TO_WORKFLOW = "INSERT INTO workflow_def_to_workflow (workflow_def, date_str, workflow_id) VALUES (:workflowType, :dateStr, :workflowId)"; + connection.createQuery(INSERT_WORKFLOW_DEF_TO_WORKFLOW) + .bind(workflow) + .addParameter("dateStr", dateStr(workflow.getCreateTime())) + .executeUpdate(); + } + + private void removeWorkflowDefToWorkflowMapping(Connection connection, Workflow workflow) { + String REMOVE_WORKFLOW_DEF_TO_WORKFLOW = "DELETE FROM workflow_def_to_workflow WHERE workflow_def = :workflowType AND date_str = :dateStr AND workflow_id = :workflowId"; + connection.createQuery(REMOVE_WORKFLOW_DEF_TO_WORKFLOW) + .bind(workflow) + .addParameter("dateStr", dateStr(workflow.getCreateTime())) + .executeUpdate(); + } + + private boolean addScheduledTask(Connection connection, Task task, String taskKey) { + String EXISTS_SCHEDULED_TASK = "SELECT EXISTS(SELECT 1 FROM task_scheduled WHERE workflow_id = :workflowId AND task_key = :taskKey)"; + boolean exist = connection.createQuery(EXISTS_SCHEDULED_TASK) + .addParameter("workflowId", task.getWorkflowInstanceId()) + .addParameter("taskKey", taskKey) + .executeScalar(Boolean.class); + + if (!exist) { + String INSERT_SCHEDULED_TASK = "INSERT INTO task_scheduled (workflow_id, task_key, task_id) VALUES (:workflowId, :taskKey, :taskId)"; + connection.createQuery(INSERT_SCHEDULED_TASK) + .addParameter("workflowId", task.getWorkflowInstanceId()) + .addParameter("taskKey", taskKey) + .addParameter("taskId", task.getTaskId()) + .executeUpdate() + .getResult(); + return true; + } + + return false; + } + + private void removeScheduledTask(Connection connection, Task task, String taskKey) { + String REMOVE_SCHEDULED_TASK = "DELETE FROM task_scheduled WHERE workflow_id = :workflowId AND task_key = :taskKey"; + connection.createQuery(REMOVE_SCHEDULED_TASK) + .addParameter("workflowId", task.getWorkflowInstanceId()) + .addParameter("taskKey", taskKey) + .executeUpdate() + .getResult(); + } + + private void addTaskInProgress(Connection connection, Task task) { + String EXISTS_IN_PROGRESS_TASK = "SELECT EXISTS(SELECT 1 FROM task_in_progress WHERE task_def_name = :taskDefName AND task_id = :taskId)"; + boolean exist = connection.createQuery(EXISTS_IN_PROGRESS_TASK) + .addParameter("taskDefName", task.getTaskDefName()) + .addParameter("taskId", task.getTaskId()) + .executeScalar(Boolean.class); + + if (!exist) { + 
String INSERT_IN_PROGRESS_TASK = "INSERT INTO task_in_progress (task_def_name, task_id, workflow_id) VALUES (:taskDefName, :taskId, :workflowId)"; + connection.createQuery(INSERT_IN_PROGRESS_TASK) + .addParameter("taskDefName", task.getTaskDefName()) + .addParameter("taskId", task.getTaskId()) + .addParameter("workflowId", task.getWorkflowInstanceId()) + .executeUpdate(); + } + } + + private void removeTaskInProgress(Connection connection, Task task) { + String REMOVE_IN_PROGRESS_TASK = "DELETE FROM task_in_progress WHERE task_def_name = :taskDefName AND task_id = :taskId"; + connection.createQuery(REMOVE_IN_PROGRESS_TASK) + .addParameter("taskDefName", task.getTaskDefName()) + .addParameter("taskId", task.getTaskId()) + .executeUpdate(); + } + + private void updateInProgressStatus(Connection connection, Task task, boolean inProgress) { + String UPDATE_IN_PROGRESS_TASK_STATUS = "UPDATE task_in_progress SET in_progress_status = :inProgress, modified_on = CURRENT_TIMESTAMP WHERE task_def_name = :taskDefName AND task_id = :taskId"; + connection.createQuery(UPDATE_IN_PROGRESS_TASK_STATUS) + .addParameter("taskDefName", task.getTaskDefName()) + .addParameter("taskId", task.getTaskId()) + .addParameter("inProgress", inProgress) + .executeUpdate(); + } + + private boolean insertEventExecution(Connection connection, EventExecution eventExecution) { + String EXISTS_EVENT_EXECUTION = "SELECT EXISTS(SELECT 1 FROM event_execution WHERE event_handler_name = :name AND event_name = :event AND message_id = :messageId AND execution_id = :id)"; + boolean exist = connection.createQuery(EXISTS_EVENT_EXECUTION).bind(eventExecution).executeScalar(Boolean.class); + if (!exist) { + String INSERT_EVENT_EXECUTION = "INSERT INTO event_execution (event_handler_name, event_name, message_id, execution_id, json_data) VALUES (:name, :event, :messageId, :id, :jsonData)"; + connection.createQuery(INSERT_EVENT_EXECUTION).bind(eventExecution).addParameter("jsonData", toJson(eventExecution)).executeUpdate(); + return true; + } + return false; + } + + private void updateEventExecution(Connection connection, EventExecution eventExecution) { + String UPDATE_EVENT_EXECUTION = "UPDATE event_execution SET json_data = :jsonData, modified_on = CURRENT_TIMESTAMP WHERE event_handler_name = :name AND execution_event = :event AND message_id = :messageId AND execution_id = :id"; + connection.createQuery(UPDATE_EVENT_EXECUTION).bind(eventExecution).addParameter("jsonData", toJson(eventExecution)).executeUpdate(); + } + + private EventExecution readEventExecution(Connection connection, String eventHandlerName, String eventName, String messageId, String executionId) { + String GET_EVENT_EXECUTION = "SELECT json_data FROM event_execution WHERE event_handler_name = :name AND event_name = :event AND message_id = :messageId AND execution_id = :id"; + String jsonStr = connection.createQuery(GET_EVENT_EXECUTION) + .addParameter("name", eventHandlerName) + .addParameter("event", eventName) + .addParameter("messageId", messageId) + .addParameter("id", executionId) + .executeScalar(String.class); + return jsonStr != null ? 
readValue(jsonStr, EventExecution.class) : null; + } + + private void insertOrUpdatePollData(Connection connection, PollData pollData, String domain) { + String UPDATE_POLL_DATA = "UPDATE poll_data SET json_data = :jsonData, modified_on = CURRENT_TIMESTAMP WHERE queue_name = :queueName AND domain = :domain"; + int result = connection.createQuery(UPDATE_POLL_DATA) + .addParameter("queueName", pollData.getQueueName()) + .addParameter("domain", domain) + .addParameter("jsonData", toJson(pollData)) + .executeUpdate() + .getResult(); + if (result == 0) { + String INSERT_POLL_DATA = "INSERT INTO poll_data (queue_name, domain, json_data) VALUES (:queueName, :domain, :jsonData)"; + connection.createQuery(INSERT_POLL_DATA) + .addParameter("queueName", pollData.getQueueName()) + .addParameter("domain", domain) + .addParameter("jsonData", toJson(pollData)) + .executeUpdate() + .getResult(); + } + } + + private PollData readPollData(Connection connection, String queueName, String domain) { + String GET_POLL_DATA = "SELECT json_data FROM poll_data WHERE queue_name = :queueName AND domain = :domain"; + String jsonStr = connection.createQuery(GET_POLL_DATA) + .addParameter("queueName", queueName) + .addParameter("domain", domain) + .executeScalar(String.class); + return jsonStr != null ? readValue(jsonStr, PollData.class) : null; + } + + private List readAllPollData(String queueName) { + String GET_ALL_POLL_DATA = "SELECT json_data FROM poll_data WHERE queue_name = :queueName"; + return getWithTransaction(tx -> tx.createQuery(GET_ALL_POLL_DATA) + .addParameter("queueName", queueName) + .executeScalarList(String.class) + .stream() + .map(jsonData -> readValue(jsonData, PollData.class)) + .collect(Collectors.toList())); + } + + private List findAllTasksInProgressInOrderOfArrival(Task task, int limit) { + String GET_IN_PROGRESS_TASKS_WITH_LIMIT = "SELECT task_id FROM task_in_progress WHERE task_def_name = :taskDefName ORDER BY id LIMIT :limit"; + return getWithTransaction(connection -> + connection.createQuery(GET_IN_PROGRESS_TASKS_WITH_LIMIT) + .addParameter("taskDefName", task.getTaskDefName()) + .addParameter("limit", limit) + .executeScalarList(String.class)); + } + + private void validate(Task task) { + Preconditions.checkNotNull(task, "task object cannot be null"); + Preconditions.checkNotNull(task.getTaskId(), "Task id cannot be null"); + Preconditions.checkNotNull(task.getWorkflowInstanceId(), "Workflow instance id cannot be null"); + Preconditions.checkNotNull(task.getReferenceTaskName(), "Task reference name cannot be null"); + } + + private static String dateStr(Long timeInMs) { + Date date = new Date(timeInMs); + return dateStr(date); + } + + private static String dateStr(Date date) { + SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd"); + return format.format(date); + } +} diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLMetadataDAO.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLMetadataDAO.java new file mode 100644 index 0000000000..bae50eb897 --- /dev/null +++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLMetadataDAO.java @@ -0,0 +1,269 @@ +package com.netflix.conductor.dao.mysql; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import javax.inject.Inject; + +import org.sql2o.Connection; +import org.sql2o.Sql2o; + +import 
com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.netflix.conductor.common.metadata.events.EventHandler; +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.core.config.Configuration; +import com.netflix.conductor.core.execution.ApplicationException; +import com.netflix.conductor.dao.MetadataDAO; + +class MySQLMetadataDAO extends MySQLBaseDAO implements MetadataDAO { + + private Map taskDefCache = new HashMap<>(); + + @Inject + MySQLMetadataDAO(ObjectMapper om, Sql2o sql2o, Configuration config) { + super(om, sql2o); + refreshTaskDefs(); + int cacheRefreshTime = config.getIntProperty("conductor.taskdef.cache.refresh.time.seconds", 60); + Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::refreshTaskDefs, cacheRefreshTime, cacheRefreshTime, TimeUnit.SECONDS); + } + + @Override + public String createTaskDef(TaskDef taskDef) { + validate(taskDef); + taskDef.setCreateTime(System.currentTimeMillis()); + return insertOrUpdateTaskDef(taskDef); + } + + @Override + public String updateTaskDef(TaskDef taskDef) { + validate(taskDef); + taskDef.setUpdateTime(System.currentTimeMillis()); + return insertOrUpdateTaskDef(taskDef); + } + + @Override + public TaskDef getTaskDef(String name) { + Preconditions.checkNotNull(name, "TaskDef name cannot be null"); + TaskDef taskDef = taskDefCache.get(name); + if (taskDef == null) { + taskDef = getTaskDefFromDB(name); + } + return taskDef; + } + + private TaskDef getTaskDefFromDB(String name) { + String READ_ONE_TASKDEF_QUERY = "SELECT json_data FROM meta_task_def WHERE name = :name"; + String taskDefJsonStr = getWithTransaction(conn -> conn.createQuery(READ_ONE_TASKDEF_QUERY).addParameter("name", name).executeScalar(String.class)); + return taskDefJsonStr != null ? readValue(taskDefJsonStr, TaskDef.class) : null; + } + + @Override + public List getAllTaskDefs() { + return getWithTransaction(this::findAllTaskDefs); + } + + @Override + public void removeTaskDef(String name) { + withTransaction(connection -> { + String DELETE_TASKDEF_QUERY = "DELETE FROM meta_task_def WHERE name = :name"; + int deleted = connection.createQuery(DELETE_TASKDEF_QUERY).addParameter("name", name).executeUpdate().getResult(); + if (deleted != 1) { + throw new ApplicationException(ApplicationException.Code.NOT_FOUND, "Cannot remove the task - no such task definition"); + } + refreshTaskDefs(connection); + }); + } + + @Override + public void create(WorkflowDef def) { + validate(def); + def.setCreateTime(System.currentTimeMillis()); + withTransaction(connection -> { + if (workflowExists(connection, def)) { + throw new ApplicationException(ApplicationException.Code.CONFLICT, "Workflow with " + def.key() + " already exists!"); + } + insertOrUpdateWorkflowDef(connection, def); + }); + } + + @Override + public void update(WorkflowDef def) { + validate(def); + def.setUpdateTime(System.currentTimeMillis()); + withTransaction(connection -> insertOrUpdateWorkflowDef(connection, def)); + } + + @Override + public WorkflowDef getLatest(String name) { + String GET_LATEST_WORKFLOW_DEF_QUERY = "SELECT json_data FROM meta_workflow_def WHERE NAME = :name AND version = latest_version"; + String workflowJsonStr = getWithTransaction(conn -> conn.createQuery(GET_LATEST_WORKFLOW_DEF_QUERY).addParameter("name", name).executeScalar(String.class)); + return (workflowJsonStr != null) ? 
readValue(workflowJsonStr, WorkflowDef.class) : null; + } + + @Override + public WorkflowDef get(String name, int version) { + String GET_WORKFLOW_DEF_QUERY = "SELECT json_data FROM meta_workflow_def WHERE NAME = :name AND version = :version"; + String workflowJsonStr = getWithTransaction(conn -> conn.createQuery(GET_WORKFLOW_DEF_QUERY).addParameter("name", name).addParameter("version", version).executeScalar(String.class)); + return (workflowJsonStr != null) ? readValue(workflowJsonStr, WorkflowDef.class) : null; + } + + @Override + public List findAll() { + String FIND_ALL_WORKFLOW_DEF_QUERY = "SELECT DISTINCT name FROM meta_workflow_def"; + return getWithTransaction(conn -> conn.createQuery(FIND_ALL_WORKFLOW_DEF_QUERY).executeScalarList(String.class)); + } + + @Override + public List getAll() { + String GET_ALL_WORKFLOW_DEF_QUERY = "SELECT json_data FROM meta_workflow_def ORDER BY name, version"; + return getWithTransaction(conn -> conn.createQuery(GET_ALL_WORKFLOW_DEF_QUERY).executeScalarList(String.class)).stream() + .map(jsonData -> readValue(jsonData, WorkflowDef.class)) + .collect(Collectors.toList()); + } + + @Override + public List getAllLatest() { + String GET_ALL_LATEST_WORKFLOW_DEF_QUERY = "SELECT json_data FROM meta_workflow_def WHERE version = latest_version"; + return getWithTransaction(conn -> conn.createQuery(GET_ALL_LATEST_WORKFLOW_DEF_QUERY).executeScalarList(String.class)).stream() + .map(jsonData -> readValue(jsonData, WorkflowDef.class)) + .collect(Collectors.toList()); + } + + @Override + public List getAllVersions(String name) { + String GET_ALL_VERSIONS_WORKFLOW_DEF_QUERY = "SELECT json_data FROM meta_workflow_def WHERE name = :name ORDER BY version"; + return getWithTransaction(conn -> conn.createQuery(GET_ALL_VERSIONS_WORKFLOW_DEF_QUERY).addParameter("name",name).executeScalarList(String.class)).stream() + .map(jsonData -> readValue(jsonData, WorkflowDef.class)) + .collect(Collectors.toList()); + } + + @Override + public void addEventHandler(EventHandler eventHandler) { + Preconditions.checkNotNull(eventHandler.getName(), "EventHandler name cannot be null"); + withTransaction(connection -> { + if (getEventHandler(connection, eventHandler.getName()) != null) { + throw new ApplicationException(ApplicationException.Code.CONFLICT, "EventHandler with name " + eventHandler.getName() + " already exists!"); + } + String INSERT_EVENT_HANDLER_QUERY = "INSERT INTO meta_event_handler (name, event, active, json_data) VALUES (:name, :event, :active, :jsonData)"; + connection.createQuery(INSERT_EVENT_HANDLER_QUERY).bind(eventHandler).addParameter("jsonData", toJson(eventHandler)).executeUpdate(); + }); + } + + @Override + public void updateEventHandler(EventHandler eventHandler) { + Preconditions.checkNotNull(eventHandler.getName(), "EventHandler name cannot be null"); + withTransaction(connection -> { + EventHandler existing = getEventHandler(connection, eventHandler.getName()); + if (existing == null) { + throw new ApplicationException(ApplicationException.Code.NOT_FOUND, "EventHandler with name " + eventHandler.getName() + " not found!"); + } + String UPDATE_EVENT_HANDLER_QUERY = "UPDATE meta_event_handler SET event = :event, active = :active, json_data = :jsonData, modified_on = CURRENT_TIMESTAMP WHERE name = :name"; + connection.createQuery(UPDATE_EVENT_HANDLER_QUERY).bind(eventHandler).addParameter("jsonData", toJson(eventHandler)).executeUpdate(); + }); + } + + @Override + public void removeEventHandlerStatus(String name) { + withTransaction(connection -> { + 
EventHandler existing = getEventHandler(connection, name); + if (existing == null) { + throw new ApplicationException(ApplicationException.Code.NOT_FOUND, "EventHandler with name " + name + " not found!"); + } + String DELETE_EVENT_HANDLER_QUERY = "DELETE FROM meta_event_handler WHERE name = :name"; + connection.createQuery(DELETE_EVENT_HANDLER_QUERY).addParameter("name", name).executeUpdate(); + }); + } + + private EventHandler getEventHandler(Connection connection, String name) { + String READ_ONE_EVENT_HANDLER_QUERY = "SELECT json_data FROM meta_event_handler WHERE name = :name"; + String eventHandlerStr = connection.createQuery(READ_ONE_EVENT_HANDLER_QUERY).addParameter("name", name).executeScalar(String.class); + return eventHandlerStr != null ? readValue(eventHandlerStr, EventHandler.class) : null; + } + + @Override + public List getEventHandlers() { + String READ_ALL_EVENT_HANDLER_QUERY = "SELECT json_data FROM meta_event_handler"; + return getWithTransaction(conn -> conn.createQuery(READ_ALL_EVENT_HANDLER_QUERY).executeScalarList(String.class)).stream() + .map(jsonData -> readValue(jsonData, EventHandler.class)) + .collect(Collectors.toList()); + } + + @Override + public List getEventHandlersForEvent(String event, boolean activeOnly) { + String READ_ALL_EVENT_HANDLER_BY_EVENT_QUERY = "SELECT json_data FROM meta_event_handler WHERE event = :event"; + return getWithTransaction(conn -> conn.createQuery(READ_ALL_EVENT_HANDLER_BY_EVENT_QUERY).addParameter("event", event).executeScalarList(String.class)).stream() + .map(jsonData -> readValue(jsonData, EventHandler.class)) + .filter(eventHandler -> (!activeOnly || eventHandler.isActive())) + .collect(Collectors.toList()); + } + + private void refreshTaskDefs(Connection connection) { + Map map = new HashMap<>(); + findAllTaskDefs(connection).forEach(taskDef -> map.put(taskDef.getName(), taskDef)); + this.taskDefCache = map; + } + + private void refreshTaskDefs() { + withTransaction(this::refreshTaskDefs); + } + + private void validate(TaskDef taskDef) { + Preconditions.checkNotNull(taskDef, "TaskDef object cannot be null"); + Preconditions.checkNotNull(taskDef.getName(), "TaskDef name cannot be null"); + } + + private void validate(WorkflowDef def) { + Preconditions.checkNotNull(def, "WorkflowDef object cannot be null"); + Preconditions.checkNotNull(def.getName(), "WorkflowDef name cannot be null"); + } + + private String insertOrUpdateTaskDef(TaskDef taskDef) { + withTransaction(connection -> { + String UPDATE_TASKDEF_QUERY = "UPDATE meta_task_def SET json_data = :jsonData, modified_on = CURRENT_TIMESTAMP WHERE name = :name"; + int result = connection.createQuery(UPDATE_TASKDEF_QUERY).bind(taskDef).addParameter("jsonData", toJson(taskDef)).executeUpdate().getResult(); + if (result == 0) { + String INSERT_TASKDEF_QUERY = "INSERT INTO meta_task_def (name, json_data) VALUES (:name, :jsonData)"; + connection.createQuery(INSERT_TASKDEF_QUERY).bind(taskDef).addParameter("jsonData", toJson(taskDef)).executeUpdate(); + } + refreshTaskDefs(connection); + }); + return taskDef.getName(); + } + + private void insertOrUpdateWorkflowDef(Connection connection, WorkflowDef def) { + + String GET_LATEST_WORKFLOW_DEF_VERSION = "SELECT max(version) AS version FROM meta_workflow_def WHERE name = :name"; + Integer latestVersion = connection.createQuery(GET_LATEST_WORKFLOW_DEF_VERSION).bind(def).executeScalar(Integer.class); + + if (latestVersion == null || latestVersion < def.getVersion()) { + String INSERT_WORKFLOW_DEF_QUERY = "INSERT INTO meta_workflow_def 
(name, version, json_data) VALUES (:name, :version, :jsonData)"; + connection.createQuery(INSERT_WORKFLOW_DEF_QUERY).bind(def).addParameter("jsonData", toJson(def)).executeUpdate(); + latestVersion = def.getVersion(); + } else { + String UPDATE_WORKFLOW_DEF_QUERY = "UPDATE meta_workflow_def SET json_data = :jsonData, modified_on = CURRENT_TIMESTAMP WHERE name = :name AND version = :version"; + connection.createQuery(UPDATE_WORKFLOW_DEF_QUERY).bind(def).addParameter("jsonData", toJson(def)).executeUpdate(); + } + + String UPDATE_WORKFLOW_DEF_LATEST_VERSION_QUERY = "UPDATE meta_workflow_def SET latest_version = :latest_version WHERE name = :name"; + connection.createQuery(UPDATE_WORKFLOW_DEF_LATEST_VERSION_QUERY).bind(def).addParameter("latest_version", latestVersion).executeUpdate(); + } + + private Boolean workflowExists(Connection connection, WorkflowDef def) { + String CHECK_WORKFLOW_DEF_EXISTS_QUERY = "SELECT COUNT(*) FROM meta_workflow_def WHERE name = :name AND version = :version"; + return connection.createQuery(CHECK_WORKFLOW_DEF_EXISTS_QUERY).bind(def).executeScalar(Boolean.class); + } + + private List findAllTaskDefs(Connection connection) { + String READ_ALL_TASKDEF_QUERY = "SELECT json_data FROM meta_task_def"; + return connection.createQuery(READ_ALL_TASKDEF_QUERY).executeScalarList(String.class) + .stream() + .map(jsonData -> readValue(jsonData, TaskDef.class)) + .collect(Collectors.toList()); + } +} diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java new file mode 100644 index 0000000000..4cc44d5ac8 --- /dev/null +++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java @@ -0,0 +1,277 @@ +package com.netflix.conductor.dao.mysql; + +import java.util.Calendar; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.inject.Inject; + +import org.apache.commons.lang.time.DateUtils; +import org.sql2o.Connection; +import org.sql2o.Sql2o; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.util.concurrent.Uninterruptibles; +import com.netflix.conductor.core.events.queue.Message; +import com.netflix.conductor.core.execution.ApplicationException; +import com.netflix.conductor.dao.QueueDAO; + +class MySQLQueueDAO extends MySQLBaseDAO implements QueueDAO { + + @Inject + MySQLQueueDAO(ObjectMapper om, Sql2o sql2o) { + super(om, sql2o); + } + + @Override + public void push(String queueName, String messageId, long offsetTimeInSecond) { + withTransaction(tx -> pushMessage(tx, queueName, messageId, null, offsetTimeInSecond)); + } + + @Override + public void push(String queueName, List messages) { + withTransaction(tx -> + messages.forEach(message -> + pushMessage(tx, queueName, message.getId(), message.getPayload(), 0) + ) + ); + } + + @Override + public boolean pushIfNotExists(String queueName, String messageId, long offsetTimeInSecond) { + return getWithTransaction(tx -> { + if (!existsMessage(tx, queueName, messageId)) { + pushMessage(tx, queueName, messageId, null, offsetTimeInSecond); + return true; + } + return false; + }); + } + + @Override + public List pop(String queueName, int count, int timeout) { + long start = System.currentTimeMillis(); + List foundsIds = 
peekMessages(queueName, count); + + while (foundsIds.size() < count && ((System.currentTimeMillis() - start) < timeout)) { + Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS); + foundsIds = peekMessages(queueName, count); + } + + ImmutableList messageIds = ImmutableList.copyOf(foundsIds); + return getWithTransaction(tx -> popMessages(tx, queueName, messageIds)); + } + + @Override + public List pollMessages(String queueName, int count, int timeout) { + List poppedMessageIds = pop(queueName, count, timeout); + return readMessages(queueName, poppedMessageIds); + } + + @Override + public void remove(String queueName, String messageId) { + withTransaction(tx -> removeMessage(tx, queueName, messageId)); + } + + @Override + public int getSize(String queueName) { + String GET_QUEUE_SIZE = "SELECT COUNT(*) FROM queue_message WHERE queue_name = :queueName"; + return getWithTransaction(tx -> tx.createQuery(GET_QUEUE_SIZE).addParameter("queueName", queueName).executeScalar(Integer.class)); + } + + @Override + public boolean ack(String queueName, String messageId) { + return getWithTransaction(tx -> { + if (existsMessage(tx, queueName, messageId)) { + removeMessage(tx, queueName, messageId); + return true; + } else { + return false; + } + }); + } + + @Override + public boolean setUnackTimeout(String queueName, String messageId, long unackTimeout) { + long updatedOffsetTimeInSecond = unackTimeout/1000; + + String UPDATE_UNACK_TIMEOUT = + "UPDATE queue_message SET offset_time_seconds = :offsetSeconds, deliver_on = TIMESTAMPADD(SECOND,:offsetSeconds,created_on) \n" + + "WHERE queue_name = :queueName AND message_id = :messageId"; + + return getWithTransaction(tx -> + tx.createQuery(UPDATE_UNACK_TIMEOUT) + .addParameter("queueName", queueName) + .addParameter("messageId", messageId) + .addParameter("offsetSeconds", updatedOffsetTimeInSecond) + .executeUpdate() + .getResult() + ) == 1; + } + + @Override + public void flush(String queueName) { + String FLUSH_QUEUE = "DELETE FROM queue_message WHERE queue_name = :queueName"; + withTransaction(tx -> tx.createQuery(FLUSH_QUEUE).addParameter("queueName", queueName).executeUpdate()); + } + + @Override + public Map queuesDetail() { + Map detail = Maps.newHashMap(); + + String GET_QUEUES_DETAIL = + "SELECT queue_name, (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size FROM queue q"; + + withTransaction(tx -> tx.createQuery(GET_QUEUES_DETAIL).executeAndFetchTable().asList().forEach(row -> { + String queueName = (String)row.get("queue_name"); + Number queueSize = (Number)row.get("size"); + detail.put(queueName, queueSize.longValue()); + })); + + return detail; + } + + @Override + public Map>> queuesDetailVerbose() { + Map>> result = Maps.newHashMap(); + + String GET_QUEUES_DETAIL_VERBOSE = + "SELECT queue_name, \n" + + " (SELECT count(*) FROM queue_message WHERE popped = false AND queue_name = q.queue_name) AS size,\n" + + " (SELECT count(*) FROM queue_message WHERE popped = true AND queue_name = q.queue_name) AS uacked \n" + + "FROM queue q"; + + withTransaction(tx -> tx.createQuery(GET_QUEUES_DETAIL_VERBOSE).executeAndFetchTable().asList().forEach(row -> { + String queueName = (String)row.get("queue_name"); + Number queueSize = (Number)row.get("size"); + Number queueUnacked = (Number)row.get("uacked"); + result.put(queueName, ImmutableMap.of( + "a", ImmutableMap.of( //sharding not implemented, returning only one shard with all the info + "size", queueSize.longValue(), + "uacked", queueUnacked.longValue() + 
) + ) + ); + })); + + return result; + } + + @Override + public void processUnacks(String queueName) { + String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE CURRENT_TIMESTAMP > deliver_on AND popped = true"; + withTransaction(tx -> tx.createQuery(PROCESS_UNACKS).executeUpdate()); + } + + @Override + public boolean setOffsetTime(String queueName, String messageId, long offsetTimeInSecond) { + String SET_OFFSET_TIME = + "UPDATE queue_message SET offset_time_seconds = :offsetSeconds, deliver_on = TIMESTAMPADD(SECOND,:offsetSeconds,created_on) \n" + + "WHERE queue_name = :queueName AND message_id = :messageId"; + + return getWithTransaction(tx -> + tx.createQuery(SET_OFFSET_TIME) + .addParameter("queueName", queueName) + .addParameter("messageId", messageId) + .addParameter("offsetSeconds", offsetTimeInSecond) + .executeUpdate() + .getResult() + ) == 1; + } + private boolean existsMessage(Connection connection, String queueName, String messageId) { + String EXISTS_MESSAGE = "SELECT EXISTS(SELECT 1 FROM queue_message WHERE queue_name = :queueName AND message_id = :messageId)"; + return connection.createQuery(EXISTS_MESSAGE).addParameter("queueName", queueName).addParameter("messageId", messageId).executeScalar(Boolean.class); + } + + private void pushMessage(Connection connection, String queueName, String messageId, String payload, long offsetTimeInSecond) { + String PUSH_MESSAGE = "INSERT INTO queue_message (created_on, deliver_on, queue_name, message_id, offset_time_seconds, payload) VALUES (:createdOn, :deliverOn, :queueName, :messageId, :offsetSeconds, :payload)"; + String UPDATE_MESSAGE = "UPDATE queue_message SET payload = :payload WHERE queue_name = :queueName AND message_id = :messageId"; + + createQueueIfNotExists(connection, queueName); + + Date now = DateUtils.truncate(new Date(), Calendar.SECOND); + Date deliverTime = new Date(now.getTime() + (offsetTimeInSecond*1000)); + boolean exists = existsMessage(connection, queueName, messageId); + + if (!exists) { + connection.createQuery(PUSH_MESSAGE) + .addParameter("createdOn", now) + .addParameter("deliverOn", deliverTime) + .addParameter("queueName", queueName) + .addParameter("messageId", messageId) + .addParameter("offsetSeconds", offsetTimeInSecond) + .addParameter("payload", payload).executeUpdate(); + } else { + connection.createQuery(UPDATE_MESSAGE) + .addParameter("queueName", queueName) + .addParameter("messageId", messageId) + .addParameter("payload", payload).executeUpdate(); + } + } + + private void removeMessage(Connection connection, String queueName, String messageId) { + String REMOVE_MESSAGE = "DELETE FROM queue_message WHERE queue_name = :queueName AND message_id = :messageId"; + connection.createQuery(REMOVE_MESSAGE).addParameter("queueName", queueName).addParameter("messageId", messageId).executeUpdate(); + } + + private List peekMessages(String queueName, int count) { + if (count < 1) return Collections.emptyList(); + String PEEK_MESSAGES = "SELECT message_id FROM queue_message WHERE queue_name = :queueName LIMIT :count"; + return getWithTransaction(tx -> tx.createQuery(PEEK_MESSAGES) + .addParameter("queueName", queueName) + .addParameter("count", count) + .executeScalarList(String.class)); + } + + private List popMessages(Connection connection, String queueName, List messageIds) { + if (messageIds.isEmpty()) return messageIds; + + String POP_MESSAGES = "UPDATE queue_message SET popped = true WHERE queue_name = :queueName AND message_id IN (%s)"; + String query = 
generateQueryWithParametersListPlaceholders(POP_MESSAGES, messageIds.size()); + + int result = connection.createQuery(query).addParameter("queueName", queueName).withParams(messageIds.toArray()).executeUpdate().getResult(); + + if (result != messageIds.size()) { + String message = String.format("could not pop all messages for given ids: %s (%d messages were popped)", messageIds, result); + throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, message); + } + + return messageIds; + } + + private List readMessages(String queueName, List messageIds) { + if (messageIds.isEmpty()) return Collections.emptyList(); + + String READ_MESSAGES = "SELECT message_id, payload FROM queue_message WHERE queue_name = :queueName AND message_id IN (%s)"; + String query = generateQueryWithParametersListPlaceholders(READ_MESSAGES, messageIds.size()); + + List messages = getWithTransaction(tx -> tx.createQuery(query) + .addParameter("queueName", queueName) + .addColumnMapping("message_id", "id") + .withParams(messageIds.toArray()) + .executeAndFetch(Message.class)); + + if (messages.size() != messageIds.size()) { + throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "could not read all messages for given ids: " + messageIds); + } + + return messages; + } + + private void createQueueIfNotExists(Connection connection, String queueName) { + String EXISTS_QUEUE = "SELECT EXISTS(SELECT 1 FROM queue WHERE queue_name = :queueName)"; + boolean queueExists = connection.createQuery(EXISTS_QUEUE).addParameter("queueName", queueName).executeScalar(Boolean.class); + + if (!queueExists) { + logger.info("creating queue {}", queueName); + String CREATE_QUEUE = "INSERT INTO queue (queue_name) VALUES (:queueName)"; + connection.createQuery(CREATE_QUEUE).addParameter("queueName", queueName).executeUpdate(); + } + } +} diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLWorkflowModule.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLWorkflowModule.java new file mode 100644 index 0000000000..ee7ab645e4 --- /dev/null +++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLWorkflowModule.java @@ -0,0 +1,63 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.netflix.conductor.dao.mysql; + +import javax.inject.Singleton; +import javax.sql.DataSource; + +import org.flywaydb.core.Flyway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sql2o.Sql2o; + +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.netflix.conductor.core.config.Configuration; +import com.netflix.conductor.dao.ExecutionDAO; +import com.netflix.conductor.dao.MetadataDAO; +import com.netflix.conductor.dao.QueueDAO; +import com.zaxxer.hikari.HikariDataSource; + +public class MySQLWorkflowModule extends AbstractModule { + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + + @Provides + @Singleton + public Sql2o getSql2o(Configuration config) { + HikariDataSource dataSource = new HikariDataSource(); + dataSource.setJdbcUrl(config.getProperty("jdbc.url", "jdbc:mysql://localhost:3306/conductor")); + dataSource.setUsername(config.getProperty("jdbc.username", "conductor")); + dataSource.setPassword(config.getProperty("jdbc.password", "password")); + dataSource.setAutoCommit(false); + flywayMigrate(dataSource); + return new Sql2o(dataSource); + } + + @Override + protected void configure() { + bind(MetadataDAO.class).to(MySQLMetadataDAO.class); + bind(ExecutionDAO.class).to(MySQLExecutionDAO.class); + bind(QueueDAO.class).to(MySQLQueueDAO.class); + } + + private void flywayMigrate(DataSource dataSource) { + Flyway flyway = new Flyway(); + flyway.setDataSource(dataSource); + flyway.setPlaceholderReplacement(false); + flyway.migrate(); + } +} diff --git a/mysql-persistence/src/main/resources/db/migration/V1__initial_schema.sql b/mysql-persistence/src/main/resources/db/migration/V1__initial_schema.sql new file mode 100644 index 0000000000..cefc211fa5 --- /dev/null +++ b/mysql-persistence/src/main/resources/db/migration/V1__initial_schema.sql @@ -0,0 +1,172 @@ + +-- -------------------------------------------------------------------------------------------------------------- +-- SCHEMA FOR METADATA DAO +-- -------------------------------------------------------------------------------------------------------------- + +CREATE TABLE meta_event_handler ( + id int(11) unsigned NOT NULL AUTO_INCREMENT, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + modified_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + name varchar(255) NOT NULL, + event varchar(255) NOT NULL, + active boolean NOT NULL, + json_data mediumtext NOT NULL, + PRIMARY KEY (id), + KEY event_handler_name_index (name), + KEY event_handler_event_index (event) +); + +CREATE TABLE meta_task_def ( + id int(11) unsigned NOT NULL AUTO_INCREMENT, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + modified_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + name varchar(255) NOT NULL, + json_data mediumtext NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY unique_task_def_name (name) +); + +CREATE TABLE meta_workflow_def ( + id int(11) unsigned NOT NULL AUTO_INCREMENT, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + modified_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + name varchar(255) NOT NULL, + version int(11) NOT NULL, + latest_version int(11) NOT NULL DEFAULT 0, + json_data mediumtext NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY unique_name_version (name,version), + KEY workflow_def_name_index (name) +); + +-- -------------------------------------------------------------------------------------------------------------- +-- SCHEMA FOR EXECUTION DAO +-- 
-------------------------------------------------------------------------------------------------------------- + +CREATE TABLE event_execution ( + id int(11) unsigned NOT NULL AUTO_INCREMENT, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + modified_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + event_handler_name varchar(255) NOT NULL, + event_name varchar(255) NOT NULL, + message_id varchar(255) NOT NULL, + execution_id varchar(255) NOT NULL, + json_data mediumtext NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY unique_event_execution (event_handler_name,event_name,message_id) +); + +CREATE TABLE poll_data ( + id int(11) unsigned NOT NULL AUTO_INCREMENT, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + modified_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + queue_name varchar(255) NOT NULL, + domain varchar(255) NOT NULL, + json_data mediumtext NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY unique_poll_data (queue_name,domain), + KEY (queue_name) +); + +CREATE TABLE task_scheduled ( + id int(11) unsigned NOT NULL AUTO_INCREMENT, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + modified_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + workflow_id varchar(255) NOT NULL, + task_key varchar(255) NOT NULL, + task_id varchar(255) NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY unique_workflow_id_task_key (workflow_id,task_key) +); + +CREATE TABLE task_in_progress ( + id int(11) unsigned NOT NULL AUTO_INCREMENT, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + modified_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + task_def_name varchar(255) NOT NULL, + task_id varchar(255) NOT NULL, + workflow_id varchar(255) NOT NULL, + in_progress_status boolean NOT NULL DEFAULT false, + PRIMARY KEY (id), + UNIQUE KEY unique_task_def_task_id1 (task_def_name,task_id) +); + +CREATE TABLE task ( + id int(11) unsigned NOT NULL AUTO_INCREMENT, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + modified_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + task_id varchar(255) NOT NULL, + json_data mediumtext NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY unique_task_id (task_id) +); + +CREATE TABLE workflow ( + id int(11) unsigned NOT NULL AUTO_INCREMENT, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + modified_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + workflow_id varchar(255) NOT NULL, + correlation_id varchar(255), + json_data mediumtext NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY unique_workflow_id (workflow_id) +); + +CREATE TABLE workflow_def_to_workflow ( + id int(11) unsigned NOT NULL AUTO_INCREMENT, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + modified_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + workflow_def varchar(255) NOT NULL, + date_str integer NOT NULL, + workflow_id varchar(255) NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY unique_workflow_def_date_str (workflow_def,date_str,workflow_id) +); + +CREATE TABLE workflow_pending ( + id int(11) unsigned NOT NULL AUTO_INCREMENT, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + modified_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + workflow_type varchar(255) NOT NULL, + workflow_id varchar(255) NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY unique_workflow_type_workflow_id (workflow_type,workflow_id), + KEY workflow_type_index (workflow_type) +); + +CREATE TABLE workflow_to_task ( + id int(11) unsigned NOT NULL AUTO_INCREMENT, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + modified_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + workflow_id varchar(255) NOT NULL, + task_id varchar(255) NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY unique_workflow_to_task_id (workflow_id,task_id), + KEY 
workflow_id_index (workflow_id) +); + +-- -------------------------------------------------------------------------------------------------------------- +-- SCHEMA FOR QUEUE DAO +-- -------------------------------------------------------------------------------------------------------------- + +CREATE TABLE queue ( + id int(11) unsigned NOT NULL AUTO_INCREMENT, + created_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + queue_name varchar(255) NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY unique_queue_name (queue_name) +); + +CREATE TABLE queue_message ( + id int(11) unsigned NOT NULL AUTO_INCREMENT, + created_on TIMESTAMP, + deliver_on TIMESTAMP, + queue_name varchar(255) NOT NULL, + message_id varchar(255) NOT NULL, + popped boolean DEFAULT false, + offset_time_seconds long, + payload mediumtext, + PRIMARY KEY (id), + UNIQUE KEY unique_queue_name_message_id (queue_name,message_id), + KEY queue_name_index (queue_name) +); diff --git a/mysql-persistence/src/test/java/com/netflix/conductor/config/TestConfiguration.java b/mysql-persistence/src/test/java/com/netflix/conductor/config/TestConfiguration.java new file mode 100644 index 0000000000..b78e5f8ce8 --- /dev/null +++ b/mysql-persistence/src/test/java/com/netflix/conductor/config/TestConfiguration.java @@ -0,0 +1,96 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.conductor.config; + +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.netflix.conductor.core.config.Configuration; + +/** + * @author Viren + * + */ +public class TestConfiguration implements Configuration { + + private Map testProperties = Maps.newHashMap(ImmutableMap.of("test", "dummy")); + + @Override + public int getSweepFrequency() { + return 1; + } + + @Override + public boolean disableSweep() { + return false; + } + + @Override + public boolean disableAsyncWorkers() { + return false; + } + + @Override + public String getServerId() { + return "server_id"; + } + + @Override + public String getEnvironment() { + return "test"; + } + + @Override + public String getStack() { + return "junit"; + } + + @Override + public String getAppId() { + return "workflow"; + } + + @Override + public String getProperty(String string, String def) { + String val = testProperties.get(string); + return val != null ? 
val : def; + } + + public void setProperty(String key, String value) { + testProperties.put(key, value); + } + + @Override + public String getAvailabilityZone() { + return "us-east-1a"; + } + + @Override + public int getIntProperty(String string, int def) { + return 100; + } + + @Override + public String getRegion() { + return "us-east-1"; + } + + @Override + public Map getAll() { + return null; + } +} diff --git a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/EmbeddedDatabase.java b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/EmbeddedDatabase.java new file mode 100644 index 0000000000..ef41a1a8fc --- /dev/null +++ b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/EmbeddedDatabase.java @@ -0,0 +1,34 @@ +package com.netflix.conductor.dao.mysql; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ch.vorburger.exec.ManagedProcessException; +import ch.vorburger.mariadb4j.DB; + +public enum EmbeddedDatabase { + INSTANCE; + + private final DB db; + private final Logger logger = LoggerFactory.getLogger(getClass()); + + public DB getDB() { + return db; + } + + private DB startEmbeddedDatabase() { + try { + DB db = DB.newEmbeddedDB(33306); + db.start(); + db.createDB("conductor"); + return db; + } catch (ManagedProcessException e) { + throw new RuntimeException(e); + } + } + + EmbeddedDatabase() { + logger.info("Starting embedded database"); + db = startEmbeddedDatabase(); + } +} diff --git a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLBaseDAOTest.java b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLBaseDAOTest.java new file mode 100644 index 0000000000..b0c54a896f --- /dev/null +++ b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLBaseDAOTest.java @@ -0,0 +1,47 @@ +package com.netflix.conductor.dao.mysql; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sql2o.Connection; +import org.sql2o.Sql2o; + +import ch.vorburger.mariadb4j.DB; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.conductor.config.TestConfiguration; + +class MySQLBaseDAOTest { + + protected final Logger logger = LoggerFactory.getLogger(getClass()); + protected final Sql2o testSql2o; + protected final TestConfiguration testConfiguration = new TestConfiguration(); + protected final ObjectMapper objectMapper = createObjectMapper(); + protected final DB db = EmbeddedDatabase.INSTANCE.getDB(); + + MySQLBaseDAOTest() { + testConfiguration.setProperty("jdbc.url", "jdbc:mysql://localhost:33306/conductor"); + testConfiguration.setProperty("jdbc.username", "root"); + testConfiguration.setProperty("jdbc.password", ""); + testSql2o = new MySQLWorkflowModule().getSql2o(testConfiguration); + } + + private ObjectMapper createObjectMapper() { + ObjectMapper om = new ObjectMapper(); + om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + om.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false); + om.configure(DeserializationFeature.FAIL_ON_NULL_FOR_PRIMITIVES, false); + om.setSerializationInclusion(JsonInclude.Include.NON_NULL); + om.setSerializationInclusion(JsonInclude.Include.NON_EMPTY); + return om; + } + + protected void resetAllData() { + logger.info("Resetting data for test"); + try(Connection connection = testSql2o.open()) { + connection.createQuery("SHOW TABLES").executeScalarList(String.class).stream() + 
.filter(name -> !name.equalsIgnoreCase("schema_version")) + .forEach(table -> connection.createQuery("TRUNCATE TABLE " + table).executeUpdate()); + } + } +} diff --git a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLExecutionDAOTest.java b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLExecutionDAOTest.java new file mode 100644 index 0000000000..09de21ce66 --- /dev/null +++ b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLExecutionDAOTest.java @@ -0,0 +1,405 @@ +package com.netflix.conductor.dao.mysql; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.commons.lang.builder.EqualsBuilder; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import com.netflix.conductor.common.metadata.tasks.PollData; +import com.netflix.conductor.common.metadata.tasks.Task; +import com.netflix.conductor.common.metadata.tasks.Task.Status; +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.common.run.Workflow.WorkflowStatus; +import com.netflix.conductor.dao.IndexDAO; + +public class MySQLExecutionDAOTest extends MySQLBaseDAOTest { + + private MySQLMetadataDAO metadata; + private MySQLExecutionDAO dao; + + @Before + public void setup() throws Exception { + metadata = new MySQLMetadataDAO(objectMapper, testSql2o, testConfiguration); + dao = new MySQLExecutionDAO(mock(IndexDAO.class), metadata, objectMapper, testSql2o); + resetAllData(); + } + + @Rule + public ExpectedException expected = ExpectedException.none(); + + @Test + public void testTaskExceedsLimit() throws Exception { + TaskDef def = new TaskDef(); + def.setName("task1"); + def.setConcurrentExecLimit(1); + metadata.createTaskDef(def); + + List tasks = new LinkedList<>(); + for(int i = 0; i < 15; i++) { + Task task = new Task(); + task.setScheduledTime(1L); + task.setSeq(1); + task.setTaskId("t_" + i); + task.setWorkflowInstanceId("workflow_" + i); + task.setReferenceTaskName("task1"); + task.setTaskDefName("task1"); + tasks.add(task); + task.setStatus(Status.SCHEDULED); + } + + dao.createTasks(tasks); + assertFalse(dao.exceedsInProgressLimit(tasks.get(0))); + tasks.get(0).setStatus(Status.IN_PROGRESS); + dao.updateTask(tasks.get(0)); + + for(Task task : tasks) { + assertTrue(dao.exceedsInProgressLimit(task)); + } + } + + @Test + public void testCreateTaskException() throws Exception { + Task task = new Task(); + task.setScheduledTime(1L); + task.setSeq(1); + task.setTaskId("t1"); + task.setTaskDefName("task1"); + + expected.expect(NullPointerException.class); + expected.expectMessage("Workflow instance id cannot be null"); + dao.createTasks(Collections.singletonList(task)); + + task.setWorkflowInstanceId("wfid"); + expected.expect(NullPointerException.class); + expected.expectMessage("Task reference name cannot be null"); + dao.createTasks(Collections.singletonList(task)); + } + + @Test + public void testCreateTaskException2() throws Exception { + Task task = new Task(); + 
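+ // The workflow instance id is populated below, so the create is expected to fail on the still-missing task reference name.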
task.setScheduledTime(1L); + task.setSeq(1); + task.setTaskId("t1"); + task.setTaskDefName("task1"); + task.setWorkflowInstanceId("wfid"); + + expected.expect(NullPointerException.class); + expected.expectMessage("Task reference name cannot be null"); + dao.createTasks(Collections.singletonList(task)); + } + + @Test + public void testPollData() throws Exception { + dao.updateLastPoll("taskDef", null, "workerId1"); + PollData pd = dao.getPollData("taskDef", null); + assertNotNull(pd); + assertTrue(pd.getLastPollTime() > 0); + assertEquals(pd.getQueueName(), "taskDef"); + assertEquals(pd.getDomain(), null); + assertEquals(pd.getWorkerId(), "workerId1"); + + dao.updateLastPoll("taskDef", "domain1", "workerId1"); + pd = dao.getPollData("taskDef", "domain1"); + assertNotNull(pd); + assertTrue(pd.getLastPollTime() > 0); + assertEquals(pd.getQueueName(), "taskDef"); + assertEquals(pd.getDomain(), "domain1"); + assertEquals(pd.getWorkerId(), "workerId1"); + + List pData = dao.getPollData("taskDef"); + assertEquals(pData.size(), 2); + + pd = dao.getPollData("taskDef", "domain2"); + assertTrue(pd == null); + } + + @Test + public void testTaskCreateDups() throws Exception { + List tasks = new LinkedList<>(); + String workflowId = UUID.randomUUID().toString(); + + for(int i = 0; i < 3; i++) { + Task task = new Task(); + task.setScheduledTime(1L); + task.setSeq(1); + task.setTaskId(workflowId + "_t" + i); + task.setReferenceTaskName("t" + i); + task.setRetryCount(0); + task.setWorkflowInstanceId(workflowId); + task.setTaskDefName("task" + i); + task.setStatus(Task.Status.IN_PROGRESS); + tasks.add(task); + } + + //Let's insert a retried task + Task task = new Task(); + task.setScheduledTime(1L); + task.setSeq(1); + task.setTaskId(workflowId + "_t" + 2); + task.setReferenceTaskName("t" + 2); + task.setRetryCount(1); + task.setWorkflowInstanceId(workflowId); + task.setTaskDefName("task" + 2); + task.setStatus(Task.Status.IN_PROGRESS); + tasks.add(task); + + //Duplicate task! + task = new Task(); + task.setScheduledTime(1L); + task.setSeq(1); + task.setTaskId(workflowId + "_t" + 1); + task.setReferenceTaskName("t" + 1); + task.setRetryCount(0); + task.setWorkflowInstanceId(workflowId); + task.setTaskDefName("task" + 1); + task.setStatus(Task.Status.IN_PROGRESS); + tasks.add(task); + + List created = dao.createTasks(tasks); + assertEquals(tasks.size()-1, created.size()); //1 less + + Set srcIds = tasks.stream().map(t -> t.getReferenceTaskName() + "." + t.getRetryCount()).collect(Collectors.toSet()); + Set createdIds = created.stream().map(t -> t.getReferenceTaskName() + "." 
+ t.getRetryCount()).collect(Collectors.toSet()); + + assertEquals(srcIds, createdIds); + + List pending = dao.getPendingTasksByWorkflow("task0", workflowId); + assertNotNull(pending); + assertEquals(1, pending.size()); + assertTrue(EqualsBuilder.reflectionEquals(tasks.get(0), pending.get(0))); + + List found = dao.getTasks(tasks.get(0).getTaskDefName(), null, 1); + assertNotNull(found); + assertEquals(1, found.size()); + assertTrue(EqualsBuilder.reflectionEquals(tasks.get(0), found.get(0))); + } + + @Test + public void testTaskOps() throws Exception { + List tasks = new LinkedList<>(); + String workflowId = UUID.randomUUID().toString(); + + for(int i = 0; i < 3; i++) { + Task task = new Task(); + task.setScheduledTime(1L); + task.setSeq(1); + task.setTaskId(workflowId + "_t" + i); + task.setReferenceTaskName("testTaskOps" + i); + task.setRetryCount(0); + task.setWorkflowInstanceId(workflowId); + task.setTaskDefName("testTaskOps" + i); + task.setStatus(Task.Status.IN_PROGRESS); + tasks.add(task); + } + + for(int i = 0; i < 3; i++) { + Task task = new Task(); + task.setScheduledTime(1L); + task.setSeq(1); + task.setTaskId("x" + workflowId + "_t" + i); + task.setReferenceTaskName("testTaskOps" + i); + task.setRetryCount(0); + task.setWorkflowInstanceId("x" + workflowId); + task.setTaskDefName("testTaskOps" + i); + task.setStatus(Task.Status.IN_PROGRESS); + dao.createTasks(Arrays.asList(task)); + } + + + List created = dao.createTasks(tasks); + assertEquals(tasks.size(), created.size()); + + List pending = dao.getPendingTasksForTaskType(tasks.get(0).getTaskDefName()); + assertNotNull(pending); + assertEquals(2, pending.size()); + //Pending list can come in any order. finding the one we are looking for and then comparing + Task matching = pending.stream().filter(task -> task.getTaskId().equals(tasks.get(0).getTaskId())).findAny().get(); + assertTrue(EqualsBuilder.reflectionEquals(matching, tasks.get(0))); + + List update = new LinkedList<>(); + for(int i = 0; i < 3; i++) { + Task found = dao.getTask(workflowId + "_t" + i); + assertNotNull(found); + found.getOutputData().put("updated", true); + found.setStatus(Task.Status.COMPLETED); + update.add(found); + } + dao.updateTasks(update); + + List taskIds = tasks.stream().map(Task::getTaskId).collect(Collectors.toList()); + List found = dao.getTasks(taskIds); + assertEquals(taskIds.size(), found.size()); + found.forEach(task -> { + assertTrue(task.getOutputData().containsKey("updated")); + assertEquals(true, task.getOutputData().get("updated")); + dao.removeTask(task.getTaskId()); + }); + + found = dao.getTasks(taskIds); + assertTrue(found.isEmpty()); + } + + @Test + public void test() throws Exception { + Workflow workflow = new Workflow(); + workflow.setCorrelationId("correlationX"); + workflow.setCreatedBy("junit_tester"); + workflow.setEndTime(200L); + + Map input = new HashMap<>(); + input.put("param1", "param1 value"); + input.put("param2", 100); + workflow.setInput(input); + + Map output = new HashMap<>(); + output.put("ouput1", "output 1 value"); + output.put("op2", 300); + workflow.setOutput(output); + + workflow.setOwnerApp("workflow"); + workflow.setParentWorkflowId("parentWorkflowId"); + workflow.setParentWorkflowTaskId("parentWFTaskId"); + workflow.setReasonForIncompletion("missing recipe"); + workflow.setReRunFromWorkflowId("re-run from id1"); + workflow.setSchemaVersion(2); + workflow.setStartTime(90L); + workflow.setStatus(WorkflowStatus.FAILED); + workflow.setWorkflowId("workflow0"); + + List tasks = new LinkedList<>(); + + Task 
task = new Task(); + task.setScheduledTime(1L); + task.setSeq(1); + task.setTaskId("t1"); + task.setReferenceTaskName("t1"); + task.setWorkflowInstanceId(workflow.getWorkflowId()); + task.setTaskDefName("task1"); + + Task task2 = new Task(); + task2.setScheduledTime(2L); + task2.setSeq(2); + task2.setTaskId("t2"); + task2.setReferenceTaskName("t2"); + task2.setWorkflowInstanceId(workflow.getWorkflowId()); + task2.setTaskDefName("task2"); + + Task task3 = new Task(); + task3.setScheduledTime(2L); + task3.setSeq(3); + task3.setTaskId("t3"); + task3.setReferenceTaskName("t3"); + task3.setWorkflowInstanceId(workflow.getWorkflowId()); + task3.setTaskDefName("task3"); + + tasks.add(task); + tasks.add(task2); + tasks.add(task3); + + workflow.setTasks(tasks); + + workflow.setUpdatedBy("junit_tester"); + workflow.setUpdateTime(800L); + workflow.setVersion(3); + //workflow.setWorkflowId("wf0001"); + workflow.setWorkflowType("Junit Workflow"); + + String workflowId = dao.createWorkflow(workflow); + List created = dao.createTasks(tasks); + assertEquals(tasks.size(), created.size()); + + Workflow workflowWithTasks = dao.getWorkflow(workflow.getWorkflowId(), true); + assertEquals(workflowWithTasks.getWorkflowId(), workflowId); + assertTrue(!workflowWithTasks.getTasks().isEmpty()); + + assertEquals(workflow.getWorkflowId(), workflowId); + Workflow found = dao.getWorkflow(workflowId, false); + assertTrue(found.getTasks().isEmpty()); + + workflow.getTasks().clear(); + assertTrue(EqualsBuilder.reflectionEquals(workflow, found)); + + workflow.getInput().put("updated", true); + dao.updateWorkflow(workflow); + found = dao.getWorkflow(workflowId); + assertNotNull(found); + assertTrue(found.getInput().containsKey("updated")); + assertEquals(true, found.getInput().get("updated")); + + List running = dao.getRunningWorkflowIds(workflow.getWorkflowType()); + assertNotNull(running); + assertTrue(running.isEmpty()); + + workflow.setStatus(WorkflowStatus.RUNNING); + dao.updateWorkflow(workflow); + + running = dao.getRunningWorkflowIds(workflow.getWorkflowType()); + assertNotNull(running); + assertEquals(1, running.size()); + assertEquals(workflow.getWorkflowId(), running.get(0)); + + List pending = dao.getPendingWorkflowsByType(workflow.getWorkflowType()); + assertNotNull(pending); + assertEquals(1, pending.size()); + assertEquals(3, pending.get(0).getTasks().size()); + pending.get(0).getTasks().clear(); + assertTrue(EqualsBuilder.reflectionEquals(workflow, pending.get(0))); + + workflow.setStatus(WorkflowStatus.COMPLETED); + dao.updateWorkflow(workflow); + running = dao.getRunningWorkflowIds(workflow.getWorkflowType()); + assertNotNull(running); + assertTrue(running.isEmpty()); + + List bytime = dao.getWorkflowsByType(workflow.getWorkflowType(), System.currentTimeMillis(), System.currentTimeMillis()+100); + assertNotNull(bytime); + assertTrue(bytime.isEmpty()); + + bytime = dao.getWorkflowsByType(workflow.getWorkflowType(), workflow.getCreateTime() - 10, workflow.getCreateTime() + 10); + assertNotNull(bytime); + assertEquals(1, bytime.size()); + + String workflowName = "pending_count_test"; + String idBase = workflow.getWorkflowId(); + for(int i = 0; i < 10; i++) { + workflow.setWorkflowId("x" + i + idBase); + workflow.setCorrelationId("corr001"); + workflow.setStatus(WorkflowStatus.RUNNING); + workflow.setWorkflowType(workflowName); + dao.createWorkflow(workflow); + } + + List bycorrelationId = dao.getWorkflowsByCorrelationId("corr001"); + assertNotNull(bycorrelationId); + assertEquals(10, bycorrelationId.size()); + 
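+ // getWorkflowsByCorrelationId presumably resolves against the correlation_id column of the workflow table
+ // created in V1__initial_schema.sql, so the assertions above stay enabled for MySQL; the matching block in
+ // RedisExecutionDAOTest is commented out later in this change because RedisExecutionDAO now delegates
+ // correlation-id lookups to the indexer.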
+ long count = dao.getPendingWorkflowCount(workflowName); + assertEquals(10, count); + + for(int i = 0; i < 10; i++) { + dao.removeFromPendingWorkflow(workflowName, "x" + i + idBase); + } + count = dao.getPendingWorkflowCount(workflowName); + assertEquals(0, count); + } + +} diff --git a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLMetadataDAOTest.java b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLMetadataDAOTest.java new file mode 100644 index 0000000000..79ab5c2a8a --- /dev/null +++ b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLMetadataDAOTest.java @@ -0,0 +1,210 @@ +package com.netflix.conductor.dao.mysql; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.commons.lang.builder.EqualsBuilder; +import org.junit.Before; +import org.junit.Test; + +import com.netflix.conductor.common.metadata.events.EventHandler; +import com.netflix.conductor.common.metadata.tasks.TaskDef; +import com.netflix.conductor.common.metadata.workflow.WorkflowDef; +import com.netflix.conductor.core.execution.ApplicationException; + +public class MySQLMetadataDAOTest extends MySQLBaseDAOTest { + + private MySQLMetadataDAO dao; + + @Before + public void setup() throws Exception { + dao = new MySQLMetadataDAO(objectMapper, testSql2o, testConfiguration); + resetAllData(); + } + + @Test(expected=NullPointerException.class) + public void testMissingName() throws Exception { + WorkflowDef def = new WorkflowDef(); + dao.create(def); + } + + @Test(expected=ApplicationException.class) + public void testDuplicate() throws Exception { + WorkflowDef def = new WorkflowDef(); + def.setName("testDuplicate"); + def.setVersion(1); + + dao.create(def); + dao.create(def); + } + + @Test + public void testWorkflowDefOperations() throws Exception { + WorkflowDef def = new WorkflowDef(); + def.setName("test"); + def.setVersion(1); + def.setDescription("description"); + def.setCreatedBy("unit_test"); + def.setCreateTime(1L); + def.setOwnerApp("ownerApp"); + def.setUpdatedBy("unit_test2"); + def.setUpdateTime(2L); + + dao.create(def); + + List all = dao.getAll(); + assertNotNull(all); + assertEquals(1, all.size()); + assertEquals("test", all.get(0).getName()); + assertEquals(1, all.get(0).getVersion()); + + WorkflowDef found = dao.get("test", 1); + assertTrue(EqualsBuilder.reflectionEquals(def, found)); + + def.setVersion(2); + dao.create(def); + + all = dao.getAll(); + assertNotNull(all); + assertEquals(2, all.size()); + assertEquals("test", all.get(0).getName()); + assertEquals(1, all.get(0).getVersion()); + + found = dao.getLatest(def.getName()); + assertEquals(def.getName(), found.getName()); + assertEquals(def.getVersion(), found.getVersion()); + assertEquals(2, found.getVersion()); + + all = dao.getAllLatest(); + assertNotNull(all); + assertEquals(1, all.size()); + assertEquals("test", all.get(0).getName()); + assertEquals(2, all.get(0).getVersion()); + + all = dao.getAllVersions(def.getName()); + assertNotNull(all); + assertEquals(2, all.size()); + assertEquals("test", all.get(0).getName()); + assertEquals("test", all.get(1).getName()); + assertEquals(1, all.get(0).getVersion()); + assertEquals(2, all.get(1).getVersion()); + + def.setDescription("updated"); + dao.update(def); + found = dao.get(def.getName(), 
def.getVersion()); + assertEquals(def.getDescription(), found.getDescription()); + + List allnames = dao.findAll(); + assertNotNull(allnames); + assertEquals(1, allnames.size()); + assertEquals(def.getName(), allnames.get(0)); + } + + @Test + public void testTaskDefOperations() throws Exception { + TaskDef def = new TaskDef("taskA"); + def.setDescription("description"); + def.setCreatedBy("unit_test"); + def.setCreateTime(1L); + def.setInputKeys(Arrays.asList("a","b","c")); + def.setOutputKeys(Arrays.asList("01","o2")); + def.setOwnerApp("ownerApp"); + def.setRetryCount(3); + def.setRetryDelaySeconds(100); + def.setRetryLogic(TaskDef.RetryLogic.FIXED); + def.setTimeoutPolicy(TaskDef.TimeoutPolicy.ALERT_ONLY); + def.setUpdatedBy("unit_test2"); + def.setUpdateTime(2L); + + dao.createTaskDef(def); + + TaskDef found = dao.getTaskDef(def.getName()); + assertTrue(EqualsBuilder.reflectionEquals(def, found)); + + def.setDescription("updated description"); + dao.updateTaskDef(def); + found = dao.getTaskDef(def.getName()); + assertTrue(EqualsBuilder.reflectionEquals(def, found)); + assertEquals("updated description", found.getDescription()); + + for(int i = 0; i < 9; i++) { + TaskDef tdf = new TaskDef("taskA" + i); + dao.createTaskDef(tdf); + } + + List all = dao.getAllTaskDefs(); + assertNotNull(all); + assertEquals(10, all.size()); + Set allnames = all.stream().map(TaskDef::getName).collect(Collectors.toSet()); + assertEquals(10, allnames.size()); + List sorted = allnames.stream().sorted().collect(Collectors.toList()); + assertEquals(def.getName(), sorted.get(0)); + + for(int i = 0; i < 9; i++) { + assertEquals(def.getName() + i, sorted.get(i+1)); + } + + for(int i = 0; i < 9; i++) { + dao.removeTaskDef(def.getName() + i); + } + all = dao.getAllTaskDefs(); + assertNotNull(all); + assertEquals(1, all.size()); + assertEquals(def.getName(), all.get(0).getName()); + } + + @Test(expected=ApplicationException.class) + public void testRemoveTaskDef() throws Exception { + dao.removeTaskDef("test" + UUID.randomUUID().toString()); + } + + @Test + public void testEventHandlers() { + String event1 = "SQS::arn:account090:sqstest1"; + String event2 = "SQS::arn:account090:sqstest2"; + + EventHandler eh = new EventHandler(); + eh.setName(UUID.randomUUID().toString()); + eh.setActive(false); + EventHandler.Action action = new EventHandler.Action(); + action.setAction(EventHandler.Action.Type.start_workflow); + action.setStart_workflow(new EventHandler.StartWorkflow()); + action.getStart_workflow().setName("workflow_x"); + eh.getActions().add(action); + eh.setEvent(event1); + + dao.addEventHandler(eh); + List all = dao.getEventHandlers(); + assertNotNull(all); + assertEquals(1, all.size()); + assertEquals(eh.getName(), all.get(0).getName()); + assertEquals(eh.getEvent(), all.get(0).getEvent()); + + List byEvents = dao.getEventHandlersForEvent(event1, true); + assertNotNull(byEvents); + assertEquals(0, byEvents.size()); //event is marked as in-active + + eh.setActive(true); + eh.setEvent(event2); + dao.updateEventHandler(eh); + + all = dao.getEventHandlers(); + assertNotNull(all); + assertEquals(1, all.size()); + + byEvents = dao.getEventHandlersForEvent(event1, true); + assertNotNull(byEvents); + assertEquals(0, byEvents.size()); + + byEvents = dao.getEventHandlersForEvent(event2, true); + assertNotNull(byEvents); + assertEquals(1, byEvents.size()); + } +} diff --git a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLQueueDAOTest.java 
b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLQueueDAOTest.java new file mode 100644 index 0000000000..d4012efa0d --- /dev/null +++ b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLQueueDAOTest.java @@ -0,0 +1,96 @@ +package com.netflix.conductor.dao.mysql; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.List; +import java.util.Map; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class MySQLQueueDAOTest extends MySQLBaseDAOTest { + + private MySQLQueueDAO dao; + + @Before + public void setup() throws Exception { + dao = new MySQLQueueDAO(objectMapper, testSql2o); + resetAllData(); + } + + @Rule + public ExpectedException expected = ExpectedException.none(); + + @Test + public void test() { + String queueName = "TestQueue"; + long offsetTimeInSecond = 0; + + for(int i = 0; i < 10; i++) { + String messageId = "msg" + i; + dao.push(queueName, messageId, offsetTimeInSecond); + } + int size = dao.getSize(queueName); + assertEquals(10, size); + Map details = dao.queuesDetail(); + assertEquals(1, details.size()); + assertEquals(10L, details.get(queueName).longValue()); + + + for(int i = 0; i < 10; i++) { + String messageId = "msg" + i; + dao.pushIfNotExists(queueName, messageId, offsetTimeInSecond); + } + + List popped = dao.pop(queueName, 10, 100); + assertNotNull(popped); + assertEquals(10, popped.size()); + + Map>> verbose = dao.queuesDetailVerbose(); + assertEquals(1, verbose.size()); + long shardSize = verbose.get(queueName).get("a").get("size"); + long unackedSize = verbose.get(queueName).get("a").get("uacked"); + assertEquals(0, shardSize); + assertEquals(10, unackedSize); + + popped.forEach(messageId -> dao.ack(queueName, messageId)); + + verbose = dao.queuesDetailVerbose(); + assertEquals(1, verbose.size()); + shardSize = verbose.get(queueName).get("a").get("size"); + unackedSize = verbose.get(queueName).get("a").get("uacked"); + assertEquals(0, shardSize); + assertEquals(0, unackedSize); + + popped = dao.pop(queueName, 10, 100); + assertNotNull(popped); + assertEquals(0, popped.size()); + + for(int i = 0; i < 10; i++) { + String messageId = "msg" + i; + dao.pushIfNotExists(queueName, messageId, offsetTimeInSecond); + } + size = dao.getSize(queueName); + assertEquals(10, size); + + for(int i = 0; i < 10; i++) { + String messageId = "msg" + i; + dao.remove(queueName, messageId); + } + + size = dao.getSize(queueName); + assertEquals(0, size); + + for(int i = 0; i < 10; i++) { + String messageId = "msg" + i; + dao.pushIfNotExists(queueName, messageId, offsetTimeInSecond); + } + dao.flush(queueName); + size = dao.getSize(queueName); + assertEquals(0, size); + + } +} diff --git a/mysql-persistence/src/test/resources/logback-test.xml b/mysql-persistence/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..f0fd8fb65c --- /dev/null +++ b/mysql-persistence/src/test/resources/logback-test.xml @@ -0,0 +1,17 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + diff --git a/redis-persistence/build.gradle b/redis-persistence/build.gradle index 6c29d79d04..92ea50921d 100644 --- a/redis-persistence/build.gradle +++ b/redis-persistence/build.gradle @@ -5,7 +5,7 @@ dependencies { compile 'com.google.inject:guice:3.0' compile 'com.netflix.dyno:dyno-core:1.5.9' compile 'com.netflix.dyno:dyno-jedis:1.5.9' - compile 
'com.netflix.dyno-queues:dyno-queues-redis:1.0.7' + compile 'com.netflix.dyno-queues:dyno-queues-redis:1.0.8' compile 'org.elasticsearch:elasticsearch:2.+' //In memory redis for unit testing diff --git a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java index 99b677e5f0..b5df5f020d 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAO.java @@ -38,6 +38,7 @@ import com.netflix.conductor.common.metadata.tasks.Task.Status; import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.tasks.TaskExecLog; +import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.core.config.Configuration; import com.netflix.conductor.core.events.queue.Message; @@ -423,8 +424,8 @@ public List getWorkflowsByCorrelationId(String correlationId) { Preconditions.checkNotNull(correlationId, "correlationId cannot be null"); List workflows = new LinkedList(); - - Set workflowIds = dynoClient.smembers(nsKey(CORR_ID_TO_WORKFLOWS, correlationId)); + SearchResult result = indexer.searchWorkflows("correlationId='" + correlationId + "'", "*", 0, 10000, null); + List workflowIds = result.getResults(); for(String wfId : workflowIds) { workflows.add(getWorkflow(wfId)); } diff --git a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/queue/DynoQueueDAO.java b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/queue/DynoQueueDAO.java index 8e28f24f39..0cc1eb3509 100644 --- a/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/queue/DynoQueueDAO.java +++ b/redis-persistence/src/main/java/com/netflix/conductor/dao/dynomite/queue/DynoQueueDAO.java @@ -203,4 +203,11 @@ public void processUnacks(String queueName) { ((RedisDynoQueue)queues.get(queueName)).processUnacks();; } + @Override + public boolean setOffsetTime(String queueName, String id, long offsetTimeInSecond) { + DynoQueue queue = queues.get(queueName); + return queue.setTimeout(id, offsetTimeInSecond); + + } + } \ No newline at end of file diff --git a/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAOTest.java b/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAOTest.java index e82b7e633e..ffa4a164ae 100644 --- a/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAOTest.java +++ b/redis-persistence/src/test/java/com/netflix/conductor/dao/dynomite/RedisExecutionDAOTest.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ /** - * + * */ package com.netflix.conductor.dao.dynomite; @@ -77,9 +77,9 @@ public class RedisExecutionDAOTest { private RedisMetadataDAO mdao; - + private RedisExecutionDAO dao; - + private static ObjectMapper om = new ObjectMapper(); static { @@ -89,20 +89,19 @@ public class RedisExecutionDAOTest { om.setSerializationInclusion(Include.NON_NULL); om.setSerializationInclusion(Include.NON_EMPTY); } - + @SuppressWarnings("unchecked") @Before public void init() throws Exception { Configuration config = new TestConfiguration(); JedisCommands jedisMock = new JedisMock(); DynoProxy dynoClient = new DynoProxy(jedisMock); - - + Client client = mock(Client.class); BulkItemResponse[] responses = new BulkItemResponse[0]; BulkResponse response = new BulkResponse(responses, 1); BulkRequestBuilder brb = mock(BulkRequestBuilder.class); - + when(brb.add(any(IndexRequest.class))).thenReturn(brb); ListenableActionFuture laf = mock(ListenableActionFuture.class); when(laf.actionGet()).thenReturn(response); @@ -110,52 +109,52 @@ public void init() throws Exception { when(client.prepareBulk()).thenReturn(brb); final UpdateResponse ur = new UpdateResponse(); when(client.update(any())).thenReturn(new ActionFuture() { - + @Override public boolean isDone() { return true; } - + @Override public boolean isCancelled() { return false; } - + @Override public UpdateResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return ur; } - + @Override public UpdateResponse get() throws InterruptedException, ExecutionException { return ur; } - + @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } - + @Override public UpdateResponse actionGet(long timeout, TimeUnit unit) { return ur; } - + @Override public UpdateResponse actionGet(TimeValue timeout) { return ur; } - + @Override public UpdateResponse actionGet(long timeoutMillis) { return ur; } - + @Override public UpdateResponse actionGet(String timeout) { return ur; } - + @Override public UpdateResponse actionGet() { return ur; @@ -164,26 +163,26 @@ public UpdateResponse actionGet() { BulkRequestBuilder bulk = client.prepareBulk(); bulk = bulk.add(any(IndexRequest.class)); bulk.execute().actionGet(); - + when(client.prepareBulk().add(any(IndexRequest.class)).execute().actionGet()).thenReturn(response); - + ElasticSearchDAO indexer = spy(new ElasticSearchDAO(client, config, om)); mdao = new RedisMetadataDAO(dynoClient, om, config); dao = new RedisExecutionDAO(dynoClient, om, indexer, mdao, config); - + } - + @Rule public ExpectedException expected = ExpectedException.none(); - + @Test public void testTaskExceedsLimit() throws Exception { - + TaskDef def = new TaskDef(); def.setName("task1"); - def.setConcurrentExecLimit(1); + def.setConcurrentExecLimit(1); mdao.createTaskDef(def); - + List tasks = new LinkedList<>(); for(int i = 0; i < 15; i++) { Task task = new Task(); @@ -196,16 +195,16 @@ public void testTaskExceedsLimit() throws Exception { tasks.add(task); task.setStatus(Status.SCHEDULED); } - + dao.createTasks(tasks); assertFalse(dao.exceedsInProgressLimit(tasks.get(0))); tasks.get(0).setStatus(Status.IN_PROGRESS); dao.updateTask(tasks.get(0)); - + for(Task task : tasks) { assertTrue(dao.exceedsInProgressLimit(task)); } - + } @Test public void testCreateTaskException() throws Exception { @@ -217,14 +216,14 @@ public void testCreateTaskException() throws Exception { expected.expect(NullPointerException.class); expected.expectMessage("Workflow instance id cannot be null"); 
dao.createTasks(Arrays.asList(task)); - + task.setWorkflowInstanceId("wfid"); expected.expect(NullPointerException.class); expected.expectMessage("Task reference name cannot be nullss"); dao.createTasks(Arrays.asList(task)); - + } - + @Test public void testCreateTaskException2() throws Exception { Task task = new Task(); @@ -255,10 +254,10 @@ public void testPollData() throws Exception { assertEquals(pd.getQueueName(), "taskDef"); assertEquals(pd.getDomain(), "domain1"); assertEquals(pd.getWorkerId(), "workerId1"); - + List pData = dao.getPollData("taskDef"); assertEquals(pData.size(), 2); - + pd = dao.getPollData("taskDef", "domain2"); assertTrue(pd == null); } @@ -267,7 +266,7 @@ public void testPollData() throws Exception { public void testTaskCreateDups() throws Exception { List tasks = new LinkedList<>(); String workflowId = UUID.randomUUID().toString(); - + for(int i = 0; i < 3; i++) { Task task = new Task(); task.setScheduledTime(1L); @@ -280,7 +279,7 @@ public void testTaskCreateDups() throws Exception { task.setStatus(Task.Status.IN_PROGRESS); tasks.add(task); } - + //Let's insert a retried task Task task = new Task(); task.setScheduledTime(1L); @@ -292,7 +291,7 @@ public void testTaskCreateDups() throws Exception { task.setTaskDefName("task" + 2); task.setStatus(Task.Status.IN_PROGRESS); tasks.add(task); - + //Duplicate task! task = new Task(); task.setScheduledTime(1L); @@ -304,31 +303,31 @@ public void testTaskCreateDups() throws Exception { task.setTaskDefName("task" + 1); task.setStatus(Task.Status.IN_PROGRESS); tasks.add(task); - + List created = dao.createTasks(tasks); assertEquals(tasks.size()-1, created.size()); //1 less - + Set srcIds = tasks.stream().map(t -> t.getReferenceTaskName() + "." + t.getRetryCount()).collect(Collectors.toSet()); Set createdIds = created.stream().map(t -> t.getReferenceTaskName() + "." + t.getRetryCount()).collect(Collectors.toSet()); - + assertEquals(srcIds, createdIds); - + List pending = dao.getPendingTasksByWorkflow("task0", workflowId); assertNotNull(pending); assertEquals(1, pending.size()); assertTrue(EqualsBuilder.reflectionEquals(tasks.get(0), pending.get(0))); - + List found = dao.getTasks(tasks.get(0).getTaskDefName(), null, 1); assertNotNull(found); assertEquals(1, found.size()); assertTrue(EqualsBuilder.reflectionEquals(tasks.get(0), found.get(0))); } - + @Test public void testTaskOps() throws Exception { List tasks = new LinkedList<>(); String workflowId = UUID.randomUUID().toString(); - + for(int i = 0; i < 3; i++) { Task task = new Task(); task.setScheduledTime(1L); @@ -341,7 +340,7 @@ public void testTaskOps() throws Exception { task.setStatus(Task.Status.IN_PROGRESS); tasks.add(task); } - + for(int i = 0; i < 3; i++) { Task task = new Task(); task.setScheduledTime(1L); @@ -354,18 +353,18 @@ public void testTaskOps() throws Exception { task.setStatus(Task.Status.IN_PROGRESS); dao.createTasks(Arrays.asList(task)); } - - + + List created = dao.createTasks(tasks); assertEquals(tasks.size(), created.size()); - + List pending = dao.getPendingTasksForTaskType(tasks.get(0).getTaskDefName()); assertNotNull(pending); assertEquals(2, pending.size()); //Pending list can come in any order. 
finding the one we are looking for and then comparing Task matching = pending.stream().filter(task -> task.getTaskId().equals(tasks.get(0).getTaskId())).findAny().get(); assertTrue(EqualsBuilder.reflectionEquals(matching, tasks.get(0))); - + List update = new LinkedList<>(); for(int i = 0; i < 3; i++) { Task found = dao.getTask(workflowId + "_t" + i); @@ -375,7 +374,7 @@ public void testTaskOps() throws Exception { update.add(found); } dao.updateTasks(update); - + List taskIds = tasks.stream().map(Task::getTaskId).collect(Collectors.toList()); List found = dao.getTasks(taskIds); assertEquals(taskIds.size(), found.size()); @@ -384,28 +383,28 @@ public void testTaskOps() throws Exception { assertEquals(true, task.getOutputData().get("updated")); dao.removeTask(task.getTaskId()); }); - + found = dao.getTasks(taskIds); assertTrue(found.isEmpty()); } - + @Test public void test() throws Exception { Workflow workflow = new Workflow(); workflow.setCorrelationId("correlationX"); workflow.setCreatedBy("junit_tester"); workflow.setEndTime(200L); - + Map input = new HashMap<>(); input.put("param1", "param1 value"); input.put("param2", 100); workflow.setInput(input); - + Map output = new HashMap<>(); output.put("ouput1", "output 1 value"); - output.put("op2", 300); + output.put("op2", 300); workflow.setOutput(output); - + workflow.setOwnerApp("workflow"); workflow.setParentWorkflowId("parentWorkflowId"); workflow.setParentWorkflowTaskId("parentWFTaskId"); @@ -415,9 +414,9 @@ public void test() throws Exception { workflow.setStartTime(90L); workflow.setStatus(WorkflowStatus.FAILED); workflow.setWorkflowId("workflow0"); - + List tasks = new LinkedList<>(); - + Task task = new Task(); task.setScheduledTime(1L); task.setSeq(1); @@ -425,7 +424,7 @@ public void test() throws Exception { task.setReferenceTaskName("t1"); task.setWorkflowInstanceId(workflow.getWorkflowId()); task.setTaskDefName("task1"); - + Task task2 = new Task(); task2.setScheduledTime(2L); task2.setSeq(2); @@ -433,7 +432,7 @@ public void test() throws Exception { task2.setReferenceTaskName("t2"); task2.setWorkflowInstanceId(workflow.getWorkflowId()); task2.setTaskDefName("task2"); - + Task task3 = new Task(); task3.setScheduledTime(2L); task3.setSeq(3); @@ -441,34 +440,34 @@ public void test() throws Exception { task3.setReferenceTaskName("t3"); task3.setWorkflowInstanceId(workflow.getWorkflowId()); task3.setTaskDefName("task3"); - + tasks.add(task); tasks.add(task2); tasks.add(task3); - + workflow.setTasks(tasks); - + workflow.setUpdatedBy("junit_tester"); workflow.setUpdateTime(800L); workflow.setVersion(3); //workflow.setWorkflowId("wf0001"); workflow.setWorkflowType("Junit Workflow"); - + String workflowId = dao.createWorkflow(workflow); List created = dao.createTasks(tasks); assertEquals(tasks.size(), created.size()); - + Workflow workflowWithTasks = dao.getWorkflow(workflow.getWorkflowId(), true); assertEquals(workflowWithTasks.getWorkflowId(), workflowId); assertTrue(!workflowWithTasks.getTasks().isEmpty()); - + assertEquals(workflow.getWorkflowId(), workflowId); Workflow found = dao.getWorkflow(workflowId, false); assertTrue(found.getTasks().isEmpty()); - + workflow.getTasks().clear(); assertTrue(EqualsBuilder.reflectionEquals(workflow, found)); - + workflow.getInput().put("updated", true); dao.updateWorkflow(workflow); found = dao.getWorkflow(workflowId); @@ -479,22 +478,22 @@ public void test() throws Exception { List running = dao.getRunningWorkflowIds(workflow.getWorkflowType()); assertNotNull(running); 
assertTrue(running.isEmpty()); - + workflow.setStatus(WorkflowStatus.RUNNING); dao.updateWorkflow(workflow); - + running = dao.getRunningWorkflowIds(workflow.getWorkflowType()); assertNotNull(running); assertEquals(1, running.size()); assertEquals(workflow.getWorkflowId(), running.get(0)); - + List pending = dao.getPendingWorkflowsByType(workflow.getWorkflowType()); assertNotNull(pending); assertEquals(1, pending.size()); assertEquals(3, pending.get(0).getTasks().size()); pending.get(0).getTasks().clear(); assertTrue(EqualsBuilder.reflectionEquals(workflow, pending.get(0))); - + workflow.setStatus(WorkflowStatus.COMPLETED); dao.updateWorkflow(workflow); running = dao.getRunningWorkflowIds(workflow.getWorkflowType()); @@ -504,11 +503,11 @@ public void test() throws Exception { List bytime = dao.getWorkflowsByType(workflow.getWorkflowType(), System.currentTimeMillis(), System.currentTimeMillis()+100); assertNotNull(bytime); assertTrue(bytime.isEmpty()); - + bytime = dao.getWorkflowsByType(workflow.getWorkflowType(), workflow.getCreateTime() - 10, workflow.getCreateTime() + 10); assertNotNull(bytime); assertEquals(1, bytime.size()); - + String workflowName = "pending_count_test"; String idBase = workflow.getWorkflowId(); for(int i = 0; i < 10; i++) { @@ -518,20 +517,21 @@ public void test() throws Exception { workflow.setWorkflowType(workflowName); dao.createWorkflow(workflow); } - + + /* List bycorrelationId = dao.getWorkflowsByCorrelationId("corr001"); assertNotNull(bycorrelationId); assertEquals(10, bycorrelationId.size()); - + */ long count = dao.getPendingWorkflowCount(workflowName); assertEquals(10, count); - + for(int i = 0; i < 10; i++) { dao.removeFromPendingWorkflow(workflowName, "x" + i + idBase); } count = dao.getPendingWorkflowCount(workflowName); assertEquals(0, count); - + } - + } diff --git a/server/build.gradle b/server/build.gradle index b7a1e18f06..60a6db5186 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -23,6 +23,7 @@ dependencies { compile project(':conductor-core') compile project(':conductor-jersey') compile project(':conductor-redis-persistence') + compile project(':conductor-mysql-persistence') compile project(':conductor-contribs') //Jetty diff --git a/server/src/main/java/com/netflix/conductor/server/ConductorServer.java b/server/src/main/java/com/netflix/conductor/server/ConductorServer.java index d2c4ae5226..d1eaa92d57 100644 --- a/server/src/main/java/com/netflix/conductor/server/ConductorServer.java +++ b/server/src/main/java/com/netflix/conductor/server/ConductorServer.java @@ -64,8 +64,8 @@ public class ConductorServer { private static Logger logger = LoggerFactory.getLogger(ConductorServer.class); - private enum DB { - redis, dynomite, memory, redis_cluster + enum DB { + redis, dynomite, memory, redis_cluster, mysql } private ServerModule sm; @@ -85,11 +85,11 @@ public ConductorServer(ConductorConfig cc) { try { db = DB.valueOf(dbstring); }catch(IllegalArgumentException ie) { - logger.error("Invalid db name: " + dbstring + ", supported values are: redis, dynomite, memory"); + logger.error("Invalid db name: " + dbstring + ", supported values are: " + Arrays.toString(DB.values())); System.exit(1); } - if(!db.equals(DB.memory)) { + if(!(db.equals(DB.memory) || db.equals(DB.mysql))) { String hosts = cc.getProperty("workflow.dynomite.cluster.hosts", null); if(hosts == null) { System.err.println("Missing dynomite/redis hosts. 
 				System.err.println("Missing dynomite/redis hosts. Ensure 'workflow.dynomite.cluster.hosts' has been set in the supplied configuration.");
@@ -125,6 +125,7 @@ public Collection<Host> getHosts() {
 		};
 
 		JedisCommands jedis = null;
+
 		switch(db) {
 			case redis:
 			case dynomite:
@@ -159,6 +160,9 @@ public HostToken getTokenForHost(Host host, Set<Host> activeHosts) {
 				break;
+			case mysql:
+				logger.info("Starting conductor server using {} data store", db);
+				break;
 			case memory:
 				jedis = new JedisMock();
 				try {
@@ -185,7 +189,7 @@ public HostToken getTokenForHost(Host host, Set<Host> activeHosts) {
 				break;
 		}
 
-		this.sm = new ServerModule(jedis, hs, cc);
+		this.sm = new ServerModule(jedis, hs, cc, db);
 	}
 
 	public ServerModule getGuiceModule() {
diff --git a/server/src/main/java/com/netflix/conductor/server/Main.java b/server/src/main/java/com/netflix/conductor/server/Main.java
index 654876c733..9e7b90214f 100644
--- a/server/src/main/java/com/netflix/conductor/server/Main.java
+++ b/server/src/main/java/com/netflix/conductor/server/Main.java
@@ -20,6 +20,7 @@
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.IOException;
 import java.util.Properties;
 
 import org.apache.log4j.PropertyConfigurator;
@@ -32,16 +33,7 @@ public class Main {
 
 	public static void main(String[] args) throws Exception {
 
-		if(args.length > 0) {
-			String propertyFile = args[0];
-			System.out.println("Using " + propertyFile);
-			FileInputStream propFile = new FileInputStream(propertyFile);
-			Properties props = new Properties(System.getProperties());
-			props.load(propFile);
-			System.setProperties(props);
-		}
-
-
+		loadConfigFile(args.length > 0 ? args[0] : System.getenv("CONDUCTOR_CONFIG_FILE"));
 
 		if(args.length == 2) {
 			System.out.println("Using log4j config " + args[1]);
@@ -61,6 +53,13 @@ public static void main(String[] args) throws Exception {
 
 		server.start(config.getIntProperty("port", 8080), true);
+	}
+	private static void loadConfigFile(String propertyFile) throws IOException {
+		if (propertyFile == null) return;
+		System.out.println("Using config file " + propertyFile);
+		Properties props = new Properties(System.getProperties());
+		props.load(new FileInputStream(propertyFile));
+		System.setProperties(props);
 	}
 }
diff --git a/server/src/main/java/com/netflix/conductor/server/ServerModule.java b/server/src/main/java/com/netflix/conductor/server/ServerModule.java
index 5786371539..bdf0876511 100644
--- a/server/src/main/java/com/netflix/conductor/server/ServerModule.java
+++ b/server/src/main/java/com/netflix/conductor/server/ServerModule.java
@@ -40,6 +40,7 @@
 import com.netflix.conductor.dao.dynomite.queue.DynoQueueDAO;
 import com.netflix.conductor.dao.index.ElasticSearchDAO;
 import com.netflix.conductor.dao.index.ElasticsearchModule;
+import com.netflix.conductor.dao.mysql.MySQLWorkflowModule;
 import com.netflix.dyno.connectionpool.HostSupplier;
 import com.netflix.dyno.queues.redis.DynoShardSupplier;
 
@@ -65,12 +66,15 @@ public class ServerModule extends AbstractModule {
 
 	private ConductorConfig config;
 
-	public ServerModule(JedisCommands jedis, HostSupplier hs, ConductorConfig config) {
+	private ConductorServer.DB db;
+
+	public ServerModule(JedisCommands jedis, HostSupplier hs, ConductorConfig config, ConductorServer.DB db) {
 		this.dynoConn = jedis;
 		this.hs = hs;
 		this.config = config;
 		this.region = config.getRegion();
 		this.localRack = config.getAvailabilityZone();
+		this.db = db;
 	}
 
@@ -80,25 +84,33 @@ protected void configure() {
 		configureExecutorService();
 		bind(Configuration.class).toInstance(config);
-		String localDC = localRack;
-		localDC = localDC.replaceAll(region, "");
-		DynoShardSupplier ss = new DynoShardSupplier(hs, region, localDC);
-		DynoQueueDAO queueDao = new DynoQueueDAO(dynoConn, dynoConn, ss, config);
-
+
+		if (db == ConductorServer.DB.mysql) {
+			install(new MySQLWorkflowModule());
+		} else {
+			String localDC = localRack;
+			localDC = localDC.replaceAll(region, "");
+			DynoShardSupplier ss = new DynoShardSupplier(hs, region, localDC);
+			DynoQueueDAO queueDao = new DynoQueueDAO(dynoConn, dynoConn, ss, config);
+
+			bind(MetadataDAO.class).to(RedisMetadataDAO.class);
+			bind(ExecutionDAO.class).to(RedisExecutionDAO.class);
+			bind(DynoQueueDAO.class).toInstance(queueDao);
+			bind(QueueDAO.class).to(DynoQueueDAO.class);
+
+			DynoProxy proxy = new DynoProxy(dynoConn);
+			bind(DynoProxy.class).toInstance(proxy);
+		}
+
 		install(new ElasticsearchModule());
-		bind(MetadataDAO.class).to(RedisMetadataDAO.class);
-		bind(ExecutionDAO.class).to(RedisExecutionDAO.class);
-		bind(DynoQueueDAO.class).toInstance(queueDao);
-		bind(QueueDAO.class).to(DynoQueueDAO.class);
 		bind(IndexDAO.class).to(ElasticSearchDAO.class);
 
-		DynoProxy proxy = new DynoProxy(dynoConn);
-		bind(DynoProxy.class).toInstance(proxy);
-
 		install(new CoreModule());
 		install(new JerseyModule());
+
 		new HttpTask(new RestClientManager(), config);
 		new JsonJqTransform();
+
 		List<AbstractModule> additionalModules = config.getAdditionalModules();
 		if(additionalModules != null) {
 			for(AbstractModule additionalModule : additionalModules) {
diff --git a/settings.gradle b/settings.gradle
index 426c84c572..4db9fe02fa 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,3 +1,3 @@
 rootProject.name='conductor'
-include 'common', 'core', 'redis-persistence','es5-persistence', 'jersey', 'client', 'test-harness', 'ui', 'contribs', 'server'
+include 'common', 'core', 'redis-persistence','es5-persistence','mysql-persistence','jersey', 'client', 'test-harness', 'ui', 'contribs', 'server'
 rootProject.children.each {it.name="conductor-${it.name}"}
diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java
index 7753024bcb..6aab84bf26 100644
--- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java
+++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/End2EndTests.java
@@ -67,6 +67,7 @@ public class End2EndTests {
 
 	@BeforeClass
 	public static void setup() throws Exception {
+
 		ConductorServer server = new ConductorServer(new ConductorConfig());
 		server.start(8080, false);
 
@@ -134,13 +135,6 @@ public void testAll() throws Exception {
 		assertEquals(1, runningIds.size());
 		assertEquals(workflowId, runningIds.get(0));
 
-		List<Workflow> byCorrId = wc.getWorkflows(def.getName(), correlationId, true, true);
-		assertNotNull(byCorrId);
-		assertTrue(!byCorrId.isEmpty());
-		assertEquals(workflowId, byCorrId.get(0).getWorkflowId());
-		assertEquals(correlationId, byCorrId.get(0).getCorrelationId());
-		assertTrue(!byCorrId.get(0).getStatus().isTerminal());
-
 		List<Task> polled = tc.poll("non existing task", "test", 1, 100);
 		assertNotNull(polled);
 		assertEquals(0, polled.size());
diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java b/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java
index 6994d4c8da..e10bb05ec3 100644
--- a/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java
+++ b/test-harness/src/test/java/com/netflix/conductor/tests/integration/WorkflowServiceTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -1665,6 +1664,120 @@ public void testLongRunning() throws Exception {
 
 	}
 
+	@Test
+	public void testResetWorkflowInProgressTasks() throws Exception {
+
+		clearWorkflows();
+
+		WorkflowDef found = ms.getWorkflowDef(LONG_RUNNING, 1);
+		assertNotNull(found);
+
+		String correlationId = "unit_test_1";
+		Map<String, Object> input = new HashMap<>();
+		String inputParam1 = "p1 value";
+		input.put("param1", inputParam1);
+		input.put("param2", "p2 value");
+		String wfid = provider.startWorkflow(LONG_RUNNING, 1, correlationId, input);
+		System.out.println("testResetWorkflowInProgressTasks.wfid=" + wfid);
+		assertNotNull(wfid);
+
+		Workflow es = ess.getExecutionStatus(wfid, true);
+		assertNotNull(es);
+		assertEquals(WorkflowStatus.RUNNING, es.getStatus());
+
+
+		es = ess.getExecutionStatus(wfid, true);
+		assertNotNull(es);
+		assertEquals(WorkflowStatus.RUNNING, es.getStatus());
+
+		// Check the queue
+		assertEquals(Integer.valueOf(1), ess.getTaskQueueSizes(Arrays.asList("junit_task_1")).get("junit_task_1"));
+		///
+
+		Task task = ess.poll("junit_task_1", "task1.junit.worker");
+		assertNotNull(task);
+		assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task1.junit.worker"));
+
+		String param1 = (String) task.getInputData().get("p1");
+		String param2 = (String) task.getInputData().get("p2");
+
+		assertNotNull(param1);
+		assertNotNull(param2);
+		assertEquals("p1 value", param1);
+		assertEquals("p2 value", param2);
+
+
+		String task1Op = "task1.In.Progress";
+		task.getOutputData().put("op", task1Op);
+		task.setStatus(Status.IN_PROGRESS);
+		task.setCallbackAfterSeconds(3600);
+		ess.updateTask(task);
+		String taskId = task.getTaskId();
+
+		// Check the queue
+		assertEquals(Integer.valueOf(1), ess.getTaskQueueSizes(Arrays.asList("junit_task_1")).get("junit_task_1"));
+		///
+
+
+		es = ess.getExecutionStatus(wfid, true);
+		assertNotNull(es);
+		assertEquals(WorkflowStatus.RUNNING, es.getStatus());
+
+		// Polling for the next task should not return anything
+		Task task2 = ess.poll("junit_task_2", "task2.junit.worker");
+		assertNull(task2);
+
+		task = ess.poll("junit_task_1", "task1.junit.worker");
+		assertNull(task);
+
+		//Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
+		// Reset
+		provider.resetCallbacksForInProgressTasks(wfid);
+
+
+		// Now polling for the first task should return the same task as before
+		task = ess.poll("junit_task_1", "task1.junit.worker");
+		assertNotNull(task);
+		assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task1.junit.worker"));
+		assertEquals(taskId, task.getTaskId());
+		assertEquals(0, task.getCallbackAfterSeconds());
+
+		task1Op = "task1.Done";
+		List<Task> tasks = ess.getTasks(task.getTaskType(), null, 1);
+		assertNotNull(tasks);
+		assertEquals(1, tasks.size());
+		assertEquals(wfid, task.getWorkflowInstanceId());
+		task = tasks.get(0);
+		task.getOutputData().put("op", task1Op);
+		task.setStatus(Status.COMPLETED);
+		ess.updateTask(task);
+
+		task = ess.poll("junit_task_2", "task2.junit.worker");
+		assertNotNull(task);
+		assertTrue(ess.ackTaskRecieved(task.getTaskId(), "task2.junit.worker"));
+		String task2Input = (String) task.getInputData().get("tp2");
+		assertNotNull(task2Input);
+		assertEquals(task1Op, task2Input);
+
+		task2Input = (String) task.getInputData().get("tp1");
+		assertNotNull(task2Input);
+		assertEquals(inputParam1, task2Input);
+
+		task.setStatus(Status.COMPLETED);
+		ess.updateTask(task);
+
+
+		es = ess.getExecutionStatus(wfid, true);
+		assertNotNull(es);
+		assertEquals(WorkflowStatus.COMPLETED, es.getStatus());
+		tasks = es.getTasks();
+		assertNotNull(tasks);
+		assertEquals(2, tasks.size());
+
+
+	}
+
+
 	@Test
 	public void testConcurrentWorkflowExecutions() throws Exception {
 
@@ -1965,10 +2078,13 @@ public void testSuccess() throws Exception {
 		}
 		assertTrue(foundId);
 
+		/*
+		 * @correlationId
 		List<Workflow> byCorrelationId = ess.getWorkflowInstances(LINEAR_WORKFLOW_T1_T2, correlationId, false, false);
 		assertNotNull(byCorrelationId);
 		assertTrue(!byCorrelationId.isEmpty());
 		assertEquals(1, byCorrelationId.size());
+		*/
 
 		Workflow es = ess.getExecutionStatus(wfid, true);
 		assertNotNull(es);
@@ -2179,11 +2295,6 @@ public void testDeciderMix() throws Exception {
 		}
 		assertTrue(foundId);
 
-		List<Workflow> byCorrelationId = ess.getWorkflowInstances(LINEAR_WORKFLOW_T1_T2, correlationId, false, false);
-		assertNotNull(byCorrelationId);
-		assertTrue(!byCorrelationId.isEmpty());
-		assertEquals(1, byCorrelationId.size());
-
 		Workflow es = ess.getExecutionStatus(wfid, true);
 		assertNotNull(es);
 		assertEquals(WorkflowStatus.RUNNING, es.getStatus());
@@ -2334,10 +2445,6 @@ public void testFailures() throws Exception {
 		assertNotNull(es);
 		assertEquals(WorkflowStatus.FAILED, es.getStatus());
 
-		List<Workflow> failureInstances = ess.getWorkflowInstances(FORK_JOIN_WF, wfid, false, false);
-		assertNotNull(failureInstances);
-		assertEquals(1, failureInstances.size());
-		assertEquals(wfid, failureInstances.get(0).getCorrelationId());
 
 		taskDef.setRetryCount(RETRY_COUNT);
 		ms.updateTaskDef(taskDef);
@@ -2616,12 +2723,6 @@ public void testTimeout() throws Exception {
 		assertEquals(Status.TIMED_OUT, es.getTasks().get(0).getStatus());
 		assertEquals(Status.TIMED_OUT, es.getTasks().get(1).getStatus());
 		assertEquals(WorkflowStatus.TIMED_OUT, es.getStatus());
-
-		List<Workflow> failureInstances = ess.getWorkflowInstances(FORK_JOIN_WF, wfid, false, false);
-		assertNotNull(failureInstances);
-		assertEquals(failureInstances.stream().map(Workflow::getCorrelationId).collect(Collectors.toList()).toString(), 1, failureInstances.size());
-		assertEquals(wfid, failureInstances.get(0).getCorrelationId());
-		assertTrue(!failureInstances.get(0).getStatus().isTerminal());
 
 		assertEquals(1, queue.getSize(WorkflowExecutor.deciderQueue));
 
@@ -2876,12 +2977,7 @@ public void testPauseResume() throws Exception {
 			}
 		}
 		assertTrue(foundId);
-
-		List<Workflow> byCorrelationId = ess.getWorkflowInstances(LINEAR_WORKFLOW_T1_T2, correlationId, false, false);
-		assertNotNull(byCorrelationId);
-		assertTrue(!byCorrelationId.isEmpty());
-		assertEquals(byCorrelationId.toString(), 1, byCorrelationId.size());
-
+
 		Workflow es = ess.getExecutionStatus(wfid, true);
 		assertNotNull(es);
 		assertEquals(WorkflowStatus.RUNNING, es.getStatus());
diff --git a/ui/package.json b/ui/package.json
index 069ffe6086..589ec8a134 100644
--- a/ui/package.json
+++ b/ui/package.json
@@ -51,6 +51,7 @@
     "react-router": "^1.0.3",
     "react-router-redux": "^4.0.0",
     "react-select": "^1.0.0-beta10",
+    "react-toastr": "2.9.3",
     "redux": "^3.3.1",
     "redux-thunk": "^1.0.3",
     "superagent": "^1.7.2",
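
Note on the ServerModule change above: when the server is started with the mysql data store, the module installs a MySQLWorkflowModule instead of creating the Dyno/Redis DAO bindings. The contents of that module are not part of this change set; the following is only a minimal sketch of what such a Guice module could look like, assuming hypothetical MySQLMetadataDAO, MySQLExecutionDAO and MySQLQueueDAO implementations of the core DAO interfaces (those implementation class names are illustrative, not taken from this diff).

package com.netflix.conductor.dao.mysql;

import com.google.inject.AbstractModule;
import com.google.inject.Singleton;

import com.netflix.conductor.dao.ExecutionDAO;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;

// Minimal sketch: bind the core DAO interfaces to MySQL-backed implementations
// so that ServerModule can simply install this module when db == mysql.
public class MySQLWorkflowModule extends AbstractModule {

	@Override
	protected void configure() {
		// The concrete DAO classes below are hypothetical, shown for illustration only.
		bind(MetadataDAO.class).to(MySQLMetadataDAO.class).in(Singleton.class);
		bind(ExecutionDAO.class).to(MySQLExecutionDAO.class).in(Singleton.class);
		bind(QueueDAO.class).to(MySQLQueueDAO.class).in(Singleton.class);
	}
}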