-
Notifications
You must be signed in to change notification settings - Fork 28
Implement the filestore abstraction #178
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
| let mut parts = Vec::<PartMetadata>::new(); | ||
|
|
||
| for part_entry in std::fs::read_dir(file_parts_dir_path) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not list file parts dir at '{}': {error}", | ||
| file_parts_dir_path.display(), | ||
| }))? | ||
| { | ||
| let part_entry = part_entry?; | ||
|
|
||
| let offset = part_entry.file_name() | ||
| .into_string().map_err(|string| std::io::Error::new( | ||
| std::io::ErrorKind::InvalidFilename, | ||
| format! { | ||
| "malformed part filename string: {string:?}", | ||
| }, | ||
| ))? | ||
| .parse::<u64>().map_err(|error| std::io::Error::new( | ||
| std::io::ErrorKind::InvalidFilename, | ||
| format! { | ||
| "malformed part filename '{}' format: {error}", | ||
| part_entry.file_name().display(), | ||
| }, | ||
| ))?; | ||
|
|
||
| let part_metadata = part_entry.metadata() | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not read part metadata at '{}': {error}", | ||
| part_entry.path().display(), | ||
| }))?; | ||
|
|
||
| if offset.checked_add(part_metadata.len()).is_none() { | ||
| return Err(std::io::Error::new( | ||
| std::io::ErrorKind::InvalidData, | ||
| format! { | ||
| "offset {} too big for content of length {} for part at '{}'", | ||
| offset, | ||
| part_metadata.len(), | ||
| part_entry.path().display(), | ||
| }, | ||
| )); | ||
| } | ||
|
|
||
| parts.push(PartMetadata { | ||
| offset, | ||
| len: part_metadata.len(), | ||
| }); | ||
| } | ||
|
|
||
| // This can theoretically happen if for whatever reason a part that we | ||
| // written in this call got deleted by the time we listed the folder | ||
| // with parts. | ||
| if parts.is_empty() { | ||
| return Err(std::io::Error::new( | ||
| std::io::ErrorKind::InvalidData, | ||
| "no parts", | ||
| )); | ||
| } | ||
|
|
||
| // There is no guarantee about the order in which the filesystem will | ||
| // yield part entries, so we need to sort them ourselves for checking | ||
| // completeness. | ||
| parts.sort_by_key(|part| part.offset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this whole part is a clean enough cut point that it can be moved to its own function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can be moved but for me it feels it has a negative impact on readability. I used to like chopping functions but now I am in the opposite camp and only do it when there is a clear abstraction boundary. In the case of this PR, the clear boundaries are shown in e.g. remove_file_if_old, remove_dir_if_empty—these are modelled after already familiar concepts from the standard library: remove_file and remove_dir. They build on something familiar, have an obvious interface and most importantly: they make sense without context of this code.
By splitting here to another function one has to come up with the right name (and naming is hard!) and a sensible interface. Unfortunately, I am not able to come up with anything except for generic names like collect_parts that do not really explain anything and reading the code forces the reader to jump to the implementation anyway. Moreover, later code puts a number of assumptions on parts: they are ordered by offset, the offset is validated against length, etc. Enforcing such contracts is difficult across function boundaries (enforcing them through assertions has performance penalty, just stating them in doc comments is brittle).
All that cost for no clear benefit to me: moving this bit to a separate function does not reduce nesting (the depth would be exactly the same) and does not remove any duplication (the function would be called exactly once).
There is an interesting piece from John Carmack ("appeal to authority", I know) on the subject. I also considered including comments to separate sections in this function but I decided that the log messages we have here are good enough landmarks and comments would essentially just repeat that information.
CRefice
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I really wish there was a built-in way to add more context to an error that wasn't as noisy looking as .map_err(|error| std::io::Error::new(error.kind(), format!(...); :/
| log::info!("merging parts of '{}'", id); | ||
|
|
||
| let file_path = self.file_path(id); | ||
| let flow_files_dir_path = file_path.parent() | ||
| // This should never happen as file path by construction should not | ||
| // be empty and is always placed in some folder. | ||
| .expect("no file path parent"); | ||
| std::fs::create_dir_all(flow_files_dir_path) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not create flow files dir at '{}': {error}", | ||
| flow_files_dir_path.display(), | ||
| }))?; | ||
|
|
||
| let mut file = std::fs::File::create_new(&file_path) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not create file at '{}': {error}", | ||
| file_path.display(), | ||
| }))?; | ||
|
|
||
| for part in parts.iter() { | ||
| let part_offset = part.offset; | ||
|
|
||
| let mut part = std::fs::File::open(&self.part_path(id, part_offset)) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not open part at '{}': {error}", | ||
| self.part_path(id, part.offset).display(), | ||
| }))?; | ||
|
|
||
| std::io::copy(&mut part, &mut file) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not copy part at '{}' to file at '{}': {error}", | ||
| self.part_path(id, part_offset).display(), | ||
| file_path.display(), | ||
| }))?; | ||
| } | ||
|
|
||
| log::info!("verifying SHA-256 of '{}' content", id); | ||
|
|
||
| use std::io::Seek as _; | ||
| file.seek(std::io::SeekFrom::Start(0)) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not seek file at '{}' for SHA-256 verification: {error}", | ||
| file_path.display(), | ||
| }))?; | ||
|
|
||
| use sha2::Digest as _; | ||
| let mut sha256 = sha2::Sha256::new(); | ||
| std::io::copy(&mut file, &mut sha256) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not read file at '{}' for SHA-256 verification: {error}", | ||
| file_path.display(), | ||
| }))?; | ||
| let sha256 = <[u8; 32]>::from(sha256.finalize()); | ||
|
|
||
| if sha256 != part.file_sha256 { | ||
| return Err(std::io::Error::new( | ||
| std::io::ErrorKind::InvalidData, | ||
| format! { | ||
| "computed digest ({:?}) doesn't match the expected one ({:?})", | ||
| sha256, | ||
| part.file_sha256, | ||
| }, | ||
| )); | ||
| } | ||
|
|
||
| log::info!("deleting merged parts of '{}'", id); | ||
| std::fs::remove_dir_all(file_parts_dir_path) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not delete merged parts at '{}': {error}", | ||
| file_parts_dir_path.display(), | ||
| }))?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This too can be moved into one or two separate functions, e.g. merge_parts and which calls verify_file_hash
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar reply as above.
| /// This function will return an error if any underlying disk operation | ||
| /// can't complete. In such cases there is no guarantee about the state of | ||
| /// the file on disk and it should not be used again. | ||
| pub fn delete(&self, id: &Id) -> std::io::Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this can take id by value so that the borrow checker can help enforce the contract of "can't use the file after it was deleted"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I wonder if there is utility in that given that Ids are easily constructible anyway. At the moment they do not implement Clone but they probably should and will, making such contracts even less attractive.
I feel like they are more akin to std::path::Path objects rather than a std::fs::File objects and no standard APIs "consume" paths that have destructive on-disk semantics (e.g. std::fs::remove_file, std::fs::remove_dir).
I will keep the suggestion in mind, I want to see how the API actually feels when it is used inside actions.
| match std::fs::read_dir(path.join("files")) { | ||
| Ok(flow_files_dir_entries) => { | ||
| for flow_file_dir_entry in flow_files_dir_entries { | ||
| let flow_files_dir_entry = flow_file_dir_entry | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not read flow files dir entry for '{}': {error}", | ||
| path.join("files").display(), | ||
| }))?; | ||
| let flow_files_dir_path = flow_files_dir_entry.path(); | ||
|
|
||
| for file_entry in std::fs::read_dir(&flow_files_dir_path) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not list flow files dir at '{}': {error}", | ||
| flow_files_dir_path.display(), | ||
| }))? | ||
| { | ||
| let file_entry = file_entry | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not read file entry for '{}': {error}", | ||
| flow_files_dir_path.display(), | ||
| }))?; | ||
| let file_path = file_entry.path(); | ||
|
|
||
| if crate::fs::remove_file_if_old(&file_path, ttl) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not clean up file at '{}': {error}", | ||
| file_path.display(), | ||
| }))? | ||
| { | ||
| log::info!("deleted outdated file '{}'", file_path.display()); | ||
| } | ||
| } | ||
|
|
||
| // We also clean empty file and part directories. That can | ||
| // happen after a file or parts were deleted (be it by an | ||
| // explicit deletion or by them reaching their TTL). | ||
| // | ||
| // It is not strictly necessary but we don't want to pollute | ||
| // the filesystem without a reason. | ||
| if crate::fs::remove_dir_if_empty(&flow_files_dir_path) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not clean up flow files dir at '{}': {error}", | ||
| flow_files_dir_path.display(), | ||
| }))? | ||
| { | ||
| log::info!("deleted empty flow files dir '{}'", flow_files_dir_path.display()); | ||
| } | ||
| } | ||
| } | ||
| // This is fine, files folder might not exist if the filestore was | ||
| // never used to store a file. | ||
| Err(error) if error.kind() == std::io::ErrorKind::NotFound => (), | ||
| Err(error) => return Err(std::io::Error::new(error.kind(), format! { | ||
| "could not read files dir at '{}': {error}", | ||
| path.join("files").display(), | ||
| })), | ||
| } | ||
| match std::fs::read_dir(path.join("parts")) { | ||
| Ok(flow_parts_dir_entries) => { | ||
| for flow_parts_dir_entry in flow_parts_dir_entries { | ||
| let flow_parts_dir_entry = flow_parts_dir_entry | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not read flow parts dir entry for '{}': {error}", | ||
| path.join("parts").display(), | ||
| }))?; | ||
| let flow_parts_dir_path = flow_parts_dir_entry.path(); | ||
|
|
||
| for file_parts_dir_entry in std::fs::read_dir(&flow_parts_dir_path) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not list flow parts dir at '{}': {error}", | ||
| flow_parts_dir_path.display(), | ||
| }))? | ||
| { | ||
| let file_parts_dir_entry = file_parts_dir_entry | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not read file parts dir entry for '{}': {error}", | ||
| flow_parts_dir_path.display(), | ||
| }))?; | ||
| let file_parts_dir_path = file_parts_dir_entry.path(); | ||
|
|
||
| for part_entry in std::fs::read_dir(&file_parts_dir_path) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not list file parts dir at '{}': {error}", | ||
| file_parts_dir_path.display(), | ||
| }))? | ||
| { | ||
| let part_entry = part_entry | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not read part entry for '{}': {error}", | ||
| file_parts_dir_path.display(), | ||
| }))?; | ||
| let part_path = part_entry.path(); | ||
|
|
||
| if crate::fs::remove_file_if_old(&part_path, ttl) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not clean up part at '{}': {error}", | ||
| part_path.display(), | ||
| }))? | ||
| { | ||
| log::info!("deleted outdated part '{}'", part_path.display()); | ||
| } | ||
| } | ||
|
|
||
| // See similar code for files folder cleanup above for | ||
| // the rationale. | ||
| if crate::fs::remove_dir_if_empty(&file_parts_dir_path) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not clean up file parts dir at '{}': {error}", | ||
| file_parts_dir_path.display(), | ||
| }))? | ||
| { | ||
| log::info!("deleted empty file parts dir '{}'", file_parts_dir_path.display()); | ||
| } | ||
| } | ||
|
|
||
| // See similar code for files folder cleanup above for the | ||
| // rationale. | ||
| if crate::fs::remove_dir_if_empty(&flow_parts_dir_path) | ||
| .map_err(|error| std::io::Error::new(error.kind(), format! { | ||
| "could not clean up flow parts dir at '{}': {error}", | ||
| flow_parts_dir_path.display(), | ||
| }))? | ||
| { | ||
| log::info!("deleted empty flow parts dir '{}'", flow_parts_dir_path.display()); | ||
| } | ||
| } | ||
| } | ||
| // This is fine, parts folder might not exist if the filestore was | ||
| // never used to store a part. | ||
| Err(error) if error.kind() == std::io::ErrorKind::NotFound => (), | ||
| Err(error) => return Err(std::io::Error::new(error.kind(), format! { | ||
| "could not read parts dir at '{}': {error}", | ||
| path.join("parts").display(), | ||
| })), | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part too could be moved to one or two cleanup() function/s, but this function is not too long so this works too
No description provided.