feat: support incremental read MOR tables (#258)
Adjust the arguments to pass a time range to file group readers to support incremental reads for MOR tables.
xushiyan authored Jan 22, 2025
1 parent 493697b commit b658a98
Showing 5 changed files with 269 additions and 98 deletions.
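
As orientation for the diffs below, here is a minimal sketch of calling the incremental read API that this change extends to MOR tables. The read_incremental_records signature and the timestamp format follow the diff; the hudi_core::table::Table import path, the table location, and the error handling are assumptions.

// A minimal sketch, assuming the Table type lives at hudi_core::table::Table
// and that the crate's error type converts into Box<dyn std::error::Error>.
use hudi_core::table::Table;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder path to a local MOR table; replace with a real table location.
    let hudi_table = Table::new("/tmp/hudi_mor_table").await?;

    // Records changed after the start timestamp (exclusive) up to the end timestamp (inclusive).
    let batches = hudi_table
        .read_incremental_records("20240707001301554", Some("20240707001302376"))
        .await?;

    // Passing None for the end timestamp falls back to the latest completed commit.
    let latest = hudi_table
        .read_incremental_records("20240707001301554", None)
        .await?;

    println!("{} and {} record batches", batches.len(), latest.len());
    Ok(())
}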
51 changes: 36 additions & 15 deletions crates/core/src/file_group/builder.rs
@@ -44,21 +44,42 @@ pub fn build_file_groups(commit_metadata: &Map<String, Value>) -> Result<HashSet
.and_then(|v| v.as_str())
.ok_or_else(|| CoreError::CommitMetadata("Invalid fileId in write stats".into()))?;

let path = stat
.get("path")
.and_then(|v| v.as_str())
.ok_or_else(|| CoreError::CommitMetadata("Invalid path in write stats".into()))?;

let file_name = Path::new(path)
.file_name()
.and_then(|name| name.to_str())
.ok_or_else(|| CoreError::CommitMetadata("Invalid file name in path".into()))?;

let file_group = FileGroup::new_with_base_file_name(
file_id.to_string(),
partition.clone(),
file_name,
)?;
let mut file_group = FileGroup::new(file_id.to_string(), partition.clone());

if let Some(base_file_name) = stat.get("baseFile") {
let base_file_name = base_file_name
.as_str()
.ok_or_else(|| CoreError::CommitMetadata("Invalid base file name".into()))?;
file_group.add_base_file_from_name(base_file_name)?;

if let Some(log_file_names) = stat.get("logFiles") {
let log_file_names = log_file_names.as_array().ok_or_else(|| {
CoreError::CommitMetadata("Invalid log files array".into())
})?;
for log_file_name in log_file_names {
let log_file_name = log_file_name.as_str().ok_or_else(|| {
CoreError::CommitMetadata("Invalid log file name".into())
})?;
file_group.add_log_file_from_name(log_file_name)?;
}
} else {
return Err(CoreError::CommitMetadata(
"Missing log files in write stats".into(),
));
}
} else {
let path = stat.get("path").and_then(|v| v.as_str()).ok_or_else(|| {
CoreError::CommitMetadata("Invalid path in write stats".into())
})?;

let file_name = Path::new(path)
.file_name()
.and_then(|name| name.to_str())
.ok_or_else(|| CoreError::CommitMetadata("Invalid file name in path".into()))?;

file_group.add_base_file_from_name(file_name)?;
}

file_groups.insert(file_group);
}
}
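
To illustrate the branch added above: write stats for MOR commits carry a baseFile name plus a logFiles array, while copy-on-write stats only carry a path. Below is a small, self-contained sketch of that branching over hand-written JSON; the field names mirror the diff, but the sample values and the describe_write_stat helper are hypothetical and do not call the real build_file_groups.

use serde_json::{json, Value};

// Hypothetical helper mirroring the MOR-vs-COW branch in build_file_groups.
fn describe_write_stat(stat: &Value) -> String {
    if let Some(base_file) = stat.get("baseFile").and_then(|v| v.as_str()) {
        // MOR write stat: base file name plus zero or more log file names.
        let log_files: Vec<&str> = stat
            .get("logFiles")
            .and_then(|v| v.as_array())
            .map(|a| a.iter().filter_map(|v| v.as_str()).collect())
            .unwrap_or_default();
        format!("MOR file slice: base={base_file}, logs={log_files:?}")
    } else {
        // COW write stat: only a path to the base file is present.
        let path = stat
            .get("path")
            .and_then(|v| v.as_str())
            .unwrap_or("<missing>");
        format!("COW base file: path={path}")
    }
}

fn main() {
    // Made-up write stats shaped like the commit metadata handled above.
    let mor_stat = json!({
        "fileId": "abc-0",
        "baseFile": "abc-0_0-10-20_20250101000000000.parquet",
        "logFiles": [".abc-0_20250101000000000.log.1_0-30-40"]
    });
    let cow_stat = json!({
        "fileId": "def-0",
        "path": "10/def-0_0-10-20_20250101000000000.parquet"
    });

    println!("{}", describe_write_stat(&mor_stat));
    println!("{}", describe_write_stat(&cow_stat));
}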
167 changes: 85 additions & 82 deletions crates/core/src/table/mod.rs
@@ -326,7 +326,7 @@ impl Table {
end_timestamp: Option<&str>,
) -> Result<Vec<RecordBatch>> {
// If the end timestamp is not provided, use the latest commit timestamp.
let Some(as_of_timestamp) =
let Some(end_timestamp) =
end_timestamp.or_else(|| self.timeline.get_latest_commit_timestamp())
else {
return Ok(Vec::new());
@@ -336,24 +336,25 @@
let mut file_slices: Vec<FileSlice> = Vec::new();
let file_groups = self
.timeline
.get_incremental_file_groups(Some(start_timestamp), Some(as_of_timestamp))
.get_incremental_file_groups(Some(start_timestamp), Some(end_timestamp))
.await?;
for file_group in file_groups {
if let Some(file_slice) = file_group.get_file_slice_as_of(as_of_timestamp) {
if let Some(file_slice) = file_group.get_file_slice_as_of(end_timestamp) {
file_slices.push(file_slice.clone());
}
}

// Read incremental records from the file slices.
let filters = &[
FilterField::new(MetaField::CommitTime.as_ref()).gt(start_timestamp),
FilterField::new(MetaField::CommitTime.as_ref()).lte(as_of_timestamp),
FilterField::new(MetaField::CommitTime.as_ref()).lte(end_timestamp),
];
let fg_reader =
self.create_file_group_reader_with_filters(filters, MetaField::schema().as_ref())?;
let base_file_only = self.get_table_type() == TableTypeValue::CopyOnWrite;
let timezone = self.timezone();
let instant_range = InstantRange::up_to(as_of_timestamp, &timezone);
let instant_range =
InstantRange::within_open_closed(start_timestamp, end_timestamp, &timezone);
let batches = futures::future::try_join_all(
file_slices
.iter()
@@ -992,93 +993,95 @@ mod tests {

mod test_incremental_queries {
use super::super::*;
use arrow_array::{Array, StringArray};
use arrow_select::concat::concat_batches;
use hudi_tests::SampleTable;
use std::collections::HashSet;

#[tokio::test]
async fn test_empty() -> Result<()> {
let base_url = SampleTable::V6Empty.url_to_cow();
let hudi_table = Table::new(base_url.path()).await?;

let records = hudi_table.read_incremental_records("0", None).await?;
assert!(records.is_empty());

for base_url in SampleTable::V6Empty.urls() {
let hudi_table = Table::new(base_url.path()).await?;
let records = hudi_table.read_incremental_records("0", None).await?;
assert!(records.is_empty())
}
Ok(())
}

#[tokio::test]
async fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> {
let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow();
let hudi_table = Table::new(base_url.path()).await?;

// read records changed from the first commit (exclusive) to the second commit (inclusive)
let records = hudi_table
.read_incremental_records("20240707001301554", Some("20240707001302376"))
.await?;
assert_eq!(records.len(), 2);
assert_eq!(records[0].num_rows(), 1);
assert_eq!(records[1].num_rows(), 1);

// verify the partition paths
let partition_paths = StringArray::from(
arrow::compute::concat(&[
records[0].column_by_name("_hoodie_partition_path").unwrap(),
records[1].column_by_name("_hoodie_partition_path").unwrap(),
])?
.to_data(),
);
let actual_partition_paths =
HashSet::<&str>::from_iter(partition_paths.iter().map(|s| s.unwrap()));
let expected_partition_paths = HashSet::from_iter(vec!["10", "30"]);
assert_eq!(actual_partition_paths, expected_partition_paths);

// verify the file names
let file_names = StringArray::from(
arrow::compute::concat(&[
records[0].column_by_name("_hoodie_file_name").unwrap(),
records[1].column_by_name("_hoodie_file_name").unwrap(),
])?
.to_data(),
);
let actual_file_names =
HashSet::<&str>::from_iter(file_names.iter().map(|s| s.unwrap()));
let expected_file_names = HashSet::from_iter(vec![
"d398fae1-c0e6-4098-8124-f55f7098bdba-0_1-95-136_20240707001302376.parquet",
"4f2685a3-614f-49ca-9b2b-e1cb9fb61f27-0_0-95-135_20240707001302376.parquet",
]);
assert_eq!(actual_file_names, expected_file_names);

// read records changed from the first commit (exclusive) to
// the latest (an insert overwrite table's replacecommit)
let records = hudi_table
.read_incremental_records("20240707001301554", None)
.await?;
assert_eq!(records.len(), 1);
assert_eq!(records[0].num_rows(), 1);

// verify the partition paths
let actual_partition_paths = StringArray::from(
records[0]
.column_by_name("_hoodie_partition_path")
.unwrap()
.to_data(),
);
let expected_partition_paths = StringArray::from(vec!["30"]);
assert_eq!(actual_partition_paths, expected_partition_paths);

// verify the file names
let actual_file_names = StringArray::from(
records[0]
.column_by_name("_hoodie_file_name")
.unwrap()
.to_data(),
);
let expected_file_names = StringArray::from(vec![
"ebcb261d-62d3-4895-90ec-5b3c9622dff4-0_0-111-154_20240707001303088.parquet",
]);
assert_eq!(actual_file_names, expected_file_names);
for base_url in SampleTable::V6SimplekeygenNonhivestyleOverwritetable.urls() {
let hudi_table = Table::new(base_url.path()).await?;
let commit_timestamps = hudi_table
.timeline
.completed_commits
.iter()
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
assert_eq!(commit_timestamps.len(), 3);
let first_commit = commit_timestamps[0];
let second_commit = commit_timestamps[1];
let third_commit = commit_timestamps[2];

// read records changed from the beginning to the 1st commit
let records = hudi_table
.read_incremental_records("19700101000000", Some(first_commit))
.await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data = SampleTable::sample_data_order_by_id(&records);
assert_eq!(
sample_data,
vec![(1, "Alice", true), (2, "Bob", false), (3, "Carol", true),],
"Should return 3 records inserted in the 1st commit"
);

// read records changed from the 1st to the 2nd commit
let records = hudi_table
.read_incremental_records(first_commit, Some(second_commit))
.await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data = SampleTable::sample_data_order_by_id(&records);
assert_eq!(
sample_data,
vec![(1, "Alice", false), (4, "Diana", true),],
"Should return 2 records inserted or updated in the 2nd commit"
);

// read records changed from the 2nd to the 3rd commit
let records = hudi_table
.read_incremental_records(second_commit, Some(third_commit))
.await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data = SampleTable::sample_data_order_by_id(&records);
assert_eq!(
sample_data,
vec![(4, "Diana", false),],
"Should return 1 record insert-overwritten in the 3rd commit"
);

// read records changed from the 1st commit
let records = hudi_table
.read_incremental_records(first_commit, None)
.await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data = SampleTable::sample_data_order_by_id(&records);
assert_eq!(
sample_data,
vec![(4, "Diana", false),],
"Should return 1 record insert-overwritten in the 3rd commit"
);

// read records changed from the 3rd commit
let records = hudi_table
.read_incremental_records(third_commit, None)
.await?;
assert!(
records.is_empty(),
"Should return 0 record as it's the latest commit"
);
}
Ok(())
}
}
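
The read path above narrows both the record-level filters (gt/lte on the commit time meta field) and the instant range to the half-open interval (start, end]. The following standalone sketch shows that membership test in isolation; it assumes fixed-width Hudi timestamps, so lexicographic comparison matches chronological order, and the in_open_closed_range helper is hypothetical rather than part of this crate.

// Open at the start, closed at the end: start < ts <= end.
// Assumes equal-width timestamp strings (e.g. "20240707001302376"),
// for which lexicographic order matches chronological order.
fn in_open_closed_range(ts: &str, start: &str, end: &str) -> bool {
    ts > start && ts <= end
}

fn main() {
    let (start, end) = ("20240707001301554", "20240707001302376");

    assert!(!in_open_closed_range("20240707001301554", start, end)); // the start instant is excluded
    assert!(in_open_closed_range("20240707001302376", start, end)); // the end instant is included
    assert!(!in_open_closed_range("20240707001303088", start, end)); // later commits are excluded

    println!("open-closed range checks passed");
}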
48 changes: 47 additions & 1 deletion crates/core/src/timeline/selector.rs
@@ -25,6 +25,7 @@ use crate::Result;
use chrono::{DateTime, Utc};
use std::sync::Arc;

#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct InstantRange {
timezone: String,
@@ -51,7 +52,7 @@ impl InstantRange {
}
}

/// Create a new [InstantRange] with an end timestamp inclusive.
/// Create a new [InstantRange] with a closed end timestamp range.
pub fn up_to(end_timestamp: &str, timezone: &str) -> Self {
Self::new(
timezone.to_string(),
Expand All @@ -62,6 +63,28 @@ impl InstantRange {
)
}

/// Create a new [InstantRange] with an open timestamp range.
pub fn within(start_timestamp: &str, end_timestamp: &str, timezone: &str) -> Self {
Self::new(
timezone.to_string(),
Some(start_timestamp.to_string()),
Some(end_timestamp.to_string()),
false,
false,
)
}

/// Create a new [InstantRange] with an open start and closed end timestamp range.
pub fn within_open_closed(start_timestamp: &str, end_timestamp: &str, timezone: &str) -> Self {
Self::new(
timezone.to_string(),
Some(start_timestamp.to_string()),
Some(end_timestamp.to_string()),
false,
true,
)
}

pub fn timezone(&self) -> &str {
&self.timezone
}
@@ -304,6 +327,29 @@
assert!(range.end_inclusive);
}

#[test]
fn test_within() {
let range = InstantRange::within("20240101000000000", "20241231235959999", "UTC");

assert_eq!(range.timezone(), "UTC");
assert_eq!(range.start_timestamp.as_deref(), Some("20240101000000000"));
assert_eq!(range.end_timestamp.as_deref(), Some("20241231235959999"));
assert!(!range.start_inclusive);
assert!(!range.end_inclusive);
}

#[test]
fn test_within_open_closed() {
let range =
InstantRange::within_open_closed("20240101000000000", "20241231235959999", "UTC");

assert_eq!(range.timezone(), "UTC");
assert_eq!(range.start_timestamp.as_deref(), Some("20240101000000000"));
assert_eq!(range.end_timestamp.as_deref(), Some("20241231235959999"));
assert!(!range.start_inclusive);
assert!(range.end_inclusive);
}

#[test]
fn test_is_in_range_inclusive_bounds() {
let range = InstantRange::new(