diff --git a/src/bin/main.rs b/src/bin/main.rs index 4950148..5cff5d0 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -43,6 +43,8 @@ enum Command { Convert { #[structopt(short, long)] single_file: bool, + #[structopt(short, long)] + zstd: bool, #[structopt(parse(from_os_str))] input: PathBuf, #[structopt(parse(from_os_str))] @@ -133,10 +135,11 @@ async fn execute_command(cmd: Command) -> Result<(), Error> { single_file, input, output, + zstd, } => { let input_filename = parse_filename(&input)?; let output_filename = parse_filename(&output)?; - convert_files(&ctx, input_filename, output_filename, single_file).await?; + convert_files(&ctx, input_filename, output_filename, single_file, zstd).await?; } Command::Query { table, diff --git a/src/convert.rs b/src/convert.rs index c99b9f0..f89e226 100644 --- a/src/convert.rs +++ b/src/convert.rs @@ -15,10 +15,24 @@ pub async fn convert_files( ctx: &SessionContext, input_filename: &str, output_filename: &str, - one_file: bool + single_file: bool, + zstd: bool, ) -> Result, Error> { let df = register_table(ctx, "t", input_filename).await?; - let write_options = DataFrameWriteOptions::default().with_single_file_output(one_file); + let write_options = DataFrameWriteOptions::default().with_single_file_output(single_file); + let props = if zstd { + WriterProperties::builder() + .set_created_by("bdt".to_string()) + .set_encoding(Encoding::PLAIN) + .set_compression(Compression::ZSTD(ZstdLevel::try_new(8)?)) + .build() + } else { + WriterProperties::builder() + .set_created_by("bdt".to_string()) + .set_encoding(Encoding::PLAIN) + .build() + }; + match file_format(output_filename)? { FileFormat::Avro => Err(Error::General( "Conversion to Avro is not supported".to_string(), @@ -32,7 +46,7 @@ pub async fn convert_files( .await .map_err(|e| e.into()), FileFormat::Parquet => df - .write_parquet(output_filename, write_options, None) + .write_parquet(output_filename, write_options, Some(props)) .await .map_err(|e| e.into()), FileFormat::Arrow => unimplemented!(),