Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add out_of_range flag to load_cdf #3040

Merged
merged 4 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/core/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Exceptions for the deltalake crate
use chrono::{DateTime, Utc};
use object_store::Error as ObjectStoreError;

use crate::operations::transaction::{CommitBuilderError, TransactionError};
Expand Down Expand Up @@ -232,6 +233,9 @@ pub enum DeltaTableError {

#[error("Invalid version start version {start} is greater than version {end}")]
ChangeDataInvalidVersionRange { start: i64, end: i64 },

#[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
ChangeDataTimestampGreaterThanCommit { ending_timestamp: DateTime<Utc> },
}

impl From<object_store::path::Error> for DeltaTableError {
Expand Down
158 changes: 148 additions & 10 deletions crates/core/src/operations/load_cdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub struct CdfLoadBuilder {
starting_timestamp: Option<DateTime<Utc>>,
/// Ending timestamp of commits to accept
ending_timestamp: Option<DateTime<Utc>>,
/// Enable ending version or timestamp exceeding the last commit
allow_out_of_range: bool,
/// Provided Datafusion context
ctx: SessionContext,
}
Expand All @@ -58,6 +60,7 @@ impl CdfLoadBuilder {
ending_version: None,
starting_timestamp: None,
ending_timestamp: None,
allow_out_of_range: false,
ctx: SessionContext::new(),
}
}
Expand Down Expand Up @@ -92,6 +95,12 @@ impl CdfLoadBuilder {
self
}

/// Enable ending version or timestamp exceeding the last commit
///
/// Sets the builder's `allow_out_of_range` flag to `true` and returns the
/// builder by value so further `with_*` calls can be chained. There is no
/// argument: calling this method can only switch the flag on.
///
/// NOTE(review): in the portion of the file-selection logic visible in this
/// change, an out-of-range starting version or starting timestamp yields
/// empty file lists instead of an error when the flag is set — confirm
/// against the full implementation.
pub fn with_allow_out_of_range(mut self) -> Self {
self.allow_out_of_range = true;
self
}

/// Columns to select
pub fn with_columns(mut self, columns: Vec<String>) -> Self {
self.columns = Some(columns);
Expand All @@ -110,37 +119,82 @@ impl CdfLoadBuilder {
Vec<CdcDataSpec<Remove>>,
)> {
let start = self.starting_version;
let end = self
.ending_version
.unwrap_or(self.log_store.get_latest_version(start).await?);
let latest_version = self.log_store.get_latest_version(0).await?; // Start from 0 since if start > latest commit, the returned commit is not a valid commit
let mut end = self.ending_version.unwrap_or(latest_version);

let mut change_files: Vec<CdcDataSpec<AddCDCFile>> = vec![];
let mut add_files: Vec<CdcDataSpec<Add>> = vec![];
let mut remove_files: Vec<CdcDataSpec<Remove>> = vec![];

if end > latest_version {
end = latest_version;
}

if start > latest_version {
return if self.allow_out_of_range {
Ok((change_files, add_files, remove_files))
} else {
Err(DeltaTableError::InvalidVersion(start))
};
}

if end < start {
return Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end });
return if self.allow_out_of_range {
Ok((change_files, add_files, remove_files))
} else {
Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end })
};
}

let starting_timestamp = self.starting_timestamp.unwrap_or(DateTime::UNIX_EPOCH);
let ending_timestamp = self
.ending_timestamp
.unwrap_or(DateTime::from(SystemTime::now()));

// Check that starting_timestamp is within boundaries of the latest version
let latest_snapshot_bytes = self
.log_store
.read_commit_entry(latest_version)
.await?
.ok_or(DeltaTableError::InvalidVersion(latest_version));

let latest_version_actions: Vec<Action> =
get_actions(latest_version, latest_snapshot_bytes?).await?;
let latest_version_commit = latest_version_actions
.iter()
.find(|a| matches!(a, Action::CommitInfo(_)));

if let Some(Action::CommitInfo(CommitInfo {
timestamp: Some(latest_timestamp),
..
})) = latest_version_commit
{
if starting_timestamp.timestamp_millis() > *latest_timestamp {
return if self.allow_out_of_range {
Ok((change_files, add_files, remove_files))
} else {
Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit {
ending_timestamp: ending_timestamp,
})
};
}
}

log::debug!(
"starting timestamp = {:?}, ending timestamp = {:?}",
&starting_timestamp,
&ending_timestamp
);
log::debug!("starting version = {}, ending version = {:?}", start, end);

let mut change_files: Vec<CdcDataSpec<AddCDCFile>> = vec![];
let mut add_files: Vec<CdcDataSpec<Add>> = vec![];
let mut remove_files: Vec<CdcDataSpec<Remove>> = vec![];

for version in start..=end {
let snapshot_bytes = self
.log_store
.read_commit_entry(version)
.await?
.ok_or(DeltaTableError::InvalidVersion(version))?;
let version_actions = get_actions(version, snapshot_bytes).await?;
.ok_or(DeltaTableError::InvalidVersion(version));

let version_actions: Vec<Action> = get_actions(version, snapshot_bytes?).await?;

let mut ts = 0;
let mut cdc_actions = vec![];
Expand Down Expand Up @@ -578,6 +632,90 @@ pub(crate) mod tests {
Ok(())
}

/// Building a CDF load from starting version 5 without enabling
/// `allow_out_of_range` must fail with `DeltaTableError::InvalidVersion`.
#[tokio::test]
async fn test_load_version_out_of_range() -> TestResult {
    let ops = DeltaOps::try_from_uri("../test/tests/data/cdf-table-non-partitioned").await?;
    let outcome = ops.load_cdf().with_starting_version(5).build().await;

    // First make sure the build failed at all, then inspect the error kind.
    assert!(outcome.is_err());
    let err = outcome.unwrap_err();
    assert!(matches!(err, DeltaTableError::InvalidVersion { .. }));

    Ok(())
}

/// With `allow_out_of_range` enabled, building from starting version 5
/// must succeed and the resulting scan must produce no record batches.
#[tokio::test]
async fn test_load_version_out_of_range_with_flag() -> TestResult {
    let ops = DeltaOps::try_from_uri("../test/tests/data/cdf-table-non-partitioned").await?;
    let scan = ops
        .load_cdf()
        .with_starting_version(5)
        .with_allow_out_of_range()
        .build()
        .await?;

    // Read the partition count up front so the collect call stays short.
    let partition_count = scan.properties().output_partitioning().partition_count();
    let batches = collect_batches(partition_count, scan.clone(), SessionContext::new()).await?;

    assert!(batches.is_empty());

    Ok(())
}

/// A starting timestamp in 2033, without `allow_out_of_range`, must make the
/// build fail with `DeltaTableError::ChangeDataTimestampGreaterThanCommit`.
#[tokio::test]
async fn test_load_timestamp_out_of_range() -> TestResult {
    // Passed as the *starting* timestamp of the requested range.
    let future_start = NaiveDateTime::from_str("2033-12-22T17:10:21.675").unwrap();

    let ops = DeltaOps::try_from_uri("../test/tests/data/cdf-table-non-partitioned").await?;
    let outcome = ops
        .load_cdf()
        .with_starting_timestamp(future_start.and_utc())
        .build()
        .await;

    // First make sure the build failed at all, then inspect the error kind.
    assert!(outcome.is_err());
    let err = outcome.unwrap_err();
    assert!(matches!(
        err,
        DeltaTableError::ChangeDataTimestampGreaterThanCommit { .. }
    ));

    Ok(())
}

/// With `allow_out_of_range` enabled, a starting timestamp in 2033 must not
/// be an error: the build succeeds and the scan produces no record batches.
#[tokio::test]
async fn test_load_timestamp_out_of_range_with_flag() -> TestResult {
    // Passed as the *starting* timestamp of the requested range.
    let future_start = NaiveDateTime::from_str("2033-12-22T17:10:21.675").unwrap();

    let ops = DeltaOps::try_from_uri("../test/tests/data/cdf-table-non-partitioned").await?;
    let scan = ops
        .load_cdf()
        .with_starting_timestamp(future_start.and_utc())
        .with_allow_out_of_range()
        .build()
        .await?;

    // Read the partition count up front so the collect call stays short.
    let partition_count = scan.properties().output_partitioning().partition_count();
    let batches = collect_batches(partition_count, scan.clone(), SessionContext::new()).await?;

    assert!(batches.is_empty());

    Ok(())
}

#[tokio::test]
async fn test_load_non_cdf() -> TestResult {
let table = DeltaOps::try_from_uri("../test/tests/data/simple_table")
Expand Down
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ class RawDeltaTable:
ending_version: Optional[int] = None,
starting_timestamp: Optional[str] = None,
ending_timestamp: Optional[str] = None,
allow_out_of_range: bool = False,
) -> pyarrow.RecordBatchReader: ...
def transaction_versions(self) -> Dict[str, Transaction]: ...
def __datafusion_table_provider__(self) -> Any: ...
Expand Down
2 changes: 2 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,13 +689,15 @@ def load_cdf(
starting_timestamp: Optional[str] = None,
ending_timestamp: Optional[str] = None,
columns: Optional[List[str]] = None,
allow_out_of_range: bool = False,
) -> pyarrow.RecordBatchReader:
return self._table.load_cdf(
columns=columns,
starting_version=starting_version,
ending_version=ending_version,
starting_timestamp=starting_timestamp,
ending_timestamp=ending_timestamp,
allow_out_of_range=allow_out_of_range,
)

@property
Expand Down
7 changes: 6 additions & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ impl RawDeltaTable {
Ok(())
}

#[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None))]
#[pyo3(signature = (starting_version = 0, ending_version = None, starting_timestamp = None, ending_timestamp = None, columns = None, allow_out_of_range = false))]
pub fn load_cdf(
&mut self,
py: Python,
Expand All @@ -684,6 +684,7 @@ impl RawDeltaTable {
starting_timestamp: Option<String>,
ending_timestamp: Option<String>,
columns: Option<Vec<String>>,
allow_out_of_range: bool,
) -> PyResult<PyArrowType<ArrowArrayStreamReader>> {
let ctx = SessionContext::new();
let mut cdf_read = CdfLoadBuilder::new(
Expand All @@ -708,6 +709,10 @@ impl RawDeltaTable {
cdf_read = cdf_read.with_ending_timestamp(ending_ts);
}

if allow_out_of_range {
cdf_read = cdf_read.with_allow_out_of_range();
}

if let Some(columns) = columns {
cdf_read = cdf_read.with_columns(columns);
}
Expand Down
37 changes: 37 additions & 0 deletions python/tests/test_cdf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import pyarrow.compute as pc
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import pytest

from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import DeltaError


def test_read_cdf_partitioned():
Expand Down Expand Up @@ -677,3 +679,38 @@ def test_write_overwrite_partitioned_cdf(tmp_path, sample_data: pa.Table):
).sort_by(sort_values).select(expected_data.column_names) == pa.concat_tables(
[first_batch, expected_data]
).sort_by(sort_values)


def test_read_cdf_version_out_of_range():
    """Loading CDF from starting version 4 without the flag raises DeltaError.

    The message check is made against the raised exception itself
    (``e.value``) rather than the pytest ``ExceptionInfo`` wrapper, so the
    assertion does not depend on how pytest renders that wrapper.
    """
    dt = DeltaTable("../crates/test/tests/data/cdf-table/")

    with pytest.raises(DeltaError) as e:
        dt.load_cdf(4).read_all().to_pydict()

    assert "invalid table version" in str(e.value).lower()


def test_read_cdf_version_out_of_range_with_flag():
    """With allow_out_of_range=True, starting version 4 yields an empty result."""
    delta_table = DeltaTable("../crates/test/tests/data/cdf-table/")

    reader = delta_table.load_cdf(4, allow_out_of_range=True)
    result = reader.read_all()

    assert len(result) == 0


def test_read_timestamp_cdf_out_of_range():
    """A starting timestamp in 2033 without the flag raises DeltaError.

    The message check is made against the raised exception itself
    (``e.value``) rather than the pytest ``ExceptionInfo`` wrapper, so the
    assertion does not depend on how pytest renders that wrapper.
    """
    dt = DeltaTable("../crates/test/tests/data/cdf-table/")
    start = "2033-12-22T17:10:21.675Z"

    with pytest.raises(DeltaError) as e:
        dt.load_cdf(starting_timestamp=start).read_all().to_pydict()

    assert "is greater than latest commit timestamp" in str(e.value).lower()


def test_read_timestamp_cdf_out_of_range_with_flag():
    """With allow_out_of_range=True, a 2033 starting timestamp yields an empty result."""
    delta_table = DeltaTable("../crates/test/tests/data/cdf-table/")
    future_start = "2033-12-22T17:10:21.675Z"

    reader = delta_table.load_cdf(
        starting_timestamp=future_start,
        allow_out_of_range=True,
    )
    result = reader.read_all()

    assert len(result) == 0
Loading