From 0146a12210c74aaa635ffba64ad8fec2ea8c3226 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Thu, 2 Jan 2025 21:26:58 +0800 Subject: [PATCH] feat(core): Implement list with deleted for s3 service (#5498) --- core/src/raw/ops.rs | 20 +++ core/src/services/s3/backend.rs | 11 +- core/src/services/s3/core.rs | 31 ++++- core/src/services/s3/lister.rs | 117 +++++++++------- core/src/types/capability.rs | 4 +- core/src/types/metadata.rs | 145 ++++++++++++++------ core/src/types/operator/operator_futures.rs | 44 ++++-- core/tests/behavior/async_list.rs | 33 ++++- core/tests/behavior/utils.rs | 12 +- 9 files changed, 290 insertions(+), 127 deletions(-) diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index 2620bb541a1..f002fc903e8 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -112,6 +112,14 @@ pub struct OpList { /// /// Default to `false` versions: bool, + /// The deleted is used to control whether the deleted objects should be returned. + /// + /// - If `false`, list operation will not return with deleted objects + /// - If `true`, list operation will return with deleted objects if object versioning is supported + /// by the underlying service + /// + /// Default to `false` + deleted: bool, } impl Default for OpList { @@ -122,6 +130,7 @@ impl Default for OpList { recursive: false, concurrent: 1, versions: false, + deleted: false, } } } @@ -206,6 +215,17 @@ impl OpList { pub fn versions(&self) -> bool { self.versions } + + /// Change the deleted of this list operation + pub fn with_deleted(mut self, deleted: bool) -> Self { + self.deleted = deleted; + self + } + + /// Get the deleted of this list operation + pub fn deleted(&self) -> bool { + self.deleted + } } /// Args for `presign` operation. diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 24f441f36c4..f0f019100c4 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -983,6 +983,7 @@ impl Access for S3Backend { list_with_start_after: true, list_with_recursive: true, list_with_versions: self.core.enable_versioning, + list_with_deleted: self.core.enable_versioning, list_has_etag: true, list_has_content_md5: true, list_has_content_length: true, @@ -1060,21 +1061,17 @@ impl Access for S3Backend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let l = if args.versions() { + let l = if args.versions() || args.deleted() { TwoWays::Two(PageLister::new(S3ObjectVersionsLister::new( self.core.clone(), path, - args.recursive(), - args.limit(), - args.start_after(), + args, ))) } else { TwoWays::One(PageLister::new(S3Lister::new( self.core.clone(), path, - args.recursive(), - args.limit(), - args.start_after(), + args, ))) }; diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 2f9a359c770..f8140c5e230 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -612,7 +612,6 @@ impl S3Core { write!(url, "&max-keys={limit}").expect("write into string must succeed"); } if let Some(start_after) = start_after { - let start_after = build_abs_path(&self.root, &start_after); write!(url, "&start-after={}", percent_encode_path(&start_after)) .expect("write into string must succeed"); } @@ -994,6 +993,7 @@ pub struct ListObjectVersionsOutput { pub next_version_id_marker: Option, pub common_prefixes: Vec, pub version: Vec, + pub delete_marker: Vec, } #[derive(Default, Debug, Eq, PartialEq, Deserialize)] @@ -1008,6 +1008,15 @@ pub struct ListObjectVersionsOutputVersion { pub etag: Option, } +#[derive(Default, Debug, Eq, PartialEq, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ListObjectVersionsOutputDeleteMarker { + pub key: String, + pub version_id: String, + pub is_latest: bool, + pub last_modified: String, +} + pub enum ChecksumAlgorithm { Crc32c, } @@ -1284,6 +1293,16 @@ mod tests { videos/ + + my-third-image.jpg + 03jpff543dhffds434rfdsFDN943fdsFkdmqnh892 + true + 2009-10-15T17:50:30.000Z + + 75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a + mtd@amazon.com + + "#, ); @@ -1329,5 +1348,15 @@ mod tests { } ] ); + + assert_eq!( + output.delete_marker, + vec![ListObjectVersionsOutputDeleteMarker { + key: "my-third-image.jpg".to_owned(), + version_id: "03jpff543dhffds434rfdsFDN943fdsFkdmqnh892".to_owned(), + is_latest: true, + last_modified: "2009-10-15T17:50:30.000Z".to_owned(), + },] + ); } } diff --git a/core/src/services/s3/lister.rs b/core/src/services/s3/lister.rs index fb27f23ee5b..dd4359499d2 100644 --- a/core/src/services/s3/lister.rs +++ b/core/src/services/s3/lister.rs @@ -35,29 +35,26 @@ pub struct S3Lister { core: Arc, path: String, - delimiter: &'static str, - limit: Option, + args: OpList, - /// Amazon S3 starts listing **after** this specified key - start_after: Option, + delimiter: &'static str, + abs_start_after: Option, } impl S3Lister { - pub fn new( - core: Arc, - path: &str, - recursive: bool, - limit: Option, - start_after: Option<&str>, - ) -> Self { - let delimiter = if recursive { "" } else { "/" }; + pub fn new(core: Arc, path: &str, args: OpList) -> Self { + let delimiter = if args.recursive() { "" } else { "/" }; + let abs_start_after = args + .start_after() + .map(|start_after| build_abs_path(&core.root, start_after)); + Self { core, path: path.to_string(), + args, delimiter, - limit, - start_after: start_after.map(String::from), + abs_start_after, } } } @@ -70,10 +67,10 @@ impl oio::PageList for S3Lister { &self.path, &ctx.token, self.delimiter, - self.limit, + self.args.limit(), // start after should only be set for the first page. if ctx.token.is_empty() { - self.start_after.clone() + self.abs_start_after.clone() } else { None }, @@ -143,35 +140,29 @@ impl oio::PageList for S3Lister { } } -// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html +/// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html pub struct S3ObjectVersionsLister { core: Arc, prefix: String, + args: OpList, + delimiter: &'static str, - limit: Option, - start_after: String, - abs_start_after: String, + abs_start_after: Option, } impl S3ObjectVersionsLister { - pub fn new( - core: Arc, - path: &str, - recursive: bool, - limit: Option, - start_after: Option<&str>, - ) -> Self { - let delimiter = if recursive { "" } else { "/" }; - let start_after = start_after.unwrap_or_default().to_owned(); - let abs_start_after = build_abs_path(core.root.as_str(), start_after.as_str()); + pub fn new(core: Arc, path: &str, args: OpList) -> Self { + let delimiter = if args.recursive() { "" } else { "/" }; + let abs_start_after = args + .start_after() + .map(|start_after| build_abs_path(&core.root, start_after)); Self { core, prefix: path.to_string(), + args, delimiter, - limit, - start_after, abs_start_after, } } @@ -182,8 +173,8 @@ impl oio::PageList for S3ObjectVersionsLister { let markers = ctx.token.rsplit_once(" "); let (key_marker, version_id_marker) = if let Some(data) = markers { data - } else if !self.start_after.is_empty() { - (self.abs_start_after.as_str(), "") + } else if let Some(start_after) = &self.abs_start_after { + (start_after.as_str(), "") } else { ("", "") }; @@ -193,7 +184,7 @@ impl oio::PageList for S3ObjectVersionsLister { .s3_list_object_versions( &self.prefix, self.delimiter, - self.limit, + self.args.limit(), key_marker, version_id_marker, ) @@ -231,26 +222,48 @@ impl oio::PageList for S3ObjectVersionsLister { ctx.entries.push_back(de); } - for version_object in output.version { - let mut path = build_rel_path(&self.core.root, &version_object.key); - if path.is_empty() { - path = "/".to_owned(); + if self.args.versions() { + for version_object in output.version { + let mut path = build_rel_path(&self.core.root, &version_object.key); + if path.is_empty() { + path = "/".to_owned(); + } + + let mut meta = Metadata::new(EntryMode::from_path(&path)); + meta.set_version(&version_object.version_id); + meta.set_is_current(version_object.is_latest); + meta.set_content_length(version_object.size); + meta.set_last_modified(parse_datetime_from_rfc3339( + version_object.last_modified.as_str(), + )?); + if let Some(etag) = version_object.etag { + meta.set_etag(&etag); + meta.set_content_md5(etag.trim_matches('"')); + } + + let entry = oio::Entry::new(&path, meta); + ctx.entries.push_back(entry); } + } - let mut meta = Metadata::new(EntryMode::from_path(&path)); - meta.set_version(&version_object.version_id); - meta.set_is_current(version_object.is_latest); - meta.set_content_length(version_object.size); - meta.set_last_modified(parse_datetime_from_rfc3339( - version_object.last_modified.as_str(), - )?); - if let Some(etag) = version_object.etag { - meta.set_etag(&etag); - meta.set_content_md5(etag.trim_matches('"')); + if self.args.deleted() { + for delete_marker in output.delete_marker { + let mut path = build_rel_path(&self.core.root, &delete_marker.key); + if path.is_empty() { + path = "/".to_owned(); + } + + let mut meta = Metadata::new(EntryMode::FILE); + meta.set_version(&delete_marker.version_id); + meta.set_is_deleted(true); + meta.set_is_current(delete_marker.is_latest); + meta.set_last_modified(parse_datetime_from_rfc3339( + delete_marker.last_modified.as_str(), + )?); + + let entry = oio::Entry::new(&path, meta); + ctx.entries.push_back(entry); } - - let entry = oio::Entry::new(&path, meta); - ctx.entries.push_back(entry); } Ok(()) diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs index 2c4d3faf402..c5165a6a586 100644 --- a/core/src/types/capability.rs +++ b/core/src/types/capability.rs @@ -177,8 +177,10 @@ pub struct Capability { /// Indicates if versions listing is supported. #[deprecated(since = "0.51.1", note = "use with_versions instead")] pub list_with_version: bool, - /// Indicates if versions listing is supported. + /// Indicates if listing with versions included is supported. pub list_with_versions: bool, + /// Indicates if listing with deleted files included is supported. + pub list_with_deleted: bool, /// Indicates whether cache control information is available in list response pub list_has_cache_control: bool, /// Indicates whether content disposition information is available in list response diff --git a/core/src/types/metadata.rs b/core/src/types/metadata.rs index 9e3066596b8..e6e6130aa19 100644 --- a/core/src/types/metadata.rs +++ b/core/src/types/metadata.rs @@ -27,11 +27,32 @@ use crate::*; /// Depending on the context of the requests, the metadata for the same path may vary. For example, two /// versions of the same path might have different content lengths. Keep in mind that metadata is always /// tied to the given context and is not a global state. +/// +/// ## File Versions +/// +/// In systems that support versioning, such as AWS S3, the metadata may represent a specific version +/// of a file. +/// +/// Users can access [`Metadata::version`] to retrieve the file's version, if available. They can also +/// use [`Metadata::is_current`] and [`Metadata::is_deleted`] to determine whether the metadata +/// corresponds to the latest version or a deleted one. +/// +/// The all possible combinations of `is_current` and `is_deleted` are as follows: +/// +/// | `is_current` | `is_deleted` | description | +/// |---------------|--------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +/// | `Some(true)` | `false` | **The metadata's associated version is the latest, current version.** This is the normal state, indicating that this version is the most up-to-date and accessible version. | +/// | `Some(true)` | `true` | **The metadata's associated version is the latest, deleted version (Latest Delete Marker or Soft Deleted).** This is particularly important in object storage systems like S3. It signifies that this version is the **most recent delete marker**, indicating the object has been deleted. Subsequent GET requests will return 404 errors unless a specific version ID is provided. | +/// | `Some(false)` | `false` | **The metadata's associated version is neither the latest version nor deleted.** This indicates that this version is a previous version, still accessible by specifying its version ID. | +/// | `Some(false)` | `true` | **The metadata's associated version is not the latest version and is deleted.** This represents a historical version that has been marked for deletion. Users will need to specify the version ID to access it, and accessing it may be subject to specific delete marker behavior (e.g., in S3, it might not return actual data but a specific delete marker response). | +/// | `None` | `false` | **The metadata's associated file is not deleted, but its version status is either unknown or it is not the latest version.** This likely indicates that versioning is not enabled for this file, or versioning information is unavailable. | +/// | `None` | `true` | **The metadata's associated file is deleted, but its version status is either unknown or it is not the latest version.** This typically means the file was deleted without versioning enabled, or its versioning information is unavailable. This may represent an actual data deletion operation rather than an S3 delete marker. | #[derive(Debug, Clone, Eq, PartialEq)] pub struct Metadata { mode: EntryMode, is_current: Option, + is_deleted: bool, cache_control: Option, content_disposition: Option, @@ -54,6 +75,7 @@ impl Metadata { mode, is_current: None, + is_deleted: false, cache_control: None, content_length: None, @@ -74,6 +96,18 @@ impl Metadata { self.mode } + /// Set mode for entry. + pub fn set_mode(&mut self, v: EntryMode) -> &mut Self { + self.mode = v; + self + } + + /// Set mode for entry. + pub fn with_mode(mut self, v: EntryMode) -> Self { + self.mode = v; + self + } + /// Returns `true` if this metadata is for a file. pub fn is_file(&self) -> bool { matches!(self.mode, EntryMode::FILE) @@ -84,15 +118,71 @@ impl Metadata { matches!(self.mode, EntryMode::DIR) } - /// Set mode for entry. - pub fn set_mode(&mut self, v: EntryMode) -> &mut Self { - self.mode = v; + /// Checks whether the metadata corresponds to the most recent version of the file. + /// + /// This function is particularly useful when working with versioned objects, + /// such as those stored in systems like AWS S3 with versioning enabled. It helps + /// determine if the retrieved metadata represents the current state of the file + /// or an older version. + /// + /// Refer to docs in [`Metadata`] for more information about file versions. + /// + /// # Return Value + /// + /// The function returns an `Option` which can have the following values: + /// + /// - `Some(true)`: Indicates that the metadata **is** associated with the latest version of the file. + /// The metadata is current and reflects the most up-to-date state. + /// - `Some(false)`: Indicates that the metadata **is not** associated with the latest version of the file. + /// The metadata belongs to an older version, and there might be a more recent version available. + /// - `None`: Indicates that the currency of the metadata **cannot be determined**. This might occur if + /// versioning is not supported or enabled, or if there is insufficient information to ascertain the version status. + pub fn is_current(&self) -> Option { + self.is_current + } + + /// Set the `is_current` status of this entry. + /// + /// By default, this value will be `None`. Please avoid using this API if it's unclear whether the entry is current. + /// Set it to `true` if it is known to be the latest; otherwise, set it to `false`. + pub fn set_is_current(&mut self, is_current: bool) -> &mut Self { + self.is_current = Some(is_current); self } - /// Set mode for entry. - pub fn with_mode(mut self, v: EntryMode) -> Self { - self.mode = v; + /// Set the `is_current` status of this entry. + /// + /// By default, this value will be `None`. Please avoid using this API if it's unclear whether the entry is current. + /// Set it to `true` if it is known to be the latest; otherwise, set it to `false`. + pub fn with_is_current(mut self, is_current: Option) -> Self { + self.is_current = is_current; + self + } + + /// Checks if the file (or version) associated with this metadata has been deleted. + /// + /// This function returns `true` if the file represented by this metadata has been marked for + /// deletion or has been permanently deleted. + /// It returns `false` otherwise, indicating that the file (or version) is still present and accessible. + /// + /// Refer to docs in [`Metadata`] for more information about file versions. + /// + /// # Returns + /// + /// `bool`: `true` if the object is considered deleted, `false` otherwise. + pub fn is_deleted(&self) -> bool { + self.is_deleted + } + + /// Set the deleted status of this entry. + pub fn set_is_deleted(&mut self, v: bool) -> &mut Self { + self.is_deleted = v; + self + } + + /// Set the deleted status of this entry. + pub fn with_is_deleted(mut self, v: bool) -> Self { + self.is_deleted = v; self } @@ -127,6 +217,10 @@ impl Metadata { /// `Content-Length` is defined by [RFC 7230](https://httpwg.org/specs/rfc7230.html#header.content-length) /// /// Refer to [MDN Content-Length](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Length) for more information. + /// + /// # Returns + /// + /// Content length of this entry. It will be `0` if the content length is not set by the storage services. pub fn content_length(&self) -> u64 { self.content_length.unwrap_or_default() } @@ -326,45 +420,6 @@ impl Metadata { self } - /// Checks whether the metadata corresponds to the most recent version of the file. - /// - /// This function is particularly useful when working with versioned objects, - /// such as those stored in systems like AWS S3 with versioning enabled. It helps - /// determine if the retrieved metadata represents the current state of the file - /// or an older version. - /// - /// # Return Value - /// - /// The function returns an `Option` which can have the following values: - /// - /// - `Some(true)`: Indicates that the metadata **is** associated with the latest version of the file. - /// The metadata is current and reflects the most up-to-date state. - /// - `Some(false)`: Indicates that the metadata **is not** associated with the latest version of the file. - /// The metadata belongs to an older version, and there might be a more recent version available. - /// - `None`: Indicates that the currency of the metadata **cannot be determined**. This might occur if - /// versioning is not supported or enabled, or if there is insufficient information to ascertain the version status. - pub fn is_current(&self) -> Option { - self.is_current - } - - /// Set the `is_current` status of this entry. - /// - /// By default, this value will be `None`. Please avoid using this API if it's unclear whether the entry is current. - /// Set it to `true` if it is known to be the latest; otherwise, set it to `false`. - pub fn set_is_current(&mut self, is_current: bool) -> &mut Self { - self.is_current = Some(is_current); - self - } - - /// Set the `is_current` status of this entry. - /// - /// By default, this value will be `None`. Please avoid using this API if it's unclear whether the entry is current. - /// Set it to `true` if it is known to be the latest; otherwise, set it to `false`. - pub fn with_is_current(mut self, is_current: Option) -> Self { - self.is_current = is_current; - self - } - /// User defined metadata of this entry /// /// The prefix of the user defined metadata key(for example: in oss, it's x-oss-meta-) diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index f3dceb2d67b..478025a3065 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -501,16 +501,30 @@ impl>>> FutureList { self.map(|args| args.with_versions(v)) } - /// The version is used to control whether the object versions should be returned. + /// Controls whether the `list` operation should return file versions. /// - /// - If `false`, list operation will not return with object versions - /// - If `true`, list operation will return with object versions if object versioning is supported - /// by the underlying service + /// This function allows you to specify if the `list` operation, when executed, should include + /// information about different versions of files, if versioning is supported and enabled. + /// + /// If `true`, subsequent `list` operations will include version information for each file. + /// If `false`, version information will be omitted from the `list` results. /// /// Default to `false` pub fn versions(self, v: bool) -> Self { self.map(|args| args.with_versions(v)) } + + /// Controls whether the `list` operation should include deleted files (or versions). + /// + /// This function allows you to specify if the `list` operation, when executed, should include + /// entries for files or versions that have been marked as deleted. This is particularly relevant + /// in object storage systems that support soft deletion or versioning. + /// + /// If `true`, subsequent `list` operations will include deleted files or versions. + /// If `false`, deleted files or versions will be excluded from the `list` results. + pub fn deleted(self, v: bool) -> Self { + self.map(|args| args.with_deleted(v)) + } } /// Future that generated by [`Operator::list_with`] or [`Operator::lister_with`]. @@ -555,14 +569,28 @@ impl>> FutureLister { self.map(|args| args.with_versions(v)) } - /// The version is used to control whether the object versions should be returned. + /// Controls whether the `list` operation should return file versions. /// - /// - If `false`, list operation will not return with object versions - /// - If `true`, list operation will return with object versions if object versioning is supported - /// by the underlying service + /// This function allows you to specify if the `list` operation, when executed, should include + /// information about different versions of files, if versioning is supported and enabled. + /// + /// If `true`, subsequent `list` operations will include version information for each file. + /// If `false`, version information will be omitted from the `list` results. /// /// Default to `false` pub fn versions(self, v: bool) -> Self { self.map(|args| args.with_versions(v)) } + + /// Controls whether the `list` operation should include deleted files (or versions). + /// + /// This function allows you to specify if the `list` operation, when executed, should include + /// entries for files or versions that have been marked as deleted. This is particularly relevant + /// in object storage systems that support soft deletion or versioning. + /// + /// If `true`, subsequent `list` operations will include deleted files or versions. + /// If `false`, deleted files or versions will be excluded from the `list` results. + pub fn deleted(self, v: bool) -> Self { + self.map(|args| args.with_deleted(v)) + } } diff --git a/core/tests/behavior/async_list.rs b/core/tests/behavior/async_list.rs index 6a0aa2ffb5e..3e0822d8ab6 100644 --- a/core/tests/behavior/async_list.rs +++ b/core/tests/behavior/async_list.rs @@ -49,7 +49,8 @@ pub fn tests(op: &Operator, tests: &mut Vec) { test_remove_all, test_list_files_with_versions, test_list_with_versions_and_limit, - test_list_with_versions_and_start_after + test_list_with_versions_and_start_after, + test_list_files_with_deleted )) } @@ -595,15 +596,33 @@ pub async fn test_list_files_with_versions(op: Operator) -> Result<()> { assert_eq!(de.name(), file_name); let meta = de.metadata(); assert_eq!(meta.mode(), EntryMode::FILE); + } + + Ok(()) +} - // just ensure they don't panic - let _ = meta.content_length(); - let _ = meta.version(); - let _ = meta.last_modified(); - let _ = meta.etag(); - let _ = meta.content_md5(); +pub async fn test_list_files_with_deleted(op: Operator) -> Result<()> { + if !op.info().full_capability().list_with_deleted { + return Ok(()); } + let parent = TEST_FIXTURE.new_dir_path(); + let file_name = TEST_FIXTURE.new_file_path(); + let file_path = format!("{}{}", parent, file_name); + op.write(file_path.as_str(), "1").await?; + op.write(file_path.as_str(), "2").await?; + op.delete(file_path.as_str()).await?; + + // This file has been deleted + let mut ds = op.list_with(&file_path).deleted(true).await?; + ds.retain(|de| de.path() == file_path && de.metadata().is_deleted()); + + assert_eq!( + ds.len(), + 1, + "deleted file should be found and only have one" + ); + Ok(()) } diff --git a/core/tests/behavior/utils.rs b/core/tests/behavior/utils.rs index 91deb1178b5..1f4c2410ee3 100644 --- a/core/tests/behavior/utils.rs +++ b/core/tests/behavior/utils.rs @@ -187,12 +187,12 @@ impl Fixture { pub async fn cleanup(&self, op: impl Into) { let op = op.into(); let paths: Vec<_> = mem::take(self.paths.lock().unwrap().as_mut()); - for path in paths.iter() { - // We try our best to clean up fixtures, but won't panic if failed. - let _ = op.delete(path).await.map_err(|err| { - log::error!("fixture cleanup path {path} failed: {:?}", err); - }); - log::info!("fixture cleanup path {path} succeeded") + // Don't call delete if paths is empty + if paths.is_empty() { + return; } + + // We try our best to clean up fixtures, but won't panic if failed. + let _ = op.delete_iter(paths).await; } }