feat: checkpoint write supports parsed stats #1594
Conversation
Signed-off-by: Robert Pack <robstar.pack@gmail.com>
Force-pushed f729a52 to 98458a9
Codecov Report: ❌ Patch coverage is …

```
@@            Coverage Diff            @@
##             main    #1594      +/-  ##
=========================================
+ Coverage   84.04%   84.35%   +0.31%
=========================================
  Files         118      120       +2
  Lines       32168    33634    +1466
  Branches    32168    33634    +1466
=========================================
+ Hits        27035    28373    +1338
- Misses       3789     3879      +90
- Partials     1344     1382      +38
```
Force-pushed d21b024 to 87a1bd5
Force-pushed 87a1bd5 to d4c3f43
Force-pushed d4c3f43 to 1029b40
Force-pushed 1029b40 to b9172ea
kernel/src/checkpoint/mod.rs (Outdated)

```rust
}

// After all actions, yield the checkpoint metadata batch (if any) unchanged
self.checkpoint_metadata.take().map(Ok)
```
Based on comments above, this would have a different schema though?
Good point, created 2nd checkpoint schema for V2
```rust
///
/// # Engine Usage
///
/// # Returns: [`ActionReconciliationIterator`] containing the checkpoint data
```
This seems like useful information? Why is it being dropped?
kernel/src/checkpoint/mod.rs (Outdated)
```rust
    output_schema.clone().into(),
)?;

// Create action reconciliation iterator (without checkpoint metadata)
```
Why is "without checkpoint metadata" an important point here?
Fixed by adding checkpoint metadata into v2 checkpoint schema
```rust
/// let mut checkpoint_data = writer.checkpoint_data(&engine)?;
/// let output_schema = checkpoint_data.output_schema().clone();
/// while let Some(batch) = checkpoint_data.next() {
///     let data = batch?.apply_selection_vector()?;
```
did we not modify parquet to take in FilteredEngineData as well?
I don't think this was ever done
```rust
/// full checkpoint batch is produced with the modified Add action.
pub(crate) fn build_stats_transform(
    config: &StatsTransformConfig,
    stats_schema: SchemaRef,
```
we need to clarify what stats_schema should look like.
```rust
let config = StatsTransformConfig::from_table_properties(self.snapshot.table_properties());

// Get stats schema from table configuration.
// This already excludes partition columns and applies column mapping.
```
we do need to handle partitionValues_parsed appropriately:
partitionValues_parsed: In this struct, the column names correspond to the partition columns and the values are stored in their corresponding data type. This is a required field when the table is partitioned and the table property delta.checkpoint.writeStatsAsStruct is set to true.
I'd like to do this in a followup PR
```rust
            nullable: field.nullable,
            metadata: field.metadata.clone(),
        };
    }
```
nit: this if statement should never fail if there is an add action? Can we add a check to ensure it doesn't? (We should also validate that stats isn't already included in the Add struct.)
could we separate out the engine change into its own PR?
This might be a stack -- PR description said not to review this commit?
Apologies, I missed "Review only the most recent commit. b9172ea"
```rust
let source_data: &dyn ProvidesColumnByName = match source_data {
// For nested transforms, get the source struct's null bitmap to preserve null rows
let source_null_buffer = source_array.as_ref().and_then(|arr| {
```
It seems this should be collapsed into the match, so we have `let (source_data, source_null_buffer)`. Otherwise this could panic on the downcast for RecordBatch?
```diff
     })
     .collect();
-let data = StructArray::try_new(output_fields.into(), output_cols, None)?;
+let data = StructArray::try_new(output_fields.into(), output_cols, source_null_buffer)?;
```
Thank you for finding this. I suspect this might enable some code cleanup for changes I made where a lot of columns had to be converted to nullable.
```rust
///     maxValues: <derived min/max schema>,
/// }
/// ```
pub(crate) fn expected_stats_schema(
```
nit: on naming, `parsed_stats_schema_from_config`, or something else to indicate this is producing the derived parsed-stats schema from configuration?
I think we should keep as is, since writing stats also uses this
```rust
| &PrimitiveType::Timestamp
| &PrimitiveType::TimestampNtz
| &PrimitiveType::String
// | &PrimitiveType::Boolean
```
why is bool commented out?
```rust
// Get stats schema from table configuration.
// This already excludes partition columns and applies column mapping.
let stats_schema = self
```
One question I have is whether we want this coupling to TableConfiguration (and automatic derivation of the stats schema) here, or whether the schema should be taken as a parameter. I thought we were trying to keep kernel code decoupled from config in general. CC @nicklan
Good point. The engine should be free to fetch those confs (or override them), while kernel should just do whatever the engine said. This does seem like reasonable default behavior though, so probably we just need to split out a "with defaults" version that engines can use if they don't want to figure it out themselves?
```rust
let physical_schema = StructType::try_new(
    self.schema()
        .fields()
        .filter(|field| !partition_columns.contains(field.name()))
```
add a comment on why partition columns are filtered out.
```rust
/// schema for statistics based on the table configuration. Often the configuration
/// is based on operator experience or automated systems as to what statistics are most
/// useful for a given table.
pub fn expected_stats_schema(&self) -> DeltaResult<SchemaRef> {
```
Same comment on naming: "expected_stats_schema" doesn't quite make sense to me; stats_schema or parsed_stats_schema make more sense. It would be good to document where this is expected to be used (it can't be used within add actions yet, right? So does it need to be public at this point?).
It would also be good to document how column mapping mode affects the schema.
If this will be a public function long term, then copying/moving the details about which configuration options impact it (instead of delegating) is important, including a basic example of the stats schema returned here given a specific input schema.
Another open question is whether, as a public-facing thing, the returned schema's columns should be column-major (or stat-major as they currently are). I think keeping stat-major is probably OK.
emkornfield left a comment
I'll leave it up to other reviewers, but something to keep in mind for future PRs: it seems this could easily be divided into 3 more focused PRs:
- Engine changes to support ParseJSON
- Schema construction changes
- The checkpoint changes

I think better docs, at least initially, is the main blocker for me for merging, along with understanding some of the iterator changes.
scovich left a comment
These stats schema questions probably belong on a different PR but I couldn't easily find it so leaving them here instead...
```rust
    .unwrap_or(true);

if !should_include {
    self.path.pop();
```
A stray ? could very easily break this path stack protocol.
Can we create a newtype wrapper with appropriate impl Drop instead?
Ah, but that would break the recursive self.transform call, because the &mut self.path would make the borrow checker unhappy. Hmm.
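For illustration, a minimal sketch of the Drop-guard idea under discussion, using only std types. The names (`PathGuard`, `visit`) are hypothetical; it sidesteps the `&mut self` borrow concern by passing the path stack as a parameter rather than borrowing it from a struct field:

```rust
// Hypothetical sketch: an RAII guard that pops a path segment when it goes
// out of scope, so a stray `?` or early return cannot unbalance the stack.
struct PathGuard<'a> {
    path: &'a mut Vec<String>,
}

impl<'a> PathGuard<'a> {
    fn push(path: &'a mut Vec<String>, segment: &str) -> Self {
        path.push(segment.to_string());
        PathGuard { path }
    }
}

impl Drop for PathGuard<'_> {
    fn drop(&mut self) {
        // Runs on every exit path, including early returns and `?`.
        self.path.pop();
    }
}

fn visit(path: &mut Vec<String>, fail: bool) -> Result<usize, String> {
    let guard = PathGuard::push(path, "field");
    if fail {
        // The guard still pops when this early return drops it.
        return Err("early exit".to_string());
    }
    Ok(guard.path.len())
}

fn main() {
    let mut path: Vec<String> = Vec::new();
    let _ = visit(&mut path, true);
    assert!(path.is_empty()); // balanced even after the early return
    assert_eq!(visit(&mut path, false).unwrap(), 1);
    assert!(path.is_empty());
}
```

As noted above, the recursive `self.transform` call complicates this in the real code: a guard holding `&mut self.path` would conflict with the recursive `&mut self` borrow unless the path is threaded through as its own argument.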
scovich left a comment
Very nice start! I don't see anything worrisome in the approach.
```rust
Arc::new(Expression::variadic(
    VariadicExpressionOp::Coalesce,
```
It's probably time to define an Expression::coalesce helper method, since there are already three call sites like this upstream, and now we're adding two new ones with this PR?
```rust
vec![
    Expression::column([ADD_NAME, STATS_FIELD]),
    Expression::unary(
        UnaryExpressionOp::ToJson,
        Expression::column([ADD_NAME, STATS_PARSED_FIELD]),
    ),
],
```
This is fully static. We could potentially define it as a static init instead?
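A sketch of the static-init pattern being suggested, using `std::sync::LazyLock`. The `Expr` enum here is a hypothetical stand-in for the kernel's `Expression` type, just to show the shape:

```rust
use std::sync::{Arc, LazyLock};

// Hypothetical stand-in for the kernel's Expression type, for illustration only.
#[derive(Debug)]
enum Expr {
    Column(Vec<&'static str>),
    ToJson(Box<Expr>),
    Coalesce(Vec<Expr>),
}

// Built once on first access instead of on every checkpoint write;
// later callers just clone the Arc and share the same expression tree.
static STATS_EXPR: LazyLock<Arc<Expr>> = LazyLock::new(|| {
    Arc::new(Expr::Coalesce(vec![
        Expr::Column(vec!["add", "stats"]),
        Expr::ToJson(Box::new(Expr::Column(vec!["add", "stats_parsed"]))),
    ]))
});

fn main() {
    let a = Arc::clone(&STATS_EXPR);
    let b = Arc::clone(&STATS_EXPR);
    // Both handles point at the single lazily-initialized tree.
    assert!(Arc::ptr_eq(&a, &b));
}
```

`LazyLock` is in std since Rust 1.80; on older toolchains `once_cell::sync::Lazy` gives the same behavior.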
```rust
// Insert stats_parsed right after stats
fields.push(StructField::nullable(
    STATS_PARSED_FIELD,
    DataType::Struct(Box::new(stats_schema.clone())),
```
I think we have a TODO somewhere to use Arc instead of Box for DataType::Struct. This might be a good reason to address the TODO pronto -- wide stats schemas will pay a high cost at every query right now.
```rust
let fields: Vec<StructField> = base_schema
    .fields()
    .map(|field| {
        if field.name == ADD_NAME {
            if let DataType::Struct(add_struct) = &field.data_type {
                let modified_add = build_add_output_schema(config, add_struct, stats_schema);
                return StructField {
                    name: field.name.clone(),
                    data_type: DataType::Struct(Box::new(modified_add)),
                    nullable: field.nullable,
                    metadata: field.metadata.clone(),
                };
            }
        }
        field.clone()
    })
    .collect();

Arc::new(StructType::new_unchecked(fields))
```
This is nearly identical to build_checkpoint_read_schema_with_stats. Is there a way to factor out the common code?
```rust
arr.as_any()
    .downcast_ref::<StructArray>()
    .and_then(|s| s.nulls().cloned())
```
Why do we need to cast here when Array::nulls is available?
Are we intentionally relying on a failed cast to not always propagate the null array?
```rust
let result = parse_json_impl(json_strings, arrow_schema)?;

// Return as StructArray
Ok(Arc::new(StructArray::from(result)) as ArrayRef)
```
I'm a bit surprised the upcast is needed here?
## 🥞 Stacked PR

Use this [link](https://github.com/delta-io/delta-kernel-rs/pull/1642/files) to review incremental changes.

- [**stack/stats-schema**](#1642) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1642/files)]
- [stack/write-stats-stack](#1656) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1656/files/f218eef7dafc67390b5fa3de7c4beea9e18acff4..38d6f4f61025af9adc10cd19737c3a6fb724350f)]

---

## What changes are proposed in this pull request?

Followups on stats_schema PR from #1594

## How was this change tested?
Review only the most recent commit. b9172ea
## What changes are proposed in this pull request?

Implements checkpoint writing with stats_parsed support, allowing checkpoints to include structured statistics alongside or instead of JSON statistics based on table properties:

- New: stats_transform.rs
- Modified: checkpoint/mod.rs
- Fixed: evaluate_expression.rs
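For context, the per-file statistics that `stats` stores as a JSON string (and `stats_parsed` stores as a native struct) follow the Delta protocol's stats shape. An illustrative example for a hypothetical two-column table (`id`, `name`):

```json
{
  "numRecords": 3,
  "minValues": { "id": 1, "name": "alice" },
  "maxValues": { "id": 7, "name": "carol" },
  "nullCount": { "id": 0, "name": 1 }
}
```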
### This PR affects the following public APIs

- `checkpoint_data()` return type changed: `DeltaResult<ActionReconciliationIterator>` → `DeltaResult<TransformingCheckpointIterator>`
- `finalize()` parameter changed: `ActionReconciliationIterator` → `TransformingCheckpointIterator`
- New public types:

## How was this change tested?