From 1fea8d74ce0da4174a4e7ae6d5eaf4eb275aec1f Mon Sep 17 00:00:00 2001 From: astelmashenko Date: Thu, 8 Jun 2023 15:47:39 +0300 Subject: [PATCH 1/2] removing dead tasks from queue message --- .../conductor/core/execution/AsyncSystemTaskExecutor.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java index 07e1c6dd09..06355f5896 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java @@ -64,6 +64,12 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { TaskModel task = loadTaskQuietly(taskId); if (task == null) { LOGGER.error("TaskId: {} could not be found while executing {}", taskId, systemTask); + try { + LOGGER.debug("Cleaning up dead task from queue message: taskQueue={}, taskId={}", systemTask.getTaskType(), taskId); + queueDAO.remove(systemTask.getTaskType(), taskId); + } catch (Exception e) { + LOGGER.error("Failed to remove dead task from queue message: taskQueue={}, taskId={}", systemTask.getTaskType(), taskId); + } return; } From a709ad26c4fa48408e9d8e851c0b678452710b5d Mon Sep 17 00:00:00 2001 From: astelmashenko Date: Thu, 8 Jun 2023 15:52:56 +0300 Subject: [PATCH 2/2] fixed formatting --- .../core/execution/AsyncSystemTaskExecutor.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java index 06355f5896..86f7281943 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/AsyncSystemTaskExecutor.java @@ -65,10 +65,16 @@ public void execute(WorkflowSystemTask systemTask, String taskId) { if (task == null) { LOGGER.error("TaskId: {} could not be found while executing {}", taskId, systemTask); try { - LOGGER.debug("Cleaning up dead task from queue message: taskQueue={}, taskId={}", systemTask.getTaskType(), taskId); + LOGGER.debug( + "Cleaning up dead task from queue message: taskQueue={}, taskId={}", + systemTask.getTaskType(), + taskId); queueDAO.remove(systemTask.getTaskType(), taskId); } catch (Exception e) { - LOGGER.error("Failed to remove dead task from queue message: taskQueue={}, taskId={}", systemTask.getTaskType(), taskId); + LOGGER.error( + "Failed to remove dead task from queue message: taskQueue={}, taskId={}", + systemTask.getTaskType(), + taskId); } return; }