feat: support incremental read MOR tables
xushiyan committed Jan 21, 2025
1 parent d1655c4 commit d26cebd
Showing 5 changed files with 245 additions and 97 deletions.
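
For orientation, here is a minimal usage sketch of the incremental-read API this commit extends, modeled on the test code further down; the crate import path, table location, and error type are illustrative assumptions, not part of the commit.

```rust
use hudi::table::Table; // import path assumed for illustration

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical table location; the tests below read bundled sample tables instead.
    let hudi_table = Table::new("/tmp/hudi_mor_table").await?;

    // Record batches changed in the window (start, end]; both bounds are commit timestamps.
    let batches = hudi_table
        .read_incremental_records("20240707001301554", Some("20240707001302376"))
        .await?;

    // With no end timestamp, the read covers everything after `start`
    // up to the latest completed commit.
    let up_to_latest = hudi_table
        .read_incremental_records("20240707001301554", None)
        .await?;

    println!("{} and {} record batches", batches.len(), up_to_latest.len());
    Ok(())
}
```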
51 changes: 36 additions & 15 deletions crates/core/src/file_group/builder.rs
@@ -44,21 +44,42 @@ pub fn build_file_groups(commit_metadata: &Map<String, Value>) -> Result<HashSet
.and_then(|v| v.as_str())
.ok_or_else(|| CoreError::CommitMetadata("Invalid fileId in write stats".into()))?;

let path = stat
.get("path")
.and_then(|v| v.as_str())
.ok_or_else(|| CoreError::CommitMetadata("Invalid path in write stats".into()))?;

let file_name = Path::new(path)
.file_name()
.and_then(|name| name.to_str())
.ok_or_else(|| CoreError::CommitMetadata("Invalid file name in path".into()))?;

let file_group = FileGroup::new_with_base_file_name(
file_id.to_string(),
partition.clone(),
file_name,
)?;
let mut file_group = FileGroup::new(file_id.to_string(), partition.clone());

if let Some(base_file_name) = stat.get("baseFile") {
let base_file_name = base_file_name
.as_str()
.ok_or_else(|| CoreError::CommitMetadata("Invalid base file name".into()))?;
file_group.add_base_file_from_name(base_file_name)?;

if let Some(log_file_names) = stat.get("logFiles") {
let log_file_names = log_file_names.as_array().ok_or_else(|| {
CoreError::CommitMetadata("Invalid log files array".into())
})?;
for log_file_name in log_file_names {
let log_file_name = log_file_name.as_str().ok_or_else(|| {
CoreError::CommitMetadata("Invalid log file name".into())
})?;
file_group.add_log_file_from_name(log_file_name)?;
}
} else {
return Err(CoreError::CommitMetadata(
"Missing log files in write stats".into(),
));
}
} else {
let path = stat.get("path").and_then(|v| v.as_str()).ok_or_else(|| {
CoreError::CommitMetadata("Invalid path in write stats".into())
})?;

let file_name = Path::new(path)
.file_name()
.and_then(|name| name.to_str())
.ok_or_else(|| CoreError::CommitMetadata("Invalid file name in path".into()))?;

file_group.add_base_file_from_name(file_name)?;
}

file_groups.insert(file_group);
}
}
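To make the new `baseFile`/`logFiles` branch above concrete, here is a small sketch of assembling a MOR file group with the same calls the rewritten builder makes; the import path, file names, and error type are made-up placeholders, and real names come from the commit metadata's write stats.

```rust
use hudi_core::file_group::FileGroup; // import path assumed for illustration

fn build_mor_file_group() -> Result<(), Box<dyn std::error::Error>> {
    // Start from an empty file group, as the rewritten builder does.
    let mut file_group = FileGroup::new(
        "d398fae1-c0e6-4098-8124-f55f7098bdba-0".to_string(), // hypothetical file id
        "10".to_string(),                                      // hypothetical partition path
    );

    // MOR write stats carry an explicit base file name plus a `logFiles` array.
    file_group.add_base_file_from_name(
        "d398fae1-c0e6-4098-8124-f55f7098bdba-0_1-95-136_20240707001302376.parquet",
    )?;
    // Hypothetical log file name, only to show the call shape.
    file_group.add_log_file_from_name(
        ".d398fae1-c0e6-4098-8124-f55f7098bdba-0_20240707001302376.log.1_0-95-136",
    )?;

    // COW write stats have only `path`; the else branch derives the base file name
    // from that path and registers it with add_base_file_from_name.
    Ok(())
}
```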
167 changes: 85 additions & 82 deletions crates/core/src/table/mod.rs
@@ -326,7 +326,7 @@ impl Table {
end_timestamp: Option<&str>,
) -> Result<Vec<RecordBatch>> {
// If the end timestamp is not provided, use the latest commit timestamp.
let Some(as_of_timestamp) =
let Some(end_timestamp) =
end_timestamp.or_else(|| self.timeline.get_latest_commit_timestamp())
else {
return Ok(Vec::new());
@@ -336,24 +336,25 @@
let mut file_slices: Vec<FileSlice> = Vec::new();
let file_groups = self
.timeline
.get_incremental_file_groups(Some(start_timestamp), Some(as_of_timestamp))
.get_incremental_file_groups(Some(start_timestamp), Some(end_timestamp))
.await?;
for file_group in file_groups {
if let Some(file_slice) = file_group.get_file_slice_as_of(as_of_timestamp) {
if let Some(file_slice) = file_group.get_file_slice_as_of(end_timestamp) {
file_slices.push(file_slice.clone());
}
}

// Read incremental records from the file slices.
let filters = &[
FilterField::new(MetaField::CommitTime.as_ref()).gt(start_timestamp),
FilterField::new(MetaField::CommitTime.as_ref()).lte(as_of_timestamp),
FilterField::new(MetaField::CommitTime.as_ref()).lte(end_timestamp),
];
let fg_reader =
self.create_file_group_reader_with_filters(filters, MetaField::schema().as_ref())?;
let base_file_only = self.get_table_type() == TableTypeValue::CopyOnWrite;
let timezone = self.timezone();
let instant_range = InstantRange::up_to(as_of_timestamp, &timezone);
let instant_range =
InstantRange::within_open_closed(start_timestamp, end_timestamp, &timezone);
let batches = futures::future::try_join_all(
file_slices
.iter()
@@ -992,93 +993,95 @@ mod tests {

mod test_incremental_queries {
use super::super::*;
use arrow_array::{Array, StringArray};
use arrow_select::concat::concat_batches;
use hudi_tests::SampleTable;
use std::collections::HashSet;

#[tokio::test]
async fn test_empty() -> Result<()> {
let base_url = SampleTable::V6Empty.url_to_cow();
let hudi_table = Table::new(base_url.path()).await?;

let records = hudi_table.read_incremental_records("0", None).await?;
assert!(records.is_empty());

for base_url in SampleTable::V6Empty.urls() {
let hudi_table = Table::new(base_url.path()).await?;
let records = hudi_table.read_incremental_records("0", None).await?;
assert!(records.is_empty())
}
Ok(())
}

#[tokio::test]
async fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> {
let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow();
let hudi_table = Table::new(base_url.path()).await?;

// read records changed from the first commit (exclusive) to the second commit (inclusive)
let records = hudi_table
.read_incremental_records("20240707001301554", Some("20240707001302376"))
.await?;
assert_eq!(records.len(), 2);
assert_eq!(records[0].num_rows(), 1);
assert_eq!(records[1].num_rows(), 1);

// verify the partition paths
let partition_paths = StringArray::from(
arrow::compute::concat(&[
records[0].column_by_name("_hoodie_partition_path").unwrap(),
records[1].column_by_name("_hoodie_partition_path").unwrap(),
])?
.to_data(),
);
let actual_partition_paths =
HashSet::<&str>::from_iter(partition_paths.iter().map(|s| s.unwrap()));
let expected_partition_paths = HashSet::from_iter(vec!["10", "30"]);
assert_eq!(actual_partition_paths, expected_partition_paths);

// verify the file names
let file_names = StringArray::from(
arrow::compute::concat(&[
records[0].column_by_name("_hoodie_file_name").unwrap(),
records[1].column_by_name("_hoodie_file_name").unwrap(),
])?
.to_data(),
);
let actual_file_names =
HashSet::<&str>::from_iter(file_names.iter().map(|s| s.unwrap()));
let expected_file_names = HashSet::from_iter(vec![
"d398fae1-c0e6-4098-8124-f55f7098bdba-0_1-95-136_20240707001302376.parquet",
"4f2685a3-614f-49ca-9b2b-e1cb9fb61f27-0_0-95-135_20240707001302376.parquet",
]);
assert_eq!(actual_file_names, expected_file_names);

// read records changed from the first commit (exclusive) to
// the latest (an insert overwrite table's replacecommit)
let records = hudi_table
.read_incremental_records("20240707001301554", None)
.await?;
assert_eq!(records.len(), 1);
assert_eq!(records[0].num_rows(), 1);

// verify the partition paths
let actual_partition_paths = StringArray::from(
records[0]
.column_by_name("_hoodie_partition_path")
.unwrap()
.to_data(),
);
let expected_partition_paths = StringArray::from(vec!["30"]);
assert_eq!(actual_partition_paths, expected_partition_paths);

// verify the file names
let actual_file_names = StringArray::from(
records[0]
.column_by_name("_hoodie_file_name")
.unwrap()
.to_data(),
);
let expected_file_names = StringArray::from(vec![
"ebcb261d-62d3-4895-90ec-5b3c9622dff4-0_0-111-154_20240707001303088.parquet",
]);
assert_eq!(actual_file_names, expected_file_names);
for base_url in SampleTable::V6SimplekeygenNonhivestyleOverwritetable.urls() {
let hudi_table = Table::new(base_url.path()).await?;
let commit_timestamps = hudi_table
.timeline
.completed_commits
.iter()
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
assert_eq!(commit_timestamps.len(), 3);
let first_commit = commit_timestamps[0];
let second_commit = commit_timestamps[1];
let third_commit = commit_timestamps[2];

// read records changed from the beginning to the 1st commit
let records = hudi_table
.read_incremental_records("19700101000000", Some(first_commit))
.await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data = SampleTable::sample_data_order_by_id(&records);
assert_eq!(
sample_data,
vec![(1, "Alice", true), (2, "Bob", false), (3, "Carol", true),],
"Should return 3 records inserted in the 1st commit"
);

// read records changed from the 1st to the 2nd commit
let records = hudi_table
.read_incremental_records(first_commit, Some(second_commit))
.await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data = SampleTable::sample_data_order_by_id(&records);
assert_eq!(
sample_data,
vec![(1, "Alice", false), (4, "Diana", true),],
"Should return 2 records inserted or updated in the 2nd commit"
);

// read records changed from the 2nd to the 3rd commit
let records = hudi_table
.read_incremental_records(second_commit, Some(third_commit))
.await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data = SampleTable::sample_data_order_by_id(&records);
assert_eq!(
sample_data,
vec![(4, "Diana", false),],
"Should return 1 record insert-overwritten in the 3rd commit"
);

// read records changed from the 1st commit
let records = hudi_table
.read_incremental_records(first_commit, None)
.await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data = SampleTable::sample_data_order_by_id(&records);
assert_eq!(
sample_data,
vec![(4, "Diana", false),],
"Should return 1 record insert-overwritten in the 3rd commit"
);

// read records changed from the 3rd commit
let records = hudi_table
.read_incremental_records(third_commit, None)
.await?;
assert!(
records.is_empty(),
"Should return 0 record as it's the latest commit"
);
}
Ok(())
}
}
23 changes: 23 additions & 0 deletions crates/core/src/timeline/selector.rs
@@ -25,6 +25,7 @@ use crate::Result;
use chrono::{DateTime, Utc};
use std::sync::Arc;

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct InstantRange {
timezone: String,
@@ -62,6 +63,28 @@ impl InstantRange {
)
}

/// Create a new [InstantRange] with an open timestamp range.
pub fn within(start_timestamp: &str, end_timestamp: &str, timezone: &str) -> Self {
Self::new(
timezone.to_string(),
Some(start_timestamp.to_string()),
Some(end_timestamp.to_string()),
false,
false,
)
}

/// Create a new [InstantRange] with an open start and closed end timestamp range.
pub fn within_open_closed(start_timestamp: &str, end_timestamp: &str, timezone: &str) -> Self {
Self::new(
timezone.to_string(),
Some(start_timestamp.to_string()),
Some(end_timestamp.to_string()),
false,
true,
)
}

pub fn timezone(&self) -> &str {
&self.timezone
}
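
For the two constructors added above, a short sketch of their boundary semantics, reusing commit timestamps that appear in this commit's tests; the import path is an assumption.

```rust
use hudi_core::timeline::selector::InstantRange; // import path assumed for illustration

fn instant_range_demo() {
    let tz = "UTC";

    // within: open on both ends, i.e. (start, end); both boundary commits are excluded.
    let open = InstantRange::within("20240707001301554", "20240707001302376", tz);

    // within_open_closed: (start, end]; the start commit is excluded and the end commit
    // is included, which is the window shape read_incremental_records now filters on.
    let open_closed =
        InstantRange::within_open_closed("20240707001301554", "20240707001302376", tz);

    // InstantRange derives Debug, so both ranges can be printed for inspection.
    println!("{:?}\n{:?}", open, open_closed);
}
```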