-
Notifications
You must be signed in to change notification settings - Fork 13.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-37475] Drop ChangelogNormalize for piping from upsert source to sink #26306
Conversation
2eb62e6
to
eb97525
Compare
.../flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
Outdated
Show resolved
Hide resolved
...rc/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesOnKeyPrograms.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have only a couple of minor comments
...k-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
Outdated
Show resolved
Hide resolved
...k-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
Outdated
Show resolved
Hide resolved
...java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java
Outdated
Show resolved
Hide resolved
...java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java
Outdated
Show resolved
Hide resolved
.../flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DeleteKind.java
Outdated
Show resolved
Hide resolved
...le-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTraitDef.scala
Show resolved
Hide resolved
.../java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java
Outdated
Show resolved
Hide resolved
@@ -577,6 +577,7 @@ void testFromAndToChangelogStreamUpsert() throws Exception { | |||
tableEnv.fromChangelogStream( | |||
changelogStream, | |||
Schema.newBuilder().primaryKey("f0").build(), | |||
// produce partial deletes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't the comment incorrect? it produces full deletes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, it's correct. I changed it that, by default upsert
produces partial deletes.
...ources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml
Outdated
Show resolved
Hide resolved
...k-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
Outdated
Show resolved
Hide resolved
...table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTrait.scala
Outdated
Show resolved
Hide resolved
...table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTrait.scala
Outdated
Show resolved
Hide resolved
...e-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
Show resolved
Hide resolved
...k-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
Outdated
Show resolved
Hide resolved
@flinkbot run azure |
…o sink * Add information about DELETE to ChangelogMode * Adapt FlinkChangelogModeInferenceProgram to remove ChangelogNormalize if possible
What is the purpose of the change
Remove ChangelogNormalize if possible for
INSERT INTO SELECT * FROM
Verifying this change
Added plan and semantic tests
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation