Option to inject kafka metadata #205

Merged (19 commits) on Apr 13, 2024
Changes from 11 commits
48 changes: 44 additions & 4 deletions kafka-connect-transforms/README.md
@@ -24,6 +24,27 @@ The `CopyValue` SMT copies a value from one field to a new field.
"transforms.copyId.target.field": "id_copy",
```

# KafkaMetadataTransform
_(Experimental)_

The `KafkaMetadata` SMT injects the Kafka `topic`, `partition`, `offset`, and record `timestamp` into each record.

## Configuration

| Property                      | Description (default value)                                                                       |
|-------------------------------|----------------------------------------------------------------------------------------------------|
| kafka_metadata.include        | (true) include Kafka metadata; when false the transform is a no-op                                  |
| kafka_metadata.field_name     | (_kafka_metadata) field name (nested) or field-name prefix (top level) for the injected metadata    |
| kafka_metadata.nested         | (false) if true, nests the metadata under a single struct; otherwise adds prefixed top-level fields |
| kafka_metadata.external_field | (none) appends a constant `key,value` pair to the metadata (e.g. a cluster name)                    |

If `nested` is true, the metadata is added as a single struct field:

`_kafka_metadata.topic`, `_kafka_metadata.partition`, `_kafka_metadata.offset`, `_kafka_metadata.record_timestamp`

If `nested` is false, the metadata is added as prefixed top-level fields:

`_kafka_metadata_topic`, `_kafka_metadata_partition`, `_kafka_metadata_offset`, `_kafka_metadata_record_timestamp`
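
For example, a connector configuration that enables this SMT with nested metadata and an extra cluster field might look like the following sketch (the transform alias `kafkaMetadata` and the cluster value are illustrative, not part of this PR):

```
"transforms": "kafkaMetadata",
"transforms.kafkaMetadata.type": "io.tabular.iceberg.connect.transforms.KafkaMetadataTransform",
"transforms.kafkaMetadata.kafka_metadata.nested": "true",
"transforms.kafkaMetadata.kafka_metadata.external_field": "cluster,my-cluster-name",
```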

# DmsTransform
_(Experimental)_

@@ -33,7 +54,16 @@ It will promote the `data` element fields to top level and add the following met

## Configuration

The DMS transform can also append Kafka metadata, without an additional record copy, using the same options as the `KafkaMetadataTransform` (an example follows the table):

| Property                      | Description (default value)                                                                       |
|-------------------------------|----------------------------------------------------------------------------------------------------|
| kafka_metadata.include        | (true) include Kafka metadata; when false the transform is a no-op                                  |
| kafka_metadata.field_name     | (_kafka_metadata) field name (nested) or field-name prefix (top level) for the injected metadata    |
| kafka_metadata.nested         | (false) if true, nests the metadata under a single struct; otherwise adds prefixed top-level fields |
| kafka_metadata.external_field | (none) appends a constant `key,value` pair to the metadata (e.g. a cluster name)                    |
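
As a minimal sketch, the DMS transform with Kafka metadata added as prefixed top-level fields could be configured like this (the alias `dms` is illustrative):

```
"transforms": "dms",
"transforms.dms.type": "io.tabular.iceberg.connect.transforms.DmsTransform",
"transforms.dms.kafka_metadata.include": "true",
"transforms.dms.kafka_metadata.nested": "false",
```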


# DebeziumTransform
_(Experimental)_
@@ -44,6 +74,16 @@ It will promote the `before` or `after` element fields to top level and add the

## Configuration

| Property                      | Description                                                                                               |
|:------------------------------|:----------------------------------------------------------------------------------------------------------|
| cdc.target.pattern            | Pattern to use for setting the CDC target field value, default is `{db}.{table}`                           |

The Debezium transform can also append Kafka metadata, without an additional record copy, using the same options as the `KafkaMetadataTransform` (an example follows the table):

| Property                      | Description (default value)                                                                       |
|-------------------------------|----------------------------------------------------------------------------------------------------|
| kafka_metadata.include        | (true) include Kafka metadata; when false the transform is a no-op                                  |
| kafka_metadata.field_name     | (_kafka_metadata) field name (nested) or field-name prefix (top level) for the injected metadata    |
| kafka_metadata.nested         | (false) if true, nests the metadata under a single struct; otherwise adds prefixed top-level fields |
| kafka_metadata.external_field | (none) appends a constant `key,value` pair to the metadata (e.g. a cluster name)                    |
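
As a sketch, a Debezium transform configured with both a CDC target pattern and Kafka metadata injection might look like this (the alias `debezium` and the property values are illustrative):

```
"transforms": "debezium",
"transforms.debezium.type": "io.tabular.iceberg.connect.transforms.DebeziumTransform",
"transforms.debezium.cdc.target.pattern": "{db}.{table}",
"transforms.debezium.kafka_metadata.include": "true",
"transforms.debezium.kafka_metadata.field_name": "_kafka_metadata",
```
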
DebeziumTransform.java
@@ -18,6 +18,8 @@
*/
package io.tabular.iceberg.connect.transforms;

import io.tabular.iceberg.connect.transforms.util.KafkaMetadataAppender;
import io.tabular.iceberg.connect.transforms.util.RecordAppender;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.config.ConfigDef;
@@ -43,6 +45,7 @@ public class DebeziumTransform<R extends ConnectRecord<R>> implements Transforma
private static final String CDC_TARGET_PATTERN = "cdc.target.pattern";
private static final String DB_PLACEHOLDER = "{db}";
private static final String TABLE_PLACEHOLDER = "{table}";
private RecordAppender<R> kafkaAppender;

public static final ConfigDef CONFIG_DEF =
new ConfigDef()
@@ -51,14 +54,39 @@ public class DebeziumTransform<R extends ConnectRecord<R>> implements Transforma
ConfigDef.Type.STRING,
null,
Importance.MEDIUM,
"Pattern to use for setting the CDC target field value.");
"Pattern to use for setting the CDC target field value.")
.define(
KafkaMetadataAppender.INCLUDE_KAFKA_METADATA,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
"Include appending of Kafka metadata to SinkRecord")
.define(
KafkaMetadataAppender.KEY_METADATA_FIELD_NAME,
ConfigDef.Type.STRING,
KafkaMetadataAppender.DEFAULT_METADATA_FIELD_NAME,
ConfigDef.Importance.LOW,
"field to append Kafka metadata under")
.define(
KafkaMetadataAppender.KEY_METADATA_IS_NESTED,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
"(true/false) to make a nested record under name or prefix names on the top level")
.define(
KafkaMetadataAppender.EXTERNAL_KAFKA_METADATA,
ConfigDef.Type.STRING,
"none",
ConfigDef.Importance.LOW,
"key,value representing a String to be injected on Kafka metadata (e.g. Cluster)");

private String cdcTargetPattern;

@Override
public void configure(Map<String, ?> props) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
cdcTargetPattern = config.getString(CDC_TARGET_PATTERN);
kafkaAppender = KafkaMetadataAppender.from(props);
}

@Override
@@ -110,6 +138,8 @@ private R applyWithSchema(R record) {
}
newValue.put(CdcConstants.COL_CDC, cdcMetadata);

kafkaAppender.addToStruct(record, newValue);

return record.newRecord(
record.topic(),
record.kafkaPartition(),
@@ -138,21 +168,23 @@ private R applySchemaless(R record) {
return null;
}

// create the new value
Map<String, Object> newValue = Maps.newHashMap((Map<String, Object>) payload);

// create the CDC metadata
Map<String, Object> cdcMetadata = Maps.newHashMap();
cdcMetadata.put(CdcConstants.COL_OP, op);
cdcMetadata.put(CdcConstants.COL_TS, value.get("ts_ms"));
if (record instanceof SinkRecord) {
cdcMetadata.put(CdcConstants.COL_OFFSET, ((SinkRecord) record).kafkaOffset());
}
kafkaAppender.addToMap(record, newValue);
setTableAndTargetFromSourceMap(value.get("source"), cdcMetadata);

if (record.key() instanceof Map) {
cdcMetadata.put(CdcConstants.COL_KEY, record.key());
}

newValue.put(CdcConstants.COL_CDC, cdcMetadata);

return record.newRecord(
@@ -237,6 +269,7 @@ private Schema makeUpdatedSchema(Schema schema, Schema cdcSchema) {
}

builder.field(CdcConstants.COL_CDC, cdcSchema);
kafkaAppender.addToSchema(builder);

return builder.build();
}
DmsTransform.java
@@ -18,6 +18,8 @@
*/
package io.tabular.iceberg.connect.transforms;

import io.tabular.iceberg.connect.transforms.util.KafkaMetadataAppender;
import io.tabular.iceberg.connect.transforms.util.RecordAppender;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.config.ConfigDef;
@@ -30,7 +32,33 @@
public class DmsTransform<R extends ConnectRecord<R>> implements Transformation<R> {

private static final Logger LOG = LoggerFactory.getLogger(DmsTransform.class.getName());
public static final ConfigDef CONFIG_DEF =
new ConfigDef()
.define(
KafkaMetadataAppender.INCLUDE_KAFKA_METADATA,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
"Include appending of Kafka metadata to SinkRecord")
.define(
KafkaMetadataAppender.KEY_METADATA_FIELD_NAME,
ConfigDef.Type.STRING,
KafkaMetadataAppender.DEFAULT_METADATA_FIELD_NAME,
ConfigDef.Importance.LOW,
"field to append Kafka metadata under")
.define(
KafkaMetadataAppender.KEY_METADATA_IS_NESTED,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
"(true/false) to make a nested record under name or prefix names on the top level")
.define(
KafkaMetadataAppender.EXTERNAL_KAFKA_METADATA,
ConfigDef.Type.STRING,
"none",
ConfigDef.Importance.LOW,
"key,value representing a String to be injected on Kafka metadata (e.g. Cluster)");
private RecordAppender<R> kafkaAppender = null;

@Override
public R apply(R record) {
@@ -82,6 +110,8 @@ private R applySchemaless(R record) {
Map<String, Object> newValue = Maps.newHashMap((Map<String, Object>) dataObj);
newValue.put(CdcConstants.COL_CDC, cdcMetadata);

this.kafkaAppender.addToMap(record, newValue);

return record.newRecord(
record.topic(),
record.kafkaPartition(),
@@ -94,12 +124,14 @@ private R applySchemaless(R record) {

@Override
public ConfigDef config() {
return CONFIG_DEF;
}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {
this.kafkaAppender = KafkaMetadataAppender.from(configs);
}
}
KafkaMetadataTransform.java
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.transforms;

import io.tabular.iceberg.connect.transforms.util.KafkaMetadataAppender;
import io.tabular.iceberg.connect.transforms.util.RecordAppender;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.apache.kafka.connect.transforms.util.SchemaUtil;

public class KafkaMetadataTransform<R extends ConnectRecord<R>> implements Transformation<R> {
private RecordAppender<R> kafkaAppender;

public static final ConfigDef CONFIG_DEF =
new ConfigDef()
.define(
KafkaMetadataAppender.INCLUDE_KAFKA_METADATA,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
"Include appending of Kafka metadata to SinkRecord")
.define(
KafkaMetadataAppender.KEY_METADATA_FIELD_NAME,
ConfigDef.Type.STRING,
KafkaMetadataAppender.DEFAULT_METADATA_FIELD_NAME,
ConfigDef.Importance.LOW,
"field to append Kafka metadata under")
.define(
KafkaMetadataAppender.KEY_METADATA_IS_NESTED,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
"(true/false) to make a nested record under name or prefix names on the top level")
.define(
KafkaMetadataAppender.EXTERNAL_KAFKA_METADATA,
ConfigDef.Type.STRING,
"none",
ConfigDef.Importance.LOW,
"key,value representing a String to be injected on Kafka metadata (e.g. Cluster)");

@Override
public R apply(R record) {
if (record.value() == null) {
return record;
} else if (record.valueSchema() == null) {
return applySchemaless(record);
} else {
return applyWithSchema(record);
}
}

private R applyWithSchema(R record) {
Struct value = Requirements.requireStruct(record.value(), "KafkaMetadata transform");
Schema newSchema = makeUpdatedSchema(record.valueSchema());
Struct newValue = new Struct(newSchema);
for (Field field : record.valueSchema().fields()) {
newValue.put(field.name(), value.get(field));
}
kafkaAppender.addToStruct(record, newValue);
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
newSchema,
newValue,
record.timestamp(),
record.headers());
}

private R applySchemaless(R record) {
Map<String, Object> value = Requirements.requireMap(record.value(), "KafkaMetadata transform");
Map<String, Object> newValue = Maps.newHashMap(value);
kafkaAppender.addToMap(record, newValue);

return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
null,
newValue,
record.timestamp(),
record.headers());
}

private Schema makeUpdatedSchema(Schema schema) {
SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
for (Field field : schema.fields()) {
builder.field(field.name(), field.schema());
}
kafkaAppender.addToSchema(builder);
return builder.build();
}

@Override
public ConfigDef config() {
return CONFIG_DEF;
}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {
kafkaAppender = KafkaMetadataAppender.from(configs);
}
}
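
The `KafkaMetadataAppender` and `RecordAppender` utilities referenced above are not included in this portion of the diff. As a rough sketch inferred from the call sites (the `KafkaMetadataAppender.from(...)` factory plus the `addToSchema`, `addToStruct`, and `addToMap` calls), the appender contract might look something like the following; the exact signatures and return types are assumptions, not the actual implementation:

```java
package io.tabular.iceberg.connect.transforms.util;

import java.util.Map;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

// Hypothetical interface inferred from the transforms above. An implementation is
// obtained via KafkaMetadataAppender.from(configs), which presumably returns either a
// metadata-appending instance or a no-op depending on kafka_metadata.include.
public interface RecordAppender<R extends ConnectRecord<R>> {

  // Register the Kafka metadata field(s) on the value schema being rebuilt.
  void addToSchema(SchemaBuilder builder);

  // Populate the Kafka metadata field(s) on a schema-ful (Struct) value.
  void addToStruct(R record, Struct value);

  // Populate the Kafka metadata field(s) on a schemaless (Map) value.
  void addToMap(R record, Map<String, Object> value);
}
```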