Skip to content

Commit

Permalink
Moved all boundary conditions before the main load_cdf loop
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Cabeza <[email protected]>
  • Loading branch information
pblocz authored and ion-elgreco committed Dec 7, 2024
1 parent a79c576 commit 2501f2f
Showing 1 changed file with 47 additions and 21 deletions.
68 changes: 47 additions & 21 deletions crates/core/src/operations/load_cdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,44 +119,81 @@ impl CdfLoadBuilder {
Vec<CdcDataSpec<Remove>>,
)> {
let start = self.starting_version;
let latest_version = self.log_store.get_latest_version(start).await?;
let latest_version = self.log_store.get_latest_version(0).await?; // Start from 0 since if start > latest commit, the returned commit is not a valid commit
let mut end = self.ending_version.unwrap_or(latest_version);

let mut change_files: Vec<CdcDataSpec<AddCDCFile>> = vec![];
let mut add_files: Vec<CdcDataSpec<Add>> = vec![];
let mut remove_files: Vec<CdcDataSpec<Remove>> = vec![];

if end > latest_version {
end = latest_version;
}

if start > latest_version {
return if self.allow_out_of_range {
Ok((change_files, add_files, remove_files))
} else {
Err(DeltaTableError::InvalidVersion(start))
};
}

if end < start {
return Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end });
return if self.allow_out_of_range {
Ok((change_files, add_files, remove_files))
} else {
Err(DeltaTableError::ChangeDataInvalidVersionRange { start, end })
};
}

let starting_timestamp = self.starting_timestamp.unwrap_or(DateTime::UNIX_EPOCH);
let ending_timestamp = self
.ending_timestamp
.unwrap_or(DateTime::from(SystemTime::now()));

// Check that starting_timestamp is within the boundaries of the latest version
let latest_snapshot_bytes = self
.log_store
.read_commit_entry(latest_version)
.await?
.ok_or(DeltaTableError::InvalidVersion(latest_version));

let latest_version_actions: Vec<Action> =
get_actions(latest_version, latest_snapshot_bytes?).await?;
let latest_version_commit = latest_version_actions
.iter()
.find(|a| matches!(a, Action::CommitInfo(_)));

if let Some(Action::CommitInfo(CommitInfo {
timestamp: Some(latest_timestamp),
..
})) = latest_version_commit
{
if starting_timestamp.timestamp_millis() > *latest_timestamp {
return if self.allow_out_of_range {
Ok((change_files, add_files, remove_files))
} else {
Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit {
ending_timestamp: ending_timestamp,
})
};
}
}

log::debug!(
"starting timestamp = {:?}, ending timestamp = {:?}",
&starting_timestamp,
&ending_timestamp
);
log::debug!("starting version = {}, ending version = {:?}", start, end);

let mut change_files: Vec<CdcDataSpec<AddCDCFile>> = vec![];
let mut add_files: Vec<CdcDataSpec<Add>> = vec![];
let mut remove_files: Vec<CdcDataSpec<Remove>> = vec![];

for version in start..=end {
let snapshot_bytes = self
.log_store
.read_commit_entry(version)
.await?
.ok_or(DeltaTableError::InvalidVersion(version));

if snapshot_bytes.is_err() && version >= end && self.allow_out_of_range {
break;
}

let version_actions: Vec<Action> = get_actions(version, snapshot_bytes?).await?;

let mut ts = 0;
Expand Down Expand Up @@ -253,17 +290,6 @@ impl CdfLoadBuilder {
}
}

// All versions were skipped due to date out of range
if !self.allow_out_of_range
&& change_files.is_empty()
&& add_files.is_empty()
&& remove_files.is_empty()
{
return Err(DeltaTableError::ChangeDataTimestampGreaterThanCommit {
ending_timestamp: ending_timestamp,
});
}

Ok((change_files, add_files, remove_files))
}

Expand Down

0 comments on commit 2501f2f

Please sign in to comment.