-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[flink] Supports debezium-bson format of kafka which collected from mongodb via debezium #4870
Conversation
c37818d
to
e96aa44
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for contributing! I left some comments, please take a look.
...c/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumBsonRecordParser.java
Outdated
Show resolved
Hide resolved
for (Map.Entry<String, BsonValue> entry : document.entrySet()) { | ||
String fieldName = entry.getKey(); | ||
resultMap.put(fieldName, toJsonString(BsonValueConvertor.convert(entry.getValue()))); | ||
rowTypeBuilder.field(fieldName, DataTypes.STRING()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can map bson type to actual paimon type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because mongodb does not have a schema, the data type of the same field in different documents may be different. For safety reasons, string type is used. By the way, this is the same as mongodb-cdc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In addition, BsonValueConvertor is used here for data conversion, while mongodb-cdc does not convert data. There will be inconsistencies here, and it may be necessary to discuss.
For example:
The data of mongodb-cdc is:
{
"_id": "{\"$oid\":\"64001c996f4de7ff3189d374\"}",
"updated_at": "{\"$numberLong\":\"1732232838425\"}}"
}
The data after BsonValueConvertor conversion is:
{
"_id": "64001c996f4de7ff3189d374",
"updated_at": "1732232838425"
}
I think it is also possible to configure whether to use BsonValueConvertor for conversion through TypeMapping
…from mongodb via debezium
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Purpose
Linked issue: open #4615
Tests
API and Format
Documentation