diff --git a/Cargo.lock b/Cargo.lock
index bbead77..5eb0ac1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1743,9 +1743,10 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
 
 [[package]]
 name = "s3-manifest"
-version = "0.0.5"
+version = "0.0.7"
 dependencies = [
  "arrow",
+ "bytes",
  "chrono",
  "clap",
  "futures",
@@ -1753,6 +1754,7 @@ dependencies = [
  "parquet",
  "rusoto_core",
  "rusoto_s3",
+ "tempfile",
  "tokio",
  "tokio-retry",
  "url",
diff --git a/Cargo.toml b/Cargo.toml
index d8dc207..03af73b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "s3-manifest"
 authors = ["Kevin Booth "]
-version = "0.0.5"
+version = "0.0.7"
 edition = "2021"
 
 [dependencies]
@@ -14,5 +14,7 @@ parquet = "52.2.0"
 rusoto_core = "0.48.0"
 rusoto_s3 = "0.48.0"
 tokio = { version = "1.0", features = ["full"] }
-tokio-retry = "0.3"
 url = "2.2.2"
+tempfile = "3.2"
+tokio-retry = { version = "0.3", features = [] }
+bytes = "1.0"
diff --git a/README.md b/README.md
index d28d55c..4916a6d 100644
--- a/README.md
+++ b/README.md
@@ -1,84 +1,95 @@
- # S3 Manifest Generator
+# S3 Manifest Generator
 
- S3 Manifest Generator is a Rust-based command-line tool that creates a Parquet manifest file for objects in an S3 bucket or a specific prefix within a bucket. This tool is useful for quickly generating an inventory of S3 objects, including their metadata, in a compact and efficiently queryable format.
+S3 Manifest Generator is a Rust-based command-line tool that creates a Parquet manifest file for objects in an S3 bucket or a specific prefix within a bucket. This tool is useful for quickly generating an inventory of S3 objects, including their metadata, in a compact and efficiently queryable format.
 
- ## Features
+## Features
 
- - Generate a Parquet manifest file for S3 objects
- - Support for custom S3-compatible endpoints
- - Configurable delimiter for file name extraction
- - Progress bar with real-time statistics
- - Retry mechanism for S3 API calls
- - Efficient batch processing of S3 objects
+- Generate a Parquet manifest file for S3 objects
+- Support for custom S3-compatible endpoints for both source and destination
+- Configurable delimiter for file name extraction
+- Progress bar with real-time statistics
+- Retry mechanism for S3 API calls
+- Efficient batch processing of S3 objects
+- Option to output to local file or directly to S3
+- Support for separate credentials for source and destination buckets
 
- ## Installation
+## Installation
 
- To install the S3 Manifest Generator, you need to have Rust and Cargo installed on your system. Then, you can build the project from source:
+To install the S3 Manifest Generator, you need to have Rust and Cargo installed on your system. Then, you can build the project from source:
 
- ```bash
- git clone https://github.com/source-cooperative/s3-manifest.git
- cd s3-manifest
- cargo build --release
- ```
+```bash
+git clone https://github.com/source-cooperative/s3-manifest.git
+cd s3-manifest
+cargo build --release
+```
 
- The compiled binary will be available in the `target/release` directory.
+The compiled binary will be available in the `target/release` directory.
- ## Usage
+## Usage
 
- ```bash
- s3-manifest [OPTIONS] <S3_URI>
- ```
+```bash
+s3-manifest [OPTIONS] <S3_URI>
+```
 
- ### Arguments
+### Arguments
 
- - `<S3_URI>`: S3 URI containing both bucket and prefix (e.g., s3://bucket-name/prefix)
+- `<S3_URI>`: S3 URI containing both bucket and prefix (e.g., s3://bucket-name/prefix)
 
- ### Options
+### Options
 
- - `-o, --output <OUTPUT>`: Output file name for the Parquet manifest [default: manifest.parquet]
- - `--endpoint-url <ENDPOINT_URL>`: Custom S3 endpoint URL (optional, use for S3-compatible services)
- - `-d, --delimiter <DELIMITER>`: Delimiter to use for extracting file name [default: "/"]
- - `-h, --help`: Print help information
- - `-V, --version`: Print version information
+- `-o, --output <OUTPUT>`: Output file name for the Parquet manifest (local path or S3 URI)
+- `--source-endpoint <SOURCE_ENDPOINT>`: Custom S3 endpoint URL for source bucket (optional, use for S3-compatible services)
+- `--dest-endpoint <DEST_ENDPOINT>`: Custom S3 endpoint URL for destination bucket (optional, use for S3-compatible services)
+- `-d, --delimiter <DELIMITER>`: Delimiter to use for extracting file name [default: "/"]
+- `--source-access-key <SOURCE_ACCESS_KEY>`: AWS Access Key ID for the source bucket
+- `--source-secret-key <SOURCE_SECRET_KEY>`: AWS Secret Access Key for the source bucket
+- `--dest-access-key <DEST_ACCESS_KEY>`: AWS Access Key ID for the destination bucket
+- `--dest-secret-key <DEST_SECRET_KEY>`: AWS Secret Access Key for the destination bucket
+- `-h, --help`: Print help information
+- `-V, --version`: Print version information
 
- ### Example
+### Example
 
- ```bash
- s3-manifest s3://my-bucket/my-prefix -o my-manifest.parquet --delimiter "/"
- ```
+```bash
+s3-manifest s3://my-bucket/my-prefix -o s3://output-bucket/my-manifest.parquet --delimiter "/" --source-endpoint https://custom-s3.example.com
+```
 
- This command will generate a Parquet manifest file named `my-manifest.parquet` for all objects in the `my-prefix` of the `my-bucket` S3 bucket, using "/" as the delimiter for file name extraction.
+This command will generate a Parquet manifest file named `my-manifest.parquet` in the `output-bucket` S3 bucket for all objects in the `my-prefix` of the `my-bucket` S3 bucket, using "/" as the delimiter for file name extraction and a custom S3 endpoint for the source bucket.
 
- ## Output
+## Output
 
- The generated Parquet file contains the following columns:
+The generated Parquet file contains the following columns:
 
- - Bucket: The name of the S3 bucket
- - Key: The full key of the S3 object
- - FileName: The extracted file name based on the specified delimiter
- - Size: The size of the object in bytes
- - LastModified: The last modified timestamp of the object
+- Bucket: The name of the S3 bucket
+- Key: The full key of the S3 object
+- FileName: The extracted file name based on the specified delimiter
+- Size: The size of the object in bytes
+- LastModified: The last modified timestamp of the object
 
- ## Dependencies
+## Dependencies
 
- This project relies on several Rust crates, including:
+This project relies on several Rust crates, including:
 
- - arrow
- - chrono
- - clap
- - indicatif
- - parquet
- - rusoto_core
- - rusoto_s3
- - tokio
- - url
+- arrow
+- chrono
+- clap
+- futures
+- indicatif
+- parquet
+- rusoto_core
+- rusoto_s3
+- tokio
+- url
+- tempfile
+- tokio-retry
+- bytes
 
- For a complete list of dependencies and their versions, please refer to the `Cargo.toml` file.
+For a complete list of dependencies and their versions, please refer to the `Cargo.toml` file.
 
- ## License
+## License
 
- [MIT License](LICENSE)
+[MIT License](LICENSE)
 
- ## Contributing
+## Contributing
 
- Contributions are welcome! Please feel free to submit a Pull Request.
+Contributions are welcome! Please feel free to submit a Pull Request.
diff --git a/src/main.rs b/src/main.rs
index 2b23397..6c6372b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -6,58 +6,83 @@ use clap::Parser;
 use indicatif::{ProgressBar, ProgressStyle};
 use parquet::arrow::ArrowWriter;
 use parquet::file::properties::WriterProperties;
+use rusoto_core::credential::{ChainProvider, StaticProvider};
+use rusoto_core::ByteStream;
 use rusoto_core::{HttpClient, Region};
-use rusoto_s3::{ListObjectsV2Request, Object, S3Client, S3};
+use rusoto_s3::{ListObjectsV2Request, Object, PutObjectRequest, S3Client, S3};
 use std::error::Error;
 use std::fs::File;
+use std::io::Read;
 use std::sync::Arc;
 use std::time::Instant;
+use tempfile::NamedTempFile;
 use tokio_retry::strategy::{jitter, ExponentialBackoff};
 use tokio_retry::Retry;
 use url::Url;
 
-// Define the command-line arguments structure
 #[derive(Parser, Debug)]
 #[clap(author, version, about = "Generates a Parquet manifest file for an S3 bucket", long_about = None)]
 struct Args {
     /// S3 URI containing both bucket and prefix (e.g., s3://bucket-name/prefix)
     s3_uri: String,
 
-    /// Output file name for the Parquet manifest
-    #[clap(short, long, default_value = "manifest.parquet")]
+    /// Output file name for the Parquet manifest (local path or S3 URI)
+    #[clap(short, long)]
     output: String,
 
-    /// Custom S3 endpoint URL (optional, use for S3-compatible services)
-    #[clap(long = "endpoint-url")]
-    endpoint_url: Option<String>,
+    /// Custom S3 endpoint URL for source bucket (optional, use for S3-compatible services)
+    #[clap(long = "source-endpoint")]
+    source_endpoint: Option<String>,
+
+    /// Custom S3 endpoint URL for destination bucket (optional, use for S3-compatible services)
+    #[clap(long = "dest-endpoint")]
+    dest_endpoint: Option<String>,
 
     /// Delimiter to use for extracting file name (default: "/")
     #[clap(short, long, default_value = "/")]
     delimiter: String,
+
+    /// AWS Access Key ID for the source bucket
+    #[clap(long = "source-access-key")]
+    source_access_key: Option<String>,
+
+    /// AWS Secret Access Key for the source bucket
+    #[clap(long = "source-secret-key")]
+    source_secret_key: Option<String>,
+
+    /// AWS Access Key ID for the destination bucket
+    #[clap(long = "dest-access-key")]
+    dest_access_key: Option<String>,
+
+    /// AWS Secret Access Key for the destination bucket
+    #[clap(long = "dest-secret-key")]
+    dest_secret_key: Option<String>,
 }
 
-// Main function
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
-    // Parse command-line arguments
     let args = Args::parse();
-
-    // Parse the S3 URI
     let (bucket, prefix) = parse_s3_uri(&args.s3_uri)?;
+    let (output_bucket, output_key) = parse_output_location(&args.output)?;
 
-    // Call the generate_manifest function with parsed arguments
     generate_manifest(
         &bucket,
         &args.output,
-        args.endpoint_url,
+        args.source_endpoint,
+        args.dest_endpoint,
         prefix,
         &args.delimiter,
+        output_bucket,
+        output_key,
+        args.source_access_key,
+        args.source_secret_key,
+        args.dest_access_key,
+        args.dest_secret_key,
     )
     .await?;
 
     Ok(())
 }
 
-// Function to parse S3 URI
 fn parse_s3_uri(uri: &str) -> Result<(String, Option<String>), Box<dyn Error>> {
     let parsed_url = Url::parse(uri)?;
 
@@ -78,18 +103,35 @@ fn parse_s3_uri(uri: &str) -> Result<(String, Option<String>), Box<dyn Error>> {
     Ok((bucket, prefix))
 }
 
-// Function to generate the manifest
+fn parse_output_location(output: &str) -> Result<(Option<String>, String), Box<dyn Error>> {
+    if output.starts_with("s3://") {
+        let parsed_url = Url::parse(output)?;
+        let bucket = parsed_url
+            .host_str()
+            .ok_or("Missing bucket name")?
+            .to_string();
+        let key = parsed_url.path().trim_start_matches('/').to_string();
+        Ok((Some(bucket), key))
+    } else {
+        Ok((None, output.to_string()))
+    }
+}
+
 async fn generate_manifest(
     bucket_name: &str,
     output_file: &str,
-    endpoint: Option<String>,
+    source_endpoint: Option<String>,
+    dest_endpoint: Option<String>,
     prefix: Option<String>,
     delimiter: &str,
+    output_bucket: Option<String>,
+    output_key: String,
+    source_access_key: Option<String>,
+    source_secret_key: Option<String>,
+    dest_access_key: Option<String>,
+    dest_secret_key: Option<String>,
 ) -> Result<(), Box<dyn Error>> {
-    // Create S3 client
-    let s3_client = create_s3_client(endpoint)?;
-
-    // Define the schema for the Parquet file
+    let s3_client = create_s3_client(source_endpoint, source_access_key, source_secret_key)?;
     let schema = Arc::new(Schema::new(vec![
         Field::new("Bucket", DataType::Utf8, false),
         Field::new("Key", DataType::Utf8, false),
@@ -102,12 +144,26 @@ async fn generate_manifest(
         ),
     ]));
 
-    // Create the output file and Parquet writer
-    let file = File::create(output_file)?;
-    let props = WriterProperties::builder().build();
-    let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
+    let (mut writer, temp_file) = if output_bucket.is_some() {
+        let temp_file = NamedTempFile::new()?;
+        let props = WriterProperties::builder().build();
+        let writer = ArrowWriter::try_new(
+            Box::new(temp_file.reopen()?) as Box<dyn std::io::Write + Send>,
+            schema.clone(),
+            Some(props),
+        )?;
+        (writer, Some(temp_file))
+    } else {
+        let file = File::create(output_file)?;
+        let props = WriterProperties::builder().build();
+        let writer = ArrowWriter::try_new(
+            Box::new(file) as Box<dyn std::io::Write + Send>,
+            schema.clone(),
+            Some(props),
+        )?;
+        (writer, None)
+    };
 
-    // Initialize variables for S3 listing and data processing
     let mut continuation_token: Option<String> = None;
     let mut bucket_builder = StringBuilder::new();
     let mut key_builder = StringBuilder::new();
@@ -115,26 +171,23 @@ async fn generate_manifest(
     let mut size_builder = UInt64Builder::new();
     let mut last_modified_builder = TimestampMillisecondBuilder::new();
 
-    // Set up retry strategy for S3 API calls
     let retry_strategy = ExponentialBackoff::from_millis(100).map(jitter).take(3);
 
-    // Set up progress bar
     let pb = ProgressBar::new_spinner();
     pb.set_style(
         ProgressStyle::default_spinner()
             .template("{spinner:.green} [{elapsed_precise}] {pos} Objects Scanned ({per_sec})")
            .progress_chars("#>-"),
     );
-    pb.set_length(0); // Indeterminate progress bar
+    pb.set_length(0);
 
     let start_time = Instant::now();
     let mut total_objects = 0;
 
-    // Main loop to process S3 objects
     loop {
         let request = ListObjectsV2Request {
             bucket: bucket_name.to_string(),
-            prefix: prefix.clone(), // We keep this, but it might not filter as expected
+            prefix: prefix.clone(),
             continuation_token: continuation_token.clone(),
             max_keys: Some(1000),
             ..Default::default()
@@ -154,7 +207,6 @@ async fn generate_manifest(
 
         if let Some(objects) = result.contents {
             for object in objects {
-                // Check if the object key starts with the desired prefix
                 if let Some(ref prefix) = prefix {
                     if !object
                         .key
@@ -162,7 +214,7 @@ async fn generate_manifest(
                         .unwrap_or(&String::new())
                         .starts_with(prefix)
                     {
-                        continue; // Skip this object if it doesn't match the prefix
+                        continue;
                     }
                 }
 
@@ -193,10 +245,8 @@ async fn generate_manifest(
             )?;
         }
 
-        // Update the continuation token for the next iteration
        continuation_token = result.next_continuation_token;
 
-        // Check if there are more objects to list
        if !result.is_truncated.unwrap_or(false) {
            break;
        }
@@ -206,7 +256,6 @@ async fn generate_manifest(
         pb.set_message(format!("{:.2} objects/sec", objects_per_second));
     }
 
-    // Write any remaining data
     if key_builder.len() > 0 {
         write_batch(
             &mut writer,
@@ -219,10 +268,19 @@ async fn generate_manifest(
         )?;
     }
 
-    // Close the Parquet writer
     writer.close()?;
 
-    // Display final statistics
+    if let Some(temp_file) = temp_file {
+        let dest_s3_client = create_s3_client(dest_endpoint, dest_access_key, dest_secret_key)?;
+        upload_to_s3(
+            &dest_s3_client,
+            temp_file,
+            &output_bucket.unwrap(),
+            &output_key,
+        )
+        .await?;
+    }
+
     let elapsed = start_time.elapsed();
     let objects_per_second = total_objects as f64 / elapsed.as_secs_f64();
     pb.finish_with_message(format!(
@@ -233,28 +291,73 @@ async fn generate_manifest(
 
     Ok(())
 }
 
-// Function to create an S3 client
-fn create_s3_client(endpoint: Option<String>) -> Result<S3Client, Box<dyn Error>> {
-    match endpoint {
-        Some(endpoint_url) => {
-            let region = Region::Custom {
-                name: "custom".to_string(),
-                endpoint: endpoint_url,
+fn create_s3_client(
+    endpoint: Option<String>,
+    access_key: Option<String>,
+    secret_key: Option<String>,
+) -> Result<S3Client, Box<dyn Error>> {
+    let region = match endpoint {
+        Some(endpoint_url) => Region::Custom {
+            name: "custom".to_string(),
+            endpoint: endpoint_url,
+        },
+        None => Region::default(),
+    };
+
+    match (access_key, secret_key) {
+        (Some(access_key), Some(secret_key)) => Ok(S3Client::new_with(
+            HttpClient::new()?,
+            StaticProvider::new_minimal(access_key, secret_key),
+            region,
+        )),
+        _ => Ok(S3Client::new_with(
+            HttpClient::new()?,
+            ChainProvider::new(),
+            region,
+        )),
+    }
+}
+
+async fn upload_to_s3(
+    s3_client: &S3Client,
+    temp_file: tempfile::NamedTempFile,
+    bucket: &str,
+    key: &str,
+) -> Result<(), Box<dyn Error>> {
+    let mut file = temp_file.reopen()?;
+    let mut contents = Vec::new();
+    file.read_to_end(&mut contents)?;
+
+    let retry_strategy = ExponentialBackoff::from_millis(100).map(jitter).take(3);
+
+    // Create a Vec from the contents
+    let contents_vec = contents.to_vec();
+
+    let _result = Retry::spawn(retry_strategy, || {
+        let s3_client = s3_client.clone();
+        let bucket = bucket.to_string();
+        let key = key.to_string();
+        let contents = contents_vec.clone();
+
+        async move {
+            let put_request = PutObjectRequest {
+                bucket: bucket,
+                key: key,
+                body: Some(ByteStream::from(contents)),
+                ..Default::default()
             };
-            Ok(S3Client::new_with(
-                HttpClient::new()?,
-                rusoto_core::credential::StaticProvider::new_minimal(
-                    "dummy".to_string(),
-                    "dummy".to_string(),
-                ),
-                region,
-            ))
+
+            s3_client.put_object(put_request).await.map_err(|e| {
+                println!("Error uploading object, retrying: {:?}", e);
+                e
+            })
         }
-        None => Ok(S3Client::new(Region::default())),
-    }
+    })
+    .await?;
+
+    Ok(())
 }
 
-// Function to add an S3 object's data to the Arrow builders
 fn add_object_to_builders(
     bucket_name: &str,
     object: &Object,
@@ -265,22 +368,16 @@ fn add_object_to_builders(
     size_builder: &mut UInt64Builder,
     last_modified_builder: &mut TimestampMillisecondBuilder,
 ) -> Result<(), Box<dyn Error>> {
-    // Append the bucket name
     bucket_builder.append_value(bucket_name);
 
-    // Extract and append the object key
     let key = object.key.as_deref().unwrap_or("");
-
     key_builder.append_value(key);
 
-    // Extract and append the file name
     let file_name = key.rsplit(delimiter).next().unwrap_or(key);
     file_name_builder.append_value(file_name);
 
-    // Append the object size
     size_builder.append_value(object.size.unwrap_or(0) as u64);
 
-    // Parse and append the last modified timestamp
     let last_modified = object
         .last_modified
         .as_ref()
@@ -292,9 +389,8 @@ fn add_object_to_builders(
     Ok(())
 }
 
-// Function to write a batch of data to the Parquet file
 fn write_batch(
-    writer: &mut ArrowWriter<File>,
+    writer: &mut ArrowWriter<Box<dyn std::io::Write + Send>>,
     schema: &Arc<Schema>,
     bucket_builder: &mut StringBuilder,
     key_builder: &mut StringBuilder,
@@ -302,7 +398,6 @@ fn write_batch(
     size_builder: &mut UInt64Builder,
     last_modified_builder: &mut TimestampMillisecondBuilder,
 ) -> Result<(), Box<dyn Error>> {
-    // Create a RecordBatch from the builders
     let batch = RecordBatch::try_new(
         schema.clone(),
         vec![
@@ -314,10 +409,8 @@ fn write_batch(
         ],
     )?;
 
-    // Write the batch to the Parquet file
     writer.write(&batch)?;
 
-    // Reset builders by finishing them (which clears their internal state)
     bucket_builder.finish();
     key_builder.finish();
     file_name_builder.finish();