Spark: Write DVs for V3 MoR tables #11561
base: main
Conversation
Force-pushed 7df6e83 to 0b63df8
There are still some failing tests, and I'm figuring out a good pattern for extending the existing Delete/Merge/Update tests to run against DVs. Something else worth thinking about for V3 is preventing partition granularity from being set, since it's at odds with DVs.
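The validation suggested above could be a simple precondition check. This is a hypothetical sketch, not code from the PR: the class name, property values, and method signature are all illustrative stand-ins for whatever validation Iceberg would actually perform.

```java
// Hypothetical sketch: reject partition-granularity position deletes on V3
// tables, since deletion vectors are inherently scoped to a single data file.
class DeleteGranularityCheck {
  // formatVersion and deleteGranularity are illustrative parameters, not
  // Iceberg's real validation API
  static void validate(int formatVersion, String deleteGranularity) {
    if (formatVersion >= 3 && "partition".equals(deleteGranularity)) {
      throw new IllegalArgumentException(
          "Partition-granularity deletes are incompatible with DVs in V3 tables");
    }
  }
}
```

The check is cheap and fails fast at table-configuration time rather than surfacing as a confusing write-path error.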
* PartitioningDVWriter is a PartitioningWriter implementation which writes DVs for a given file
* position
*/
public class PartitioningDVWriter<T>
I wrote this to avoid changing quite a bit of machinery in SparkPositionDeltaWrite, which relies on PartitioningWriter. With this adapter we can just return this writer at a single point, and the existing delegate logic in SparkPositionDeltaWrite just works (line 495 in SparkPositionDeltaWrite). Another advantage is that PartitioningWriter is a long-standing interface that users' existing code may have been written against, so this implementation can simply fit that interface.
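The adapter idea can be sketched in miniature. This is a hypothetical simplification: the interface and class names mirror the discussion (PartitioningWriter, PartitioningDVWriter), but the signatures and the String-encoded positions below are stand-ins, not Iceberg's real API, which passes partition specs and serializes DVs as bitmaps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

// Stand-in for the existing writer interface the delegate logic expects
interface PartitioningWriter<T> {
  void write(T positionDelete, String spec, String partition);
  void close();
}

// Adapter: accumulates deleted row positions per data file, so callers that
// expect a PartitioningWriter keep working unchanged
class PartitioningDVWriter<T> implements PartitioningWriter<T> {
  // file path -> sorted positions (a stand-in for a real bitmap DV)
  private final Map<String, NavigableSet<Long>> dvs = new HashMap<>();

  @Override
  public void write(T positionDelete, String spec, String partition) {
    // assume the delete encodes "path:pos"; real code would read typed fields
    String[] parts = positionDelete.toString().split(":");
    dvs.computeIfAbsent(parts[0], k -> new TreeSet<>())
        .add(Long.parseLong(parts[1]));
  }

  @Override
  public void close() {
    // a real writer would serialize each position set as a deletion vector
  }

  Map<String, NavigableSet<Long>> result() {
    return dvs;
  }
}
```

Because the adapter satisfies the same interface as the fanout writers, the call site only needs a single branch to choose which implementation to return.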
Alternatively we'd have to update SparkPositionDeltaWrite's DeleteOnlyDataWriter and DataAndDeleteWriter to take a function as a delegate (instead of a writer), where the function either uses the new DV writer for V3 tables or falls back to the fanout writers. But that just makes things more complicated.
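For comparison, the rejected alternative might look like the following. This is a hypothetical sketch: the class names echo the discussion (DeleteOnlyDataWriter, fanout writers), but everything here is an illustrative stand-in rather than Iceberg's actual classes.

```java
import java.util.function.Supplier;

// Stand-ins for the two kinds of delete writers discussed above
interface DeleteWriter {
  String kind();
}

class DvWriter implements DeleteWriter {
  public String kind() { return "dv"; } // V3: deletion vectors
}

class FanoutPositionDeleteWriter implements DeleteWriter {
  public String kind() { return "fanout"; } // V2: position-delete files
}

// The alternative design: the writer takes a factory function as its
// delegate instead of a concrete writer instance
class DeleteOnlyDataWriter {
  private final DeleteWriter delegate;

  DeleteOnlyDataWriter(Supplier<DeleteWriter> delegateFactory) {
    this.delegate = delegateFactory.get();
  }

  String delegateKind() {
    return delegate.kind();
  }
}

class Writers {
  // the factory branches on format version to pick the delegate
  static DeleteOnlyDataWriter forFormatVersion(int formatVersion) {
    return new DeleteOnlyDataWriter(
        () -> formatVersion >= 3 ? new DvWriter() : new FanoutPositionDeleteWriter());
  }
}
```

The extra indirection is why the adapter approach reads more simply: with the adapter, the branching happens once at a single call site instead of threading a factory through every writer constructor.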
I think having this class here makes a lot of sense and I've reused it for #11545
Force-pushed 0b63df8 to b116637
Force-pushed b116637 to 0fdfd0c
...sions/src/test/java/org/apache/iceberg/spark/extensions/SparkRowLevelOperationsTestBase.java
Force-pushed 0fdfd0c to b698904
Force-pushed 5e8e727 to 0f00dfc
Force-pushed 0f00dfc to 546f6b8
Force-pushed 546f6b8 to e6806d0
@@ -521,7 +528,10 @@ public void deleteSingleRecordProducesDeleteOperation() throws NoSuchTableExcept
} else {
  // this is a RowDelta that produces a "delete" instead of "overwrite"
  validateMergeOnRead(currentSnapshot, "1", "1", null);
  validateProperty(currentSnapshot, ADD_POS_DELETE_FILES_PROP, "1");
  TableOperations ops = ((HasTableOperations) table).operations();
I don't think we need `ops` to get the version. The superclass already has `formatVersion` defined, so we can just examine that field.
validateProperty(currentSnapshot, ADD_POS_DELETE_FILES_PROP, "1");
TableOperations ops = ((HasTableOperations) table).operations();
String property =
    ops.current().formatVersion() >= 3 ? ADDED_DVS_PROP : ADD_POS_DELETE_FILES_PROP;
Suggested change:
- ops.current().formatVersion() >= 3 ? ADDED_DVS_PROP : ADD_POS_DELETE_FILES_PROP;
+ formatVersion >= 3 ? ADDED_DVS_PROP : ADD_POS_DELETE_FILES_PROP;
@@ -397,4 +450,9 @@ protected void assertAllBatchScansVectorized(SparkPlan plan) {
  List<SparkPlan> batchScans = SparkPlanUtil.collectBatchScans(plan);
  assertThat(batchScans).hasSizeGreaterThan(0).allMatch(SparkPlan::supportsColumnar);
}

protected int formatVersion(Table table) {
I don't think this is necessary anymore