Skip to content

Commit

Permalink
feat(core): add taskRunner output on ScriptOutput to get detailled …
Browse files Browse the repository at this point in the history
…information on underlying taskrunner
  • Loading branch information
tchiotludo committed Jan 7, 2025
1 parent 111493c commit c265b49
Show file tree
Hide file tree
Showing 22 changed files with 161 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public List<Document> generate(RegisteredPlugin registeredPlugin) throws Excepti
result.addAll(this.generate(registeredPlugin, registeredPlugin.getTasks(), Task.class, "tasks"));
result.addAll(this.generate(registeredPlugin, registeredPlugin.getTriggers(), AbstractTrigger.class, "triggers"));
result.addAll(this.generate(registeredPlugin, registeredPlugin.getConditions(), Condition.class, "conditions"));
result.addAll(this.generate(registeredPlugin, registeredPlugin.getTaskRunners(), TaskRunner.class, "task-runners"));
//noinspection unchecked
result.addAll(this.generate(registeredPlugin, registeredPlugin.getTaskRunners(), (Class) TaskRunner.class, "task-runners"));
result.addAll(this.generate(registeredPlugin, registeredPlugin.getLogShippers(), LogShipper.class, "log-shipper"));

result.addAll(guides(registeredPlugin));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected static Map<String, Long> taskRunnerTypeCount(List<Flow> allFlows) {
})
.map(t -> {
try {
TaskRunner taskRunner = (TaskRunner) t.getClass().getMethod("getTaskRunner").invoke(t);
TaskRunner<?> taskRunner = (TaskRunner<?>) t.getClass().getMethod("getTaskRunner").invoke(t);
return taskRunner != null ? taskRunner.getType() : null;
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
return null;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static List<String> replaceInternalStorage(
List<String> commands,
boolean replaceWithRelativePath
) throws IOException, IllegalVariableEvaluationException {
return commands
return ListUtils.emptyOnNull(commands)
.stream()
.map(throwFunction(c -> runContext.render(c, additionalVars)))
.map(throwFunction(c -> ScriptService.replaceInternalStorage(runContext, c, replaceWithRelativePath)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@Getter
@NoArgsConstructor
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public abstract class TaskRunner implements Plugin, WorkerJobLifecycle {
public abstract class TaskRunner<T extends TaskRunnerDetailResult> implements Plugin, WorkerJobLifecycle {
@NotBlank
@Pattern(regexp="\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*(\\.\\p{javaJavaIdentifierStart}\\p{javaJavaIdentifierPart}*)*")
protected String type;
Expand Down Expand Up @@ -62,7 +62,7 @@ public abstract class TaskRunner implements Plugin, WorkerJobLifecycle {
* For remote task runner (like Kubernetes or in a cloud provider), <code>filesToUpload</code> must be used to upload input and namespace files to the runner,
* and <code>filesToDownload</code> must be used to download output files from the runner.
*/
public abstract RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToDownload) throws Exception;
public abstract TaskRunnerResult<T> run(RunContext runContext, TaskCommands taskCommands, List<String> filesToDownload) throws Exception;

public Map<String, Object> additionalVars(RunContext runContext, TaskCommands taskCommands) throws IllegalVariableEvaluationException {
if (this.additionalVars == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.kestra.core.models.tasks.runners;

import io.kestra.core.models.tasks.Output;
import lombok.Getter;
import lombok.experimental.SuperBuilder;

@Getter
@SuperBuilder
public class TaskRunnerDetailResult implements Output {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.kestra.core.models.tasks.runners;

import io.kestra.core.models.tasks.Output;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

@AllArgsConstructor
@Getter
@Builder
@NoArgsConstructor
public class TaskRunnerResult<T extends TaskRunnerDetailResult> implements Output {
private int exitCode;
private AbstractLogConsumer logConsumer;
private T details;

@SuppressWarnings("unchecked")
public TaskRunnerResult(int exitCode, AbstractLogConsumer logConsumer) {
this.exitCode = exitCode;
this.logConsumer = logConsumer;
this.details = (T) TaskRunnerDetailResult.builder().build();
}
}
7 changes: 4 additions & 3 deletions core/src/main/java/io/kestra/core/plugins/PluginScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ private RegisteredPlugin scanClassLoader(final ClassLoader classLoader,
List<Class<? extends Condition>> conditions = new ArrayList<>();
List<Class<? extends StorageInterface>> storages = new ArrayList<>();
List<Class<? extends SecretPluginInterface>> secrets = new ArrayList<>();
List<Class<? extends TaskRunner>> taskRunners = new ArrayList<>();
List<Class<? extends TaskRunner<?>>> taskRunners = new ArrayList<>();
List<Class<? extends AppPluginInterface>> apps = new ArrayList<>();
List<Class<? extends AppBlockInterface>> appBlocks = new ArrayList<>();
List<Class<? extends Chart<?>>> charts = new ArrayList<>();
Expand Down Expand Up @@ -142,9 +142,10 @@ private RegisteredPlugin scanClassLoader(final ClassLoader classLoader,
log.debug("Loading SecretPlugin plugin: '{}'", plugin.getClass());
secrets.add(storage.getClass());
}
case TaskRunner runner -> {
case TaskRunner<?> runner -> {
log.debug("Loading TaskRunner plugin: '{}'", plugin.getClass());
taskRunners.add(runner.getClass());
//noinspection unchecked
taskRunners.add((Class<? extends TaskRunner<?>>) runner.getClass());
}
case AppPluginInterface app -> {
log.debug("Loading AppPlugin plugin: '{}'", plugin.getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class RegisteredPlugin {
private final List<Class<? extends Condition>> conditions;
private final List<Class<? extends StorageInterface>> storages;
private final List<Class<? extends SecretPluginInterface>> secrets;
private final List<Class<? extends TaskRunner>> taskRunners;
private final List<Class<? extends TaskRunner<?>>> taskRunners;
private final List<Class<? extends AppPluginInterface>> apps;
private final List<Class<? extends AppBlockInterface>> appBlocks;
private final List<Class<? extends Chart<?>>> charts;
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/java/io/kestra/plugin/core/runner/Process.java
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
)
}
)
public class Process extends TaskRunner {
public class Process extends TaskRunner<TaskRunnerDetailResult> {

/**
* Convenient default instance to be used as task default value for a 'taskRunner' property.
Expand All @@ -123,7 +123,7 @@ public static Process instance() {
}

@Override
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToDownload) throws Exception {
public TaskRunnerResult<TaskRunnerDetailResult> run(RunContext runContext, TaskCommands taskCommands, List<String> filesToDownload) throws Exception {
Logger logger = runContext.logger();
AbstractLogConsumer defaultLogConsumer = taskCommands.getLogConsumer();

Expand Down Expand Up @@ -157,7 +157,7 @@ public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<S
logger.debug("Command succeed with exit code {}", exitCode);
}

return new RunnerResult(exitCode, defaultLogConsumer);
return new TaskRunnerResult<>(exitCode, defaultLogConsumer);
} catch (InterruptedException e) {
logger.warn("Killing process {} for InterruptedException", pid);
killDescendantsOf(process.toHandle(), logger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void taskRunner() throws URISyntaxException {
PluginScanner pluginScanner = new PluginScanner(ClassPluginDocumentationTest.class.getClassLoader());
RegisteredPlugin scan = pluginScanner.scan();

ClassPluginDocumentation<? extends TaskRunner> doc = ClassPluginDocumentation.of(jsonSchemaGenerator, scan, Process.class, null);
ClassPluginDocumentation<? extends TaskRunner<?>> doc = ClassPluginDocumentation.of(jsonSchemaGenerator, scan, Process.class, null);

assertThat((Map<?, ?>) doc.getPropertiesSchema().get("properties"), anEmptyMap());
assertThat(doc.getCls(), is("io.kestra.plugin.core.runner.Process"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ void taskRunner() throws IOException {
RegisteredPlugin scan = pluginScanner.scan();
Class<Process> processTaskRunner = scan.findClass(Process.class.getName()).orElseThrow();

ClassPluginDocumentation<? extends TaskRunner> doc = ClassPluginDocumentation.of(jsonSchemaGenerator, scan, processTaskRunner, TaskRunner.class);
ClassPluginDocumentation<Process> doc = ClassPluginDocumentation.of(jsonSchemaGenerator, scan, processTaskRunner, Process.class);

String render = DocumentationGenerator.render(doc);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class TaskRunnerTest {

@Test
void additionalVarsAndEnv() throws IllegalVariableEvaluationException {
TaskRunner taskRunner = new TaskRunnerAdditional(true);
TaskRunner<?> taskRunner = new TaskRunnerAdditional(true);
TaskCommands taskCommands = new TaskCommandsAdditional();

Map<String, Object> contextVariables = Map.of(
Expand Down Expand Up @@ -75,7 +75,7 @@ void additionalVarsAndEnv() throws IllegalVariableEvaluationException {
)));
}

private static class TaskRunnerAdditional extends TaskRunner {
private static class TaskRunnerAdditional extends TaskRunner<TaskRunnerDetailResult> {

public static final String RUNNER_BUCKET_PATH = "{{runnerBucketPath}}";
public static final String RUNNER_WORKING_DIR = "runnerWorkingDir";
Expand All @@ -97,7 +97,7 @@ public TaskRunnerAdditional(boolean overrideEnvValues) {
}

@Override
public RunnerResult run(RunContext runContext, TaskCommands taskCommands, List<String> filesToDownload) {
public TaskRunnerResult<TaskRunnerDetailResult> run(RunContext runContext, TaskCommands taskCommands, List<String> filesToDownload) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
class ProcessTest extends AbstractTaskRunnerTest {

@Override
protected TaskRunner taskRunner() {
protected TaskRunner<?> taskRunner() {
return new Process();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public abstract class AbstractExecScript extends Task implements RunnableTask<Sc
@PluginProperty
@Builder.Default
@Valid
protected TaskRunner taskRunner = Docker.builder()
protected TaskRunner<?> taskRunner = Docker.builder()
.type(Docker.class.getName())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.State;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.runners.TaskRunnerDetailResult;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
import lombok.Getter;
Expand Down Expand Up @@ -44,6 +45,8 @@ public class ScriptOutput implements Output {
@JsonIgnore
private Boolean warningOnStdErr;

private TaskRunnerDetailResult taskRunner;

@Override
public Optional<State.Type> finalState() {
return this.warningOnStdErr != null && this.warningOnStdErr && this.stdErrLineCount > 0 ? Optional.of(State.Type.WARNING) : Output.super.finalState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class CommandsWrapper implements TaskCommands {
private String containerImage;

@With
private TaskRunner taskRunner;
private TaskRunner<?> taskRunner;

@With
private DockerOptions dockerOptions;
Expand Down Expand Up @@ -131,7 +131,7 @@ public CommandsWrapper addEnv(Map<String, String> envs) {
return this;
}

public ScriptOutput run() throws Exception {
public <T extends TaskRunnerDetailResult> ScriptOutput run() throws Exception {
if (this.namespaceFiles != null && !Boolean.FALSE.equals(runContext.render(this.namespaceFiles.getEnabled()).as(Boolean.class).orElse(true))) {

List<NamespaceFile> matchedNamespaceFiles = runContext.storage()
Expand All @@ -147,7 +147,7 @@ public ScriptOutput run() throws Exception {
}));
}

TaskRunner realTaskRunner = this.getTaskRunner();
TaskRunner<T> realTaskRunner = this.getTaskRunner();
if (this.inputFiles != null) {
FilesService.inputFiles(runContext, realTaskRunner.additionalVars(runContext, this), this.inputFiles);
}
Expand All @@ -157,17 +157,25 @@ public ScriptOutput run() throws Exception {
RunContext taskRunnerRunContext = initializer.forPlugin(((DefaultRunContext) runContext).clone(), realTaskRunner);
this.commands = this.render(runContext, commands);

var outputBuilder = ScriptOutput.builder().warningOnStdErr(this.warningOnStdErr);
ScriptOutput.ScriptOutputBuilder scriptOutputBuilder = ScriptOutput.builder()
.warningOnStdErr(this.warningOnStdErr);

try {
RunnerResult runnerResult = realTaskRunner.run(taskRunnerRunContext, this, this.outputFiles);
return outputBuilder.exitCode(runnerResult.getExitCode())
.stdOutLineCount(runnerResult.getLogConsumer().getStdOutCount())
.stdErrLineCount(runnerResult.getLogConsumer().getStdErrCount())
.vars(runnerResult.getLogConsumer().getOutputs())
TaskRunnerResult<T> taskRunnerResult = realTaskRunner.run(taskRunnerRunContext, this, this.outputFiles);
scriptOutputBuilder.exitCode(taskRunnerResult.getExitCode())
.outputFiles(getOutputFiles(taskRunnerRunContext))
.build();
.taskRunner(taskRunnerResult.getDetails());

if (taskRunnerResult.getLogConsumer() != null) {
scriptOutputBuilder
.stdOutLineCount(taskRunnerResult.getLogConsumer().getStdOutCount())
.stdErrLineCount(taskRunnerResult.getLogConsumer().getStdErrCount())
.vars(taskRunnerResult.getLogConsumer().getOutputs());
}

return scriptOutputBuilder.build();
} catch (TaskException e) {
var output = outputBuilder.exitCode(e.getExitCode())
var output = scriptOutputBuilder.exitCode(e.getExitCode())
.stdOutLineCount(e.getStdOutCount())
.stdErrLineCount(e.getStdErrCount())
.vars(e.getLogConsumer() != null ? e.getLogConsumer().getOutputs() : null)
Expand All @@ -189,20 +197,21 @@ private Map<String, URI> getOutputFiles(RunContext taskRunnerRunContext) throws
return outputFiles;
}

public TaskRunner getTaskRunner() {
@SuppressWarnings("unchecked")
public <T extends TaskRunnerDetailResult> TaskRunner<T> getTaskRunner() {
if (runnerType != null) {
return switch (runnerType) {
case DOCKER -> Docker.from(dockerOptions);
case PROCESS -> new Process();
case DOCKER -> (TaskRunner<T>) Docker.from(dockerOptions);
case PROCESS -> (TaskRunner<T>) new Process();
};
}

// special case to take into account the deprecated dockerOptions if set
if (taskRunner instanceof Docker && dockerOptions != null) {
return Docker.from(dockerOptions);
return (TaskRunner<T>) Docker.from(dockerOptions);
}

return taskRunner;
return (TaskRunner<T>) taskRunner;
}

public Boolean getEnableOutputDirectory() {
Expand All @@ -226,7 +235,7 @@ public Path getOutputDirectory() {
}

public String render(RunContext runContext, String command, List<String> internalStorageLocalFiles) throws IllegalVariableEvaluationException, IOException {
TaskRunner taskRunner = this.getTaskRunner();
TaskRunner<?> taskRunner = this.getTaskRunner();
return ScriptService.replaceInternalStorage(
this.runContext,
taskRunner.additionalVars(runContext, this),
Expand All @@ -236,7 +245,7 @@ public String render(RunContext runContext, String command, List<String> interna
}

public List<String> render(RunContext runContext, List<String> commands) throws IllegalVariableEvaluationException, IOException {
TaskRunner taskRunner = this.getTaskRunner();
TaskRunner<?> taskRunner = this.getTaskRunner();
return ScriptService.replaceInternalStorage(
this.runContext,
taskRunner.additionalVars(runContext, this),
Expand Down
Loading

0 comments on commit c265b49

Please sign in to comment.