-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[kv] Support first row merge engine #240
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need add validations for FlinkTableSink
that first_row
table doesn't supports UPDATE/DELETE and partial-update! And add tests to verify that.
fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java
Outdated
Show resolved
Hide resolved
...nnector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java
Outdated
Show resolved
Hide resolved
...nnector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java
Outdated
Show resolved
Hide resolved
@wuchong Comments addressed... |
if (mergeEngine != null && mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) { | ||
throw new UnsupportedOperationException( | ||
String.format( | ||
"Table %s is with merge engine '%s'. Table with '%s' merge engine doesn't support DELETE and UPDATE statements.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Table xxx uses the 'first_row' merge engine, which does not support DELETE or UPDATE statements.
@@ -712,6 +745,35 @@ void testUnsupportedDeleteAndUpdateStmtOnPartialPK() { | |||
"Currently, Fluss table only supports UPDATE statement with conditions on primary key."); | |||
} | |||
|
|||
@Test | |||
void testUnsupportedDeleteAndUpdateStmtOnFirstRowMergeEngine() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
verify partial update as well!
key(ConfigOptions.TABLE_MERGE_ENGINE.key()) | ||
.enumType(ConfigOptions.MergeEngine.class) | ||
.noDefaultValue()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reuse the definition?
...uss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java
Outdated
Show resolved
Hide resolved
...connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java
Outdated
Show resolved
Hide resolved
e64b962
to
9de11b8
Compare
@wuchong Comments addressed again... |
9de11b8
to
1353756
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. @luoyuxia I appended a commit to improve the config operations:
- Move
MergeEngine
out ofConfigOptions
, as it is very widely used in the code base, just likeKvFormat
. - use
toFlinkOption
for quick/easy conversion from Fluss ConfigOption to Flink ConfigOption, avoid duplicate definition inFlinkTableFactory
.
The CI is passed. I will merge it... @luoyuxia please check the commit I appended. |
1 similar comment
The CI is passed. I will merge it... @luoyuxia please check the commit I appended. |
Purpose
Linked issue: close #133
Introduce first row merge engine for primary key table
Tests
FlussTableITCase#testFirstRowMergeEngine
FlinkTableSinkITCase#testFirstRowMergeEngine
API and Format
N/A
Documentation
I'd like to complete the document in another pr #241 since after introduce merge engine, we may need to refactor the document struture about primary key table.