Skip to content

Commit

Permalink
fix(core): flow should not continue tasks when having a finally
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Jan 15, 2025
1 parent f1c147c commit 9f12f9a
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -335,15 +335,15 @@ public TaskRun findTaskRunByTaskIdAndValue(String id, List<String> values)
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedErrors afters tasks
* @param resolvedErrors finally tasks
* @return the flow we need to follow
*/
public List<ResolvedTask> findTaskDependingFlowState(
List<ResolvedTask> resolvedTasks,
List<ResolvedTask> resolvedErrors,
List<ResolvedTask> resolvedAfters
List<ResolvedTask> resolvedFinally
) {
return this.findTaskDependingFlowState(resolvedTasks, resolvedErrors, resolvedAfters, null);
return this.findTaskDependingFlowState(resolvedTasks, resolvedErrors, resolvedFinally, null);
}

/**
Expand All @@ -353,7 +353,7 @@ public List<ResolvedTask> findTaskDependingFlowState(
*
* @param resolvedTasks normal tasks
* @param resolvedErrors errors tasks
* @param resolvedFinally afters tasks
* @param resolvedFinally finally tasks
* @param parentTaskRun the parent task
* @return the flow we need to follow
*/
Expand All @@ -367,7 +367,6 @@ public List<ResolvedTask> findTaskDependingFlowState(
resolvedErrors = removeDisabled(resolvedErrors);
resolvedFinally = removeDisabled(resolvedFinally);


List<TaskRun> errorsFlow = this.findTaskRunByTasks(resolvedErrors, parentTaskRun);
List<TaskRun> finallyFlow = this.findTaskRunByTasks(resolvedFinally, parentTaskRun);

Expand All @@ -390,7 +389,9 @@ public List<ResolvedTask> findTaskDependingFlowState(
}
}

if (this.isTerminated(resolvedTasks, parentTaskRun) && resolvedFinally != null) {
if (resolvedFinally != null && (
this.isTerminated(resolvedTasks, parentTaskRun) || this.hasFailed(resolvedTasks, parentTaskRun
))) {
return resolvedFinally;
}

Expand Down
24 changes: 24 additions & 0 deletions core/src/test/java/io/kestra/plugin/core/flow/FinallyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ void sequentialErrorBlockWithoutErrors() throws QueueException, TimeoutException
assertThat(execution.findTaskRunsByTaskId("a2").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
}

@Test
@LoadFlows({"flows/valids/finally-sequential-error-first.yaml"})
void sequentialErrorFirst() throws QueueException, TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "finally-sequential-error-first");

assertThat(execution.getTaskRunList(), hasSize(3));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.findTaskRunsByTaskId("ko").getFirst().getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.findTaskRunsByTaskId("ok").isEmpty(), is(true));
assertThat(execution.findTaskRunsByTaskId("a1").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
}

@Test
@LoadFlows({"flows/valids/finally-sequential-error.yaml"})
void sequentialErrorBlockWithErrors() throws QueueException, TimeoutException {
Expand Down Expand Up @@ -350,4 +362,16 @@ void flowErrorBlockWithErrors() throws QueueException, TimeoutException {
assertThat(execution.findTaskRunsByTaskId("e1").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
}

@Test
@LoadFlows({"flows/valids/finally-flow-error-first.yaml"})
void flowErrorFirst() throws QueueException, TimeoutException {
Execution execution = runnerUtils.runOne(null, "io.kestra.tests", "finally-flow-error-first");

assertThat(execution.getTaskRunList(), hasSize(2));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.findTaskRunsByTaskId("ko").getFirst().getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.findTaskRunsByTaskId("ok").isEmpty(), is(true));
assertThat(execution.findTaskRunsByTaskId("a1").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
}
}
15 changes: 15 additions & 0 deletions core/src/test/resources/flows/valids/finally-flow-error-first.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
id: finally-flow-error-first
namespace: io.kestra.tests

tasks:
- id: ko
type: io.kestra.plugin.core.execution.Fail

- id: ok
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }}"

finally:
- id: a1
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }}"
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
id: finally-sequential-error-first
namespace: io.kestra.tests

tasks:
- id: seq
type: io.kestra.plugin.core.flow.Sequential
tasks:
- id: ko
type: io.kestra.plugin.core.execution.Fail

- id: ok
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }}"

finally:
- id: a1
type: io.kestra.plugin.core.debug.Return
format: "{{ task.id }}"

0 comments on commit 9f12f9a

Please sign in to comment.