refactor: improve error handling in storage module (#34)
xushiyan authored Jul 3, 2024
1 parent 199a25d commit 52a9245
Showing 8 changed files with 143 additions and 95 deletions.
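
The core of this refactor is replacing unwrap() calls, which panic on failure, with fallible signatures that propagate errors through anyhow::Result. A rough standalone illustration of the pattern (the parse_port helper below is hypothetical and not part of this repository):

use anyhow::{Context, Result};

// Before: any malformed input panics inside the helper.
#[allow(dead_code)]
fn parse_port_panicking(raw: &str) -> u16 {
    raw.trim().parse().unwrap()
}

// After: the error carries context and the caller decides how to handle it.
fn parse_port(raw: &str) -> Result<u16> {
    raw.trim()
        .parse()
        .with_context(|| format!("Failed to parse port from {:?}", raw))
}

fn main() -> Result<()> {
    let port = parse_port("8080")?;
    println!("port = {}", port);
    Ok(())
}

The diff below applies the same change to the storage APIs, and additionally collapses get_parquet_file_data from Vec<RecordBatch> into a single concatenated RecordBatch.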
Empty file.
198 changes: 125 additions & 73 deletions crates/core/src/storage/mod.rs
@@ -21,7 +21,8 @@ use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context, Result};
use arrow::compute::concat_batches;
use arrow::record_batch::RecordBatch;
use async_recursion::async_recursion;
use bytes::Bytes;
@@ -60,16 +61,21 @@ impl Storage {
}
}

#[allow(dead_code)]
pub async fn get_file_info(&self, relative_path: &str) -> FileInfo {
let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
let meta = self.object_store.head(&obj_path).await.unwrap();
FileInfo {
uri: obj_url.to_string(),
name: obj_path.filename().unwrap().to_string(),
#[cfg(test)]
async fn get_file_info(&self, relative_path: &str) -> Result<FileInfo> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
let meta = self.object_store.head(&obj_path).await?;
let uri = obj_url.to_string();
let name = obj_path
.filename()
.ok_or(anyhow!("Failed to get file name for {}", obj_path))?
.to_string();
Ok(FileInfo {
uri,
name,
size: meta.size,
}
})
}

pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> Result<ParquetMetaData> {
@@ -79,91 +85,115 @@ impl Storage {
let meta = obj_store.head(&obj_path).await?;
let reader = ParquetObjectReader::new(obj_store, meta);
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
Ok(builder.metadata().as_ref().to_owned())
Ok(builder.metadata().as_ref().clone())
}

pub async fn get_file_data(&self, relative_path: &str) -> Bytes {
let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
let result = self.object_store.get(&obj_path).await.unwrap();
result.bytes().await.unwrap()
pub async fn get_file_data(&self, relative_path: &str) -> Result<Bytes> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
let result = self.object_store.get(&obj_path).await?;
let bytes = result.bytes().await?;
Ok(bytes)
}

pub async fn get_parquet_file_data(&self, relative_path: &str) -> Vec<RecordBatch> {
let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
pub async fn get_parquet_file_data(&self, relative_path: &str) -> Result<RecordBatch> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
let obj_store = self.object_store.clone();
let meta = obj_store.head(&obj_path).await.unwrap();
let meta = obj_store.head(&obj_path).await?;

// read parquet
let reader = ParquetObjectReader::new(obj_store, meta);
let stream = ParquetRecordBatchStreamBuilder::new(reader)
.await
.unwrap()
.build()
.unwrap();
stream
.collect::<Vec<_>>()
.await
.into_iter()
.map(|r| r.unwrap())
.collect()
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
let schema = builder.schema().clone();
let mut stream = builder.build()?;
let mut batches = Vec::new();

while let Some(r) = stream.next().await {
let batch = r.context("Failed to read record batch.")?;
batches.push(batch)
}

if batches.is_empty() {
return Ok(RecordBatch::new_empty(schema.clone()));
}

concat_batches(&schema, &batches)
.map_err(|e| anyhow!("Failed to concat record batches: {}", e))
}

pub async fn list_dirs(&self, subdir: Option<&str>) -> Vec<String> {
self.list_dirs_as_obj_paths(subdir)
.await
.into_iter()
.map(|p| p.filename().unwrap().to_string())
.collect()
pub async fn list_dirs(&self, subdir: Option<&str>) -> Result<Vec<String>> {
let dir_paths = self.list_dirs_as_obj_paths(subdir).await?;
let mut dirs = Vec::new();
for dir in dir_paths {
dirs.push(
dir.filename()
.ok_or(anyhow!("Failed to get file name for {}", dir))?
.to_string(),
)
}
Ok(dirs)
}

async fn list_dirs_as_obj_paths(&self, subdir: Option<&str>) -> Vec<ObjPath> {
let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()]).unwrap();
let prefix_path = ObjPath::from_url_path(prefix_url.path()).unwrap();
self.object_store
async fn list_dirs_as_obj_paths(&self, subdir: Option<&str>) -> Result<Vec<ObjPath>> {
let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()])?;
let prefix_path = ObjPath::from_url_path(prefix_url.path())?;
let list_res = self
.object_store
.list_with_delimiter(Some(&prefix_path))
.await
.unwrap()
.common_prefixes
.await?;
Ok(list_res.common_prefixes)
}

pub async fn list_files(&self, subdir: Option<&str>) -> Vec<FileInfo> {
let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()]).unwrap();
let prefix_path = ObjPath::from_url_path(prefix_url.path()).unwrap();
self.object_store
pub async fn list_files(&self, subdir: Option<&str>) -> Result<Vec<FileInfo>> {
let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()])?;
let prefix_path = ObjPath::from_url_path(prefix_url.path())?;
let list_res = self
.object_store
.list_with_delimiter(Some(&prefix_path))
.await
.unwrap()
.objects
.into_iter()
.map(|obj_meta| FileInfo {
uri: join_url_segments(&prefix_url, &[obj_meta.location.filename().unwrap()])
.unwrap()
.to_string(),
name: obj_meta.location.filename().unwrap().to_string(),
.await?;
let mut file_info = Vec::new();
for obj_meta in list_res.objects {
let name = obj_meta
.location
.filename()
.ok_or(anyhow!(
"Failed to get file name for {:?}",
obj_meta.location
))?
.to_string();
let uri = join_url_segments(&prefix_url, &[&name])?.to_string();
file_info.push(FileInfo {
uri,
name,
size: obj_meta.size,
})
.collect()
});
}
Ok(file_info)
}
}

#[async_recursion]
pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Vec<String> {
pub async fn get_leaf_dirs(storage: &Storage, subdir: Option<&str>) -> Result<Vec<String>> {
let mut leaf_dirs = Vec::new();
let child_dirs = storage.list_dirs(subdir).await;
let child_dirs = storage.list_dirs(subdir).await?;
if child_dirs.is_empty() {
leaf_dirs.push(subdir.unwrap().to_owned());
leaf_dirs.push(subdir.unwrap_or_default().to_owned());
} else {
for child_dir in child_dirs {
let mut next_subdir = PathBuf::new();
if let Some(curr) = subdir {
next_subdir.push(curr);
}
next_subdir.push(child_dir);
let curr_leaf_dir = get_leaf_dirs(storage, Some(next_subdir.to_str().unwrap())).await;
let next_subdir = next_subdir
.to_str()
.ok_or(anyhow!("Failed to convert path: {:?}", next_subdir))?;
let curr_leaf_dir = get_leaf_dirs(storage, Some(next_subdir)).await?;
leaf_dirs.extend(curr_leaf_dir);
}
}
leaf_dirs
Ok(leaf_dirs)
}

#[cfg(test)]
@@ -187,17 +217,18 @@ mod tests {
)
.unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
let first_level_dirs: HashSet<String> = storage.list_dirs(None).await.into_iter().collect();
let first_level_dirs: HashSet<String> =
storage.list_dirs(None).await.unwrap().into_iter().collect();
assert_eq!(
first_level_dirs,
vec![".hoodie", "part1", "part2", "part3"]
.into_iter()
.map(String::from)
.collect()
);
let second_level_dirs: Vec<String> = storage.list_dirs(Some("part2")).await;
let second_level_dirs: Vec<String> = storage.list_dirs(Some("part2")).await.unwrap();
assert_eq!(second_level_dirs, vec!["part22"]);
let no_dirs = storage.list_dirs(Some("part1")).await;
let no_dirs = storage.list_dirs(Some("part1")).await.unwrap();
assert!(no_dirs.is_empty());
}

@@ -211,6 +242,7 @@ mod tests {
let first_level_dirs: HashSet<ObjPath> = storage
.list_dirs_as_obj_paths(None)
.await
.unwrap()
.into_iter()
.collect();
let expected_paths: HashSet<ObjPath> = vec![".hoodie", "part1", "part2", "part3"]
@@ -230,7 +262,12 @@
)
.unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
let file_info_1: Vec<FileInfo> = storage.list_files(None).await.into_iter().collect();
let file_info_1: Vec<FileInfo> = storage
.list_files(None)
.await
.unwrap()
.into_iter()
.collect();
assert_eq!(
file_info_1,
vec![FileInfo {
@@ -242,6 +279,7 @@
let file_info_2: Vec<FileInfo> = storage
.list_files(Some("part1"))
.await
.unwrap()
.into_iter()
.collect();
assert_eq!(
@@ -259,6 +297,7 @@
let file_info_3: Vec<FileInfo> = storage
.list_files(Some("part2/part22"))
.await
.unwrap()
.into_iter()
.collect();
assert_eq!(
@@ -282,19 +321,33 @@
)
.unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
let leaf_dirs = get_leaf_dirs(&storage, None).await;
let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
assert_eq!(
leaf_dirs,
vec![".hoodie", "part1", "part2/part22", "part3/part32/part33"]
);
}

#[tokio::test]
async fn use_storage_to_get_leaf_dirs_for_leaf_dir() {
let base_url =
Url::from_directory_path(canonicalize(Path::new("fixtures/leaf_dir")).unwrap())
.unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
let leaf_dirs = get_leaf_dirs(&storage, None).await.unwrap();
assert_eq!(
leaf_dirs,
vec![""],
"Listing a leaf dir should get the relative path to itself."
);
}

#[tokio::test]
async fn storage_get_file_info() {
let base_url =
Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
let file_info = storage.get_file_info("a.parquet").await;
let file_info = storage.get_file_info("a.parquet").await.unwrap();
assert_eq!(file_info.name, "a.parquet");
assert_eq!(
file_info.uri,
@@ -308,8 +361,7 @@
let base_url =
Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new())).unwrap();
let file_data = storage.get_parquet_file_data("a.parquet").await;
assert_eq!(file_data.len(), 1);
assert_eq!(file_data.first().unwrap().num_rows(), 5);
let file_data = storage.get_parquet_file_data("a.parquet").await.unwrap();
assert_eq!(file_data.num_rows(), 5);
}
}
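
With the new signatures above, a caller can chain every fallible storage step with ? instead of unwrapping. A minimal caller-side sketch, assuming the module path crate::storage::Storage and a local "fixtures" directory containing a.parquet as in the tests:

use std::collections::HashMap;
use std::fs::canonicalize;
use std::path::Path;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use url::Url;

use crate::storage::Storage;

// Rough sketch: URL joining, HEAD requests, and Parquet decoding errors now
// surface here instead of panicking inside the storage layer.
async fn count_rows_in_fixture() -> Result<usize> {
    let base_url = Url::from_directory_path(canonicalize(Path::new("fixtures"))?)
        .map_err(|_| anyhow!("Failed to build a directory URL for fixtures"))?;
    let storage = Storage::new(Arc::new(base_url), Arc::new(HashMap::new()))?;
    // After this refactor the row groups are concatenated into one RecordBatch.
    let batch = storage.get_parquet_file_data("a.parquet").await?;
    Ok(batch.num_rows())
}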
15 changes: 6 additions & 9 deletions crates/core/src/table/fs_view.rs
@@ -58,13 +58,13 @@ impl FileSystemView {
async fn load_partition_paths(storage: &Storage) -> Result<Vec<String>> {
let top_level_dirs: Vec<String> = storage
.list_dirs(None)
.await
.await?
.into_iter()
.filter(|dir| dir != ".hoodie")
.collect();
let mut partition_paths = Vec::new();
for dir in top_level_dirs {
partition_paths.extend(get_leaf_dirs(storage, Some(&dir)).await);
partition_paths.extend(get_leaf_dirs(storage, Some(&dir)).await?);
}
if partition_paths.is_empty() {
partition_paths.push("".to_string())
@@ -94,7 +94,7 @@ impl FileSystemView {
) -> Result<Vec<FileGroup>> {
let file_info: Vec<FileInfo> = storage
.list_files(Some(partition_path))
.await
.await?
.into_iter()
.filter(|f| f.name.ends_with(".parquet"))
.collect();
@@ -152,13 +152,10 @@ impl FileSystemView {
pub async fn read_file_slice_by_path_unchecked(
&self,
relative_path: &str,
) -> Result<Vec<RecordBatch>> {
Ok(self.storage.get_parquet_file_data(relative_path).await)
) -> Result<RecordBatch> {
self.storage.get_parquet_file_data(relative_path).await
}
pub async fn read_file_slice_unchecked(
&self,
file_slice: &FileSlice,
) -> Result<Vec<RecordBatch>> {
pub async fn read_file_slice_unchecked(&self, file_slice: &FileSlice) -> Result<RecordBatch> {
self.read_file_slice_by_path_unchecked(&file_slice.base_file_relative_path())
.await
}
11 changes: 5 additions & 6 deletions crates/core/src/table/mod.rs
@@ -85,7 +85,7 @@ impl Table {
storage_options: Arc<HashMap<String, String>>,
) -> Result<HashMap<String, String>> {
let storage = Storage::new(base_url, storage_options)?;
let data = storage.get_file_data(".hoodie/hoodie.properties").await;
let data = storage.get_file_data(".hoodie/hoodie.properties").await?;
let cursor = std::io::Cursor::new(data);
let lines = BufReader::new(cursor).lines();
let mut properties: HashMap<String, String> = HashMap::new();
@@ -146,7 +146,7 @@ impl Table {
let mut batches = Vec::new();
for f in file_slices {
match self.file_system_view.read_file_slice_unchecked(&f).await {
Ok(batch) => batches.extend(batch),
Ok(batch) => batches.push(batch),
Err(e) => return Err(anyhow!("Failed to read file slice {:?} - {}", f, e)),
}
}
@@ -162,7 +162,7 @@ impl Table {
Ok(file_paths)
}

pub async fn read_file_slice_by_path(&self, relative_path: &str) -> Result<Vec<RecordBatch>> {
pub async fn read_file_slice_by_path(&self, relative_path: &str) -> Result<RecordBatch> {
self.file_system_view
.read_file_slice_by_path_unchecked(relative_path)
.await
@@ -329,9 +329,8 @@ mod tests {
)
.await
.unwrap();
assert_eq!(batches.len(), 1);
assert_eq!(batches.first().unwrap().num_rows(), 4);
assert_eq!(batches.first().unwrap().num_columns(), 21);
assert_eq!(batches.num_rows(), 4);
assert_eq!(batches.num_columns(), 21);
}

#[tokio::test]
