diff --git a/build.gradle b/build.gradle index f5b6444e2..5800bc5aa 100644 --- a/build.gradle +++ b/build.gradle @@ -98,7 +98,10 @@ allprojects { } // all client and their related modules are published with Java 17 compatibility -["annotations", "common", "client", "client-spring", "grpc", "grpc-client"].each { +// calix +// ["annotations", "common", "client", "client-spring", "grpc", "grpc-client"].each { +["annotations", "common", "client", "client-spring"].each { +// end calix project(":conductor-$it") { compileJava { options.release = 17 @@ -110,7 +113,10 @@ task server { dependsOn ':conductor-server:bootRun' } -configure(allprojects - project(':conductor-grpc')) { +// calix +// configure(allprojects - project(':conductor-grpc')) { +configure(allprojects) { +// end calix apply plugin: 'com.diffplug.spotless' spotless { @@ -123,7 +129,10 @@ configure(allprojects - project(':conductor-grpc')) { } } -['cassandra-persistence', 'core', 'redis-concurrency-limit', 'test-harness', 'client'].each { +// calix +// ['cassandra-persistence', 'core', 'redis-concurrency-limit', 'test-harness', 'client'].each { +['core', 'redis-concurrency-limit', 'test-harness', 'client'].each { +// end calix configure(project(":conductor-$it")) { spotless { groovy { diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index b1bfb4adb..7d60a21a9 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -541,4 +541,45 @@ public Map getAll() { props.forEach((key, value) -> map.put(key.toString(), value)); return map; } + + // calix + private int asyncDaoThreadCount = Runtime.getRuntime().availableProcessors() * 2; + private int asyncListenerThreadCount = Runtime.getRuntime().availableProcessors() * 2; + private int asyncStartThreadCount = 
Runtime.getRuntime().availableProcessors() * 2; + + @DurationUnit(ChronoUnit.MILLIS) + private Duration workflowUnAckLockTimeout = Duration.ofMillis(500); + + public int getAsyncDaoThreadCount() { + return this.asyncDaoThreadCount; + } + + public void setAsyncDaoThreadCount(int asyncDaoThreadCount) { + this.asyncDaoThreadCount = asyncDaoThreadCount; + } + + public int getAsyncListenerThreadCount() { + return this.asyncListenerThreadCount; + } + + public void setAsyncListenerThreadCount(int asyncListenerThreadCount) { + this.asyncListenerThreadCount = asyncListenerThreadCount; + } + + public int getAsyncStartThreadCount() { + return this.asyncStartThreadCount; + } + + public void setAsyncStartThreadCount(int asyncStartThreadCount) { + this.asyncStartThreadCount = asyncStartThreadCount; + } + + public Duration getWorkflowUnAckLockTimeout() { + return workflowUnAckLockTimeout; + } + + public void setWorkflowUnAckLockTimeout(Duration workflowUnAckLockTimeout) { + this.workflowUnAckLockTimeout = workflowUnAckLockTimeout; + } + // end calix } diff --git a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java index 3180dfc5b..66b753c99 100644 --- a/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java +++ b/core/src/main/java/com/netflix/conductor/core/dal/ExecutionDAOFacade.java @@ -495,6 +495,9 @@ public long getInProgressTaskCount(String taskDefName) { * payload fails. 
*/ public void updateTask(TaskModel taskModel) { + // calix + long s = Monitors.now(); + // end calix if (taskModel.getStatus() != null) { if (!taskModel.getStatus().isTerminal() || (taskModel.getStatus().isTerminal() && taskModel.getUpdateTime() == 0)) { @@ -526,6 +529,10 @@ public void updateTask(TaskModel taskModel) { taskModel.getTaskId(), taskModel.getWorkflowInstanceId()); LOGGER.error(errorMsg, e); throw new TransientException(errorMsg, e); + } finally { + // calix + Monitors.recordTaskUpdateDuration(taskModel.getTaskType(), s); + // end calix } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java index 8751f1582..be10528c5 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java @@ -61,6 +61,9 @@ public AsyncSystemTaskExecutor( * @param taskId The id of the {@link TaskModel} object. 
*/ public void execute(WorkflowSystemTask systemTask, String taskId) { + // calix + long s = Monitors.now(); + // end calix TaskModel task = loadTaskQuietly(taskId); if (task == null) { LOGGER.error("TaskId: {} could not be found while executing {}", taskId, systemTask); @@ -88,6 +91,9 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { queueDAO.remove(queueName, task.getTaskId()); return; } + // calix + Monitors.recordWorkflowTaskSys(queueName, "get_tasks", s); + // end calix if (task.getStatus().equals(TaskModel.Status.SCHEDULED)) { if (executionDAOFacade.exceedsInProgressLimit(task)) { @@ -115,9 +121,15 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { // if we are here the Task object is updated and needs to be persisted regardless of an // exception try { + // calix + s = Monitors.now(); + // end calix WorkflowModel workflow = executionDAOFacade.getWorkflowModel( workflowId, systemTask.isTaskRetrievalRequired()); + // calix + Monitors.recordWorkflowTaskSys(queueName, "get_wf", s); + // end calix if (workflow.getStatus().isTerminal()) { LOGGER.info( @@ -146,12 +158,21 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { task.incrementPollCount(); } + // calix + s = Monitors.now(); + // end calix if (task.getStatus() == TaskModel.Status.SCHEDULED) { task.setStartTime(System.currentTimeMillis()); Monitors.recordQueueWaitTime(task.getTaskType(), task.getQueueWaitTime()); systemTask.start(workflow, task, workflowExecutor); + // calix + Monitors.recordWorkflowTaskSys(queueName, "task_start", s); + // end calix } else if (task.getStatus() == TaskModel.Status.IN_PROGRESS) { systemTask.execute(workflow, task, workflowExecutor); + // calix + Monitors.recordWorkflowTaskSys(queueName, "task_exec", s); + // end calix } // Update message in Task queue based on Task status @@ -187,14 +208,25 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { 
Monitors.error(AsyncSystemTaskExecutor.class.getSimpleName(), "executeSystemTask"); LOGGER.error("Error executing system task - {}, with id: {}", systemTask, taskId, e); } finally { + // calix + s = Monitors.now(); executionDAOFacade.updateTask(task); + s = Monitors.recordWorkflowTaskSys(queueName, "update_task", s); + // end calix if (shouldRemoveTaskFromQueue) { queueDAO.remove(queueName, task.getTaskId()); LOGGER.debug("{} removed from queue: {}", task, queueName); + // calix + s = Monitors.recordWorkflowTaskSys(queueName, "queue_rem", s); + // end calix } // if the current task execution has completed, then the workflow needs to be evaluated if (hasTaskExecutionCompleted) { - workflowExecutor.decide(workflowId); + // calix + // workflowExecutor.decide(workflowId); + workflowExecutor.locked(workflowExecutor.decide(workflowId), workflowId); + Monitors.recordWorkflowTaskSys(queueName, "decide", s); + // end calix } } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index a25f06dd1..704365b33 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -13,13 +13,18 @@ package com.netflix.conductor.core.execution; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.StopWatch; +import org.apache.logging.log4j.util.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.event.EventListener; 
import org.springframework.stereotype.Component; @@ -42,6 +47,7 @@ import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry; import com.netflix.conductor.core.execution.tasks.Terminate; import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask; +import com.netflix.conductor.core.external.WorkflowExternalBeans; import com.netflix.conductor.core.listener.TaskStatusListener; import com.netflix.conductor.core.listener.WorkflowStatusListener; import com.netflix.conductor.core.metadata.MetadataMapperService; @@ -93,6 +99,15 @@ public class WorkflowExecutor { pollData.getLastPollTime() > System.currentTimeMillis() - activeWorkerLastPollMs; + // calix + public static final WorkflowModel LOCKED = new WorkflowModel(); + + @Autowired + @Qualifier(WorkflowExternalBeans.EXECUTOR_ASYNC_LISTENER) + private Executor asyncListener; + + // end calix + public WorkflowExecutor( DeciderService deciderService, MetadataDAO metadataDAO, @@ -506,6 +521,9 @@ private void endExecution(WorkflowModel workflow, TaskModel terminateTask) { */ @VisibleForTesting WorkflowModel completeWorkflow(WorkflowModel workflow) { + // calix + long s = Monitors.now(); + // end calix LOGGER.debug("Completing workflow execution for {}", workflow.getWorkflowId()); if (workflow.getStatus().equals(WorkflowModel.Status.COMPLETED)) { @@ -513,6 +531,9 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) { executionDAOFacade.removeFromPendingWorkflow( workflow.getWorkflowName(), workflow.getWorkflowId()); LOGGER.debug("Workflow: {} has already been completed.", workflow.getWorkflowId()); + // calix + Monitors.recordWorkflowComplete(workflow.getWorkflowName(), "completed_sweep", s); + // end calix return workflow; } @@ -550,7 +571,12 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) { executionDAOFacade.updateWorkflow(workflow); LOGGER.debug("Completed workflow execution for {}", workflow.getWorkflowId()); - workflowStatusListener.onWorkflowCompletedIfEnabled(workflow); + // calix + 
// workflowStatusListener.onWorkflowCompletedIfEnabled(workflow); + CompletableFuture.runAsync( + () -> workflowStatusListener.onWorkflowCompletedIfEnabled(workflow), asyncListener); + s = Monitors.recordWorkflowComplete(workflow.getWorkflowName(), "completed", s); + // end calix Monitors.recordWorkflowCompletion( workflow.getWorkflowName(), workflow.getEndTime() - workflow.getCreateTime(), @@ -564,10 +590,20 @@ WorkflowModel completeWorkflow(WorkflowModel workflow) { workflow.getParentWorkflowId(), workflow.getParentWorkflowTaskId()); expediteLazyWorkflowEvaluation(workflow.getParentWorkflowId()); + // calix + s = Monitors.recordWorkflowComplete(workflow.getWorkflowName(), "completed_parent", s); + // end calix } + // calix + executionLockService.deleteLock( + WorkflowExternalBeans.lockedQueue(workflow.getWorkflowId())); + // end calix executionLockService.releaseLock(workflow.getWorkflowId()); executionLockService.deleteLock(workflow.getWorkflowId()); + // calix + Monitors.recordWorkflowComplete(workflow.getWorkflowName(), "completed_locks", s); + // end calix return workflow; } @@ -883,7 +919,9 @@ public void updateTask(TaskResult taskResult) { } if (!isLazyEvaluateWorkflow(workflowInstance.getWorkflowDefinition(), task)) { - decide(workflowId); + // calix + this.locked(decide(workflowId), workflowId); + // end calix } } @@ -1013,7 +1051,10 @@ public List getRunningWorkflowIds(String workflowName, int version) { @EventListener(WorkflowEvaluationEvent.class) public void handleWorkflowEvaluationEvent(WorkflowEvaluationEvent wee) { - decide(wee.getWorkflowModel()); + // calix + WorkflowModel w = wee.getWorkflowModel(); + locked(decideWithLock(w), w.getWorkflowId()); + // end calix } /** Records a metric for the "decide" process. 
*/ @@ -1021,21 +1062,38 @@ public WorkflowModel decide(String workflowId) { StopWatch watch = new StopWatch(); watch.start(); if (!executionLockService.acquireLock(workflowId)) { - return null; - } + // calix + // return null; + LOGGER.debug("locked failed {}", workflowId); + return LOCKED; + // end calix + } + // calix + long s = Monitors.now(); + WorkflowModel workflow = null; + // end calix try { - WorkflowModel workflow = executionDAOFacade.getWorkflowModel(workflowId, true); + workflow = executionDAOFacade.getWorkflowModel(workflowId, true); if (workflow == null) { // This can happen if the workflowId is incorrect return null; } - return decide(workflow); + // calix + s = Monitors.recordWorkflowDecision(workflow.getWorkflowName(), "decision_wf_get", s); + WorkflowModel w = decide(workflow); + Monitors.recordWorkflowDecision(workflow.getWorkflowName(), "decision_wf_decide", s); + LOGGER.debug("Decided {} ok", workflowId); + return w; + // end calix } finally { executionLockService.releaseLock(workflowId); watch.stop(); - Monitors.recordWorkflowDecisionTime(watch.getTime()); + // calix + Monitors.recordWorkflowDecisionTime( + null == workflow ? 
"NOT_FOUND" : workflow.getWorkflowName(), watch.getTime()); + // end calix } } @@ -1053,7 +1111,9 @@ public WorkflowModel decideWithLock(WorkflowModel workflow) { StopWatch watch = new StopWatch(); watch.start(); if (!executionLockService.acquireLock(workflow.getWorkflowId())) { - return null; + // calix + return LOCKED; + // end calix } try { return decide(workflow); @@ -1061,7 +1121,9 @@ public WorkflowModel decideWithLock(WorkflowModel workflow) { } finally { executionLockService.releaseLock(workflow.getWorkflowId()); watch.stop(); - Monitors.recordWorkflowDecisionTime(watch.getTime()); + // calix + Monitors.recordWorkflowDecisionTime(workflow.getWorkflowName(), watch.getTime()); + // end calix } } @@ -1083,8 +1145,17 @@ public WorkflowModel decide(WorkflowModel workflow) { // and change the workflow/task state accordingly adjustStateIfSubWorkflowChanged(workflow); + // calix + long s = Monitors.now(); + // end calix + try { DeciderService.DeciderOutcome outcome = deciderService.decide(workflow); + // calix + s = + Monitors.recordWorkflowDecision( + workflow.getWorkflowName(), "decision_svc_decide", s); + // end calix if (outcome.isComplete) { endExecution(workflow, outcome.terminateTask); return workflow; @@ -1097,6 +1168,11 @@ public WorkflowModel decide(WorkflowModel workflow) { tasksToBeScheduled = dedupAndAddTasks(workflow, tasksToBeScheduled); boolean stateChanged = scheduleTask(workflow, tasksToBeScheduled); // start + // calix + s = + Monitors.recordWorkflowDecision( + workflow.getWorkflowName(), "decision_task_scheduled", s); + // end calix for (TaskModel task : outcome.tasksToBeScheduled) { executionDAOFacade.populateTaskData(task); @@ -1112,17 +1188,37 @@ public WorkflowModel decide(WorkflowModel workflow) { } } + // calix + s = + Monitors.recordWorkflowDecision( + workflow.getWorkflowName(), "decision_sys_task_executed", s); + // end calix + if (!outcome.tasksToBeUpdated.isEmpty() || !tasksToBeScheduled.isEmpty()) { 
executionDAOFacade.updateTasks(tasksToBeUpdated); } + // calix + s = + Monitors.recordWorkflowDecision( + workflow.getWorkflowName(), "decision_tasks_update", s); + // end calix + if (stateChanged) { - return decide(workflow); + // calix + WorkflowModel w = decide(workflow); + Monitors.recordWorkflowDecision( + workflow.getWorkflowName(), "decision_changed_decide", s); + return w; + // end calix } if (!outcome.tasksToBeUpdated.isEmpty() || !tasksToBeScheduled.isEmpty()) { executionDAOFacade.updateWorkflow(workflow); } + // calix + Monitors.recordWorkflowDecision(workflow.getWorkflowName(), "decision_wf_update", s); + // end calix return workflow; @@ -1799,8 +1895,28 @@ public TaskDef getTaskDefinition(TaskModel task) { @VisibleForTesting void updateParentWorkflowTask(WorkflowModel subWorkflow) { + // calix + TaskModel subWorkflowTask; + /* TaskModel subWorkflowTask = executionDAOFacade.getTaskModel(subWorkflow.getParentWorkflowTaskId()); + */ + do { + subWorkflowTask = + executionDAOFacade.getTaskModel(subWorkflow.getParentWorkflowTaskId()); + if (Strings.isNotBlank(subWorkflowTask.getSubWorkflowId())) break; + try { + LOGGER.debug( + "Waiting for parent {} task {} updated of {}", + subWorkflow.getParentWorkflowId(), + subWorkflow.getParentWorkflowTaskId(), + subWorkflow.getWorkflowId()); + Thread.sleep(200); + } catch (Exception e) { + LOGGER.error("Interrupted", e); + } + } while (true); + // end calix executeSubworkflowTaskAndSyncData(subWorkflow, subWorkflowTask); executionDAOFacade.updateTask(subWorkflowTask); } @@ -1818,11 +1934,25 @@ private void executeSubworkflowTaskAndSyncData( * @param workflowId The workflow to be evaluated at higher priority */ private void expediteLazyWorkflowEvaluation(String workflowId) { + // calix + /* if (queueDAO.containsMessage(DECIDER_QUEUE, workflowId)) { queueDAO.postpone(DECIDER_QUEUE, workflowId, EXPEDITED_PRIORITY, 0); } else { queueDAO.push(DECIDER_QUEUE, workflowId, EXPEDITED_PRIORITY, 0); + }*/ + + String queue = 
WorkflowExternalBeans.lockedQueue(workflowId); + long wait = properties.getWorkflowUnAckLockTimeout().toMillis(); + long lease = 600000; + boolean locked = executionLockService.acquireLock(queue, wait, lease); + if (!locked) { + LOGGER.debug("Locked {} ms by other threads", workflowId); + locked = executionLockService.acquireLock(queue, 5, lease); + if (!locked) return; } + queueDAO.postpone(DECIDER_QUEUE, workflowId, EXPEDITED_PRIORITY, 0); + // end calix LOGGER.info("Pushed workflow {} to {} for expedited evaluation", workflowId, DECIDER_QUEUE); } @@ -1836,4 +1966,12 @@ private static boolean isJoinOnFailedPermissive(List joinOn, WorkflowMod && !t.getWorkflowTask().isOptional() && t.getStatus().equals(FAILED)); } + + // calix + public boolean locked(WorkflowModel w, String workflowId) { + if (!LOCKED.equals(w)) return false; + expediteLazyWorkflowEvaluation(workflowId); + return true; + } + // end calix } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapper.java b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapper.java index ab7d9c291..3dc8eeb5b 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/mapper/SubWorkflowTaskMapper.java @@ -19,6 +19,7 @@ import org.springframework.stereotype.Component; import com.netflix.conductor.annotations.VisibleForTesting; +import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.tasks.TaskType; import com.netflix.conductor.common.metadata.workflow.SubWorkflowParams; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; @@ -83,6 +84,22 @@ public List getMappedTasks(TaskMapperContext taskMapperContext) { subWorkflowTask.addInput("workflowInput", taskMapperContext.getTaskInput()); subWorkflowTask.setStatus(TaskModel.Status.SCHEDULED); 
subWorkflowTask.setCallbackAfterSeconds(workflowTask.getStartDelay()); + + // calix + if (Objects.nonNull(workflowTask.getName())) { + TaskDef taskDefinition = + Optional.ofNullable(taskMapperContext.getTaskDefinition()) + .orElseGet(() -> metadataDAO.getTaskDef(workflowTask.getName())); + if (Objects.nonNull(taskDefinition)) { + subWorkflowTask.setIsolationGroupId(taskDefinition.getIsolationGroupId()); + LOGGER.debug( + "[Isolated] {} => {}", + subWorkflowTask.getIsolationGroupId(), + workflowTask.getName()); + } + } + // end calix + LOGGER.debug("SubWorkflowTask {} created to be Scheduled", subWorkflowTask); return List.of(subWorkflowTask); } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/IsolatedTaskQueueProducer.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/IsolatedTaskQueueProducer.java index 9ac9b4bab..fea586774 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/IsolatedTaskQueueProducer.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/IsolatedTaskQueueProducer.java @@ -111,7 +111,13 @@ void addTaskQueues() { isolatedTaskDef.getIsolationGroupId(), isolatedTaskDef.getExecutionNameSpace()); LOGGER.debug("Adding taskQueue:'{}' to system task worker coordinator", taskQueue); - if (!listeningQueues.contains(taskQueue)) { + // calix + if (!listeningQueues.contains(taskQueue) + && (StringUtils.isEmpty(isolatedTaskDef.getDescription()) + || systemTask + .getTaskType() + .equals(isolatedTaskDef.getDescription()))) { + // end calix systemTaskWorker.startPolling(systemTask, taskQueue); listeningQueues.add(taskQueue); } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java index 6e29dd64d..4dffaa799 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java +++ 
b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SubWorkflow.java @@ -85,7 +85,10 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workf startWorkflowInput.setParentWorkflowTaskId(task.getTaskId()); startWorkflowInput.setTaskToDomain(taskToDomain); - String subWorkflowId = startWorkflowOperation.execute(startWorkflowInput); + // calix + WorkflowModel subWorkflow = startWorkflowOperation.start(startWorkflowInput); + String subWorkflowId = subWorkflow.getWorkflowId(); + // end calix task.setSubWorkflowId(subWorkflowId); // For backwards compatibility @@ -93,7 +96,9 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workf // Set task status based on current sub-workflow status, as the status can change in // recursion by the time we update here. - WorkflowModel subWorkflow = workflowExecutor.getWorkflow(subWorkflowId, false); + // calix + // WorkflowModel subWorkflow = workflowExecutor.getWorkflow(subWorkflowId, false); + // end calix updateTaskStatus(subWorkflow, task); } catch (TransientException te) { LOGGER.info( @@ -118,6 +123,12 @@ public boolean execute( WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) { String workflowId = task.getSubWorkflowId(); if (StringUtils.isEmpty(workflowId)) { + LOGGER.error( + "update parent {} sub {} task {} status {} null id", + workflow.getParentWorkflowId(), + workflow.getWorkflowId(), + task.getTaskId(), + workflow.getStatus()); return false; } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorker.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorker.java index d13334708..5796debb9 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorker.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorker.java @@ -86,6 +86,9 @@ public void startPolling(WorkflowSystemTask systemTask, String queueName) { } void 
pollAndExecute(WorkflowSystemTask systemTask, String queueName) { + // calix + long start = Monitors.now(); + // end calix if (!isRunning()) { LOGGER.debug( "{} stopped. Not polling for task: {}", getClass().getSimpleName(), systemTask); @@ -108,7 +111,9 @@ void pollAndExecute(WorkflowSystemTask systemTask, String queueName) { LOGGER.debug("Polling queue: {} with {} slots acquired", queueName, messagesToAcquire); + long s = Monitors.now(); List polledTaskIds = queueDAO.pop(queueName, messagesToAcquire, 200); + Monitors.recordWorkflowTaskSys(queueName, "sys_pop", s); Monitors.recordTaskPoll(queueName); LOGGER.debug("Polling queue:{}, got {} tasks", queueName, polledTaskIds.size()); @@ -128,12 +133,22 @@ void pollAndExecute(WorkflowSystemTask systemTask, String queueName) { queueName); Monitors.recordTaskPollCount(queueName, 1); - executionService.ackTaskReceived(taskId); + // calix + // executionService.ackTaskReceived(taskId); CompletableFuture taskCompletableFuture = CompletableFuture.runAsync( - () -> asyncSystemTaskExecutor.execute(systemTask, taskId), + () -> { + long se = Monitors.now(); + executionService.ackTaskReceived(queueName, taskId); + se = + Monitors.recordWorkflowTaskSys( + queueName, "ack", se); + asyncSystemTaskExecutor.execute(systemTask, taskId); + Monitors.recordWorkflowTaskSys(queueName, "exec", se); + }, executorService); + // end calix // release permit after processing is complete taskCompletableFuture.whenComplete( @@ -152,6 +167,10 @@ void pollAndExecute(WorkflowSystemTask systemTask, String queueName) { semaphoreUtil.completeProcessing(messagesToAcquire); Monitors.recordTaskPollError(taskName, e.getClass().getSimpleName()); LOGGER.error("Error polling system task in queue:{}", queueName, e); + } finally { + // calix + Monitors.recordWorkflowTaskSys(queueName, "sync_exec", start); + // end calix } } diff --git a/core/src/main/java/com/netflix/conductor/core/external/WorkflowExternalBeans.java 
b/core/src/main/java/com/netflix/conductor/core/external/WorkflowExternalBeans.java new file mode 100644 index 000000000..d00a0a77f --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/external/WorkflowExternalBeans.java @@ -0,0 +1,64 @@ +/* + * Copyright 2024 Conductor Authors. + *

<p> + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0 + * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.core.external; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import com.netflix.conductor.core.config.ConductorProperties; + +@Component +public class WorkflowExternalBeans { + + public static final String EXECUTOR_ASYNC_DAO = "EXEC_ASYNC_DAO"; + public static final String EXECUTOR_ASYNC_LISTENER = "EXEC_ASYNC_LISTENER"; + public static final String EXECUTOR_ASYNC_START = "EXEC_ASYNC_START"; + private static final String LOCKED_QUEUE = "_lockedQueue."; + private static final String LOCKED_CONFLICT_QUEUE = "_lockedQueue.conflict"; + + private ExecutorService newFixedExecutor(String pattern, int threads) { + return Executors.newFixedThreadPool( + threads, + new BasicThreadFactory.Builder() + .namingPattern(pattern + "-%d") + .daemon(true) + .build()); + } + + @Bean + @Qualifier(EXECUTOR_ASYNC_DAO) + public ExecutorService asyncDaoExecutor(ConductorProperties properties) { + return newFixedExecutor("exec-async-dao", properties.getAsyncDaoThreadCount()); + } + + @Bean + @Qualifier(EXECUTOR_ASYNC_LISTENER) + public ExecutorService asyncListenerExecutor(ConductorProperties properties) { + return newFixedExecutor("exec-async-listener", properties.getAsyncListenerThreadCount()); + } + + @Bean + @Qualifier(EXECUTOR_ASYNC_START) + public ExecutorService asyncStartExecutor(ConductorProperties properties) { + return newFixedExecutor("exec-async-start", 
properties.getAsyncStartThreadCount()); + } + + public static String lockedQueue(String workflowId) { + return LOCKED_QUEUE + workflowId; + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/operation/StartWorkflowOperation.java b/core/src/main/java/com/netflix/conductor/core/operation/StartWorkflowOperation.java index 3ac92eb53..8403620c4 100644 --- a/core/src/main/java/com/netflix/conductor/core/operation/StartWorkflowOperation.java +++ b/core/src/main/java/com/netflix/conductor/core/operation/StartWorkflowOperation.java @@ -14,10 +14,14 @@ import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; @@ -29,6 +33,7 @@ import com.netflix.conductor.core.event.WorkflowEvaluationEvent; import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.core.execution.StartWorkflowInput; +import com.netflix.conductor.core.external.WorkflowExternalBeans; import com.netflix.conductor.core.metadata.MetadataMapperService; import com.netflix.conductor.core.utils.IDGenerator; import com.netflix.conductor.core.utils.ParametersUtils; @@ -48,6 +53,13 @@ public class StartWorkflowOperation implements WorkflowOperation workflowInput = input.getWorkflowInput(); String externalInputPayloadStoragePath = input.getExternalInputPayloadStoragePath(); @@ -126,7 +160,10 @@ private String startWorkflow(StartWorkflowInput input) { workflow.getWorkflowName(), String.valueOf(workflow.getWorkflowVersion()), workflow.getOwnerApp()); - return workflowId; + // calix + 
Monitors.recordWorkflowStartDuration(workflowDefinition.getName(), "create_eval", s); + return workflow; + // end calix } catch (Exception e) { Monitors.recordWorkflowStartError( workflowDefinition.getName(), WorkflowContext.get().getClientApp()); @@ -151,17 +188,31 @@ private void createAndEvaluate(WorkflowModel workflow) { if (!executionLockService.acquireLock(workflow.getWorkflowId())) { throw new TransientException("Error acquiring lock when creating workflow: {}"); } + // calix + long s = Monitors.now(); + // end calix try { executionDAOFacade.createWorkflow(workflow); + // calix + Monitors.recordWorkflowStartDuration(workflow.getWorkflowName(), "create_create", s); + // end calix LOGGER.debug( "A new instance of workflow: {} created with id: {}", workflow.getWorkflowName(), workflow.getWorkflowId()); executionDAOFacade.populateWorkflowAndTaskPayloadData(workflow); - eventPublisher.publishEvent(new WorkflowEvaluationEvent(workflow)); + // calix + // eventPublisher.publishEvent(new WorkflowEvaluationEvent(workflow)); + // end calix } finally { executionLockService.releaseLock(workflow.getWorkflowId()); } + // calix + CompletableFuture.runAsync( + () -> eventPublisher.publishEvent(new WorkflowEvaluationEvent(workflow)), + asyncStart); + // end calix + } /** diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index b12529a62..15c916f44 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -12,13 +12,13 @@ */ package com.netflix.conductor.core.reconciliation; -import java.time.Instant; import java.util.Optional; import java.util.Random; import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import 
org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; @@ -30,11 +30,13 @@ import com.netflix.conductor.core.dal.ExecutionDAOFacade; import com.netflix.conductor.core.exception.NotFoundException; import com.netflix.conductor.core.execution.WorkflowExecutor; +import com.netflix.conductor.core.external.WorkflowExternalBeans; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.TaskModel.Status; import com.netflix.conductor.model.WorkflowModel; +import com.netflix.conductor.service.ExecutionLockService; import static com.netflix.conductor.core.config.SchedulerConfiguration.SWEEPER_EXECUTOR_NAME; import static com.netflix.conductor.core.utils.Utils.DECIDER_QUEUE; @@ -52,6 +54,11 @@ public class WorkflowSweeper { private static final String CLASS_NAME = WorkflowSweeper.class.getSimpleName(); + // calix + @Autowired private ExecutionLockService lockService; + + // end calix + public WorkflowSweeper( WorkflowExecutor workflowExecutor, Optional workflowRepairService, @@ -75,18 +82,30 @@ public CompletableFuture sweepAsync(String workflowId) { public void sweep(String workflowId) { WorkflowModel workflow = null; try { + // calix + lockService.deleteLock(WorkflowExternalBeans.lockedQueue(workflowId)); + // end calix WorkflowContext workflowContext = new WorkflowContext(properties.getAppId()); WorkflowContext.set(workflowContext); LOGGER.debug("Running sweeper for workflow {}", workflowId); - workflow = executionDAOFacade.getWorkflowModel(workflowId, true); + // calix + // workflow = executionDAOFacade.getWorkflowModel(workflowId, true); + // end calix if (workflowRepairService != null) { + // calix + workflow = executionDAOFacade.getWorkflowModel(workflowId, true); + // end calix // Verify and repair tasks in the workflow. 
workflowRepairService.verifyAndRepairWorkflowTasks(workflow); } - workflow = workflowExecutor.decideWithLock(workflow); + // calix + // workflow = workflowExecutor.decideWithLock(workflow); + workflow = workflowExecutor.decide(workflowId); + if (workflowExecutor.locked(workflow, workflowId)) return; + // end calix if (workflow != null && workflow.getStatus().isTerminal()) { queueDAO.remove(DECIDER_QUEUE, workflowId); return; @@ -104,15 +123,21 @@ public void sweep(String workflowId) { long workflowOffsetTimeout = workflowOffsetWithJitter(properties.getWorkflowOffsetTimeout().getSeconds()); if (workflow != null) { - long startTime = Instant.now().toEpochMilli(); + // calix + // long startTime = Instant.now().toEpochMilli(); unack(workflow, workflowOffsetTimeout); - long endTime = Instant.now().toEpochMilli(); - Monitors.recordUnackTime(workflow.getWorkflowName(), endTime - startTime); + // long endTime = Instant.now().toEpochMilli(); + // Monitors.recordUnackTime(workflow.getWorkflowName(),endTime - startTime); + // end calix } else { LOGGER.warn( "Workflow with {} id can not be found. 
Attempting to unack using the id", workflowId); - queueDAO.setUnackTimeout(DECIDER_QUEUE, workflowId, workflowOffsetTimeout * 1000); + // calix + // queueDAO.setUnackTimeout(DECIDER_QUEUE,workflowId,workflowOffsetTimeout * 1000); + lock(workflowId, "NO_FOUND", workflowOffsetTimeout); + // end calix + } } @@ -161,8 +186,16 @@ void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) { break; } } + // calix + /* queueDAO.setUnackTimeout( DECIDER_QUEUE, workflowModel.getWorkflowId(), postponeDurationSeconds * 1000); + */ + lock( + workflowModel.getWorkflowId(), + workflowModel.getWorkflowName(), + postponeDurationSeconds); + // end calix } /** @@ -178,4 +211,31 @@ long workflowOffsetWithJitter(long workflowOffsetTimeout) { long jitter = new Random().nextInt((int) (2 * range + 1)) - range; return workflowOffsetTimeout + jitter; } + + // calix + private void lock(String workflowId, String name, long timeout) { + String lockId = WorkflowExternalBeans.lockedQueue(workflowId); + long s = Monitors.now(); + boolean locked = lockService.acquireLock(lockId, 5, 60000); + if (!locked) { + LOGGER.debug("Locked {} by other threads", workflowId); + return; + } + try { + queueDAO.setUnackTimeout(DECIDER_QUEUE, workflowId, timeout * 1000); + LOGGER.debug("Postpone {} s for {}", timeout, workflowId); + } catch (Exception e) { + LOGGER.error("Un ack failed of {} with {}", workflowId, timeout, e); + } finally { + lockService.releaseLock(lockId); + long t = Monitors.now() - s; + Monitors.recordUnackTime(name, t); + long te = properties.getWorkflowUnAckLockTimeout().toMillis() - 5; + if (t >= te) { + LOGGER.error("Un ack {} over {} ms", workflowId, t); + queueDAO.push(DECIDER_QUEUE, workflowId, 1); + } + } + } + // end calix } diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java index cbe529ae4..4684062bc 100644 --- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java +++ 
b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java @@ -12,6 +12,7 @@ */ package com.netflix.conductor.metrics; +import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -191,10 +192,14 @@ public static void recordTaskExecutionTime( .record(duration, TimeUnit.MILLISECONDS); } - public static void recordWorkflowDecisionTime(long duration) { - getTimer(classQualifier, "workflow_decision").record(duration, TimeUnit.MILLISECONDS); + // calix + public static void recordWorkflowDecisionTime(String workflowType, long duration) { + getTimer(classQualifier, "workflow_decision", "workflowType", workflowType) + .record(duration, TimeUnit.MILLISECONDS); } + // end calix + public static void recordTaskPollError(String taskType, String exception) { recordTaskPollError(taskType, NO_DOMAIN, exception); } @@ -589,4 +594,72 @@ public static void recordQueueMessageRepushFromRepairService(String queueName) { public static void recordTaskExecLogSize(int val) { gauge(classQualifier, "task_exec_log_size", val); } + + // calix + public static long now() { + return Instant.now().toEpochMilli(); + } + + public static void recordTaskPollDuration(String taskType, long start) { + getTimer(classQualifier, "task_poll_duration", "taskType", taskType) + .record(Instant.now().toEpochMilli() - start, TimeUnit.MILLISECONDS); + } + + public static long recordWorkflowComplete(String workflowType, String action, long start) { + long s = Instant.now().toEpochMilli(); + getTimer( + classQualifier, + "workflow_complete_duration", + "workflowType", + workflowType, + "action", + action) + .record(s - start, TimeUnit.MILLISECONDS); + return s; + } + + public static long recordWorkflowDecision(String workflowType, String action, long start) { + long s = Instant.now().toEpochMilli(); + getTimer( + classQualifier, + "workflow_decision_duration", + "workflowType", + workflowType, + "action", + action) + .record(s - start, 
TimeUnit.MILLISECONDS); + return s; + } + + public static long recordWorkflowTaskSys(String taskType, String action, long start) { + long s = Instant.now().toEpochMilli(); + getTimer( + classQualifier, + "workflow_sys_task_duration", + "taskType", + taskType, + "action", + "async_sys_task_" + action) + .record(s - start, TimeUnit.MILLISECONDS); + return s; + } + + public static void recordTaskUpdateDuration(String taskType, long start) { + getTimer(classQualifier, "workflow_task_update_duration", "taskType", taskType) + .record(Instant.now().toEpochMilli() - start, TimeUnit.MILLISECONDS); + } + + public static long recordWorkflowStartDuration(String workflowType, String action, long start) { + long s = Instant.now().toEpochMilli(); + getTimer( + classQualifier, + "workflow_start_duration", + "workflowType", + workflowType, + "action", + "wf_start_" + action) + .record(s - start, TimeUnit.MILLISECONDS); + return s; + } + // end calix } diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index 2c985dec0..27fae14f7 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -13,12 +13,16 @@ package com.netflix.conductor.service; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import com.netflix.conductor.annotations.Trace; @@ -35,6 +39,7 @@ import com.netflix.conductor.core.exception.NotFoundException; import com.netflix.conductor.core.execution.WorkflowExecutor; 
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry; +import com.netflix.conductor.core.external.WorkflowExternalBeans; import com.netflix.conductor.core.listener.TaskStatusListener; import com.netflix.conductor.core.utils.QueueUtils; import com.netflix.conductor.core.utils.Utils; @@ -61,6 +66,13 @@ public class ExecutionService { private static final int POLL_COUNT_ONE = 1; private static final int POLLING_TIMEOUT_IN_MS = 100; + // calix + @Autowired + @Qualifier(WorkflowExternalBeans.EXECUTOR_ASYNC_DAO) + private Executor daoExecutor; + + // end calix + public ExecutionService( WorkflowExecutor workflowExecutor, ExecutionDAOFacade executionDAOFacade, @@ -106,7 +118,10 @@ public List poll( String queueName = QueueUtils.getQueueName(taskType, domain, null, null); List taskIds = new LinkedList<>(); - List tasks = new LinkedList<>(); + // calix + // List tasks = new LinkedList<>(); + List tasks = Collections.synchronizedList(new LinkedList<>()); + // end calix try { taskIds = queueDAO.pop(queueName, count, timeoutInMilliSecond); } catch (Exception e) { @@ -121,6 +136,123 @@ public List poll( Monitors.recordTaskPollError(taskType, domain, e.getClass().getSimpleName()); } + // calix + taskIds.stream() + .map( + taskId -> + CompletableFuture.runAsync( + () -> { + // calix + long s = Monitors.now(); + TaskModel taskModel = null; + try { + taskModel = executionDAOFacade.getTaskModel(taskId); + // end calix + if (taskModel == null + || taskModel.getStatus().isTerminal()) { + // Remove taskId(s) without a valid + // Task/terminal state task from the queue + queueDAO.remove(queueName, taskId); + LOGGER.debug( + "Removed task: {} from the queue: {}", + taskId, + queueName); + return; + } + + if (executionDAOFacade.exceedsInProgressLimit( + taskModel)) { + // Postpone this message, so that it would be + // available for poll again. 
+ queueDAO.postpone( + queueName, + taskId, + taskModel.getWorkflowPriority(), + queueTaskMessagePostponeSecs); + LOGGER.debug( + "Postponed task: {} in queue: {} by {} seconds", + taskId, + queueName, + queueTaskMessagePostponeSecs); + return; + } + TaskDef taskDef = + taskModel.getTaskDefinition().isPresent() + ? taskModel + .getTaskDefinition() + .get() + : null; + if (taskModel.getRateLimitPerFrequency() > 0 + && executionDAOFacade + .exceedsRateLimitPerFrequency( + taskModel, taskDef)) { + // Postpone this message, so that it would be + // available for poll again. + queueDAO.postpone( + queueName, + taskId, + taskModel.getWorkflowPriority(), + queueTaskMessagePostponeSecs); + LOGGER.debug( + "RateLimit Execution limited for {}:{}, limit:{}", + taskId, + taskModel.getTaskDefName(), + taskModel.getRateLimitPerFrequency()); + return; + } + + taskModel.setStatus(TaskModel.Status.IN_PROGRESS); + if (taskModel.getStartTime() == 0) { + taskModel.setStartTime( + System.currentTimeMillis()); + Monitors.recordQueueWaitTime( + taskModel.getTaskDefName(), + taskModel.getQueueWaitTime()); + } + taskModel.setCallbackAfterSeconds( + 0); // reset callbackAfterSeconds when + // giving the task to the worker + taskModel.setWorkerId(workerId); + taskModel.incrementPollCount(); + executionDAOFacade.updateTask(taskModel); + tasks.add(taskModel.toTask()); + } catch (Exception e) { + // db operation failed for dequeued message, + // re-enqueue with a delay + LOGGER.error( + "DB operation failed for task: {}, postponing task in queue", + taskId, + e); + Monitors.recordTaskPollError( + taskType, + domain, + e.getClass().getSimpleName()); + queueDAO.postpone( + queueName, + taskId, + 0, + queueTaskMessagePostponeSecs); + } finally { + // calix + queueDAO.ack(queueName, taskId); + if (null != taskModel) + taskStatusListener.onTaskInProgress(taskModel); + Monitors.recordTaskPollDuration(queueName, s); + // end calix + } + }, + daoExecutor)) + .toList() + .forEach( + f -> { + try { + 
f.get(); + } catch (Exception e) { + LOGGER.error("Polling failed", e); + } + }); + + /* for (String taskId : taskIds) { try { TaskModel taskModel = executionDAOFacade.getTaskModel(taskId); @@ -190,9 +322,13 @@ public List poll( .filter(Objects::nonNull) .filter(task -> TaskModel.Status.IN_PROGRESS.equals(task.getStatus())) .forEach(taskStatusListener::onTaskInProgress); + */ + // end calix executionDAOFacade.updateTaskLastPoll(taskType, domain, workerId); Monitors.recordTaskPoll(queueName); - tasks.forEach(this::ackTaskReceived); + // calix + // tasks.forEach(this::ackTaskReceived); + // end calix return tasks; } @@ -630,4 +766,10 @@ public ExternalStorageLocation getExternalStorageLocation( throw new IllegalArgumentException(errorMsg); } } + + // calix + public boolean ackTaskReceived(String queueName, String taskId) { + return queueDAO.ack(queueName, taskId); + } + // end calix } diff --git a/docker/server/bin/startup.sh b/docker/server/bin/startup.sh deleted file mode 100755 index 2dffebd3e..000000000 --- a/docker/server/bin/startup.sh +++ /dev/null @@ -1,40 +0,0 @@ -#!/bin/sh -# -# Copyright 2023 Conductor authors -#

-# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -#

-# http://www.apache.org/licenses/LICENSE-2.0 -#

-# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on -# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the -# specific language governing permissions and limitations under the License. -# - -# startup.sh - startup script for the server docker image - -echo "Starting Conductor server" - -echo "Running Nginx in background" -# Start nginx as daemon -nginx - -# Start the server -cd /app/libs -echo "Property file: $CONFIG_PROP" -echo $CONFIG_PROP -export config_file= - -if [ -z "$CONFIG_PROP" ]; - then - echo "Using default configuration file"; - export config_file=/app/config/config.properties - else - echo "Using '$CONFIG_PROP'"; - export config_file=/app/config/$CONFIG_PROP -fi - -echo "Using java options config: $JAVA_OPTS" - -java ${JAVA_OPTS} -jar -DCONDUCTOR_CONFIG_FILE=$config_file conductor-server.jar 2>&1 | tee -a /app/logs/server.log diff --git a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/ElasticSearchRestDAOV7.java b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/ElasticSearchRestDAOV7.java index 7de1b0419..e304b432c 100644 --- a/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/ElasticSearchRestDAOV7.java +++ b/es7-persistence/src/main/java/com/netflix/conductor/es7/dao/index/ElasticSearchRestDAOV7.java @@ -1234,8 +1234,10 @@ private void indexObject(final String index, final String docType, final Object indexObject(index, docType, null, doc); } - private void indexObject( + // calix + private synchronized void indexObject( final String index, final String docType, final String docId, final Object doc) { + // end calix byte[] docBytes; try { diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index db8c3baaf..2584cae78 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,7 
+1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists distributionSha256Sum=9d926787066a081739e8200858338b4a69e837c3a821a33aca9db09dd4a41026 -distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip +distributionUrl=https\://mirrors.cloud.tencent.com/gradle/gradle-8.5-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/postgres-persistence/build.gradle b/postgres-persistence/build.gradle index a9f740b64..fd90bd441 100644 --- a/postgres-persistence/build.gradle +++ b/postgres-persistence/build.gradle @@ -21,7 +21,9 @@ dependencies { testImplementation project(':conductor-server') testImplementation project(':conductor-client') - testImplementation project(':conductor-grpc-client') + // calix + // testImplementation project(':conductor-grpc-client') + // end calix testImplementation project(':conductor-es7-persistence') testImplementation "org.testcontainers:postgresql:${revTestContainer}" diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresMetadataDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresMetadataDAO.java index 81decfe83..03c075189 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresMetadataDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresMetadataDAO.java @@ -45,6 +45,11 @@ public class PostgresMetadataDAO extends PostgresBaseDAO implements MetadataDAO, private final ScheduledExecutorService scheduledExecutorService; + // calix + private static final TaskDef DEFAULT = new TaskDef(); + + // end calix + public PostgresMetadataDAO( RetryTemplate retryTemplate, ObjectMapper objectMapper, @@ -56,8 +61,10 @@ public PostgresMetadataDAO( this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( ExecutorsUtil.newNamedThreadFactory("postgres-metadata-")); + // calix 
this.scheduledExecutorService.scheduleWithFixedDelay( - this::refreshTaskDefs, cacheRefreshTime, cacheRefreshTime, TimeUnit.SECONDS); + this::refreshTaskDefs, 10, cacheRefreshTime, TimeUnit.SECONDS); + // end calix } @PreDestroy @@ -102,9 +109,13 @@ public TaskDef getTaskDef(String name) { logger.trace("Cache miss: {}", name); } taskDef = getTaskDefFromDB(name); + // calix + taskDefCache.put(name, null == taskDef ? DEFAULT : taskDef); + // end calix } - - return taskDef; + // calix + return DEFAULT.equals(taskDef) ? null : taskDef; + // end calix } @Override diff --git a/redis-lock/src/main/java/com/netflix/conductor/redislock/lock/RedisLock.java b/redis-lock/src/main/java/com/netflix/conductor/redislock/lock/RedisLock.java index 433bacf6f..b55fa71bd 100644 --- a/redis-lock/src/main/java/com/netflix/conductor/redislock/lock/RedisLock.java +++ b/redis-lock/src/main/java/com/netflix/conductor/redislock/lock/RedisLock.java @@ -86,6 +86,13 @@ public void releaseLock(String lockId) { @Override public void deleteLock(String lockId) { // Noop for Redlock algorithm as releaseLock / unlock deletes it. 
+ // calix + try { + redisson.getKeys().delete(parseLockId(lockId)); + } catch (Exception e) { + LOGGER.error("Delete lock {} failed", lockId, e); + } + // end calix } private String parseLockId(String lockId) { diff --git a/server/build.gradle b/server/build.gradle index 09c0ce0c2..5c1f64d19 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -18,7 +18,9 @@ plugins { dependencies { implementation project(':conductor-core') implementation project(':conductor-rest') - implementation project(':conductor-grpc-server') + // calix + // implementation project(':conductor-grpc-server') + // end calix //Event Systems implementation project(':conductor-amqp') @@ -27,16 +29,22 @@ dependencies { implementation project(':conductor-awssqs-event-queue') //External Payload Storage - implementation project(':conductor-azureblob-storage') + // calix + // implementation project(':conductor-azureblob-storage') + // end calix implementation project(':conductor-postgres-external-storage') implementation project(':conductor-awss3-storage') //Persistence implementation project(':conductor-redis-persistence') - implementation project(':conductor-cassandra-persistence') + // calix + // implementation project(':conductor-cassandra-persistence') + // end calix implementation project(':conductor-postgres-persistence') - implementation project(':conductor-mysql-persistence') + // calix + // implementation project(':conductor-mysql-persistence') + // end calix //Indexing (note: Elasticsearch 6 is deprecated) implementation project(':conductor-es7-persistence') @@ -68,9 +76,12 @@ dependencies { implementation "org.postgresql:postgresql:${revPostgres}" implementation 'org.springframework.boot:spring-boot-starter-actuator' - implementation ("io.orkes.queues:orkes-conductor-queues:${revOrkesQueues}") { - exclude group: 'com.netflix.conductor', module: 'conductor-core' - } + // calix + // implementation ("io.orkes.queues:orkes-conductor-queues:${revOrkesQueues}") { + // exclude group: 
'com.netflix.conductor', module: 'conductor-core' + //} + implementation fileTree(dir: 'libs', include: ['*.jar']) + // end calix implementation "org.springdoc:springdoc-openapi-starter-webmvc-ui:${revSpringDoc}" diff --git a/server/src/main/java/com/netflix/conductor/Conductor.java b/server/src/main/java/com/netflix/conductor/Conductor.java index 10a0f85bd..fdbc3ce4d 100644 --- a/server/src/main/java/com/netflix/conductor/Conductor.java +++ b/server/src/main/java/com/netflix/conductor/Conductor.java @@ -28,7 +28,9 @@ // In case that SQL database is selected this class will be imported back in the appropriate // database persistence module. @SpringBootApplication(exclude = DataSourceAutoConfiguration.class) -@ComponentScan(basePackages = {"com.netflix.conductor", "io.orkes.conductor"}) +// calix +@ComponentScan(basePackages = {"com.netflix.conductor", "io.orkes.conductor", "com.calix"}) +// end calix public class Conductor { private static final Logger log = LoggerFactory.getLogger(Conductor.class); diff --git a/server/src/main/resources/log4j2.xml b/server/src/main/resources/log4j2.xml index a35b65824..196507c51 100644 --- a/server/src/main/resources/log4j2.xml +++ b/server/src/main/resources/log4j2.xml @@ -19,7 +19,7 @@ - + diff --git a/settings.gradle b/settings.gradle index 5c39b77fe..f730fc95c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -38,10 +38,14 @@ include 'core' include 'client' include 'client-spring' -include 'cassandra-persistence' +// calix +// include 'cassandra-persistence' +// end calix include 'redis-persistence' -include 'es6-persistence' +// calix +// include 'es6-persistence' +// end calix include 'redis-lock' @@ -54,9 +58,11 @@ include 'json-jq-task' include 'http-task' include 'rest' -include 'grpc' -include 'grpc-server' -include 'grpc-client' +// calix +// include 'grpc' +// include 'grpc-server' +// include 'grpc-client' +// end calix include 'java-sdk' @@ -65,11 +71,15 @@ include 'workflow-event-listener' include 'test-util' 
include 'kafka' include 'common-persistence' -include 'mysql-persistence' +// calix +// include 'mysql-persistence' +// end calix include 'postgres-persistence' include 'metrics' include 'es7-persistence' -include 'azureblob-storage' +// calix +// include 'azureblob-storage' +// end calix include 'postgres-external-storage' include 'amqp' include 'nats' diff --git a/test-harness/build.gradle b/test-harness/build.gradle index c250eddf6..4a334c443 100644 --- a/test-harness/build.gradle +++ b/test-harness/build.gradle @@ -6,11 +6,17 @@ dependencies { testImplementation project(':conductor-rest') testImplementation project(':conductor-core') testImplementation project(':conductor-redis-persistence') - testImplementation project(':conductor-cassandra-persistence') + // calix + // testImplementation project(':conductor-cassandra-persistence') + // end calix testImplementation project(':conductor-es7-persistence') - testImplementation project(':conductor-grpc-server') + // calix + // testImplementation project(':conductor-grpc-server') + // end calix testImplementation project(':conductor-client') - testImplementation project(':conductor-grpc-client') + // calix + // testImplementation project(':conductor-grpc-client') + // end calix testImplementation project(':conductor-json-jq-task') testImplementation project(':conductor-http-task') diff --git a/test-util/build.gradle b/test-util/build.gradle index b3280ca51..19518e7fd 100644 --- a/test-util/build.gradle +++ b/test-util/build.gradle @@ -9,8 +9,10 @@ dependencies { compileOnly project(':conductor-server') implementation project(':conductor-client') implementation project(':conductor-rest') - implementation project(':conductor-grpc-server') - implementation project(':conductor-grpc-client') + // calix + // implementation project(':conductor-grpc-server') + // implementation project(':conductor-grpc-client') + // end calix implementation project(':conductor-redis-persistence')