diff --git a/Cargo.lock b/Cargo.lock index 0964284..cf6420a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -132,11 +132,10 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arrow" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8919668503a4f2d8b6da96fa7c16e93046bfb3412ffcfa1e5dc7d2e3adcb378" +checksum = "aa285343fba4d829d49985bdc541e3789cf6000ed0e84be7c039438df4a4e78c" dependencies = [ - "ahash", "arrow-arith", "arrow-array", "arrow-buffer", @@ -154,9 +153,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef983914f477d4278b068f13b3224b7d19eb2b807ac9048544d3bfebdf2554c4" +checksum = "753abd0a5290c1bcade7c6623a556f7d1659c5f4148b140b5b63ce7bd1a45705" dependencies = [ "arrow-array", "arrow-buffer", @@ -169,9 +168,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6eaf89041fa5937940ae390294ece29e1db584f46d995608d6e5fe65a2e0e9b" +checksum = "d390feeb7f21b78ec997a4081a025baef1e2e0d6069e181939b61864c9779609" dependencies = [ "ahash", "arrow-buffer", @@ -186,9 +185,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55512d988c6fbd76e514fd3ff537ac50b0a675da5a245e4fdad77ecfd654205f" +checksum = "69615b061701bcdffbc62756bc7e85c827d5290b472b580c972ebbbf690f5aa4" dependencies = [ "bytes", "half", @@ -197,15 +196,16 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "655ee51a2156ba5375931ce21c1b2494b1d9260e6dcdc6d4db9060c37dc3325b" +checksum = "e448e5dd2f4113bf5b74a1f26531708f5edcacc77335b7066f9398f4bcf4cdef" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", + "base64", "chrono", "comfy-table 7.0.1", "half", @@ -215,9 +215,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "258bb689997ad5b6660b3ce3638bd6b383d668ec555ed41ad7c6559cbb2e4f91" +checksum = "46af72211f0712612f5b18325530b9ad1bfbdc87290d5fbfd32a7da128983781" dependencies = [ "arrow-array", "arrow-buffer", @@ -234,9 +234,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6dc2b9fec74763427e2e5575b8cc31ce96ba4c9b4eb05ce40e0616d9fad12461" +checksum = "67d644b91a162f3ad3135ce1184d0a31c28b816a581e08f29e8e9277a574c64e" dependencies = [ "arrow-buffer", "arrow-schema", @@ -246,9 +246,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6eaa6ab203cc6d89b7eaa1ac781c1dfeef325454c5d5a0419017f95e6bafc03c" +checksum = "03dea5e79b48de6c2e04f03f62b0afea7105be7b77d134f6c5414868feefb80d" dependencies = [ "arrow-array", "arrow-buffer", @@ -256,13 +256,14 @@ dependencies = [ "arrow-data", "arrow-schema", "flatbuffers", + "lz4_flex", ] [[package]] name = "arrow-json" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb64e30d9b73f66fdc5c52d5f4cf69bbf03d62f64ffeafa0715590a5320baed7" +checksum = "8950719280397a47d37ac01492e3506a8a724b3fb81001900b866637a829ee0f" dependencies = [ "arrow-array", "arrow-buffer", @@ -280,9 +281,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a818951c0d11c428dda03e908175969c262629dd20bd0850bd6c7a8c3bfe48" +checksum = "1ed9630979034077982d8e74a942b7ac228f33dd93a93b615b4d02ad60c260be" dependencies = [ "arrow-array", "arrow-buffer", @@ -295,9 +296,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5d664318bc05f930559fc088888f0f7174d3c5bc888c0f4f9ae8f23aa398ba3" +checksum = "007035e17ae09c4e8993e4cb8b5b96edf0afb927cd38e2dff27189b274d83dcf" dependencies = [ "ahash", "arrow-array", @@ -310,15 +311,15 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aaf4d737bba93da59f16129bec21e087aed0be84ff840e74146d4703879436cb" +checksum = "0ff3e9c01f7cd169379d269f926892d0e622a704960350d09d331be3ec9e0029" [[package]] name = "arrow-select" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "374c4c3b812ecc2118727b892252a4a4308f87a8aca1dbf09f3ce4bc578e668a" +checksum = "1ce20973c1912de6514348e064829e50947e35977bb9d7fb637dc99ea9ffd78c" dependencies = [ "ahash", "arrow-array", @@ -330,9 +331,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15aed5624bb23da09142f58502b59c23f5bea607393298bb81dab1ce60fc769" +checksum = "00f3b37f2aeece31a2636d1b037dabb69ef590e03bdc7eb68519b51ec86932a7" dependencies = [ "arrow-array", "arrow-buffer", @@ -758,14 +759,15 @@ dependencies = [ [[package]] name = "datafusion" -version = "33.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "676796427e638d85e9eadf13765705212be60b34f8fc5d3934d95184c63ca1b4" +checksum = "4328f5467f76d890fe3f924362dbc3a838c6a733f762b32d87f9e0b7bef5fb49" dependencies = [ "ahash", "apache-avro", "arrow", "arrow-array", + "arrow-ipc", "arrow-schema", "async-compression", "async-trait", @@ -807,9 +809,9 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "33.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e23b3d21a6531259d291bd20ce59282ea794bda1018b0a1e278c13cd52e50c" +checksum = "d29a7752143b446db4a2cccd9a6517293c6b97e8c39e520ca43ccd07135a4f7e" dependencies = [ "ahash", "apache-avro", @@ -819,6 +821,7 @@ dependencies = [ "arrow-schema", "chrono", "half", + "libc", "num_cpus", "object_store", "parquet", @@ -827,9 +830,9 @@ dependencies = [ [[package]] name = "datafusion-execution" -version = "33.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4de1fd0d8db0f2b8e4f4121bfa1c7c09d3a5c08a0a65c2229cd849eb65cff855" +checksum = "2d447650af16e138c31237f53ddaef6dd4f92f0e2d3f2f35d190e16c214ca496" dependencies = [ "arrow", "chrono", @@ -848,14 +851,15 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "33.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18e227fe88bf6730cab378d0cd8fc4c6b2ea42bc7e414a8ea9feba7225932735" +checksum = "d8d19598e48a498850fb79f97a9719b1f95e7deb64a7a06f93f313e8fa1d524b" dependencies = [ "ahash", "arrow", "arrow-array", "datafusion-common", + "paste", "sqlparser", "strum 0.25.0", "strum_macros 0.25.3", @@ -863,9 +867,9 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "33.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c6648e62ea7605b9bfcd87fdc9d67e579c3b9ac563a87734ae5fe6d79ee4547" +checksum = "8b7feb0391f1fc75575acb95b74bfd276903dc37a5409fcebe160bc7ddff2010" dependencies = [ "arrow", "async-trait", @@ -881,9 +885,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "33.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f32b8574add16a32411a9b3fb3844ac1fc09ab4e7be289f86fd56d620e4f2508" +checksum = "e911bca609c89a54e8f014777449d8290327414d3e10c57a3e3c2122e38878d0" dependencies = [ "ahash", "arrow", @@ -902,7 +906,6 @@ dependencies = [ "hex", "indexmap 2.0.0", "itertools", - "libc", "log", "md-5", "paste", @@ -916,9 +919,9 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" -version = "33.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "796abd77d5bfecd9e5275a99daf0ec45f5b3a793ec431349ce8211a67826fd22" +checksum = "e96b546b8a02e9c2ab35ac6420d511f12a4701950c1eb2e568c122b4fefb0be3" dependencies = [ "ahash", "arrow", @@ -947,9 +950,9 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "33.0.0" +version = "35.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ced70b8a5648ba7b95c61fc512183c33287ffe2c9f22ffe22700619d7d48c84f" +checksum = "2d18d36f260bbbd63aafdb55339213a23d540d3419810575850ef0a798a6b768" dependencies = [ "arrow", "arrow-schema", @@ -1311,9 +1314,9 @@ checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" [[package]] name = "itertools" -version = "0.11.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" dependencies = [ "either", ] @@ -1624,9 +1627,9 @@ dependencies = [ [[package]] name = "object_store" -version = "0.7.1" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f930c88a43b1c3f6e776dfe495b4afab89882dbc81530c632db2ed65451ebcb4" +checksum = "d139f545f64630e2e3688fd9f81c470888ab01edeb72d13b4e86c566f1130000" dependencies = [ "async-trait", "bytes", @@ -1683,9 +1686,9 @@ dependencies = [ [[package]] name = "parquet" -version = "48.0.1" +version = "50.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bfe55df96e3f02f11bf197ae37d91bb79801631f82f6195dd196ef521df3597" +checksum = "547b92ebf0c1177e3892f44c8f79757ee62e678d564a9834189725f2c5b7a750" dependencies = [ "ahash", "arrow-array", @@ -1701,6 +1704,7 @@ dependencies = [ "chrono", "flate2", "futures", + "half", "hashbrown 0.14.0", "lz4_flex", "num", @@ -2138,9 +2142,9 @@ checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831" [[package]] name = "sqlparser" -version = "0.39.0" +version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "743b4dc2cbde11890ccb254a8fc9d537fa41b36da00de2a1c5e9848c9bc42bd7" +checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964" dependencies = [ "log", "sqlparser_derive", @@ -2148,13 +2152,13 @@ dependencies = [ [[package]] name = "sqlparser_derive" -version = "0.1.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55fe75cb4a364c7f7ae06c7dbbc8d84bddd85d6cdf9975963c3935bc1991761e" +checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554" dependencies = [ "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.28", ] [[package]] @@ -2341,24 +2345,22 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.29.1" +version = "1.36.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" +checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" dependencies = [ - "autocfg", "backtrace", "bytes", "num_cpus", - "parking_lot", "pin-project-lite", "tokio-macros", ] [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 81aaf97..21fab6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ path = "src/lib.rs" [dependencies] comfy-table = "6.1.2" -datafusion = { version = "33.0", features = ["avro"] } +datafusion = { version = "35.0", features = ["avro"] } structopt = "0.3" -tokio = "1.19" -thiserror = "1" +tokio = { version = "1.36", features = ["rt-multi-thread"] } +thiserror = "1" \ No newline at end of file diff --git a/src/bin/main.rs b/src/bin/main.rs index 974668a..42f6a46 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -41,6 +41,12 @@ enum Command { }, /// Convert a file to a different format Convert { + /// Output in a single file + #[structopt(short, long)] + single_file: bool, + /// Enable Zstd compression + #[structopt(short, long)] + zstd: bool, #[structopt(parse(from_os_str))] input: PathBuf, #[structopt(parse(from_os_str))] @@ -127,10 +133,15 @@ async fn execute_command(cmd: Command) -> Result<(), Error> { let df = ctx.sql(sql).await?; df.show().await?; } - Command::Convert { input, output } => { + Command::Convert { + single_file, + input, + output, + zstd, + } => { let input_filename = parse_filename(&input)?; let output_filename = parse_filename(&output)?; - convert_files(&ctx, input_filename, output_filename).await?; + convert_files(&ctx, input_filename, output_filename, single_file, zstd).await?; } Command::Query { table, diff --git a/src/convert.rs b/src/convert.rs index e492370..f89e226 100644 --- a/src/convert.rs +++ b/src/convert.rs @@ -1,16 +1,38 @@ use crate::utils::{file_format, register_table}; use crate::{Error, FileFormat}; -use datafusion::arrow::record_batch::RecordBatch; -use datafusion::dataframe::DataFrameWriteOptions; + use datafusion::prelude::SessionContext; +use datafusion::{ + arrow::record_batch::RecordBatch, + dataframe::DataFrameWriteOptions, + parquet::{ + basic::{Compression, Encoding, ZstdLevel}, + file::properties::WriterProperties, + }, +}; pub async fn convert_files( ctx: &SessionContext, input_filename: &str, output_filename: &str, + single_file: bool, + zstd: bool, ) -> Result, Error> { let df = register_table(ctx, "t", input_filename).await?; - let write_options = DataFrameWriteOptions::default(); + let write_options = DataFrameWriteOptions::default().with_single_file_output(single_file); + let props = if zstd { + WriterProperties::builder() + .set_created_by("bdt".to_string()) + .set_encoding(Encoding::PLAIN) + .set_compression(Compression::ZSTD(ZstdLevel::try_new(8)?)) + .build() + } else { + WriterProperties::builder() + .set_created_by("bdt".to_string()) + .set_encoding(Encoding::PLAIN) + .build() + }; + match file_format(output_filename)? { FileFormat::Avro => Err(Error::General( "Conversion to Avro is not supported".to_string(), @@ -24,7 +46,7 @@ pub async fn convert_files( .await .map_err(|e| e.into()), FileFormat::Parquet => df - .write_parquet(output_filename, write_options, None) + .write_parquet(output_filename, write_options, Some(props)) .await .map_err(|e| e.into()), FileFormat::Arrow => unimplemented!(),