Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use kernel table features #3117

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ debug = true
debug = "line-tables-only"

[workspace.dependencies]
delta_kernel = { version = "0.6.0", features = ["default-engine"] }
delta_kernel = { version = "=0.6.0", features = ["default-engine"] }
#delta_kernel = { path = "../delta-kernel-rs/kernel", features = ["sync-engine"] }

# arrow
Expand Down Expand Up @@ -59,6 +59,8 @@ datafusion-sql = { version = "44" }
# serde
serde = { version = "1.0.194", features = ["derive"] }
serde_json = "1"
strum = { version = "*"}


# "stdlib"
bytes = { version = "1" }
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ datafusion-functions-aggregate = { workspace = true, optional = true }
# serde
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
strum = { workspace = true}

# "stdlib"
bytes = { workspace = true }
Expand Down
259 changes: 42 additions & 217 deletions crates/core/src/kernel/models/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use url::Url;
use super::schema::StructType;
use crate::kernel::{error::Error, DeltaResult};
use crate::TableProperty;
use delta_kernel::table_features::{ReaderFeatures, WriterFeatures};

/// Defines a file format used in table
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -183,18 +184,37 @@ impl Protocol {
mut self,
configuration: &HashMap<String, Option<String>>,
) -> Protocol {
/// Returns `true` iff the optional config value holds a case-insensitive
/// boolean "true".
///
/// `None`, "false", and any unparseable string all yield `false`.
fn parse_bool(value: &Option<String>) -> bool {
    match value {
        // `bool::from_str` only accepts exactly "true"/"false", so after
        // lowercasing, only "true" can produce `true`.
        Some(v) => v.to_ascii_lowercase() == "true",
        None => false,
    }
}

if self.min_writer_version >= 7 {
// TODO: move this is in future to use delta_kernel::table_properties
let mut converted_writer_features = configuration
.iter()
.filter(|(_, value)| {
value
.as_ref()
.is_some_and(|v| v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v))
})
.collect::<HashMap<&String, &Option<String>>>()
.keys()
.map(|key| (*key).clone().into())
.filter(|v| !matches!(v, WriterFeatures::Other(_)))
.filter_map(|(key, value)| match key.as_str() {
"delta.enableChangeDataFeed" if parse_bool(value) => {
Some(WriterFeatures::ChangeDataFeed)
}
"delta.appendOnly" if parse_bool(value) => Some(WriterFeatures::AppendOnly),
"delta.enableDeletionVectors" if parse_bool(value) => {
Some(WriterFeatures::DeletionVectors)
}
"delta.enableRowTracking" if parse_bool(value) => {
Some(WriterFeatures::RowTracking)
}
"delta.checkpointPolicy" if value.clone().unwrap_or_default() == "v2" => {
Some(WriterFeatures::V2Checkpoint)
}
_ => None,
})
.collect::<HashSet<WriterFeatures>>();

if configuration
Expand All @@ -215,13 +235,15 @@ impl Protocol {
if self.min_reader_version >= 3 {
let converted_reader_features = configuration
.iter()
.filter(|(_, value)| {
value
.as_ref()
.is_some_and(|v| v.to_ascii_lowercase().parse::<bool>().is_ok_and(|v| v))
.filter_map(|(key, value)| match key.as_str() {
"delta.enableDeletionVectors" if parse_bool(value) => {
Some(ReaderFeatures::DeletionVectors)
}
"delta.checkpointPolicy" if value.clone().unwrap_or_default() == "v2" => {
Some(ReaderFeatures::V2Checkpoint)
}
_ => None,
})
.map(|(key, _)| (*key).clone().into())
.filter(|v| !matches!(v, ReaderFeatures::Other(_)))
.collect::<HashSet<ReaderFeatures>>();
match self.reader_features {
Some(mut features) => {
Expand Down Expand Up @@ -459,225 +481,28 @@ impl fmt::Display for TableFeatures {
}
}

impl TableFeatures {
    /// Map this table feature onto its reader and/or writer counterpart.
    ///
    /// Either tuple slot is `None` when the corresponding `TryFrom`
    /// conversion fails, i.e. the feature does not exist for that role.
    pub fn to_reader_writer_features(&self) -> (Option<ReaderFeatures>, Option<WriterFeatures>) {
        (
            ReaderFeatures::try_from(self).ok(),
            WriterFeatures::try_from(self).ok(),
        )
    }
}

/// Features table readers can support as well as let users know
/// what is supported
///
/// Serialized with camelCase variant names (e.g. `columnMapping`);
/// unrecognized feature strings round-trip through the `Other` variant.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
#[serde(rename_all = "camelCase")]
pub enum ReaderFeatures {
    /// Mapping of one column to another
    ColumnMapping,
    /// Deletion vectors for merge, update, delete
    DeletionVectors,
    /// timestamps without timezone support
    #[serde(rename = "timestampNtz")]
    TimestampWithoutTimezone,
    /// version 2 of checkpointing
    V2Checkpoint,
    /// If we do not match any other reader features
    #[serde(untagged)]
    Other(String),
}

impl From<&parquet::record::Field> for ReaderFeatures {
    /// Decode a reader feature from a parquet checkpoint field.
    ///
    /// Non-string fields and unrecognized feature names fall back to `Other`.
    fn from(value: &parquet::record::Field) -> Self {
        if let parquet::record::Field::Str(feature) = value {
            match feature.as_str() {
                "columnMapping" => Self::ColumnMapping,
                // accept both the protocol name and the table-property key
                "deletionVectors" | "delta.enableDeletionVectors" => Self::DeletionVectors,
                "timestampNtz" => Self::TimestampWithoutTimezone,
                "v2Checkpoint" => Self::V2Checkpoint,
                unknown => Self::Other(unknown.to_string()),
            }
        } else {
            Self::Other(value.to_string())
        }
    }
}

impl From<String> for ReaderFeatures {
    /// Delegates to the `&str` conversion.
    fn from(value: String) -> Self {
        Self::from(value.as_str())
    }
}

impl From<&str> for ReaderFeatures {
fn from(value: &str) -> Self {
match value {
"columnMapping" => ReaderFeatures::ColumnMapping,
"deletionVectors" => ReaderFeatures::DeletionVectors,
"timestampNtz" => ReaderFeatures::TimestampWithoutTimezone,
"v2Checkpoint" => ReaderFeatures::V2Checkpoint,
f => ReaderFeatures::Other(f.to_string()),
}
}
}

impl AsRef<str> for ReaderFeatures {
fn as_ref(&self) -> &str {
match self {
ReaderFeatures::ColumnMapping => "columnMapping",
ReaderFeatures::DeletionVectors => "deletionVectors",
ReaderFeatures::TimestampWithoutTimezone => "timestampNtz",
ReaderFeatures::V2Checkpoint => "v2Checkpoint",
ReaderFeatures::Other(f) => f,
}
}
}

impl fmt::Display for ReaderFeatures {
    /// Displays the canonical feature name (same string as `as_ref`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_ref())
    }
}

impl TryFrom<&TableFeatures> for ReaderFeatures {
type Error = String;
type Error = strum::ParseError;

fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
match ReaderFeatures::from(value.as_ref()) {
ReaderFeatures::Other(_) => {
Err(format!("Table feature {} is not a reader feature", value))
}
value => Ok(value),
}
}
}

/// Features table writers can support as well as let users know
/// what is supported
///
/// Serialized with camelCase variant names (e.g. `appendOnly`);
/// unrecognized feature strings round-trip through the `Other` variant.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
#[serde(rename_all = "camelCase")]
pub enum WriterFeatures {
    /// Append Only Tables
    AppendOnly,
    /// Table invariants
    Invariants,
    /// Check constraints on columns
    CheckConstraints,
    /// CDF on a table
    ChangeDataFeed,
    /// Columns with generated values
    GeneratedColumns,
    /// Mapping of one column to another
    ColumnMapping,
    /// ID Columns
    IdentityColumns,
    /// Deletion vectors for merge, update, delete
    DeletionVectors,
    /// Row tracking on tables
    RowTracking,
    /// timestamps without timezone support
    #[serde(rename = "timestampNtz")]
    TimestampWithoutTimezone,
    /// domain specific metadata
    DomainMetadata,
    /// version 2 of checkpointing
    V2Checkpoint,
    /// Iceberg compatibility support
    IcebergCompatV1,
    /// If we do not match any other writer features
    #[serde(untagged)]
    Other(String),
}

impl From<String> for WriterFeatures {
    /// Delegates to the `&str` conversion.
    fn from(value: String) -> Self {
        Self::from(value.as_str())
    }
}

impl From<&str> for WriterFeatures {
fn from(value: &str) -> Self {
match value {
"appendOnly" | "delta.appendOnly" => WriterFeatures::AppendOnly,
"invariants" => WriterFeatures::Invariants,
"checkConstraints" => WriterFeatures::CheckConstraints,
"changeDataFeed" | "delta.enableChangeDataFeed" => WriterFeatures::ChangeDataFeed,
"generatedColumns" => WriterFeatures::GeneratedColumns,
"columnMapping" => WriterFeatures::ColumnMapping,
"identityColumns" => WriterFeatures::IdentityColumns,
"deletionVectors" | "delta.enableDeletionVectors" => WriterFeatures::DeletionVectors,
"rowTracking" | "delta.enableRowTracking" => WriterFeatures::RowTracking,
"timestampNtz" => WriterFeatures::TimestampWithoutTimezone,
"domainMetadata" => WriterFeatures::DomainMetadata,
"v2Checkpoint" => WriterFeatures::V2Checkpoint,
"icebergCompatV1" => WriterFeatures::IcebergCompatV1,
f => WriterFeatures::Other(f.to_string()),
}
}
}

impl AsRef<str> for WriterFeatures {
fn as_ref(&self) -> &str {
match self {
WriterFeatures::AppendOnly => "appendOnly",
WriterFeatures::Invariants => "invariants",
WriterFeatures::CheckConstraints => "checkConstraints",
WriterFeatures::ChangeDataFeed => "changeDataFeed",
WriterFeatures::GeneratedColumns => "generatedColumns",
WriterFeatures::ColumnMapping => "columnMapping",
WriterFeatures::IdentityColumns => "identityColumns",
WriterFeatures::DeletionVectors => "deletionVectors",
WriterFeatures::RowTracking => "rowTracking",
WriterFeatures::TimestampWithoutTimezone => "timestampNtz",
WriterFeatures::DomainMetadata => "domainMetadata",
WriterFeatures::V2Checkpoint => "v2Checkpoint",
WriterFeatures::IcebergCompatV1 => "icebergCompatV1",
WriterFeatures::Other(f) => f,
}
}
}

impl fmt::Display for WriterFeatures {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_ref())
ReaderFeatures::try_from(value.as_ref())
}
}

impl TryFrom<&TableFeatures> for WriterFeatures {
type Error = String;
type Error = strum::ParseError;

fn try_from(value: &TableFeatures) -> Result<Self, Self::Error> {
match WriterFeatures::from(value.as_ref()) {
WriterFeatures::Other(_) => {
Err(format!("Table feature {} is not a writer feature", value))
}
value => Ok(value),
}
WriterFeatures::try_from(value.as_ref())
}
}

impl From<&parquet::record::Field> for WriterFeatures {
fn from(value: &parquet::record::Field) -> Self {
match value {
parquet::record::Field::Str(feature) => match feature.as_str() {
"appendOnly" => WriterFeatures::AppendOnly,
"invariants" => WriterFeatures::Invariants,
"checkConstraints" => WriterFeatures::CheckConstraints,
"changeDataFeed" => WriterFeatures::ChangeDataFeed,
"generatedColumns" => WriterFeatures::GeneratedColumns,
"columnMapping" => WriterFeatures::ColumnMapping,
"identityColumns" => WriterFeatures::IdentityColumns,
"deletionVectors" => WriterFeatures::DeletionVectors,
"rowTracking" => WriterFeatures::RowTracking,
"timestampNtz" => WriterFeatures::TimestampWithoutTimezone,
"domainMetadata" => WriterFeatures::DomainMetadata,
"v2Checkpoint" => WriterFeatures::V2Checkpoint,
"icebergCompatV1" => WriterFeatures::IcebergCompatV1,
f => WriterFeatures::Other(f.to_string()),
},
f => WriterFeatures::Other(f.to_string()),
}
impl TableFeatures {
/// Convert table feature to respective reader or/and write feature
pub fn to_reader_writer_features(&self) -> (Option<ReaderFeatures>, Option<WriterFeatures>) {
let reader_feature = ReaderFeatures::try_from(self).ok();
let writer_feature = WriterFeatures::try_from(self).ok();
(reader_feature, writer_feature)
}
}

Expand Down
6 changes: 4 additions & 2 deletions crates/core/src/kernel/snapshot/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,9 @@ pub(super) async fn list_log_files(

#[cfg(test)]
pub(super) mod tests {
use delta_kernel::table_features::{ReaderFeatures, WriterFeatures};
use deltalake_test::utils::*;
use maplit::hashset;
use tokio::task::JoinHandle;

use crate::{
Expand Down Expand Up @@ -637,8 +639,8 @@ pub(super) mod tests {
let expected = Protocol {
min_reader_version: 3,
min_writer_version: 7,
reader_features: Some(vec!["deletionVectors".into()].into_iter().collect()),
writer_features: Some(vec!["deletionVectors".into()].into_iter().collect()),
reader_features: Some(hashset! {ReaderFeatures::DeletionVectors}),
writer_features: Some(hashset! {WriterFeatures::DeletionVectors}),
};
assert_eq!(protocol, expected);

Expand Down
17 changes: 13 additions & 4 deletions crates/core/src/kernel/snapshot/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use arrow_array::{
Array, BooleanArray, Int32Array, Int64Array, ListArray, MapArray, StringArray, StructArray,
};
use delta_kernel::table_features::{ReaderFeatures, WriterFeatures};
use percent_encoding::percent_decode_str;

use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName};
Expand Down Expand Up @@ -63,10 +64,18 @@ pub(super) fn read_protocol(batch: &dyn ProvidesColumnByName) -> DeltaResult<Opt
return Ok(Some(Protocol {
min_reader_version: ex::read_primitive(min_reader_version, idx)?,
min_writer_version: ex::read_primitive(min_writer_version, idx)?,
reader_features: collect_string_list(&maybe_reader_features, idx)
.map(|v| v.into_iter().map(Into::into).collect()),
writer_features: collect_string_list(&maybe_writer_features, idx)
.map(|v| v.into_iter().map(Into::into).collect()),
reader_features: collect_string_list(&maybe_reader_features, idx).map(|v| {
v.into_iter()
.map(|v| TryInto::<ReaderFeatures>::try_into(v.as_str()))
.filter_map(|v| v.ok())
.collect()
}),
writer_features: collect_string_list(&maybe_writer_features, idx).map(|v| {
v.into_iter()
.map(|v| TryInto::<WriterFeatures>::try_into(v.as_str()))
.filter_map(|v| v.ok())
.collect()
}),
}));
}
}
Expand Down
Loading
Loading