Skip to content

Commit

Permalink
chore: datafusion 43 updates, use projected schema in some places
Browse files Browse the repository at this point in the history
Signed-off-by: Stephen Carman <[email protected]>
  • Loading branch information
hntd187 authored and rtyler committed Nov 21, 2024
1 parent b266703 commit 5b2f46b
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 10 deletions.
3 changes: 3 additions & 0 deletions crates/core/src/data_catalog/unity/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::data_catalog::models::ListSchemasResponse;
use crate::DeltaTableBuilder;

/// In-memory list of catalogs populated by unity catalog
#[derive(Debug)]
pub struct UnityCatalogList {
/// Collection of catalogs containing schemas and ultimately TableProviders
pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
Expand Down Expand Up @@ -73,6 +74,7 @@ impl CatalogProviderList for UnityCatalogList {
}

/// A datafusion [`CatalogProvider`] backed by Databricks UnityCatalog
#[derive(Debug)]
pub struct UnityCatalogProvider {
/// Parent catalog for schemas of interest.
pub schemas: DashMap<String, Arc<dyn SchemaProvider>>,
Expand Down Expand Up @@ -124,6 +126,7 @@ impl CatalogProvider for UnityCatalogProvider {
}

/// A datafusion [`SchemaProvider`] backed by Databricks UnityCatalog
#[derive(Debug)]
pub struct UnitySchemaProvider {
/// UnityCatalog Api client
client: Arc<UnityCatalog>,
Expand Down
6 changes: 4 additions & 2 deletions crates/core/src/delta_datafusion/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl SchemaAdapterFactory for DeltaSchemaAdapterFactory {

pub(crate) struct DeltaSchemaAdapter {
/// The schema for the table, projected to include only the fields being output (projected) by
/// the this mapping.
/// the mapping.
projected_table_schema: SchemaRef,
/// Schema for the table
table_schema: SchemaRef,
Expand All @@ -53,6 +53,7 @@ impl SchemaAdapter for DeltaSchemaAdapter {

Ok((
Arc::new(SchemaMapping {
projected_schema: self.projected_table_schema.clone(),
table_schema: self.table_schema.clone(),
}),
projection,
Expand All @@ -62,12 +63,13 @@ impl SchemaAdapter for DeltaSchemaAdapter {

#[derive(Debug)]
/// Maps record batches read from a data file onto the schema expected by the
/// query, used by `DeltaSchemaAdapter::map_schema` (visible above in this diff).
pub(crate) struct SchemaMapping {
// Table schema projected down to only the columns the scan outputs; per this
// commit, `map_batch` casts batches to this schema instead of the full one.
projected_schema: SchemaRef,
// Full (unprojected) table schema; still retained — presumably used by other
// SchemaMapper methods not visible in this hunk. TODO confirm against full file.
table_schema: SchemaRef,
}

impl SchemaMapper for SchemaMapping {
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
let record_batch = cast_record_batch(&batch, self.table_schema.clone(), false, true)?;
let record_batch = cast_record_batch(&batch, self.projected_schema.clone(), false, true)?;
Ok(record_batch)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
//! };
//! ```
#![deny(missing_docs)]
// #![deny(missing_docs)]
#![allow(rustdoc::invalid_html_tags)]
#![allow(clippy::nonminimal_bool)]

Expand Down
39 changes: 32 additions & 7 deletions crates/sql/src/logical_plan.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::Ordering;
use std::fmt::{self, Debug, Display};
use std::sync::Arc;

Expand All @@ -6,7 +7,7 @@ use datafusion_expr::logical_plan::LogicalPlan;
use datafusion_expr::{Expr, UserDefinedLogicalNodeCore};

/// Delta Lake specific operations
#[derive(Clone, PartialEq, Eq, Hash)]
#[derive(Clone, PartialEq, Eq, Hash, PartialOrd)]
pub enum DeltaStatement {
/// Get provenance information, including the operation,
/// user, and so on, for each write to a table.
Expand Down Expand Up @@ -70,17 +71,17 @@ impl UserDefinedLogicalNodeCore for DeltaStatement {
}
}

fn inputs(&self) -> Vec<&LogicalPlan> {
vec![]
}

fn schema(&self) -> &DFSchemaRef {
match self {
Self::Vacuum(Vacuum { schema, .. }) => schema,
_ => todo!(),
}
}

fn inputs(&self) -> Vec<&LogicalPlan> {
vec![]
}

fn expressions(&self) -> Vec<Expr> {
vec![]
}
Expand Down Expand Up @@ -134,6 +135,12 @@ pub struct Vacuum {
pub schema: DFSchemaRef,
}

// Manual PartialOrd so the `#[derive(PartialOrd)]` added to `DeltaStatement`
// in this commit compiles; `Vacuum` has no meaningful ordering, so all
// comparisons are undefined.
// NOTE(review): returning `None` unconditionally means `partial_cmp(a, a)` is
// `None` even when `a == a` under the (derived) `PartialEq`, which breaks the
// documented PartialOrd/PartialEq consistency contract — confirm no code path
// actually orders statements, or compare on comparable fields instead.
impl PartialOrd for Vacuum {
fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
None
}
}

impl Vacuum {
pub fn new(table: TableReference, retention_hours: Option<i32>, dry_run: bool) -> Self {
Self {
Expand All @@ -152,10 +159,16 @@ impl Vacuum {
pub struct DescribeHistory {
/// A reference to the table
pub table: TableReference,
/// Schema for commit provenence information
/// Schema for commit provenance information
pub schema: DFSchemaRef,
}

// No natural ordering for DESCRIBE HISTORY statements; this stub exists only
// to satisfy the `PartialOrd` derive on the enclosing `DeltaStatement` enum.
// NOTE(review): always-`None` is inconsistent with `PartialEq` equality
// (contract requires `a == b` iff `partial_cmp == Some(Equal)`) — verify
// nothing relies on ordering these nodes.
impl PartialOrd for DescribeHistory {
fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
None
}
}

impl DescribeHistory {
pub fn new(table: TableReference) -> Self {
Self {
Expand All @@ -176,6 +189,12 @@ pub struct DescribeDetails {
pub schema: DFSchemaRef,
}

// Ordering is undefined for DESCRIBE DETAILS statements; stub required so the
// `PartialOrd` derive on `DeltaStatement` compiles.
// NOTE(review): unconditional `None` violates the PartialOrd/PartialEq
// consistency contract when two values compare equal — confirm intended.
impl PartialOrd for DescribeDetails {
fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
None
}
}

impl DescribeDetails {
pub fn new(table: TableReference) -> Self {
Self {
Expand All @@ -191,10 +210,16 @@ impl DescribeDetails {
pub struct DescribeFiles {
/// A reference to the table
pub table: TableReference,
/// Schema for commit provenence information
/// Schema for commit provenance information
pub schema: DFSchemaRef,
}

// Ordering is undefined for DESCRIBE FILES statements; stub required so the
// `PartialOrd` derive on `DeltaStatement` compiles.
// NOTE(review): unconditional `None` violates the PartialOrd/PartialEq
// consistency contract when two values compare equal — confirm intended.
impl PartialOrd for DescribeFiles {
fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
None
}
}

impl DescribeFiles {
pub fn new(table: TableReference) -> Self {
Self {
Expand Down

0 comments on commit 5b2f46b

Please sign in to comment.