From 5c3a5de29700029bf83fe51b9ba0f1df51ab3ac5 Mon Sep 17 00:00:00 2001 From: Pablo Cabeza Date: Sun, 1 Dec 2024 02:56:53 +0000 Subject: [PATCH 1/4] Added enable out of range to load_cdf Signed-off-by: Pablo Cabeza --- crates/core/src/errors.rs | 4 + crates/core/src/operations/load_cdf.rs | 114 ++++++++++++++++++++++++- 2 files changed, 114 insertions(+), 4 deletions(-) diff --git a/crates/core/src/errors.rs b/crates/core/src/errors.rs index 609bc16656..e3447cad72 100644 --- a/crates/core/src/errors.rs +++ b/crates/core/src/errors.rs @@ -1,4 +1,5 @@ //! Exceptions for the deltalake crate +use chrono::{DateTime, Utc}; use object_store::Error as ObjectStoreError; use crate::operations::transaction::{CommitBuilderError, TransactionError}; @@ -232,6 +233,9 @@ pub enum DeltaTableError { #[error("Invalid version start version {start} is greater than version {end}")] ChangeDataInvalidVersionRange { start: i64, end: i64 }, + + #[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")] + ChangeDataTimestampGreaterThanCommit { ending_timestamp: DateTime }, } impl From for DeltaTableError { diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index ad2986de80..e6cdb43bf0 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -43,6 +43,8 @@ pub struct CdfLoadBuilder { starting_timestamp: Option>, /// Ending timestamp of commits to accept ending_timestamp: Option>, + /// Enable ending version or timestamp exceeding the last commit + enable_out_of_range: bool, /// Provided Datafusion context ctx: SessionContext, } @@ -58,6 +60,7 @@ impl CdfLoadBuilder { ending_version: None, starting_timestamp: None, ending_timestamp: None, + enable_out_of_range: false, ctx: SessionContext::new(), } } @@ -92,6 +95,12 @@ impl CdfLoadBuilder { self } + /// Enable ending version or timestamp exceeding the last commit + pub fn with_out_of_range(mut self) -> Self { + self.enable_out_of_range = true; + self + } + /// Columns to select pub fn with_columns(mut self, columns: Vec) -> Self { self.columns = Some(columns); @@ -110,9 +119,12 @@ impl CdfLoadBuilder { Vec>, )> { let start = self.starting_version; - let end = self + let latest_version = self.log_store.get_latest_version(start).await?; + let mut end = self .ending_version - .unwrap_or(self.log_store.get_latest_version(start).await?); + .unwrap_or(latest_version); + + if end > latest_version { end = latest_version; } if end < start { return Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end }); @@ -139,8 +151,13 @@ impl CdfLoadBuilder { .log_store .read_commit_entry(version) .await? - .ok_or(DeltaTableError::InvalidVersion(version))?; - let version_actions = get_actions(version, snapshot_bytes).await?; + .ok_or(DeltaTableError::InvalidVersion(version)); + + if snapshot_bytes.is_err() && version >= end && self.enable_out_of_range { + break; + } + + let version_actions: Vec = get_actions(version, snapshot_bytes?).await?; let mut ts = 0; let mut cdc_actions = vec![]; @@ -236,6 +253,11 @@ impl CdfLoadBuilder { } } + // All versions were skipped due to date our of range + if !self.enable_out_of_range && change_files.is_empty() && add_files.is_empty() && remove_files.is_empty() { + return Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit { ending_timestamp: ending_timestamp }); + } + Ok((change_files, add_files, remove_files)) } @@ -578,6 +600,90 @@ pub(crate) mod tests { Ok(()) } + #[tokio::test] + async fn test_load_version_out_of_range() -> TestResult { + let table = DeltaOps::try_from_uri("../test/tests/data/cdf-table-non-partitioned") + .await? + .load_cdf() + .with_starting_version(5) + .build() + .await; + + assert!(table.is_err()); + assert!(matches!( + table.unwrap_err(), + DeltaTableError::InvalidVersion { .. } + )); + + Ok(()) + } + + #[tokio::test] + async fn test_load_version_out_of_range_with_flag() -> TestResult { + let table = DeltaOps::try_from_uri("../test/tests/data/cdf-table-non-partitioned") + .await? + .load_cdf() + .with_starting_version(5) + .with_out_of_range() + .build() + .await?; + + let ctx = SessionContext::new(); + let batches = collect_batches( + table.properties().output_partitioning().partition_count(), + table.clone(), + ctx, + ) + .await?; + + assert!(batches.is_empty()); + + Ok(()) + } + + #[tokio::test] + async fn test_load_timestamp_out_of_range() -> TestResult { + let ending_timestamp = NaiveDateTime::from_str("2033-12-22T17:10:21.675").unwrap(); + let table = DeltaOps::try_from_uri("../test/tests/data/cdf-table-non-partitioned") + .await? + .load_cdf() + .with_starting_timestamp(ending_timestamp.and_utc()) + .build() + .await; + + assert!(table.is_err()); + assert!(matches!( + table.unwrap_err(), + DeltaTableError::ChangeDataTimestampGreaterThanCommit { .. } + )); + + Ok(()) + } + + #[tokio::test] + async fn test_load_timestamp_out_of_range_with_flag() -> TestResult { + let ending_timestamp = NaiveDateTime::from_str("2033-12-22T17:10:21.675").unwrap(); + let table = DeltaOps::try_from_uri("../test/tests/data/cdf-table-non-partitioned") + .await? + .load_cdf() + .with_starting_timestamp(ending_timestamp.and_utc()) + .with_out_of_range() + .build() + .await?; + + let ctx = SessionContext::new(); + let batches = collect_batches( + table.properties().output_partitioning().partition_count(), + table.clone(), + ctx, + ) + .await?; + + assert!(batches.is_empty()); + + Ok(()) + } + #[tokio::test] async fn test_load_non_cdf() -> TestResult { let table = DeltaOps::try_from_uri("../test/tests/data/simple_table") From 01b5bab6a78222438330a1854913198c9f4b6f0e Mon Sep 17 00:00:00 2001 From: Pablo Cabeza Date: Sun, 1 Dec 2024 03:39:35 +0000 Subject: [PATCH 2/4] Added enable out of range to python Signed-off-by: Pablo Cabeza --- python/deltalake/_internal.pyi | 1 + python/deltalake/table.py | 2 ++ python/src/lib.rs | 7 +++++- python/tests/test_cdf.py | 39 +++++++++++++++++++++++++++++++++- 4 files changed, 47 insertions(+), 2 deletions(-) diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 052cf1ebb6..ab2f04eb0d 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -220,6 +220,7 @@ class RawDeltaTable: ending_version: Optional[int] = None, starting_timestamp: Optional[str] = None, ending_timestamp: Optional[str] = None, + enable_out_of_range: bool = False, ) -> pyarrow.RecordBatchReader: ... def transaction_versions(self) -> Dict[str, Transaction]: ... def __datafusion_table_provider__(self) -> Any: ... diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 247a2b9527..7f3f246780 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -689,6 +689,7 @@ def load_cdf( starting_timestamp: Optional[str] = None, ending_timestamp: Optional[str] = None, columns: Optional[List[str]] = None, + enable_out_of_range: bool = False, ) -> pyarrow.RecordBatchReader: return self._table.load_cdf( columns=columns, @@ -696,6 +697,7 @@ def load_cdf( ending_version=ending_version, starting_timestamp=starting_timestamp, ending_timestamp=ending_timestamp, + enable_out_of_range=enable_out_of_range, ) @property diff --git a/python/src/lib.rs b/python/src/lib.rs index c4a4d80b78..97aeeec005 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -675,7 +675,7 @@ impl RawDeltaTable { Ok(()) } - #[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None))] + #[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, enable_out_of_range = false))] pub fn load_cdf( &mut self, py: Python, @@ -684,6 +684,7 @@ impl RawDeltaTable { starting_timestamp: Option, ending_timestamp: Option, columns: Option>, + enable_out_of_range: bool, ) -> PyResult> { let ctx = SessionContext::new(); let mut cdf_read = CdfLoadBuilder::new( @@ -708,6 +709,10 @@ impl RawDeltaTable { cdf_read = cdf_read.with_ending_timestamp(ending_ts); } + if enable_out_of_range { + cdf_read = cdf_read.with_out_of_range(); + } + if let Some(columns) = columns { cdf_read = cdf_read.with_columns(columns); } diff --git a/python/tests/test_cdf.py b/python/tests/test_cdf.py index 36d94c9f99..de512cc99b 100644 --- a/python/tests/test_cdf.py +++ b/python/tests/test_cdf.py @@ -6,7 +6,7 @@ import pyarrow.dataset as ds import pyarrow.parquet as pq -from deltalake import DeltaTable, write_deltalake +from deltalake import DeltaTable, _internal, write_deltalake def test_read_cdf_partitioned(): @@ -677,3 +677,40 @@ def test_write_overwrite_partitioned_cdf(tmp_path, sample_data: pa.Table): ).sort_by(sort_values).select(expected_data.column_names) == pa.concat_tables( [first_batch, expected_data] ).sort_by(sort_values) + + +def test_read_cdf_version_out_of_range(): + dt = DeltaTable("../crates/test/tests/data/cdf-table/") + + try: + b = dt.load_cdf(4).read_all().to_pydict() + assert False, "Should not get here" + except _internal.DeltaError as e: + assert "invalid table version" in str(e).lower() + + +def test_read_cdf_version_out_of_range_with_flag(): + dt = DeltaTable("../crates/test/tests/data/cdf-table/") + b = dt.load_cdf(4, enable_out_of_range=True).read_all() + + assert len(b) == 0 + + +def test_read_timestamp_cdf_out_of_range(): + dt = DeltaTable("../crates/test/tests/data/cdf-table/") + start = "2033-12-22T17:10:21.675Z" + + try: + b = dt.load_cdf(starting_timestamp=start).read_all().to_pydict() + assert False, "Should not get here" + except _internal.DeltaError as e: + assert "is greater than latest commit timestamp" in str(e).lower() + + +def test_read_timestamp_cdf_out_of_range_with_flag(): + dt = DeltaTable("../crates/test/tests/data/cdf-table/") + + start = "2033-12-22T17:10:21.675Z" + b = dt.load_cdf(starting_timestamp=start, enable_out_of_range=True).read_all() + + assert len(b) == 0 \ No newline at end of file From ebffafb6329be05e4abd1fae7839a31a86d27724 Mon Sep 17 00:00:00 2001 From: Pablo Cabeza Date: Sun, 1 Dec 2024 16:26:47 +0000 Subject: [PATCH 3/4] Fixed fomatting and test in python plus renamed out of range flag for clarity Signed-off-by: Pablo Cabeza --- crates/core/src/operations/load_cdf.rs | 38 +++++++++++++++----------- python/deltalake/_internal.pyi | 2 +- python/deltalake/table.py | 4 +-- python/src/lib.rs | 8 +++--- python/tests/test_cdf.py | 28 +++++++++---------- 5 files changed, 43 insertions(+), 37 deletions(-) diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index e6cdb43bf0..f1c0418de4 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -44,7 +44,7 @@ pub struct CdfLoadBuilder { /// Ending timestamp of commits to accept ending_timestamp: Option>, /// Enable ending version or timestamp exceeding the last commit - enable_out_of_range: bool, + allow_out_of_range: bool, /// Provided Datafusion context ctx: SessionContext, } @@ -60,7 +60,7 @@ impl CdfLoadBuilder { ending_version: None, starting_timestamp: None, ending_timestamp: None, - enable_out_of_range: false, + allow_out_of_range: false, ctx: SessionContext::new(), } } @@ -96,8 +96,8 @@ impl CdfLoadBuilder { } /// Enable ending version or timestamp exceeding the last commit - pub fn with_out_of_range(mut self) -> Self { - self.enable_out_of_range = true; + pub fn with_allow_out_of_range(mut self) -> Self { + self.allow_out_of_range = true; self } @@ -120,11 +120,11 @@ impl CdfLoadBuilder { )> { let start = self.starting_version; let latest_version = self.log_store.get_latest_version(start).await?; - let mut end = self - .ending_version - .unwrap_or(latest_version); + let mut end = self.ending_version.unwrap_or(latest_version); - if end > latest_version { end = latest_version; } + if end > latest_version { + end = latest_version; + } if end < start { return Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end }); @@ -153,7 +153,7 @@ impl CdfLoadBuilder { .await? .ok_or(DeltaTableError::InvalidVersion(version)); - if snapshot_bytes.is_err() && version >= end && self.enable_out_of_range { + if snapshot_bytes.is_err() && version >= end && self.allow_out_of_range { break; } @@ -254,8 +254,14 @@ impl CdfLoadBuilder { } // All versions were skipped due to date our of range - if !self.enable_out_of_range && change_files.is_empty() && add_files.is_empty() && remove_files.is_empty() { - return Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit { ending_timestamp: ending_timestamp }); + if !self.allow_out_of_range + && change_files.is_empty() + && add_files.is_empty() + && remove_files.is_empty() + { + return Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit { + ending_timestamp: ending_timestamp, + }); } Ok((change_files, add_files, remove_files)) @@ -608,7 +614,7 @@ pub(crate) mod tests { .with_starting_version(5) .build() .await; - + assert!(table.is_err()); assert!(matches!( table.unwrap_err(), @@ -624,7 +630,7 @@ pub(crate) mod tests { .await? .load_cdf() .with_starting_version(5) - .with_out_of_range() + .with_allow_out_of_range() .build() .await?; @@ -650,7 +656,7 @@ pub(crate) mod tests { .with_starting_timestamp(ending_timestamp.and_utc()) .build() .await; - + assert!(table.is_err()); assert!(matches!( table.unwrap_err(), @@ -667,10 +673,10 @@ pub(crate) mod tests { .await? .load_cdf() .with_starting_timestamp(ending_timestamp.and_utc()) - .with_out_of_range() + .with_allow_out_of_range() .build() .await?; - + let ctx = SessionContext::new(); let batches = collect_batches( table.properties().output_partitioning().partition_count(), diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index ab2f04eb0d..f19c685118 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -220,7 +220,7 @@ class RawDeltaTable: ending_version: Optional[int] = None, starting_timestamp: Optional[str] = None, ending_timestamp: Optional[str] = None, - enable_out_of_range: bool = False, + allow_out_of_range: bool = False, ) -> pyarrow.RecordBatchReader: ... def transaction_versions(self) -> Dict[str, Transaction]: ... def __datafusion_table_provider__(self) -> Any: ... diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 7f3f246780..e8fc7d866b 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -689,7 +689,7 @@ def load_cdf( starting_timestamp: Optional[str] = None, ending_timestamp: Optional[str] = None, columns: Optional[List[str]] = None, - enable_out_of_range: bool = False, + allow_out_of_range: bool = False, ) -> pyarrow.RecordBatchReader: return self._table.load_cdf( columns=columns, @@ -697,7 +697,7 @@ def load_cdf( ending_version=ending_version, starting_timestamp=starting_timestamp, ending_timestamp=ending_timestamp, - enable_out_of_range=enable_out_of_range, + allow_out_of_range=allow_out_of_range, ) @property diff --git a/python/src/lib.rs b/python/src/lib.rs index 97aeeec005..cfd27bbdfa 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -675,7 +675,7 @@ impl RawDeltaTable { Ok(()) } - #[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, enable_out_of_range = false))] + #[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, allow_out_of_range = false))] pub fn load_cdf( &mut self, py: Python, @@ -684,7 +684,7 @@ impl RawDeltaTable { starting_timestamp: Option, ending_timestamp: Option, columns: Option>, - enable_out_of_range: bool, + allow_out_of_range: bool, ) -> PyResult> { let ctx = SessionContext::new(); let mut cdf_read = CdfLoadBuilder::new( @@ -709,8 +709,8 @@ impl RawDeltaTable { cdf_read = cdf_read.with_ending_timestamp(ending_ts); } - if enable_out_of_range { - cdf_read = cdf_read.with_out_of_range(); + if allow_out_of_range { + cdf_read = cdf_read.with_allow_out_of_range(); } if let Some(columns) = columns { diff --git a/python/tests/test_cdf.py b/python/tests/test_cdf.py index de512cc99b..3dcdd457fb 100644 --- a/python/tests/test_cdf.py +++ b/python/tests/test_cdf.py @@ -5,8 +5,10 @@ import pyarrow.compute as pc import pyarrow.dataset as ds import pyarrow.parquet as pq +import pytest -from deltalake import DeltaTable, _internal, write_deltalake +from deltalake import DeltaTable, write_deltalake +from deltalake.exceptions import DeltaError def test_read_cdf_partitioned(): @@ -682,16 +684,15 @@ def test_write_overwrite_partitioned_cdf(tmp_path, sample_data: pa.Table): def test_read_cdf_version_out_of_range(): dt = DeltaTable("../crates/test/tests/data/cdf-table/") - try: - b = dt.load_cdf(4).read_all().to_pydict() - assert False, "Should not get here" - except _internal.DeltaError as e: - assert "invalid table version" in str(e).lower() + with pytest.raises(DeltaError) as e: + dt.load_cdf(4).read_all().to_pydict() + + assert "invalid table version" in str(e).lower() def test_read_cdf_version_out_of_range_with_flag(): dt = DeltaTable("../crates/test/tests/data/cdf-table/") - b = dt.load_cdf(4, enable_out_of_range=True).read_all() + b = dt.load_cdf(4, allow_out_of_range=True).read_all() assert len(b) == 0 @@ -700,17 +701,16 @@ def test_read_timestamp_cdf_out_of_range(): dt = DeltaTable("../crates/test/tests/data/cdf-table/") start = "2033-12-22T17:10:21.675Z" - try: - b = dt.load_cdf(starting_timestamp=start).read_all().to_pydict() - assert False, "Should not get here" - except _internal.DeltaError as e: - assert "is greater than latest commit timestamp" in str(e).lower() + with pytest.raises(DeltaError) as e: + dt.load_cdf(starting_timestamp=start).read_all().to_pydict() + + assert "is greater than latest commit timestamp" in str(e).lower() def test_read_timestamp_cdf_out_of_range_with_flag(): dt = DeltaTable("../crates/test/tests/data/cdf-table/") start = "2033-12-22T17:10:21.675Z" - b = dt.load_cdf(starting_timestamp=start, enable_out_of_range=True).read_all() + b = dt.load_cdf(starting_timestamp=start, allow_out_of_range=True).read_all() - assert len(b) == 0 \ No newline at end of file + assert len(b) == 0 From b96b93b412c96843c6f9e6ce24e5fd34f9a18a27 Mon Sep 17 00:00:00 2001 From: Pablo Cabeza Date: Sun, 1 Dec 2024 19:04:38 +0000 Subject: [PATCH 4/4] Moved all boundary conditions before the main load_cdf loop Signed-off-by: Pablo Cabeza --- crates/core/src/operations/load_cdf.rs | 68 ++++++++++++++++++-------- 1 file changed, 47 insertions(+), 21 deletions(-) diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index f1c0418de4..a63b5182b2 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -119,15 +119,31 @@ impl CdfLoadBuilder { Vec>, )> { let start = self.starting_version; - let latest_version = self.log_store.get_latest_version(start).await?; + let latest_version = self.log_store.get_latest_version(0).await?; // Start from 0 since if start > latest commit, the returned commit is not a valid commit let mut end = self.ending_version.unwrap_or(latest_version); + let mut change_files: Vec> = vec![]; + let mut add_files: Vec> = vec![]; + let mut remove_files: Vec> = vec![]; + if end > latest_version { end = latest_version; } + if start > latest_version { + return if self.allow_out_of_range { + Ok((change_files, add_files, remove_files)) + } else { + Err(DeltaTableError::InvalidVersion(start)) + }; + } + if end < start { - return Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end }); + return if self.allow_out_of_range { + Ok((change_files, add_files, remove_files)) + } else { + Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end }) + }; } let starting_timestamp = self.starting_timestamp.unwrap_or(DateTime::UNIX_EPOCH); @@ -135,6 +151,35 @@ impl CdfLoadBuilder { .ending_timestamp .unwrap_or(DateTime::from(SystemTime::now())); + // Check that starting_timestmp is within boundaries of the latest version + let latest_snapshot_bytes = self + .log_store + .read_commit_entry(latest_version) + .await? + .ok_or(DeltaTableError::InvalidVersion(latest_version)); + + let latest_version_actions: Vec = + get_actions(latest_version, latest_snapshot_bytes?).await?; + let latest_version_commit = latest_version_actions + .iter() + .find(|a| matches!(a, Action::CommitInfo(_))); + + if let Some(Action::CommitInfo(CommitInfo { + timestamp: Some(latest_timestamp), + .. + })) = latest_version_commit + { + if starting_timestamp.timestamp_millis() > *latest_timestamp { + return if self.allow_out_of_range { + Ok((change_files, add_files, remove_files)) + } else { + Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit { + ending_timestamp: ending_timestamp, + }) + }; + } + } + log::debug!( "starting timestamp = {:?}, ending timestamp = {:?}", &starting_timestamp, @@ -142,10 +187,6 @@ impl CdfLoadBuilder { ); log::debug!("starting version = {}, ending version = {:?}", start, end); - let mut change_files: Vec> = vec![]; - let mut add_files: Vec> = vec![]; - let mut remove_files: Vec> = vec![]; - for version in start..=end { let snapshot_bytes = self .log_store @@ -153,10 +194,6 @@ impl CdfLoadBuilder { .await? .ok_or(DeltaTableError::InvalidVersion(version)); - if snapshot_bytes.is_err() && version >= end && self.allow_out_of_range { - break; - } - let version_actions: Vec = get_actions(version, snapshot_bytes?).await?; let mut ts = 0; @@ -253,17 +290,6 @@ impl CdfLoadBuilder { } } - // All versions were skipped due to date our of range - if !self.allow_out_of_range - && change_files.is_empty() - && add_files.is_empty() - && remove_files.is_empty() - { - return Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit { - ending_timestamp: ending_timestamp, - }); - } - Ok((change_files, add_files, remove_files)) }