Option to inject kafka metadata #205

Merged: 19 commits, Apr 13, 2024
19 changes: 15 additions & 4 deletions kafka-connect-transforms/README.md
@@ -33,7 +33,13 @@ It will promote the `data` element fields to top level and add the following met

## Configuration

The SMT currently has no configuration.
| Property                   | Description                                                                                              |
|:---------------------------|:---------------------------------------------------------------------------------------------------------|
| cdc.kafka.include_metadata | Set to `true` to append Kafka topic/partition/offset/timestamp metadata to each record (default `false`) |
| cdc.kafka.metadata_field   | Field name to append the metadata under. Defaults to `_kafka_metadata`                                   |
| cdc.kafka.external_field   | Optional `key,value` string appended as an extra static field in the Kafka metadata, e.g. a cluster name. Defaults to `none` (disabled) |

Kafka metadata injection only works on `SinkRecord`s, not on `SourceRecord`s.
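
For illustration, here is a minimal sketch (not part of this PR) of configuring the transform programmatically with the options above; in a connector, the same keys would normally be set under the SMT alias (e.g. `transforms.dms.cdc.kafka.include_metadata`). The `cluster,my-cluster` value is a placeholder.

```java
import io.tabular.iceberg.connect.transforms.DmsTransform;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;

public class DmsMetadataConfigSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("cdc.kafka.include_metadata", "true");             // default is false
    props.put("cdc.kafka.metadata_field", "_kafka_metadata");    // default shown here
    props.put("cdc.kafka.external_field", "cluster,my-cluster"); // placeholder key,value pair

    DmsTransform<SinkRecord> smt = new DmsTransform<>();
    smt.configure(props);
    // smt.apply(record) now appends a "_kafka_metadata" entry
    // (topic, partition, offset, timestamp, plus "cluster") to each SinkRecord value.
    smt.close();
  }
}
```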

# DebeziumTransform
_(Experimental)_
@@ -44,6 +50,11 @@ It will promote the `before` or `after` element fields to top level and add the

## Configuration

| Property | Description |
|---------------------|-----------------------------------------------------------------------------------|
| cdc.target.pattern | Pattern to use for setting the CDC target field value, default is `{db}.{table}` |
| Property                   | Description                                                                                              |
|:---------------------------|:---------------------------------------------------------------------------------------------------------|
| cdc.target.pattern         | Pattern to use for setting the CDC target field value, default is `{db}.{table}`                         |
| cdc.kafka.include_metadata | Set to `true` to append Kafka topic/partition/offset/timestamp metadata to each record (default `false`) |
| cdc.kafka.metadata_field   | Field name to append the metadata under. Defaults to `_kafka_metadata`                                   |
| cdc.kafka.external_field   | Optional `key,value` string appended as an extra static field in the Kafka metadata, e.g. a cluster name. Defaults to `none` (disabled) |

Kafka metadata injection only works on `SinkRecord`s, not on `SourceRecord`s.
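
As a rough illustration (not taken from this PR's docs), the appended metadata for a schemaless record looks roughly like the map below; the topic and offset values are placeholders, and the `cluster` entry appears only when `cdc.kafka.external_field` is set.

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaMetadataShapeSketch {
  public static void main(String[] args) {
    Map<String, Object> kafkaMetadata = new HashMap<>();
    kafkaMetadata.put("topic", "dbserver1.inventory.customers"); // placeholder topic name
    kafkaMetadata.put("partition", 0);
    kafkaMetadata.put("offset", 42L);
    kafkaMetadata.put("timestamp", 1713000000000L); // only present when the record carries a timestamp
    kafkaMetadata.put("cluster", "my-cluster");     // only with cdc.kafka.external_field=cluster,my-cluster

    // The transform puts this map on the record value under the cdc.kafka.metadata_field key.
    Map<String, Object> transformedValue = new HashMap<>();
    transformedValue.put("_kafka_metadata", kafkaMetadata);
    System.out.println(transformedValue);
  }
}
```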
DebeziumTransform.java
@@ -18,6 +18,7 @@
*/
package io.tabular.iceberg.connect.transforms;

import io.tabular.iceberg.connect.transforms.util.KafkaMetadataAppender;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.config.ConfigDef;
@@ -43,6 +44,7 @@ public class DebeziumTransform<R extends ConnectRecord<R>> implements Transforma
private static final String CDC_TARGET_PATTERN = "cdc.target.pattern";
private static final String DB_PLACEHOLDER = "{db}";
private static final String TABLE_PLACEHOLDER = "{table}";
private KafkaMetadataAppender kafkaAppender = null;

public static final ConfigDef CONFIG_DEF =
new ConfigDef()
@@ -51,14 +53,35 @@ public class DebeziumTransform<R extends ConnectRecord<R>> implements Transforma
ConfigDef.Type.STRING,
null,
Importance.MEDIUM,
"Pattern to use for setting the CDC target field value.");
"Pattern to use for setting the CDC target field value.")
.define(
KafkaMetadataAppender.INCLUDE_KAFKA_METADATA,
ConfigDef.Type.BOOLEAN,
false,
Importance.LOW,
"Include appending of Kafka metadata to SinkRecord")
.define(
KafkaMetadataAppender.KEY_METADATA_FIELD_NAME,
ConfigDef.Type.STRING,
KafkaMetadataAppender.DEFAULT_METADATA_FIELD_NAME,
Importance.LOW,
"field to append Kafka metadata under")
.define(
KafkaMetadataAppender.EXTERNAL_KAFKA_METADATA,
ConfigDef.Type.STRING,
"none",
Importance.LOW,
"key,value representing a String to be injected on Kafka metadata (e.g. Cluster)");

private String cdcTargetPattern;

@Override
public void configure(Map<String, ?> props) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
cdcTargetPattern = config.getString(CDC_TARGET_PATTERN);
if (config.getBoolean(KafkaMetadataAppender.INCLUDE_KAFKA_METADATA)) {
kafkaAppender = KafkaMetadataAppender.from(config);
}
}

@Override
@@ -110,6 +133,12 @@ private R applyWithSchema(R record) {
}
newValue.put(CdcConstants.COL_CDC, cdcMetadata);

if (kafkaAppender != null) {
if (record instanceof SinkRecord) {
kafkaAppender.appendToStruct((SinkRecord) record, newValue);
}
}

return record.newRecord(
record.topic(),
record.kafkaPartition(),
@@ -138,21 +167,25 @@ private R applySchemaless(R record) {
return null;
}

// create the new value
Map<String, Object> newValue = Maps.newHashMap((Map<String, Object>) payload);

// create the CDC metadata
Map<String, Object> cdcMetadata = Maps.newHashMap();
cdcMetadata.put(CdcConstants.COL_OP, op);
cdcMetadata.put(CdcConstants.COL_TS, value.get("ts_ms"));
if (record instanceof SinkRecord) {
cdcMetadata.put(CdcConstants.COL_OFFSET, ((SinkRecord) record).kafkaOffset());
if (kafkaAppender != null) {
kafkaAppender.appendToMap((SinkRecord) record, newValue);
}
}
setTableAndTargetFromSourceMap(value.get("source"), cdcMetadata);

if (record.key() instanceof Map) {
cdcMetadata.put(CdcConstants.COL_KEY, record.key());
}

// create the new value
Map<String, Object> newValue = Maps.newHashMap((Map<String, Object>) payload);
newValue.put(CdcConstants.COL_CDC, cdcMetadata);

return record.newRecord(
@@ -238,6 +271,9 @@ private Schema makeUpdatedSchema(Schema schema, Schema cdcSchema) {

builder.field(CdcConstants.COL_CDC, cdcSchema);

if (kafkaAppender != null) {
kafkaAppender.appendSchema(builder);
}
return builder.build();
}

DmsTransform.java
@@ -18,19 +18,43 @@
*/
package io.tabular.iceberg.connect.transforms;

import io.tabular.iceberg.connect.transforms.util.KafkaMetadataAppender;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DmsTransform<R extends ConnectRecord<R>> implements Transformation<R> {

private static final Logger LOG = LoggerFactory.getLogger(DmsTransform.class.getName());
private static final ConfigDef EMPTY_CONFIG = new ConfigDef();
public static final ConfigDef CONFIG_DEF =
new ConfigDef()
.define(
KafkaMetadataAppender.INCLUDE_KAFKA_METADATA,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
"Include appending of Kafka metadata to SinkRecord")
.define(
KafkaMetadataAppender.KEY_METADATA_FIELD_NAME,
ConfigDef.Type.STRING,
KafkaMetadataAppender.DEFAULT_METADATA_FIELD_NAME,
ConfigDef.Importance.LOW,
"field to append Kafka metadata under")
.define(
KafkaMetadataAppender.EXTERNAL_KAFKA_METADATA,
ConfigDef.Type.STRING,
"none",
ConfigDef.Importance.LOW,
"key,value representing a String to be injected on Kafka metadata (e.g. Cluster)");

private KafkaMetadataAppender kafkaAppender = null;

@Override
public R apply(R record) {
@@ -82,6 +106,12 @@ private R applySchemaless(R record) {
Map<String, Object> newValue = Maps.newHashMap((Map<String, Object>) dataObj);
newValue.put(CdcConstants.COL_CDC, cdcMetadata);

if (kafkaAppender != null) {
if (record instanceof SinkRecord) {
kafkaAppender.appendToMap((SinkRecord) record, newValue);
}
}

return record.newRecord(
record.topic(),
record.kafkaPartition(),
@@ -94,12 +124,17 @@

@Override
public ConfigDef config() {
return EMPTY_CONFIG;
return CONFIG_DEF;
}

@Override
public void close() {}

@Override
public void configure(Map<String, ?> configs) {}
public void configure(Map<String, ?> configs) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
if (config.getBoolean(KafkaMetadataAppender.INCLUDE_KAFKA_METADATA)) {
kafkaAppender = KafkaMetadataAppender.from(config);
}
}
}
KafkaMetadataAppender.java (new file)
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.transforms.util;

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

public class KafkaMetadataAppender {

public static final String INCLUDE_KAFKA_METADATA = "cdc.kafka.include_metadata";
public static final String EXTERNAL_KAFKA_METADATA = "cdc.kafka.external_field";
public static final String KEY_METADATA_FIELD_NAME = "cdc.kafka.metadata_field";
public static final String DEFAULT_METADATA_FIELD_NAME = "_kafka_metadata";

private final ExternalKafkaData externalData;

private final Schema schema;

private final String metadataFieldName;

public KafkaMetadataAppender(
ExternalKafkaData externalData, Schema schema, String metadataFieldName) {
this.externalData = externalData;
this.schema = schema;
this.metadataFieldName = metadataFieldName;
}

public interface ExternalKafkaData {
SchemaBuilder addToSchema(SchemaBuilder builder);

Struct addToStruct(Struct struct);

Map<String, Object> addToMap(Map<String, Object> map);
}

public static class ExternalStringKafkaData implements ExternalKafkaData {
private final String name;
private final String value;

public ExternalStringKafkaData(String name, String value) {
this.name = name;
this.value = value;
}

public static ExternalKafkaData parse(String field) {
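// The default literal "none" disables the external field; anything else must be a "key,value" pair.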
if (field.equals("none")) {
return new EmptyExternalData();
}
String[] parts = field.split(",");
if (parts.length != 2) {
throw new ConfigException(
String.format(
"Could not parse %s for %s", field, KafkaMetadataAppender.EXTERNAL_KAFKA_METADATA));
}
return new ExternalStringKafkaData(parts[0], parts[1]);
}

@Override
public SchemaBuilder addToSchema(SchemaBuilder builder) {
return builder.field(this.name, Schema.STRING_SCHEMA);
}

@Override
public Struct addToStruct(Struct struct) {
return struct.put(this.name, this.value);
}

@Override
public Map<String, Object> addToMap(Map<String, Object> map) {
map.put(this.name, this.value);
return map;
}
}

public static class EmptyExternalData implements ExternalKafkaData {
@Override
public SchemaBuilder addToSchema(SchemaBuilder builder) {
return builder;
}

@Override
public Struct addToStruct(Struct struct) {
return struct;
}

@Override
public Map<String, Object> addToMap(Map<String, Object> map) {
return map;
}
}

public static KafkaMetadataAppender from(SimpleConfig config) {
ExternalKafkaData externalAppender =
ExternalStringKafkaData.parse(config.getString(EXTERNAL_KAFKA_METADATA));
String metadataFieldName = config.getString(KEY_METADATA_FIELD_NAME);

SchemaBuilder schema = SchemaBuilder.struct();

externalAppender
.addToSchema(schema)
.field("topic", Schema.STRING_SCHEMA)
.field("partition", Schema.INT32_SCHEMA)
.field("offset", Schema.INT64_SCHEMA)
.field("timestamp", Schema.OPTIONAL_INT64_SCHEMA);
// TODO headers
return new KafkaMetadataAppender(externalAppender, schema.build(), metadataFieldName);
}

public SchemaBuilder appendSchema(SchemaBuilder builder) {
return builder.field(this.metadataFieldName, this.schema);
}

public Struct appendToStruct(SinkRecord record, Struct struct) {
Struct metadata = new Struct(this.schema);
externalData.addToStruct(metadata);
metadata.put("topic", record.topic());
metadata.put("partition", record.kafkaPartition());
metadata.put("offset", record.kafkaOffset());
if (record.timestamp() != null) {
metadata.put("timestamp", record.timestamp());
}
struct.put(this.metadataFieldName, metadata);
return struct;
}

public Map<String, Object> appendToMap(SinkRecord record, Map<String, Object> map) {
Map<String, Object> metadata = Maps.newHashMap();
externalData.addToMap(metadata);
metadata.put("topic", record.topic());
metadata.put("partition", record.kafkaPartition());
metadata.put("offset", record.kafkaOffset());
if (record.timestamp() != null) {
metadata.put("timestamp", record.timestamp());
}
map.put(this.metadataFieldName, metadata);
return map;
}
}
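
To make the new helper's flow concrete, here is a hedged usage sketch (not part of this PR). It borrows `DmsTransform.CONFIG_DEF`, which defines the `cdc.kafka.*` keys, and uses placeholder topic and field names.

```java
import io.tabular.iceberg.connect.transforms.DmsTransform;
import io.tabular.iceberg.connect.transforms.util.KafkaMetadataAppender;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

public class AppenderSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put(KafkaMetadataAppender.INCLUDE_KAFKA_METADATA, "true");
    props.put(KafkaMetadataAppender.EXTERNAL_KAFKA_METADATA, "cluster,my-cluster"); // placeholder

    // Build the appender from a SimpleConfig that knows the cdc.kafka.* keys.
    KafkaMetadataAppender appender =
        KafkaMetadataAppender.from(new SimpleConfig(DmsTransform.CONFIG_DEF, props));

    // Let the appender add its "_kafka_metadata" struct field to a value schema ...
    SchemaBuilder valueSchema = SchemaBuilder.struct().field("id", Schema.INT64_SCHEMA);
    appender.appendSchema(valueSchema);

    // ... then populate it from a SinkRecord (placeholder topic/partition/offset).
    SinkRecord record = new SinkRecord("orders", 0, null, null, null, null, 42L);
    Struct value = new Struct(valueSchema.build()).put("id", 1L);
    appender.appendToStruct(record, value); // fills topic/partition/offset (+ timestamp when present)
    System.out.println(value);
  }
}
```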