From b9cf76ca3f6426221dabfacf9d8e79b23dd0654f Mon Sep 17 00:00:00 2001 From: Adam Nelson Date: Tue, 30 Aug 2022 15:35:03 +0200 Subject: [PATCH] Add some integration tests that exercise S3 These discovered a couple of bugs that didn't manifest under Minio but did under the real S3, so those are fixed as well. --- .env | 7 ++ Cargo.lock | 37 ++++++++ ssstar-testing/src/minio.rs | 4 +- ssstar-testing/src/test_data.rs | 132 ++++++++++++++++++++++++-- ssstar/Cargo.toml | 2 + ssstar/src/extract.rs | 37 ++++++-- ssstar/src/objstore/s3.rs | 10 +- ssstar/tests/integration.rs | 2 + ssstar/tests/objstore/s3.rs | 161 +++++++++++++++++++++++++++++++- 9 files changed, 373 insertions(+), 19 deletions(-) create mode 100644 .env diff --git a/.env b/.env new file mode 100644 index 0000000..16a7d3e --- /dev/null +++ b/.env @@ -0,0 +1,7 @@ +# These vars identify cloud resources that are available at CI time for tests that require real live cloud instances +# +# NOTE: You won't be able to access these buckets unless you have access to internal Elastio development accounts, +# and you point the `AWS_PROFILE` env var to the name of an AWS config profile that has write access to the CI buckets +# in the `assuriodev` AWS account. +TEST_S3_BUCKET=ci-s3-tests.elastio.dev +TEST_S3_BUCKET_REGION=eu-central-1 diff --git a/Cargo.lock b/Cargo.lock index 279ad9b..b475e99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -725,6 +725,35 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + +[[package]] +name = "dotenv_codegen" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56966279c10e4f8ee8c22123a15ed74e7c8150b658b26c619c53f4a56eb4a8aa" +dependencies = [ + "dotenv_codegen_implementation", + "proc-macro-hack", +] + +[[package]] +name = "dotenv_codegen_implementation" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53e737a3522cd45f6adc19b644ce43ef53e1e9045f2d2de425c1f468abd4cf33" +dependencies = [ + "dotenv", + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "duct" version = "0.13.5" @@ -1530,6 +1559,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + [[package]] name = "proc-macro2" version = "1.0.43" @@ -1963,6 +1998,8 @@ dependencies = [ "chrono", "clap", "color-eyre", + "dotenv", + "dotenv_codegen", "dyn-clone", "futures", "glob", diff --git a/ssstar-testing/src/minio.rs b/ssstar-testing/src/minio.rs index 53f045b..312811a 100644 --- a/ssstar-testing/src/minio.rs +++ b/ssstar-testing/src/minio.rs @@ -146,10 +146,10 @@ impl MinioServer { // Shorten the bucket name so we can append the unique ID and it will still be under 63 // chars - let bucket = &bucket[..bucket.len().min(63 - 8)]; + let bucket = &bucket[..bucket.len().min(63 - 9)]; // Prepend a random number to ensure the bucket name is unique across multiple tests - let bucket = format!("{:08x}-{}", rand::thread_rng().next_u32(), bucket); + let bucket = format!("{:08x}-{bucket}", rand::thread_rng().next_u32()); let client = 
self.aws_client().await?;
diff --git a/ssstar-testing/src/test_data.rs b/ssstar-testing/src/test_data.rs
index d81e6b9..030881e 100644
--- a/ssstar-testing/src/test_data.rs
+++ b/ssstar-testing/src/test_data.rs
@@ -3,7 +3,9 @@ use crate::Result;
 use aws_sdk_s3::{types::ByteStream, Client};
 use bytes::Bytes;
 use futures::{StreamExt, TryStreamExt};
+use once_cell::sync::Lazy;
 use rand::prelude::*;
+use regex::Regex;
 use sha2::Digest;
 use std::{
     borrow::Cow,
@@ -47,6 +49,31 @@ pub struct TestObjectWithData {
     pub hash: [u8; 32],
 }

+/// Generate a unique prefix ending in a `/` character, and prepend it to the `key` in a collection
+/// of [`TestObject`]s.
+///
+/// Returns the unique prefix and an iterator that yields the modified test objects.
+///
+/// This is useful when running tests against a real S3 bucket, where multiple runs of the same
+/// test may write to the bucket, so each test's object keys must be unique.
+pub fn prepend_unique_prefix(
+    objects: impl IntoIterator<Item = TestObject>,
+) -> (String, impl IntoIterator<Item = TestObject>) {
+    let prefix = format!("{:08x}/", rand::thread_rng().next_u32());
+
+    let objects = {
+        let prefix = prefix.clone();
+
+        objects.into_iter().map(move |mut object| {
+            object.key = format!("{}{}", prefix, object.key);
+
+            object
+        })
+    };
+
+    (prefix, objects)
+}
+
 /// Generate one or more test objects in a bucket.
 ///
 /// Each object has a size specified. Random data will be generated for each object.
@@ -312,18 +339,109 @@ where
             .send()
             .await?;

-        let object = dbg!(object);
-
-        object.checksum_sha256().map(|hash| hash.to_owned())
+        dbg!(object.checksum_sha256)
     };

     let hash = match object_hash {
         Some(object_hash) => {
             // The hash is expressed as a base64 encoded string.
-            // Decode into a propery hash and then compare
-            let mut hash = [0u8; 32];
-            hash.copy_from_slice(&base64::decode(object_hash).unwrap());
-            hash
+            //
+            // It has two forms: if this object was uploaded as a single part, this string is
+            // literally the hash of the contents of the object. But if it was uploaded using
+            // the multi-part API, the string is the base64 hash, followed by `-` and the number
+            // of parts. In that case, the hash is a hash of all of the parts' hashes.
+            static MULTIPART_HASH: Lazy<Regex> = Lazy::new(|| {
+                Regex::new(r##"^(?P<hash>(?:[A-Za-z\d+/]{4})*(?:[A-Za-z\d+/]{3}=|[A-Za-z\d+/]{2}==)?)-(?P<parts>\d+)$"##).unwrap()
+            });
+
+            if let Some(_captures) = MULTIPART_HASH.captures(&object_hash) {
+                // XXX: The commented-out code below tries to use list_parts to compute the
+                // aggregate hash of the object, but this doesn't work as of 30 Aug 2022
+                // because the AWS SDK for Rust seems not to support `list_parts` correctly. I
+                // theorize that it's due to https://github.com/awslabs/smithy-rs/issues/1668
+                // however I can't be sure.
+                //
+                // Since this is just test code, we know the objects in question will not be
+                // particularly huge. So in the case where the hash is a multipart hash, just
+                // download the entire object from S3 and compute the hash anew.
+                //
+                // Someday perhaps restore this code and save a few seconds on integration test
+                // runs
+
+                /*
+                // This is a multi-part hash so the validation of the hash just got more
+                // complicated
+                let hash_text = captures.name("hash").unwrap().as_str();
+
+                // Decode the hash into binary
+                let mut hash = [0u8; 32];
+
+                hash.copy_from_slice(&base64::decode(hash_text).unwrap());
+
+                // get the hashes of all of the parts in this object
+                //
+                // NOTE: There is a new GetObjectAttributes S3 API, but as of late August 2022
+                // the AWS Rust SDK doesn't work right with it. When calling it with multiple
+                // object attributes, it fails with a signature mismatch error. It seems
+                // likely that the reason https://github.com/awslabs/aws-sdk-rust/issues/500
+                // that was preventing the import of the latest S3 API model is probably to
+                // blame. As of now there is a `get_object_attributes` method in the Rust code
+                // but it's broken so probably there's something in the S3 API model for this
+                // function that isn't handled right in Rust.
+                let mut parts = client
+                    .list_parts()
+                    .bucket(bucket)
+                    .key(&key)
+                    .into_paginator()
+                    .send();
+
+                let mut hasher = sha2::Sha256::new();
+                while let Some(page) = parts.try_next().await? {
+                    for part in page.parts.unwrap() {
+                        dbg!(part.part_number());
+
+                        let hash = base64::decode(part.checksum_sha256().unwrap())?;
+                        assert_eq!(hash.len(), sha2::Sha256::output_size());
+                        hasher.update(&hash);
+                        //hasher.update(object.checksum_sha256().unwrap().as_bytes());
+                    }
+                }
+
+                //for part in parts.parts().unwrap() {
+                //    dbg!(part.part_number());
+                //
+                //    let hash = base64::decode(part.checksum_sha256().unwrap())?;
+                //    assert_eq!(hash.len(), sha2::Sha256::output_size());
+                //    hasher.update(&hash);
+                //    //hasher.update(object.checksum_sha256().unwrap().as_bytes());
+                //}
+
+                // Computed the hash of all parts.
+                let mut hash = [0u8; 32];
+
+                hash.copy_from_slice(&hasher.finalize());
+                hash
+                */
+
+                let response = client.get_object().bucket(bucket).key(&key).send().await?;
+
+                let mut body = response.body;
+                let mut hasher = sha2::Sha256::new();
+                while let Some(bytes) = body.try_next().await? {
+                    hasher.update(bytes);
+                }
+                let mut hash = [0u8; 32];
+                hash.copy_from_slice(&hasher.finalize());
+                hash
+            } else {
+                // This is a simple hash
+
+                // Decode into a binary hash and then compare
+                let mut hash = [0u8; 32];
+
+                hash.copy_from_slice(&base64::decode(object_hash).unwrap());
+                hash
+            }
         }
         None => {
            // XXX: Actually the above statement would be correct, except that minio still doesn't
diff --git a/ssstar/Cargo.toml b/ssstar/Cargo.toml
index 85b4a48..d41dd8d 100644
--- a/ssstar/Cargo.toml
+++ b/ssstar/Cargo.toml
@@ -55,6 +55,8 @@ strum = { version = "0.24.1", features = ["derive"] }
 assert_matches = "1.5.0"
 more-asserts = "0.3.0"
 tempdir = "0.3.7"
+dotenv = "0.15.0"
+dotenv_codegen = "0.15.0"

 [build-dependencies]
 vergen = "7.4.0"
diff --git a/ssstar/src/extract.rs b/ssstar/src/extract.rs
index 4e1c790..4b93180 100644
--- a/ssstar/src/extract.rs
+++ b/ssstar/src/extract.rs
@@ -145,7 +145,7 @@ impl std::fmt::Debug for SourceArchiveInternal {
                 len,
             } => f
                 .debug_struct("ObjectStorage")
-                .field("bucket", &bucket)
+                .field("bucket", &bucket.name())
                 .field("key", &key)
                 .field("version_id", &version_id)
                 .field("len", &len)
@@ -456,8 +456,8 @@ impl ExtractArchiveJob {
     {
         let span = info_span!("run",
             source_archive = ?self.source_archive,
-            target_bucket = ?self.target_bucket,
-            target_prefix = ?self.target_prefix);
+            target_bucket = self.target_bucket.name(),
+            target_prefix = %self.target_prefix);

         async move {
             info!(?self.filters, ?self.archive_size, "Starting extract archive job");
@@ -535,12 +535,35 @@ impl ExtractArchiveJob {
             // Wait for both tasks to finish and only then look at results
             let (reader_result, processor_result) =
                 futures::join!(reader_fut, processor_fut);
+            let reader_result =
+                reader_result.with_context(|_| crate::error::SpawnBlockingSnafu {})?;
+            let processor_result =
+                processor_result.with_context(|_| crate::error::SpawnSnafu {})?;

             // If there's an error in the reader, the processor can also fail but with a less
             // meaningful error. So evaluate reader results first.
-            reader_result.with_context(|_| crate::error::SpawnBlockingSnafu {})??;
-            let (total_objects, total_object_bytes) =
-                processor_result.with_context(|_| crate::error::SpawnSnafu {})??;
+            //
+            // If the error indicates the sender mpsc was dropped, that's the cue to check the
+            // processor task's result instead
+            let (total_objects, total_object_bytes) = match reader_result {
+                Ok(_) => {
+                    // Good sign. Only the processor can fail
+                    processor_result?
+                }
+                e @ Err(crate::S3TarError::TarExtractAborted) => {
+                    // This is the error the reader fails with when it got an error sending
+                    // something to the processor task. This suggests the processor task has
+                    // failed.
+                    processor_result?;
+
+                    // `processor_result` wasn't an error, so return the reader error
+                    return e;
+                }
+                Err(e) => {
+                    // Any other error means the failure was on the reader's side
+                    return Err(e);
+                }
+            };

             progress.objects_uploaded(total_objects, total_object_bytes);

@@ -721,7 +744,7 @@ impl ExtractArchiveJob {
     /// reporting by the caller, since this function can't tell if `entry_receiver` stopped
     /// producing components because there is no more work to to, or because of an error in the
     /// reader.
-    #[instrument(skip(progress, entry_receiver))]
+    #[instrument(skip(target_bucket, progress, entry_receiver), fields(target_bucket = target_bucket.name()))]
     async fn process_tar_entries(
         target_bucket: Box<dyn Bucket>,
         target_prefix: String,
diff --git a/ssstar/src/objstore/s3.rs b/ssstar/src/objstore/s3.rs
index 6699b27..1cc75ae 100644
--- a/ssstar/src/objstore/s3.rs
+++ b/ssstar/src/objstore/s3.rs
@@ -319,7 +319,14 @@ impl S3Bucket {
                     .expect("BUG: uploaded part missing etag")
                     .to_string();

-                debug!(%e_tag, "Uploaded multi-part chunk");
+                // XXX: When running against Minio, as of 30 Aug 2022 it doesn't have checksum
+                // support so this can be empty. In that case, it won't be an error to omit the
+                // sha256 hash when completing the multipart upload
+                let sha256 = response
+                    .checksum_sha256()
+                    .map(|hash| hash.to_string());
+
+                debug!(%e_tag, sha256 = sha256.as_deref().unwrap_or_default(), "Uploaded multi-part chunk");

                 let _ = progress_sender.send(chunk_size);

@@ -327,6 +334,7 @@ impl S3Bucket {
                 // to the CompleteMultipartUpload call, so retain the key bits here
                 let completed_part = aws_sdk_s3::model::CompletedPart::builder()
                     .e_tag(e_tag)
+                    .set_checksum_sha256(sha256)
                     .part_number(part_number as i32)
                     .build();

diff --git a/ssstar/tests/integration.rs b/ssstar/tests/integration.rs
index 36f5548..778edc0 100644
--- a/ssstar/tests/integration.rs
+++ b/ssstar/tests/integration.rs
@@ -4,6 +4,8 @@
 //!
 //! For all practical purposes each submodule here is a separate logical integration test fixture.
 //! See mod-level comments in those modules for more details.
+#[macro_use]
+extern crate dotenv_codegen;

 /// Test code that reports errors can just cheat and use `eyre`
 type Result<T> = color_eyre::Result<T>;
diff --git a/ssstar/tests/objstore/s3.rs b/ssstar/tests/objstore/s3.rs
index 5c16a1a..642b81d 100644
--- a/ssstar/tests/objstore/s3.rs
+++ b/ssstar/tests/objstore/s3.rs
@@ -4,6 +4,163 @@
 //! running live S3 tests is both more expensive to do, and also inconvenient for contributors
 //! since these tests assume they can access Elastio-owned buckets specifically created for the
 //! purposes of running these tests.

+use crate::{
+    progress::{TestCreateProgressCallback, TestExtractProgressCallback},
+    Result,
+};
+use aws_config::meta::region::RegionProviderChain;
+use aws_types::region::Region;
+use ssstar::{Config, CreateArchiveJobBuilder, TargetArchive};
+use ssstar_testing::test_data;
+use url::Url;
-// TODO: do at least one minimal smoke test against the real S3, including exercising correct
-// behavior when the bucket is in a different region than the AWS config specifies by default
+// Details about a real live S3 bucket which can be used for S3 integration tests.
+// This bucket is in the `assuriodev` AWS account and is only accessible to Elastio developers.
+// To Elastio developers: Set the AWS_PROFILE env var to the name of the AWS config profile that
+// you use to access `assuriodev`.
+// To non-Elastio developers: If you're trying to run these tests without access to Elastio AWS infra,
+// you need to override this to a bucket that you do have access to.
+//
+// This bucket has a lifecycle policy configured that deletes all objects after 24 hours, so we
+// don't have to worry about cleaning up results of tests. If you do override this to point to
+// another bucket, make sure it has the same lifecycle policy or you might get a surprise S3 bill.
+const TEST_BUCKET: &str = dotenv!("TEST_S3_BUCKET");
+const TEST_BUCKET_REGION: &str = dotenv!("TEST_S3_BUCKET_REGION");
+
+async fn create_s3_client() -> Result<aws_sdk_s3::Client> {
+    let region_provider = RegionProviderChain::first_try(Region::new(TEST_BUCKET_REGION));
+
+    let aws_config_builder = aws_config::from_env().region(region_provider);
+
+    let aws_config = aws_config_builder.load().await;
+
+    let s3_config_builder = aws_sdk_s3::config::Builder::from(&aws_config);
+
+    Ok(aws_sdk_s3::Client::from_conf(s3_config_builder.build()))
+}
+
+async fn run_test(test_data: Vec<test_data::TestObject>) -> Result<()> {
+    let client = create_s3_client().await?;
+
+    let (prefix, test_data) = test_data::prepend_unique_prefix(test_data);
+
+    let test_data = test_data::make_test_data(&client, TEST_BUCKET, test_data).await?;
+
+    // Ingest this test data, which is inside this unique prefix so we won't see any other test
+    // data written to this bucket.
+    let archive_url: Url = format!("s3://{TEST_BUCKET}/{prefix}test.tar").parse()?;
+    let target_archive = TargetArchive::ObjectStorage(archive_url.clone());
+    let mut builder = CreateArchiveJobBuilder::new(Config::default(), target_archive);
+    builder
+        .add_input(&format!("s3://{TEST_BUCKET}/{prefix}**").parse()?)
+        .await?;
+    let job = builder.build().await?;
+
+    let progress = TestCreateProgressCallback::new();
+    job.run(futures::future::pending(), progress.clone())
+        .await?;
+    progress.sanity_check_updates();
+
+    // There should be a SHA256 hash computed on the tar archive. Minio doesn't support that,
+    // but live S3 does.
+    let archive_key = archive_url.path().strip_prefix("/").unwrap();
+    let object = client
+        .head_object()
+        .bucket(TEST_BUCKET)
+        .key(archive_key)
+        .checksum_mode(aws_sdk_s3::model::ChecksumMode::Enabled)
+        .send()
+        .await?;
+
+    assert!(object.checksum_sha256().is_some());
+
+    // Archive is created; now extract it
+    let source_archive = ssstar::SourceArchive::ObjectStorage(archive_url);
+
+    // Even though the test data all have a shared prefix unique to this test, we need to extract
+    // the archive into a separate unique prefix as well, because the test data validation code
+    // subtracts whatever prefix we specify from each object key: it assumes that prefix is the
+    // one used when extracting, not a prefix which was already shared by the test data objects
+    // at archive creation time.
+    let restore_prefix = format!("restore-{prefix}");
+    let builder = ssstar::ExtractArchiveJobBuilder::new(
+        Config::default(),
+        source_archive,
+        format!("s3://{TEST_BUCKET}/{restore_prefix}").parse()?,
+    )
+    .await?;
+
+    let job = builder.build().await?;
+    let progress = TestExtractProgressCallback::new();
+
+    job.run(futures::future::pending(), progress.clone())
+        .await?;
+
+    progress.sanity_check_updates();
+
+    test_data::validate_test_data_in_s3(
+        &client,
+        &test_data,
+        TEST_BUCKET,
+        &restore_prefix,
+        test_data
+            .keys()
+            .into_iter()
+            .map(|key| key.to_string())
+            .collect::<Vec<_>>(),
+    )
+    .await?;
+
+    Ok(())
+}
+
+/// A few small objects so the objects and the archive will both be under the multipart threshold
+#[test]
+#[ignore = "Run explicitly with --ignored if you're sure you have set up S3 access correctly"]
+fn small_objects_archive_in_bucket() -> Result<()> {
+    ssstar_testing::logging::test_with_logging(async move {
+        let test_data = vec![
+            test_data::TestObject::new("test1", "1KiB"),
+            test_data::TestObject::new("test2", "1KiB"),
+            test_data::TestObject::new("foo/test3", "1KiB"),
+        ];
+
+        run_test(test_data).await
+    })
+}
+
+/// A bunch of objects which are each under the multipart threshold, but together they will produce
+/// an archive that is over the multipart threshold
+#[test]
+#[ignore = "Run explicitly with --ignored if you're sure you have set up S3 access correctly"]
+fn small_objects_multipart_archive_in_bucket() -> Result<()> {
+    ssstar_testing::logging::test_with_logging(async move {
+        let test_data = vec![
+            test_data::TestObject::new("test1", "1MiB"),
+            test_data::TestObject::new("test2", "2MiB"),
+            test_data::TestObject::new("foo/test1", "2MiB"),
+            test_data::TestObject::new("foo/test2", "2MiB"),
+            test_data::TestObject::new("bar/test1", "2MiB"),
+        ];
+
+        run_test(test_data).await
+    })
+}
+
+/// A mix of small and large objects, such that some of the objects themselves are over the
+/// multipart threshold and thus will be read and extracted using the multi-part implementation
+#[test]
+#[ignore = "Run explicitly with --ignored if you're sure you have set up S3 access correctly"]
+fn multipart_objects_multipart_archive_in_bucket() -> Result<()> {
+    ssstar_testing::logging::test_with_logging(async move {
+        let test_data = vec![
+            test_data::TestObject::new("test1", "1MiB"),
+            test_data::TestObject::new("test2", "9MiB"),
+            test_data::TestObject::new("foo/test1", "8MiB"),
+            test_data::TestObject::new("foo/test2", "9MiB"),
+            test_data::TestObject::new("bar/test1", "10"),
+        ];
+
+        run_test(test_data).await
+    })
+}