Skip to content

Commit

Permalink
feat(snowflake): add download & upload from stage
Browse files Browse the repository at this point in the history
  • Loading branch information
tchiotludo committed Mar 31, 2022
1 parent 658981e commit 789b982
Show file tree
Hide file tree
Showing 10 changed files with 418 additions and 51 deletions.
1 change: 1 addition & 0 deletions plugin-jdbc-snowflake/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
dependencies {
implementation("net.snowflake:snowflake-jdbc:3.13.14")
implementation project(':plugin-jdbc')
implementation("javax.xml.bind:jaxb-api:2.3.1")

testImplementation project(':plugin-jdbc').sourceSets.test.output
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.kestra.plugin.jdbc.snowflake;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractJdbcConnection;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.io.IOException;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public abstract class AbstractSnowflakeConnection extends AbstractJdbcConnection implements SnowflakeInterface {
@Override
protected void registerDriver() throws SQLException {
DriverManager.registerDriver(new net.snowflake.client.jdbc.SnowflakeDriver());
}

@Override
protected Properties connectionProperties(RunContext runContext) throws IllegalVariableEvaluationException, IOException {
Properties properties = super.connectionProperties(runContext);

this.renderProperties(runContext, properties);

return properties;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package io.kestra.plugin.jdbc.snowflake;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import net.snowflake.client.jdbc.SnowflakeConnection;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.net.URI;
import java.sql.Connection;
import javax.validation.constraints.NotNull;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Download data to an internal stage"
)
@Plugin(
examples = {
@Example(
code = {
"stageName: MYSTAGE",
"fileName: prefix/destFile.csv"
}
)
}
)
public class Download extends AbstractSnowflakeConnection implements RunnableTask<Download.Output> {
private String database;
private String warehouse;
private String schema;
private String role;

@Schema(
title = "The stage name",
description = "~ or table name or stage name"
)
@PluginProperty(dynamic = true)
@NotNull
private String stageName;

@Schema(
title = "destination file name to use"
)
@PluginProperty(dynamic = true)
@NotNull
private String fileName;

@Schema(
title = "compress data or not before uploading stream"
)
@PluginProperty(dynamic = false)
@NotNull
@Builder.Default
private Boolean compress = true;

@Override
public Download.Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();
File tempFile = runContext.tempFile().toFile();

try (
Connection conn = this.connection(runContext);
BufferedOutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile))
) {
String stageName = runContext.render(this.stageName);
String filename = runContext.render(this.fileName);

logger.info("Starting download from stage '{}' with name '{}'", stageName, filename);

InputStream inputStream = conn
.unwrap(SnowflakeConnection.class)
.downloadStream(
stageName,
filename,
this.compress
);

IOUtils.copyLarge(inputStream, outputStream);

outputStream.flush();

return Output
.builder()
.uri(runContext.putTempFile(tempFile))
.build();
}
}

@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The url of the file on kestra storage"
)
private final URI uri;
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package io.kestra.plugin.jdbc.snowflake;

import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.io.IOException;
import java.sql.DriverManager;
Expand Down Expand Up @@ -75,9 +75,4 @@ protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
protected void registerDriver() throws SQLException {
DriverManager.registerDriver(new net.snowflake.client.jdbc.SnowflakeDriver());
}

@Override
public Output run(RunContext runContext) throws Exception {
return super.run(runContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package io.kestra.plugin.jdbc.snowflake;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;
import net.snowflake.client.jdbc.SnowflakeConnection;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;

import java.io.InputStream;
import java.net.URI;
import java.sql.Connection;
import javax.validation.constraints.NotNull;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Upload data to an internal stage"
)
@Plugin(
examples = {
@Example(
code = {
"stageName: MYSTAGE",
"prefix: testUploadStream",
"fileName: destFile.csv"
}
)
}
)
public class Upload extends AbstractSnowflakeConnection implements RunnableTask<Upload.Output> {
private String database;
private String warehouse;
private String schema;
private String role;

@Schema(
title = "The file to copy"
)
@PluginProperty(dynamic = true)
@NotNull
private String from;

@Schema(
title = "The stage name",
description = "~ or table name or stage name"
)
@PluginProperty(dynamic = true)
@NotNull
private String stageName;

@Schema(
title = "path / prefix under which the data should be uploaded on the stage"
)
@PluginProperty(dynamic = true)
@NotNull
private String prefix;

@Schema(
title = "destination file name to use"
)
@PluginProperty(dynamic = true)
@NotNull
private String fileName;

@Schema(
title = "compress data or not before uploading stream"
)
@PluginProperty(dynamic = false)
@NotNull
@Builder.Default
private Boolean compress = true;

@Override
public Upload.Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();

URI from = new URI(runContext.render(this.from));
try (
Connection conn = this.connection(runContext);
InputStream inputStream = runContext.uriToInputStream(from);
) {
String stageName = runContext.render(this.stageName);
String prefix = runContext.render(this.prefix);
String filename = runContext.render(this.fileName);

logger.info("Starting upload to stage '{}' on '{}' with name '{}'", stageName, prefix, filename);

conn
.unwrap(SnowflakeConnection.class)
.uploadStream(
stageName,
prefix,
inputStream,
filename,
this.compress
);

return Output
.builder()
.uri(URI.create(StringUtils.stripEnd(prefix, "/") + "/" + filename + (this.compress ? ".gz" : "")))
.build();
}
}

@Builder
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "The url of the staged files"
)
private final URI uri;
}
}
Loading

0 comments on commit 789b982

Please sign in to comment.