diff --git a/crates/deltalake-core/src/operations/vacuum.rs b/crates/deltalake-core/src/operations/vacuum.rs index 2c4e00793c..015af693a9 100644 --- a/crates/deltalake-core/src/operations/vacuum.rs +++ b/crates/deltalake-core/src/operations/vacuum.rs @@ -32,6 +32,7 @@ use object_store::Error; use object_store::{path::Path, ObjectStore}; use serde::Serialize; use serde_json::Value; +use url::Url; use super::transaction::commit; use crate::crate_version; @@ -198,27 +199,44 @@ impl VacuumBuilder { .snapshot .files() .iter() - .map(|a| a.path.as_str()) + .map(|a| a.path.clone()) .chain( self.snapshot .all_tombstones() .iter() - .map(|r| r.path.as_str()), + .map(|r| r.path.clone()), ) - .collect::>(); + .chain(self.snapshot.files().iter().filter_map(|a| { + return if let Some(deletion_vector) = &a.deletion_vector { + if let Ok(parent) = &Url::parse(self.log_store.root_uri().as_str()) { + if let Ok(dv_absolut_path) = deletion_vector.absolute_path(&parent) { + Some(dv_absolut_path?.to_string()) + } else { + None + } + } else { + None + } + } else { + None + }; + })) + .collect::>(); while let Some(obj_meta) = all_files.next().await { // TODO should we allow NotFound here in case we have a temporary commit file in the list let obj_meta = obj_meta.map_err(DeltaTableError::from)?; - let is_hidden = is_hidden_directory(partition_columns, &obj_meta.location)?; + + if is_hidden_file(partition_columns, &obj_meta.location)? { + continue; + } if managed_files.contains(obj_meta.location.as_ref()) { - if !expired_tombstones.contains(obj_meta.location.as_ref()) || is_hidden { + if !expired_tombstones.contains(obj_meta.location.as_ref()) { continue; } } else if now_millis - retention_period.num_milliseconds() < obj_meta.last_modified.timestamp_millis() - || is_hidden { continue; } @@ -380,15 +398,15 @@ impl VacuumPlan { /// Names of the form partitionCol=[value] are partition directories, and should be /// deleted even if they'd normally be hidden. The _db_index directory contains (bloom filter) /// indexes and these must be deleted when the data they are tied to is deleted. -fn is_hidden_directory(partition_columns: &[String], path: &Path) -> Result { +fn is_hidden_file(partition_columns: &[String], path: &Path) -> Result { + let path_name = path.as_ref(); + let skip = path_name.starts_with("_delta_index") || path_name.starts_with("_change_data"); let is_hidden = path .parts() + .skip(skip as usize) .any(|p| p.as_ref().starts_with('.') || p.as_ref().starts_with('_')); - let path_name = path.as_ref(); Ok(is_hidden - && !path_name.starts_with("_delta_index") - && !path_name.starts_with("_change_data") && !partition_columns .iter() .any(|partition_column| path_name.starts_with(partition_column))) @@ -468,4 +486,19 @@ mod tests { assert_eq!(result.files_deleted, empty); } + + #[tokio::test] + async fn vacuum_table_with_dv_small() { + let table = open_table("./tests/data/table-with-dv-small") + .await + .unwrap(); + + let (_table, result) = VacuumBuilder::new(table.log_store, table.state) + .with_dry_run(true) + .await + .unwrap(); + + let empty: Vec = Vec::new(); + assert_eq!(result.files_deleted, empty); + } }