-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Option to inject kafka metadata #205
Conversation
tabmatfournier
commented
Mar 9, 2024
•
edited
Loading
edited
- adds KafkaMetadataTransform to inject metadata to Struct/Map records
- adds option to inject Kafka metadata to both DMS and Debezium SMTs w/o an additional record copy (using an internally nested KafkaMetadataAppender)
- can be configured to inject an (optional) static field such as the (external) kafka cluster address/name
- add option to inject kafka metadata if Debezium or DMS messages are SinkRecords
...onnect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DebeziumTransform.java
Outdated
Show resolved
Hide resolved
...orms/src/test/java/io/tabular/iceberg/connect/transforms/util/KafkaMetadataAppenderTest.java
Outdated
Show resolved
Hide resolved
kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DmsTransform.java
Outdated
Show resolved
Hide resolved
...nect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/util/RecordAppender.java
Outdated
Show resolved
Hide resolved
...ansforms/src/main/java/io/tabular/iceberg/connect/transforms/util/KafkaMetadataAppender.java
Outdated
Show resolved
Hide resolved
...ansforms/src/main/java/io/tabular/iceberg/connect/transforms/util/KafkaMetadataAppender.java
Outdated
Show resolved
Hide resolved
...ansforms/src/main/java/io/tabular/iceberg/connect/transforms/util/KafkaMetadataAppender.java
Outdated
Show resolved
Hide resolved
...ansforms/src/main/java/io/tabular/iceberg/connect/transforms/util/KafkaMetadataAppender.java
Outdated
Show resolved
Hide resolved
...ansforms/src/main/java/io/tabular/iceberg/connect/transforms/util/KafkaMetadataAppender.java
Outdated
Show resolved
Hide resolved
...ansforms/src/main/java/io/tabular/iceberg/connect/transforms/util/KafkaMetadataAppender.java
Outdated
Show resolved
Hide resolved
...t-transforms/src/main/java/io/tabular/iceberg/connect/transforms/KafkaMetadataTransform.java
Outdated
Show resolved
Hide resolved
...ansforms/src/main/java/io/tabular/iceberg/connect/transforms/util/KafkaMetadataAppender.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quite close, think we can wrap this up today.
The only major comment is I don't think offset field should be optional.
...t-transforms/src/main/java/io/tabular/iceberg/connect/transforms/KafkaMetadataTransform.java
Outdated
Show resolved
Hide resolved
...t-transforms/src/main/java/io/tabular/iceberg/connect/transforms/KafkaMetadataTransform.java
Outdated
Show resolved
Hide resolved
...t-transforms/src/main/java/io/tabular/iceberg/connect/transforms/KafkaMetadataTransform.java
Outdated
Show resolved
Hide resolved
...t-transforms/src/main/java/io/tabular/iceberg/connect/transforms/KafkaMetadataTransform.java
Outdated
Show resolved
Hide resolved
...t-transforms/src/main/java/io/tabular/iceberg/connect/transforms/KafkaMetadataTransform.java
Outdated
Show resolved
Hide resolved
...t-transforms/src/main/java/io/tabular/iceberg/connect/transforms/KafkaMetadataTransform.java
Show resolved
Hide resolved
...t-transforms/src/main/java/io/tabular/iceberg/connect/transforms/KafkaMetadataTransform.java
Outdated
Show resolved
Hide resolved
...ansforms/src/main/java/io/tabular/iceberg/connect/transforms/util/KafkaMetadataAppender.java
Outdated
Show resolved
Hide resolved
Addressed all of this. |
...t-transforms/src/main/java/io/tabular/iceberg/connect/transforms/KafkaMetadataTransform.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
private SinkRecord applySchemaless(SinkRecord record) { | ||
Map<String, Object> value = Requirements.requireMap(record.value(), "KafkaMetadata transform"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Map<String, Object> value = Requirements.requireMap(record.value(), "KafkaMetadata transform"); | |
Map<String, Object> value = Requirements.requireMap(record.value(), "KafkaMetadataTransform"); |