Skip to content

Commit

Permalink
fix(core): parallel task should not resolved error and finally in par…
Browse files Browse the repository at this point in the history
…allel
  • Loading branch information
tchiotludo committed Jan 15, 2025
1 parent 45bd08c commit f1c147c
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 4 deletions.
16 changes: 14 additions & 2 deletions core/src/main/java/io/kestra/core/runners/FlowableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,19 @@ public static List<NextTaskRun> resolveParallelNexts(
parentTaskRun
);

boolean isTasks = tasks.equals(currentTasks);

// errors & finally must be run as sequential tasks
if (!isTasks) {
return resolveSequentialNexts(
execution,
tasks,
errors,
_finally,
parentTaskRun
);
}

// all tasks run
List<TaskRun> taskRuns = execution.findTaskRunByTasks(currentTasks, parentTaskRun);

Expand Down Expand Up @@ -393,8 +406,7 @@ public static List<NextTaskRun> resolveParallelNexts(
return Collections.emptyList();
}

private final static TypeReference<List<Object>> TYPE_REFERENCE = new TypeReference<>() {
};
private final static TypeReference<List<Object>> TYPE_REFERENCE = new TypeReference<>() {};
private final static ObjectMapper MAPPER = JacksonMapper.ofJson();

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down
38 changes: 36 additions & 2 deletions core/src/test/java/io/kestra/plugin/core/flow/DagTest.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
package io.kestra.plugin.core.flow;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;

import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.validations.ModelValidator;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunnerUtils;
import io.kestra.core.serializers.YamlParser;
import io.kestra.core.utils.TestsUtils;
import jakarta.inject.Inject;
import jakarta.validation.ConstraintViolationException;
import java.io.File;
import java.net.URL;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;

import org.junit.jupiter.api.Test;

@KestraTest(startRunner = true)
Expand All @@ -27,6 +34,12 @@ public class DagTest {
@Inject
ModelValidator modelValidator;

@Inject
protected RunnerUtils runnerUtils;

@Inject
private FlowInputOutput flowIO;

@Test
@ExecuteFlow("flows/valids/dag.yaml")
void dag(Execution execution) {
Expand Down Expand Up @@ -56,6 +69,27 @@ void dagNotExistTask() {
assertThat(validate.get().getMessage(), containsString("dag: Not existing task id in dependency: taskX"));
}

@Test
@LoadFlows({"flows/valids/finally-dag.yaml"})
void errors() throws QueueException, TimeoutException {
Execution execution = runnerUtils.runOne(
null,
"io.kestra.tests", "finally-dag", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, Map.of("failed", true)),
Duration.ofSeconds(60)
);

assertThat(execution.getTaskRunList(), hasSize(9));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.findTaskRunsByTaskId("ko").getFirst().getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.findTaskRunsByTaskId("a1").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.findTaskRunsByTaskId("a2").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.findTaskRunsByTaskId("e1").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.findTaskRunsByTaskId("a2").getFirst().getState().getStartDate().isAfter(execution.findTaskRunsByTaskId("a1").getFirst().getState().getEndDate().orElseThrow()), is(true));
assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getStartDate().isAfter(execution.findTaskRunsByTaskId("e1").getFirst().getState().getEndDate().orElseThrow()), is(true));
}

private Flow parse(String path) {
URL resource = TestsUtils.class.getClassLoader().getResource(path);
assert resource != null;
Expand Down
37 changes: 37 additions & 0 deletions core/src/test/java/io/kestra/plugin/core/flow/ParallelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,30 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.junit.annotations.LoadFlows;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.flows.State;
import io.kestra.core.queues.QueueException;
import io.kestra.core.runners.FlowInputOutput;
import io.kestra.core.runners.RunnerUtils;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@KestraTest(startRunner = true)
class ParallelTest {
@Inject
protected RunnerUtils runnerUtils;

@Inject
private FlowInputOutput flowIO;

@Test
@ExecuteFlow("flows/valids/parallel.yaml")
Expand All @@ -22,4 +38,25 @@ void parallel(Execution execution) {
void parallelNested(Execution execution) {
assertThat(execution.getTaskRunList(), hasSize(11));
}

@Test
@LoadFlows({"flows/valids/finally-parallel.yaml"})
void errors() throws QueueException, TimeoutException {
Execution execution = runnerUtils.runOne(
null,
"io.kestra.tests", "finally-parallel", null,
(flow, execution1) -> flowIO.readExecutionInputs(flow, execution1, Map.of("failed", true)),
Duration.ofSeconds(60)
);

assertThat(execution.getTaskRunList(), hasSize(10));
assertThat(execution.getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.findTaskRunsByTaskId("ko").getFirst().getState().getCurrent(), is(State.Type.FAILED));
assertThat(execution.findTaskRunsByTaskId("a1").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.findTaskRunsByTaskId("a2").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.findTaskRunsByTaskId("e1").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getCurrent(), is(State.Type.SUCCESS));
assertThat(execution.findTaskRunsByTaskId("a2").getFirst().getState().getStartDate().isAfter(execution.findTaskRunsByTaskId("a1").getFirst().getState().getEndDate().orElseThrow()), is(true));
assertThat(execution.findTaskRunsByTaskId("e2").getFirst().getState().getStartDate().isAfter(execution.findTaskRunsByTaskId("e1").getFirst().getState().getEndDate().orElseThrow()), is(true));
}
}

0 comments on commit f1c147c

Please sign in to comment.