Skip to content

Commit

Permalink
refactor: migrate gcs to dynamic properties (#469)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle authored Dec 16, 2024
1 parent c9a468c commit d6a52db
Show file tree
Hide file tree
Showing 31 changed files with 303 additions and 277 deletions.
57 changes: 27 additions & 30 deletions src/main/java/io/kestra/plugin/gcp/gcs/AbstractBucket.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.kestra.plugin.gcp.gcs;

import com.google.cloud.storage.BucketInfo;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.Rethrow;
Expand All @@ -20,39 +20,38 @@
@EqualsAndHashCode
@Getter
@NoArgsConstructor
abstract public class AbstractBucket extends AbstractGcs implements RunnableTask<AbstractBucket.Output> {
public abstract class AbstractBucket extends AbstractGcs implements RunnableTask<AbstractBucket.Output> {
@NotNull
@Schema(
title = "Bucket's unique name"
)
@PluginProperty(dynamic = true)
protected String name;
protected Property<String> name;

@Schema(
title = "Whether the requester pays or not.",
description = "Whether a user accessing the bucket or an object it contains should assume the transit " +
" costs related to the access."
)
protected Boolean requesterPays;
protected Property<Boolean> requesterPays;

@Schema(
title = "Whether versioning should be enabled for this bucket",
description = "When set to true, versioning is " +
" fully enabled."
)
protected Boolean versioningEnabled;
protected Property<Boolean> versioningEnabled;

@Schema(
title = "The bucket's website index page",
description = "Behaves as the bucket's directory index where missing " +
" blobs are treated as potential directories."
)
protected String indexPage;
protected Property<String> indexPage;

@Schema(
title = "The custom object to return when a requested resource is not found"
)
protected String notFoundPage;
protected Property<String> notFoundPage;

@Schema(
title = "The bucket's lifecycle configuration",
Expand All @@ -71,16 +70,15 @@ abstract public class AbstractBucket extends AbstractGcs implements RunnableTask
" determines the SLA and the cost of storage. A list of supported values is available <a" +
" href=\"https://cloud.google.com/storage/docs/storage-classes\">here</a>."
)
protected StorageClass storageClass;
protected Property<StorageClass> storageClass;

@Schema(
title = "The bucket's location",
description = "Data for blobs in the bucket resides in physical storage within" +
" this region. A list of supported values is available <a" +
" href=\"https://cloud.google.com/storage/docs/bucket-locations\">here</a>."
)
@PluginProperty(dynamic = true)
protected String location;
protected Property<String> location;

@Schema(
title = "The bucket's Cross-Origin Resource Sharing (CORS) configuration",
Expand Down Expand Up @@ -111,25 +109,24 @@ abstract public class AbstractBucket extends AbstractGcs implements RunnableTask
@Schema(
title = "The labels of this bucket"
)
@PluginProperty(dynamic = true)
protected Map<String, String> labels;
protected Property<Map<String, String>> labels;

@Schema(
title = "The default Cloud KMS key name for this bucket"
)
protected String defaultKmsKeyName;
protected Property<String> defaultKmsKeyName;

@Schema(
title = "The default event-based hold for this bucket"
)
protected Boolean defaultEventBasedHold;
protected Property<Boolean> defaultEventBasedHold;

@Schema(
title = "Retention period",
description = "If policy is not locked this value can be cleared, increased, and decreased. If policy is " +
" locked the retention period can only be increased."
)
protected Long retentionPeriod;
protected Property<Long> retentionPeriod;

@Schema(
title = "The Bucket's IAM Configuration",
Expand All @@ -146,51 +143,51 @@ abstract public class AbstractBucket extends AbstractGcs implements RunnableTask
protected Logging logging;

protected BucketInfo bucketInfo(RunContext runContext) throws Exception {
BucketInfo.Builder builder = BucketInfo.newBuilder(runContext.render(this.name));
BucketInfo.Builder builder = BucketInfo.newBuilder(runContext.render(this.name).as(String.class).orElseThrow());

if (this.requesterPays != null) {
builder.setRequesterPays(this.requesterPays);
builder.setRequesterPays(runContext.render(this.requesterPays).as(Boolean.class).orElseThrow());
}

if (this.versioningEnabled != null) {
builder.setVersioningEnabled(this.versioningEnabled);
builder.setVersioningEnabled(runContext.render(this.versioningEnabled).as(Boolean.class).orElseThrow());
}

if (this.indexPage != null) {
builder.setIndexPage(this.indexPage);
builder.setIndexPage(runContext.render(this.indexPage).as(String.class).orElseThrow());
}

if (this.notFoundPage != null) {
builder.setNotFoundPage(this.notFoundPage);
builder.setNotFoundPage(runContext.render(this.notFoundPage).as(String.class).orElseThrow());
}

if (this.lifecycleRules != null) {
builder.setLifecycleRules(BucketLifecycleRule.convert(this.lifecycleRules));
builder.setLifecycleRules(BucketLifecycleRule.convert(this.lifecycleRules, runContext));
}

if (this.storageClass != null) {
builder.setStorageClass(com.google.cloud.storage.StorageClass.valueOf(this.storageClass.toString()));
builder.setStorageClass(com.google.cloud.storage.StorageClass.valueOf(runContext.render(this.storageClass).as(StorageClass.class).orElseThrow().toString()));
}

if (this.location != null) {
builder.setLocation(runContext.render(this.location));
builder.setLocation(runContext.render(this.location).as(String.class).orElseThrow());
}

if (this.cors != null) {
builder.setCors(Cors.convert(this.cors));
}

if (this.acl != null) {
builder.setAcl(AccessControl.convert(this.acl));
builder.setAcl(AccessControl.convert(this.acl, runContext));
}

if (this.defaultAcl != null) {
builder.setDefaultAcl(AccessControl.convert(this.defaultAcl));
builder.setDefaultAcl(AccessControl.convert(this.defaultAcl, runContext));
}

if (this.labels != null) {
builder.setLabels(
this.labels.entrySet().stream()
runContext.render(this.labels).asMap(String.class, String.class).entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
Rethrow.throwFunction(e -> runContext.render(e.getValue()))
Expand All @@ -199,15 +196,15 @@ protected BucketInfo bucketInfo(RunContext runContext) throws Exception {
}

if (this.defaultKmsKeyName != null) {
builder.setDefaultKmsKeyName(this.defaultKmsKeyName);
builder.setDefaultKmsKeyName(runContext.render(this.defaultKmsKeyName).as(String.class).orElseThrow());
}

if (this.defaultEventBasedHold != null) {
builder.setDefaultEventBasedHold(this.defaultEventBasedHold);
builder.setDefaultEventBasedHold(runContext.render(this.defaultEventBasedHold).as(Boolean.class).orElseThrow());
}

if (this.retentionPeriod != null) {
builder.setRetentionPeriod(this.retentionPeriod);
builder.setRetentionPeriod(runContext.render(this.retentionPeriod).as(Long.class).orElseThrow());
}

if (this.iamConfiguration != null) {
Expand Down
23 changes: 12 additions & 11 deletions src/main/java/io/kestra/plugin/gcp/gcs/AbstractList.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Storage;
import com.google.common.collect.Iterables;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.gcp.gcs.models.Blob;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
Expand All @@ -22,21 +24,20 @@
@NoArgsConstructor
public abstract class AbstractList extends AbstractGcs implements ListInterface {
@NotNull
protected String from;
protected Property<String> from;

@Schema(
title = "If set to `true`, lists all versions of a blob. The default is `false`."
)
@PluginProperty(dynamic = true)
private Boolean allVersions;
private Property<Boolean> allVersions;

@Builder.Default
private final ListingType listingType = ListingType.DIRECTORY;
private final Property<ListingType> listingType = Property.of(ListingType.DIRECTORY);

protected String regExp;
protected Property<String> regExp;

public Spliterator<com.google.cloud.storage.Blob> iterator(Storage connection, URI from) {
Page<com.google.cloud.storage.Blob> list = connection.list(from.getAuthority(), options(from));
public Spliterator<com.google.cloud.storage.Blob> iterator(Storage connection, URI from, RunContext runContext) throws IllegalVariableEvaluationException {
Page<com.google.cloud.storage.Blob> list = connection.list(from.getAuthority(), options(from, runContext));

return list.iterateAll().spliterator();
}
Expand All @@ -45,18 +46,18 @@ protected boolean filter(com.google.cloud.storage.Blob blob, String regExp) {
return regExp == null || Blob.uri(blob).toString().matches(regExp);
}

private Storage.BlobListOption[] options(URI from) {
private Storage.BlobListOption[] options(URI from, RunContext runContext) throws IllegalVariableEvaluationException {
java.util.List<Storage.BlobListOption> options = new ArrayList<>();

if (!from.getPath().equals("")) {
options.add(Storage.BlobListOption.prefix(from.getPath().substring(1)));
}

if (this.allVersions != null) {
options.add(Storage.BlobListOption.versions(this.allVersions));
options.add(Storage.BlobListOption.versions(runContext.render(this.allVersions).as(Boolean.class).orElseThrow()));
}

if (this.listingType == ListingType.DIRECTORY) {
if (runContext.render(this.listingType).as(ListingType.class).orElseThrow() == ListingType.DIRECTORY) {
options.add(Storage.BlobListOption.currentDirectory());
}

Expand Down
7 changes: 3 additions & 4 deletions src/main/java/io/kestra/plugin/gcp/gcs/ActionInterface.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.gcp.gcs;

import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;

import jakarta.validation.constraints.NotNull;
Expand All @@ -9,15 +10,13 @@ public interface ActionInterface {
@Schema(
title = "The action to perform on the retrieved files. If using 'NONE' make sure to handle the files inside your flow to avoid infinite triggering."
)
@PluginProperty(dynamic = true)
@NotNull
ActionInterface.Action getAction();
Property<Action> getAction();

@Schema(
title = "The destination directory for `MOVE` action."
)
@PluginProperty(dynamic = true)
String getMoveDirectory();
Property<String> getMoveDirectory();

enum Action {
MOVE,
Expand Down
22 changes: 11 additions & 11 deletions src/main/java/io/kestra/plugin/gcp/gcs/Compose.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -35,7 +36,7 @@
tasks:
- id: compose
type: io.kestra.plugin.gcp.gcs.Compose
list:
list:
from: "gs://my_bucket/dir/"
to: "gs://my_bucket/destination/my-compose-file.txt"
"""
Expand All @@ -56,22 +57,21 @@ public class Compose extends AbstractGcs implements RunnableTask<Compose.Output>
@Schema(
title = "The destination path"
)
@PluginProperty(dynamic = true)
private String to;
private Property<String> to;

@Schema(
title = "if `true`, don't failed if no result"
)
@PluginProperty
@Builder.Default
private Boolean allowEmpty = false;
private Property<Boolean> allowEmpty = Property.of(false);

@Override
public Output run(RunContext runContext) throws Exception {
Storage connection = this.connection(runContext);
Logger logger = runContext.logger();

URI to = encode(runContext, this.to);
URI to = encode(runContext, runContext.render(this.to).as(String.class).orElse(null));

// target
BlobInfo destination = BlobInfo
Expand All @@ -88,14 +88,14 @@ public Output run(RunContext runContext) throws Exception {
.serviceAccount(this.serviceAccount)
.scopes(this.scopes)
.from(this.list.getFrom())
.filter(ListInterface.Filter.FILES)
.filter(Property.of(ListInterface.Filter.FILES))
.listingType(this.list.getListingType())
.regExp(this.list.getRegExp())
.build();

io.kestra.plugin.gcp.gcs.List.Output run = listActions.run(runContext);

if (run.getBlobs().size() == 0 && this.allowEmpty == false) {
if (run.getBlobs().size() == 0 && runContext.render(this.allowEmpty).as(Boolean.class).orElse(false).equals(false)) {
throw new FileNotFoundException("No files founds");
}

Expand Down Expand Up @@ -127,14 +127,14 @@ public static class Output implements io.kestra.core.models.tasks.Output {
@Getter
public static class List implements ListInterface {
@NotNull
private String from;
private Property<String> from;

@Builder.Default
private final io.kestra.plugin.gcp.gcs.List.Filter filter = ListInterface.Filter.BOTH;
private final Property<io.kestra.plugin.gcp.gcs.List.Filter> filter = Property.of(Filter.BOTH);

@Builder.Default
private final io.kestra.plugin.gcp.gcs.List.ListingType listingType = ListInterface.ListingType.DIRECTORY;
private final Property<io.kestra.plugin.gcp.gcs.List.ListingType> listingType = Property.of(ListingType.DIRECTORY);

private String regExp;
private Property<String> regExp;
}
}
Loading

0 comments on commit d6a52db

Please sign in to comment.