Skip to content

Commit

Permalink
feat: upgrade to FastCSV 2.2.2
Browse files (browse the repository at this point in the history)
  • Loading branch information
loicmathieu committed Jan 9, 2024
1 parent 0ca244c commit d85a2e2
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 79 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number | Diff line number | Diff line change
Expand Up @@ -57,7 +57,7 @@ dependencies {

compileOnly group: 'ch.qos.logback', name: 'logback-classic'

api 'de.siegmar:fastcsv:1.0.3'
api 'de.siegmar:fastcsv:2.2.2'
api 'org.apache.avro:avro:1.11.3'
api group: 'org.apache.parquet', name: 'parquet-avro', version: '1.13.1'
api group: 'org.apache.poi', name: 'poi', version: '5.2.5'
Expand Down
66 changes: 28 additions & 38 deletions src/main/java/io/kestra/plugin/serdes/csv/CsvReader.java
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,6 @@
package io.kestra.plugin.serdes.csv;

import de.siegmar.fastcsv.reader.CsvParser;
import de.siegmar.fastcsv.reader.CsvRow;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Single;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
Expand All @@ -16,12 +12,12 @@
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;

import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -50,13 +46,13 @@ public class CsvReader extends Task implements RunnableTask<CsvReader.Output> {
@Schema(
title = "The field separator character"
)
private final Character fieldSeparator = ",".charAt(0);
private final Character fieldSeparator = ',';

@Builder.Default
@Schema(
title = "The text delimiter character"
)
private final Character textDelimiter = "\"".charAt(0);
private final Character textDelimiter = '"';

@Builder.Default
@Schema(
Expand Down Expand Up @@ -86,7 +82,6 @@ public class CsvReader extends Task implements RunnableTask<CsvReader.Output> {
public Output run(RunContext runContext) throws Exception {
// reader
URI from = new URI(runContext.render(this.from));
de.siegmar.fastcsv.reader.CsvReader csvReader = this.csvReader();

// temp file
File tempFile = runContext.tempFile(".ion").toFile();
Expand All @@ -95,12 +90,19 @@ public Output run(RunContext runContext) throws Exception {
AtomicInteger skipped = new AtomicInteger();

try (
CsvParser csvParser = csvReader.parse(new InputStreamReader(runContext.uriToInputStream(from), charset));
de.siegmar.fastcsv.reader.CsvReader csvReader = this.csvReader(new InputStreamReader(runContext.uriToInputStream(from), charset));
OutputStream output = new FileOutputStream(tempFile);
) {
Map<Integer, String> headers = new HashMap<>();
Flowable<Object> flowable = Flowable
.create(this.nextRow(csvParser), BackpressureStrategy.BUFFER)
.fromIterable(csvReader)
.filter(csvRow -> {
if (header && csvRow.getOriginalLineNumber() == 1) {
for (int i = 0; i < csvRow.getFieldCount(); i++) {
headers.put(i, csvRow.getField(i));
}
return false;
}
if (this.skipRows > 0 && skipped.get() < this.skipRows) {
skipped.incrementAndGet();
return false;
Expand All @@ -110,10 +112,13 @@ public Output run(RunContext runContext) throws Exception {
})
.map(r -> {
if (header) {
return r.getFieldMap();
} else {
return r.getFields();
Map<String, Object> fields = new HashMap<>();
for (Map.Entry<Integer, String> header : headers.entrySet()) {
fields.put(header.getValue(), r.getField(header.getKey()));
}
return fields;
}
return r.getFields();
})
.doOnNext(row -> FileSerde.write(output, row));

Expand All @@ -140,40 +145,25 @@ public static class Output implements io.kestra.core.models.tasks.Output {
private URI uri;
}

private FlowableOnSubscribe<CsvRow> nextRow(CsvParser csvParser) {
return s -> {
CsvRow row;
while ((row = csvParser.nextRow()) != null) {
s.onNext(row);
}

s.onComplete();
};
}

private de.siegmar.fastcsv.reader.CsvReader csvReader() {
de.siegmar.fastcsv.reader.CsvReader csvReader = new de.siegmar.fastcsv.reader.CsvReader();

if (this.header != null) {
csvReader.setContainsHeader(this.header);
}
private de.siegmar.fastcsv.reader.CsvReader csvReader(InputStreamReader inputStreamReader) {
var builder = de.siegmar.fastcsv.reader.CsvReader.builder();

if (this.textDelimiter != null) {
csvReader.setTextDelimiter(textDelimiter);
builder.quoteCharacter(textDelimiter);
}

if (this.fieldSeparator != null) {
csvReader.setFieldSeparator(fieldSeparator);
builder.fieldSeparator(fieldSeparator);
}

if (this.skipEmptyRows != null) {
csvReader.setSkipEmptyRows(skipEmptyRows);
builder.skipEmptyRows(skipEmptyRows);
}

if (this.errorOnDifferentFieldCount != null) {
csvReader.setErrorOnDifferentFieldCount(errorOnDifferentFieldCount);
builder.errorOnDifferentFieldCount(errorOnDifferentFieldCount);
}

return csvReader;
return builder.build(inputStreamReader);
}
}
60 changes: 21 additions & 39 deletions src/main/java/io/kestra/plugin/serdes/csv/CsvWriter.java
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
package io.kestra.plugin.serdes.csv;

import de.siegmar.fastcsv.writer.CsvAppender;
import io.kestra.core.validations.DateFormat;
import de.siegmar.fastcsv.writer.LineDelimiter;
import de.siegmar.fastcsv.writer.QuoteStrategy;
import io.kestra.plugin.serdes.AbstractTextWriter;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
Expand All @@ -10,24 +10,16 @@
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.ArrayUtils;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;

import java.io.BufferedReader;
import java.io.File;
import java.io.InputStreamReader;
import java.io.*;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.Date;
import java.util.List;
import java.util.Map;
import javax.validation.constraints.NotNull;
Expand Down Expand Up @@ -58,19 +50,19 @@ public class CsvWriter extends AbstractTextWriter implements RunnableTask<CsvWri
@Schema(
title = "The field separator character"
)
private final Character fieldSeparator = ",".charAt(0);
private final Character fieldSeparator = ',';

@Builder.Default
@Schema(
title = "The text delimiter character"
)
private final Character textDelimiter = "\"".charAt(0);
private final Character textDelimiter = '"';

@Builder.Default
@Schema(
title = "The character used to separate rows"
)
private final Character[] lineDelimiter = ArrayUtils.toObject("\n".toCharArray());
private final String lineDelimiter = "\n";

@Builder.Default
@Schema(
Expand All @@ -90,9 +82,6 @@ public Output run(RunContext runContext) throws Exception {
// temp file
File tempFile = runContext.tempFile(".csv").toFile();

// writer
de.siegmar.fastcsv.writer.CsvWriter csvWriter = this.csvWriter();

// reader
URI from = new URI(runContext.render(this.from));

Expand All @@ -101,7 +90,8 @@ public Output run(RunContext runContext) throws Exception {

try (
BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)));
CsvAppender csvAppender = csvWriter.append(tempFile, Charset.forName(this.charset));
Writer fileWriter = new FileWriter(tempFile, Charset.forName(this.charset));
de.siegmar.fastcsv.writer.CsvWriter csvWriter = this.csvWriter(fileWriter)
) {
Flowable<Object> flowable = Flowable
.create(FileSerde.reader(inputStream), BackpressureStrategy.BUFFER)
Expand All @@ -118,37 +108,29 @@ public void accept(Object row) throws Exception {
throw new IllegalArgumentException("Invalid data of type List with header");
}

for (final Object value : casted) {
csvAppender.appendField(convert(value));
}
var rows = casted.stream().map(field -> convert(field)).toList();
csvWriter.writeRow(rows);
} else if (row instanceof Map) {
Map<String, Object> casted = (Map<String, Object>) row;

if (!first) {
this.first = true;
if (header) {
for (final String value : casted.keySet()) {
csvAppender.appendField(value);
}
csvAppender.endLine();
var rows = casted.keySet().stream().map(field -> convert(field)).toList();
csvWriter.writeRow(rows);
}
}

for (final Object value : casted.values()) {
csvAppender.appendField(convert(value));
}
var rows = casted.values().stream().map(field -> convert(field)).toList();
csvWriter.writeRow(rows);
}

csvAppender.endLine();
}
});

// metrics & finalize
Single<Long> count = flowable.count();
Long lineCount = count.blockingGet();
runContext.metric(Counter.of("records", lineCount));

csvAppender.flush();
}

return Output
Expand All @@ -166,14 +148,14 @@ public static class Output implements io.kestra.core.models.tasks.Output {
private URI uri;
}

private de.siegmar.fastcsv.writer.CsvWriter csvWriter() {
de.siegmar.fastcsv.writer.CsvWriter csvWriter = new de.siegmar.fastcsv.writer.CsvWriter();
private de.siegmar.fastcsv.writer.CsvWriter csvWriter(Writer writer) {
var builder = de.siegmar.fastcsv.writer.CsvWriter.builder();

csvWriter.setTextDelimiter(this.textDelimiter);
csvWriter.setFieldSeparator(this.fieldSeparator);
csvWriter.setLineDelimiter(ArrayUtils.toPrimitive(this.lineDelimiter));
csvWriter.setAlwaysDelimitText(this.alwaysDelimitText);
builder.quoteCharacter(this.textDelimiter);
builder.fieldSeparator(this.fieldSeparator);
builder.lineDelimiter(LineDelimiter.of(this.lineDelimiter));
builder.quoteStrategy(this.alwaysDelimitText ? QuoteStrategy.ALWAYS : QuoteStrategy.REQUIRED);

return csvWriter;
return builder.build(writer);
}
}
2 changes: 1 addition & 1 deletion (test source file; its path is missing from this capture)
Original file line number | Diff line number | Diff line change
Expand Up @@ -51,7 +51,7 @@ private void test(String file, boolean header) throws Exception {
.from(readerRunOutput.getUri().toString())
.fieldSeparator(";".charAt(0))
.alwaysDelimitText(true)
.lineDelimiter(ArrayUtils.toObject((file.equals("csv/insurance_sample.csv") ? "\r\n" : "\n").toCharArray()))
.lineDelimiter((file.equals("csv/insurance_sample.csv") ? "\r\n" : "\n"))
.header(header)
.build();
CsvWriter.Output writerRunOutput = writer.run(TestsUtils.mockRunContext(runContextFactory, writer, ImmutableMap.of()));
Expand Down

0 comments on commit d85a2e2

Please sign in to comment.