-
Notifications
You must be signed in to change notification settings - Fork 218
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[kv] Support version merge engine #277
base: main
Are you sure you want to change the base?
Conversation
944bdb3
to
7f115bf
Compare
@wuchong CLA has been sent, PTAL |
Thanks for the contribution @sunxiaojian ! Will take a look. cc @luoyuxia as well. |
Btw, the checkstyle is failed: https://github.com/alibaba/fluss/actions/runs/12515363835/job/34959455034?pr=277 |
b1363d6
to
5b67c20
Compare
fixed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sunxiaojian Thanks for the pr... I left some comments... PTAL
throw new IllegalArgumentException( | ||
"When the merge engine is set to version, the 'table.merge-engine.version.column' cannot be empty."); | ||
} | ||
return new MergeEngine(Type.VERSION, column); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll need two checks for merge_engine_version_column:
1: the column is in the table
2: the data type of the column is supported as version column
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
> oldRow.getTimestampLtz(fieldIndex, ((TimestampType) dataType).getPrecision()) | ||
.toEpochMicros(); | ||
} else { | ||
throw new FlussRuntimeException("Unsupported data type: " + dataType.asSummaryString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Validate the data type of version column while creating the table...
And I think TINYINT
/SMALLINT
should also be supported..
Maybe DATE
type also can be supported?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be frequently used as a version column, but can be expanded later if necessary
import com.alibaba.fluss.record.KvRecord; | ||
|
||
/** Merge engine wrapper for table. */ | ||
public interface MergeEngineWrapper { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
method writeRecord,getAppendedRecordCount, getLogOffset is an interface method, but only AbstractMergeEngineWrapper
...And the overwrite method update
by VersionMergeEngineWrapper is not an interface method.. Looks a little strange to me...
And there's some duplicate code piece in method update
between AbstractMergeEngineWrapper
and VersionMergeEngineWrapper
..
I'm wondering whether it'll be better to organize these classes as following:
1:
Introduce an interface RowMergeEngine
public interface RowMergeEngine {
/**
* Upsert the old row with the new row. Return the row after upsert, null if the new row is ignored.
*
* @param oldRow the old row
* @param newRow the new row
*/
@Nullable
BinaryRow upsertRow(BinaryRow oldRow, BinaryRow newRow);
}
2: And then implement it with a VersionRowMergeEngine
:
public class VersionRowMergeEngine implements RowMergeEngine {
@Nullable
@Override
public BinaryRow upsertRow(BinaryRow oldRow, BinaryRow newRow) {
RowType rowType = schema.toRowType();
if (checkNewRowVersion(mergeEngine, rowType, oldRow, newRow)) {
return newRow;
}
return null;
}
}
3: Then, we won't need to extract the put kv logic to AbstractMergeEngineWrapper
, but hold a RowMergeEngine
in KvTablet.. If RowMergeEngine
is not null, we use
RowMergeEngine to merge the row to a new row.. May be some thing like:
byte[] oldValue = getFromBufferOrKv(key);
// it's update
if (oldValue != null) {
if (rowMergeEngine == null) {
newRow = updateRow(
oldRow, kvRecord.getRow(), partialUpdater);
} else {
newRow = rowMergeEngine.mergeRow(oldRow, kvRecord.getRow());
}
if (newRow != null) {
walBuilder.append(RowKind.UPDATE_BEFORE, oldRow);
walBuilder.append(RowKind.UPDATE_AFTER, newRow);
appendedRecordCount += 2;
....
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks,Let me think about how to make better modifications first
} | ||
DataType dataType = rowType.getTypeAt(fieldIndex); | ||
if (dataType instanceof BigIntType) { | ||
return newRow.getLong(fieldIndex) > oldRow.getLong(fieldIndex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the field value is null? We should use InternalRow.FieldGetter
and handle null value...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
@@ -124,7 +124,8 @@ void beforeEach() throws Exception { | |||
conf, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a ut for version merge engine in KvTabletTest
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
@@ -67,17 +68,21 @@ public class FlinkTableSink | |||
private boolean appliedUpdates = false; | |||
@Nullable private GenericRow deleteRow; | |||
|
|||
private final MergeEngine mergeEngine; | |||
|
|||
public FlinkTableSink( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add validation to throw exception in class FlinkTableSink
that update/delete/partial update is not supported in the version merge engine and a test to verify the exception should be thrown...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it should support updates and partial updates if your update includes this version field
@luoyuxia Thanks for the review. I will handle it as soon as possible |
f53900a
to
c289ee1
Compare
c289ee1
to
d3134fe
Compare
Purpose
Support version merge engine
Linked issue: close #213
Tests
com.alibaba.fluss.connector.flink.sink.FlinkTableSinkITCase#testVersionMergeEngineWithTypeBigint
com.alibaba.fluss.connector.flink.sink.FlinkTableSinkITCase#testVersionMergeEngineWithTypeTimestamp
com.alibaba.fluss.client.table.FlussTableITCase#testMergeEngineWithVersion
API and Format
No
Documentation
No