Skip to content

Commit

Permalink
Fixed fomatting and test in python plus renamed out of range flag for…
Browse files Browse the repository at this point in the history
… clarity

Signed-off-by: Pablo Cabeza <[email protected]>
  • Loading branch information
pblocz authored and ion-elgreco committed Dec 7, 2024
1 parent d859ad3 commit a79c576
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 37 deletions.
38 changes: 22 additions & 16 deletions crates/core/src/operations/load_cdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct CdfLoadBuilder {
/// Ending timestamp of commits to accept
ending_timestamp: Option<DateTime<Utc>>,
/// Enable ending version or timestamp exceeding the last commit
enable_out_of_range: bool,
allow_out_of_range: bool,
/// Provided Datafusion context
ctx: SessionContext,
}
Expand All @@ -60,7 +60,7 @@ impl CdfLoadBuilder {
ending_version: None,
starting_timestamp: None,
ending_timestamp: None,
enable_out_of_range: false,
allow_out_of_range: false,
ctx: SessionContext::new(),
}
}
Expand Down Expand Up @@ -96,8 +96,8 @@ impl CdfLoadBuilder {
}

/// Enable ending version or timestamp exceeding the last commit
pub fn with_out_of_range(mut self) -> Self {
self.enable_out_of_range = true;
pub fn with_allow_out_of_range(mut self) -> Self {
self.allow_out_of_range = true;
self
}

Expand All @@ -120,11 +120,11 @@ impl CdfLoadBuilder {
)> {
let start = self.starting_version;
let latest_version = self.log_store.get_latest_version(start).await?;
let mut end = self
.ending_version
.unwrap_or(latest_version);
let mut end = self.ending_version.unwrap_or(latest_version);

if end > latest_version { end = latest_version; }
if end > latest_version {
end = latest_version;
}

if end < start {
return Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end });
Expand Down Expand Up @@ -153,7 +153,7 @@ impl CdfLoadBuilder {
.await?
.ok_or(DeltaTableError::InvalidVersion(version));

if snapshot_bytes.is_err() && version >= end && self.enable_out_of_range {
if snapshot_bytes.is_err() && version >= end && self.allow_out_of_range {
break;
}

Expand Down Expand Up @@ -254,8 +254,14 @@ impl CdfLoadBuilder {
}

// All versions were skipped due to date our of range
if !self.enable_out_of_range && change_files.is_empty() && add_files.is_empty() && remove_files.is_empty() {
return Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit { ending_timestamp: ending_timestamp });
if !self.allow_out_of_range
&& change_files.is_empty()
&& add_files.is_empty()
&& remove_files.is_empty()
{
return Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit {
ending_timestamp: ending_timestamp,
});
}

Ok((change_files, add_files, remove_files))
Expand Down Expand Up @@ -608,7 +614,7 @@ pub(crate) mod tests {
.with_starting_version(5)
.build()
.await;

assert!(table.is_err());
assert!(matches!(
table.unwrap_err(),
Expand All @@ -624,7 +630,7 @@ pub(crate) mod tests {
.await?
.load_cdf()
.with_starting_version(5)
.with_out_of_range()
.with_allow_out_of_range()
.build()
.await?;

Expand All @@ -650,7 +656,7 @@ pub(crate) mod tests {
.with_starting_timestamp(ending_timestamp.and_utc())
.build()
.await;

assert!(table.is_err());
assert!(matches!(
table.unwrap_err(),
Expand All @@ -667,10 +673,10 @@ pub(crate) mod tests {
.await?
.load_cdf()
.with_starting_timestamp(ending_timestamp.and_utc())
.with_out_of_range()
.with_allow_out_of_range()
.build()
.await?;

let ctx = SessionContext::new();
let batches = collect_batches(
table.properties().output_partitioning().partition_count(),
Expand Down
2 changes: 1 addition & 1 deletion python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ class RawDeltaTable:
ending_version: Optional[int] = None,
starting_timestamp: Optional[str] = None,
ending_timestamp: Optional[str] = None,
enable_out_of_range: bool = False,
allow_out_of_range: bool = False,
) -> pyarrow.RecordBatchReader: ...
def transaction_versions(self) -> Dict[str, Transaction]: ...
def __datafusion_table_provider__(self) -> Any: ...
Expand Down
4 changes: 2 additions & 2 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,15 +689,15 @@ def load_cdf(
starting_timestamp: Optional[str] = None,
ending_timestamp: Optional[str] = None,
columns: Optional[List[str]] = None,
enable_out_of_range: bool = False,
allow_out_of_range: bool = False,
) -> pyarrow.RecordBatchReader:
return self._table.load_cdf(
columns=columns,
starting_version=starting_version,
ending_version=ending_version,
starting_timestamp=starting_timestamp,
ending_timestamp=ending_timestamp,
enable_out_of_range=enable_out_of_range,
allow_out_of_range=allow_out_of_range,
)

@property
Expand Down
8 changes: 4 additions & 4 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ impl RawDeltaTable {
Ok(())
}

#[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, enable_out_of_range = false))]
#[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, allow_out_of_range = false))]
pub fn load_cdf(
&mut self,
py: Python,
Expand All @@ -684,7 +684,7 @@ impl RawDeltaTable {
starting_timestamp: Option<String>,
ending_timestamp: Option<String>,
columns: Option<Vec<String>>,
enable_out_of_range: bool,
allow_out_of_range: bool,
) -> PyResult<PyArrowType<ArrowArrayStreamReader>> {
let ctx = SessionContext::new();
let mut cdf_read = CdfLoadBuilder::new(
Expand All @@ -709,8 +709,8 @@ impl RawDeltaTable {
cdf_read = cdf_read.with_ending_timestamp(ending_ts);
}

if enable_out_of_range {
cdf_read = cdf_read.with_out_of_range();
if allow_out_of_range {
cdf_read = cdf_read.with_allow_out_of_range();
}

if let Some(columns) = columns {
Expand Down
28 changes: 14 additions & 14 deletions python/tests/test_cdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import pyarrow.compute as pc
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pytest

from deltalake import DeltaTable, _internal, write_deltalake
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import DeltaError


def test_read_cdf_partitioned():
Expand Down Expand Up @@ -682,16 +684,15 @@ def test_write_overwrite_partitioned_cdf(tmp_path, sample_data: pa.Table):
def test_read_cdf_version_out_of_range():
dt = DeltaTable("../crates/test/tests/data/cdf-table/")

try:
b = dt.load_cdf(4).read_all().to_pydict()
assert False, "Should not get here"
except _internal.DeltaError as e:
assert "invalid table version" in str(e).lower()
with pytest.raises(DeltaError) as e:
dt.load_cdf(4).read_all().to_pydict()

assert "invalid table version" in str(e).lower()


def test_read_cdf_version_out_of_range_with_flag():
dt = DeltaTable("../crates/test/tests/data/cdf-table/")
b = dt.load_cdf(4, enable_out_of_range=True).read_all()
b = dt.load_cdf(4, allow_out_of_range=True).read_all()

assert len(b) == 0

Expand All @@ -700,17 +701,16 @@ def test_read_timestamp_cdf_out_of_range():
dt = DeltaTable("../crates/test/tests/data/cdf-table/")
start = "2033-12-22T17:10:21.675Z"

try:
b = dt.load_cdf(starting_timestamp=start).read_all().to_pydict()
assert False, "Should not get here"
except _internal.DeltaError as e:
assert "is greater than latest commit timestamp" in str(e).lower()
with pytest.raises(DeltaError) as e:
dt.load_cdf(starting_timestamp=start).read_all().to_pydict()

assert "is greater than latest commit timestamp" in str(e).lower()


def test_read_timestamp_cdf_out_of_range_with_flag():
dt = DeltaTable("../crates/test/tests/data/cdf-table/")

start = "2033-12-22T17:10:21.675Z"
b = dt.load_cdf(starting_timestamp=start, enable_out_of_range=True).read_all()
b = dt.load_cdf(starting_timestamp=start, allow_out_of_range=True).read_all()

assert len(b) == 0
assert len(b) == 0

0 comments on commit a79c576

Please sign in to comment.