From 1506b704f35575c2985d9ad4a15718720010e516 Mon Sep 17 00:00:00 2001
From: Adam Nelson
Date: Fri, 2 Sep 2022 16:41:16 +0300
Subject: [PATCH] Add average transfer rate to final progress report and tweak bars (#7)

A bug report from QA indicated that the speed with which the `extract` operation was writing to S3 was
poor: around 35 MiB/s. That doesn't make sense, as the same code is used for writing archives in `create`
and writing extracted objects in `extract`.

I suspect the actual problem is that the bytes per second reported by `indicatif` are very primitive. They
are an instantaneous transfer rate: the number of bytes added since the last update, divided by the time
elapsed. That's not a meaningful number; what we really care about is the average transfer rate over time.

So I've modified the progress callback traits (warning: breaking change), and the CLI, to report average
bytes per second at the end of completed operations.

In my own testing on an `m6a.2xlarge` instance with a 100GB tar archive containing 50 files, I got the
following results:

```text
$ ssstar extract --s3 s3://tar-file98gb/file s3://anelson-ssstar-test/foo/
Extraction complete! Read 50 objects (97.66 GiB) from archive in 8 minutes (211.27 MiB/s)
Extracted 50 objects (97.66 GiB)
Skipped 0 objects (0B)
Upload complete! Uploaded 50 objects (97.66 GiB) in 8 minutes (211.22 MiB/s)
```

This is close to the maximum network transfer speed for the `m6a.2xlarge` instance type.

I've also removed the useless instantaneous bytes per second output from the progress bars, and changed the
sizing of the progress lines to make more room for message text.

Target of Opportunity Changes
===

These aren't directly related to the progress issue but I made them as targets of opportunity:

* Correct trivial typo in README
* Improve the doc comments for the Rust crates
* Add some helper scripts to make it easier to test ssstar in AWS
---
 .github/workflows/publish-release.yml |   2 +-
 CHANGELOG.md                          |   5 ++
 README.md                             |   2 +-
 scripts/launch-instance.sh            | 110 ++++++++++++++++++++++++++
 scripts/rsync-dir.sh                  |  69 ++++++++++++++++
 scripts/s0-bootstrap.yml              |  79 ++++++++++++++++++
 scripts/utils.sh                      |   5 ++
 ssstar-cli/src/main.rs                |   2 +
 ssstar-cli/src/progress.rs            |  76 +++++++++++-------
 ssstar-testing/src/lib.rs             |   5 ++
 ssstar/README.md                      |  61 ++++++++++++++
 ssstar/src/create.rs                  |  28 ++++++-
 ssstar/src/extract.rs                 |  24 ++++--
 ssstar/src/lib.rs                     |   2 +
 ssstar/tests/progress/mod.rs          |   8 +-
 15 files changed, 434 insertions(+), 44 deletions(-)
 create mode 100755 scripts/launch-instance.sh
 create mode 100755 scripts/rsync-dir.sh
 create mode 100644 scripts/s0-bootstrap.yml
 create mode 100644 scripts/utils.sh
 create mode 100644 ssstar/README.md

diff --git a/.github/workflows/publish-release.yml b/.github/workflows/publish-release.yml
index 106e49d..f544f3d 100644
--- a/.github/workflows/publish-release.yml
+++ b/.github/workflows/publish-release.yml
@@ -116,7 +116,7 @@ jobs:
     steps:
       - name: Checkout repository
         uses: actions/checkout@v3
-      - name: Get ssstar version
+      - name: Build docker image
         shell: bash
         run: |
           # Tag the docker image with the full semver, major.minor, and major
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 29dfd59..5093fbc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Breaking Changes
+
+* Report average bytes per second rates at the end of each stage of operation. This modifies the signature of the
+  progress callback traits.
+
 ## 0.1.3 - 31-Aug-2022
 
 First release published via GitHub Actions. Functionally identical to 0.1.0.
diff --git a/README.md b/README.md
index 3c3b4b9..d450b1e 100644
--- a/README.md
+++ b/README.md
@@ -143,7 +143,7 @@ and extraction, respectively. There are a few command line options that are par
 
 `ssstar` is developed and tested against AWS S3, however it should work with any object storage system that provides
 an S3-compatible API. In particular, most of the automated tests our CI system runs actually use [Minio](https://min.io)
-and not the real S3 API. To use `ssstar` with an S3-compatible API, use the `--s3_endpoint` option. For example, if
+and not the real S3 API. To use `ssstar` with an S3-compatible API, use the `--s3-endpoint` option. For example, if
 you have a Minio server running at `127.0.7.1:30000`, using default `minioadmin` credentials, you can use it with
 `ssstar` like this:
 
diff --git a/scripts/launch-instance.sh b/scripts/launch-instance.sh
new file mode 100755
index 0000000..3203c7d
--- /dev/null
+++ b/scripts/launch-instance.sh
@@ -0,0 +1,110 @@
+#!/bin/bash
+#
+# Launches an EC2 instance for testing `ssstar`
+set -euo pipefail
+
+scripts_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+default_instance_type="m5.2xlarge"
+instance_type="$default_instance_type"
+key_name=""
+instance_name="test instance for $(whoami)"
+instance_market_options="--instance-market-options MarketType=spot"
+security_group="launch-instance.sh-sg"
+
+while getopts ":hk:i:n:g:o" opt; do
+  case ${opt} in
+    h )
+      echo "Usage: $0 [options] [-k <key_name>] [-i <instance_type>] [-n <instance_name>] [-o]"
+      echo ""
+      echo "Options:"
+      echo "  -k - Use a specified key name (Required)"
+      echo "  -i - Use a specified instance type instead of the default $instance_type"
+      echo "  -n - Give this instance a name to help you identify it in instance lists (prefix: $instance_name)"
+      echo "  -g - Use this security group (default: $security_group)"
+      echo "  -o - Create an on-demand instance instead of the default spot instance"
+      exit 0
+      ;;
+    k )
+      key_name=$OPTARG
+      ;;
+
+    i )
+      instance_type=$OPTARG
+      ;;
+
+    n )
+      instance_name="$instance_name:$OPTARG"
+      ;;
+
+    o)
+      instance_market_options=""
+      ;;
+
+    \? )
+      echo "Invalid option: $OPTARG" 1>&2
+      exit 1
+      ;;
+
+    : )
+      echo "Invalid option: $OPTARG requires an argument" 1>&2
+      exit 1
+      ;;
+  esac
+done
+
+if [[ -z "$key_name" ]]; then
+  echo "Error: -k is required"
+  exit 1
+fi
+
+ami=$(aws ssm get-parameters --names /aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2 --output json | jq ".Parameters[].Value" -r)
+. $(dirname "${0}")/utils.sh
+
+# Create the security group if it doesn't exist
+if aws ec2 describe-security-groups --group-names "$security_group" > /dev/null 2>&1; then
+  echo "Security group $security_group already exists; no need to create"
+else
+  echo "Creating security group $security_group"
+
+  security_group_id=$(aws ec2 create-security-group \
+    --description "Auto-generated security group produced by ${0}" \
+    --group-name "$security_group" \
+    --vpc-id $(aws ec2 describe-vpcs --filters "Name=isDefault,Values=true" --output json | jq -r ".Vpcs[].VpcId") \
+    | jq -r ".GroupId")
+
+  aws ec2 authorize-security-group-ingress --group-id "$security_group_id" --protocol tcp --port 22 --cidr 0.0.0.0/0
+fi
+
+# `envsubst` will expand placeholders in the YAML file with the values of the below env vars
+
+# Build the cloud-init script including some env vars
+user_data=$(envsubst < $scripts_dir/s0-bootstrap.yml)
+
+echo "Launching \"$instance_name\" (instance type $instance_type) with AMI $ami"
+
+instance_json=$(aws ec2 run-instances \
+  --image-id "$ami" \
+  --instance-type "$instance_type" \
+  $instance_market_options \
+  --key-name "$key_name" \
+  --ebs-optimized \
+  --block-device-mappings "DeviceName=/dev/xvda,Ebs={VolumeSize=40,VolumeType=gp3}" \
+  --user-data "$user_data" \
+  --security-groups "$security_group" \
+  --tag-specifications \
+  "ResourceType=instance,Tags=[{Key=Name,Value=$instance_name}]" \
+  --output json)
+
+instance_id=$(echo $instance_json | jq ".Instances[].InstanceId" -r)
+echo "Launched EC2 instance id $instance_id"
+
+echo "Querying instance info for public DNS..."
+instance_info=$(aws ec2 describe-instances --instance-ids $instance_id --output json)
+#echo $instance_info | jq "."
+instance_dns=$(echo $instance_info | jq ".Reservations[].Instances[].PublicDnsName" -r)
+echo "SSH into the instance with ssh ec2-user@$instance_dns using key $key_name"
+
+
+# NEXT STEP
+# Another script that copies the `elastio` source tree up to the spawned EC2 instance for convenient building
diff --git a/scripts/rsync-dir.sh b/scripts/rsync-dir.sh
new file mode 100755
index 0000000..41fc112
--- /dev/null
+++ b/scripts/rsync-dir.sh
@@ -0,0 +1,69 @@
+#!/bin/bash
+#
+# Rsync a directory (by default the `ssstar` project in its entirety) to a remote host.
+#
+# It's assumed the remote host has SSH and rsync installed, and is an Amazon Linux 2 EC2 instance.
+set -euo pipefail
+
+scripts_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+src_dir="$(realpath $scripts_dir/../)"
+dest_username="ec2-user"
+dest_host=""
+
+usage() {
+  echo "Usage: $0 [-s <source_dir>] [-u <dest_username>] -d <dest_host>"
+  echo "Default source dir: $src_dir"
+  echo "Default destination username: $dest_username"
+}
+
+while getopts ":hs:u:d:" opt; do
+  case ${opt} in
+    h )
+      usage
+      exit 0
+      ;;
+    s )
+      src_dir=$OPTARG
+      ;;
+
+    u )
+      dest_username=$OPTARG
+      ;;
+
+    d )
+      dest_host=$OPTARG
+      ;;
+
+    \?
) + echo "Invalid option: $OPTARG" 1>&2 + usage + exit 1 + ;; + + : ) + echo "Invalid option: $OPTARG requires an argument" 1>&2 + usage + exit 1 + ;; + esac +done + +if [[ -z $dest_host ]]; then + echo "A destination hostname is required" 1>&2 + usage + exit 1 +fi + +# By default, use the name of the source dir (without its entire path), and copy it into the +# home directory on the destination host +src_dir_path=$(realpath $src_dir) +src_dir_name=$(basename $src_dir_path) +dest_path="/home/$dest_username" + +echo Copying $src_dir_path to $dest_path on $dest_host + +# Use rsync to copy changed files, excluding anything that's ignored by `.gitignore` +rsync --info=progress2 -azzhe ssh \ + --filter=":- .gitignore" \ + $src_dir_path \ + $dest_username@$dest_host:$dest_path diff --git a/scripts/s0-bootstrap.yml b/scripts/s0-bootstrap.yml new file mode 100644 index 0000000..1a99a59 --- /dev/null +++ b/scripts/s0-bootstrap.yml @@ -0,0 +1,79 @@ +#cloud-config +# +# Bootstrap a `s0` server on Amazon Linux 2. +# +# This script is intended to run as part of the cloud init + +# Very early in the boot process, enable the EPEL repo +bootcmd: + - [ "cloud-init-per", "once", "amazon-linux-extras-epel", "amazon-linux-extras", "install", "epel" ] + +# Always pull in the latest updates +repo_update: true +repo_upgrade: all + +packages: + - python3-pip + - clang + - llvm-devel + - libudev-devel + - openssl-devel + - jq + - daemonize + - libblkid-devel + - parted-devel + +write_files: + # Add environment vars to enable metrics push by default + # Configure the AWS region appropriately + - path: /etc/profile + append: true + content: | + # These entries appended by the cloudinit script in s0-bootstrap.yml + + if command -v jq > /dev/null; then + # The default region should be the region this instance runs in (duh!) + export AWS_DEFAULT_REGION=$(curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq ".region" -r) + else + echo "WARNING: jq isn't installed yet. You probably shelled in too early. Give the instance a few more seconds and log in again" + fi + + # Can't always rely on the .cargo/config populated below to force native builds. 
+ export RUSTFLAGS=-Ctarget-cpu=native + + source ~/.cargo/env + + # Tell cargo to always use all of the native CPU features + # Since we don't use these instances to build dist binaries, we don't care about making binaries that are compatible with all CPUs + - path: /.cargo/config + content: | + [target.'cfg(any(windows, unix))'] + rustflags = ["-Ctarget-cpu=native"] + + - path: /etc/security/limits.d/99-elastio.conf + content: | + ec2-user soft nofile 20000 + ec2-user hard nofile 100000 + root soft nofile 20000 + root hard nofile 100000 + +runcmd: + # Need development tools to support Rust + - yum groupinstall -y "Development Tools" + + # The EPEL repo has some additional packages we need + - yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm + + # viy depends on some libraries for file systems + - yum install -y e2fsprogs-devel xfsprogs-devel + + # until https://github.com/elastio/elastio-snap/issues/55 is fixed, need the kernel headers + - yum install -y kernel-devel + + # Need the AWS CLI and ansible + - pip3 install awscli ansible boto3 + + # Install rust for the ec2-user + - su ec2-user --session-command "curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y" + + diff --git a/scripts/utils.sh b/scripts/utils.sh new file mode 100644 index 0000000..13249e1 --- /dev/null +++ b/scripts/utils.sh @@ -0,0 +1,5 @@ +function get_ssm_secret { + local name="$1" + + aws ssm get-parameter --name "$name" --with-decryption --output json | jq ".Parameter.Value" -r +} diff --git a/ssstar-cli/src/main.rs b/ssstar-cli/src/main.rs index 26793f2..33ea171 100644 --- a/ssstar-cli/src/main.rs +++ b/ssstar-cli/src/main.rs @@ -1,3 +1,5 @@ +#![doc = include_str!("../../README.md")] + use clap::{ArgGroup, Parser, Subcommand}; use ssstar::{CreateArchiveJobBuilder, ExtractArchiveJobBuilder, SourceArchive, TargetArchive}; use std::path::PathBuf; diff --git a/ssstar-cli/src/progress.rs b/ssstar-cli/src/progress.rs index 4a35dae..3743ca1 100644 --- a/ssstar-cli/src/progress.rs +++ b/ssstar-cli/src/progress.rs @@ -109,7 +109,7 @@ struct CreateProgressReport { impl CreateProgressReport { fn new(hide_progress: bool, job: &ssstar::CreateArchiveJob) -> Self { fn standard_style() -> indicatif::ProgressStyle { - indicatif::ProgressStyle::with_template("{spinner:.green} {prefix}: {msg:<45!} [{bar:20.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})") + indicatif::ProgressStyle::with_template("{spinner:.green} {prefix}: {msg:<55!} [{bar:20.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec})") .unwrap() .progress_chars("#>-") } @@ -234,13 +234,21 @@ impl ssstar::CreateProgressCallback for CreateProgressReport { // Progress bars report part-by-part so there's no need to update a progress bar here } - fn input_objects_download_completed(&self, total_bytes: u64) { + fn input_objects_download_completed(&self, total_bytes: u64, duration: Duration) { // If everything is working right, the progress bars for download are already exactly at // 100% done from prior updates but let's mark them as officially "done" now. 
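+        // Compute the average rate for the whole download: total bytes divided by the elapsed
+        // wall-clock time, rather than indicatif's instantaneous bytes-per-second estimate.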
+ let bytes_per_second = (total_bytes as f64 / duration.as_secs_f64()) as u64; + let bytes_per_second = indicatif::BinaryBytes(bytes_per_second); + let total_bytes = indicatif::BinaryBytes(total_bytes); + let duration = indicatif::HumanDuration(duration); + let message = + format!("Download completed ({total_bytes} in {duration}, {bytes_per_second}/s)"); + + self.multi.println(&message).unwrap(); self.raw_bytes_downloaded - .finish_with_message("Download completed"); + .finish_with_message(format!("Done ({total_bytes}, {bytes_per_second})")); self.ordered_bytes_downloaded - .finish_with_message("Download completed"); + .finish_with_message(format!("Done ({total_bytes}, {bytes_per_second})")); } fn tar_archive_initialized( @@ -303,7 +311,13 @@ impl ssstar::CreateProgressCallback for CreateProgressReport { .set_message("Upload in progress"); } - fn tar_archive_upload_completed(&self, size: u64) { + fn tar_archive_upload_completed(&self, size: u64, duration: Duration) { + let bytes_per_second = (size as f64 / duration.as_secs_f64()) as u64; + let bytes_per_second = indicatif::BinaryBytes(bytes_per_second); + let duration = indicatif::HumanDuration(duration); + let message = format!("Archive upload completed ({duration}, {bytes_per_second}/s)"); + + self.multi.println(&message).unwrap(); self.archive_bytes_uploaded .finish_with_message("Archive upload completed"); } @@ -335,7 +349,7 @@ struct ExtractProgressReport { impl ExtractProgressReport { fn new(hide_progress: bool, job: &ssstar::ExtractArchiveJob) -> Self { fn standard_style() -> indicatif::ProgressStyle { - indicatif::ProgressStyle::with_template("{spinner:.green} {prefix}: {msg:<45!} [{bar:20.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})") + indicatif::ProgressStyle::with_template("{spinner:.green} {prefix}: {msg:<55!} [{bar:20.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec})") .unwrap() .progress_chars("#>-") } @@ -424,38 +438,39 @@ impl ssstar::ExtractProgressCallback for ExtractProgressReport { skipped_objects: usize, skipped_object_bytes: u64, total_bytes: u64, + duration: Duration, ) { + let bytes_per_second = (total_bytes as f64 / duration.as_secs_f64()) as u64; + let bytes_per_second = indicatif::BinaryBytes(bytes_per_second); + let total_objects = indicatif::HumanCount((extracted_objects + skipped_objects) as u64); let extracted_objects = indicatif::HumanCount(extracted_objects as u64); let extracted_object_bytes = indicatif::BinaryBytes(extracted_object_bytes); let skipped_objects = indicatif::HumanCount(skipped_objects as u64); let skipped_object_bytes = indicatif::BinaryBytes(skipped_object_bytes); let total_bytes = indicatif::BinaryBytes(total_bytes); + let duration = indicatif::HumanDuration(duration); self.multi.println( - format!("Extraction complete! Read {total_bytes} from archive, extracted {extracted_objects} objects ({extracted_object_bytes})")).unwrap(); - if skipped_objects.0 > 0 { - self.multi - .println(format!( - "Skipped {skipped_objects} objects ({skipped_object_bytes})" - )) - .unwrap(); - } - - self.raw_bytes_read.set_length(total_bytes.0); - self.raw_bytes_read.set_position(total_bytes.0); - self.raw_bytes_read - .finish_with_message(format!("Extraction complete ({total_bytes} read)")); + format!("Extraction complete! 
Read {total_objects} objects ({total_bytes}) from archive in {duration} ({bytes_per_second}/s)")).unwrap(); + self.multi + .println(format!( + "Extracted {extracted_objects} objects ({extracted_object_bytes})" + )) + .unwrap(); + self.multi + .println(format!( + "Skipped {skipped_objects} objects ({skipped_object_bytes})" + )) + .unwrap(); - self.extract_object.set_length(extracted_object_bytes.0); - self.extract_object.set_position(extracted_object_bytes.0); - self.extract_object - .finish_with_message("Extraction complete ({extracted_objects} extracted)"); + self.raw_bytes_read.finish_and_clear(); + self.extract_object.finish_and_clear(); } fn object_upload_starting(&self, key: &str, size: u64) { self.upload_object.set_position(0); self.upload_object.set_length(size); - self.extract_object.set_message(key.to_string()); + self.upload_object.set_message(key.to_string()); } fn object_part_uploaded(&self, key: &str, bytes: usize) { @@ -466,17 +481,18 @@ impl ssstar::ExtractProgressCallback for ExtractProgressReport { self.upload_object.set_position(size); } - fn objects_uploaded(&self, total_objects: usize, total_object_bytes: u64) { + fn objects_uploaded(&self, total_objects: usize, total_object_bytes: u64, duration: Duration) { + let bytes_per_second = (total_object_bytes as f64 / duration.as_secs_f64()) as u64; + let bytes_per_second = indicatif::BinaryBytes(bytes_per_second); let total_objects = indicatif::HumanCount(total_objects as u64); let total_object_bytes = indicatif::BinaryBytes(total_object_bytes); - self.upload_object.set_length(total_object_bytes.0); - self.upload_object.set_position(total_object_bytes.0); + let duration = indicatif::HumanDuration(duration); + self.multi .println(format!( - "Upload complete! Uploaded {total_objects} objects ({total_object_bytes})" + "Upload complete! Uploaded {total_objects} objects ({total_object_bytes}) in {duration} ({bytes_per_second}/s)" )) .unwrap(); - self.upload_object - .finish_with_message(format!("Upload complete ({total_object_bytes})")) + self.upload_object.finish_and_clear(); } } diff --git a/ssstar-testing/src/lib.rs b/ssstar-testing/src/lib.rs index 2adac8c..9eef6d4 100644 --- a/ssstar-testing/src/lib.rs +++ b/ssstar-testing/src/lib.rs @@ -1,3 +1,8 @@ +//! Testing helpers for use writing unit and integration tests of the `ssstar` crate +//! +//! This is internal to `ssstar` and is not intended for use by any other crates. Breaking changes +//! can be made at any time. The only reason this is published at all is that `cargo publish` +//! requires that all `dev-dependencies` be resolvable in the public registry. pub mod logging; pub mod minio; pub mod tar; diff --git a/ssstar/README.md b/ssstar/README.md new file mode 100644 index 0000000..84cd87a --- /dev/null +++ b/ssstar/README.md @@ -0,0 +1,61 @@ +## ssstar + +Highly concurrent archiving of S3 objects to and from tar archives. + +--- + +This is the Rust library crate which powers the `ssstar` CLI. If you're looking for the +`ssstar` command line utility, see the [`ssstar-cli`](https://crates.io/crates/ssstar-cli) +crate. 
+
+To create a tar archive containing S3 objects, instantiate a `CreateArchiveJob`:
+
+```rust,no_run
+# use ssstar::*;
+# #[tokio::main]
+# async fn main() -> Result<(), Box<dyn std::error::Error>> {
+// Write the archive to a local file
+let target = TargetArchive::File("test.tar".into());
+
+let mut builder = CreateArchiveJobBuilder::new(Config::default(), target);
+
+// Archive all of the objects in this bucket
+builder.add_input(&"s3://my-bucket".parse()?).await?;
+
+let job = builder.build().await?;
+
+job.run_without_progress(futures::future::pending()).await?;
+
+# Ok(())
+# }
+```
+
+Target archives can be written to a local file, an S3 bucket, or an arbitrary Tokio `AsyncWrite` implementation. See
+[`TargetArchive`] for more details.
+
+Restoring a tar archive to object storage is similarly straightforward:
+
+```rust,no_run
+# use ssstar::*;
+# #[tokio::main]
+# async fn main() -> Result<(), Box<dyn std::error::Error>> {
+// Read the archive from a local file
+let source = SourceArchive::File("test.tar".into());
+
+// Extract the archive to an S3 bucket, prepending a `foo/` prefix to every file path in
+// the archive
+let target = "s3://my-bucket/foo/".parse::<url::Url>()?;
+
+let mut builder = ExtractArchiveJobBuilder::new(Config::default(), source, target).await?;
+
+// Extract only text files, in any directory, from the archive
+builder.add_filter("**/*.txt")?;
+
+let job = builder.build().await?;
+
+job.run_without_progress(futures::future::pending()).await?;
+
+# Ok(())
+# }
+```
diff --git a/ssstar/src/create.rs b/ssstar/src/create.rs
index fd3cf2e..4d3ca10 100644
--- a/ssstar/src/create.rs
+++ b/ssstar/src/create.rs
@@ -47,6 +47,7 @@ use std::future::Future;
 use std::ops::Range;
 use std::path::PathBuf;
 use std::sync::Arc;
+use std::time::{Duration, Instant};
 use tokio::io::AsyncWrite;
 use tokio::sync::oneshot;
 use tracing::{debug, error, instrument};
@@ -475,7 +476,7 @@ pub trait CreateProgressCallback: Sync + Send {
     /// That doesn't mean the work is done; there can still be ongoing tasks either writing some of
     /// that downloaded data to the tar builder, or uploading writes to the tar archive to object
     /// storage.
-    fn input_objects_download_completed(&self, total_bytes: u64) {}
+    fn input_objects_download_completed(&self, total_bytes: u64, duration: Duration) {}
 
     /// The tar archive has been initialized but not yet written to.
     ///
@@ -551,7 +552,7 @@ pub trait CreateProgressCallback: Sync + Send {
     /// This is the final event that can happen. Once this event fires, the job is done.
     ///
     /// If the tar archive is not being directed to object storage, then this event will never fire
-    fn tar_archive_upload_completed(&self, size: u64) {}
+    fn tar_archive_upload_completed(&self, size: u64, duration: Duration) {}
 }
 
 /// A job which will create a new tar archive from object store inputs.
@@ -728,6 +729,7 @@ impl CreateArchiveJob {
 
         tokio::spawn(async move {
             let mut total_bytes_downloaded = 0u64;
+            let input_objects_download_started = Instant::now();
 
             while let Some(result) = parts_stream.next().await {
                 // Part downloads are yielded from the stream in the order in which they appeared,
@@ -774,7 +776,10 @@ impl CreateArchiveJob {
             }
 
             if !parts_sender.is_closed() {
-                progress.input_objects_download_completed(total_bytes_downloaded)
+                progress.input_objects_download_completed(
+                    total_bytes_downloaded,
+                    input_objects_download_started.elapsed(),
+                );
             }
         });
     }
@@ -783,6 +788,13 @@ impl CreateArchiveJob {
         // For what should be obvious reasons, the writing of data to the tar archive must be done
        // serially, even though we downloaded the data in parallel, and the stream that the tar
        // Builder writes to will upload the written data in parallel also.
+
+        // To keep track of when the writes to the tar archive started. Technically what we
+        // actually want to know is when the *upload* to S3 starts, which can be a bit after the
+        // writes start since the write buffer needs to fill up first. However, this is close
+        // enough.
+        let mut tar_archive_writes_started: Option<Instant> = None;
+
         loop {
             // The next part must be part 0 of a new object
             match parts_receiver.recv().await {
@@ -842,6 +854,11 @@
                 // and wait for the appender to stop
                 let mut appender_aborted = false;
                 if (sender.send(Ok(data)).await).is_ok() {
+                    if tar_archive_writes_started.is_none() {
+                        // Record this instant when writes first started
+                        tar_archive_writes_started = Some(Instant::now());
+                    }
+
                     progress.tar_archive_part_written(
                         part.input_object.bucket.name(),
                         &part.input_object.key,
@@ -927,7 +944,10 @@
             bytes_written,
             "Upload of tar archive to object storage completed"
         );
-        progress.tar_archive_upload_completed(bytes_written);
+        let elapsed = tar_archive_writes_started
+            .expect("BUG: is set unconditionally during tar writes")
+            .elapsed();
+        progress.tar_archive_upload_completed(bytes_written, elapsed);
 
         Ok(())
     }
diff --git a/ssstar/src/extract.rs b/ssstar/src/extract.rs
index 14598ea..99c41e3 100644
--- a/ssstar/src/extract.rs
+++ b/ssstar/src/extract.rs
@@ -44,6 +44,7 @@ use std::io::Read;
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
 use std::sync::Arc;
+use std::time::{Duration, Instant};
 use tokio::{io::AsyncWriteExt, sync::mpsc};
 use tracing::{debug, debug_span, error, info, info_span, instrument, trace, Instrument};
 use url::Url;
@@ -392,6 +393,7 @@ pub trait ExtractProgressCallback: Sync + Send {
         skipped_objects: usize,
         skipped_object_bytes: u64,
         total_bytes: u64,
+        duration: Duration,
     ) {
     }
 
@@ -406,7 +408,7 @@ pub trait ExtractProgressCallback: Sync + Send {
 
     /// All objects that matched the filter criteria have been uploaded and the job is now ready to
     /// complete
-    fn objects_uploaded(&self, total_objects: usize, total_object_bytes: u64) {}
+    fn objects_uploaded(&self, total_objects: usize, total_object_bytes: u64, duration: Duration) {}
 }
 
 #[derive(Debug)]
@@ -539,7 +541,7 @@ impl ExtractArchiveJob {
         //
         // If the error indicates the sender mpsc was dropped, that's the cue to check the
         // processor task's result instead
-        let (total_objects, total_object_bytes) = match reader_result {
+        let (total_objects, total_object_bytes, elapsed) = match reader_result {
             Ok(_) => {
                 // Good sign. Only the processor can fail
                 processor_result?
@@ -559,7 +561,7 @@
             }
         };
 
-        progress.objects_uploaded(total_objects, total_object_bytes);
+        progress.objects_uploaded(total_objects, total_object_bytes, elapsed);
 
         info!("Finished extract job");
 
@@ -585,6 +587,7 @@
         let mut extracted_object_bytes = 0u64;
         let mut skipped_objects = 0usize;
         let mut skipped_object_bytes = 0u64;
+        let started = Instant::now();
 
         for result in archive
             .entries()
@@ -696,6 +699,7 @@
             skipped_objects,
             skipped_object_bytes,
             reader.total_bytes_read(),
+            started.elapsed(),
         );
 
         Ok(())
@@ -739,12 +743,18 @@
         target_prefix: String,
         progress: Arc<dyn ExtractProgressCallback>,
         mut entry_receiver: mpsc::Receiver<TarEntryComponent>,
-    ) -> Result<(usize, u64)> {
+    ) -> Result<(usize, u64, Duration)> {
         // Keep processing entries until the sender is dropped
         let mut total_objects = 0usize;
         let mut total_object_bytes = 0u64;
+        let mut started: Option<Instant> = None;
 
         while let Some(tar_entry_component) = entry_receiver.recv().await {
+            // Don't start counting time elapsed for upload until the first entry is received
+            if started.is_none() {
+                started = Some(Instant::now());
+            }
+
             match tar_entry_component {
                 TarEntryComponent::SmallFile { path, data } => {
                     let key = format!("{}{}", target_prefix, path.display());
@@ -875,7 +885,11 @@
 
         debug!("Entry sender dropped; no more tar entries to process");
 
-        Ok((total_objects, total_object_bytes))
+        Ok((
+            total_objects,
+            total_object_bytes,
+            started.expect("BUG: unconditionally set in loop").elapsed(),
+        ))
     }
 }
diff --git a/ssstar/src/lib.rs b/ssstar/src/lib.rs
index 7d44d6d..7350c8a 100644
--- a/ssstar/src/lib.rs
+++ b/ssstar/src/lib.rs
@@ -1,3 +1,5 @@
+#![doc = include_str!("../README.md")]
+
 mod async_bridge;
 mod config;
 mod create;
diff --git a/ssstar/tests/progress/mod.rs b/ssstar/tests/progress/mod.rs
index dfa3b91..b69f632 100644
--- a/ssstar/tests/progress/mod.rs
+++ b/ssstar/tests/progress/mod.rs
@@ -3,6 +3,7 @@
 use more_asserts::*;
 use ssstar::{CreateProgressCallback, ExtractProgressCallback};
 use std::sync::{Arc, Mutex};
+use std::time::Duration;
 
 #[derive(Clone, Debug, strum::EnumDiscriminants)]
 #[allow(dead_code)] // Not all of these are used in tests but we want to capture all fields for all events
@@ -539,7 +540,7 @@ impl CreateProgressCallback for TestCreateProgressCallback {
         });
     }
 
-    fn input_objects_download_completed(&self, total_bytes: u64) {
+    fn input_objects_download_completed(&self, total_bytes: u64, _duration: Duration) {
         self.report_event(CreateProgressEvent::InputObjectsDownloadCompleted { total_bytes });
     }
 
@@ -602,7 +603,7 @@ impl CreateProgressCallback for TestCreateProgressCallback {
         self.report_event(CreateProgressEvent::TarArchiveBytesUploaded { bytes_uploaded });
     }
 
-    fn tar_archive_upload_completed(&self, size: u64) {
+    fn tar_archive_upload_completed(&self, size: u64, _duration: Duration) {
         self.report_event(CreateProgressEvent::TarArchiveUploadCompleted { size });
     }
 }
@@ -1043,6 +1044,7 @@ impl ExtractProgressCallback for TestExtractProgressCallback {
         skipped_objects: usize,
         skipped_object_bytes: u64,
         total_bytes: u64,
+        _duration: Duration,
     ) {
         self.report_event(ExtractProgressEvent::ExtractFinished {
             extracted_objects,
@@ -1074,7 +1076,7 @@
         });
     }
 
-    fn objects_uploaded(&self, total_objects: usize, total_object_bytes: u64) {
+    fn objects_uploaded(&self, total_objects: usize, total_object_bytes: u64, _duration: Duration) {
         self.report_event(ExtractProgressEvent::ObjectsUploaded {
total_objects, total_object_bytes,