From f7f9990f95146400c3aef8afc11804e4a6e3afe3 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 3 Jan 2025 15:49:35 +0800 Subject: [PATCH] fix(integration/object_store): object_store requires metadata in list (#5501) --- integrations/object_store/src/store.rs | 73 ++++++++++++++++++-------- integrations/object_store/src/utils.rs | 12 +---- 2 files changed, 51 insertions(+), 34 deletions(-) diff --git a/integrations/object_store/src/store.rs b/integrations/object_store/src/store.rs index c5ba9a6750f7..32ccbc2d8a23 100644 --- a/integrations/object_store/src/store.rs +++ b/integrations/object_store/src/store.rs @@ -323,32 +323,52 @@ impl ObjectStore for OpendalStore { let offset = offset.clone(); let fut = async move { - let fut = if self.inner.info().full_capability().list_with_start_after { - self.inner - .lister_with(&path) - .start_after(offset.as_ref()) - .recursive(true) - .into_future() - .into_send() - .await - .map_err(|err| format_object_store_error(err, &path))? - .then(try_format_object_meta) - .into_send() - .boxed() + let list_with_start_after = self.inner.info().full_capability().list_with_start_after; + let mut fut = self.inner.lister_with(&path).recursive(true); + + // Use native start_after support if possible. + if list_with_start_after { + fut = fut.start_after(offset.as_ref()); + } + + let lister = fut + .await + .map_err(|err| format_object_store_error(err, &path))? + .then(move |entry| { + let path = path.clone(); + + async move { + let entry = entry.map_err(|err| format_object_store_error(err, &path))?; + let (path, metadata) = entry.into_parts(); + + // If it's a dir or last_modified is present, we can use it directly. + if metadata.is_dir() || metadata.last_modified().is_some() { + let object_meta = format_object_meta(&path, &metadata); + return Ok(object_meta); + } + + let metadata = self + .inner + .stat(&path) + .await + .map_err(|err| format_object_store_error(err, &path))?; + let object_meta = format_object_meta(&path, &metadata); + Ok::<_, object_store::Error>(object_meta) + } + }) + .into_send() + .boxed(); + + let stream = if list_with_start_after { + lister } else { - self.inner - .lister_with(&path) - .recursive(true) - .into_future() - .into_send() - .await - .map_err(|err| format_object_store_error(err, &path))? - .try_filter(move |entry| futures::future::ready(entry.path() > offset.as_ref())) - .then(try_format_object_meta) + lister + .try_filter(move |entry| futures::future::ready(entry.location > offset)) .into_send() .boxed() }; - Ok::<_, object_store::Error>(fut) + + Ok::<_, object_store::Error>(stream) }; fut.into_stream().into_send().try_flatten().boxed() @@ -374,8 +394,15 @@ impl ObjectStore for OpendalStore { if meta.is_dir() { common_prefixes.push(entry.path().into()); - } else { + } else if meta.last_modified().is_some() { objects.push(format_object_meta(entry.path(), meta)); + } else { + let meta = self + .inner + .stat(entry.path()) + .await + .map_err(|err| format_object_store_error(err, entry.path()))?; + objects.push(format_object_meta(entry.path(), &meta)); } } diff --git a/integrations/object_store/src/utils.rs b/integrations/object_store/src/utils.rs index 5eb4a10f73bc..63357402b7a0 100644 --- a/integrations/object_store/src/utils.rs +++ b/integrations/object_store/src/utils.rs @@ -17,7 +17,7 @@ use futures::Stream; use object_store::ObjectMeta; -use opendal::{Entry, Metadata}; +use opendal::Metadata; use std::future::IntoFuture; /// Conditionally add the `Send` marker trait for the wrapped type. @@ -60,16 +60,6 @@ pub fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta { } } -/// Try to format `opendal::Entry` to `object_store::ObjectMeta`. -pub async fn try_format_object_meta( - res: object_store::Result, -) -> object_store::Result { - let entry = res.map_err(|err| format_object_store_error(err, ""))?; - let meta = entry.metadata(); - - Ok(format_object_meta(entry.path(), meta)) -} - /// Make given future `Send`. pub trait IntoSendFuture { type Output;