Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix/none-action #92

Merged
merged 1 commit into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions src/main/java/io/kestra/plugin/fs/ftp/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,41 @@
" moveDirectory: \"/archive/\"",
}
),
@Example(
title = "Wait for one or more files in a given FTP server's directory and process each of these files sequentially. Delete files manually after processing to prevent infinite triggering.",
full = true,
code = {
"id: ftp_trigger_flow",
"namespace: dev",
"",
"tasks:",
" - id: for_each_file",
" type: io.kestra.core.tasks.flows.EachSequential",
" value: \"{{ trigger.files | jq('.name') }}\"",
" tasks:",
" - id: return",
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{ taskrun.value }}\"",
" - id: delete",
" type: io.kestra.plugin.fs.ftp.Delete",
" host: localhost",
" port: 21",
" username: foo",
" password: bar",
" uri: \"/in/{{ taskrun.value }}\"",
"",
"triggers:",
" - id: watch",
" type: io.kestra.plugin.fs.ftp.Trigger",
" host: localhost",
" port: 21",
" username: foo",
" password: bar",
" from: \"/in/\"",
" interval: PT10S",
" action: NONE",
}
),
@Example(
title = "Wait for one or more files in a given FTP server's directory and process each of these files sequentially. In this example, we restrict the trigger to only wait for CSV files in the `mydir` directory.",
full = true,
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/io/kestra/plugin/fs/sftp/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,41 @@
" moveDirectory: \"/archive/\"",
}
),
@Example(
title = "Wait for one or more files in a given SFTP server's directory and process each of these files sequentially. Delete files manually after processing to prevent infinite triggering.",
full = true,
code = {
"id: sftp_trigger_flow",
"namespace: dev",
"",
"tasks:",
" - id: for_each_file",
" type: io.kestra.core.tasks.flows.EachSequential",
" value: \"{{ trigger.files | jq('.path') }}\"",
" tasks:",
" - id: return",
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{ taskrun.value }}\"",
" - id: delete",
" type: io.kestra.plugin.fs.sftp.Delete",
" host: localhost",
" port: 6622",
" username: foo",
" password: bar",
" uri: \"/in/{{ taskrun.value }}\"",
"",
"triggers:",
" - id: watch",
" type: io.kestra.plugin.fs.sftp.Trigger",
" host: localhost",
" port: 6622",
" username: foo",
" password: bar",
" from: \"/in/\"",
" interval: PT10S",
" action: NONE",
}
),
@Example(
title = "Wait for one or more files in a given SFTP server's directory and process each of these files sequentially. In this example, we restrict the trigger to only wait for CSV files in the `mydir` directory.",
full = true,
Expand Down
37 changes: 37 additions & 0 deletions src/main/java/io/kestra/plugin/fs/smb/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,43 @@
" moveDirectory: \"/archive_share/\"",
}
),
@Example(
title = """
Wait for one or more files in a given SMB server's directory and process each of these files sequentially.
Then move them to another share which is used as an archive.""",
full = true,
code = {
"id: smb_trigger_flow",
"namespace: dev",
"",
"tasks:",
" - id: for_each_file",
" type: io.kestra.core.tasks.flows.EachSequential",
" value: \"{{ trigger.files | jq('.path') }}\"",
" tasks:",
" - id: return",
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{ taskrun.value }}\"",
" - id: delete",
" type: io.kestra.plugin.fs.smb.Delete",
" host: localhost",
" port: 445",
" username: foo",
" password: bar",
" uri: \"/my_share/in/{{ taskrun.value }}\"",
"",
"triggers:",
" - id: watch",
" type: io.kestra.plugin.fs.smb.Trigger",
" host: localhost",
" port: 445",
" username: foo",
" password: bar",
" from: \"/my_share/in/\"",
" interval: PT10S",
" action: NONE"
}
),
@Example(
title = """
Wait for one or more files in a given SMB server's directory (composed of share name followed by dir path) and process each of these files sequentially.
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/io/kestra/plugin/fs/vfs/Downloads.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public abstract class Downloads extends AbstractVfsTask implements RunnableTask<
private String from;

@Schema(
title = "The action to do on find files"
title = "The action to do on downloaded files"
)
@PluginProperty(dynamic = true)
private Downloads.Action action;
Expand Down Expand Up @@ -106,7 +106,7 @@ public Output run(RunContext runContext) throws Exception {
.collect(Collectors.toList());

if (this.action != null) {
VfsService.archive(
VfsService.performAction(
runContext,
fsm,
fileSystemOptions,
Expand All @@ -125,7 +125,8 @@ public Output run(RunContext runContext) throws Exception {

public enum Action {
MOVE,
DELETE
DELETE,
NONE
}


Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/kestra/plugin/fs/vfs/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public abstract class Trigger extends AbstractTrigger implements PollingTriggerI
private String from;

@Schema(
title = "The action to do on find files"
title = "The action to perform on the retrieved files. If using 'NONE' make sure to handle the files inside your flow to avoid infinite triggering."
)
@PluginProperty(dynamic = true)
@NotNull
Expand Down Expand Up @@ -155,7 +155,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
.collect(Collectors.toList());

if (this.action != null) {
VfsService.archive(
VfsService.performAction(
runContext,
fsm,
fileSystemOptions,
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/kestra/plugin/fs/vfs/VfsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public static Move.Output move(
}
}

public static void archive(
public static void performAction(
RunContext runContext,
StandardFileSystemManager fsm,
FileSystemOptions fileSystemOptions,
Expand Down
73 changes: 64 additions & 9 deletions src/test/java/io/kestra/plugin/fs/AbstractFileTriggerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.kestra.core.schedulers.DefaultScheduler;
import io.kestra.core.schedulers.SchedulerTriggerStateInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.plugin.fs.vfs.List;
import io.kestra.plugin.fs.vfs.Upload;
import io.kestra.plugin.fs.vfs.models.File;
import io.micronaut.context.ApplicationContext;
Expand All @@ -30,7 +31,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@MicronautTest
// FIXME Remove once Worker closing has been reworked (Micronaut 4 PR)
// We need to rebuild the context for each tests as currently Workers can't be closed properly (they keep listening to queues they shouldn't)
@MicronautTest(rebuildContext = true)
public abstract class AbstractFileTriggerTest {
@Inject
private ApplicationContext applicationContext;
Expand All @@ -55,12 +58,12 @@ public abstract class AbstractFileTriggerTest {
@Value("${kestra.variables.globals.random}")
protected String random;

abstract public Upload.Output upload(String to) throws Exception;

abstract protected String triggeringFlowId();

abstract protected AbstractUtils utils();

@Test
void flow() throws Exception {
void moveAction() throws Exception {
// mock flow listeners
CountDownLatch queueCount = new CountDownLatch(1);

Expand All @@ -86,19 +89,71 @@ void flow() throws Exception {


String out1 = FriendlyId.createFriendlyId();
upload("/upload/" + random + "/" + out1);
String toUploadDir = "/upload/" + random;
utils().upload(toUploadDir + "/" + out1);
String out2 = FriendlyId.createFriendlyId();
utils().upload(toUploadDir + "/" + out2);

worker.run();
scheduler.run();
repositoryLoader.load(Objects.requireNonNull(AbstractFileTriggerTest.class.getClassLoader().getResource("flows")));

boolean await = queueCount.await(10, TimeUnit.SECONDS);
assertThat(await, is(true));

@SuppressWarnings("unchecked")
java.util.List<File> trigger = (java.util.List<File>) last.get().getTrigger().getVariables().get("files");
assertThat(trigger.size(), is(2));

assertThat(utils().list(toUploadDir).getFiles().isEmpty(), is(true));
assertThat(utils().list(toUploadDir + "-move").getFiles().size(), is(2));
}
}

@Test
void noneAction() throws Exception {
// mock flow listeners
CountDownLatch queueCount = new CountDownLatch(1);

// scheduler
Worker worker = new Worker(applicationContext, 8, null);
try (
AbstractScheduler scheduler = new DefaultScheduler(
this.applicationContext,
this.flowListenersService,
this.triggerState
);
) {
AtomicReference<Execution> last = new AtomicReference<>();

// wait for execution
executionQueue.receive(AbstractFileTriggerTest.class, execution -> {
if (execution.getLeft().getFlowId().equals(triggeringFlowId() + "-none-action")){
last.set(execution.getLeft());

queueCount.countDown();
}
});


String out1 = FriendlyId.createFriendlyId();
String toUploadDir = "/upload/" + random + "-none";
utils().upload(toUploadDir + "/" + out1);
String out2 = FriendlyId.createFriendlyId();
upload("/upload/" + random + "/" + out2);
utils().upload(toUploadDir + "/" + out2);

worker.run();
scheduler.run();
repositoryLoader.load(Objects.requireNonNull(AbstractFileTriggerTest.class.getClassLoader().getResource("flows")));

queueCount.await(1, TimeUnit.MINUTES);
boolean await = queueCount.await(10, TimeUnit.SECONDS);
assertThat(await, is(true));

@SuppressWarnings("unchecked")
java.util.List<File> trigger = (java.util.List<File>) last.get().getTrigger().getVariables().get("files");
assertThat(trigger.size(), is(2));

assertThat(utils().list(toUploadDir).getFiles().size(), is(2));
}
}

Expand Down Expand Up @@ -135,9 +190,9 @@ void missing() throws Exception {
Thread.sleep(1000);

String out1 = FriendlyId.createFriendlyId();
upload("/upload/" + random + "/" + out1);
utils().upload("/upload/" + random + "/" + out1);

queueCount.await(1, TimeUnit.MINUTES);
queueCount.await(10, TimeUnit.SECONDS);

@SuppressWarnings("unchecked")
java.util.List<URI> trigger = (java.util.List<URI>) last.get().getTrigger().getVariables().get("files");
Expand Down
3 changes: 3 additions & 0 deletions src/test/java/io/kestra/plugin/fs/AbstractUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.devskiller.friendly_id.FriendlyId;
import io.kestra.core.storages.StorageInterface;
import io.kestra.plugin.fs.sftp.Upload;
import io.kestra.plugin.fs.vfs.List;
import jakarta.inject.Inject;

import java.io.File;
Expand Down Expand Up @@ -32,4 +33,6 @@ public Upload.Output upload(String to) throws Exception {
}

abstract public Upload.Output upload(URI source, String to) throws Exception;

abstract public List.Output list(String dir) throws Exception;
}
39 changes: 35 additions & 4 deletions src/test/java/io/kestra/plugin/fs/ftp/DownloadsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,18 @@ class DownloadsTest {
private String random;

@Test
void run() throws Exception {
void run_DeleteAfterDownloads() throws Exception {
String toUploadDir = "/upload/" + random;

String out1 = FriendlyId.createFriendlyId();
ftpUtils.upload("/upload/" + random + "/" + out1 + ".txt");
ftpUtils.upload(toUploadDir + "/" + out1 + ".txt");
String out2 = FriendlyId.createFriendlyId();
ftpUtils.upload("/upload/" + random + "/" + out2 + ".txt");
ftpUtils.upload(toUploadDir + "/" + out2 + ".txt");

Downloads task = Downloads.builder()
.id(DownloadsTest.class.getSimpleName())
.type(DownloadsTest.class.getName())
.from("/upload/" + random + "/")
.from(toUploadDir + "/")
.action(Downloads.Action.DELETE)
.host("localhost")
.port("6621")
Expand All @@ -46,5 +48,34 @@ void run() throws Exception {

assertThat(run.getFiles().size(), is(2));
assertThat(run.getFiles().get(0).getPath().getPath(), endsWith(".txt"));

assertThat(ftpUtils.list(toUploadDir).getFiles().isEmpty(), is(true));
}

@Test
void run_NoneAfterDownloads() throws Exception {
String toUploadDir = "/upload/" + random;
String out1 = FriendlyId.createFriendlyId();
ftpUtils.upload(toUploadDir + "/" + out1 + ".txt");
String out2 = FriendlyId.createFriendlyId();
ftpUtils.upload(toUploadDir + "/" + out2 + ".txt");

Downloads task = Downloads.builder()
.id(DownloadsTest.class.getSimpleName())
.type(DownloadsTest.class.getName())
.from(toUploadDir + "/")
.action(Downloads.Action.NONE)
.host("localhost")
.port("6621")
.username("guest")
.password("guest")
.build();

Downloads.Output run = task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of()));

assertThat(run.getFiles().size(), is(2));
assertThat(run.getFiles().get(0).getPath().getPath(), endsWith(".txt"));

assertThat(ftpUtils.list(toUploadDir).getFiles().size(), is(2));
}
}
Loading