Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: storage now have a namespace parameter #148

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=0.20.0-SNAPSHOT
kestraVersion=[0.18,)
kestraVersion=[0.20,)
1 change: 1 addition & 0 deletions src/test/java/io/kestra/plugin/serdes/SerdesUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public URI resourceToStorageObject(String file) throws URISyntaxException, IOExc

public URI resourceToStorageObject(File file) throws URISyntaxException, IOException {
return storageInterface.put(
null,
null,
new URI("/" + FriendlyId.createFriendlyId()),
new FileInputStream(file)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void fullCsv() throws Exception {
IonToAvro.Output avroRunOutput = task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of()));

assertThat(
IonToAvroTest.avroSize(this.storageInterface.get(null, avroRunOutput.getUri())),
IonToAvroTest.avroSize(this.storageInterface.get(null, null, avroRunOutput.getUri())),
is(IonToAvroTest.avroSize(
new FileInputStream(new File(Objects.requireNonNull(IonToAvroTest.class.getClassLoader()
.getResource("csv/full.avro"))
Expand Down Expand Up @@ -105,7 +105,7 @@ void fullJson() throws Exception {
IonToAvro.Output avroRunOutput = task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of()));

assertThat(
IonToAvroTest.avroSize(this.storageInterface.get(null, avroRunOutput.getUri())),
IonToAvroTest.avroSize(this.storageInterface.get(null, null, avroRunOutput.getUri())),
is(IonToAvroTest.avroSize(
new FileInputStream(new File(Objects.requireNonNull(IonToAvroTest.class.getClassLoader()
.getResource("csv/full.avro"))
Expand Down Expand Up @@ -335,7 +335,7 @@ void jsonAliases() throws Exception {
IonToAvro.Output avroRunOutput = task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of()));

assertThat(
IonToAvroTest.avroSize(this.storageInterface.get(null, avroRunOutput.getUri())),
IonToAvroTest.avroSize(this.storageInterface.get(null, null, avroRunOutput.getUri())),
is(IonToAvroTest.avroSize(
new FileInputStream(new File(Objects.requireNonNull(IonToAvroTest.class.getClassLoader()
.getResource("csv/portfolio_aliases.avro"))
Expand All @@ -344,7 +344,7 @@ void jsonAliases() throws Exception {
);

DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(this.storageInterface.get(null, avroRunOutput.getUri()), datumReader);
DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(this.storageInterface.get(null, null, avroRunOutput.getUri()), datumReader);
dataFileReader.forEach(genericRecord -> {
GenericRecord scenario = ((GenericRecord) ((GenericRecord) genericRecord.get("it")).get("selectedScenario"));
assertThat(scenario.get("nbJH_DTP_Sales"), notNullValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private void test(String file) throws Exception {
IonToAvro.Output run = writer.run(TestsUtils.mockRunContext(runContextFactory, writer, ImmutableMap.of()));

assertThat(
IonToAvroTest.avroSize(this.storageInterface.get(null, run.getUri())),
IonToAvroTest.avroSize(this.storageInterface.get(null, null, run.getUri())),
is(IonToAvroTest.avroSize(
new FileInputStream(new File(Objects.requireNonNull(IonToAvroTest.class.getClassLoader()
.getResource(file))
Expand Down
5 changes: 3 additions & 2 deletions src/test/java/io/kestra/plugin/serdes/avro/IonToAvroTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ void array() throws Exception {

void test(String file) throws Exception {
URI source = storageInterface.put(
null,
null,
new URI("/" + FriendlyId.createFriendlyId()),
new FileInputStream(new File(Objects.requireNonNull(IonToAvroTest.class.getClassLoader()
Expand All @@ -75,7 +76,7 @@ void test(String file) throws Exception {
IonToAvro.Output run = task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of()));

assertThat(
IonToAvroTest.avroSize(this.storageInterface.get(null, run.getUri())),
IonToAvroTest.avroSize(this.storageInterface.get(null, null, run.getUri())),
is(IonToAvroTest.avroSize(
new FileInputStream(new File(Objects.requireNonNull(IonToAvroTest.class.getClassLoader()
.getResource("csv/insurance_sample.avro"))
Expand Down Expand Up @@ -115,7 +116,7 @@ void ion() throws Exception {
)
.forEach(throwConsumer(row -> FileSerde.write(output, row)));

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

IonToAvro writer = IonToAvro.builder()
.id(IonToAvro.class.getSimpleName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private void test(String file, boolean header) throws Exception {
IonToCsv.Output writerRunOutput = writer.run(TestsUtils.mockRunContext(runContextFactory, writer, ImmutableMap.of()));

assertThat(
CharStreams.toString(new InputStreamReader(storageInterface.get(null, writerRunOutput.getUri()))),
CharStreams.toString(new InputStreamReader(storageInterface.get(null, null, writerRunOutput.getUri()))),
is(CharStreams.toString(new InputStreamReader(new FileInputStream(sourceFile))))
);
}
Expand Down
12 changes: 6 additions & 6 deletions src/test/java/io/kestra/plugin/serdes/csv/IonToCsvTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void map() throws Exception {
)
.forEach(throwConsumer(row -> FileSerde.write(output, row)));

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

IonToCsv writer = IonToCsv.builder()
.id(IonToCsvTest.class.getSimpleName())
Expand All @@ -76,7 +76,7 @@ void map() throws Exception {
.build();
IonToCsv.Output writerRunOutput = writer.run(TestsUtils.mockRunContext(runContextFactory, writer, ImmutableMap.of()));

String out = CharStreams.toString(new InputStreamReader(storageInterface.get(null, writerRunOutput.getUri())));
String out = CharStreams.toString(new InputStreamReader(storageInterface.get(null, null, writerRunOutput.getUri())));

assertThat(out, containsString("\"string\";\"int\""));
assertThat(out, containsString("\"3.2\";\"" + ZonedDateTime.now().getYear()));
Expand Down Expand Up @@ -109,7 +109,7 @@ void list() throws Exception {
)
.forEach(throwConsumer(row -> FileSerde.write(output, row)));

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

IonToCsv writer = IonToCsv.builder()
.id(IonToCsvTest.class.getSimpleName())
Expand All @@ -121,7 +121,7 @@ void list() throws Exception {
.build();
IonToCsv.Output writerRunOutput = writer.run(TestsUtils.mockRunContext(runContextFactory, writer, ImmutableMap.of()));

String out = CharStreams.toString(new InputStreamReader(storageInterface.get(null, writerRunOutput.getUri())));
String out = CharStreams.toString(new InputStreamReader(storageInterface.get(null, null, writerRunOutput.getUri())));

assertThat(out, containsString("\"3.2\";\"" + ZonedDateTime.now().getYear()));
assertThat(out, containsString("\"3.4\";\"" + ZonedDateTime.now().getYear()));
Expand Down Expand Up @@ -150,7 +150,7 @@ void ion() throws Exception {
)
.forEach(throwConsumer(row -> FileSerde.write(output, row)));

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

IonToCsv writer = IonToCsv.builder()
.id(IonToAvro.class.getSimpleName())
Expand All @@ -164,7 +164,7 @@ void ion() throws Exception {
IonToCsv.Output run = writer.run(TestsUtils.mockRunContext(runContextFactory, writer, ImmutableMap.of()));

assertThat(
IOUtils.toString(this.storageInterface.get(null, run.getUri()), Charsets.UTF_8),
IOUtils.toString(this.storageInterface.get(null, null, run.getUri()), Charsets.UTF_8),
is("\"string\"," +
"\"2\"," +
"\"3.200000047683716\"," +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void ion() throws Exception {
.build();
ExcelToIon.Output ionOutput = reader.run(TestsUtils.mockRunContext(runContextFactory, reader, ImmutableMap.of()));

String out = CharStreams.toString(new InputStreamReader(storageInterface.get(null, ionOutput.getUris().get("Worksheet"))));
String out = CharStreams.toString(new InputStreamReader(storageInterface.get(null, null, ionOutput.getUris().get("Worksheet"))));

assertThat(out, containsString("policyID:\"333743\""));
assertThat(out, containsString("point_latitude:30.102261"));
Expand Down Expand Up @@ -87,6 +87,7 @@ void multiSheets() throws Exception {
String outWorkSheet1 = CharStreams.toString(
new InputStreamReader(
storageInterface.get(
null,
null,
ionOutput.getUris().get("Worksheet_1")
)
Expand All @@ -99,6 +100,7 @@ void multiSheets() throws Exception {
String outWorkSheet2 = CharStreams.toString(
new InputStreamReader(
storageInterface.get(
null,
null,
ionOutput.getUris().get("Worksheet_2")
)
Expand All @@ -111,6 +113,7 @@ void multiSheets() throws Exception {
String outWorkSheet3 = CharStreams.toString(
new InputStreamReader(
storageInterface.get(
null,
null,
ionOutput.getUris().get("Worksheet_3")
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private void test(String inputIonResourcePath, String expectedExcelResourcePath,
.build();
IonToExcel.Output excelOutput = writer.run(TestsUtils.mockRunContext(runContextFactory, writer, ImmutableMap.of()));

XSSFWorkbook actual = new XSSFWorkbook(storageInterface.get(null, (URI) excelOutput.getUri()));
XSSFWorkbook actual = new XSSFWorkbook(storageInterface.get(null, null, (URI) excelOutput.getUri()));
XSSFWorkbook expected = new XSSFWorkbook(new FileInputStream(SerdesUtils.resourceToFile(expectedExcelResourcePath)));
assertThat(actual, WorkbookMatcher.sameWorkbook(expected));
}
Expand Down Expand Up @@ -101,7 +101,7 @@ void large() throws Exception {
}
}

URI put = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI put = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

IonToExcel writer = IonToExcel.builder()
.id(IonToExcel.class.getSimpleName())
Expand Down Expand Up @@ -144,7 +144,7 @@ void styles() throws Exception {
}
}

URI put = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI put = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

IonToExcel writer = IonToExcel.builder()
.id(IonToExcel.class.getSimpleName())
Expand Down Expand Up @@ -197,7 +197,7 @@ void multiSheets() throws Exception {
)
);

XSSFWorkbook actual = new XSSFWorkbook(storageInterface.get(null, excelOutput.getUri()));
XSSFWorkbook actual = new XSSFWorkbook(storageInterface.get(null, null, excelOutput.getUri()));
XSSFWorkbook expected = new XSSFWorkbook(
new FileInputStream(
SerdesUtils.resourceToFile("excel/insurance_sample_multiple_sheets.xlsx")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void newLine() throws Exception {
IonToJson.Output writerRunOutput = this.writer(readerRunOutput.getUri(), true);

assertThat(
mapper.readTree(new InputStreamReader(storageInterface.get(null, writerRunOutput.getUri()))),
mapper.readTree(new InputStreamReader(storageInterface.get(null, null, writerRunOutput.getUri()))),
is(mapper.readTree(new InputStreamReader(new FileInputStream(sourceFile))))
);
assertThat(
Expand Down Expand Up @@ -105,7 +105,7 @@ void array() throws Exception {
IonToJson.Output writerRunOutput = this.writer(readerRunOutput.getUri(), false);

assertThat(
mapper.readTree(new InputStreamReader(storageInterface.get(null, writerRunOutput.getUri()))),
mapper.readTree(new InputStreamReader(storageInterface.get(null, null, writerRunOutput.getUri()))),
is(mapper.readTree(new InputStreamReader(new FileInputStream(sourceFile))))
);
}
Expand All @@ -119,7 +119,7 @@ void object() throws Exception {


List<Map> objects = Arrays.asList(mapper.readValue(
new InputStreamReader(storageInterface.get(null, writerRunOutput.getUri())),
new InputStreamReader(storageInterface.get(null, null, writerRunOutput.getUri())),
Map[].class
));

Expand Down Expand Up @@ -149,7 +149,7 @@ void ion() throws Exception {
)
.forEach(throwConsumer(row -> FileSerde.write(output, row)));

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

IonToJson writer = IonToJson.builder()
.id(IonToAvro.class.getSimpleName())
Expand All @@ -160,7 +160,7 @@ void ion() throws Exception {
IonToJson.Output run = writer.run(TestsUtils.mockRunContext(runContextFactory, writer, ImmutableMap.of()));

assertThat(
IOUtils.toString(this.storageInterface.get(null, run.getUri()), Charsets.UTF_8),
IOUtils.toString(this.storageInterface.get(null, null, run.getUri()), Charsets.UTF_8),
is("{" +
"\"String\":\"string\"," +
"\"Int\":2," +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void ion() throws Exception {
)
.forEach(throwConsumer(row -> FileSerde.write(output, row)));

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

IonToParquet writer = IonToParquet.builder()
.id(IonToParquet.class.getSimpleName())
Expand All @@ -77,7 +77,7 @@ void ion() throws Exception {
ParquetToIon.Output readerOutput = reader.run(TestsUtils.mockRunContext(runContextFactory, reader, ImmutableMap.of()));

List<Map<String, Object>> result = new ArrayList<>();
FileSerde.reader(new BufferedReader(new InputStreamReader(storageInterface.get(null, readerOutput.getUri()))), r -> result.add((Map<String, Object>) r));
FileSerde.reader(new BufferedReader(new InputStreamReader(storageInterface.get(null, null, readerOutput.getUri()))), r -> result.add((Map<String, Object>) r));

assertThat(result.size(), is(1));
assertThat(result.get(0).get("String"), is("string"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void bookWithQuery() throws Exception {
IonToXml.Output writerRunOutput = this.writer(readerRunOutput.getUri());

assertThat(
IOUtils.toString(new InputStreamReader(storageInterface.get(null, writerRunOutput.getUri()))),
IOUtils.toString(new InputStreamReader(storageInterface.get(null, null, writerRunOutput.getUri()))),
is(IOUtils.toString(new FileInputStream(resultFile), Charsets.UTF_8))
);
}
Expand All @@ -90,7 +90,7 @@ void docbook() throws Exception {
IonToXml.Output writerRunOutput = this.writer(readerRunOutput.getUri());

assertThat(
IOUtils.toString(new InputStreamReader(storageInterface.get(null, writerRunOutput.getUri()))),
IOUtils.toString(new InputStreamReader(storageInterface.get(null, null, writerRunOutput.getUri()))),
is(IOUtils.toString(new FileInputStream(resultFile), Charsets.UTF_8))
);
}
Expand All @@ -117,7 +117,7 @@ void ion() throws Exception {
)
.forEach(throwConsumer(row -> FileSerde.write(output, row)));

URI uri = storageInterface.put(null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));
URI uri = storageInterface.put(null, null, URI.create("/" + IdUtils.create() + ".ion"), new FileInputStream(tempFile));

IonToXml writer = IonToXml.builder()
.id(IonToAvro.class.getSimpleName())
Expand All @@ -129,7 +129,7 @@ void ion() throws Exception {
IonToXml.Output run = writer.run(TestsUtils.mockRunContext(runContextFactory, writer, ImmutableMap.of()));

assertThat(
IOUtils.toString(this.storageInterface.get(null, run.getUri()), Charsets.UTF_8),
IOUtils.toString(this.storageInterface.get(null, null, run.getUri()), Charsets.UTF_8),
is("<?xml version='1.0' encoding='UTF-8'?>\n<items>\n <item>\n " +
"<String>string</String>\n " +
"<Int>2</Int>\n " +
Expand Down