Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: ancillary archive creation #2191

Draft
wants to merge 8 commits into
base: ensemble/2151/extend-snapshotter-for-specific-files-directories
Choose a base branch
from
37 changes: 29 additions & 8 deletions mithril-aggregator/src/artifact_builder/cardano_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub struct CardanoDatabaseArtifactBuilder {
db_directory: PathBuf,
cardano_node_version: Version,
compression_algorithm: CompressionAlgorithm,
#[allow(dead_code)]
ancillary_builder: Arc<AncillaryArtifactBuilder>,
}

Expand Down Expand Up @@ -62,8 +61,10 @@ impl ArtifactBuilder<CardanoDbBeacon, CardanoDatabaseSnapshot> for CardanoDataba
})?;
let total_db_size_uncompressed = compute_uncompressed_database_size(&self.db_directory)?;

let ancillary_locations = self.ancillary_builder.upload(&beacon).await?;

let locations = ArtifactsLocations {
ancillary: vec![],
ancillary: ancillary_locations,
digests: vec![],
immutables: vec![],
};
Expand Down Expand Up @@ -111,9 +112,14 @@ mod tests {
use std::path::PathBuf;

use mithril_common::{
digesters::DummyImmutablesDbBuilder,
entities::{ProtocolMessage, ProtocolMessagePartKey},
digesters::DummyCardanoDbBuilder,
entities::{AncillaryLocation, ProtocolMessage, ProtocolMessagePartKey},
test_utils::{fake_data, TempDir},
CardanoNetwork,
};

use crate::{
artifact_builder::MockAncillaryFileUploader, test_tools::TestLogger, DumbSnapshotter,
};

use super::*;
Expand All @@ -129,7 +135,7 @@ mod tests {
let immutable_trio_file_size = 777;
let ledger_file_size = 6666;
let volatile_file_size = 99;
DummyImmutablesDbBuilder::new(test_dir.as_os_str().to_str().unwrap())
DummyCardanoDbBuilder::new(test_dir.as_os_str().to_str().unwrap())
.with_immutables(&[1, 2])
.set_immutable_trio_file_size(immutable_trio_file_size)
.with_ledger_files(&["blocks-0.dat", "blocks-1.dat", "blocks-2.dat"])
Expand All @@ -152,7 +158,7 @@ mod tests {
let immutable_trio_file_size = 777;
let ledger_file_size = 6666;
let volatile_file_size = 99;
DummyImmutablesDbBuilder::new(test_dir.as_os_str().to_str().unwrap())
DummyCardanoDbBuilder::new(test_dir.as_os_str().to_str().unwrap())
.with_immutables(&[1])
.set_immutable_trio_file_size(immutable_trio_file_size)
.with_ledger_files(&["blocks-0.dat"])
Expand All @@ -162,11 +168,23 @@ mod tests {
.build();
let expected_total_size = immutable_trio_file_size + ledger_file_size + volatile_file_size;

let mut ancillary_uploader = MockAncillaryFileUploader::new();
ancillary_uploader.expect_upload().return_once(|_| {
Ok(AncillaryLocation::CloudStorage {
uri: "ancillary_uri".to_string(),
})
});
let cardano_database_artifact_builder = CardanoDatabaseArtifactBuilder::new(
test_dir,
&Version::parse("1.0.0").unwrap(),
CompressionAlgorithm::Zstandard,
Arc::new(AncillaryArtifactBuilder::new(vec![])),
Arc::new(AncillaryArtifactBuilder::new(
vec![Arc::new(ancillary_uploader)],
Arc::new(DumbSnapshotter::new()),
CardanoNetwork::DevNet(123),
CompressionAlgorithm::Gzip,
TestLogger::stdout(),
)),
);

let beacon = fake_data::beacon();
Expand All @@ -187,12 +205,15 @@ mod tests {
.await
.unwrap();

let expected_ancillary_locations = vec![AncillaryLocation::CloudStorage {
uri: "ancillary_uri".to_string(),
}];
let artifact_expected = CardanoDatabaseSnapshot::new(
"merkleroot".to_string(),
beacon,
expected_total_size,
ArtifactsLocations {
ancillary: vec![],
ancillary: expected_ancillary_locations,
digests: vec![],
immutables: vec![],
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
#![allow(dead_code)]
use std::{
path::{Path, PathBuf},
sync::Arc,
};

use anyhow::Context;
use async_trait::async_trait;
use std::{path::Path, sync::Arc};
use slog::{debug, Logger};

use mithril_common::{entities::AncillaryLocation, StdResult};
use mithril_common::{
digesters::{IMMUTABLE_DIR, LEDGER_DIR, VOLATILE_DIR},
entities::{AncillaryLocation, CardanoDbBeacon, CompressionAlgorithm},
logging::LoggerExtensions,
CardanoNetwork, StdResult,
};

use crate::{FileUploader, LocalUploader};
use crate::{snapshotter::OngoingSnapshot, FileUploader, LocalUploader, Snapshotter};

/// The [AncillaryFileUploader] trait allows identifying uploaders that return locations for ancillary archive files.
#[cfg_attr(test, mockall::automock)]
Expand All @@ -27,18 +37,103 @@ impl AncillaryFileUploader for LocalUploader {
/// The archive is uploaded with the provided uploaders.
pub struct AncillaryArtifactBuilder {
    // Uploaders that push the ancillary archive and return its remote locations.
    uploaders: Vec<Arc<dyn AncillaryFileUploader>>,
    // Snapshotter used to build the ancillary archive from the database files.
    snapshotter: Arc<dyn Snapshotter>,
    // Network identifier, embedded in the generated archive name.
    cardano_network: CardanoNetwork,
    // Compression algorithm, determines the archive file extension.
    compression_algorithm: CompressionAlgorithm,
    logger: Logger,
}

impl AncillaryArtifactBuilder {
pub fn new(uploaders: Vec<Arc<dyn AncillaryFileUploader>>) -> Self {
Self { uploaders }
/// Creates a new [AncillaryArtifactBuilder].
pub fn new(
uploaders: Vec<Arc<dyn AncillaryFileUploader>>,
snapshotter: Arc<dyn Snapshotter>,
cardano_network: CardanoNetwork,
compression_algorithm: CompressionAlgorithm,
logger: Logger,
) -> Self {
Self {
uploaders,
logger: logger.new_with_component_name::<Self>(),
cardano_network,
compression_algorithm,
snapshotter,
}
}

pub async fn upload(&self, beacon: &CardanoDbBeacon) -> StdResult<Vec<AncillaryLocation>> {
let snapshot = self.create_ancillary_archive(beacon)?;

let locations = self
.upload_ancillary_archive(snapshot.get_file_path())
.await?;

Ok(locations)
}

pub async fn upload_archive(&self, db_directory: &Path) -> StdResult<Vec<AncillaryLocation>> {
/// Returns the list of files and directories to include in the snapshot.
/// The immutable file number is incremented by 1 to include the not yet finalized immutable file.
fn get_files_and_directories_to_snapshot(immutable_file_number: u64) -> Vec<PathBuf> {
let next_immutable_file_number = immutable_file_number + 1;
let chunk_filename = format!("{:05}.chunk", next_immutable_file_number);
let primary_filename = format!("{:05}.primary", next_immutable_file_number);
let secondary_filename = format!("{:05}.secondary", next_immutable_file_number);

vec![
PathBuf::from(VOLATILE_DIR),
PathBuf::from(LEDGER_DIR),
PathBuf::from(IMMUTABLE_DIR).join(chunk_filename),
PathBuf::from(IMMUTABLE_DIR).join(primary_filename),
PathBuf::from(IMMUTABLE_DIR).join(secondary_filename),
]
}

/// Creates an archive for the Cardano database ancillary files for the given immutable file number.
fn create_ancillary_archive(&self, beacon: &CardanoDbBeacon) -> StdResult<OngoingSnapshot> {
debug!(
self.logger,
"Creating ancillary archive for immutable file number: {}",
beacon.immutable_file_number
);

let paths_to_include =
Self::get_files_and_directories_to_snapshot(beacon.immutable_file_number);

let archive_name = format!(
"{}-e{}-i{}.ac.{}", // Temporary name, `LocalUploader` needs to be refactored to handle names other than digests.
self.cardano_network,
*beacon.epoch,
beacon.immutable_file_number,
self.compression_algorithm.tar_file_extension()
);

let snapshot = self
.snapshotter
.snapshot_subset(&archive_name, paths_to_include)
.with_context(|| {
format!(
"Failed to create snapshot for immutable file number: {}",
beacon.immutable_file_number
)
})?;

debug!(
self.logger,
"Ancillary archive created at path: {:?}",
snapshot.get_file_path()
);

Ok(snapshot)
}

/// Uploads the ancillary archive and returns the locations of the uploaded files.
async fn upload_ancillary_archive(
&self,
archive_filepath: &Path,
) -> StdResult<Vec<AncillaryLocation>> {
let mut locations = Vec::new();
for uploader in &self.uploaders {
// TODO: Temporary preparation work, `db_directory` is used as the ancillary archive path for now.
let location = uploader.upload(db_directory).await?;
let location = uploader.upload(archive_filepath).await?;
locations.push(location);
}

Expand All @@ -48,21 +143,43 @@ impl AncillaryArtifactBuilder {

#[cfg(test)]
mod tests {
use std::fs::File;

use flate2::read::GzDecoder;
use mockall::predicate::eq;
use tar::Archive;

use mithril_common::digesters::{
DummyCardanoDbBuilder, IMMUTABLE_DIR, LEDGER_DIR, VOLATILE_DIR,
};

use crate::{
test_tools::TestLogger, CompressedArchiveSnapshotter, DumbSnapshotter,
SnapshotterCompressionAlgorithm,
};

use super::*;

#[tokio::test]
async fn upload_archive_should_return_empty_locations_with_no_uploader() {
let builder = AncillaryArtifactBuilder::new(vec![]);
async fn upload_ancillary_archive_should_return_empty_locations_with_no_uploader() {
let builder = AncillaryArtifactBuilder::new(
vec![],
Arc::new(DumbSnapshotter::new()),
CardanoNetwork::DevNet(123),
CompressionAlgorithm::Gzip,
TestLogger::stdout(),
);

let locations = builder.upload_archive(Path::new("whatever")).await.unwrap();
let locations = builder
.upload_ancillary_archive(Path::new("whatever"))
.await
.unwrap();

assert!(locations.is_empty());
}

#[tokio::test]
async fn upload_archive_should_return_all_uploaders_returned_locations() {
async fn upload_ancillary_archive_should_return_all_uploaders_returned_locations() {
let mut first_uploader = MockAncillaryFileUploader::new();
first_uploader
.expect_upload()
Expand All @@ -88,10 +205,16 @@ mod tests {
let uploaders: Vec<Arc<dyn AncillaryFileUploader>> =
vec![Arc::new(first_uploader), Arc::new(second_uploader)];

let builder = AncillaryArtifactBuilder::new(uploaders);
let builder = AncillaryArtifactBuilder::new(
uploaders,
Arc::new(DumbSnapshotter::new()),
CardanoNetwork::DevNet(123),
CompressionAlgorithm::Gzip,
TestLogger::stdout(),
);

let locations = builder
.upload_archive(Path::new("archive_path"))
.upload_ancillary_archive(Path::new("archive_path"))
.await
.unwrap();

Expand All @@ -107,4 +230,100 @@ mod tests {
]
);
}

    #[tokio::test]
    async fn create_archive_should_embed_ledger_volatile_directories_and_last_immutables() {
        // Build a fake Cardano db with 3 immutable trios, 3 ledger files and 4 volatile files,
        // plus an extra "whatever" directory that must NOT end up in the archive.
        let test_dir = "cardano_database/create_archive";
        let cardano_db = DummyCardanoDbBuilder::new(test_dir)
            .with_immutables(&[1, 2, 3])
            .with_ledger_files(&["blocks-0.dat", "blocks-1.dat", "blocks-2.dat"])
            .with_volatile_files(&["437", "537", "637", "737"])
            .build();
        std::fs::create_dir(cardano_db.get_dir().join("whatever")).unwrap();

        let db_directory = cardano_db.get_dir().to_path_buf();
        let snapshotter = {
            CompressedArchiveSnapshotter::new(
                db_directory.clone(),
                db_directory.parent().unwrap().join("snapshot_dest"),
                SnapshotterCompressionAlgorithm::Gzip,
                TestLogger::stdout(),
            )
            .unwrap()
        };

        // No uploader is needed: only archive creation is under test here.
        let builder = AncillaryArtifactBuilder::new(
            vec![],
            Arc::new(snapshotter),
            CardanoNetwork::DevNet(123),
            CompressionAlgorithm::Gzip,
            TestLogger::stdout(),
        );

        // Beacon with immutable_file_number = 1: the builder snapshots the NEXT
        // (not yet finalized) immutable trio, i.e. the "00002" files.
        let snapshot = builder
            .create_ancillary_archive(&CardanoDbBeacon::new(99, 1))
            .unwrap();

        // Unpack the produced tar.gz archive to inspect its contents.
        let mut archive = {
            let file_tar_gz = File::open(snapshot.get_file_path()).unwrap();
            let file_tar_gz_decoder = GzDecoder::new(file_tar_gz);
            Archive::new(file_tar_gz_decoder)
        };

        let dst = cardano_db.get_dir().join("unpack_dir");
        archive.unpack(dst.clone()).unwrap();

        // Exactly the 00002 trio is embedded (3 files, nothing from 00001/00003).
        let expected_immutable_path = dst.join(IMMUTABLE_DIR);
        assert!(expected_immutable_path.join("00002.chunk").exists());
        assert!(expected_immutable_path.join("00002.primary").exists());
        assert!(expected_immutable_path.join("00002.secondary").exists());
        let immutables_nb = std::fs::read_dir(expected_immutable_path).unwrap().count();
        assert_eq!(3, immutables_nb);

        // The whole ledger directory is embedded.
        let expected_ledger_path = dst.join(LEDGER_DIR);
        assert!(expected_ledger_path.join("blocks-0.dat").exists());
        assert!(expected_ledger_path.join("blocks-1.dat").exists());
        assert!(expected_ledger_path.join("blocks-2.dat").exists());
        let ledger_nb = std::fs::read_dir(expected_ledger_path).unwrap().count();
        assert_eq!(3, ledger_nb);

        // The whole volatile directory is embedded.
        let expected_volatile_path = dst.join(VOLATILE_DIR);
        assert!(expected_volatile_path.join("437").exists());
        assert!(expected_volatile_path.join("537").exists());
        assert!(expected_volatile_path.join("637").exists());
        assert!(expected_volatile_path.join("737").exists());
        let volatile_nb = std::fs::read_dir(expected_volatile_path).unwrap().count();
        assert_eq!(4, volatile_nb);

        // Unrelated directories are excluded from the archive.
        assert!(!dst.join("whatever").exists());
    }

#[tokio::test]
async fn upload_should_return_error_and_not_upload_when_archive_creation_fails() {
let snapshotter = {
CompressedArchiveSnapshotter::new(
PathBuf::from("directory_not_existing"),
PathBuf::from("whatever"),
SnapshotterCompressionAlgorithm::Gzip,
TestLogger::stdout(),
)
.unwrap()
};

let mut uploader = MockAncillaryFileUploader::new();
uploader.expect_upload().never();

let builder = AncillaryArtifactBuilder::new(
vec![Arc::new(uploader)],
Arc::new(snapshotter),
CardanoNetwork::DevNet(123),
CompressionAlgorithm::Gzip,
TestLogger::stdout(),
);

builder
.upload(&CardanoDbBeacon::new(99, 1))
.await
.expect_err("Should return an error when archive creation fails");
}
}
Loading
Loading