From 209675f207511a40b2762f5a522546084d4278e9 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Thu, 4 Jan 2024 00:08:14 +0100 Subject: [PATCH] refactor: increase metadata action usage --- .../deltalake-core/src/delta_datafusion/mod.rs | 10 +++------- crates/deltalake-core/src/kernel/arrow/mod.rs | 14 ++++---------- .../deltalake-core/src/operations/constraints.rs | 10 +++------- crates/deltalake-core/src/operations/delete.rs | 8 ++------ .../deltalake-core/src/operations/merge/mod.rs | 4 ++-- crates/deltalake-core/src/operations/optimize.rs | 15 ++++----------- .../operations/transaction/conflict_checker.rs | 2 +- .../src/operations/transaction/state.rs | 8 ++++---- crates/deltalake-core/src/operations/update.rs | 4 ++-- crates/deltalake-core/src/operations/vacuum.rs | 9 ++++----- crates/deltalake-core/src/operations/write.rs | 16 ++++++---------- .../deltalake-core/src/protocol/checkpoints.rs | 2 +- crates/deltalake-core/src/table/mod.rs | 14 ++++++++------ crates/deltalake-core/src/table/state.rs | 6 +++--- crates/deltalake-core/src/table/state_arrow.rs | 8 ++++---- 15 files changed, 51 insertions(+), 79 deletions(-) diff --git a/crates/deltalake-core/src/delta_datafusion/mod.rs b/crates/deltalake-core/src/delta_datafusion/mod.rs index 02c12a564f..b8b0b3c152 100644 --- a/crates/deltalake-core/src/delta_datafusion/mod.rs +++ b/crates/deltalake-core/src/delta_datafusion/mod.rs @@ -588,11 +588,7 @@ impl<'a> DeltaScanBuilder<'a> { // However we may want to do some additional balancing in case we are far off from the above. let mut file_groups: HashMap, Vec> = HashMap::new(); - let table_partition_cols = &self - .snapshot - .metadata() - .ok_or(DeltaTableError::NoMetadata)? - .partition_columns; + let table_partition_cols = &self.snapshot.metadata()?.partition_columns; for action in files.iter() { let mut part = partitioned_file_from_action(action, table_partition_cols, &schema); @@ -1095,7 +1091,7 @@ impl DeltaDataChecker { /// Create a new DeltaDataChecker pub fn new(snapshot: &DeltaTableState) -> Self { - let metadata = snapshot.metadata(); + let metadata = snapshot.delta_metadata(); let invariants = metadata .and_then(|meta| meta.schema.get_invariants().ok()) @@ -1539,7 +1535,7 @@ pub async fn find_files<'a>( state: &SessionState, predicate: Option, ) -> DeltaResult { - let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?; + let current_metadata = snapshot.metadata()?; match &predicate { Some(predicate) => { diff --git a/crates/deltalake-core/src/kernel/arrow/mod.rs b/crates/deltalake-core/src/kernel/arrow/mod.rs index 1fbacec4f3..dfdb00b21b 100644 --- a/crates/deltalake-core/src/kernel/arrow/mod.rs +++ b/crates/deltalake-core/src/kernel/arrow/mod.rs @@ -137,26 +137,20 @@ impl TryFrom<&DataType> for ArrowDataType { DataType::Struct(s) => Ok(ArrowDataType::Struct( s.fields() .iter() - .map(>::try_from) + .map(TryInto::try_into) .collect::, ArrowError>>()? .into(), )), - DataType::Array(a) => Ok(ArrowDataType::List(Arc::new(>::try_from(a)?))), + DataType::Array(a) => Ok(ArrowDataType::List(Arc::new(a.as_ref().try_into()?))), DataType::Map(m) => Ok(ArrowDataType::Map( Arc::new(ArrowField::new( "entries", ArrowDataType::Struct( vec![ - ArrowField::new( - MAP_KEYS_NAME, - >::try_from(m.key_type())?, - false, - ), + ArrowField::new(MAP_KEYS_NAME, m.key_type().try_into()?, false), ArrowField::new( MAP_VALUES_NAME, - >::try_from(m.value_type())?, + m.value_type().try_into()?, m.value_contains_null(), ), ] diff --git a/crates/deltalake-core/src/operations/constraints.rs b/crates/deltalake-core/src/operations/constraints.rs index ed5888bd13..e2bd5c101f 100644 --- a/crates/deltalake-core/src/operations/constraints.rs +++ b/crates/deltalake-core/src/operations/constraints.rs @@ -17,7 +17,7 @@ use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{ register_store, DeltaDataChecker, DeltaScanBuilder, DeltaSessionContext, }; -use crate::kernel::{Action, CommitInfo, IsolationLevel, Metadata, Protocol}; +use crate::kernel::{Action, CommitInfo, IsolationLevel, Protocol}; use crate::logstore::LogStoreRef; use crate::operations::datafusion_utils::Expression; use crate::operations::transaction::commit; @@ -86,11 +86,7 @@ impl std::future::IntoFuture for ConstraintBuilder { .expr .ok_or_else(|| DeltaTableError::Generic("No Expresion provided".to_string()))?; - let mut metadata = this - .snapshot - .metadata() - .ok_or(DeltaTableError::NoMetadata)? - .clone(); + let mut metadata = this.snapshot.metadata()?.clone(); let configuration_key = format!("delta.constraints.{}", name); if metadata.configuration.contains_key(&configuration_key) { @@ -190,7 +186,7 @@ impl std::future::IntoFuture for ConstraintBuilder { let actions = vec![ Action::CommitInfo(commit_info), - Action::Metadata(Metadata::try_from(metadata)?), + Action::Metadata(metadata), Action::Protocol(protocol), ]; diff --git a/crates/deltalake-core/src/operations/delete.rs b/crates/deltalake-core/src/operations/delete.rs index 419ecb0ade..9e5cdfc82a 100644 --- a/crates/deltalake-core/src/operations/delete.rs +++ b/crates/deltalake-core/src/operations/delete.rs @@ -38,7 +38,7 @@ use super::datafusion_utils::Expression; use super::transaction::PROTOCOL; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder, DeltaSessionContext}; -use crate::errors::{DeltaResult, DeltaTableError}; +use crate::errors::DeltaResult; use crate::kernel::{Action, Add, Remove}; use crate::operations::transaction::commit; use crate::operations::write::write_execution_plan; @@ -138,11 +138,7 @@ async fn excute_non_empty_expr( let input_schema = snapshot.input_schema()?; let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; - let table_partition_cols = snapshot - .metadata() - .ok_or(DeltaTableError::NoMetadata)? - .partition_columns - .clone(); + let table_partition_cols = snapshot.metadata()?.partition_columns.clone(); let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state) .with_files(rewrite) diff --git a/crates/deltalake-core/src/operations/merge/mod.rs b/crates/deltalake-core/src/operations/merge/mod.rs index 419f2ceee2..2bc2ddb2b2 100644 --- a/crates/deltalake-core/src/operations/merge/mod.rs +++ b/crates/deltalake-core/src/operations/merge/mod.rs @@ -827,7 +827,7 @@ async fn try_construct_early_filter( ) -> DeltaResult> { let table_metadata = table_snapshot.metadata(); - if table_metadata.is_none() { + if table_metadata.is_err() { return Ok(None); } @@ -921,7 +921,7 @@ async fn execute( let mut metrics = MergeMetrics::default(); let exec_start = Instant::now(); - let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?; + let current_metadata = snapshot.metadata()?; // TODO: Given the join predicate, remove any expression that involve the // source table and keep expressions that only involve the target table. diff --git a/crates/deltalake-core/src/operations/optimize.rs b/crates/deltalake-core/src/operations/optimize.rs index 963c0469ef..6639f5c441 100644 --- a/crates/deltalake-core/src/operations/optimize.rs +++ b/crates/deltalake-core/src/operations/optimize.rs @@ -774,10 +774,7 @@ pub fn create_merge_plan( ) -> Result { let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size()); - let partitions_keys = &snapshot - .metadata() - .ok_or(DeltaTableError::NoMetadata)? - .partition_columns; + let partitions_keys = &snapshot.metadata()?.partition_columns; let (operations, metrics) = match optimize_type { OptimizeType::Compact => { @@ -792,10 +789,7 @@ pub fn create_merge_plan( let file_schema = arrow_schema_without_partitions( &Arc::new( >::try_from( - &snapshot - .metadata() - .ok_or(DeltaTableError::NoMetadata)? - .schema, + &snapshot.metadata()?.schema()?, )?, ), partitions_keys, @@ -945,9 +939,8 @@ fn build_zorder_plan( ))); } let field_names = snapshot - .metadata() - .unwrap() - .schema + .metadata()? + .schema()? .fields() .iter() .map(|field| field.name().to_string()) diff --git a/crates/deltalake-core/src/operations/transaction/conflict_checker.rs b/crates/deltalake-core/src/operations/transaction/conflict_checker.rs index eed34ba595..d6a02c3d10 100644 --- a/crates/deltalake-core/src/operations/transaction/conflict_checker.rs +++ b/crates/deltalake-core/src/operations/transaction/conflict_checker.rs @@ -463,7 +463,7 @@ impl<'a> ConflictChecker<'a> { .txn_info .read_snapshot .metadata() - .ok_or(CommitConflictError::NoMetadata)? + .map_err(|_|CommitConflictError::NoMetadata)? .partition_columns; AddContainer::new(&added_files_to_check, partition_columns, arrow_schema) .predicate_matches(predicate.clone()) diff --git a/crates/deltalake-core/src/operations/transaction/state.rs b/crates/deltalake-core/src/operations/transaction/state.rs index 1a74582f78..dc7914ceea 100644 --- a/crates/deltalake-core/src/operations/transaction/state.rs +++ b/crates/deltalake-core/src/operations/transaction/state.rs @@ -29,7 +29,7 @@ impl DeltaTableState { } fn _arrow_schema(&self, wrap_partitions: bool) -> DeltaResult { - let meta = self.metadata().ok_or(DeltaTableError::NoMetadata)?; + let meta = self.delta_metadata().ok_or(DeltaTableError::NoMetadata)?; let fields = meta .schema .fields() @@ -298,7 +298,7 @@ impl PruningStatistics for DeltaTableState { /// return the minimum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows fn min_values(&self, column: &Column) -> Option { - let partition_columns = &self.metadata()?.partition_columns; + let partition_columns = &self.metadata().ok()?.partition_columns; let container = AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?); container.min_values(column) @@ -307,7 +307,7 @@ impl PruningStatistics for DeltaTableState { /// return the maximum values for the named column, if known. /// Note: the returned array must contain `num_containers()` rows. fn max_values(&self, column: &Column) -> Option { - let partition_columns = &self.metadata()?.partition_columns; + let partition_columns = &self.metadata().ok()?.partition_columns; let container = AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?); container.max_values(column) @@ -324,7 +324,7 @@ impl PruningStatistics for DeltaTableState { /// /// Note: the returned array must contain `num_containers()` rows. fn null_counts(&self, column: &Column) -> Option { - let partition_columns = &self.metadata()?.partition_columns; + let partition_columns = &self.metadata().ok()?.partition_columns; let container = AddContainer::new(self.files(), partition_columns, self.arrow_schema().ok()?); container.null_counts(column) diff --git a/crates/deltalake-core/src/operations/update.rs b/crates/deltalake-core/src/operations/update.rs index 6d96b4e1dd..8fee1b950a 100644 --- a/crates/deltalake-core/src/operations/update.rs +++ b/crates/deltalake-core/src/operations/update.rs @@ -54,7 +54,7 @@ use crate::kernel::{Action, Remove}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; -use crate::{DeltaResult, DeltaTable, DeltaTableError}; +use crate::{DeltaResult, DeltaTable}; /// Updates records in the Delta Table. /// See this module's documentation for more information @@ -209,7 +209,7 @@ async fn execute( }) .collect::, _>>()?; - let current_metadata = snapshot.metadata().ok_or(DeltaTableError::NoMetadata)?; + let current_metadata = snapshot.metadata()?; let table_partition_cols = current_metadata.partition_columns.clone(); let scan_start = Instant::now(); diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs index 7011833e64..8d1f7cc9e2 100644 --- a/crates/deltalake-core/src/operations/vacuum.rs +++ b/crates/deltalake-core/src/operations/vacuum.rs @@ -59,6 +59,9 @@ enum VacuumError { /// Error returned #[error(transparent)] DeltaTable(#[from] DeltaTableError), + + #[error(transparent)] + Protocol(#[from] crate::protocol::ProtocolError), } impl From for DeltaTableError { @@ -191,11 +194,7 @@ impl VacuumBuilder { let mut file_sizes = vec![]; let object_store = self.log_store.object_store(); let mut all_files = object_store.list(None); - let partition_columns = &self - .snapshot - .metadata() - .ok_or(DeltaTableError::NoMetadata)? - .partition_columns; + let partition_columns = &self.snapshot.metadata()?.partition_columns; while let Some(obj_meta) = all_files.next().await { // TODO should we allow NotFound here in case we have a temporary commit file in the list diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index b9ec348420..3d3c425b97 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -43,7 +43,7 @@ use super::writer::{DeltaWriter, WriterConfig}; use super::{transaction::commit, CreateBuilder}; use crate::delta_datafusion::DeltaDataChecker; use crate::errors::{DeltaResult, DeltaTableError}; -use crate::kernel::{Action, Add, Metadata, Remove, StructType}; +use crate::kernel::{Action, Add, Remove, StructType}; use crate::logstore::LogStoreRef; use crate::protocol::{DeltaOperation, SaveMode}; use crate::storage::ObjectStoreRef; @@ -376,7 +376,7 @@ impl std::future::IntoFuture for WriteBuilder { let active_partitions = this .snapshot - .metadata() + .delta_metadata() .map(|meta| meta.partition_columns.clone()); // validate partition columns @@ -492,14 +492,10 @@ impl std::future::IntoFuture for WriteBuilder { .unwrap_or(schema.clone()); if schema != table_schema { - let mut metadata = this - .snapshot - .metadata() - .ok_or(DeltaTableError::NoMetadata)? - .clone(); - metadata.schema = schema.clone().try_into()?; - let metadata_action = Metadata::try_from(metadata)?; - actions.push(Action::Metadata(metadata_action)); + let mut metadata = this.snapshot.metadata()?.clone(); + let delta_schema: StructType = schema.as_ref().try_into()?; + metadata.schema_string = serde_json::to_string(&delta_schema)?; + actions.push(Action::Metadata(metadata)); } // This should never error, since now() will always be larger than UNIX_EPOCH let deletion_timestamp = SystemTime::now() diff --git a/crates/deltalake-core/src/protocol/checkpoints.rs b/crates/deltalake-core/src/protocol/checkpoints.rs index 178ac5733a..d0fbeb337f 100644 --- a/crates/deltalake-core/src/protocol/checkpoints.rs +++ b/crates/deltalake-core/src/protocol/checkpoints.rs @@ -227,7 +227,7 @@ pub async fn cleanup_expired_logs_for( fn parquet_bytes_from_state( state: &DeltaTableState, ) -> Result<(CheckPoint, bytes::Bytes), ProtocolError> { - let current_metadata = state.metadata().ok_or(ProtocolError::NoMetaData)?; + let current_metadata = state.delta_metadata().ok_or(ProtocolError::NoMetaData)?; let partition_col_data_types = current_metadata.get_partition_col_data_types(); diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index d89e68adba..2665ef827b 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -790,13 +790,15 @@ impl DeltaTable { /// Returns the metadata associated with the loaded state. pub fn metadata(&self) -> Result<&Metadata, DeltaTableError> { - Ok(self.state.metadata_action()?) + Ok(self.state.metadata()?) } /// Returns the metadata associated with the loaded state. #[deprecated(since = "0.17.0", note = "use metadata() instead")] pub fn get_metadata(&self) -> Result<&DeltaTableMetaData, DeltaTableError> { - self.state.metadata().ok_or(DeltaTableError::NoMetadata) + self.state + .delta_metadata() + .ok_or(DeltaTableError::NoMetadata) } /// Returns a vector of active tombstones (i.e. `Remove` actions present in the current delta log). @@ -855,7 +857,7 @@ impl DeltaTable { pub fn get_configurations(&self) -> Result<&HashMap>, DeltaTableError> { Ok(self .state - .metadata() + .delta_metadata() .ok_or(DeltaTableError::NoMetadata)? .get_configuration()) } @@ -905,10 +907,10 @@ impl fmt::Display for DeltaTable { writeln!(f, "DeltaTable({})", self.table_uri())?; writeln!(f, "\tversion: {}", self.version())?; match self.state.metadata() { - Some(metadata) => { - writeln!(f, "\tmetadata: {metadata}")?; + Ok(metadata) => { + writeln!(f, "\tmetadata: {metadata:?}")?; } - None => { + _ => { writeln!(f, "\tmetadata: None")?; } } diff --git a/crates/deltalake-core/src/table/state.rs b/crates/deltalake-core/src/table/state.rs index 1aaf095d4f..03483ef8ff 100644 --- a/crates/deltalake-core/src/table/state.rs +++ b/crates/deltalake-core/src/table/state.rs @@ -188,12 +188,12 @@ impl DeltaTableState { } /// The most recent metadata of the table. - pub fn metadata_action(&self) -> Result<&Metadata, ProtocolError> { + pub fn metadata(&self) -> Result<&Metadata, ProtocolError> { self.metadata.as_ref().ok_or(ProtocolError::NoMetaData) } /// The most recent metadata of the table. - pub fn metadata(&self) -> Option<&DeltaTableMetaData> { + pub fn delta_metadata(&self) -> Option<&DeltaTableMetaData> { self.current_metadata.as_ref() } @@ -328,7 +328,7 @@ impl DeltaTableState { &'a self, filters: &'a [PartitionFilter], ) -> Result + '_, DeltaTableError> { - let current_metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?; + let current_metadata = self.delta_metadata().ok_or(DeltaTableError::NoMetadata)?; let nonpartitioned_columns: Vec = filters .iter() diff --git a/crates/deltalake-core/src/table/state_arrow.rs b/crates/deltalake-core/src/table/state_arrow.rs index 5e2565ee08..0518d3d95d 100644 --- a/crates/deltalake-core/src/table/state_arrow.rs +++ b/crates/deltalake-core/src/table/state_arrow.rs @@ -86,7 +86,7 @@ impl DeltaTableState { (Cow::Borrowed("data_change"), Arc::new(data_change)), ]; - let metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?; + let metadata = self.metadata()?; if !metadata.partition_columns.is_empty() { let partition_cols_batch = self.partition_columns_as_batch(flatten)?; @@ -145,7 +145,7 @@ impl DeltaTableState { &self, flatten: bool, ) -> Result { - let metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?; + let metadata = self.delta_metadata().ok_or(DeltaTableError::NoMetadata)?; let column_mapping_mode = self.table_config().column_mapping_mode(); let partition_column_types: Vec = metadata .partition_columns @@ -413,8 +413,8 @@ impl DeltaTableState { .map(|maybe_stat| maybe_stat.as_ref().map(|stat| stat.num_records)) .collect::>>(), ); - let metadata = self.metadata().ok_or(DeltaTableError::NoMetadata)?; - let schema = &metadata.schema; + let metadata = self.metadata()?; + let schema = &metadata.schema()?; #[derive(Debug)] struct ColStats<'a> {