-
Notifications
You must be signed in to change notification settings - Fork 417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: add handling for unmanaged files to vacuum command #1817
base: main
Are you sure you want to change the base?
Changes from all commits
1dc65b3
e59bb34
90b7741
809f645
8e64a0d
7559c24
da6e438
140f949
a327fa8
b40f276
a358f06
752773a
dc74dcc
96a5e0a
5ea77fd
48b4e3c
f90b48c
d518f40
2733f3d
f4b9e91
83f2f99
b946d07
d4642bf
04e3811
27c2b53
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,7 @@ use object_store::Error; | |
use object_store::{path::Path, ObjectStore}; | ||
use serde::Serialize; | ||
use serde_json::Value; | ||
use url::Url; | ||
|
||
use super::transaction::commit; | ||
use crate::crate_version; | ||
|
@@ -185,7 +186,6 @@ impl VacuumBuilder { | |
}; | ||
|
||
let expired_tombstones = get_stale_files(&self.snapshot, retention_period, now_millis); | ||
let valid_files = self.snapshot.file_paths_iter().collect::<HashSet<Path>>(); | ||
|
||
let mut files_to_delete = vec![]; | ||
let mut file_sizes = vec![]; | ||
|
@@ -200,12 +200,58 @@ impl VacuumBuilder { | |
.ok_or(DeltaTableError::NoMetadata)? | ||
.partition_columns; | ||
|
||
// A set containing the absolute paths to managed files | ||
let managed_files = self | ||
.snapshot | ||
.files() | ||
.iter() | ||
.map(|a| { | ||
if is_absolute_path(&a.path) { | ||
a.path.clone() | ||
} else { | ||
format!("{}{}", self.log_store.root_uri(), a.path) | ||
} | ||
}) | ||
.chain(self.snapshot.all_tombstones().iter().map(|r| { | ||
if is_absolute_path(&r.path) { | ||
r.path.clone() | ||
} else { | ||
format!("{}{}", self.log_store.root_uri(), r.path) | ||
} | ||
})) | ||
.chain(self.snapshot.files().iter().filter_map(|a| { | ||
return if let Some(deletion_vector) = &a.deletion_vector { | ||
if let Ok(parent) = | ||
&Url::parse(&format!("file://{}", self.log_store.root_uri().as_str())) | ||
{ | ||
if let Ok(dv_absolute_path) = deletion_vector.absolute_path(parent) { | ||
Some(dv_absolute_path?.path().to_string()) | ||
} else { | ||
None | ||
} | ||
} else { | ||
None | ||
} | ||
} else { | ||
None | ||
}; | ||
})) | ||
.collect::<HashSet<String>>(); | ||
|
||
while let Some(obj_meta) = all_files.next().await { | ||
// TODO should we allow NotFound here in case we have a temporary commit file in the list | ||
let obj_meta = obj_meta.map_err(DeltaTableError::from)?; | ||
if valid_files.contains(&obj_meta.location) // file is still being tracked in table | ||
|| !expired_tombstones.contains(obj_meta.location.as_ref()) // file is not an expired tombstone | ||
|| is_hidden_directory(partition_columns, &obj_meta.location)? | ||
|
||
if is_hidden_file(partition_columns, &obj_meta.location)? { | ||
continue; | ||
} | ||
|
||
if self.is_file_managed(&managed_files, &obj_meta.location) { | ||
if !expired_tombstones.contains(obj_meta.location.as_ref()) { | ||
continue; | ||
} | ||
} else if now_millis - retention_period.num_milliseconds() | ||
< obj_meta.last_modified.timestamp_millis() | ||
{ | ||
continue; | ||
} | ||
|
@@ -222,6 +268,16 @@ impl VacuumBuilder { | |
specified_retention_millis: Some(retention_period.num_milliseconds()), | ||
}) | ||
} | ||
|
||
/// Whether a file is contained within the set of managed files. | ||
fn is_file_managed(&self, managed_files: &HashSet<String>, file: &Path) -> bool { | ||
return if is_absolute_path(file.as_ref()) { | ||
managed_files.contains(file.as_ref()) | ||
} else { | ||
let path = format!("{}{}", self.log_store.root_uri(), file.as_ref()); | ||
managed_files.contains(&path) | ||
}; | ||
} | ||
Comment on lines
+272
to
+280
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Jan-Schweizer the location can be an absolute URL, and that's the case with shallow cloned tables. The add action describes the So for better or worse, we need to check for an absolute path 😄 |
||
} | ||
|
||
impl std::future::IntoFuture for VacuumBuilder { | ||
|
@@ -254,6 +310,11 @@ impl std::future::IntoFuture for VacuumBuilder { | |
} | ||
} | ||
|
||
fn is_absolute_path(path: &str) -> bool { | ||
let path = std::path::Path::new(path); | ||
path.is_absolute() | ||
} | ||
|
||
/// Encapsulate which files are to be deleted and the parameters used to make that decision | ||
struct VacuumPlan { | ||
/// What files are to be deleted | ||
|
@@ -367,11 +428,15 @@ impl VacuumPlan { | |
/// Names of the form partitionCol=[value] are partition directories, and should be | ||
/// deleted even if they'd normally be hidden. The _db_index directory contains (bloom filter) | ||
/// indexes and these must be deleted when the data they are tied to is deleted. | ||
fn is_hidden_directory(partition_columns: &[String], path: &Path) -> Result<bool, DeltaTableError> { | ||
let path_name = path.to_string(); | ||
Ok((path_name.starts_with('.') || path_name.starts_with('_')) | ||
&& !path_name.starts_with("_delta_index") | ||
&& !path_name.starts_with("_change_data") | ||
fn is_hidden_file(partition_columns: &[String], path: &Path) -> Result<bool, DeltaTableError> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I renamed this to |
||
let path_name = path.as_ref(); | ||
let skip = path_name.starts_with("_delta_index") || path_name.starts_with("_change_data"); | ||
let is_hidden = path | ||
.parts() | ||
.skip(skip as usize) | ||
.any(|p| p.as_ref().starts_with('.') || p.as_ref().starts_with('_')); | ||
|
||
Ok(is_hidden | ||
&& !partition_columns | ||
.iter() | ||
.any(|partition_column| path_name.starts_with(partition_column))) | ||
|
@@ -451,4 +516,19 @@ mod tests { | |
|
||
assert_eq!(result.files_deleted, empty); | ||
} | ||
|
||
#[tokio::test] | ||
async fn vacuum_table_with_dv_small() { | ||
let table = open_table("./tests/data/table-with-dv-small") | ||
.await | ||
.unwrap(); | ||
|
||
let (_table, result) = VacuumBuilder::new(table.log_store, table.state) | ||
.with_dry_run(true) | ||
.await | ||
.unwrap(); | ||
|
||
let empty: Vec<String> = Vec::new(); | ||
assert_eq!(result.files_deleted, empty); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the deletion vector file path is being constructed on the fly by theabsolut_path
function, it now must be aHashSet<String>
instead ofHashSet<&str>
. This also means that we now have to clone the String of the add and remove path. Is this fine or is there a better way to handle this?Since the add/remove path potentially needs to be converted to an absolute path, I need to construct a String any way
Also, this nested handling of the deletion vector paths doesn't look nice, can you show me a better/more idiomatic way to achieve this?