Skip to content

Commit

Permalink
refactor: move left properties to dynamic (#476)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle authored Jan 20, 2025
1 parent 3c0c360 commit 5211823
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 90 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kestra.plugin.gcp.bigquery.models;

import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
Expand Down
33 changes: 13 additions & 20 deletions src/main/java/io/kestra/plugin/gcp/cli/GCloudCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,31 +97,23 @@ public class GCloudCLI extends Task implements RunnableTask<ScriptOutput>, Names
@Schema(
title = "The full service account JSON key to use to authenticate to gcloud."
)
@PluginProperty(dynamic = true)
protected String serviceAccount;
protected Property<String> serviceAccount;

@Schema(
title = "The GCP project ID to scope the commands to."
)
@PluginProperty(dynamic = true)
protected String projectId;
protected Property<String> projectId;

@Schema(
title = "The commands to run."
)
@PluginProperty(dynamic = true)
@NotNull
@NotEmpty
protected List<String> commands;
protected Property<List<String>> commands;

@Schema(
title = "Additional environment variables for the current process."
)
@PluginProperty(
additionalProperties = String.class,
dynamic = true
)
protected Map<String, String> env;
protected Property<Map<String, String>> env;

@Schema(
title = "Deprecated, use 'taskRunner' instead"
Expand All @@ -140,9 +132,8 @@ public class GCloudCLI extends Task implements RunnableTask<ScriptOutput>, Names
private TaskRunner<?> taskRunner = Docker.instance();

@Schema(title = "The task runner container image, only used if the task runner is container-based.")
@PluginProperty(dynamic = true)
@Builder.Default
private String containerImage = DEFAULT_IMAGE;
private Property<String> containerImage = Property.of(DEFAULT_IMAGE);

private NamespaceFiles namespaceFiles;

Expand All @@ -159,12 +150,13 @@ public ScriptOutput run(RunContext runContext) throws Exception {
.withWarningOnStdErr(true)
.withDockerOptions(injectDefaults(getDocker()))
.withTaskRunner(this.taskRunner)
.withContainerImage(this.containerImage)
.withContainerImage(runContext.render(this.containerImage).as(String.class).orElseThrow())
.withCommands(
ScriptService.scriptCommands(
List.of("/bin/sh", "-c"),
null,
this.commands)
runContext.render(this.commands).asList(String.class)
)
)
.withEnv(this.getEnv(runContext))
.withNamespaceFiles(namespaceFiles)
Expand All @@ -191,19 +183,20 @@ private Map<String, String> getEnv(RunContext runContext) throws IOException, Il
Map<String, String> envs = new HashMap<>();

if (serviceAccount != null) {
Path serviceAccountPath = runContext.workingDir().createTempFile(runContext.render(this.serviceAccount).getBytes());
Path serviceAccountPath = runContext.workingDir().createTempFile(runContext.render(this.serviceAccount).as(String.class).orElseThrow().getBytes());
envs.putAll(Map.of(
"GOOGLE_APPLICATION_CREDENTIALS", serviceAccountPath.toString(),
"CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE", serviceAccountPath.toString()
));
}

if (projectId != null) {
envs.put("CLOUDSDK_CORE_PROJECT", runContext.render(this.projectId));
envs.put("CLOUDSDK_CORE_PROJECT", runContext.render(this.projectId).as(String.class).orElseThrow());
}

if (this.env != null) {
envs.putAll(this.env);
var renderedEnv = runContext.render(this.env).asMap(String.class, String.class);
if (!renderedEnv.isEmpty()) {
envs.putAll(renderedEnv);
}

return envs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.kestra.plugin.gcp.pubsub;

import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.plugin.gcp.GcpInterface;
import io.swagger.v3.oas.annotations.media.Schema;
Expand All @@ -12,7 +11,6 @@ public interface PubSubConnectionInterface extends GcpInterface {
title = "The Pub/Sub topic",
description = "The Pub/Sub topic. It must be created before executing the task."
)
@PluginProperty(dynamic = true)
@NotNull
Property<String> getTopic();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -48,12 +50,12 @@
public class TextCompletion extends AbstractGenerativeAi implements RunnableTask<TextCompletion.Output> {
private static final String MODEL_ID = "gemini-pro";

@PluginProperty(dynamic = true)
@Schema(
title = "Text input to generate model response.",
description = "Prompts can include preamble, questions, suggestions, instructions, or examples."
)
private String prompt;
@NotNull
private Property<String> prompt;

@Override
public Output run(RunContext runContext) throws Exception {
Expand All @@ -62,7 +64,7 @@ public Output run(RunContext runContext) throws Exception {

try (VertexAI vertexAI = new VertexAI.Builder().setProjectId(projectId).setLocation(region).setCredentials(this.credentials(runContext)).build()) {
var model = buildModel(MODEL_ID, vertexAI);
var content = ContentMaker.fromString(runContext.render(this.prompt));
var content = ContentMaker.fromString(runContext.render(this.prompt).as(String.class).orElseThrow());

var response = model.generateContent(content);
runContext.logger().debug(response.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.cloud.aiplatform.v1.EnvVar;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
Expand All @@ -24,45 +25,41 @@ public class ContainerSpec {
title = "The URI of a container image in the Container Registry that is to be run on each worker replica.",
description = "Must be on google container registry, example: `gcr.io/{{ project }}/{{ dir }}/{{ image }}:{{ tag }}`"
)
@PluginProperty(dynamic = true)
@NotNull
private String imageUri;
private Property<String> imageUri;

@Schema(
title = "The command to be invoked when the container is started.",
description = "It overrides the entrypoint instruction in Dockerfile when provided."
)
@PluginProperty(dynamic = true)
private List<String> commands;
private Property<List<String>> commands;

@Schema(
title = "The arguments to be passed when starting the container."
)
@PluginProperty(dynamic = true)
private List<String> args;
private Property<List<String>> args;

@Schema(
title = "Environment variables to be passed to the container.",
description = "Maximum limit is 100."
)
@PluginProperty(dynamic = true)
private Map<String, String> env;
private Property<Map<String, String>> env;

public com.google.cloud.aiplatform.v1.ContainerSpec to(RunContext runContext) throws IllegalVariableEvaluationException {
var builder = com.google.cloud.aiplatform.v1.ContainerSpec.newBuilder()
.setImageUri(runContext.render(this.getImageUri()))
.addAllCommand(this.getCommands() != null ? runContext.render(this.getCommands()) : List.of())
.addAllArgs(this.getArgs() != null ? runContext.render(this.getArgs()) : List.of());
.setImageUri(runContext.render(this.getImageUri()).as(String.class).orElseThrow())
.addAllCommand(runContext.render(this.getCommands()).asList(String.class))
.addAllArgs(runContext.render(this.getArgs()).asList(String.class));

if (this.getEnv() != null) {
builder.addAllEnv(this.getEnv()
builder.addAllEnv(runContext.render(this.getEnv()).asMap(String.class, String.class)
.entrySet()
.stream()
.map(throwFunction(e -> EnvVar.newBuilder()
.setName(runContext.render(e.getKey()))
.setValue(runContext.render(e.getValue()))
.map(e -> EnvVar.newBuilder()
.setName(e.getKey())
.setValue(e.getValue())
.build()
))
)
.toList()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ public class CustomJobSpec {
title = "The name of a Vertex AI Tensorboard resource to which this CustomJob",
description = "will upload Tensorboard logs. Format:`projects/{project}/locations/{location}/tensorboards/{tensorboard}`"
)
@PluginProperty(dynamic = true)
private String tensorboard;
private Property<String> tensorboard;

@Schema(
title = "Whether you want Vertex AI to enable [interactive shell access](https://cloud.google.com/vertex-ai/docs/training/monitor-debug-interactive-shell) to training containers."
Expand Down Expand Up @@ -89,7 +88,7 @@ public com.google.cloud.aiplatform.v1.CustomJobSpec to(RunContext runContext) t
}

if (this.getTensorboard() != null) {
builder.setTensorboard(runContext.render(this.getTensorboard()));
builder.setTensorboard(runContext.render(this.getTensorboard()).as(String.class).orElseThrow());
}

if (this.getEnableWebAccess() != null) {
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/io/kestra/plugin/gcp/vertexai/models/DiscSpec.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.kestra.plugin.gcp.vertexai.models;

import com.google.cloud.aiplatform.v1.DiskSpec;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
Expand All @@ -17,21 +19,19 @@ public class DiscSpec {
@Schema(
title = "Type of the boot disk."
)
@PluginProperty(dynamic = false)
@Builder.Default
private DiskType bootDiskType = DiskType.PD_SSD;
private Property<DiskType> bootDiskType = Property.of(DiskType.PD_SSD);

@Schema(
title = "Size in GB of the boot disk."
)
@PluginProperty(dynamic = false)
@Builder.Default
private Integer bootDiskSizeGb = 100;
private Property<Integer> bootDiskSizeGb = Property.of(100);

public com.google.cloud.aiplatform.v1.DiskSpec to(RunContext runContext) throws IllegalVariableEvaluationException {
return com.google.cloud.aiplatform.v1.DiskSpec.newBuilder()
.setBootDiskType(this.getBootDiskType().value())
.setBootDiskSizeGb(this.getBootDiskSizeGb())
return DiskSpec.newBuilder()
.setBootDiskType(runContext.render(this.getBootDiskType()).as(DiskType.class).orElseThrow().value())
.setBootDiskSizeGb(runContext.render(this.getBootDiskSizeGb()).as(Integer.class).orElseThrow())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.kestra.plugin.gcp.vertexai.models;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
Expand All @@ -21,33 +21,25 @@ public class MachineSpec {
"See the [list of machine types supported for custom " +
"training](https://cloud.google.com/vertex-ai/docs/training/configure-compute#machine-types)."
)
@PluginProperty(dynamic = true)
@NotNull
private String machineType;
private Property<String> machineType;

@Schema(
title = "The number of accelerators to attach to the machine."
)
@PluginProperty(dynamic = false)
private Integer acceleratorCount;
private Property<Integer> acceleratorCount;

@Schema(
title = "The type of accelerator(s) that may be attached to the machine."
)
@PluginProperty(dynamic = true)
private com.google.cloud.aiplatform.v1.AcceleratorType acceleratorType;
private Property<com.google.cloud.aiplatform.v1.AcceleratorType> acceleratorType;

public com.google.cloud.aiplatform.v1.MachineSpec to(RunContext runContext) throws IllegalVariableEvaluationException {
com.google.cloud.aiplatform.v1.MachineSpec.Builder builder = com.google.cloud.aiplatform.v1.MachineSpec.newBuilder()
.setMachineType(runContext.render(this.getMachineType()));
.setMachineType(runContext.render(this.getMachineType()).as(String.class).orElseThrow());

if (this.getAcceleratorCount() != null) {
builder.setAcceleratorCount(this.getAcceleratorCount());
}

if (this.getAcceleratorType() != null) {
builder.setAcceleratorType(this.getAcceleratorType());
}
runContext.render(this.getAcceleratorCount()).as(Integer.class).ifPresent(builder::setAcceleratorCount);
runContext.render(this.getAcceleratorType()).as(com.google.cloud.aiplatform.v1.AcceleratorType.class).ifPresent(builder::setAcceleratorType);

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.cloud.aiplatform.v1.EnvVar;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
Expand All @@ -24,44 +25,43 @@ public class PythonPackageSpec {
title = "The Google Cloud Storage location of the Python package files which are the training program and its dependent packages.",
description = "The maximum number of package URIs is 100."
)
@PluginProperty(dynamic = true)
@NotNull
private List<String> packageUris;
private Property<List<String>> packageUris;

@Schema(
title = "The Google Cloud Storage location of the Python package files which are the training program and its dependent packages.",
description = "The maximum number of package URIs is 100."
)
@PluginProperty(dynamic = true)
@NotNull
private List<String> args;
private Property<List<String>> args;

@Schema(
title = "Environment variables to be passed to the python module.",
description = "Maximum limit is 100."
)
@PluginProperty(dynamic = true)
@NotNull
private Map<String, String> envs;
private Property<Map<String, String>> envs;

public com.google.cloud.aiplatform.v1.PythonPackageSpec to(RunContext runContext) throws IllegalVariableEvaluationException {
com.google.cloud.aiplatform.v1.PythonPackageSpec.Builder builder = com.google.cloud.aiplatform.v1.PythonPackageSpec.newBuilder();

if (this.packageUris != null) {
builder.addAllPackageUris(runContext.render(this.packageUris));
var renderedPackage = runContext.render(this.packageUris).asList(String.class);
if (!renderedPackage.isEmpty()) {
builder.addAllPackageUris(renderedPackage);
}

if (this.args != null) {
builder.addAllArgs(runContext.render(this.args));
var renderedArgs = runContext.render(this.args).asList(String.class);
if (!renderedArgs.isEmpty()) {
builder.addAllArgs(renderedArgs);
}

if (this.packageUris != null) {
builder.addAllEnv(this.envs
if (!renderedPackage.isEmpty()) {
builder.addAllEnv(runContext.render(this.envs).asMap(String.class, String.class)
.entrySet()
.stream()
.map(throwFunction(e -> EnvVar.newBuilder()
.setName(runContext.render(e.getKey()))
.setValue(runContext.render(e.getValue()))
.setName(e.getKey())
.setValue(e.getValue())
.build()
))
.collect(Collectors.toList())
Expand Down
Loading

0 comments on commit 5211823

Please sign in to comment.