Skip to content

Commit

Permalink
- Integrate PR#30 to support benchmarking and bringup of Parquet cach…
Browse files Browse the repository at this point in the history
…e configurations

Signed-off-by: shamb0 <[email protected]>
  • Loading branch information
shamb0 committed Oct 8, 2024
1 parent 0a47f64 commit b87f231
Show file tree
Hide file tree
Showing 16 changed files with 500 additions and 311 deletions.
42 changes: 40 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ strum = { version = "0.26.3", features = ["derive"] }
supabase-wrappers = { git = "https://github.com/paradedb/wrappers.git", default-features = false, rev = "c32abb7" }
thiserror = "1.0.63"
uuid = "1.10.0"
heapless = "0.7.16"

[dev-dependencies]
aws-config = "1.5.6"
Expand Down
66 changes: 35 additions & 31 deletions src/api/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use anyhow::Result;
use anyhow::{anyhow, Result};
use duckdb::types::Value;
use pgrx::*;

use crate::duckdb::connection;
use crate::duckdb::utils;
use crate::env::get_global_connection;
use crate::with_connection;
use duckdb::Connection;

type SniffCsvRow = (
Option<String>,
Expand Down Expand Up @@ -62,34 +64,36 @@ pub fn sniff_csv(

#[inline]
fn sniff_csv_impl(files: &str, sample_size: Option<i64>) -> Result<Vec<SniffCsvRow>> {
let schema_str = vec![
Some(utils::format_csv(files)),
sample_size.map(|s| s.to_string()),
]
.into_iter()
.flatten()
.collect::<Vec<String>>()
.join(", ");
let conn = unsafe { &*connection::get_global_connection().get() };
let query = format!("SELECT * FROM sniff_csv({schema_str})");
let mut stmt = conn.prepare(&query)?;
with_connection!(|conn: &Connection| {
let schema_str = vec![
Some(utils::format_csv(files)),
sample_size.map(|s| s.to_string()),
]
.into_iter()
.flatten()
.collect::<Vec<String>>()
.join(", ");

Ok(stmt
.query_map([], |row| {
Ok((
row.get::<_, Option<String>>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, Option<String>>(3)?,
row.get::<_, Option<i32>>(4)?,
row.get::<_, Option<bool>>(5)?,
row.get::<_, Option<Value>>(6)?.map(|v| format!("{:?}", v)),
row.get::<_, Option<String>>(7)?,
row.get::<_, Option<String>>(8)?,
row.get::<_, Option<String>>(9)?,
row.get::<_, Option<String>>(10)?,
))
})?
.map(|row| row.unwrap())
.collect::<Vec<SniffCsvRow>>())
let query = format!("SELECT * FROM sniff_csv({schema_str})");
let mut stmt = conn.prepare(&query)?;

Ok(stmt
.query_map([], |row| {
Ok((
row.get::<_, Option<String>>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, Option<String>>(3)?,
row.get::<_, Option<i32>>(4)?,
row.get::<_, Option<bool>>(5)?,
row.get::<_, Option<Value>>(6)?.map(|v| format!("{:?}", v)),
row.get::<_, Option<String>>(7)?,
row.get::<_, Option<String>>(8)?,
row.get::<_, Option<String>>(9)?,
row.get::<_, Option<String>>(10)?,
))
})?
.map(|row| row.unwrap())
.collect::<Vec<SniffCsvRow>>())
})
}
34 changes: 19 additions & 15 deletions src/api/duckdb.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use anyhow::Result;
use anyhow::{anyhow, Result};
use pgrx::*;

use crate::duckdb::connection;
use crate::env::get_global_connection;
use crate::with_connection;
use duckdb::Connection;

type DuckdbSettingsRow = (
Option<String>,
Expand Down Expand Up @@ -36,19 +39,20 @@ pub fn duckdb_settings() -> iter::TableIterator<

#[inline]
fn duckdb_settings_impl() -> Result<Vec<DuckdbSettingsRow>> {
let conn = unsafe { &*connection::get_global_connection().get() };
let mut stmt = conn.prepare("SELECT * FROM duckdb_settings()")?;
with_connection!(|conn: &Connection| {
let mut stmt = conn.prepare("SELECT * FROM duckdb_settings()")?;

Ok(stmt
.query_map([], |row| {
Ok((
row.get::<_, Option<String>>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, Option<String>>(3)?,
row.get::<_, Option<String>>(4)?,
))
})?
.map(|row| row.unwrap())
.collect::<Vec<DuckdbSettingsRow>>())
Ok(stmt
.query_map([], |row| {
Ok((
row.get::<_, Option<String>>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, Option<String>>(3)?,
row.get::<_, Option<String>>(4)?,
))
})?
.map(|row| row.unwrap())
.collect::<Vec<DuckdbSettingsRow>>())
})
}
86 changes: 45 additions & 41 deletions src/api/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use anyhow::Result;
use anyhow::{anyhow, Result};
use pgrx::*;

use crate::duckdb::connection;
use crate::duckdb::utils;
use crate::env::get_global_connection;
use crate::with_connection;
use duckdb::Connection;

type ParquetSchemaRow = (
Option<String>,
Expand Down Expand Up @@ -87,49 +89,51 @@ pub fn parquet_schema(

#[inline]
fn parquet_schema_impl(files: &str) -> Result<Vec<ParquetSchemaRow>> {
let schema_str = utils::format_csv(files);
let conn = unsafe { &*connection::get_global_connection().get() };
let query = format!("SELECT * FROM parquet_schema({schema_str})");
let mut stmt = conn.prepare(&query)?;
with_connection!(|conn: &Connection| {
let schema_str = utils::format_csv(files);
let query = format!("SELECT * FROM parquet_schema({schema_str})");
let mut stmt = conn.prepare(&query)?;

Ok(stmt
.query_map([], |row| {
Ok((
row.get::<_, Option<String>>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, Option<String>>(3)?,
row.get::<_, Option<String>>(4)?,
row.get::<_, Option<i64>>(5)?,
row.get::<_, Option<String>>(6)?,
row.get::<_, Option<i64>>(7)?,
row.get::<_, Option<i64>>(8)?,
row.get::<_, Option<i64>>(9)?,
row.get::<_, Option<String>>(10)?,
))
})?
.map(|row| row.unwrap())
.collect::<Vec<ParquetSchemaRow>>())
Ok(stmt
.query_map([], |row| {
Ok((
row.get::<_, Option<String>>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, Option<String>>(3)?,
row.get::<_, Option<String>>(4)?,
row.get::<_, Option<i64>>(5)?,
row.get::<_, Option<String>>(6)?,
row.get::<_, Option<i64>>(7)?,
row.get::<_, Option<i64>>(8)?,
row.get::<_, Option<i64>>(9)?,
row.get::<_, Option<String>>(10)?,
))
})?
.map(|row| row.unwrap())
.collect::<Vec<ParquetSchemaRow>>())
})
}

#[inline]
fn parquet_describe_impl(files: &str) -> Result<Vec<ParquetDescribeRow>> {
let schema_str = utils::format_csv(files);
let conn = unsafe { &*connection::get_global_connection().get() };
let query = format!("DESCRIBE SELECT * FROM {schema_str}");
let mut stmt = conn.prepare(&query)?;
with_connection!(|conn: &Connection| {
let schema_str = utils::format_csv(files);
let query = format!("DESCRIBE SELECT * FROM {schema_str}");
let mut stmt = conn.prepare(&query)?;

Ok(stmt
.query_map([], |row| {
Ok((
row.get::<_, Option<String>>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, Option<String>>(3)?,
row.get::<_, Option<String>>(4)?,
row.get::<_, Option<String>>(5)?,
))
})?
.map(|row| row.unwrap())
.collect::<Vec<ParquetDescribeRow>>())
Ok(stmt
.query_map([], |row| {
Ok((
row.get::<_, Option<String>>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, Option<String>>(3)?,
row.get::<_, Option<String>>(4)?,
row.get::<_, Option<String>>(5)?,
))
})?
.map(|row| row.unwrap())
.collect::<Vec<ParquetDescribeRow>>())
})
}
Loading

0 comments on commit b87f231

Please sign in to comment.