Merge branch 'main' into move-unity
hntd187 authored Dec 5, 2024
2 parents 713c92c + 98f8b0b commit 2ae4011
Showing 9 changed files with 152 additions and 90 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -26,7 +26,7 @@ debug = true
 debug = "line-tables-only"
 
 [workspace.dependencies]
-delta_kernel = { version = "0.4.1", features = ["sync-engine"] }
+delta_kernel = { version = "0.4.1", features = ["default-engine"] }
 #delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] }
 
 # arrow
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "deltalake-core"
-version = "0.22.2"
+version = "0.22.3"
 authors.workspace = true
 keywords.workspace = true
 readme.workspace = true
50 changes: 23 additions & 27 deletions crates/core/src/kernel/snapshot/log_data.rs
@@ -245,23 +245,14 @@ impl LogicalFile<'_> {
 
     /// Defines a deletion vector
     pub fn deletion_vector(&self) -> Option<DeletionVectorView<'_>> {
-        if let Some(arr) = self.deletion_vector.as_ref() {
-            // With v0.22 and the upgrade to a more recent arrow. Reading nullable structs with
-            // non-nullable entries back out of parquet is resulting in the DeletionVector having
-            // an empty string rather than a null. The addition check on the value ensures that a
-            // [DeletionVectorView] is not created in this scenario
-            //
-            // <https://github.com/delta-io/delta-rs/issues/3030>
-            if arr.storage_type.is_valid(self.index)
-                && !arr.storage_type.value(self.index).is_empty()
-            {
-                return Some(DeletionVectorView {
+        self.deletion_vector.as_ref().and_then(|arr| {
+            arr.storage_type
+                .is_valid(self.index)
+                .then_some(DeletionVectorView {
                     data: arr,
                     index: self.index,
-                });
-            }
-        }
-        None
+                })
+        })
     }
 
     /// The number of records stored in the data file.
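
Note on the rewrite above: it trades a nested if-let with an early return for two std combinators. A standalone sketch of the equivalence (not delta-rs code; `View` and the bool slice stand in for `DeletionVectorView` and the Arrow validity data):

#[allow(dead_code)]
struct View<'a> {
    data: &'a [bool],
    index: usize,
}

// The old shape: nested `if let`, early return, trailing `None`.
fn imperative(dv: Option<&[bool]>, index: usize) -> Option<View<'_>> {
    if let Some(data) = dv {
        if data[index] {
            return Some(View { data, index });
        }
    }
    None
}

// The new shape: `Option::and_then` flattens the inner Option that
// `bool::then_some` produces, so both functions agree on every input.
fn combinator(dv: Option<&[bool]>, index: usize) -> Option<View<'_>> {
    dv.and_then(|data| data[index].then_some(View { data, index }))
}

fn main() {
    let valid = [true, false];
    assert!(imperative(Some(&valid[..]), 0).is_some());
    assert!(combinator(Some(&valid[..]), 0).is_some());
    assert!(combinator(Some(&valid[..]), 1).is_none());
    assert!(combinator(None, 0).is_none());
}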
@@ -380,18 +371,23 @@ impl<'a> FileStatsAccessor<'a> {
         );
         let deletion_vector = extract_and_cast_opt::<StructArray>(data, "add.deletionVector");
         let deletion_vector = deletion_vector.and_then(|dv| {
-            let storage_type = extract_and_cast::<StringArray>(dv, "storageType").ok()?;
-            let path_or_inline_dv = extract_and_cast::<StringArray>(dv, "pathOrInlineDv").ok()?;
-            let size_in_bytes = extract_and_cast::<Int32Array>(dv, "sizeInBytes").ok()?;
-            let cardinality = extract_and_cast::<Int64Array>(dv, "cardinality").ok()?;
-            let offset = extract_and_cast_opt::<Int32Array>(dv, "offset");
-            Some(DeletionVector {
-                storage_type,
-                path_or_inline_dv,
-                size_in_bytes,
-                cardinality,
-                offset,
-            })
+            if dv.null_count() == dv.len() {
+                None
+            } else {
+                let storage_type = extract_and_cast::<StringArray>(dv, "storageType").ok()?;
+                let path_or_inline_dv =
+                    extract_and_cast::<StringArray>(dv, "pathOrInlineDv").ok()?;
+                let size_in_bytes = extract_and_cast::<Int32Array>(dv, "sizeInBytes").ok()?;
+                let cardinality = extract_and_cast::<Int64Array>(dv, "cardinality").ok()?;
+                let offset = extract_and_cast_opt::<Int32Array>(dv, "offset");
+                Some(DeletionVector {
+                    storage_type,
+                    path_or_inline_dv,
+                    size_in_bytes,
+                    cardinality,
+                    offset,
+                })
+            }
         });
 
         Ok(Self {
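The `dv.null_count() == dv.len()` guard above is the core of the fix for #3030: after the arrow upgrade, `add.deletionVector` can come back from a checkpoint as a struct column that is present but null for every row. A minimal sketch of that shape and the guard, assuming a recent arrow-rs (the `arrow` crate delta-rs builds on):

use arrow::array::{new_null_array, Array, StructArray};
use arrow::datatypes::{DataType, Field, Fields};

fn main() {
    // A struct column with the deletionVector layout, null for every row.
    let fields = Fields::from(vec![
        Field::new("storageType", DataType::Utf8, true),
        Field::new("pathOrInlineDv", DataType::Utf8, true),
        Field::new("sizeInBytes", DataType::Int32, true),
    ]);
    let column = new_null_array(&DataType::Struct(fields), 4);
    let dv = column.as_any().downcast_ref::<StructArray>().unwrap();

    // The guard added above: an all-null struct means "no deletion vectors",
    // so none of the child columns should be read at all.
    assert_eq!(dv.null_count(), dv.len());
    if dv.null_count() == dv.len() {
        println!("no deletion vectors in this batch");
    }
}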
90 changes: 58 additions & 32 deletions crates/core/src/kernel/snapshot/parse.rs
@@ -78,6 +78,10 @@ pub(super) fn read_adds(array: &dyn ProvidesColumnByName) -> DeltaResult<Vec<Add
     let mut result = Vec::new();
 
     if let Some(arr) = ex::extract_and_cast_opt::<StructArray>(array, "add") {
+        // Stop early if all values are null
+        if arr.null_count() == arr.len() {
+            return Ok(vec![]);
+        }
         let path = ex::extract_and_cast::<StringArray>(arr, "path")?;
         let pvs = ex::extract_and_cast_opt::<MapArray>(arr, "partitionValues");
         let size = ex::extract_and_cast::<Int64Array>(arr, "size")?;
@@ -94,22 +98,33 @@ pub(super) fn read_adds(array: &dyn ProvidesColumnByName) -> DeltaResult<Vec<Add
             let size_in_bytes = ex::extract_and_cast::<Int32Array>(d, "sizeInBytes")?;
             let cardinality = ex::extract_and_cast::<Int64Array>(d, "cardinality")?;
 
-            Box::new(|idx: usize| {
-                if ex::read_str(storage_type, idx).is_ok() {
-                    Some(DeletionVectorDescriptor {
-                        storage_type: std::str::FromStr::from_str(
-                            ex::read_str(storage_type, idx).ok()?,
-                        )
-                        .ok()?,
-                        path_or_inline_dv: ex::read_str(path_or_inline_dv, idx).ok()?.to_string(),
-                        offset: ex::read_primitive_opt(offset, idx),
-                        size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?,
-                        cardinality: ex::read_primitive(cardinality, idx).ok()?,
-                    })
-                } else {
-                    None
-                }
-            })
+            // Column might exist but have nullability set for the whole array, so we just return Nones
+            if d.null_count() == d.len() {
+                Box::new(|_| None)
+            } else {
+                Box::new(|idx: usize| {
+                    d.is_valid(idx)
+                        .then(|| {
+                            if ex::read_str(storage_type, idx).is_ok() {
+                                Some(DeletionVectorDescriptor {
+                                    storage_type: std::str::FromStr::from_str(
+                                        ex::read_str(storage_type, idx).ok()?,
+                                    )
+                                    .ok()?,
+                                    path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)
+                                        .ok()?
+                                        .to_string(),
+                                    offset: ex::read_primitive_opt(offset, idx),
+                                    size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?,
+                                    cardinality: ex::read_primitive(cardinality, idx).ok()?,
+                                })
+                            } else {
+                                None
+                            }
+                        })
+                        .flatten()
+                })
+            }
         } else {
             Box::new(|_| None)
         };
@@ -210,22 +225,33 @@ pub(super) fn read_removes(array: &dyn ProvidesColumnByName) -> DeltaResult<Vec<
             let size_in_bytes = ex::extract_and_cast::<Int32Array>(d, "sizeInBytes")?;
             let cardinality = ex::extract_and_cast::<Int64Array>(d, "cardinality")?;
 
-            Box::new(|idx: usize| {
-                if ex::read_str(storage_type, idx).is_ok() {
-                    Some(DeletionVectorDescriptor {
-                        storage_type: std::str::FromStr::from_str(
-                            ex::read_str(storage_type, idx).ok()?,
-                        )
-                        .ok()?,
-                        path_or_inline_dv: ex::read_str(path_or_inline_dv, idx).ok()?.to_string(),
-                        offset: ex::read_primitive_opt(offset, idx),
-                        size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?,
-                        cardinality: ex::read_primitive(cardinality, idx).ok()?,
-                    })
-                } else {
-                    None
-                }
-            })
+            // Column might exist but have nullability set for the whole array, so we just return Nones
+            if d.null_count() == d.len() {
+                Box::new(|_| None)
+            } else {
+                Box::new(|idx: usize| {
+                    d.is_valid(idx)
+                        .then(|| {
+                            if ex::read_str(storage_type, idx).is_ok() {
+                                Some(DeletionVectorDescriptor {
+                                    storage_type: std::str::FromStr::from_str(
+                                        ex::read_str(storage_type, idx).ok()?,
+                                    )
+                                    .ok()?,
+                                    path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)
+                                        .ok()?
+                                        .to_string(),
+                                    offset: ex::read_primitive_opt(offset, idx),
+                                    size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?,
+                                    cardinality: ex::read_primitive(cardinality, idx).ok()?,
+                                })
+                            } else {
+                                None
+                            }
+                        })
+                        .flatten()
+                })
+            }
         } else {
             Box::new(|_| None)
         };
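Both read_adds and read_removes pick a row-decoder closure once per batch and then call it per index: the all-null case swaps in a constant `|_| None`, and the normal case guards each row with `is_valid(idx)` before touching any field. A toy model of that dispatch (plain `Vec<Option<i32>>` standing in for the nullable deletionVector struct column; not delta-rs code):

// Toy model of the per-batch closure dispatch used above.
fn make_decoder(col: Vec<Option<i32>>) -> Box<dyn Fn(usize) -> Option<i32>> {
    if col.iter().all(Option::is_none) {
        // Fast path mirroring `if d.null_count() == d.len()`: never touch rows.
        Box::new(|_| None)
    } else {
        // Per-row path mirroring `d.is_valid(idx).then(|| ...).flatten()`:
        // `then` yields Option<Option<i32>>, `flatten` collapses it.
        Box::new(move |idx| {
            let valid = matches!(col.get(idx), Some(Some(_)));
            valid.then(|| col.get(idx).copied().flatten()).flatten()
        })
    }
}

fn main() {
    let all_null = make_decoder(vec![None, None]);
    assert_eq!(all_null(0), None);

    let mixed = make_decoder(vec![Some(7), None]);
    assert_eq!(mixed(0), Some(7));
    assert_eq!(mixed(1), None);
}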
43 changes: 31 additions & 12 deletions crates/core/src/kernel/snapshot/replay.rs
@@ -20,7 +20,7 @@ use hashbrown::HashSet;
 use itertools::Itertools;
 use percent_encoding::percent_decode_str;
 use pin_project_lite::pin_project;
-use tracing::debug;
+use tracing::log::*;
 
 use super::parse::collect_map;
 use super::ReplayVisitor;
@@ -438,6 +438,14 @@ pub(super) struct DVInfo<'a> {
 fn seen_key(info: &FileInfo<'_>) -> String {
     let path = percent_decode_str(info.path).decode_utf8_lossy();
     if let Some(dv) = &info.dv {
+        // If storage_type is empty then delta-rs has somehow gotten an empty rather than a null
+        // deletion vector, oooof
+        //
+        // See #3030
+        if dv.storage_type.is_empty() {
+            warn!("An empty but not nullable deletionVector was seen for {info:?}");
+            return path.to_string();
+        }
         if let Some(offset) = &dv.offset {
             format!(
                 "{}::{}{}@{offset}",
@@ -549,22 +557,32 @@ fn read_file_info<'a>(arr: &'a dyn ProvidesColumnByName) -> DeltaResult<Vec<Opti
         let path_or_inline_dv = ex::extract_and_cast::<StringArray>(d, "pathOrInlineDv")?;
         let offset = ex::extract_and_cast::<Int32Array>(d, "offset")?;
 
-        Box::new(|idx: usize| {
-            if ex::read_str(storage_type, idx).is_ok() {
-                Ok(Some(DVInfo {
-                    storage_type: ex::read_str(storage_type, idx)?,
-                    path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)?,
-                    offset: ex::read_primitive_opt(offset, idx),
-                }))
-            } else {
-                Ok(None)
-            }
-        })
+        // Column might exist but have nullability set for the whole array, so we just return Nones
+        if d.null_count() == d.len() {
+            Box::new(|_| Ok(None))
+        } else {
+            Box::new(|idx: usize| {
+                if d.is_valid(idx) {
+                    if ex::read_str(storage_type, idx).is_ok() {
+                        Ok(Some(DVInfo {
+                            storage_type: ex::read_str(storage_type, idx)?,
+                            path_or_inline_dv: ex::read_str(path_or_inline_dv, idx)?,
+                            offset: ex::read_primitive_opt(offset, idx),
+                        }))
+                    } else {
+                        Ok(None)
+                    }
+                } else {
+                    Ok(None)
+                }
+            })
+        }
     } else {
         Box::new(|_| Ok(None))
     };
 
     let mut adds = Vec::with_capacity(path.len());
+
     for idx in 0..path.len() {
         let value = path
             .is_valid(idx)
@@ -577,6 +595,7 @@ fn read_file_info<'a>(arr: &'a dyn ProvidesColumnByName) -> DeltaResult<Vec<Opti
             .transpose()?;
         adds.push(value);
     }
+
     Ok(adds)
 }

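seen_key is what deduplicates add/remove actions during log replay, so a spurious deletion vector would change a file's identity. A simplified sketch of the guarded key construction (owned strings instead of the borrowed DVInfo<'_>; the no-offset format string is an assumption, since the diff truncates that branch):

// Simplified stand-in for DVInfo with owned strings.
struct DvInfo {
    storage_type: String,
    path_or_inline_dv: String,
    offset: Option<i32>,
}

fn seen_key(path: &str, dv: Option<&DvInfo>) -> String {
    match dv {
        // New guard: an empty-but-present storageType is the #3030 artifact,
        // so the file is keyed by path alone, as if it had no deletion vector.
        Some(dv) if dv.storage_type.is_empty() => path.to_string(),
        Some(dv) => match dv.offset {
            Some(offset) => {
                format!("{path}::{}{}@{offset}", dv.storage_type, dv.path_or_inline_dv)
            }
            // Assumed shape of the branch elided in the diff above.
            None => format!("{path}::{}{}", dv.storage_type, dv.path_or_inline_dv),
        },
        None => path.to_string(),
    }
}

fn main() {
    let dv = DvInfo {
        storage_type: String::new(), // the empty-not-null case from the issue
        path_or_inline_dv: "abc".to_string(),
        offset: Some(1),
    };
    assert_eq!(seen_key("part-0.parquet", Some(&dv)), "part-0.parquet");
}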
26 changes: 23 additions & 3 deletions crates/core/src/protocol/checkpoints.rs
@@ -1163,10 +1163,16 @@ mod tests {
     }
 
     /// <https://github.com/delta-io/delta-rs/issues/3030>
+    #[cfg(feature = "datafusion")]
     #[tokio::test]
     async fn test_create_checkpoint_overwrite() -> DeltaResult<()> {
+        use crate::protocol::SaveMode;
+        use crate::writer::test_utils::datafusion::get_data_sorted;
+        use crate::writer::test_utils::get_arrow_schema;
+        use datafusion::assert_batches_sorted_eq;
 
         let tmp_dir = tempfile::tempdir().unwrap();
+        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
 
         let batch = RecordBatch::try_new(
             Arc::clone(&get_arrow_schema(&None)),
@@ -1177,13 +1183,15 @@ mod tests {
             ],
         )
         .unwrap();
-        let table = DeltaOps::try_from_uri_with_storage_options("memory://", HashMap::default())
+
+        let mut table = DeltaOps::try_from_uri(tmp_path.as_os_str().to_str().unwrap())
             .await?
             .write(vec![batch])
             .await?;
+        table.load().await?;
         assert_eq!(table.version(), 0);
 
-        create_checkpoint_for(0, table.snapshot().unwrap(), table.log_store.as_ref()).await?;
+        create_checkpoint(&table).await?;
 
         let batch = RecordBatch::try_new(
             Arc::clone(&get_arrow_schema(&None)),
@@ -1194,11 +1202,23 @@ mod tests {
             ],
         )
         .unwrap();
-        let table = DeltaOps(table)
+
+        let table = DeltaOps::try_from_uri(tmp_path.as_os_str().to_str().unwrap())
+            .await?
             .write(vec![batch])
             .with_save_mode(SaveMode::Overwrite)
             .await?;
         assert_eq!(table.version(), 1);
+
+        let expected = [
+            "+----+-------+------------+",
+            "| id | value | modified   |",
+            "+----+-------+------------+",
+            "| A  | 0     | 2021-02-02 |",
+            "+----+-------+------------+",
+        ];
+        let actual = get_data_sorted(&table, "id,value,modified").await;
+        assert_batches_sorted_eq!(&expected, &actual);
         Ok(())
     }
 }
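
Since the rewritten test now relies on datafusion helpers (get_data_sorted, assert_batches_sorted_eq!), it is compiled only when the crate is built with the datafusion feature. A generic sketch of that gating pattern (hypothetical test name; assumes tokio's test macro is available as in the suite above):

// Compiled and run only under `cargo test --features datafusion`.
#[cfg(feature = "datafusion")]
#[tokio::test]
async fn requires_datafusion_feature() {
    // The body can freely use datafusion-only helpers here.
    assert_eq!(2 + 2, 4);
}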
2 changes: 1 addition & 1 deletion crates/deltalake/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "deltalake"
-version = "0.22.2"
+version = "0.22.3"
 authors.workspace = true
 keywords.workspace = true
 readme.workspace = true
2 changes: 1 addition & 1 deletion python/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "deltalake-python"
-version = "0.22.2"
+version = "0.22.3"
 authors = ["Qingping Hou <[email protected]>", "Will Jones <[email protected]>"]
 homepage = "https://github.com/delta-io/delta-rs"
 license = "Apache-2.0"
25 changes: 13 additions & 12 deletions python/tests/test_checkpoint.py
@@ -483,18 +483,19 @@ def test_checkpoint_with_multiple_writes(tmp_path: pathlib.Path):
             }
         ),
     )
-    DeltaTable(tmp_path).create_checkpoint()
+    dt = DeltaTable(tmp_path)
+    dt.create_checkpoint()
+    assert dt.version() == 0
+    df = pd.DataFrame(
+        {
+            "a": ["a"],
+            "b": [100],
+        }
+    )
+    write_deltalake(tmp_path, df, mode="overwrite")
 
     dt = DeltaTable(tmp_path)
+    assert dt.version() == 1
     new_df = dt.to_pandas()
+    print(dt.to_pandas())
-
-    write_deltalake(
-        tmp_path,
-        pd.DataFrame(
-            {
-                "a": ["a"],
-                "b": [100],
-            }
-        ),
-        mode="overwrite",
-    )
     assert len(new_df) == 1, "We overwrote! there should only be one row"
