Skip to content

Commit

Permalink
Merge pull request #2165 from input-output-hk/ensemble/2151/refactor-…
Browse files Browse the repository at this point in the history
…uploaders-module

Refactor: rework `snapshot_uploaders` module to improve genericity
  • Loading branch information
sfauvel authored Dec 11, 2024
2 parents 4cd38e2 + 460c9af commit 404212a
Show file tree
Hide file tree
Showing 17 changed files with 259 additions and 341 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mithril-aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-aggregator"
version = "0.6.1"
version = "0.6.2"
description = "A Mithril Aggregator server"
authors = { workspace = true }
edition = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use slog::{debug, warn, Logger};
use std::sync::Arc;
use thiserror::Error;

use crate::{
snapshot_uploaders::SnapshotLocation, snapshotter::OngoingSnapshot, SnapshotUploader,
Snapshotter,
};
use crate::{file_uploaders::FileUri, snapshotter::OngoingSnapshot, FileUploader, Snapshotter};

use super::ArtifactBuilder;
use mithril_common::logging::LoggerExtensions;
Expand All @@ -33,7 +30,7 @@ pub struct CardanoImmutableFilesFullArtifactBuilder {
cardano_network: CardanoNetwork,
cardano_node_version: Version,
snapshotter: Arc<dyn Snapshotter>,
snapshot_uploader: Arc<dyn SnapshotUploader>,
snapshot_uploader: Arc<dyn FileUploader>,
compression_algorithm: CompressionAlgorithm,
logger: Logger,
}
Expand All @@ -44,7 +41,7 @@ impl CardanoImmutableFilesFullArtifactBuilder {
cardano_network: CardanoNetwork,
cardano_node_version: &Version,
snapshotter: Arc<dyn Snapshotter>,
snapshot_uploader: Arc<dyn SnapshotUploader>,
snapshot_uploader: Arc<dyn FileUploader>,
compression_algorithm: CompressionAlgorithm,
logger: Logger,
) -> Self {
Expand Down Expand Up @@ -89,11 +86,11 @@ impl CardanoImmutableFilesFullArtifactBuilder {
async fn upload_snapshot_archive(
&self,
ongoing_snapshot: &OngoingSnapshot,
) -> StdResult<Vec<SnapshotLocation>> {
) -> StdResult<Vec<FileUri>> {
debug!(self.logger, ">> upload_snapshot_archive");
let location = self
.snapshot_uploader
.upload_snapshot(ongoing_snapshot.get_file_path())
.upload(ongoing_snapshot.get_file_path())
.await;

if let Err(error) = tokio::fs::remove_file(ongoing_snapshot.get_file_path()).await {
Expand Down Expand Up @@ -158,7 +155,12 @@ impl ArtifactBuilder<CardanoDbBeacon, Snapshot> for CardanoImmutableFilesFullArt
})?;

let snapshot = self
.create_snapshot(beacon, &ongoing_snapshot, snapshot_digest, locations)
.create_snapshot(
beacon,
&ongoing_snapshot,
snapshot_digest,
locations.into_iter().map(Into::into).collect(),
)
.await?;

Ok(snapshot)
Expand All @@ -174,8 +176,7 @@ mod tests {
use mithril_common::{entities::CompressionAlgorithm, test_utils::fake_data};

use crate::{
snapshot_uploaders::MockSnapshotUploader, test_tools::TestLogger, DumbSnapshotUploader,
DumbSnapshotter,
file_uploaders::MockFileUploader, test_tools::TestLogger, DumbSnapshotter, DumbUploader,
};

use super::*;
Expand All @@ -190,7 +191,7 @@ mod tests {
.unwrap();

let dumb_snapshotter = Arc::new(DumbSnapshotter::new());
let dumb_snapshot_uploader = Arc::new(DumbSnapshotUploader::new());
let dumb_snapshot_uploader = Arc::new(DumbUploader::new());

let cardano_immutable_files_full_artifact_builder =
CardanoImmutableFilesFullArtifactBuilder::new(
Expand All @@ -213,6 +214,7 @@ mod tests {
let remote_locations = vec![dumb_snapshot_uploader
.get_last_upload()
.unwrap()
.map(Into::into)
.expect("A snapshot should have been 'uploaded'")];
let artifact_expected = Snapshot::new(
snapshot_digest.to_owned(),
Expand All @@ -237,7 +239,7 @@ mod tests {
fake_data::network(),
&Version::parse("1.0.0").unwrap(),
Arc::new(DumbSnapshotter::new()),
Arc::new(DumbSnapshotUploader::new()),
Arc::new(DumbUploader::new()),
CompressionAlgorithm::default(),
TestLogger::stdout(),
);
Expand All @@ -264,7 +266,7 @@ mod tests {
network,
&Version::parse("1.0.0").unwrap(),
Arc::new(DumbSnapshotter::new()),
Arc::new(DumbSnapshotUploader::new()),
Arc::new(DumbUploader::new()),
CompressionAlgorithm::Gzip,
TestLogger::stdout(),
);
Expand Down Expand Up @@ -293,7 +295,7 @@ mod tests {
fake_data::network(),
&Version::parse("1.0.0").unwrap(),
Arc::new(DumbSnapshotter::new()),
Arc::new(DumbSnapshotUploader::new()),
Arc::new(DumbUploader::new()),
algorithm,
TestLogger::stdout(),
);
Expand Down Expand Up @@ -325,9 +327,9 @@ mod tests {
let file = NamedTempFile::new().unwrap();
let file_path = file.path();
let snapshot = OngoingSnapshot::new(file_path.to_path_buf(), 7331);
let mut snapshot_uploader = MockSnapshotUploader::new();
let mut snapshot_uploader = MockFileUploader::new();
snapshot_uploader
.expect_upload_snapshot()
.expect_upload()
.return_once(|_| Err(anyhow!("an error")))
.once();

Expand Down
28 changes: 14 additions & 14 deletions mithril-aggregator/src/dependency_injection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use crate::{
},
entities::AggregatorEpochSettings,
event_store::{EventMessage, EventStore, TransmitterService},
file_uploaders::GcpUploader,
http_server::routes::router::{self, RouterConfig, RouterState},
services::{
AggregatorSignableSeedBuilder, AggregatorUpkeepService, BufferedCertifierService,
Expand All @@ -74,12 +75,12 @@ use crate::{
UpkeepService, UsageReporter,
},
store::CertificatePendingStorer,
tools::{CExplorerSignerRetriever, GcpFileUploader, GenesisToolsDependency, SignersImporter},
tools::{CExplorerSignerRetriever, GenesisToolsDependency, SignersImporter},
AggregatorConfig, AggregatorRunner, AggregatorRuntime, CompressedArchiveSnapshotter,
Configuration, DependencyContainer, DumbSnapshotUploader, DumbSnapshotter, EpochSettingsStorer,
LocalSnapshotUploader, MetricsService, MithrilSignerRegisterer, MultiSigner, MultiSignerImpl,
RemoteSnapshotUploader, SingleSignatureAuthenticator, SnapshotUploader, SnapshotUploaderType,
Snapshotter, SnapshotterCompressionAlgorithm, VerificationKeyStorer,
Configuration, DependencyContainer, DumbSnapshotter, DumbUploader, EpochSettingsStorer,
FileUploader, LocalUploader, MetricsService, MithrilSignerRegisterer, MultiSigner,
MultiSignerImpl, SingleSignatureAuthenticator, SnapshotUploaderType, Snapshotter,
SnapshotterCompressionAlgorithm, VerificationKeyStorer,
};

const SQLITE_FILE: &str = "aggregator.sqlite3";
Expand Down Expand Up @@ -118,7 +119,7 @@ pub struct DependenciesBuilder {
pub stake_store: Option<Arc<StakePoolStore>>,

/// Snapshot uploader service.
pub snapshot_uploader: Option<Arc<dyn SnapshotUploader>>,
pub snapshot_uploader: Option<Arc<dyn FileUploader>>,

/// Multisigner service.
pub multi_signer: Option<Arc<dyn MultiSigner>>,
Expand Down Expand Up @@ -446,7 +447,7 @@ impl DependenciesBuilder {
Ok(self.stake_store.as_ref().cloned().unwrap())
}

async fn build_snapshot_uploader(&mut self) -> Result<Arc<dyn SnapshotUploader>> {
async fn build_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
let logger = self.root_logger();
if self.configuration.environment == ExecutionEnvironment::Production {
match self.configuration.snapshot_uploader_type {
Expand All @@ -461,26 +462,25 @@ impl DependenciesBuilder {
)
})?;

Ok(Arc::new(RemoteSnapshotUploader::new(
Box::new(GcpFileUploader::new(bucket.clone(), logger.clone())),
Ok(Arc::new(GcpUploader::new(
bucket,
self.configuration.snapshot_use_cdn_domain,
logger,
logger.clone(),
)))
}
SnapshotUploaderType::Local => Ok(Arc::new(LocalSnapshotUploader::new(
SnapshotUploaderType::Local => Ok(Arc::new(LocalUploader::new(
self.configuration.get_server_url(),
&self.configuration.snapshot_directory,
logger,
))),
}
} else {
Ok(Arc::new(DumbSnapshotUploader::new()))
Ok(Arc::new(DumbUploader::new()))
}
}

/// Get a [SnapshotUploader]
pub async fn get_snapshot_uploader(&mut self) -> Result<Arc<dyn SnapshotUploader>> {
/// Get a [FileUploader]
pub async fn get_snapshot_uploader(&mut self) -> Result<Arc<dyn FileUploader>> {
if self.snapshot_uploader.is_none() {
self.snapshot_uploader = Some(self.build_snapshot_uploader().await?);
}
Expand Down
4 changes: 2 additions & 2 deletions mithril-aggregator/src/dependency_injection/containers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ use crate::{
},
entities::AggregatorEpochSettings,
event_store::{EventMessage, TransmitterService},
file_uploaders::FileUploader,
multi_signer::MultiSigner,
services::{
CertifierService, EpochService, MessageService, ProverService, SignedEntityService,
StakeDistributionService, TransactionStore, UpkeepService,
},
signer_registerer::SignerRecorder,
snapshot_uploaders::SnapshotUploader,
store::CertificatePendingStorer,
EpochSettingsStorer, MetricsService, SignerRegisterer, SignerRegistrationRoundOpener,
SingleSignatureAuthenticator, Snapshotter, VerificationKeyStorer,
Expand Down Expand Up @@ -71,7 +71,7 @@ pub struct DependencyContainer {
pub stake_store: Arc<StakePoolStore>,

/// Snapshot uploader service.
pub snapshot_uploader: Arc<dyn SnapshotUploader>,
pub snapshot_uploader: Arc<dyn FileUploader>,

/// Multisigner service.
pub multi_signer: Arc<dyn MultiSigner>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ use async_trait::async_trait;
use mithril_common::StdResult;
use std::{path::Path, sync::RwLock};

use super::{SnapshotLocation, SnapshotUploader};
use crate::file_uploaders::{FileUploader, FileUri};

/// Dummy uploader for test purposes.
///
/// It actually does NOT upload any snapshot but remembers the last snapshot it
/// It actually does NOT upload any file but remembers the last file it
/// was asked to upload. This is intended to by used by integration tests.
pub struct DumbSnapshotUploader {
last_uploaded: RwLock<Option<String>>,
pub struct DumbUploader {
last_uploaded: RwLock<Option<FileUri>>,
}

impl DumbSnapshotUploader {
impl DumbUploader {
/// Create a new instance.
pub fn new() -> Self {
Self {
Expand All @@ -22,32 +22,32 @@ impl DumbSnapshotUploader {
}

/// Return the last upload that was triggered.
pub fn get_last_upload(&self) -> StdResult<Option<String>> {
pub fn get_last_upload(&self) -> StdResult<Option<FileUri>> {
let value = self
.last_uploaded
.read()
.map_err(|e| anyhow!("Error while saving filepath location: {e}"))?;

Ok(value.as_ref().map(|v| v.to_string()))
Ok(value.as_ref().map(Clone::clone))
}
}

impl Default for DumbSnapshotUploader {
impl Default for DumbUploader {
fn default() -> Self {
Self::new()
}
}

#[async_trait]
impl SnapshotUploader for DumbSnapshotUploader {
/// Upload a snapshot
async fn upload_snapshot(&self, snapshot_filepath: &Path) -> StdResult<SnapshotLocation> {
impl FileUploader for DumbUploader {
/// Upload a file
async fn upload(&self, filepath: &Path) -> StdResult<FileUri> {
let mut value = self
.last_uploaded
.write()
.map_err(|e| anyhow!("Error while saving filepath location: {e}"))?;

let location = snapshot_filepath.to_string_lossy().to_string();
let location = FileUri(filepath.to_string_lossy().to_string());
*value = Some(location.clone());

Ok(location)
Expand All @@ -60,18 +60,18 @@ mod tests {

#[tokio::test]
async fn test_dumb_uploader() {
let uploader = DumbSnapshotUploader::new();
let uploader = DumbUploader::new();
assert!(uploader
.get_last_upload()
.expect("uploader should not fail")
.is_none());
let res = uploader
.upload_snapshot(Path::new("/tmp/whatever"))
.upload(Path::new("/tmp/whatever"))
.await
.expect("uploading with a dumb uploader should not fail");
assert_eq!(res, "/tmp/whatever".to_string());
assert_eq!(res, FileUri("/tmp/whatever".to_string()));
assert_eq!(
Some("/tmp/whatever".to_string()),
Some(FileUri("/tmp/whatever".to_string())),
uploader
.get_last_upload()
.expect("getting dumb uploader last value after a fake download should not fail")
Expand Down
Loading

0 comments on commit 404212a

Please sign in to comment.