Add some integration tests that exercise S3
These discovered a couple of bugs that didn't manifest under Minio but
did under the real S3, so those are fixed as well.
anelson committed Aug 30, 2022
1 parent 8d78c9c commit b9cf76c
Showing 9 changed files with 373 additions and 19 deletions.
7 changes: 7 additions & 0 deletions .env
@@ -0,0 +1,7 @@
# These vars identify cloud resources that are available at CI time for tests that require real live cloud instances
#
# NOTE: You won't be able to access these buckets unless you have access to internal Elastio development accounts,
# and you point the `AWS_PROFILE` env var to the name of an AWS config profile that has write access to the CI buckets
# in the `assuriodev` AWS account.
TEST_S3_BUCKET=ci-s3-tests.elastio.dev
TEST_S3_BUCKET_REGION=eu-central-1
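
A rough sketch (not part of the commit) of how a test might read these values at runtime via the `dotenv` crate this commit adds as a dev-dependency; the variable names are the ones above, everything else is illustrative:

    fn test_bucket_config() -> (String, String) {
        // Load .env into the process environment; ignore the error if the file is
        // absent (e.g. in CI where the variables are injected directly).
        dotenv::dotenv().ok();

        let bucket = std::env::var("TEST_S3_BUCKET")
            .expect("TEST_S3_BUCKET must be set for live S3 tests");
        let region = std::env::var("TEST_S3_BUCKET_REGION")
            .expect("TEST_S3_BUCKET_REGION must be set for live S3 tests");

        (bucket, region)
    }
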
37 changes: 37 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions ssstar-testing/src/minio.rs
@@ -146,10 +146,10 @@ impl MinioServer {

// Shorten the bucket name so we can append the unique ID and it will still be under 63
// chars
- let bucket = &bucket[..bucket.len().min(63 - 8)];
+ let bucket = &bucket[..bucket.len().min(63 - 9)];

// Prepend a random number to ensure the bucket name is unique across multiple tests
let bucket = format!("{:08x}-{}", rand::thread_rng().next_u32(), bucket);
let bucket = format!("{:08x}-{bucket}", rand::thread_rng().next_u32());

let client = self.aws_client().await?;

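
The `63 - 9` above accounts for the randomized prefix: 8 hex digits plus the `-` separator is 9 characters, and S3 bucket names are capped at 63 characters. A quick standalone check of that arithmetic (not from the commit):

    use rand::RngCore;

    fn main() {
        // The random prefix is 8 hex digits plus '-', i.e. 9 characters...
        let prefix = format!("{:08x}-", rand::thread_rng().next_u32());
        assert_eq!(prefix.len(), 9);

        // ...so trimming the base name to 63 - 9 characters always yields a legal length.
        let long_name = "an-unreasonably-long-descriptive-bucket-name-generated-by-a-test-helper";
        let trimmed = &long_name[..long_name.len().min(63 - 9)];
        assert!(format!("{prefix}{trimmed}").len() <= 63);
    }
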
132 changes: 125 additions & 7 deletions ssstar-testing/src/test_data.rs
Expand Up @@ -3,7 +3,9 @@ use crate::Result;
use aws_sdk_s3::{types::ByteStream, Client};
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use once_cell::sync::Lazy;
use rand::prelude::*;
use regex::Regex;
use sha2::Digest;
use std::{
borrow::Cow,
@@ -47,6 +49,31 @@ pub struct TestObjectWithData {
pub hash: [u8; 32],
}

/// Generate a unique prefix ending in a `/` character, and prepend it to the `key` in a collection
/// of [`TestObject`]s.
///
/// Returns the unique prefix and an iterator that yields the modified test objects.
///
/// This is useful when running tests against a real S3 bucket, where multiple runs of the same
/// test may write to the bucket, so each test's object keys must be unique.
pub fn prepend_unique_prefix(
objects: impl IntoIterator<Item = TestObject>,
) -> (String, impl IntoIterator<Item = TestObject>) {
let prefix = format!("{:08x}/", rand::thread_rng().next_u32());

let objects = {
let prefix = prefix.clone();

objects.into_iter().map(move |mut object| {
object.key = format!("{}{}", prefix, object.key);

object
})
};

(prefix, objects)
}
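
A hypothetical caller (not part of this diff) might use the helper like this, assuming `objects` is a `Vec<TestObject>` built by the surrounding test helpers:

    let (prefix, objects) = prepend_unique_prefix(objects);

    // Every key now carries the run-specific prefix, e.g. "1a2b3c4d/original/key",
    // so repeated runs against the same real bucket cannot collide.
    let objects: Vec<TestObject> = objects.into_iter().collect();
    assert!(objects.iter().all(|o| o.key.starts_with(&prefix)));
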

/// Generate one or more test objects in a bucket.
///
/// Each object has a size specified. Random data will be generated for each object.
@@ -312,18 +339,109 @@ where
.send()
.await?;

- let object = dbg!(object);
-
- object.checksum_sha256().map(|hash| hash.to_owned())
+ dbg!(object.checksum_sha256)
};

let hash = match object_hash {
Some(object_hash) => {
// The hash is expressed as a base64 encoded string.
// Decode into a proper hash and then compare
let mut hash = [0u8; 32];
hash.copy_from_slice(&base64::decode(object_hash).unwrap());
hash
//
// It has two forms: if this object was uploaded as a single part, this string is
// literally the hash of the contents of the object. But if it was uploaded using
// the multi-part API, the string is the base64 hash, followed by `-` and the number
// of parts. In that case, the hash is a hash of all of the parts' hashes.
static MULTIPART_HASH: Lazy<Regex> = Lazy::new(|| {
Regex::new(r##"^(?P<hash>(?:[A-Za-z\d+/]{4})*(?:[A-Za-z\d+/]{3}=|[A-Za-z\d+/]{2}==)?)-(?P<parts>\d+)$"##).unwrap()
});

if let Some(_captures) = MULTIPART_HASH.captures(&object_hash) {
// XXX: The commented-out code below tries to use list_parts to compute the
// aggregate hash of the object, but this doesn't work as of 30 Aug 2022
// because the AWS SDK for Rust seems not to support `list_parts` correctly. I
// theorize that it's due to https://github.com/awslabs/smithy-rs/issues/1668
// however I can't be sure.
//
// Since this is just test code, we know the objects in question will not be
// particularly huge. So in the case where the hash is a multipart hash, just
// download the entire object from S3 and compute the hash anew.
//
// Someday perhaps restore this code and save a few seconds on integration test
// runs

/*
// This is a multi-part hash so the validation of the hash just got more
// complicated
let hash_text = captures.name("hash").unwrap().as_str();
// Decode the hash into binary
let mut hash = [0u8; 32];
hash.copy_from_slice(&base64::decode(hash_text).unwrap());
// get the hashes of all of the parts in this object
//
// NOTE: There is a new GetObjectAttributes S3 API, but as of late August 2022
// the AWS Rust SDK doesn't work right with it. When calling it with multiple
// object attributes, it fails with a signature mismatch error. It seems
// likely that https://github.com/awslabs/aws-sdk-rust/issues/500, which was
// preventing the import of the latest S3 API model, is to blame. As of now
// there is a `get_object_attributes` method in the Rust code
// but it's broken so probably there's something in the S3 API model for this
// function that isn't handled right in Rust.
let mut parts = client
.list_parts()
.bucket(bucket)
.key(&key)
.into_paginator()
.send();
let mut hasher = sha2::Sha256::new();
while let Some(page) = parts.try_next().await? {
for part in page.parts.unwrap() {
dbg!(part.part_number());
let hash = base64::decode(part.checksum_sha256().unwrap())?;
assert_eq!(hash.len(), sha2::Sha256::output_size());
hasher.update(&hash);
//hasher.update(object.checksum_sha256().unwrap().as_bytes());
}
}
//for part in parts.parts().unwrap() {
// dbg!(part.part_number());
//
// let hash = base64::decode(part.checksum_sha256().unwrap())?;
// assert_eq!(hash.len(), sha2::Sha256::output_size());
// hasher.update(&hash);
// //hasher.update(object.checksum_sha256().unwrap().as_bytes());
//}
// Computed the hash of all parts.
let mut hash = [0u8; 32];
hash.copy_from_slice(&hasher.finalize());
hash
*/

let response = client.get_object().bucket(bucket).key(&key).send().await?;

let mut body = response.body;
let mut hasher = sha2::Sha256::new();
while let Some(bytes) = body.try_next().await? {
hasher.update(bytes);
}
let mut hash = [0u8; 32];
hash.copy_from_slice(&hasher.finalize());
hash
} else {
// This is a simple hash

// Decode into a binary hash and then compare
let mut hash = [0u8; 32];

hash.copy_from_slice(&base64::decode(object_hash).unwrap());
hash
}
}
None => {
// XXX: Actually the above statement would be correct, except that minio still doesn't
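
To make the two checksum shapes handled above concrete, here is a small standalone sketch (simplified regex, not the commit's code) of telling them apart and decoding the single-part form:

    use regex::Regex;
    use sha2::{Digest, Sha256};

    fn main() {
        // Single-part objects: checksum_sha256 is the base64 of the 32-byte digest of the body.
        let digest = Sha256::digest(b"hello world");
        let single = base64::encode(&digest);

        // Multipart objects: base64 digest of the concatenated part digests, then "-<part count>".
        let multi = format!("{single}-3");

        // Standard base64 never contains '-', so a trailing "-<digits>" marks the multipart form.
        let multipart = Regex::new(r"-(?P<parts>\d+)$").unwrap();
        assert!(multipart.captures(&single).is_none());
        assert_eq!(&multipart.captures(&multi).unwrap()["parts"], "3");

        // The single-part form decodes straight back to the hash the test compares against.
        let decoded = base64::decode(&single).unwrap();
        assert_eq!(decoded.as_slice(), digest.as_slice());
    }
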
2 changes: 2 additions & 0 deletions ssstar/Cargo.toml
@@ -55,6 +55,8 @@ strum = { version = "0.24.1", features = ["derive"] }
assert_matches = "1.5.0"
more-asserts = "0.3.0"
tempdir = "0.3.7"
dotenv = "0.15.0"
dotenv_codegen = "0.15.0"

[build-dependencies]
vergen = "7.4.0"
37 changes: 30 additions & 7 deletions ssstar/src/extract.rs
@@ -145,7 +145,7 @@ impl std::fmt::Debug for SourceArchiveInternal {
len,
} => f
.debug_struct("ObjectStorage")
.field("bucket", &bucket)
.field("bucket", &bucket.name())
.field("key", &key)
.field("version_id", &version_id)
.field("len", &len)
@@ -456,8 +456,8 @@ impl ExtractArchiveJob {
{
let span = info_span!("run",
source_archive = ?self.source_archive,
- target_bucket = ?self.target_bucket,
- target_prefix = ?self.target_prefix);
+ target_bucket = self.target_bucket.name(),
+ target_prefix = %self.target_prefix);

async move {
info!(?self.filters, ?self.archive_size, "Starting extract archive job");
@@ -535,12 +535,35 @@ impl ExtractArchiveJob {

// Wait for both tasks to finish and only then look at results
let (reader_result, processor_result) = futures::join!(reader_fut, processor_fut);
let reader_result =
reader_result.with_context(|_| crate::error::SpawnBlockingSnafu {})?;
let processor_result =
processor_result.with_context(|_| crate::error::SpawnSnafu {})?;

// If there's an error in the reader, the processor can also fail but with a less
// meaningful error. So evaluate reader results first.
reader_result.with_context(|_| crate::error::SpawnBlockingSnafu {})??;
let (total_objects, total_object_bytes) =
processor_result.with_context(|_| crate::error::SpawnSnafu {})??;
//
// If the error indicates the sender mpsc was dropped, that's the cue to check the
// processor task's result instead
let (total_objects, total_object_bytes) = match reader_result {
Ok(_) => {
// Good sign. Only the processor can fail
processor_result?
}
e @ Err(crate::S3TarError::TarExtractAborted) => {
// This is the error the reader fails with when it got an error sending
// something to the processor task. This suggests the processor task has
// failed.
processor_result?;

// `processor_result` wasn't an error, so return the reader error
return e;
}
Err(e) => {
// Any other error means the failure was on the reader's side
return Err(e);
}
};

progress.objects_uploaded(total_objects, total_object_bytes);

@@ -721,7 +744,7 @@ impl ExtractArchiveJob {
/// reporting by the caller, since this function can't tell if `entry_receiver` stopped
/// producing components because there is no more work to do, or because of an error in the
/// reader.
- #[instrument(skip(progress, entry_receiver))]
+ #[instrument(skip(target_bucket, progress, entry_receiver), fields(target_bucket = target_bucket.name()))]
async fn process_tar_entries(
target_bucket: Box<dyn Bucket>,
target_prefix: String,
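
For reference, the `#[instrument]` change above records a readable field derived from the bucket handle instead of trying to `Debug`-format it. A tiny self-contained analogue of that tracing pattern (illustrative names only, not the crate's code):

    use tracing::{info, instrument};

    struct Target {
        bucket: String,
    }

    // Skip the non-Debug argument itself, but still record a field derived from it.
    #[instrument(skip(target), fields(target_bucket = %target.bucket))]
    async fn process_entries(target: &Target, prefix: String) {
        info!("processing entries under {prefix}");
    }
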
10 changes: 9 additions & 1 deletion ssstar/src/objstore/s3.rs
@@ -319,14 +319,22 @@ impl S3Bucket {
.expect("BUG: uploaded part missing etag")
.to_string();

debug!(%e_tag, "Uploaded multi-part chunk");
// XXX: When running against Minio, as of 30 Aug 2022 it doesn't have checksum
// support so this can be empty. In that case, it won't be an error to omit the
// sha256 hash when completing the multipart upload
let sha256 = response
.checksum_sha256()
.map(|hash| hash.to_string());

debug!(%e_tag, sha256 = sha256.as_deref().unwrap_or_default(), "Uploaded multi-part chunk");

let _ = progress_sender.send(chunk_size);

// Once all of the uploads are done we must provide the information about each part
// to the CompleteMultipartUpload call, so retain the key bits here
let completed_part = aws_sdk_s3::model::CompletedPart::builder()
.e_tag(e_tag)
.set_checksum_sha256(sha256)
.part_number(part_number as i32)
.build();

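
Downstream, those `CompletedPart` values have to be handed back to S3 when the upload is finalized. A rough sketch of that step (not part of this diff; assumes the aws-sdk-s3 API of this vintage):

    async fn finish_upload(
        client: &aws_sdk_s3::Client,
        bucket: &str,
        key: &str,
        upload_id: &str,
        completed_parts: Vec<aws_sdk_s3::model::CompletedPart>,
    ) -> Result<(), aws_sdk_s3::Error> {
        // The parts (with their e_tags and optional sha256 checksums) are attached to
        // the CompleteMultipartUpload request; S3 assembles the object from them.
        client
            .complete_multipart_upload()
            .bucket(bucket)
            .key(key)
            .upload_id(upload_id)
            .multipart_upload(
                aws_sdk_s3::model::CompletedMultipartUpload::builder()
                    .set_parts(Some(completed_parts))
                    .build(),
            )
            .send()
            .await?;

        Ok(())
    }
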
2 changes: 2 additions & 0 deletions ssstar/tests/integration.rs
@@ -4,6 +4,8 @@
//!
//! For all practical purposes each submodule here is a separate logical integration test fixture.
//! See mod-level comments in those modules for more details.
#[macro_use]
extern crate dotenv_codegen;

/// Test code that reports errors can just cheat and use `eyre`
type Result<T> = color_eyre::Result<T>;
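
With `dotenv_codegen` pulled in above, a test can pick up the CI bucket settings at compile time. A hypothetical example (only the variable names come from the `.env` file added in this commit; the test itself is illustrative):

    #[test]
    fn live_s3_bucket_is_configured() {
        // dotenv! reads the value from .env at compile time and fails the build if it's missing
        let bucket: &str = dotenv!("TEST_S3_BUCKET");
        let region: &str = dotenv!("TEST_S3_BUCKET_REGION");

        assert!(!bucket.is_empty());
        assert!(!region.is_empty());
    }
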