Skip to content

Commit

Permalink
refactor: migrate gke and pubsub to dynamic properties (#470)
Browse files Browse the repository at this point in the history
* refactor: migrate gke and pubsub to dynamic properties

* refactor: migrate vertexai to dynamic properties
  • Loading branch information
mgabelle authored Dec 16, 2024
1 parent d6a52db commit 4347f9b
Show file tree
Hide file tree
Showing 19 changed files with 138 additions and 133 deletions.
20 changes: 9 additions & 11 deletions src/main/java/io/kestra/plugin/gcp/gke/ClusterMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.cloud.container.v1.ClusterManagerClient;
import com.google.cloud.container.v1.ClusterManagerSettings;
import com.google.container.v1.Cluster;
import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
Expand Down Expand Up @@ -52,20 +53,17 @@ public class ClusterMetadata extends AbstractTask implements RunnableTask<Cluste
@Schema(
title = "Cluster ID whose metadata needs to be fetched."
)
@PluginProperty(dynamic = true)
private String clusterId;
private Property<String> clusterId;

@Schema(
title = "GCP zone in which the GKE cluster is present."
)
@PluginProperty(dynamic = true)
private String clusterZone;
private Property<String> clusterZone;

@Schema(
title = "GCP project ID where the GKE cluster is present."
)
@PluginProperty(dynamic = true)
private String clusterProjectId;
private Property<String> clusterProjectId;

@Override
public Output run(RunContext runContext) throws Exception {
Expand All @@ -79,8 +77,8 @@ public Output run(RunContext runContext) throws Exception {
.clusterIpv4Cidr(cluster.getClusterIpv4Cidr())
.subNetwork(cluster.getSubnetwork())
.endpoint(cluster.getEndpoint())
.zone(clusterZone)
.project(clusterProjectId)
.zone(runContext.render(clusterZone).as(String.class).orElse(null))
.project(runContext.render(clusterProjectId).as(String.class).orElse(null))
.createTime(cluster.getCreateTime())
.nodePoolsCount(cluster.getNodePoolsCount())
.nodePools(cluster.getNodePoolsList()
Expand Down Expand Up @@ -111,9 +109,9 @@ Cluster fetch(RunContext runContext) throws IllegalVariableEvaluationException,

try (ClusterManagerClient client = ClusterManagerClient.create(clusterManagerSettings)) {
return client.getCluster(
runContext.render(clusterProjectId),
runContext.render(clusterZone),
runContext.render(clusterId)
runContext.render(clusterProjectId).as(String.class).orElse(null),
runContext.render(clusterZone).as(String.class).orElse(null),
runContext.render(clusterId).as(String.class).orElse(null)
);
}
}
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/io/kestra/plugin/gcp/pubsub/AbstractPubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.pubsub.v1.*;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.gcp.AbstractTask;
import jakarta.validation.constraints.NotNull;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand All @@ -25,11 +27,12 @@
@Getter
@NoArgsConstructor
abstract class AbstractPubSub extends AbstractTask implements PubSubConnectionInterface {
private String topic;
@NotNull
private Property<String> topic;

Publisher createPublisher(RunContext runContext) throws IOException, IllegalVariableEvaluationException {

TopicName topicName = TopicName.of(runContext.render(projectId).as(String.class).orElse(null), runContext.render(topic));
TopicName topicName = TopicName.of(runContext.render(projectId).as(String.class).orElse(null), runContext.render(topic).as(String.class).orElseThrow());
return Publisher.newBuilder(topicName)
.setCredentialsProvider(FixedCredentialsProvider.create(this.credentials(runContext)))
.setHeaderProvider(() -> Map.of("user-agent", "Kestra/" + runContext.version()))
Expand All @@ -38,7 +41,7 @@ Publisher createPublisher(RunContext runContext) throws IOException, IllegalVari

public ProjectSubscriptionName createSubscription(RunContext runContext, String subscription, boolean autoCreateSubscription) throws IOException, IllegalVariableEvaluationException {

TopicName topicName = TopicName.of(runContext.render(projectId).as(String.class).orElse(null), runContext.render(topic));
TopicName topicName = TopicName.of(runContext.render(projectId).as(String.class).orElse(null), runContext.render(topic).as(String.class).orElseThrow());
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(runContext.render(projectId).as(String.class).orElse(null), runContext.render(subscription));

if(autoCreateSubscription) {
Expand Down
41 changes: 20 additions & 21 deletions src/main/java/io/kestra/plugin/gcp/pubsub/Consume.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
Expand Down Expand Up @@ -60,38 +61,37 @@ public class Consume extends AbstractPubSub implements RunnableTask<Consume.Outp
title = "The Pub/Sub subscription.",
description = "The Pub/Sub subscription. It will be created automatically if it didn't exist and 'autoCreateSubscription' is enabled."
)
@PluginProperty(dynamic = true)
@NotNull
private String subscription;
private Property<String> subscription;

@Schema(
title = "Whether the Pub/Sub subscription should be created if not exists."
)
@PluginProperty
@Builder.Default
private Boolean autoCreateSubscription = true;
private Property<Boolean> autoCreateSubscription = Property.of(true);

@PluginProperty
@Schema(title = "Max number of records, when reached the task will end.")
private Integer maxRecords;
private Property<Integer> maxRecords;

@PluginProperty
@Schema(title = "Max duration in the Duration ISO format, after that the task will end.")
private Duration maxDuration;
private Property<Duration> maxDuration;

@Builder.Default
@PluginProperty
@NotNull
@Schema(title = "The serializer/deserializer to use.")
private SerdeType serdeType = SerdeType.STRING;
private Property<SerdeType> serdeType = Property.of(SerdeType.STRING);

@Override
public Output run(RunContext runContext) throws Exception {
if (this.maxDuration == null && this.maxRecords == null) {
throw new IllegalArgumentException("'maxDuration' or 'maxRecords' must be set to avoid an infinite loop");
}

var subscriptionName = this.createSubscription(runContext, subscription, autoCreateSubscription);
var subscriptionName = this.createSubscription(
runContext,
runContext.render(subscription).as(String.class).orElseThrow(),
runContext.render(autoCreateSubscription).as(Boolean.class).orElseThrow()
);
var total = new AtomicInteger();
var started = ZonedDateTime.now();
var tempFile = runContext.workingDir().createTempFile(".ion").toFile();
Expand All @@ -100,7 +100,7 @@ public Output run(RunContext runContext) throws Exception {
AtomicReference<Exception> threadException = new AtomicReference<>();
MessageReceiver receiver = (message, consumer) -> {
try {
FileSerde.write(outputFile, Message.of(message, serdeType));
FileSerde.write(outputFile, Message.of(message, runContext.render(serdeType).as(SerdeType.class).orElseThrow()));
total.getAndIncrement();
consumer.ack();
}
Expand All @@ -114,7 +114,7 @@ public Output run(RunContext runContext) throws Exception {
.build();
subscriber.startAsync().awaitRunning();

while (!this.ended(total, started)) {
while (!this.ended(total, started, runContext)) {
if (threadException.get() != null) {
subscriber.stopAsync().awaitTerminated();
throw threadException.get();
Expand All @@ -123,7 +123,7 @@ public Output run(RunContext runContext) throws Exception {
}
subscriber.stopAsync().awaitTerminated();

runContext.metric(Counter.of("records", total.get(), "topic", runContext.render(this.getTopic())));
runContext.metric(Counter.of("records", total.get(), "topic", runContext.render(this.getTopic()).as(String.class).orElseThrow()));
outputFile.flush();
}
return Output.builder()
Expand All @@ -132,15 +132,14 @@ public Output run(RunContext runContext) throws Exception {
.build();
}

private boolean ended(AtomicInteger count, ZonedDateTime start) {
if (this.maxRecords != null && count.get() >= this.maxRecords) {
return true;
}
if (this.maxDuration != null && ZonedDateTime.now().toEpochSecond() > start.plus(this.maxDuration).toEpochSecond()) {
private boolean ended(AtomicInteger count, ZonedDateTime start, RunContext runContext) throws IllegalVariableEvaluationException {
var max = runContext.render(this.maxRecords).as(Integer.class);
if (max.isPresent() && count.get() >= max.get()) {
return true;
}

return false;
var duration = runContext.render(this.maxDuration).as(Duration.class);
return duration.isPresent() && ZonedDateTime.now().toEpochSecond() > start.plus(duration.get()).toEpochSecond();
}

@Builder
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.gcp.pubsub;

import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.plugin.gcp.GcpInterface;
import io.swagger.v3.oas.annotations.media.Schema;

Expand All @@ -13,5 +14,5 @@ public interface PubSubConnectionInterface extends GcpInterface {
)
@PluginProperty(dynamic = true)
@NotNull
String getTopic();
Property<String> getTopic();
}
10 changes: 5 additions & 5 deletions src/main/java/io/kestra/plugin/gcp/pubsub/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
Expand Down Expand Up @@ -69,10 +70,9 @@ public class Publish extends AbstractPubSub implements RunnableTask<Publish.Outp
private Object from;

@Builder.Default
@PluginProperty
@NotNull
@Schema(title = "The serializer/deserializer to use.")
private SerdeType serdeType = SerdeType.STRING;
private Property<SerdeType> serdeType = Property.of(SerdeType.STRING);

@Override
public Publish.Output run(RunContext runContext) throws Exception {
Expand Down Expand Up @@ -104,13 +104,13 @@ public Publish.Output run(RunContext runContext) throws Exception {
count = resultFlowable.reduce(Integer::sum).blockOptional().orElse(0);
} else {
var msg = JacksonMapper.toMap(this.from, Message.class);
publisher.publish(msg.to(runContext, this.serdeType));
publisher.publish(msg.to(runContext, runContext.render(this.serdeType).as(SerdeType.class).orElseThrow()));
}

publisher.shutdown();

// metrics
runContext.metric(Counter.of("records", count, "topic", runContext.render(this.getTopic())));
runContext.metric(Counter.of("records", count, "topic", runContext.render(this.getTopic()).as(String.class).orElseThrow()));

return Output.builder()
.messagesCount(count)
Expand All @@ -120,7 +120,7 @@ public Publish.Output run(RunContext runContext) throws Exception {
private Flux<Integer> buildFlowable(Flux<Message> flowable, Publisher publisher, RunContext runContext) throws Exception {
return flowable
.map(throwFunction(message -> {
publisher.publish(message.to(runContext, this.serdeType));
publisher.publish(message.to(runContext, runContext.render(this.serdeType).as(SerdeType.class).orElseThrow()));
return 1;
}));
}
Expand Down
28 changes: 14 additions & 14 deletions src/main/java/io/kestra/plugin/gcp/pubsub/RealtimeTrigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,38 +78,33 @@ public class RealtimeTrigger extends AbstractTrigger implements RealtimeTriggerI
@Builder.Default
private Property<List<String>> scopes = Property.of(Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"));

private String topic;
private Property<String> topic;

@Schema(
title = "The Pub/Sub subscription",
description = "The Pub/Sub subscription. It will be created automatically if it didn't exist and 'autoCreateSubscription' is enabled."
)
@PluginProperty(dynamic = true)
private String subscription;
private Property<String> subscription;

@Schema(
title = "Whether the Pub/Sub subscription should be created if not exist"
)
@PluginProperty
@Builder.Default
private Boolean autoCreateSubscription = true;
private Property<Boolean> autoCreateSubscription = Property.of(true);

@Builder.Default
private final Duration interval = Duration.ofSeconds(60);
private final Property<Duration> interval = Property.of(Duration.ofSeconds(60));

@PluginProperty
@Schema(title = "Max number of records, when reached the task will end.")
private Integer maxRecords;
private Property<Integer> maxRecords;

@PluginProperty
@Schema(title = "Max duration in the Duration ISO format, after that the task will end.")
private Duration maxDuration;
private Property<Duration> maxDuration;

@Builder.Default
@PluginProperty
@NotNull
@Schema(title = "The serializer/deserializer to use.")
private SerdeType serdeType = SerdeType.STRING;
private Property<SerdeType> serdeType = Property.of(SerdeType.STRING);

@Builder.Default
@Getter(AccessLevel.NONE)
Expand Down Expand Up @@ -142,15 +137,20 @@ public Publisher<Execution> evaluate(ConditionContext conditionContext, TriggerC
}

private Publisher<Message> publisher(final Consume task, final RunContext runContext) throws Exception {
ProjectSubscriptionName subscriptionName = task.createSubscription(runContext, subscription, autoCreateSubscription);
ProjectSubscriptionName subscriptionName = task.createSubscription(
runContext,
runContext.render(subscription).as(String.class).orElse(null),
runContext.render(autoCreateSubscription).as(Boolean.class).orElse(true)
);
GoogleCredentials credentials = task.credentials(runContext);

var serdeTypeRendered = runContext.render(serdeType).as(SerdeType.class).orElseThrow();
return Flux.create(
emitter -> {
AtomicInteger total = new AtomicInteger();
final MessageReceiver receiver = (message, consumer) -> {
try {
emitter.next(Message.of(message, serdeType));
emitter.next(Message.of(message, serdeTypeRendered));
total.getAndIncrement();
consumer.ack();
} catch(Exception exception) {
Expand Down
17 changes: 6 additions & 11 deletions src/main/java/io/kestra/plugin/gcp/pubsub/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,38 +65,33 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface,
@Builder.Default
private Property<List<String>> scopes = Property.of(Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"));

private String topic;
private Property<String> topic;

@Schema(
title = "The Pub/Sub subscription",
description = "The Pub/Sub subscription. It will be created automatically if it didn't exist and 'autoCreateSubscription' is enabled."
)
@PluginProperty(dynamic = true)
private String subscription;
private Property<String> subscription;

@Schema(
title = "Whether the Pub/Sub subscription should be created if not exist"
)
@PluginProperty
@Builder.Default
private Boolean autoCreateSubscription = true;
private Property<Boolean> autoCreateSubscription = Property.of(true);

@Builder.Default
private final Duration interval = Duration.ofSeconds(60);

@PluginProperty
@Schema(title = "Max number of records, when reached the task will end.")
private Integer maxRecords;
private Property<Integer> maxRecords;

@PluginProperty
@Schema(title = "Max duration in the Duration ISO format, after that the task will end.")
private Duration maxDuration;
private Property<Duration> maxDuration;

@Builder.Default
@PluginProperty
@NotNull
@Schema(title = "The serializer/deserializer to use.")
private SerdeType serdeType = SerdeType.STRING;
private Property<SerdeType> serdeType = Property.of(SerdeType.STRING);

@Override
public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.cloud.vertexai.generativeai.GenerativeModel;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.gcp.AbstractTask;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -32,9 +33,8 @@ abstract class AbstractGenerativeAi extends AbstractTask {
@Schema(
title = "The GCP region."
)
@PluginProperty(dynamic = true)
@NotNull
private String region;
private Property<String> region;

@Builder.Default
@Schema(
Expand Down
Loading

0 comments on commit 4347f9b

Please sign in to comment.