Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for JSON/camelcase conversion #2705

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ public class ProtobufData {
private boolean useWrapperForRawPrimitives;
private boolean generateStructForNulls;
private boolean generateIndexForUnions;
private boolean useJsonFieldNames;

public ProtobufData() {
this(new ProtobufDataConfig.Builder().with(
Expand All @@ -332,6 +333,7 @@ public ProtobufData(ProtobufDataConfig protobufDataConfig) {
this.useWrapperForRawPrimitives = protobufDataConfig.useWrapperForRawPrimitives();
this.generateStructForNulls = protobufDataConfig.generateStructForNulls();
this.generateIndexForUnions = protobufDataConfig.generateIndexForUnions();
this.useJsonFieldNames = protobufDataConfig.useJsonFieldNames();
}

/**
Expand Down Expand Up @@ -519,10 +521,14 @@ private Object fromConnectData(
throw new DataException("Invalid message name: " + scopedStructName);
}
for (Field field : schema.fields()) {
String fieldName = scrubName(field.name());
Object fieldCtx = getFieldType(ctx, fieldName);
String fieldName = field.name();
if (useJsonFieldNames) {
fieldName = fromJsonCase(fieldName);
}
String scrubbedFieldName = scrubName(fieldName);
Object fieldCtx = getFieldType(ctx, scrubbedFieldName);
Object connectFieldVal = ignoreDefaultForNullables
? struct.getWithoutDefault(field.name()) : struct.get(field);
? struct.getWithoutDefault(fieldName) : struct.get(field);
Object fieldValue = fromConnectData(
fieldCtx,
field.schema(),
Expand All @@ -539,10 +545,10 @@ private Object fromConnectData(
fieldValue = union.getValue();
} else {
fieldDescriptor = messageBuilder.getDescriptorForType()
.findFieldByName(fieldName);
.findFieldByName(scrubbedFieldName);
}
if (fieldDescriptor == null) {
throw new DataException("Cannot find field with name " + fieldName);
throw new DataException("Cannot find field with name " + scrubbedFieldName);
}
if (fieldValue != null) {
messageBuilder.setField(fieldDescriptor, fieldValue);
Expand Down Expand Up @@ -727,12 +733,16 @@ private MessageDefinition messageDefinitionFromConnectSchema(
String fieldTag = fieldSchema.parameters() != null ? fieldSchema.parameters()
.get(PROTOBUF_TYPE_TAG) : null;
int tag = fieldTag != null ? Integer.parseInt(fieldTag) : index++;
String fieldName = field.name();
if (useJsonFieldNames) {
fieldName = fromJsonCase(fieldName);
}
FieldDefinition fieldDef = fieldDefinitionFromConnectSchema(
ctx,
schema,
message,
fieldSchema,
scrubName(field.name()),
scrubName(fieldName),
tag
);
if (fieldDef != null) {
Expand Down Expand Up @@ -763,6 +773,22 @@ private MessageDefinition messageDefinitionFromConnectSchema(
return message.build();
}

private static String fromJsonCase(final String str) {
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < str.length(); i++) {
char c = str.charAt(i);
if (Character.isUpperCase(c)) {
if (i != 0) {
sb.append("_");
}
sb.append(Character.toLowerCase(c));
} else {
sb.append(c);
}
}
return sb.toString();
}

private void oneofDefinitionFromConnectSchema(
FromConnectContext ctx,
DynamicSchema.Builder schema,
Expand All @@ -776,12 +802,17 @@ private void oneofDefinitionFromConnectSchema(
String fieldTag = fieldSchema.parameters() != null ? fieldSchema.parameters()
.get(PROTOBUF_TYPE_TAG) : null;
int tag = fieldTag != null ? Integer.parseInt(fieldTag) : 0;
String fieldName = field.name();
if (useJsonFieldNames) {
fieldName = fromJsonCase(fieldName);
}
String scrubbedFieldName = scrubName(fieldName);
FieldDefinition fieldDef = fieldDefinitionFromConnectSchema(
ctx,
schema,
message,
field.schema(),
scrubName(field.name()),
scrubbedFieldName,
tag
);
if (fieldDef != null) {
Expand Down Expand Up @@ -1363,7 +1394,8 @@ private void setStructField(
Struct result,
FieldDescriptor fieldDescriptor
) {
final String fieldName = fieldDescriptor.getName();
final String fieldName = useJsonFieldNames
? fieldDescriptor.getJsonName() : fieldDescriptor.getName();
final Field field = schema.field(fieldName);
if ((isPrimitiveOrRepeated(fieldDescriptor) && !isOptional(fieldDescriptor))
|| (generateStructForNulls || message.hasField(fieldDescriptor))) {
Expand Down Expand Up @@ -1425,7 +1457,9 @@ private SchemaBuilder toConnectSchema(
// Already added field as oneof
continue;
}
builder.field(fieldDescriptor.getName(), toConnectSchema(ctx, fieldDescriptor));
final String fieldName = useJsonFieldNames
? fieldDescriptor.getJsonName() : fieldDescriptor.getName();
builder.field(fieldName, toConnectSchema(ctx, fieldDescriptor));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public class ProtobufDataConfig extends AbstractDataConfig {
public static final String GENERATE_INDEX_FOR_UNIONS_DOC = "Whether to suffix union"
+ "names with an underscore followed by an index";

public static final String JSON_FIELD_NAMES_CONFIG = "json.field.names";
public static final boolean JSON_FIELD_NAMES_DEFAULT = false;
public static final String JSON_FIELD_NAMES_DOC = "Whether to convert protobuf field names "
+ "to camelcase for internal data representation and vice-versa.";

public static ConfigDef baseConfigDef() {
return AbstractDataConfig.baseConfigDef()
.define(ENHANCED_PROTOBUF_SCHEMA_SUPPORT_CONFIG,
Expand Down Expand Up @@ -112,7 +117,12 @@ public static ConfigDef baseConfigDef() {
ConfigDef.Type.BOOLEAN,
GENERATE_INDEX_FOR_UNIONS_DEFAULT,
ConfigDef.Importance.LOW,
GENERATE_INDEX_FOR_UNIONS_DOC
GENERATE_INDEX_FOR_UNIONS_DOC)
.define(JSON_FIELD_NAMES_CONFIG,
ConfigDef.Type.BOOLEAN,
JSON_FIELD_NAMES_DEFAULT,
ConfigDef.Importance.LOW,
JSON_FIELD_NAMES_DOC
);
}

Expand Down Expand Up @@ -156,6 +166,10 @@ public boolean generateIndexForUnions() {
return this.getBoolean(GENERATE_INDEX_FOR_UNIONS_CONFIG);
}

public boolean useJsonFieldNames() {
return this.getBoolean(JSON_FIELD_NAMES_CONFIG);
}

public static class Builder {

private final Map<String, Object> props = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
import io.confluent.kafka.serializers.protobuf.test.KeyTimestampValueOuterClass.KeyTimestampValue;
import io.confluent.kafka.serializers.protobuf.test.TestMessageProtos.TestMessage2;
import io.confluent.kafka.serializers.protobuf.test.TimestampValueOuterClass.TimestampValue;
import java.util.List;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
Expand All @@ -39,9 +40,12 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import io.confluent.connect.protobuf.test.Key;
Expand Down Expand Up @@ -281,6 +285,23 @@ public void testFromConnectDataForValue() {
assertArrayEquals(expected, Arrays.copyOfRange(result, PROTOBUF_BYTES_START, result.length));
}

@Test
public void testFromConnectDataForValueUseJsonFieldNames() {
final byte[] expected = HELLO_WORLD_MESSAGE.toByteArray();

Map<String, Object> configs = new HashMap<>(SR_CONFIG);
configs.put(ProtobufDataConfig.JSON_FIELD_NAMES_CONFIG, true);
converter.configure(configs, false);

SchemaAndValue schemaAndValue = getExpectedTestMessageWithJsonFieldNames();

byte[] result = converter.fromConnectData("my-topic",
schemaAndValue.schema(), schemaAndValue.value()
);

assertArrayEquals(expected, Arrays.copyOfRange(result, PROTOBUF_BYTES_START, result.length));
}

@Test
public void testFromConnectDataForValueWithNamespace() {
final byte[] expected = HELLO_WORLD_MESSAGE.toByteArray();
Expand Down Expand Up @@ -531,6 +552,44 @@ public void testToConnectDataForValue() throws Exception {
assertEquals(expected, result);
}

@Test
public void testToConnectDataForValueUseJsonFieldNames() throws Exception {
Map<String, Object> configs = new HashMap<>(SR_CONFIG);
configs.put(ProtobufDataConfig.JSON_FIELD_NAMES_CONFIG, true);
converter.configure(configs, false);
// extra byte for message index
final byte[] input = concat(new byte[]{0, 0, 0, 0, 1, 0}, HELLO_WORLD_MESSAGE.toByteArray());
schemaRegistry.register("my-topic-value", getSchema(TestMessage.getDescriptor()));
SchemaAndValue result = converter.toConnectData("my-topic", input);

SchemaAndValue expected = getExpectedTestMessageWithJsonFieldNames();

assertEquals(expected.schema(), result.schema());
assertEquals(expected, result);
}

private SchemaAndValue getExpectedTestMessageWithJsonFieldNames() {
Struct testMessageStruct = getTestMessageStruct(TEST_MSG_STRING, 123);
Schema testMessageSchema = getTestMessageSchema();

final SchemaBuilder builder = SchemaBuilder.struct();
builder.name("TestMessage").version(1);
List values = new ArrayList<>();
for (Field field : testMessageSchema.fields()) {
String jsonFieldName = TestMessage.getDescriptor()
.findFieldByName(field.name()).getJsonName();
builder.field(jsonFieldName, field.schema());
values.add(testMessageStruct.get(field));
}
final Schema jsonSchema = builder.build();
final Struct jsonStruct = new Struct(jsonSchema);
final Iterator<Object> valuesIt = values.iterator();
for (Field field : jsonSchema.fields()) {
jsonStruct.put(field, valuesIt.next());
}
return new SchemaAndValue(jsonSchema, jsonStruct);
}

@Test
public void testToConnectDataForValueWithSecondMessage() throws Exception {
converter.configure(SR_CONFIG, false);
Expand Down