diff --git a/Cargo.lock b/Cargo.lock
index 6c53e4a836..4a78883698 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3586,7 +3586,7 @@ dependencies = [
 
 [[package]]
 name = "mithril-aggregator"
-version = "0.6.5"
+version = "0.6.6"
 dependencies = [
  "anyhow",
  "async-trait",
diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml
index f6ccc5cf16..faed78adc4 100644
--- a/mithril-aggregator/Cargo.toml
+++ b/mithril-aggregator/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "mithril-aggregator"
-version = "0.6.5"
+version = "0.6.6"
 description = "A Mithril Aggregator server"
 authors = { workspace = true }
 edition = { workspace = true }
diff --git a/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs b/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs
index 49a0548808..34006596ac 100644
--- a/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs
+++ b/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs
@@ -74,7 +74,7 @@ impl CardanoImmutableFilesFullArtifactBuilder {
         // spawn a separate thread to prevent blocking
         let ongoing_snapshot =
             tokio::task::spawn_blocking(move || -> StdResult<OngoingSnapshot> {
-                snapshotter.snapshot(&snapshot_name)
+                snapshotter.snapshot_all(&snapshot_name)
             })
             .await??;
 
diff --git a/mithril-aggregator/src/snapshotter.rs b/mithril-aggregator/src/snapshotter.rs
index ac4ba0b294..a917d979fc 100644
--- a/mithril-aggregator/src/snapshotter.rs
+++ b/mithril-aggregator/src/snapshotter.rs
@@ -3,7 +3,7 @@ use flate2::Compression;
 use flate2::{read::GzDecoder, write::GzEncoder};
 use slog::{info, warn, Logger};
 use std::fs::{self, File};
-use std::io::{self, Read, Seek, SeekFrom};
+use std::io::{self, Read, Seek, SeekFrom, Write};
 use std::path::{Path, PathBuf};
 use std::sync::RwLock;
 use tar::{Archive, Entry, EntryType};
@@ -19,7 +19,14 @@ use crate::ZstandardCompressionParameters;
 /// Define the ability to create snapshots.
 pub trait Snapshotter: Sync + Send {
     /// Create a new snapshot with the given archive name.
-    fn snapshot(&self, archive_name: &str) -> StdResult<OngoingSnapshot>;
+    fn snapshot_all(&self, archive_name: &str) -> StdResult<OngoingSnapshot>;
+
+    /// Create a new snapshot with the given archive name from a subset of directories and files.
+    fn snapshot_subset(
+        &self,
+        archive_name: &str,
+        files: Vec<PathBuf>,
+    ) -> StdResult<OngoingSnapshot>;
 }
 
 /// Compression algorithm and parameters of the [CompressedArchiveSnapshotter].
@@ -37,6 +44,64 @@ impl From<ZstandardCompressionParameters> for SnapshotterCompressionAlgorithm {
     }
 }
 
+/// Define multiple ways to append content to a tar archive.
+trait TarAppender {
+    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()>;
+}
+
+struct AppenderDirAll {
+    db_directory: PathBuf,
+}
+
+impl TarAppender for AppenderDirAll {
+    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()> {
+        tar.append_dir_all(".", &self.db_directory)
+            .map_err(SnapshotError::CreateArchiveError)
+            .with_context(|| {
+                format!(
+                    "Can not add directory: '{}' to the archive",
+                    self.db_directory.display()
+                )
+            })?;
+        Ok(())
+    }
+}
+
+struct AppenderEntries {
+    entries: Vec<PathBuf>,
+    db_directory: PathBuf,
+}
+
+impl TarAppender for AppenderEntries {
+    fn append<T: Write>(&self, tar: &mut tar::Builder<T>) -> StdResult<()> {
+        for entry in &self.entries {
+            let entry_path = self.db_directory.join(entry);
+            if entry_path.is_dir() {
+                tar.append_dir_all(entry, entry_path.clone())
+                    .with_context(|| {
+                        format!(
+                            "Can not add directory: '{}' to the archive",
+                            entry_path.display()
+                        )
+                    })?;
+            } else if entry_path.is_file() {
+                let mut file = File::open(entry_path.clone())?;
+                tar.append_file(entry, &mut file).with_context(|| {
+                    format!(
+                        "Can not add file: '{}' to the archive",
+                        entry_path.display()
+                    )
+                })?;
+            } else {
+                return Err(anyhow!(
+                    "The entry: '{}' is not valid",
+                    entry_path.display()
+                ));
+            }
+        }
+        Ok(())
+    }
+}
 /// Compressed Archive Snapshotter create a compressed file.
 pub struct CompressedArchiveSnapshotter {
     /// DB directory to snapshot
@@ -96,24 +161,29 @@ pub enum SnapshotError {
 }
 
 impl Snapshotter for CompressedArchiveSnapshotter {
-    fn snapshot(&self, archive_name: &str) -> StdResult<OngoingSnapshot> {
-        let archive_path = self.ongoing_snapshot_directory.join(archive_name);
-        let filesize = self.create_and_verify_archive(&archive_path).inspect_err(|_err| {
-            if archive_path.exists() {
-                if let Err(remove_error) = fs::remove_file(&archive_path) {
-                    warn!(
-                        self.logger, " > Post snapshotter.snapshot failure, could not remove temporary archive";
-                        "archive_path" => archive_path.display(),
-                        "error" => remove_error
-                    );
-                }
-            }
-        }).with_context(|| format!("CompressedArchiveSnapshotter can not create and verify archive: '{}'", archive_path.display()))?;
+    fn snapshot_all(&self, archive_name: &str) -> StdResult<OngoingSnapshot> {
+        let appender = AppenderDirAll {
+            db_directory: self.db_directory.clone(),
+        };
 
-        Ok(OngoingSnapshot {
-            filepath: archive_path,
-            filesize,
-        })
+        self.snapshot(archive_name, appender)
+    }
+
+    fn snapshot_subset(
+        &self,
+        archive_name: &str,
+        entries: Vec<PathBuf>,
+    ) -> StdResult<OngoingSnapshot> {
+        if entries.is_empty() {
+            return Err(anyhow!("Can not create snapshot with empty entries"));
+        }
+
+        let appender = AppenderEntries {
+            db_directory: self.db_directory.clone(),
+            entries,
+        };
+
+        self.snapshot(archive_name, appender)
     }
 }
 
@@ -152,6 +222,30 @@ impl CompressedArchiveSnapshotter {
         })
     }
 
+    fn snapshot<T: TarAppender>(
+        &self,
+        archive_name: &str,
+        appender: T,
+    ) -> StdResult<OngoingSnapshot> {
+        let archive_path = self.ongoing_snapshot_directory.join(archive_name);
+        let filesize = self.create_and_verify_archive(&archive_path, appender).inspect_err(|_err| {
+            if archive_path.exists() {
+                if let Err(remove_error) = fs::remove_file(&archive_path) {
+                    warn!(
+                        self.logger, " > Post snapshotter.snapshot failure, could not remove temporary archive";
+                        "archive_path" => archive_path.display(),
+                        "error" => remove_error
+                    );
+                }
+            }
+        }).with_context(|| format!("CompressedArchiveSnapshotter can not create and verify archive: '{}'", archive_path.display()))?;
+
+        Ok(OngoingSnapshot {
+            filepath: archive_path,
+            filesize,
+        })
+    }
+
     fn get_file_size(filepath: &Path) -> StdResult<u64> {
         let res = fs::metadata(filepath)
             .map_err(|e| SnapshotError::GeneralError(e.to_string()))?
@@ -159,7 +253,7 @@ impl CompressedArchiveSnapshotter {
         Ok(res)
     }
 
-    fn create_archive(&self, archive_path: &Path) -> StdResult<u64> {
+    fn create_archive<T: TarAppender>(&self, archive_path: &Path, appender: T) -> StdResult<u64> {
         info!(
             self.logger,
             "Compressing {} into {}",
@@ -174,14 +268,9 @@ impl CompressedArchiveSnapshotter {
                 let enc = GzEncoder::new(tar_file, Compression::default());
                 let mut tar = tar::Builder::new(enc);
 
-                tar.append_dir_all(".", &self.db_directory)
-                    .map_err(SnapshotError::CreateArchiveError)
-                    .with_context(|| {
-                        format!(
-                            "GzEncoder Builder can not add directory: '{}' to the archive",
-                            self.db_directory.display()
-                        )
-                    })?;
+                appender
+                    .append(&mut tar)
+                    .with_context(|| "GzEncoder Builder failed to append content")?;
 
                 let mut gz = tar
                     .into_inner()
@@ -197,14 +286,9 @@ impl CompressedArchiveSnapshotter {
                     .map_err(SnapshotError::CreateArchiveError)?;
                 let mut tar = tar::Builder::new(enc);
 
-                tar.append_dir_all(".", &self.db_directory)
-                    .map_err(SnapshotError::CreateArchiveError)
-                    .with_context(|| {
-                        format!(
-                            "ZstandardEncoder Builder can not add directory: '{}' to the archive",
-                            self.db_directory.display()
-                        )
-                    })?;
+                appender
+                    .append(&mut tar)
+                    .with_context(|| "ZstandardEncoder Builder failed to append content")?;
 
                 let zstd = tar
                     .into_inner()
@@ -228,13 +312,19 @@ impl CompressedArchiveSnapshotter {
         Ok(filesize)
     }
-    fn create_and_verify_archive(&self, archive_path: &Path) -> StdResult<u64> {
-        let filesize = self.create_archive(archive_path).with_context(|| {
-            format!(
-                "CompressedArchiveSnapshotter can not create archive with path: '{}''",
-                archive_path.display()
-            )
-        })?;
+    fn create_and_verify_archive<T: TarAppender>(
+        &self,
+        archive_path: &Path,
+        appender: T,
+    ) -> StdResult<u64> {
+        let filesize = self
+            .create_archive(archive_path, appender)
+            .with_context(|| {
+                format!(
+                    "CompressedArchiveSnapshotter can not create archive with path: '{}''",
+                    archive_path.display()
+                )
+            })?;
         self.verify_archive(archive_path).with_context(|| {
             format!(
                 "CompressedArchiveSnapshotter can not verify archive with path: '{}''",
                 archive_path.display()
@@ -376,7 +466,7 @@ impl Default for DumbSnapshotter {
 }
 
 impl Snapshotter for DumbSnapshotter {
-    fn snapshot(&self, archive_name: &str) -> StdResult<OngoingSnapshot> {
+    fn snapshot_all(&self, archive_name: &str) -> StdResult<OngoingSnapshot> {
         let mut value = self
             .last_snapshot
             .write()
@@ -389,14 +479,23 @@ impl Snapshotter for DumbSnapshotter {
 
         Ok(snapshot)
     }
+
+    fn snapshot_subset(
+        &self,
+        archive_name: &str,
+        _files: Vec<PathBuf>,
+    ) -> StdResult<OngoingSnapshot> {
+        self.snapshot_all(archive_name)
+    }
 }
 
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
 
-    use mithril_common::digesters::DummyImmutablesDbBuilder;
-    use mithril_common::test_utils::TempDir;
+    use uuid::Uuid;
+
+    use mithril_common::{digesters::DummyImmutablesDbBuilder, test_utils::TempDir};
 
     use crate::test_tools::TestLogger;
 
@@ -406,6 +505,48 @@ mod tests {
         TempDir::create("snapshotter", dir_name)
     }
 
+    fn create_file(root: &Path, filename: &str) -> PathBuf {
+        let file_path = PathBuf::from(filename);
+        File::create(root.join(file_path.clone())).unwrap();
+        file_path
+    }
+
+    fn create_dir(root: &Path, dirname: &str) -> PathBuf {
+        let dir_path = PathBuf::from(dirname);
+        std::fs::create_dir(root.join(dir_path.clone())).unwrap();
+        dir_path
+    }
+
+    fn unpack_gz_decoder(test_dir: PathBuf, snapshot: OngoingSnapshot) -> PathBuf {
+        let file_tar_gz = File::open(snapshot.get_file_path()).unwrap();
+        let file_tar_gz_decoder = GzDecoder::new(file_tar_gz);
+        let mut archive = Archive::new(file_tar_gz_decoder);
+        let unpack_path = test_dir.join(create_dir(&test_dir, "unpack"));
+        archive.unpack(&unpack_path).unwrap();
+
+        unpack_path
+    }
+
+    // Generate unique name for the archive is mandatory to avoid conflicts during the verification.
+    fn random_archive_name() -> String {
+        format!("{}.tar.gz", Uuid::new_v4())
+    }
+
+    #[test]
+    fn test_dumb_snapshotter_snasphot_return_archive_name_with_size_0() {
+        let snapshotter = DumbSnapshotter::new();
+        let snapshot = snapshotter.snapshot_all("archive.tar.gz").unwrap();
+
+        assert_eq!(PathBuf::from("archive.tar.gz"), *snapshot.get_file_path());
+        assert_eq!(0, *snapshot.get_file_size());
+
+        let snapshot = snapshotter
+            .snapshot_subset("archive.tar.gz", vec![PathBuf::from("whatever")])
+            .unwrap();
+        assert_eq!(PathBuf::from("archive.tar.gz"), *snapshot.get_file_path());
+        assert_eq!(0, *snapshot.get_file_size());
+    }
+
     #[test]
     fn test_dumb_snapshotter() {
         let snapshotter = DumbSnapshotter::new();
@@ -415,7 +556,17 @@ mod tests {
             .is_none());
 
         let snapshot = snapshotter
-            .snapshot("whatever")
+            .snapshot_all("whatever")
+            .expect("Dumb snapshotter::snapshot should not fail.");
+        assert_eq!(
+            Some(snapshot),
+            snapshotter.get_last_snapshot().expect(
+                "Dumb snapshotter::get_last_snapshot should not fail when some last snapshot."
+            )
+        );
+
+        let snapshot = snapshotter
+            .snapshot_subset("another_whatever", vec![PathBuf::from("subdir")])
             .expect("Dumb snapshotter::snapshot should not fail.");
         assert_eq!(
             Some(snapshot),
@@ -490,7 +641,7 @@ mod tests {
         File::create(pending_snapshot_directory.join("other-process.file")).unwrap();
 
         let _ = snapshotter
-            .snapshot("whatever.tar.gz")
+            .snapshot_all("whatever.tar.gz")
            .expect_err("Snapshotter::snapshot should fail if the db is empty.");
         let remaining_files: Vec<String> = fs::read_dir(&pending_snapshot_directory)
             .unwrap()
@@ -514,7 +665,7 @@ mod tests {
 
         let snapshotter = Arc::new(
            CompressedArchiveSnapshotter::new(
-                db_directory,
+                db_directory.clone(),
                pending_snapshot_directory.clone(),
                SnapshotterCompressionAlgorithm::Gzip,
                TestLogger::stdout(),
@@ -522,9 +673,11 @@ mod tests {
            .unwrap(),
         );
 
+        let appender = AppenderDirAll { db_directory };
         snapshotter
             .create_archive(
                 &pending_snapshot_directory.join(Path::new(pending_snapshot_archive_file)),
+                appender,
             )
             .expect("create_archive should not fail");
         snapshotter
@@ -534,7 +687,7 @@ mod tests {
             .expect("verify_archive should not fail");
 
         snapshotter
-            .snapshot(pending_snapshot_archive_file)
+            .snapshot_all(pending_snapshot_archive_file)
             .expect("Snapshotter::snapshot should not fail.");
     }
 
@@ -553,7 +706,7 @@ mod tests {
 
         let snapshotter = Arc::new(
            CompressedArchiveSnapshotter::new(
-                db_directory,
+                db_directory.clone(),
                pending_snapshot_directory.clone(),
                ZstandardCompressionParameters::default().into(),
                TestLogger::stdout(),
@@ -561,9 +714,11 @@ mod tests {
            .unwrap(),
         );
 
+        let appender = AppenderDirAll { db_directory };
         snapshotter
             .create_archive(
                 &pending_snapshot_directory.join(Path::new(pending_snapshot_archive_file)),
+                appender,
             )
             .expect("create_archive should not fail");
         snapshotter
@@ -573,7 +728,117 @@ mod tests {
             .expect("verify_archive should not fail");
 
         snapshotter
-            .snapshot(pending_snapshot_archive_file)
+            .snapshot_all(pending_snapshot_archive_file)
             .expect("Snapshotter::snapshot should not fail.");
     }
+
+    #[test]
+    fn snapshot_subset_should_create_archive_only_for_specified_directories_and_files() {
+        let test_dir = get_test_directory("only_for_specified_directories_and_files");
+        let destination = test_dir.join(create_dir(&test_dir, "destination"));
+        let source = test_dir.join(create_dir(&test_dir, "source"));
+
+        let directory_to_archive_path = create_dir(&source, "directory_to_archive");
+        let file_to_archive_path = create_file(&source, "file_to_archive.txt");
+        let directory_not_to_archive_path = create_dir(&source, "directory_not_to_archive");
+        let file_not_to_archive_path = create_file(&source, "file_not_to_archive.txt");
+
+        let snapshotter = CompressedArchiveSnapshotter::new(
+            source,
+            destination,
+            SnapshotterCompressionAlgorithm::Gzip,
+            TestLogger::stdout(),
+        )
+        .unwrap();
+
+        let snapshot = snapshotter
+            .snapshot_subset(
+                &random_archive_name(),
+                vec![
+                    directory_to_archive_path.clone(),
+                    file_to_archive_path.clone(),
+                ],
+            )
+            .unwrap();
+
+        let unpack_path = unpack_gz_decoder(test_dir, snapshot);
+
+        assert!(unpack_path.join(directory_to_archive_path).is_dir());
+        assert!(unpack_path.join(file_to_archive_path).is_file());
+        assert!(!unpack_path.join(directory_not_to_archive_path).exists());
+        assert!(!unpack_path.join(file_not_to_archive_path).exists());
+    }
+
+    #[test]
+    fn snapshot_subset_return_error_when_file_or_directory_not_exist() {
+        let test_dir = get_test_directory("file_or_directory_not_exist");
+        let destination = test_dir.join(create_dir(&test_dir, "destination"));
+        let source = test_dir.join(create_dir(&test_dir, "source"));
+
+        let snapshotter = CompressedArchiveSnapshotter::new(
+            source,
+            destination,
+            SnapshotterCompressionAlgorithm::Gzip,
+            TestLogger::stdout(),
+        )
+        .unwrap();
+
+        snapshotter
+            .snapshot_subset(&random_archive_name(), vec![PathBuf::from("not_exist")])
+            .expect_err("snapshot_subset should return error when file or directory not exist");
+    }
+
+    #[test]
+    fn snapshot_subset_return_error_when_empty_entries() {
+        let test_dir = get_test_directory("empty_entries");
+        let destination = test_dir.join(create_dir(&test_dir, "destination"));
+        let source = test_dir.join(create_dir(&test_dir, "source"));
+
+        let snapshotter = CompressedArchiveSnapshotter::new(
+            source,
+            destination,
+            SnapshotterCompressionAlgorithm::Gzip,
+            TestLogger::stdout(),
+        )
+        .unwrap();
+
+        snapshotter
+            .snapshot_subset(&random_archive_name(), vec![])
+            .expect_err("snapshot_subset should return error when entries is empty");
+    }
+
+    #[test]
+    fn snapshot_subset_with_duplicate_files_and_directories() {
+        let test_dir = get_test_directory("with_duplicate_files_and_directories");
+        let destination = test_dir.join(create_dir(&test_dir, "destination"));
+        let source = test_dir.join(create_dir(&test_dir, "source"));
+
+        let directory_to_archive_path = create_dir(&source, "directory_to_archive");
+        let file_to_archive_path = create_file(&source, "directory_to_archive/file_to_archive.txt");
+
+        let snapshotter = CompressedArchiveSnapshotter::new(
+            source,
+            destination,
+            SnapshotterCompressionAlgorithm::Gzip,
+            TestLogger::stdout(),
+        )
+        .unwrap();
+
+        let snapshot = snapshotter
+            .snapshot_subset(
+                &random_archive_name(),
+                vec![
+                    directory_to_archive_path.clone(),
+                    directory_to_archive_path.clone(),
+                    file_to_archive_path.clone(),
+                    file_to_archive_path.clone(),
+                ],
+            )
+            .unwrap();
+
+        let unpack_path = unpack_gz_decoder(test_dir, snapshot);
+
+        assert!(unpack_path.join(directory_to_archive_path).is_dir());
+        assert!(unpack_path.join(file_to_archive_path).is_file());
+    }
 }
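
Usage sketch (not part of the diff above): the snippet below illustrates how the reworked Snapshotter trait is intended to be called from inside the mithril-aggregator crate, assuming CompressedArchiveSnapshotter, SnapshotterCompressionAlgorithm, Snapshotter and StdResult are in scope. The archive names and the "immutable"/"ledger" entries are arbitrary examples, and a discarding slog logger is used instead of the crate's real logging setup.

// Illustrative sketch only; not part of the committed change.
use std::path::PathBuf;

use slog::{o, Discard, Logger};

fn snapshot_examples(db_directory: PathBuf, ongoing_snapshot_directory: PathBuf) -> StdResult<()> {
    let snapshotter = CompressedArchiveSnapshotter::new(
        db_directory,
        ongoing_snapshot_directory,
        SnapshotterCompressionAlgorithm::Gzip,
        Logger::root(Discard, o!()),
    )?;

    // Archive the whole db directory (what the artifact builder now calls).
    let full_snapshot = snapshotter.snapshot_all("db-full.tar.gz")?;

    // Archive only selected entries, given relative to the db directory.
    // snapshot_subset returns an error for an empty list or for an entry
    // that is neither an existing file nor an existing directory.
    let partial_snapshot = snapshotter.snapshot_subset(
        "db-subset.tar.gz",
        vec![PathBuf::from("immutable"), PathBuf::from("ledger")],
    )?;

    println!(
        "full: {}, partial: {}",
        full_snapshot.get_file_path().display(),
        partial_snapshot.get_file_path().display(),
    );
    Ok(())
}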