diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index e6cdb43bf0..f1c0418de4 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -44,7 +44,7 @@ pub struct CdfLoadBuilder { /// Ending timestamp of commits to accept ending_timestamp: Option>, /// Enable ending version or timestamp exceeding the last commit - enable_out_of_range: bool, + allow_out_of_range: bool, /// Provided Datafusion context ctx: SessionContext, } @@ -60,7 +60,7 @@ impl CdfLoadBuilder { ending_version: None, starting_timestamp: None, ending_timestamp: None, - enable_out_of_range: false, + allow_out_of_range: false, ctx: SessionContext::new(), } } @@ -96,8 +96,8 @@ impl CdfLoadBuilder { } /// Enable ending version or timestamp exceeding the last commit - pub fn with_out_of_range(mut self) -> Self { - self.enable_out_of_range = true; + pub fn with_allow_out_of_range(mut self) -> Self { + self.allow_out_of_range = true; self } @@ -120,11 +120,11 @@ impl CdfLoadBuilder { )> { let start = self.starting_version; let latest_version = self.log_store.get_latest_version(start).await?; - let mut end = self - .ending_version - .unwrap_or(latest_version); + let mut end = self.ending_version.unwrap_or(latest_version); - if end > latest_version { end = latest_version; } + if end > latest_version { + end = latest_version; + } if end < start { return Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end }); @@ -153,7 +153,7 @@ impl CdfLoadBuilder { .await? .ok_or(DeltaTableError::InvalidVersion(version)); - if snapshot_bytes.is_err() && version >= end && self.enable_out_of_range { + if snapshot_bytes.is_err() && version >= end && self.allow_out_of_range { break; } @@ -254,8 +254,14 @@ impl CdfLoadBuilder { } // All versions were skipped due to date our of range - if !self.enable_out_of_range && change_files.is_empty() && add_files.is_empty() && remove_files.is_empty() { - return Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit { ending_timestamp: ending_timestamp }); + if !self.allow_out_of_range + && change_files.is_empty() + && add_files.is_empty() + && remove_files.is_empty() + { + return Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit { + ending_timestamp: ending_timestamp, + }); } Ok((change_files, add_files, remove_files)) @@ -608,7 +614,7 @@ pub(crate) mod tests { .with_starting_version(5) .build() .await; - + assert!(table.is_err()); assert!(matches!( table.unwrap_err(), @@ -624,7 +630,7 @@ pub(crate) mod tests { .await? .load_cdf() .with_starting_version(5) - .with_out_of_range() + .with_allow_out_of_range() .build() .await?; @@ -650,7 +656,7 @@ pub(crate) mod tests { .with_starting_timestamp(ending_timestamp.and_utc()) .build() .await; - + assert!(table.is_err()); assert!(matches!( table.unwrap_err(), @@ -667,10 +673,10 @@ pub(crate) mod tests { .await? .load_cdf() .with_starting_timestamp(ending_timestamp.and_utc()) - .with_out_of_range() + .with_allow_out_of_range() .build() .await?; - + let ctx = SessionContext::new(); let batches = collect_batches( table.properties().output_partitioning().partition_count(), diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index ab2f04eb0d..f19c685118 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -220,7 +220,7 @@ class RawDeltaTable: ending_version: Optional[int] = None, starting_timestamp: Optional[str] = None, ending_timestamp: Optional[str] = None, - enable_out_of_range: bool = False, + allow_out_of_range: bool = False, ) -> pyarrow.RecordBatchReader: ... def transaction_versions(self) -> Dict[str, Transaction]: ... def __datafusion_table_provider__(self) -> Any: ... diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 7f3f246780..e8fc7d866b 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -689,7 +689,7 @@ def load_cdf( starting_timestamp: Optional[str] = None, ending_timestamp: Optional[str] = None, columns: Optional[List[str]] = None, - enable_out_of_range: bool = False, + allow_out_of_range: bool = False, ) -> pyarrow.RecordBatchReader: return self._table.load_cdf( columns=columns, @@ -697,7 +697,7 @@ def load_cdf( ending_version=ending_version, starting_timestamp=starting_timestamp, ending_timestamp=ending_timestamp, - enable_out_of_range=enable_out_of_range, + allow_out_of_range=allow_out_of_range, ) @property diff --git a/python/src/lib.rs b/python/src/lib.rs index 97aeeec005..cfd27bbdfa 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -675,7 +675,7 @@ impl RawDeltaTable { Ok(()) } - #[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, enable_out_of_range = false))] + #[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, allow_out_of_range = false))] pub fn load_cdf( &mut self, py: Python, @@ -684,7 +684,7 @@ impl RawDeltaTable { starting_timestamp: Option, ending_timestamp: Option, columns: Option>, - enable_out_of_range: bool, + allow_out_of_range: bool, ) -> PyResult> { let ctx = SessionContext::new(); let mut cdf_read = CdfLoadBuilder::new( @@ -709,8 +709,8 @@ impl RawDeltaTable { cdf_read = cdf_read.with_ending_timestamp(ending_ts); } - if enable_out_of_range { - cdf_read = cdf_read.with_out_of_range(); + if allow_out_of_range { + cdf_read = cdf_read.with_allow_out_of_range(); } if let Some(columns) = columns { diff --git a/python/tests/test_cdf.py b/python/tests/test_cdf.py index de512cc99b..3dcdd457fb 100644 --- a/python/tests/test_cdf.py +++ b/python/tests/test_cdf.py @@ -5,8 +5,10 @@ import pyarrow.compute as pc import pyarrow.dataset as ds import pyarrow.parquet as pq +import pytest -from deltalake import DeltaTable, _internal, write_deltalake +from deltalake import DeltaTable, write_deltalake +from deltalake.exceptions import DeltaError def test_read_cdf_partitioned(): @@ -682,16 +684,15 @@ def test_write_overwrite_partitioned_cdf(tmp_path, sample_data: pa.Table): def test_read_cdf_version_out_of_range(): dt = DeltaTable("../crates/test/tests/data/cdf-table/") - try: - b = dt.load_cdf(4).read_all().to_pydict() - assert False, "Should not get here" - except _internal.DeltaError as e: - assert "invalid table version" in str(e).lower() + with pytest.raises(DeltaError) as e: + dt.load_cdf(4).read_all().to_pydict() + + assert "invalid table version" in str(e).lower() def test_read_cdf_version_out_of_range_with_flag(): dt = DeltaTable("../crates/test/tests/data/cdf-table/") - b = dt.load_cdf(4, enable_out_of_range=True).read_all() + b = dt.load_cdf(4, allow_out_of_range=True).read_all() assert len(b) == 0 @@ -700,17 +701,16 @@ def test_read_timestamp_cdf_out_of_range(): dt = DeltaTable("../crates/test/tests/data/cdf-table/") start = "2033-12-22T17:10:21.675Z" - try: - b = dt.load_cdf(starting_timestamp=start).read_all().to_pydict() - assert False, "Should not get here" - except _internal.DeltaError as e: - assert "is greater than latest commit timestamp" in str(e).lower() + with pytest.raises(DeltaError) as e: + dt.load_cdf(starting_timestamp=start).read_all().to_pydict() + + assert "is greater than latest commit timestamp" in str(e).lower() def test_read_timestamp_cdf_out_of_range_with_flag(): dt = DeltaTable("../crates/test/tests/data/cdf-table/") start = "2033-12-22T17:10:21.675Z" - b = dt.load_cdf(starting_timestamp=start, enable_out_of_range=True).read_all() + b = dt.load_cdf(starting_timestamp=start, allow_out_of_range=True).read_all() - assert len(b) == 0 \ No newline at end of file + assert len(b) == 0