diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java index f4e1aab1bdb..694044946d4 100644 --- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java +++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/core/impl/ProcessorExecutorImpl.java @@ -103,8 +103,10 @@ public void executeStart(Element element, WorkflowContext context) { // If it is a continuous task execution transaction isolation if (element instanceof WorkflowTask) { - transactionHelper.execute(executeCompleteInTransaction(element, context), - TransactionDefinition.PROPAGATION_NESTED); + TransactionCallback callback = executeCompleteInTransaction(element, context); + if (callback != null) { + transactionHelper.execute(callback, TransactionDefinition.PROPAGATION_NESTED); + } return; } @@ -120,7 +122,9 @@ public void executeComplete(Element element, WorkflowContext context) { return; } List nextElements = processor.next(element, context); - nextElements.forEach(next -> executeStart(next, context)); + for (Element next : nextElements) { + executeStart(next, context); + } } private boolean isSkipCurrentElement(Element element, WorkflowContext context) { @@ -150,20 +154,20 @@ private void executeSkipAndNext(Element element, WorkflowContext context) { // Execute next context.getActionContext().setAction(((NextableElement) element).defaultNextAction()); List nextElements = processor.next(element, context); - nextElements.forEach(next -> executeStart(next, context)); + for (Element next : nextElements) { + executeStart(next, context); + } } private TransactionCallback executeCompleteInTransaction(Element element, WorkflowContext context) { - return s -> { - try { - executeComplete(element, context); - return null; - } catch (WorkflowNoRollbackException e) { // Exception does not roll back - throw e; - } catch (Exception e) { // The exception is only rolled back once - throw new WorkflowRollbackOnceException(e.getMessage()); - } - }; + try { + executeComplete(element, context); + return null; + } catch (WorkflowNoRollbackException e) { // Exception does not roll back + throw e; + } catch (Exception e) { // The exception is only rolled back once + throw new WorkflowRollbackOnceException(e.getMessage()); + } } }