diff --git a/src/main/java/io/kestra/plugin/fs/ftp/Trigger.java b/src/main/java/io/kestra/plugin/fs/ftp/Trigger.java index f2a1480..d540e9c 100644 --- a/src/main/java/io/kestra/plugin/fs/ftp/Trigger.java +++ b/src/main/java/io/kestra/plugin/fs/ftp/Trigger.java @@ -51,6 +51,41 @@ " moveDirectory: \"/archive/\"", } ), + @Example( + title = "Wait for one or more files in a given FTP server's directory and process each of these files sequentially. Delete files manually after processing to prevent infinite triggering.", + full = true, + code = { + "id: ftp_trigger_flow", + "namespace: dev", + "", + "tasks:", + " - id: for_each_file", + " type: io.kestra.core.tasks.flows.EachSequential", + " value: \"{{ trigger.files | jq('.name') }}\"", + " tasks:", + " - id: return", + " type: io.kestra.core.tasks.debugs.Return", + " format: \"{{ taskrun.value }}\"", + " - id: delete", + " type: io.kestra.plugin.fs.ftp.Delete", + " host: localhost", + " port: 21", + " username: foo", + " password: bar", + " uri: \"/in/{{ taskrun.value }}\"", + "", + "triggers:", + " - id: watch", + " type: io.kestra.plugin.fs.ftp.Trigger", + " host: localhost", + " port: 21", + " username: foo", + " password: bar", + " from: \"/in/\"", + " interval: PT10S", + " action: NONE", + } + ), @Example( title = "Wait for one or more files in a given FTP server's directory and process each of these files sequentially. In this example, we restrict the trigger to only wait for CSV files in the `mydir` directory.", full = true, diff --git a/src/main/java/io/kestra/plugin/fs/sftp/Trigger.java b/src/main/java/io/kestra/plugin/fs/sftp/Trigger.java index 61bd023..83555f6 100644 --- a/src/main/java/io/kestra/plugin/fs/sftp/Trigger.java +++ b/src/main/java/io/kestra/plugin/fs/sftp/Trigger.java @@ -50,6 +50,41 @@ " moveDirectory: \"/archive/\"", } ), + @Example( + title = "Wait for one or more files in a given SFTP server's directory and process each of these files sequentially. Delete files manually after processing to prevent infinite triggering.", + full = true, + code = { + "id: sftp_trigger_flow", + "namespace: dev", + "", + "tasks:", + " - id: for_each_file", + " type: io.kestra.core.tasks.flows.EachSequential", + " value: \"{{ trigger.files | jq('.path') }}\"", + " tasks:", + " - id: return", + " type: io.kestra.core.tasks.debugs.Return", + " format: \"{{ taskrun.value }}\"", + " - id: delete", + " type: io.kestra.plugin.fs.sftp.Delete", + " host: localhost", + " port: 6622", + " username: foo", + " password: bar", + " uri: \"/in/{{ taskrun.value }}\"", + "", + "triggers:", + " - id: watch", + " type: io.kestra.plugin.fs.sftp.Trigger", + " host: localhost", + " port: 6622", + " username: foo", + " password: bar", + " from: \"/in/\"", + " interval: PT10S", + " action: NONE", + } + ), @Example( title = "Wait for one or more files in a given SFTP server's directory and process each of these files sequentially. In this example, we restrict the trigger to only wait for CSV files in the `mydir` directory.", full = true, diff --git a/src/main/java/io/kestra/plugin/fs/smb/Trigger.java b/src/main/java/io/kestra/plugin/fs/smb/Trigger.java index 0cdb07d..332d873 100644 --- a/src/main/java/io/kestra/plugin/fs/smb/Trigger.java +++ b/src/main/java/io/kestra/plugin/fs/smb/Trigger.java @@ -52,6 +52,43 @@ " moveDirectory: \"/archive_share/\"", } ), + @Example( + title = """ + Wait for one or more files in a given SMB server's directory and process each of these files sequentially. + Then move them to another share which is used as an archive.""", + full = true, + code = { + "id: smb_trigger_flow", + "namespace: dev", + "", + "tasks:", + " - id: for_each_file", + " type: io.kestra.core.tasks.flows.EachSequential", + " value: \"{{ trigger.files | jq('.path') }}\"", + " tasks:", + " - id: return", + " type: io.kestra.core.tasks.debugs.Return", + " format: \"{{ taskrun.value }}\"", + " - id: delete", + " type: io.kestra.plugin.fs.smb.Delete", + " host: localhost", + " port: 445", + " username: foo", + " password: bar", + " uri: \"/my_share/in/{{ taskrun.value }}\"", + "", + "triggers:", + " - id: watch", + " type: io.kestra.plugin.fs.smb.Trigger", + " host: localhost", + " port: 445", + " username: foo", + " password: bar", + " from: \"/my_share/in/\"", + " interval: PT10S", + " action: NONE" + } + ), @Example( title = """ Wait for one or more files in a given SMB server's directory (composed of share name followed by dir path) and process each of these files sequentially. diff --git a/src/main/java/io/kestra/plugin/fs/vfs/Downloads.java b/src/main/java/io/kestra/plugin/fs/vfs/Downloads.java index 9bd2c11..0596603 100644 --- a/src/main/java/io/kestra/plugin/fs/vfs/Downloads.java +++ b/src/main/java/io/kestra/plugin/fs/vfs/Downloads.java @@ -31,7 +31,7 @@ public abstract class Downloads extends AbstractVfsTask implements RunnableTask< private String from; @Schema( - title = "The action to do on find files" + title = "The action to do on downloaded files" ) @PluginProperty(dynamic = true) private Downloads.Action action; @@ -106,7 +106,7 @@ public Output run(RunContext runContext) throws Exception { .collect(Collectors.toList()); if (this.action != null) { - VfsService.archive( + VfsService.performAction( runContext, fsm, fileSystemOptions, @@ -125,7 +125,8 @@ public Output run(RunContext runContext) throws Exception { public enum Action { MOVE, - DELETE + DELETE, + NONE } diff --git a/src/main/java/io/kestra/plugin/fs/vfs/Trigger.java b/src/main/java/io/kestra/plugin/fs/vfs/Trigger.java index 6a5941e..ab8afbc 100644 --- a/src/main/java/io/kestra/plugin/fs/vfs/Trigger.java +++ b/src/main/java/io/kestra/plugin/fs/vfs/Trigger.java @@ -54,7 +54,7 @@ public abstract class Trigger extends AbstractTrigger implements PollingTriggerI private String from; @Schema( - title = "The action to do on find files" + title = "The action to perform on the retrieved files. If using 'NONE' make sure to handle the files inside your flow to avoid infinite triggering." ) @PluginProperty(dynamic = true) @NotNull @@ -155,7 +155,7 @@ public Optional evaluate(ConditionContext conditionContext, TriggerCo .collect(Collectors.toList()); if (this.action != null) { - VfsService.archive( + VfsService.performAction( runContext, fsm, fileSystemOptions, diff --git a/src/main/java/io/kestra/plugin/fs/vfs/VfsService.java b/src/main/java/io/kestra/plugin/fs/vfs/VfsService.java index a760bf8..f6e933b 100644 --- a/src/main/java/io/kestra/plugin/fs/vfs/VfsService.java +++ b/src/main/java/io/kestra/plugin/fs/vfs/VfsService.java @@ -251,7 +251,7 @@ public static Move.Output move( } } - public static void archive( + public static void performAction( RunContext runContext, StandardFileSystemManager fsm, FileSystemOptions fileSystemOptions, diff --git a/src/test/java/io/kestra/plugin/fs/AbstractFileTriggerTest.java b/src/test/java/io/kestra/plugin/fs/AbstractFileTriggerTest.java index b932a48..2102b11 100644 --- a/src/test/java/io/kestra/plugin/fs/AbstractFileTriggerTest.java +++ b/src/test/java/io/kestra/plugin/fs/AbstractFileTriggerTest.java @@ -11,6 +11,7 @@ import io.kestra.core.schedulers.DefaultScheduler; import io.kestra.core.schedulers.SchedulerTriggerStateInterface; import io.kestra.core.services.FlowListenersInterface; +import io.kestra.plugin.fs.vfs.List; import io.kestra.plugin.fs.vfs.Upload; import io.kestra.plugin.fs.vfs.models.File; import io.micronaut.context.ApplicationContext; @@ -30,7 +31,9 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -@MicronautTest +// FIXME Remove once Worker closing has been reworked (Micronaut 4 PR) +// We need to rebuild the context for each tests as currently Workers can't be closed properly (they keep listening to queues they shouldn't) +@MicronautTest(rebuildContext = true) public abstract class AbstractFileTriggerTest { @Inject private ApplicationContext applicationContext; @@ -55,12 +58,12 @@ public abstract class AbstractFileTriggerTest { @Value("${kestra.variables.globals.random}") protected String random; - abstract public Upload.Output upload(String to) throws Exception; - abstract protected String triggeringFlowId(); + abstract protected AbstractUtils utils(); + @Test - void flow() throws Exception { + void moveAction() throws Exception { // mock flow listeners CountDownLatch queueCount = new CountDownLatch(1); @@ -86,19 +89,71 @@ void flow() throws Exception { String out1 = FriendlyId.createFriendlyId(); - upload("/upload/" + random + "/" + out1); + String toUploadDir = "/upload/" + random; + utils().upload(toUploadDir + "/" + out1); + String out2 = FriendlyId.createFriendlyId(); + utils().upload(toUploadDir + "/" + out2); + + worker.run(); + scheduler.run(); + repositoryLoader.load(Objects.requireNonNull(AbstractFileTriggerTest.class.getClassLoader().getResource("flows"))); + + boolean await = queueCount.await(10, TimeUnit.SECONDS); + assertThat(await, is(true)); + + @SuppressWarnings("unchecked") + java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("files"); + assertThat(trigger.size(), is(2)); + + assertThat(utils().list(toUploadDir).getFiles().isEmpty(), is(true)); + assertThat(utils().list(toUploadDir + "-move").getFiles().size(), is(2)); + } + } + + @Test + void noneAction() throws Exception { + // mock flow listeners + CountDownLatch queueCount = new CountDownLatch(1); + + // scheduler + Worker worker = new Worker(applicationContext, 8, null); + try ( + AbstractScheduler scheduler = new DefaultScheduler( + this.applicationContext, + this.flowListenersService, + this.triggerState + ); + ) { + AtomicReference last = new AtomicReference<>(); + + // wait for execution + executionQueue.receive(AbstractFileTriggerTest.class, execution -> { + if (execution.getLeft().getFlowId().equals(triggeringFlowId() + "-none-action")){ + last.set(execution.getLeft()); + + queueCount.countDown(); + } + }); + + + String out1 = FriendlyId.createFriendlyId(); + String toUploadDir = "/upload/" + random + "-none"; + utils().upload(toUploadDir + "/" + out1); String out2 = FriendlyId.createFriendlyId(); - upload("/upload/" + random + "/" + out2); + utils().upload(toUploadDir + "/" + out2); worker.run(); scheduler.run(); repositoryLoader.load(Objects.requireNonNull(AbstractFileTriggerTest.class.getClassLoader().getResource("flows"))); - queueCount.await(1, TimeUnit.MINUTES); + boolean await = queueCount.await(10, TimeUnit.SECONDS); + assertThat(await, is(true)); @SuppressWarnings("unchecked") java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("files"); assertThat(trigger.size(), is(2)); + + assertThat(utils().list(toUploadDir).getFiles().size(), is(2)); } } @@ -135,9 +190,9 @@ void missing() throws Exception { Thread.sleep(1000); String out1 = FriendlyId.createFriendlyId(); - upload("/upload/" + random + "/" + out1); + utils().upload("/upload/" + random + "/" + out1); - queueCount.await(1, TimeUnit.MINUTES); + queueCount.await(10, TimeUnit.SECONDS); @SuppressWarnings("unchecked") java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("files"); diff --git a/src/test/java/io/kestra/plugin/fs/AbstractUtils.java b/src/test/java/io/kestra/plugin/fs/AbstractUtils.java index 1dbdcf9..67caf2f 100644 --- a/src/test/java/io/kestra/plugin/fs/AbstractUtils.java +++ b/src/test/java/io/kestra/plugin/fs/AbstractUtils.java @@ -3,6 +3,7 @@ import com.devskiller.friendly_id.FriendlyId; import io.kestra.core.storages.StorageInterface; import io.kestra.plugin.fs.sftp.Upload; +import io.kestra.plugin.fs.vfs.List; import jakarta.inject.Inject; import java.io.File; @@ -32,4 +33,6 @@ public Upload.Output upload(String to) throws Exception { } abstract public Upload.Output upload(URI source, String to) throws Exception; + + abstract public List.Output list(String dir) throws Exception; } diff --git a/src/test/java/io/kestra/plugin/fs/ftp/DownloadsTest.java b/src/test/java/io/kestra/plugin/fs/ftp/DownloadsTest.java index db60d5e..efd8338 100644 --- a/src/test/java/io/kestra/plugin/fs/ftp/DownloadsTest.java +++ b/src/test/java/io/kestra/plugin/fs/ftp/DownloadsTest.java @@ -25,16 +25,18 @@ class DownloadsTest { private String random; @Test - void run() throws Exception { + void run_DeleteAfterDownloads() throws Exception { + String toUploadDir = "/upload/" + random; + String out1 = FriendlyId.createFriendlyId(); - ftpUtils.upload("/upload/" + random + "/" + out1 + ".txt"); + ftpUtils.upload(toUploadDir + "/" + out1 + ".txt"); String out2 = FriendlyId.createFriendlyId(); - ftpUtils.upload("/upload/" + random + "/" + out2 + ".txt"); + ftpUtils.upload(toUploadDir + "/" + out2 + ".txt"); Downloads task = Downloads.builder() .id(DownloadsTest.class.getSimpleName()) .type(DownloadsTest.class.getName()) - .from("/upload/" + random + "/") + .from(toUploadDir + "/") .action(Downloads.Action.DELETE) .host("localhost") .port("6621") @@ -46,5 +48,34 @@ void run() throws Exception { assertThat(run.getFiles().size(), is(2)); assertThat(run.getFiles().get(0).getPath().getPath(), endsWith(".txt")); + + assertThat(ftpUtils.list(toUploadDir).getFiles().isEmpty(), is(true)); + } + + @Test + void run_NoneAfterDownloads() throws Exception { + String toUploadDir = "/upload/" + random; + String out1 = FriendlyId.createFriendlyId(); + ftpUtils.upload(toUploadDir + "/" + out1 + ".txt"); + String out2 = FriendlyId.createFriendlyId(); + ftpUtils.upload(toUploadDir + "/" + out2 + ".txt"); + + Downloads task = Downloads.builder() + .id(DownloadsTest.class.getSimpleName()) + .type(DownloadsTest.class.getName()) + .from(toUploadDir + "/") + .action(Downloads.Action.NONE) + .host("localhost") + .port("6621") + .username("guest") + .password("guest") + .build(); + + Downloads.Output run = task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of())); + + assertThat(run.getFiles().size(), is(2)); + assertThat(run.getFiles().get(0).getPath().getPath(), endsWith(".txt")); + + assertThat(ftpUtils.list(toUploadDir).getFiles().size(), is(2)); } } diff --git a/src/test/java/io/kestra/plugin/fs/ftp/FtpUtils.java b/src/test/java/io/kestra/plugin/fs/ftp/FtpUtils.java index ec44c45..5c6e950 100644 --- a/src/test/java/io/kestra/plugin/fs/ftp/FtpUtils.java +++ b/src/test/java/io/kestra/plugin/fs/ftp/FtpUtils.java @@ -4,6 +4,7 @@ import io.kestra.core.runners.RunContextFactory; import io.kestra.core.utils.TestsUtils; import io.kestra.plugin.fs.AbstractUtils; +import io.kestra.plugin.fs.vfs.List; import jakarta.inject.Inject; import jakarta.inject.Singleton; @@ -28,4 +29,18 @@ public Upload.Output upload(URI source, String to) throws Exception { return task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of())); } + + @Override + public List.Output list(String dir) throws Exception { + return io.kestra.plugin.fs.ftp.List.builder() + .id(TriggerTest.class.getSimpleName()) + .type(TriggerTest.class.getName()) + .from(dir) + .host("localhost") + .port("6621") + .username("guest") + .password("guest") + .build() + .run(this.runContextFactory.of()); + } } diff --git a/src/test/java/io/kestra/plugin/fs/ftp/TriggerTest.java b/src/test/java/io/kestra/plugin/fs/ftp/TriggerTest.java index 533ab9c..42fb48f 100644 --- a/src/test/java/io/kestra/plugin/fs/ftp/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/fs/ftp/TriggerTest.java @@ -1,7 +1,7 @@ package io.kestra.plugin.fs.ftp; import io.kestra.plugin.fs.AbstractFileTriggerTest; -import io.kestra.plugin.fs.vfs.Upload; +import io.kestra.plugin.fs.AbstractUtils; import jakarta.inject.Inject; class TriggerTest extends AbstractFileTriggerTest { @@ -9,12 +9,12 @@ class TriggerTest extends AbstractFileTriggerTest { private FtpUtils ftpUtils; @Override - public Upload.Output upload(String to) throws Exception { - return this.ftpUtils.upload(to); + protected String triggeringFlowId() { + return "ftp-listen"; } @Override - protected String triggeringFlowId() { - return "ftp-listen"; + protected AbstractUtils utils() { + return ftpUtils; } } diff --git a/src/test/java/io/kestra/plugin/fs/sftp/DownloadsTest.java b/src/test/java/io/kestra/plugin/fs/sftp/DownloadsTest.java index 64b76ec..6d7f232 100644 --- a/src/test/java/io/kestra/plugin/fs/sftp/DownloadsTest.java +++ b/src/test/java/io/kestra/plugin/fs/sftp/DownloadsTest.java @@ -26,16 +26,17 @@ class DownloadsTest { private String random; @Test - void run() throws Exception { + void run_DeleteAfterDownloads() throws Exception { String out1 = FriendlyId.createFriendlyId(); - sftpUtils.upload("/upload/" + random + "/" + out1 + ".txt"); + String toUploadDir = "/upload/" + random; + sftpUtils.upload(toUploadDir + "/" + out1 + ".txt"); String out2 = FriendlyId.createFriendlyId(); - sftpUtils.upload("/upload/" + random + "/" + out2 + ".txt"); + sftpUtils.upload(toUploadDir + "/" + out2 + ".txt"); Downloads task = Downloads.builder() .id(DownloadsTest.class.getSimpleName()) .type(DownloadsTest.class.getName()) - .from("/upload/" + random + "/") + .from(toUploadDir + "/") .action(Downloads.Action.DELETE) .host("localhost") .port("6622") @@ -47,5 +48,34 @@ void run() throws Exception { assertThat(run.getFiles().size(), is(2)); assertThat(run.getFiles().get(0).getPath().getPath(), endsWith(".txt")); + + assertThat(sftpUtils.list(toUploadDir).getFiles().isEmpty(), is(true)); + } + + @Test + void run_NoneAfterDownloads() throws Exception { + String out1 = FriendlyId.createFriendlyId(); + String toUploadDir = "/upload/" + random; + sftpUtils.upload(toUploadDir + "/" + out1 + ".txt"); + String out2 = FriendlyId.createFriendlyId(); + sftpUtils.upload(toUploadDir + "/" + out2 + ".txt"); + + Downloads task = Downloads.builder() + .id(DownloadsTest.class.getSimpleName()) + .type(DownloadsTest.class.getName()) + .from(toUploadDir + "/") + .action(Downloads.Action.NONE) + .host("localhost") + .port("6622") + .username("foo") + .password("pass") + .build(); + + Downloads.Output run = task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of())); + + assertThat(run.getFiles().size(), is(2)); + assertThat(run.getFiles().get(0).getPath().getPath(), endsWith(".txt")); + + assertThat(sftpUtils.list(toUploadDir).getFiles().size(), is(2)); } } diff --git a/src/test/java/io/kestra/plugin/fs/sftp/SftpUtils.java b/src/test/java/io/kestra/plugin/fs/sftp/SftpUtils.java index 39b224c..7219eab 100644 --- a/src/test/java/io/kestra/plugin/fs/sftp/SftpUtils.java +++ b/src/test/java/io/kestra/plugin/fs/sftp/SftpUtils.java @@ -29,4 +29,18 @@ public Upload.Output upload(URI source, String to) throws Exception { return task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of())); } + + @Override + public io.kestra.plugin.fs.vfs.List.Output list(String dir) throws Exception { + return io.kestra.plugin.fs.sftp.List.builder() + .id(TriggerTest.class.getSimpleName()) + .type(TriggerTest.class.getName()) + .from(dir) + .host("localhost") + .port("6622") + .username("foo") + .password("pass") + .build() + .run(this.runContextFactory.of()); + } } diff --git a/src/test/java/io/kestra/plugin/fs/sftp/TriggerTest.java b/src/test/java/io/kestra/plugin/fs/sftp/TriggerTest.java index 0b14a79..2883a6a 100644 --- a/src/test/java/io/kestra/plugin/fs/sftp/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/fs/sftp/TriggerTest.java @@ -7,6 +7,8 @@ import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.utils.TestsUtils; import io.kestra.plugin.fs.AbstractFileTriggerTest; +import io.kestra.plugin.fs.AbstractUtils; +import io.kestra.plugin.fs.vfs.List; import io.kestra.plugin.fs.vfs.Upload; import io.kestra.plugin.fs.vfs.models.File; import jakarta.inject.Inject; @@ -26,13 +28,13 @@ public class TriggerTest extends AbstractFileTriggerTest { private SftpUtils sftpUtils; @Override - public Upload.Output upload(String to) throws Exception { - return this.sftpUtils.upload(to); + protected String triggeringFlowId() { + return "sftp-listen"; } @Override - protected String triggeringFlowId() { - return "sftp-listen"; + protected AbstractUtils utils() { + return sftpUtils; } @Test @@ -50,7 +52,7 @@ void move() throws Exception { .build(); String out = FriendlyId.createFriendlyId(); - Upload.Output upload = upload("/upload/" + random + "/" + out + ".yml"); + Upload.Output upload = utils().upload("/upload/" + random + "/" + out + ".yml"); Map.Entry context = TestsUtils.mockTrigger(runContextFactory, trigger); Optional execution = trigger.evaluate(context.getKey(), context.getValue()); diff --git a/src/test/java/io/kestra/plugin/fs/smb/DownloadsTest.java b/src/test/java/io/kestra/plugin/fs/smb/DownloadsTest.java index 95d9101..f888793 100644 --- a/src/test/java/io/kestra/plugin/fs/smb/DownloadsTest.java +++ b/src/test/java/io/kestra/plugin/fs/smb/DownloadsTest.java @@ -23,13 +23,14 @@ class DownloadsTest { @Test void run_DeleteAfterDownloads() throws Exception { String rootFolder = IdUtils.create(); - smbUtils.upload("/" + SmbUtils.SHARE_NAME + "/" + rootFolder + "/" + IdUtils.create() + ".txt"); - smbUtils.upload("/" + SmbUtils.SHARE_NAME + "/" + rootFolder + "/" + IdUtils.create() + ".txt"); + String toUploadDir = "/" + SmbUtils.SHARE_NAME + "/" + rootFolder; + smbUtils.upload(toUploadDir + "/" + IdUtils.create() + ".txt"); + smbUtils.upload(toUploadDir + "/" + IdUtils.create() + ".txt"); Downloads task = Downloads.builder() .id(DownloadsTest.class.getSimpleName()) .type(DownloadsTest.class.getName()) - .from("/" + SmbUtils.SHARE_NAME + "/" + rootFolder) + .from(toUploadDir) .action(Downloads.Action.DELETE) .host("localhost") .port("445") @@ -42,21 +43,21 @@ void run_DeleteAfterDownloads() throws Exception { assertThat(run.getFiles().size(), is(2)); assertThat(run.getFiles().get(0).getPath().getPath(), endsWith(".txt")); - run = task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of())); - assertThat(run.getFiles().isEmpty(), is(true)); + assertThat(smbUtils.list(toUploadDir).getFiles().isEmpty(), is(true)); } @Test void run_MoveAfterDownloads() throws Exception { String rootFolder = IdUtils.create(); - smbUtils.upload("/" + SmbUtils.SHARE_NAME + "/" + rootFolder + "/" + IdUtils.create() + ".txt"); - smbUtils.upload("/" + SmbUtils.SHARE_NAME + "/" + rootFolder + "/" + IdUtils.create() + ".txt"); + String toUploadDir = "/" + SmbUtils.SHARE_NAME + "/" + rootFolder; + smbUtils.upload(toUploadDir + "/" + IdUtils.create() + ".txt"); + smbUtils.upload(toUploadDir + "/" + IdUtils.create() + ".txt"); String archiveShareDirectory = SmbUtils.SECOND_SHARE_NAME + "/" + rootFolder; Downloads task = Downloads.builder() .id(DownloadsTest.class.getSimpleName()) .type(DownloadsTest.class.getName()) - .from(SmbUtils.SHARE_NAME + "/" + rootFolder) + .from(toUploadDir) .moveDirectory(archiveShareDirectory + "/") .action(Downloads.Action.MOVE) .host("localhost") @@ -79,5 +80,34 @@ void run_MoveAfterDownloads() throws Exception { run = task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of())); assertThat(run.getFiles().size(), is(2)); assertThat(run.getFiles().get(0).getPath().getPath(), endsWith(".txt")); + + assertThat(smbUtils.list(toUploadDir).getFiles().isEmpty(), is(true)); + assertThat(smbUtils.list(archiveShareDirectory).getFiles().size(), is(2)); + } + + @Test + void run_NoneAfterDownloads() throws Exception { + String rootFolder = IdUtils.create(); + String toUploadDir = "/" + SmbUtils.SHARE_NAME + "/" + rootFolder; + smbUtils.upload(toUploadDir + "/" + IdUtils.create() + ".txt"); + smbUtils.upload(toUploadDir + "/" + IdUtils.create() + ".txt"); + + Downloads task = Downloads.builder() + .id(DownloadsTest.class.getSimpleName()) + .type(DownloadsTest.class.getName()) + .from(toUploadDir) + .action(Downloads.Action.NONE) + .host("localhost") + .port("445") + .username("alice") + .password("alipass") + .build(); + + Downloads.Output run = task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of())); + + assertThat(run.getFiles().size(), is(2)); + assertThat(run.getFiles().get(0).getPath().getPath(), endsWith(".txt")); + + assertThat(smbUtils.list(toUploadDir).getFiles().size(), is(2)); } } diff --git a/src/test/java/io/kestra/plugin/fs/smb/SmbUtils.java b/src/test/java/io/kestra/plugin/fs/smb/SmbUtils.java index f21c392..06105fd 100644 --- a/src/test/java/io/kestra/plugin/fs/smb/SmbUtils.java +++ b/src/test/java/io/kestra/plugin/fs/smb/SmbUtils.java @@ -4,6 +4,7 @@ import io.kestra.core.runners.RunContextFactory; import io.kestra.core.utils.TestsUtils; import io.kestra.plugin.fs.AbstractUtils; +import io.kestra.plugin.fs.vfs.List; import jakarta.inject.Inject; import jakarta.inject.Singleton; @@ -30,4 +31,18 @@ public Upload.Output upload(URI source, String to) throws Exception { return task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of())); } + + @Override + public List.Output list(String dir) throws Exception { + return io.kestra.plugin.fs.smb.List.builder() + .id(TriggerTest.class.getSimpleName()) + .type(TriggerTest.class.getName()) + .from(dir) + .host("localhost") + .port("445") + .username("alice") + .password("alipass") + .build() + .run(this.runContextFactory.of()); + } } diff --git a/src/test/java/io/kestra/plugin/fs/smb/TriggerTest.java b/src/test/java/io/kestra/plugin/fs/smb/TriggerTest.java index 5e39c19..545037f 100644 --- a/src/test/java/io/kestra/plugin/fs/smb/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/fs/smb/TriggerTest.java @@ -1,6 +1,8 @@ package io.kestra.plugin.fs.smb; import io.kestra.plugin.fs.AbstractFileTriggerTest; +import io.kestra.plugin.fs.AbstractUtils; +import io.kestra.plugin.fs.vfs.List; import io.kestra.plugin.fs.vfs.Upload; import jakarta.inject.Inject; @@ -9,12 +11,12 @@ class TriggerTest extends AbstractFileTriggerTest { private SmbUtils smbUtils; @Override - public Upload.Output upload(String to) throws Exception { - return this.smbUtils.upload(to); + protected String triggeringFlowId() { + return "smb-listen"; } @Override - protected String triggeringFlowId() { - return "smb-listen"; + protected AbstractUtils utils() { + return smbUtils; } } diff --git a/src/test/resources/flows/ftp-listen-none-action.yaml b/src/test/resources/flows/ftp-listen-none-action.yaml new file mode 100644 index 0000000..41488c6 --- /dev/null +++ b/src/test/resources/flows/ftp-listen-none-action.yaml @@ -0,0 +1,20 @@ +id: ftp-listen-none-action +namespace: io.kestra.tests + +triggers: + - id: watch + type: io.kestra.plugin.fs.ftp.Trigger + host: localhost + port: 6621 + username: guest + password: guest + from: "/upload/{{ globals.random }}-none/" + interval: PT10S + action: NONE + passiveMode: true + moveDirectory: "/upload/{{ globals.random }}-move/" + +tasks: + - id: end + type: io.kestra.core.tasks.debugs.Return + format: "{{task.id}} > {{taskrun.startDate}}" diff --git a/src/test/resources/flows/sftp-listen-none-action.yaml b/src/test/resources/flows/sftp-listen-none-action.yaml new file mode 100644 index 0000000..3bfb3ce --- /dev/null +++ b/src/test/resources/flows/sftp-listen-none-action.yaml @@ -0,0 +1,19 @@ +id: sftp-listen-none-action +namespace: io.kestra.tests + +triggers: + - id: watch + type: io.kestra.plugin.fs.sftp.Trigger + host: localhost + port: 6622 + username: foo + password: pass + from: "/upload/{{ globals.random }}-none/" + interval: PT10S + action: NONE + moveDirectory: "/upload/{{ globals.random }}-move/" + +tasks: + - id: end + type: io.kestra.core.tasks.debugs.Return + format: "{{task.id}} > {{taskrun.startDate}}" diff --git a/src/test/resources/flows/smb-listen-none-action.yaml b/src/test/resources/flows/smb-listen-none-action.yaml new file mode 100644 index 0000000..a979d99 --- /dev/null +++ b/src/test/resources/flows/smb-listen-none-action.yaml @@ -0,0 +1,19 @@ +id: smb-listen-none-action +namespace: io.kestra.tests + +triggers: + - id: watch + type: io.kestra.plugin.fs.smb.Trigger + host: localhost + port: 445 + username: alice + password: alipass + from: "/upload/{{ globals.random }}-none/" + interval: PT10S + action: NONE + moveDirectory: "/upload/{{ globals.random }}-move/" + +tasks: + - id: end + type: io.kestra.core.tasks.debugs.Return + format: "{{task.id}} > {{taskrun.startDate}}"