-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Option to inject kafka metadata #205
Merged
Merged
Changes from all commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
7d88b1d
option-to-inject-kafka-metadata
tabmatfournier 5788722
formatting and docs
tabmatfournier 0cb9d85
fix null checks
tabmatfournier 800de10
tests
tabmatfournier 7600e09
flakey test
tabmatfournier 86486a9
fix final static bug causing test failures
tabmatfournier 2fc31b5
reworked to allow nesting or flattening of kafka metadata
tabmatfournier fef04f8
readme fixes
tabmatfournier ea9efe6
fin
tabmatfournier 2bb6c0d
Merge branch 'main' into option-to-inject-kafka-metadata
tabmatfournier 52cbda9
fix keys
tabmatfournier 779c5dc
Merge branch 'main' into option-to-inject-kafka-metadata
tabmatfournier 1d3ecf6
comments, removing cdc/dms options
tabmatfournier 50a27dd
timestamp
tabmatfournier 84c8e91
comments
tabmatfournier 73ecdfe
void signature
tabmatfournier 70d8b87
readme typos
tabmatfournier 5008611
.
tabmatfournier 274b38d
fix readme
tabmatfournier File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
295 changes: 295 additions & 0 deletions
295
...ransforms/src/main/java/io/tabular/iceberg/connect/transforms/KafkaMetadataTransform.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,295 @@ | ||||||
/* | ||||||
* Licensed to the Apache Software Foundation (ASF) under one | ||||||
* or more contributor license agreements. See the NOTICE file | ||||||
* distributed with this work for additional information | ||||||
* regarding copyright ownership. The ASF licenses this file | ||||||
* to you under the Apache License, Version 2.0 (the | ||||||
* "License"); you may not use this file except in compliance | ||||||
* with the License. You may obtain a copy of the License at | ||||||
* | ||||||
* http://www.apache.org/licenses/LICENSE-2.0 | ||||||
* | ||||||
* Unless required by applicable law or agreed to in writing, | ||||||
* software distributed under the License is distributed on an | ||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||||
* KIND, either express or implied. See the License for the | ||||||
* specific language governing permissions and limitations | ||||||
* under the License. | ||||||
*/ | ||||||
package io.tabular.iceberg.connect.transforms; | ||||||
|
||||||
import java.util.Map; | ||||||
import java.util.function.Function; | ||||||
import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||||||
import org.apache.kafka.common.config.ConfigDef; | ||||||
import org.apache.kafka.common.config.ConfigException; | ||||||
import org.apache.kafka.connect.data.Field; | ||||||
import org.apache.kafka.connect.data.Schema; | ||||||
import org.apache.kafka.connect.data.SchemaBuilder; | ||||||
import org.apache.kafka.connect.data.Struct; | ||||||
import org.apache.kafka.connect.sink.SinkRecord; | ||||||
import org.apache.kafka.connect.transforms.Transformation; | ||||||
import org.apache.kafka.connect.transforms.util.Requirements; | ||||||
import org.apache.kafka.connect.transforms.util.SchemaUtil; | ||||||
import org.apache.kafka.connect.transforms.util.SimpleConfig; | ||||||
|
||||||
public class KafkaMetadataTransform implements Transformation<SinkRecord> { | ||||||
|
||||||
private interface RecordAppender { | ||||||
|
||||||
void addToSchema(SchemaBuilder builder); | ||||||
|
||||||
void addToStruct(SinkRecord record, Struct struct); | ||||||
|
||||||
void addToMap(SinkRecord record, Map<String, Object> map); | ||||||
} | ||||||
|
||||||
private static class NoOpRecordAppender implements RecordAppender { | ||||||
|
||||||
@Override | ||||||
public void addToSchema(SchemaBuilder builder) {} | ||||||
|
||||||
@Override | ||||||
public void addToStruct(SinkRecord record, Struct struct) {} | ||||||
|
||||||
@Override | ||||||
public void addToMap(SinkRecord record, Map<String, Object> map) {} | ||||||
} | ||||||
|
||||||
private static RecordAppender getExternalFieldAppender( | ||||||
String field, Function<String, String> fieldNamer) { | ||||||
if (field == null) { | ||||||
return new NoOpRecordAppender(); | ||||||
} | ||||||
String[] parts = field.split(","); | ||||||
if (parts.length != 2) { | ||||||
throw new ConfigException( | ||||||
String.format("Could not parse %s for %s", field, EXTERNAL_KAFKA_METADATA)); | ||||||
} | ||||||
String fieldName = fieldNamer.apply(parts[0]); | ||||||
String fieldValue = parts[1]; | ||||||
return new RecordAppender() { | ||||||
|
||||||
@Override | ||||||
public void addToSchema(SchemaBuilder builder) { | ||||||
builder.field(fieldName, Schema.STRING_SCHEMA); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public void addToStruct(SinkRecord record, Struct struct) { | ||||||
struct.put(fieldName, fieldValue); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public void addToMap(SinkRecord record, Map<String, Object> map) { | ||||||
map.put(fieldName, fieldValue); | ||||||
} | ||||||
}; | ||||||
} | ||||||
|
||||||
private static final String TOPIC = "topic"; | ||||||
private static final String PARTITION = "partition"; | ||||||
private static final String OFFSET = "offset"; | ||||||
private static final String TIMESTAMP = "timestamp"; | ||||||
private static final String EXTERNAL_KAFKA_METADATA = "external_field"; | ||||||
private static final String KEY_METADATA_FIELD_NAME = "field_name"; | ||||||
private static final String KEY_METADATA_IS_NESTED = "nested"; | ||||||
private static final String DEFAULT_METADATA_FIELD_NAME = "_kafka_metadata"; | ||||||
private static RecordAppender recordAppender; | ||||||
|
||||||
private static final ConfigDef CONFIG_DEF = | ||||||
new ConfigDef() | ||||||
.define( | ||||||
KEY_METADATA_FIELD_NAME, | ||||||
ConfigDef.Type.STRING, | ||||||
DEFAULT_METADATA_FIELD_NAME, | ||||||
ConfigDef.Importance.LOW, | ||||||
"the field to append Kafka metadata under (or prefix fields with)") | ||||||
.define( | ||||||
KEY_METADATA_IS_NESTED, | ||||||
ConfigDef.Type.BOOLEAN, | ||||||
false, | ||||||
ConfigDef.Importance.LOW, | ||||||
"(true/false) to make a nested record under name or prefix names on the top level") | ||||||
.define( | ||||||
EXTERNAL_KAFKA_METADATA, | ||||||
ConfigDef.Type.STRING, | ||||||
null, | ||||||
ConfigDef.Importance.LOW, | ||||||
"key,value representing a String to be injected on Kafka metadata (e.g. Cluster)"); | ||||||
|
||||||
private static RecordAppender getRecordAppender(Map<String, ?> props) { | ||||||
SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); | ||||||
return getRecordAppender(config); | ||||||
} | ||||||
|
||||||
private static RecordAppender getRecordAppender(SimpleConfig config) { | ||||||
RecordAppender externalFieldAppender; | ||||||
String metadataFieldName = config.getString(KEY_METADATA_FIELD_NAME); | ||||||
Boolean nestedMetadata = config.getBoolean(KEY_METADATA_IS_NESTED); | ||||||
|
||||||
String topicFieldName; | ||||||
String partitionFieldName; | ||||||
String offsetFieldName; | ||||||
String timestampFieldName; | ||||||
|
||||||
if (nestedMetadata) { | ||||||
externalFieldAppender = | ||||||
getExternalFieldAppender(config.getString(EXTERNAL_KAFKA_METADATA), name -> name); | ||||||
|
||||||
SchemaBuilder nestedSchemaBuilder = SchemaBuilder.struct(); | ||||||
nestedSchemaBuilder | ||||||
.field(TOPIC, Schema.STRING_SCHEMA) | ||||||
.field(PARTITION, Schema.INT32_SCHEMA) | ||||||
.field(OFFSET, Schema.INT64_SCHEMA) | ||||||
.field(TIMESTAMP, Schema.OPTIONAL_INT64_SCHEMA); | ||||||
externalFieldAppender.addToSchema(nestedSchemaBuilder); | ||||||
|
||||||
Schema nestedSchema = nestedSchemaBuilder.build(); | ||||||
|
||||||
return new RecordAppender() { | ||||||
@Override | ||||||
public void addToSchema(SchemaBuilder builder) { | ||||||
builder.field(metadataFieldName, nestedSchema); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public void addToStruct(SinkRecord record, Struct struct) { | ||||||
Struct nested = new Struct(nestedSchema); | ||||||
nested.put(TOPIC, record.topic()); | ||||||
nested.put(PARTITION, record.kafkaPartition()); | ||||||
nested.put(OFFSET, record.kafkaOffset()); | ||||||
if (record.timestamp() != null) { | ||||||
nested.put(TIMESTAMP, record.timestamp()); | ||||||
} | ||||||
externalFieldAppender.addToStruct(record, nested); | ||||||
struct.put(metadataFieldName, nested); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public void addToMap(SinkRecord record, Map<String, Object> map) { | ||||||
Map<String, Object> nested = Maps.newHashMap(); | ||||||
nested.put(TOPIC, record.topic()); | ||||||
nested.put(PARTITION, record.kafkaPartition()); | ||||||
nested.put(OFFSET, record.kafkaOffset()); | ||||||
if (record.timestamp() != null) { | ||||||
nested.put(TIMESTAMP, record.timestamp()); | ||||||
} | ||||||
externalFieldAppender.addToMap(record, nested); | ||||||
map.put(metadataFieldName, nested); | ||||||
} | ||||||
}; | ||||||
|
||||||
} else { | ||||||
Function<String, String> namer = name -> String.format("%s_%s", metadataFieldName, name); | ||||||
topicFieldName = namer.apply(TOPIC); | ||||||
partitionFieldName = namer.apply(PARTITION); | ||||||
offsetFieldName = namer.apply(OFFSET); | ||||||
timestampFieldName = namer.apply(TIMESTAMP); | ||||||
|
||||||
externalFieldAppender = | ||||||
getExternalFieldAppender(config.getString(EXTERNAL_KAFKA_METADATA), namer); | ||||||
return new RecordAppender() { | ||||||
@Override | ||||||
public void addToSchema(SchemaBuilder builder) { | ||||||
builder | ||||||
.field(topicFieldName, Schema.STRING_SCHEMA) | ||||||
.field(partitionFieldName, Schema.INT32_SCHEMA) | ||||||
.field(offsetFieldName, Schema.INT64_SCHEMA) | ||||||
.field(timestampFieldName, Schema.OPTIONAL_INT64_SCHEMA); | ||||||
externalFieldAppender.addToSchema(builder); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public void addToStruct(SinkRecord record, Struct struct) { | ||||||
struct.put(topicFieldName, record.topic()); | ||||||
struct.put(partitionFieldName, record.kafkaPartition()); | ||||||
struct.put(offsetFieldName, record.kafkaOffset()); | ||||||
if (record.timestamp() != null) { | ||||||
struct.put(timestampFieldName, record.timestamp()); | ||||||
} | ||||||
externalFieldAppender.addToStruct(record, struct); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public void addToMap(SinkRecord record, Map<String, Object> map) { | ||||||
map.put(topicFieldName, record.topic()); | ||||||
map.put(partitionFieldName, record.kafkaPartition()); | ||||||
map.put(offsetFieldName, record.kafkaOffset()); | ||||||
if (record.timestamp() != null) { | ||||||
map.put(timestampFieldName, record.timestamp()); | ||||||
} | ||||||
externalFieldAppender.addToMap(record, map); | ||||||
} | ||||||
}; | ||||||
} | ||||||
} | ||||||
|
||||||
@Override | ||||||
public SinkRecord apply(SinkRecord record) { | ||||||
if (record.value() == null) { | ||||||
return record; | ||||||
} else if (record.valueSchema() == null) { | ||||||
return applySchemaless(record); | ||||||
} else { | ||||||
return applyWithSchema(record); | ||||||
} | ||||||
} | ||||||
|
||||||
private SinkRecord applyWithSchema(SinkRecord record) { | ||||||
Struct value = Requirements.requireStruct(record.value(), "KafkaMetadataTransform"); | ||||||
Schema newSchema = makeUpdatedSchema(record.valueSchema()); | ||||||
Struct newValue = new Struct(newSchema); | ||||||
for (Field field : record.valueSchema().fields()) { | ||||||
newValue.put(field.name(), value.get(field)); | ||||||
} | ||||||
recordAppender.addToStruct(record, newValue); | ||||||
return record.newRecord( | ||||||
record.topic(), | ||||||
record.kafkaPartition(), | ||||||
record.keySchema(), | ||||||
record.key(), | ||||||
newSchema, | ||||||
newValue, | ||||||
record.timestamp(), | ||||||
record.headers()); | ||||||
} | ||||||
|
||||||
private Schema makeUpdatedSchema(Schema schema) { | ||||||
SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); | ||||||
for (Field field : schema.fields()) { | ||||||
builder.field(field.name(), field.schema()); | ||||||
} | ||||||
recordAppender.addToSchema(builder); | ||||||
return builder.build(); | ||||||
} | ||||||
|
||||||
private SinkRecord applySchemaless(SinkRecord record) { | ||||||
Map<String, Object> value = Requirements.requireMap(record.value(), "KafkaMetadata transform"); | ||||||
tabmatfournier marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
Map<String, Object> newValue = Maps.newHashMap(value); | ||||||
recordAppender.addToMap(record, newValue); | ||||||
|
||||||
return record.newRecord( | ||||||
record.topic(), | ||||||
record.kafkaPartition(), | ||||||
record.keySchema(), | ||||||
record.key(), | ||||||
null, | ||||||
newValue, | ||||||
record.timestamp(), | ||||||
record.headers()); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public ConfigDef config() { | ||||||
return CONFIG_DEF; | ||||||
} | ||||||
|
||||||
@Override | ||||||
public void close() {} | ||||||
|
||||||
@Override | ||||||
public void configure(Map<String, ?> configs) { | ||||||
recordAppender = getRecordAppender(configs); | ||||||
} | ||||||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why can't we just do this? Do we get an error?