Skip to content

Commit

Permalink
feat(trigger, downloads): allow 'NONE' action
Browse files Browse the repository at this point in the history
  • Loading branch information
brian-mulier-p committed Jan 22, 2024
1 parent 5f0f181 commit 276cafb
Show file tree
Hide file tree
Showing 20 changed files with 408 additions and 45 deletions.
35 changes: 35 additions & 0 deletions src/main/java/io/kestra/plugin/fs/ftp/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,41 @@
" moveDirectory: \"/archive/\"",
}
),
@Example(
title = "Wait for one or more files in a given FTP server's directory and process each of these files sequentially. Delete files manually after processing to prevent infinite triggering.",
full = true,
code = {
"id: ftp_trigger_flow",
"namespace: dev",
"",
"tasks:",
" - id: for_each_file",
" type: io.kestra.core.tasks.flows.EachSequential",
" value: \"{{ trigger.files | jq('.name') }}\"",
" tasks:",
" - id: return",
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{ taskrun.value }}\"",
" - id: delete",
" type: io.kestra.plugin.fs.ftp.Delete",
" host: localhost",
" port: 21",
" username: foo",
" password: bar",
" uri: \"/in/{{ taskrun.value }}\"",
"",
"triggers:",
" - id: watch",
" type: io.kestra.plugin.fs.ftp.Trigger",
" host: localhost",
" port: 21",
" username: foo",
" password: bar",
" from: \"/in/\"",
" interval: PT10S",
" action: NONE",
}
),
@Example(
title = "Wait for one or more files in a given FTP server's directory and process each of these files sequentially. In this example, we restrict the trigger to only wait for CSV files in the `mydir` directory.",
full = true,
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/io/kestra/plugin/fs/sftp/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,41 @@
" moveDirectory: \"/archive/\"",
}
),
@Example(
title = "Wait for one or more files in a given SFTP server's directory and process each of these files sequentially. Delete files manually after processing to prevent infinite triggering.",
full = true,
code = {
"id: sftp_trigger_flow",
"namespace: dev",
"",
"tasks:",
" - id: for_each_file",
" type: io.kestra.core.tasks.flows.EachSequential",
" value: \"{{ trigger.files | jq('.path') }}\"",
" tasks:",
" - id: return",
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{ taskrun.value }}\"",
" - id: delete",
" type: io.kestra.plugin.fs.sftp.Delete",
" host: localhost",
" port: 6622",
" username: foo",
" password: bar",
" uri: \"/in/{{ taskrun.value }}\"",
"",
"triggers:",
" - id: watch",
" type: io.kestra.plugin.fs.sftp.Trigger",
" host: localhost",
" port: 6622",
" username: foo",
" password: bar",
" from: \"/in/\"",
" interval: PT10S",
" action: NONE",
}
),
@Example(
title = "Wait for one or more files in a given SFTP server's directory and process each of these files sequentially. In this example, we restrict the trigger to only wait for CSV files in the `mydir` directory.",
full = true,
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/io/kestra/plugin/fs/smb/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,43 @@
" moveDirectory: \"/archive_share/\"",
}
),
@Example(
title = """
Wait for one or more files in a given SMB server's directory and process each of these files sequentially.
Then move them to another share which is used as an archive.""",
full = true,
code = {
"id: smb_trigger_flow",
"namespace: dev",
"",
"tasks:",
" - id: for_each_file",
" type: io.kestra.core.tasks.flows.EachSequential",
" value: \"{{ trigger.files | jq('.path') }}\"",
" tasks:",
" - id: return",
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{ taskrun.value }}\"",
" - id: delete",
" type: io.kestra.plugin.fs.smb.Delete",
" host: localhost",
" port: 445",
" username: foo",
" password: bar",
" uri: \"/my_share/in/{{ taskrun.value }}\"",
"",
"triggers:",
" - id: watch",
" type: io.kestra.plugin.fs.smb.Trigger",
" host: localhost",
" port: 445",
" username: foo",
" password: bar",
" from: \"/my_share/in/\"",
" interval: PT10S",
" action: NONE"
}
),
@Example(
title = """
Wait for one or more files in a given SMB server's directory (composed of share name followed by dir path) and process each of these files sequentially.
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/io/kestra/plugin/fs/vfs/Downloads.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public abstract class Downloads extends AbstractVfsTask implements RunnableTask<
private String from;

@Schema(
title = "The action to do on find files"
title = "The action to do on downloaded files"
)
@PluginProperty(dynamic = true)
private Downloads.Action action;
Expand Down Expand Up @@ -106,7 +106,7 @@ public Output run(RunContext runContext) throws Exception {
.collect(Collectors.toList());

if (this.action != null) {
VfsService.archive(
VfsService.performAction(
runContext,
fsm,
fileSystemOptions,
Expand All @@ -125,7 +125,8 @@ public Output run(RunContext runContext) throws Exception {

public enum Action {
MOVE,
DELETE
DELETE,
NONE
}


Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/kestra/plugin/fs/vfs/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public abstract class Trigger extends AbstractTrigger implements PollingTriggerI
private String from;

@Schema(
title = "The action to do on find files"
title = "The action to perform on the retrieved files. If using 'NONE' make sure to handle the files inside your flow to avoid infinite triggering."
)
@PluginProperty(dynamic = true)
@NotNull
Expand Down Expand Up @@ -155,7 +155,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
.collect(Collectors.toList());

if (this.action != null) {
VfsService.archive(
VfsService.performAction(
runContext,
fsm,
fileSystemOptions,
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/kestra/plugin/fs/vfs/VfsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public static Move.Output move(
}
}

public static void archive(
public static void performAction(
RunContext runContext,
StandardFileSystemManager fsm,
FileSystemOptions fileSystemOptions,
Expand Down
73 changes: 64 additions & 9 deletions src/test/java/io/kestra/plugin/fs/AbstractFileTriggerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.kestra.core.schedulers.DefaultScheduler;
import io.kestra.core.schedulers.SchedulerTriggerStateInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.plugin.fs.vfs.List;
import io.kestra.plugin.fs.vfs.Upload;
import io.kestra.plugin.fs.vfs.models.File;
import io.micronaut.context.ApplicationContext;
Expand All @@ -30,7 +31,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@MicronautTest
// FIXME Remove once Worker closing has been reworked (Micronaut 4 PR)
// We need to rebuild the context for each tests as currently Workers can't be closed properly (they keep listening to queues they shouldn't)
@MicronautTest(rebuildContext = true)
public abstract class AbstractFileTriggerTest {
@Inject
private ApplicationContext applicationContext;
Expand All @@ -55,12 +58,12 @@ public abstract class AbstractFileTriggerTest {
@Value("${kestra.variables.globals.random}")
protected String random;

abstract public Upload.Output upload(String to) throws Exception;

abstract protected String triggeringFlowId();

abstract protected AbstractUtils utils();

@Test
void flow() throws Exception {
void moveAction() throws Exception {
// mock flow listeners
CountDownLatch queueCount = new CountDownLatch(1);

Expand All @@ -86,19 +89,71 @@ void flow() throws Exception {


String out1 = FriendlyId.createFriendlyId();
upload("/upload/" + random + "/" + out1);
String toUploadDir = "/upload/" + random;
utils().upload(toUploadDir + "/" + out1);
String out2 = FriendlyId.createFriendlyId();
utils().upload(toUploadDir + "/" + out2);

worker.run();
scheduler.run();
repositoryLoader.load(Objects.requireNonNull(AbstractFileTriggerTest.class.getClassLoader().getResource("flows")));

boolean await = queueCount.await(10, TimeUnit.SECONDS);
assertThat(await, is(true));

@SuppressWarnings("unchecked")
java.util.List<File> trigger = (java.util.List<File>) last.get().getTrigger().getVariables().get("files");
assertThat(trigger.size(), is(2));

assertThat(utils().list(toUploadDir).getFiles().isEmpty(), is(true));
assertThat(utils().list(toUploadDir + "-move").getFiles().size(), is(2));
}
}

@Test
void noneAction() throws Exception {
// mock flow listeners
CountDownLatch queueCount = new CountDownLatch(1);

// scheduler
Worker worker = new Worker(applicationContext, 8, null);
try (
AbstractScheduler scheduler = new DefaultScheduler(
this.applicationContext,
this.flowListenersService,
this.triggerState
);
) {
AtomicReference<Execution> last = new AtomicReference<>();

// wait for execution
executionQueue.receive(AbstractFileTriggerTest.class, execution -> {
if (execution.getLeft().getFlowId().equals(triggeringFlowId() + "-none-action")){
last.set(execution.getLeft());

queueCount.countDown();
}
});


String out1 = FriendlyId.createFriendlyId();
String toUploadDir = "/upload/" + random + "-none";
utils().upload(toUploadDir + "/" + out1);
String out2 = FriendlyId.createFriendlyId();
upload("/upload/" + random + "/" + out2);
utils().upload(toUploadDir + "/" + out2);

worker.run();
scheduler.run();
repositoryLoader.load(Objects.requireNonNull(AbstractFileTriggerTest.class.getClassLoader().getResource("flows")));

queueCount.await(1, TimeUnit.MINUTES);
boolean await = queueCount.await(10, TimeUnit.SECONDS);
assertThat(await, is(true));

@SuppressWarnings("unchecked")
java.util.List<File> trigger = (java.util.List<File>) last.get().getTrigger().getVariables().get("files");
assertThat(trigger.size(), is(2));

assertThat(utils().list(toUploadDir).getFiles().size(), is(2));
}
}

Expand Down Expand Up @@ -135,9 +190,9 @@ void missing() throws Exception {
Thread.sleep(1000);

String out1 = FriendlyId.createFriendlyId();
upload("/upload/" + random + "/" + out1);
utils().upload("/upload/" + random + "/" + out1);

queueCount.await(1, TimeUnit.MINUTES);
queueCount.await(10, TimeUnit.SECONDS);

@SuppressWarnings("unchecked")
java.util.List<URI> trigger = (java.util.List<URI>) last.get().getTrigger().getVariables().get("files");
Expand Down
3 changes: 3 additions & 0 deletions src/test/java/io/kestra/plugin/fs/AbstractUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.devskiller.friendly_id.FriendlyId;
import io.kestra.core.storages.StorageInterface;
import io.kestra.plugin.fs.sftp.Upload;
import io.kestra.plugin.fs.vfs.List;
import jakarta.inject.Inject;

import java.io.File;
Expand Down Expand Up @@ -32,4 +33,6 @@ public Upload.Output upload(String to) throws Exception {
}

abstract public Upload.Output upload(URI source, String to) throws Exception;

abstract public List.Output list(String dir) throws Exception;
}
39 changes: 35 additions & 4 deletions src/test/java/io/kestra/plugin/fs/ftp/DownloadsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,18 @@ class DownloadsTest {
private String random;

@Test
void run() throws Exception {
void run_DeleteAfterDownloads() throws Exception {
String toUploadDir = "/upload/" + random;

String out1 = FriendlyId.createFriendlyId();
ftpUtils.upload("/upload/" + random + "/" + out1 + ".txt");
ftpUtils.upload(toUploadDir + "/" + out1 + ".txt");
String out2 = FriendlyId.createFriendlyId();
ftpUtils.upload("/upload/" + random + "/" + out2 + ".txt");
ftpUtils.upload(toUploadDir + "/" + out2 + ".txt");

Downloads task = Downloads.builder()
.id(DownloadsTest.class.getSimpleName())
.type(DownloadsTest.class.getName())
.from("/upload/" + random + "/")
.from(toUploadDir + "/")
.action(Downloads.Action.DELETE)
.host("localhost")
.port("6621")
Expand All @@ -46,5 +48,34 @@ void run() throws Exception {

assertThat(run.getFiles().size(), is(2));
assertThat(run.getFiles().get(0).getPath().getPath(), endsWith(".txt"));

assertThat(ftpUtils.list(toUploadDir).getFiles().isEmpty(), is(true));
}

@Test
void run_NoneAfterDownloads() throws Exception {
String toUploadDir = "/upload/" + random;
String out1 = FriendlyId.createFriendlyId();
ftpUtils.upload(toUploadDir + "/" + out1 + ".txt");
String out2 = FriendlyId.createFriendlyId();
ftpUtils.upload(toUploadDir + "/" + out2 + ".txt");

Downloads task = Downloads.builder()
.id(DownloadsTest.class.getSimpleName())
.type(DownloadsTest.class.getName())
.from(toUploadDir + "/")
.action(Downloads.Action.NONE)
.host("localhost")
.port("6621")
.username("guest")
.password("guest")
.build();

Downloads.Output run = task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of()));

assertThat(run.getFiles().size(), is(2));
assertThat(run.getFiles().get(0).getPath().getPath(), endsWith(".txt"));

assertThat(ftpUtils.list(toUploadDir).getFiles().size(), is(2));
}
}
Loading

0 comments on commit 276cafb

Please sign in to comment.