Commit f718765

Merge branch 'main' into spark-integration-test
rtyler authored Jan 28, 2024
2 parents 6cfc616 + 0f6790f commit f718765
Showing 16 changed files with 557 additions and 115 deletions.
38 changes: 31 additions & 7 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
@@ -141,14 +141,29 @@ pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
env.register_object_store(url, store.object_store());
}

pub(crate) fn logical_schema(
/// The logical schema for a DeltaTable differs from the protocol-level schema, since partition columns must appear at the end of the schema.
/// This is to align with how partitions are handled at the physical level.
pub(crate) fn df_logical_schema(
snapshot: &DeltaTableState,
scan_config: &DeltaScanConfig,
) -> DeltaResult<SchemaRef> {
let input_schema = snapshot.arrow_schema()?;
let mut fields = Vec::new();
for field in input_schema.fields.iter() {
fields.push(field.to_owned());
let table_partition_cols = &snapshot.metadata().partition_columns;

let mut fields: Vec<Arc<Field>> = input_schema
.fields()
.iter()
.filter(|f| !table_partition_cols.contains(f.name()))
.cloned()
.collect();

for partition_col in table_partition_cols.iter() {
fields.push(Arc::new(
input_schema
.field_with_name(partition_col)
.unwrap()
.to_owned(),
));
}

if let Some(file_column_name) = &scan_config.file_column_name {
@@ -309,7 +324,7 @@ impl<'a> DeltaScanBuilder<'a> {
.await?
}
};
let logical_schema = logical_schema(self.snapshot, &config)?;
let logical_schema = df_logical_schema(self.snapshot, &config)?;

let logical_schema = if let Some(used_columns) = self.projection {
let mut fields = vec![];
@@ -501,7 +516,7 @@ impl DeltaTableProvider {
config: DeltaScanConfig,
) -> DeltaResult<Self> {
Ok(DeltaTableProvider {
schema: logical_schema(&snapshot, &config)?,
schema: df_logical_schema(&snapshot, &config)?,
snapshot,
log_store,
config,
@@ -1197,7 +1212,7 @@ pub(crate) async fn find_files_scan<'a>(
}
.build(snapshot)?;

let logical_schema = logical_schema(snapshot, &scan_config)?;
let logical_schema = df_logical_schema(snapshot, &scan_config)?;

// Identify which columns we need to project
let mut used_columns = expression
@@ -1750,9 +1765,17 @@ mod tests {
let provider =
DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config)
.unwrap();
let logical_schema = provider.schema();
let ctx = SessionContext::new();
ctx.register_table("test", Arc::new(provider)).unwrap();

let expected_logical_order = vec!["value", "modified", "id"];
let actual_order: Vec<String> = logical_schema
.fields()
.iter()
.map(|f| f.name().to_owned())
.collect();

let df = ctx.sql("select * from test").await.unwrap();
let actual = df.collect().await.unwrap();
let expected = vec![
@@ -1766,6 +1789,7 @@
"+-------+------------+----+",
];
assert_batches_sorted_eq!(&expected, &actual);
assert_eq!(expected_logical_order, actual_order);
}

#[tokio::test]
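Aside (not part of the commit): a minimal standalone sketch of the reordering that `df_logical_schema` performs — non-partition fields keep their order, partition columns are appended at the end — written against plain `arrow-schema` types. The function and column names are illustrative; `id` plays the role of the partition column, matching the expected order asserted in the test above.

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};

// Illustrative helper: move the given partition columns to the end of the schema.
fn logical_order(schema: &Schema, partition_cols: &[String]) -> Schema {
    // Non-partition fields first, in their original order ...
    let mut fields: Vec<Arc<Field>> = schema
        .fields()
        .iter()
        .filter(|f| !partition_cols.contains(f.name()))
        .cloned()
        .collect();
    // ... then the partition columns, so they land at the end.
    for col in partition_cols {
        if let Ok(f) = schema.field_with_name(col) {
            fields.push(Arc::new(f.clone()));
        }
    }
    Schema::new(fields)
}

fn main() {
    let schema = Schema::new(vec![
        Field::new("id", DataType::Utf8, true), // partition column
        Field::new("value", DataType::Int32, true),
        Field::new("modified", DataType::Utf8, true),
    ]);
    let reordered = logical_order(&schema, &["id".to_string()]);
    let names: Vec<&str> = reordered.fields().iter().map(|f| f.name().as_str()).collect();
    assert_eq!(names, vec!["value", "modified", "id"]); // partition column last
}
```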
12 changes: 7 additions & 5 deletions crates/deltalake-core/src/kernel/arrow/mod.rs
@@ -564,11 +564,13 @@ fn max_min_schema_for_fields(dest: &mut Vec<ArrowField>, f: &ArrowField) {
max_min_schema_for_fields(&mut child_dest, f);
}

dest.push(ArrowField::new(
f.name(),
ArrowDataType::Struct(child_dest.into()),
true,
));
if !child_dest.is_empty() {
dest.push(ArrowField::new(
f.name(),
ArrowDataType::Struct(child_dest.into()),
true,
));
}
}
// don't compute min or max for list, map or binary types
ArrowDataType::List(_) | ArrowDataType::Map(_, _) | ArrowDataType::Binary => { /* noop */ }
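Aside (not part of the commit): the new `is_empty` guard keeps empty struct entries out of the min/max stats schema. A simplified, non-recursive sketch of the same idea over plain `arrow-schema` types (field names are hypothetical; the real `max_min_schema_for_fields` recurses into nested structs):

```rust
use arrow_schema::{DataType, Field};

// Build a min/max stats field for a struct column, dropping children that don't
// support stats (binary, list, map) and omitting the struct entirely when
// nothing survives — the case the added guard handles.
fn stats_struct_field(name: &str, children: &[Field]) -> Option<Field> {
    let kept: Vec<Field> = children
        .iter()
        .filter(|f| {
            !matches!(
                f.data_type(),
                DataType::Binary | DataType::List(_) | DataType::Map(_, _)
            )
        })
        .cloned()
        .collect();
    if kept.is_empty() {
        None // an empty struct carries no statistics
    } else {
        Some(Field::new(name, DataType::Struct(kept.into()), true))
    }
}

fn main() {
    // A struct whose only child is binary produces no stats field at all.
    let blob = [Field::new("raw", DataType::Binary, true)];
    assert!(stats_struct_field("blob_wrapper", &blob).is_none());

    // A struct with a numeric child keeps that child.
    let point = [Field::new("x", DataType::Int64, true)];
    assert!(stats_struct_field("point", &point).is_some());
}
```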
101 changes: 80 additions & 21 deletions crates/deltalake-core/src/kernel/snapshot/mod.rs
@@ -28,7 +28,7 @@ use object_store::ObjectStore;
use self::log_segment::{CommitData, LogSegment, PathExt};
use self::parse::{read_adds, read_removes};
use self::replay::{LogMapper, LogReplayScanner, ReplayStream};
use super::{Action, Add, CommitInfo, Metadata, Protocol, Remove};
use super::{Action, Add, CommitInfo, DataType, Metadata, Protocol, Remove, StructField};
use crate::kernel::StructType;
use crate::table::config::TableConfig;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};
@@ -69,8 +69,7 @@ impl Snapshot {
"Cannot read metadata from log segment".into(),
));
};
let metadata = metadata.unwrap();
let protocol = protocol.unwrap();
let (metadata, protocol) = (metadata.unwrap(), protocol.unwrap());
let schema = serde_json::from_str(&metadata.schema_string)?;
Ok(Self {
log_segment,
@@ -212,12 +211,7 @@ impl Snapshot {
&log_segment::CHECKPOINT_SCHEMA,
&self.config,
);
ReplayStream::try_new(
log_stream,
checkpoint_stream,
&self.schema,
self.config.clone(),
)
ReplayStream::try_new(log_stream, checkpoint_stream, &self)
}

/// Get the commit infos in the snapshot
@@ -288,6 +282,59 @@ impl Snapshot {
})
.boxed())
}

/// Get the statistics schema of the snapshot
pub fn stats_schema(&self) -> DeltaResult<StructType> {
let stats_fields = if let Some(stats_cols) = self.table_config().stats_columns() {
stats_cols
.iter()
.map(|col| match self.schema().field_with_name(col) {
Ok(field) => match field.data_type() {
DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => {
Err(DeltaTableError::Generic(format!(
"Stats column {} has unsupported type {}",
col,
field.data_type()
)))
}
_ => Ok(StructField::new(
field.name(),
field.data_type().clone(),
true,
)),
},
_ => Err(DeltaTableError::Generic(format!(
"Stats column {} not found in schema",
col
))),
})
.collect::<Result<Vec<_>, _>>()?
} else {
let num_indexed_cols = self.table_config().num_indexed_cols();
self.schema()
.fields
.iter()
.enumerate()
.filter_map(|(idx, f)| match f.data_type() {
DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => None,
_ if num_indexed_cols < 0 || (idx as i32) < num_indexed_cols => {
Some(StructField::new(f.name(), f.data_type().clone(), true))
}
_ => None,
})
.collect()
};
Ok(StructType::new(vec![
StructField::new("numRecords", DataType::LONG, true),
StructField::new("minValues", StructType::new(stats_fields.clone()), true),
StructField::new("maxValues", StructType::new(stats_fields.clone()), true),
StructField::new(
"nullCount",
StructType::new(stats_fields.iter().filter_map(to_count_field).collect()),
true,
),
]))
}
}

/// A snapshot of a Delta table that has been eagerly loaded into memory.
@@ -318,7 +365,7 @@ impl EagerSnapshot {
let mut files = Vec::new();
let mut scanner = LogReplayScanner::new();
files.push(scanner.process_files_batch(&batch, true)?);
let mapper = LogMapper::try_new(snapshot.schema(), snapshot.config.clone())?;
let mapper = LogMapper::try_new(&snapshot)?;
files = files
.into_iter()
.map(|b| mapper.map_batch(b))
@@ -357,16 +404,11 @@ impl EagerSnapshot {
)
.boxed()
};
let mapper = LogMapper::try_new(self.snapshot.schema(), self.snapshot.config.clone())?;
let files = ReplayStream::try_new(
log_stream,
checkpoint_stream,
self.schema(),
self.snapshot.config.clone(),
)?
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
.try_collect()
.await?;
let mapper = LogMapper::try_new(&self.snapshot)?;
let files = ReplayStream::try_new(log_stream, checkpoint_stream, &self.snapshot)?
.map(|batch| batch.and_then(|b| mapper.map_batch(b)))
.try_collect()
.await?;

self.files = files;
}
@@ -473,7 +515,7 @@ impl EagerSnapshot {
files.push(scanner.process_files_batch(&batch?, true)?);
}

let mapper = LogMapper::try_new(self.snapshot.schema(), self.snapshot.config.clone())?;
let mapper = LogMapper::try_new(&self.snapshot)?;
self.files = files
.into_iter()
.chain(
@@ -496,6 +538,23 @@
}
}

fn to_count_field(field: &StructField) -> Option<StructField> {
match field.data_type() {
DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => None,
DataType::Struct(s) => Some(StructField::new(
field.name(),
StructType::new(
s.fields()
.iter()
.filter_map(to_count_field)
.collect::<Vec<_>>(),
),
true,
)),
_ => Some(StructField::new(field.name(), DataType::LONG, true)),
}
}

#[cfg(feature = "datafusion")]
mod datafusion {
use datafusion_common::stats::Statistics;
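Aside (not part of the commit): `stats_schema` above derives the schema used to parse the JSON `stats` string carried on each `add` action, preferring the columns configured via `stats_columns()` and otherwise falling back to the first `num_indexed_cols()` columns (these correspond to the `delta.dataSkippingStatsColumns` and `delta.dataSkippingNumIndexedCols` table properties, assuming the usual Delta property names; the previous hard-coded cutoff was 32). A minimal standalone sketch of the payload shape that schema describes, with hypothetical values, parsed via `serde_json`:

```rust
use serde_json::Value;

fn main() -> Result<(), serde_json::Error> {
    // Hypothetical `add.stats` payload for a table with columns `id` (string) and `value` (long).
    // The derived stats schema mirrors exactly this shape: numRecords, minValues,
    // maxValues, and nullCount (where every leaf in nullCount is a long).
    let stats = r#"{
        "numRecords": 3,
        "minValues": { "id": "a", "value": 1 },
        "maxValues": { "id": "c", "value": 42 },
        "nullCount": { "id": 0, "value": 1 }
    }"#;
    let parsed: Value = serde_json::from_str(stats)?;
    assert_eq!(parsed["numRecords"], 3);
    assert_eq!(parsed["nullCount"]["value"], 1);
    Ok(())
}
```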
71 changes: 12 additions & 59 deletions crates/deltalake-core/src/kernel/snapshot/replay.rs
@@ -21,9 +21,10 @@ use tracing::debug;

use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName};
use crate::kernel::arrow::json;
use crate::kernel::{DataType, Schema, StructField, StructType};
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};

use super::Snapshot;

pin_project! {
pub struct ReplayStream<S> {
scanner: LogReplayScanner,
@@ -38,60 +39,12 @@
}
}

fn to_count_field(field: &StructField) -> Option<StructField> {
match field.data_type() {
DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => None,
DataType::Struct(s) => Some(StructField::new(
field.name(),
StructType::new(
s.fields()
.iter()
.filter_map(to_count_field)
.collect::<Vec<_>>(),
),
true,
)),
_ => Some(StructField::new(field.name(), DataType::LONG, true)),
}
}

pub(super) fn get_stats_schema(table_schema: &StructType) -> DeltaResult<ArrowSchemaRef> {
let data_fields: Vec<_> = table_schema
.fields
.iter()
.enumerate()
.filter_map(|(idx, f)| match f.data_type() {
DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => None,
// TODO: the number of stats fields should be configurable?
// or rather we should likely read all of them if we parse JSON?
_ if idx < 32 => Some(StructField::new(f.name(), f.data_type().clone(), true)),
_ => None,
})
.collect();
let stats_schema = StructType::new(vec![
StructField::new("numRecords", DataType::LONG, true),
StructField::new("minValues", StructType::new(data_fields.clone()), true),
StructField::new("maxValues", StructType::new(data_fields.clone()), true),
StructField::new(
"nullCount",
StructType::new(data_fields.iter().filter_map(to_count_field).collect()),
true,
),
]);
Ok(std::sync::Arc::new((&stats_schema).try_into()?))
}

impl<S> ReplayStream<S> {
pub(super) fn try_new(
commits: S,
checkpoint: S,
table_schema: &Schema,
config: DeltaTableConfig,
) -> DeltaResult<Self> {
let stats_schema = get_stats_schema(table_schema)?;
pub(super) fn try_new(commits: S, checkpoint: S, snapshot: &Snapshot) -> DeltaResult<Self> {
let stats_schema = Arc::new((&snapshot.stats_schema()?).try_into()?);
let mapper = Arc::new(LogMapper {
stats_schema,
config,
config: snapshot.config.clone(),
});
Ok(Self {
commits,
@@ -108,10 +61,10 @@ pub(super) struct LogMapper {
}

impl LogMapper {
pub(super) fn try_new(table_schema: &Schema, config: DeltaTableConfig) -> DeltaResult<Self> {
pub(super) fn try_new(snapshot: &Snapshot) -> DeltaResult<Self> {
Ok(Self {
stats_schema: get_stats_schema(table_schema)?,
config,
stats_schema: Arc::new((&snapshot.stats_schema()?).try_into()?),
config: snapshot.config.clone(),
})
}

Expand All @@ -120,7 +73,7 @@ impl LogMapper {
}
}

pub(super) fn map_batch(
fn map_batch(
batch: RecordBatch,
stats_schema: ArrowSchemaRef,
config: &DeltaTableConfig,
@@ -135,7 +88,7 @@ pub(super) fn map_batch(
Arc::new(json::parse_json(stats, stats_schema.clone(), config)?.into());
let schema = batch.schema();
let add_col = ex::extract_and_cast::<StructArray>(&batch, "add")?;
let add_idx = schema.column_with_name("add").unwrap();
let (add_idx, _) = schema.column_with_name("add").unwrap();
let add_type = add_col
.fields()
.iter()
@@ -162,9 +115,9 @@ pub(super) fn map_batch(
true,
));
let mut fields = schema.fields().to_vec();
let _ = std::mem::replace(&mut fields[add_idx.0], new_add_field);
let _ = std::mem::replace(&mut fields[add_idx], new_add_field);
let mut columns = batch.columns().to_vec();
let _ = std::mem::replace(&mut columns[add_idx.0], new_add);
let _ = std::mem::replace(&mut columns[add_idx], new_add);
return Ok(RecordBatch::try_new(
Arc::new(ArrowSchema::new(fields)),
columns,
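Aside (not part of the commit): the `map_batch` cleanup above destructures the `(index, &Field)` tuple returned by `column_with_name` instead of indexing it with `.0`. A self-contained sketch of the surrounding pattern — swap a column and its field by index, then rebuild the `RecordBatch` — using plain arrow-rs types; column names and values are hypothetical, and the stand-in column is a simple `Int64` rather than the real `add` struct:

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("add", DataType::Int64, true),
        Field::new("path", DataType::Utf8, true),
    ]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int64Array::from(vec![1_i64, 2])) as ArrayRef,
            Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
        ],
    )?;

    // column_with_name yields (index, &Field); destructure it directly.
    let schema_ref = batch.schema();
    let (add_idx, _) = schema_ref.column_with_name("add").unwrap();

    // Replace both the field and the column at that index, then rebuild.
    // (In map_batch the replacement field gains a `stats_parsed` child; here it
    // is just a stand-in with new values.)
    let mut fields = schema_ref.fields().to_vec();
    let mut columns = batch.columns().to_vec();
    fields[add_idx] = Arc::new(Field::new("add", DataType::Int64, true));
    columns[add_idx] = Arc::new(Int64Array::from(vec![10_i64, 20])) as ArrayRef;

    let rebuilt = RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)?;
    assert_eq!(rebuilt.num_columns(), 2);
    Ok(())
}
```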
7 changes: 6 additions & 1 deletion crates/deltalake-core/src/operations/optimize.rs
@@ -859,10 +859,15 @@ fn build_compaction_plan(
metrics.total_files_skipped += 1;
continue;
}
let partition_values = add
.partition_values()?
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect::<BTreeMap<_, _>>();

partition_files
.entry(add.partition_values()?.hive_partition_path())
.or_default()
.or_insert_with(|| (partition_values, vec![]))
.1
.push(object_meta);
}
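Aside (not part of the commit): the optimize change keys `partition_files` by the Hive partition path and now stores the parsed partition values alongside the file list, inserting them only the first time a partition is seen. A standalone sketch of that grouping pattern with hypothetical paths and values:

```rust
use std::collections::{BTreeMap, HashMap};

fn main() {
    // Hypothetical files, each tagged with its single partition key/value.
    let files = vec![
        ("date=2024-01-01/part-0.parquet", ("date", "2024-01-01")),
        ("date=2024-01-01/part-1.parquet", ("date", "2024-01-01")),
        ("date=2024-01-02/part-0.parquet", ("date", "2024-01-02")),
    ];

    // partition path -> (parsed partition values, files in that partition)
    let mut partition_files: HashMap<String, (BTreeMap<String, String>, Vec<String>)> =
        HashMap::new();

    for (path, (key, value)) in files {
        let partition_path = format!("{key}={value}");
        let mut partition_values = BTreeMap::new();
        partition_values.insert(key.to_string(), value.to_string());

        partition_files
            .entry(partition_path)
            // the first file seen for a partition inserts its values and an empty list ...
            .or_insert_with(|| (partition_values, vec![]))
            // ... and every file, first or not, is appended to that list
            .1
            .push(path.to_string());
    }

    assert_eq!(partition_files["date=2024-01-01"].1.len(), 2);
    assert_eq!(partition_files["date=2024-01-02"].1.len(), 1);
}
```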