diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index c9846b36..ce279ad8 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -6,7 +6,7 @@ bson-ver = "4.11.0" hadoop-ver = "3.3.6" hive-ver = "2.3.9" http-client-ver = "5.2.1" -iceberg-ver = "1.4.2" +iceberg-ver = "1.5.2" jackson-ver = "2.14.2" junit-ver = "5.10.0" kafka-ver = "3.5.1" @@ -33,6 +33,7 @@ iceberg-gcp = { module = "org.apache.iceberg:iceberg-gcp", version.ref = "iceber iceberg-gcp-bundle = { module = "org.apache.iceberg:iceberg-gcp-bundle", version.ref = "iceberg-ver" } iceberg-guava = { module = "org.apache.iceberg:iceberg-bundled-guava", version.ref = "iceberg-ver" } iceberg-hive-metastore = { module = "org.apache.iceberg:iceberg-hive-metastore", version.ref = "iceberg-ver" } +iceberg-kafka-connect-events = {module = "org.apache.iceberg:iceberg-kafka-connect-events", version.ref = "iceberg-ver"} iceberg-nessie = { module = "org.apache.iceberg:iceberg-nessie", version.ref = "iceberg-ver" } iceberg-orc = { module = "org.apache.iceberg:iceberg-orc", version.ref = "iceberg-ver" } iceberg-parquet = { module = "org.apache.iceberg:iceberg-parquet", version.ref = "iceberg-ver" } @@ -60,7 +61,7 @@ palantir-gradle = "com.palantir.baseline:gradle-baseline-java:4.42.0" [bundles] -iceberg = ["iceberg-api", "iceberg-common", "iceberg-core", "iceberg-data", "iceberg-guava", "iceberg-orc", "iceberg-parquet"] +iceberg = ["iceberg-api", "iceberg-common", "iceberg-core", "iceberg-data", "iceberg-guava", "iceberg-orc", "iceberg-parquet", "iceberg-kafka-connect-events"] iceberg-ext = ["iceberg-aws", "iceberg-aws-bundle", "iceberg-azure", "iceberg-azure-bundle", "iceberg-gcp","iceberg-gcp-bundle", "iceberg-nessie"] jackson = ["jackson-core", "jackson-databind"] kafka-connect = ["kafka-clients", "kafka-connect-api", "kafka-connect-json", "kafka-connect-transforms"] diff --git a/kafka-connect-events/src/main/java/io/tabular/iceberg/connect/events/CommitResponsePayload.java b/kafka-connect-events/src/main/java/io/tabular/iceberg/connect/events/CommitResponsePayload.java index bef7a75b..49bd7203 100644 --- a/kafka-connect-events/src/main/java/io/tabular/iceberg/connect/events/CommitResponsePayload.java +++ b/kafka-connect-events/src/main/java/io/tabular/iceberg/connect/events/CommitResponsePayload.java @@ -25,7 +25,7 @@ import org.apache.avro.SchemaBuilder; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; -import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.avro.DeprecatedAvroSchemaUtil; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types.StructType; @@ -58,12 +58,12 @@ public CommitResponsePayload( Map dataFileNames = Maps.newHashMap(); dataFileNames.put(dataFileStruct, "org.apache.iceberg.GenericDataFile"); dataFileNames.put(partitionType, "org.apache.iceberg.PartitionData"); - Schema dataFileSchema = AvroSchemaUtil.convert(dataFileStruct, dataFileNames); + Schema dataFileSchema = DeprecatedAvroSchemaUtil.convert(dataFileStruct, dataFileNames); Map deleteFileNames = Maps.newHashMap(); deleteFileNames.put(dataFileStruct, "org.apache.iceberg.GenericDeleteFile"); deleteFileNames.put(partitionType, "org.apache.iceberg.PartitionData"); - Schema deleteFileSchema = AvroSchemaUtil.convert(dataFileStruct, deleteFileNames); + Schema deleteFileSchema = DeprecatedAvroSchemaUtil.convert(dataFileStruct, deleteFileNames); this.avroSchema = SchemaBuilder.builder() diff --git 
a/kafka-connect-events/src/main/java/io/tabular/iceberg/connect/events/Element.java b/kafka-connect-events/src/main/java/io/tabular/iceberg/connect/events/Element.java index 79f72f02..a7cd9fb8 100644 --- a/kafka-connect-events/src/main/java/io/tabular/iceberg/connect/events/Element.java +++ b/kafka-connect-events/src/main/java/io/tabular/iceberg/connect/events/Element.java @@ -23,14 +23,14 @@ import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.specific.SpecificData.SchemaConstructable; -import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.avro.DeprecatedAvroSchemaUtil; public interface Element extends IndexedRecord, SchemaConstructable { // this is required by Iceberg's Avro deserializer to check for special metadata // fields, but we aren't using any String DUMMY_FIELD_ID = "-1"; - String FIELD_ID_PROP = AvroSchemaUtil.FIELD_ID_PROP; + String FIELD_ID_PROP = DeprecatedAvroSchemaUtil.FIELD_ID_PROP; Schema UUID_SCHEMA = LogicalTypes.uuid().addToSchema(SchemaBuilder.builder().fixed("uuid").size(16)); diff --git a/kafka-connect-events/src/main/java/io/tabular/iceberg/connect/events/Event.java b/kafka-connect-events/src/main/java/io/tabular/iceberg/connect/events/Event.java index cfca858a..fdb1d17a 100644 --- a/kafka-connect-events/src/main/java/io/tabular/iceberg/connect/events/Event.java +++ b/kafka-connect-events/src/main/java/io/tabular/iceberg/connect/events/Event.java @@ -24,7 +24,7 @@ import java.util.UUID; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; -import org.apache.iceberg.avro.AvroEncoderUtil; +import org.apache.iceberg.avro.DeprecatedAvroEncoderUtil; import org.apache.iceberg.common.DynFields; import org.apache.iceberg.data.avro.DecoderResolver; @@ -41,7 +41,7 @@ public class Event implements Element { public static byte[] encode(Event event) { try { - return AvroEncoderUtil.encode(event, event.getSchema()); + return DeprecatedAvroEncoderUtil.encode(event, event.getSchema()); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -49,7 +49,7 @@ public static byte[] encode(Event event) { public static Event decode(byte[] bytes) { try { - Event event = AvroEncoderUtil.decode(bytes); + Event event = DeprecatedAvroEncoderUtil.decode(bytes); // workaround for memory leak, until this is addressed upstream DECODER_CACHES.get().clear(); return event; @@ -63,6 +63,13 @@ public Event(Schema avroSchema) { this.avroSchema = avroSchema; } + /** + * @deprecated + *

This class is only retained so that a fallback decoder can read the legacy Iceberg 1.4.x Avro schemas in the case where + * the control topic was not fully drained during the upgrade to 1.5.2. This entire module should be removed + * in later releases.
+ */ + @Deprecated public Event(String groupId, EventType type, Payload payload) { this.id = UUID.randomUUID(); this.type = type; diff --git a/kafka-connect-events/src/main/java/org/apache/iceberg/avro/DeprecatedAvroEncoderUtil.java b/kafka-connect-events/src/main/java/org/apache/iceberg/avro/DeprecatedAvroEncoderUtil.java new file mode 100644 index 00000000..679da6dc --- /dev/null +++ b/kafka-connect-events/src/main/java/org/apache/iceberg/avro/DeprecatedAvroEncoderUtil.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +/** + * Iceberg 1.5.0 introduced a breaking change to Avro serialization that the connector uses when encoding + * messages for the control topic, requiring a way to fall back to decoding 1.4.x series messages that may + * be left behind on a control topic when upgrading. + * + * This class should be removed in later releases. + */ +public class DeprecatedAvroEncoderUtil { + + private DeprecatedAvroEncoderUtil() {} + + static { + LogicalTypes.register(LogicalMap.NAME, schema -> LogicalMap.get()); + } + + private static final byte[] MAGIC_BYTES = new byte[] {(byte) 0xC2, (byte) 0x01}; + + public static byte[] encode(T datum, Schema avroSchema) throws IOException { + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + DataOutputStream dataOut = new DataOutputStream(out); + + // Write the magic bytes + dataOut.write(MAGIC_BYTES); + + // Write avro schema + dataOut.writeUTF(avroSchema.toString()); + + // Encode the datum with avro schema. 
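+ // Overall layout produced here: [magic bytes][schema JSON via writeUTF][Avro binary datum]; decode() below reads the same fields back in this order.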
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + DatumWriter writer = new GenericAvroWriter<>(avroSchema); + writer.write(datum, encoder); + encoder.flush(); + + return out.toByteArray(); + } + } + + public static T decode(byte[] data) throws IOException { + try (ByteArrayInputStream in = new ByteArrayInputStream(data, 0, data.length)) { + DataInputStream dataInput = new DataInputStream(in); + + // Read the magic bytes + byte header0 = dataInput.readByte(); + byte header1 = dataInput.readByte(); + Preconditions.checkState( + header0 == MAGIC_BYTES[0] && header1 == MAGIC_BYTES[1], + "Unrecognized header bytes: 0x%02X 0x%02X", + header0, + header1); + + // Read avro schema + Schema avroSchema = new Schema.Parser().parse(dataInput.readUTF()); + + // Decode the datum with the parsed avro schema. + BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(in, null); + DatumReader reader = new DeprecatedGenericAvroReader<>(avroSchema); + reader.setSchema(avroSchema); + return reader.read(null, binaryDecoder); + } + } +} diff --git a/kafka-connect-events/src/main/java/org/apache/iceberg/avro/DeprecatedAvroSchemaUtil.java b/kafka-connect-events/src/main/java/org/apache/iceberg/avro/DeprecatedAvroSchemaUtil.java new file mode 100644 index 00000000..acaf5939 --- /dev/null +++ b/kafka-connect-events/src/main/java/org/apache/iceberg/avro/DeprecatedAvroSchemaUtil.java @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.avro.JsonProperties; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.iceberg.mapping.MappedField; +import org.apache.iceberg.mapping.NameMapping; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; + +/** + * Iceberg 1.5.0 introduced a breaking change to Avro serialization that the connector uses when encoding + * messages for the control topic, requiring a way to fall back to decoding 1.4.x series messages that may + * be left behind on a control topic when upgrading. + * + * This class should be removed in later releases. 
+ */ +public class DeprecatedAvroSchemaUtil { + + private DeprecatedAvroSchemaUtil() {} + + // Original Iceberg field name corresponding to a sanitized Avro name + public static final String ICEBERG_FIELD_NAME_PROP = "iceberg-field-name"; + public static final String FIELD_ID_PROP = "field-id"; + public static final String KEY_ID_PROP = "key-id"; + public static final String VALUE_ID_PROP = "value-id"; + public static final String ELEMENT_ID_PROP = "element-id"; + public static final String ADJUST_TO_UTC_PROP = "adjust-to-utc"; + + private static final Schema NULL = Schema.create(Schema.Type.NULL); + private static final Schema.Type MAP = Schema.Type.MAP; + private static final Schema.Type ARRAY = Schema.Type.ARRAY; + private static final Schema.Type UNION = Schema.Type.UNION; + private static final Schema.Type RECORD = Schema.Type.RECORD; + + public static Schema convert(org.apache.iceberg.Schema schema, String tableName) { + return convert(schema, ImmutableMap.of(schema.asStruct(), tableName)); + } + + public static Schema convert( + org.apache.iceberg.Schema schema, Map names) { + return TypeUtil.visit(schema, new DeprecatedTypeToSchema(names)); + } + + public static Schema convert(Type type) { + return convert(type, ImmutableMap.of()); + } + + public static Schema convert(Types.StructType type, String name) { + return convert(type, ImmutableMap.of(type, name)); + } + + public static Schema convert(Type type, Map names) { + return TypeUtil.visit(type, new DeprecatedTypeToSchema(names)); + } + + public static Type convert(Schema schema) { + return AvroSchemaVisitor.visit(schema, new SchemaToType(schema)); + } + + public static org.apache.iceberg.Schema toIceberg(Schema schema) { + final List fields = convert(schema).asNestedType().asStructType().fields(); + return new org.apache.iceberg.Schema(fields); + } + + static boolean hasIds(Schema schema) { + return AvroCustomOrderSchemaVisitor.visit(schema, new HasIds()); + } + + /** + * Check if any of the nodes in a given avro schema is missing an ID + * + *

<p>To have an ID for a node: + * + * <ul>
+ *   <li>a field node under struct (record) schema should have {@link #FIELD_ID_PROP} property
+ *   <li>an element node under list (array) schema should have {@link #ELEMENT_ID_PROP} property
+ *   <li>a pair of key and value node under map schema should have {@link #KEY_ID_PROP} and {@link + *       #VALUE_ID_PROP} respectively
+ *   <li>a primitive node is not assigned any ID properties
+ * </ul> + * + * <p>
+ * + * @param schema an Avro Schema + * @return true if any of the nodes of the given Avro Schema is missing an ID property, false + * otherwise + */ + static boolean missingIds(Schema schema) { + return AvroCustomOrderSchemaVisitor.visit(schema, new MissingIds()); + } + + public static Map convertTypes(Types.StructType type, String name) { + DeprecatedTypeToSchema converter = new DeprecatedTypeToSchema(ImmutableMap.of(type, name)); + TypeUtil.visit(type, converter); + return ImmutableMap.copyOf(converter.getConversionMap()); + } + + public static Schema pruneColumns( + Schema schema, Set selectedIds, NameMapping nameMapping) { + return new PruneColumns(selectedIds, nameMapping).rootSchema(schema); + } + + public static Schema buildAvroProjection( + Schema schema, org.apache.iceberg.Schema expected, Map renames) { + return AvroCustomOrderSchemaVisitor.visit(schema, new BuildAvroProjection(expected, renames)); + } + + public static boolean isTimestamptz(Schema schema) { + LogicalType logicalType = schema.getLogicalType(); + if (logicalType instanceof LogicalTypes.TimestampMillis + || logicalType instanceof LogicalTypes.TimestampMicros) { + // timestamptz is adjusted to UTC + Object value = schema.getObjectProp(ADJUST_TO_UTC_PROP); + + if (value == null) { + // not all avro timestamp logical types will have the adjust_to_utc prop, default to + // timestamp without timezone + return false; + } else if (value instanceof Boolean) { + return (Boolean) value; + } else if (value instanceof String) { + return Boolean.parseBoolean((String) value); + } + } + + return false; + } + + public static boolean isOptionSchema(Schema schema) { + if (schema.getType() == UNION && schema.getTypes().size() == 2) { + if (schema.getTypes().get(0).getType() == Schema.Type.NULL) { + return true; + } else if (schema.getTypes().get(1).getType() == Schema.Type.NULL) { + return true; + } + } + return false; + } + + static Schema toOption(Schema schema) { + if (schema.getType() == UNION) { + Preconditions.checkArgument( + isOptionSchema(schema), "Union schemas are not supported: %s", schema); + return schema; + } else { + return Schema.createUnion(NULL, schema); + } + } + + static Schema fromOption(Schema schema) { + Preconditions.checkArgument( + schema.getType() == UNION, "Expected union schema but was passed: %s", schema); + Preconditions.checkArgument( + schema.getTypes().size() == 2, "Expected optional schema, but was passed: %s", schema); + if (schema.getTypes().get(0).getType() == Schema.Type.NULL) { + return schema.getTypes().get(1); + } else { + return schema.getTypes().get(0); + } + } + + static Schema fromOptions(List options) { + Preconditions.checkArgument( + options.size() == 2, "Expected two schemas, but was passed: %s options", options.size()); + if (options.get(0).getType() == Schema.Type.NULL) { + return options.get(1); + } else { + return options.get(0); + } + } + + public static boolean isKeyValueSchema(Schema schema) { + return schema.getType() == RECORD && schema.getFields().size() == 2; + } + + static Schema createMap(int keyId, Schema keySchema, int valueId, Schema valueSchema) { + String keyValueName = "k" + keyId + "_v" + valueId; + + Schema.Field keyField = new Schema.Field("key", keySchema, null, (Object) null); + keyField.addProp(FIELD_ID_PROP, keyId); + + Schema.Field valueField = + new Schema.Field( + "value", + valueSchema, + null, + isOptionSchema(valueSchema) ? 
JsonProperties.NULL_VALUE : null); + valueField.addProp(FIELD_ID_PROP, valueId); + + return LogicalMap.get() + .addToSchema( + Schema.createArray( + Schema.createRecord( + keyValueName, null, null, false, ImmutableList.of(keyField, valueField)))); + } + + static Schema createProjectionMap( + String recordName, + int keyId, + String keyName, + Schema keySchema, + int valueId, + String valueName, + Schema valueSchema) { + String keyValueName = "k" + keyId + "_v" + valueId; + + Schema.Field keyField = new Schema.Field("key", keySchema, null, (Object) null); + if (!"key".equals(keyName)) { + keyField.addAlias(keyName); + } + keyField.addProp(FIELD_ID_PROP, keyId); + + Schema.Field valueField = + new Schema.Field( + "value", + valueSchema, + null, + isOptionSchema(valueSchema) ? JsonProperties.NULL_VALUE : null); + valueField.addProp(FIELD_ID_PROP, valueId); + if (!"value".equals(valueName)) { + valueField.addAlias(valueName); + } + + Schema keyValueRecord = + Schema.createRecord( + keyValueName, null, null, false, ImmutableList.of(keyField, valueField)); + if (!keyValueName.equals(recordName)) { + keyValueRecord.addAlias(recordName); + } + + return LogicalMap.get().addToSchema(Schema.createArray(keyValueRecord)); + } + + private static Integer getId(Schema schema, String propertyName) { + Integer id = getId(schema, propertyName, null, null); + Preconditions.checkNotNull(id, "Missing expected '%s' property", propertyName); + return id; + } + + private static Integer getId( + Schema schema, String propertyName, NameMapping nameMapping, List names) { + if (schema.getType() == UNION) { + return getId(fromOption(schema), propertyName, nameMapping, names); + } + + Object id = schema.getObjectProp(propertyName); + if (id != null) { + return toInt(id); + } else if (nameMapping != null) { + MappedField mappedField = nameMapping.find(names); + if (mappedField != null) { + return mappedField.id(); + } + } + + return null; + } + + static boolean hasProperty(Schema schema, String propertyName) { + if (schema.getType() == UNION) { + return hasProperty(fromOption(schema), propertyName); + } + return schema.getObjectProp(propertyName) != null; + } + + public static int getKeyId(Schema schema) { + Preconditions.checkArgument( + schema.getType() == MAP, "Cannot get map key id for non-map schema: %s", schema); + return getId(schema, KEY_ID_PROP); + } + + static Integer getKeyId( + Schema schema, NameMapping nameMapping, Iterable parentFieldNames) { + Preconditions.checkArgument( + schema.getType() == MAP, "Cannot get map key id for non-map schema: %s", schema); + List names = Lists.newArrayList(parentFieldNames); + names.add("key"); + return getId(schema, KEY_ID_PROP, nameMapping, names); + } + + public static int getValueId(Schema schema) { + Preconditions.checkArgument( + schema.getType() == MAP, "Cannot get map value id for non-map schema: %s", schema); + return getId(schema, VALUE_ID_PROP); + } + + static Integer getValueId( + Schema schema, NameMapping nameMapping, Iterable parentFieldNames) { + Preconditions.checkArgument( + schema.getType() == MAP, "Cannot get map value id for non-map schema: %s", schema); + List names = Lists.newArrayList(parentFieldNames); + names.add("value"); + return getId(schema, VALUE_ID_PROP, nameMapping, names); + } + + public static int getElementId(Schema schema) { + Preconditions.checkArgument( + schema.getType() == ARRAY, "Cannot get array element id for non-array schema: %s", schema); + return getId(schema, ELEMENT_ID_PROP); + } + + static Integer getElementId( + Schema 
schema, NameMapping nameMapping, Iterable parentFieldNames) { + Preconditions.checkArgument( + schema.getType() == ARRAY, "Cannot get array element id for non-array schema: %s", schema); + List names = Lists.newArrayList(parentFieldNames); + names.add("element"); + return getId(schema, ELEMENT_ID_PROP, nameMapping, names); + } + + public static int getFieldId(Schema.Field field) { + Integer id = getFieldId(field, null, null); + Preconditions.checkNotNull(id, "Missing expected '%s' property", FIELD_ID_PROP); + return id; + } + + static Integer getFieldId( + Schema.Field field, NameMapping nameMapping, Iterable parentFieldNames) { + Object id = field.getObjectProp(FIELD_ID_PROP); + if (id != null) { + return toInt(id); + } else if (nameMapping != null) { + List names = Lists.newArrayList(parentFieldNames); + names.add(field.name()); + MappedField mappedField = nameMapping.find(names); + if (mappedField != null) { + return mappedField.id(); + } + } + + return null; + } + + public static boolean hasFieldId(Schema.Field field) { + return field.getObjectProp(FIELD_ID_PROP) != null; + } + + private static int toInt(Object value) { + if (value instanceof Number) { + return ((Number) value).intValue(); + } else if (value instanceof String) { + return Integer.parseInt((String) value); + } + + throw new UnsupportedOperationException("Cannot coerce value to int: " + value); + } + + static Schema copyRecord(Schema record, List newFields, String newName) { + Schema copy; + if (newName != null) { + copy = Schema.createRecord(newName, record.getDoc(), null, record.isError(), newFields); + // the namespace is defaulted to the record's namespace if it is null, which causes renames + // without the namespace to fail. using "" instead of null changes this behavior to match the + // original schema. + copy.addAlias(record.getName(), record.getNamespace() == null ? 
"" : record.getNamespace()); + } else { + copy = + Schema.createRecord( + record.getName(), + record.getDoc(), + record.getNamespace(), + record.isError(), + newFields); + } + + for (Map.Entry prop : record.getObjectProps().entrySet()) { + copy.addProp(prop.getKey(), prop.getValue()); + } + + return copy; + } + + static Schema.Field copyField(Schema.Field field, Schema newSchema, String newName) { + Schema.Field copy = + new Schema.Field(newName, newSchema, field.doc(), field.defaultVal(), field.order()); + + for (Map.Entry prop : field.getObjectProps().entrySet()) { + copy.addProp(prop.getKey(), prop.getValue()); + } + + if (!newName.equals(field.name())) { + copy.addAlias(field.name()); + } + + return copy; + } + + static Schema replaceElement(Schema array, Schema elementSchema) { + Preconditions.checkArgument( + array.getType() == ARRAY, "Cannot invoke replaceElement on non array schema: %s", array); + Schema copy = Schema.createArray(elementSchema); + for (Map.Entry prop : array.getObjectProps().entrySet()) { + copy.addProp(prop.getKey(), prop.getValue()); + } + return copy; + } + + static Schema replaceValue(Schema map, Schema valueSchema) { + Preconditions.checkArgument( + map.getType() == MAP, "Cannot invoke replaceValue on non map schema: %s", map); + Schema copy = Schema.createMap(valueSchema); + for (Map.Entry prop : map.getObjectProps().entrySet()) { + copy.addProp(prop.getKey(), prop.getValue()); + } + return copy; + } + + public static String makeCompatibleName(String name) { + if (!validAvroName(name)) { + return sanitize(name); + } + return name; + } + + static boolean validAvroName(String name) { + int length = name.length(); + Preconditions.checkArgument(length > 0, "Empty name"); + char first = name.charAt(0); + if (!(Character.isLetter(first) || first == '_')) { + return false; + } + + for (int i = 1; i < length; i++) { + char character = name.charAt(i); + if (!(Character.isLetterOrDigit(character) || character == '_')) { + return false; + } + } + return true; + } + + static String sanitize(String name) { + int length = name.length(); + StringBuilder sb = new StringBuilder(name.length()); + char first = name.charAt(0); + if (!(Character.isLetter(first) || first == '_')) { + sb.append(sanitize(first)); + } else { + sb.append(first); + } + + for (int i = 1; i < length; i++) { + char character = name.charAt(i); + if (!(Character.isLetterOrDigit(character) || character == '_')) { + sb.append(sanitize(character)); + } else { + sb.append(character); + } + } + return sb.toString(); + } + + private static String sanitize(char character) { + if (Character.isDigit(character)) { + return "_" + character; + } + return "_x" + Integer.toHexString(character).toUpperCase(); + } +} diff --git a/kafka-connect-events/src/main/java/org/apache/iceberg/avro/DeprecatedGenericAvroReader.java b/kafka-connect-events/src/main/java/org/apache/iceberg/avro/DeprecatedGenericAvroReader.java new file mode 100644 index 00000000..70302ef3 --- /dev/null +++ b/kafka-connect-events/src/main/java/org/apache/iceberg/avro/DeprecatedGenericAvroReader.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.io.IOException; +import java.util.List; +import java.util.function.Supplier; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.iceberg.common.DynClasses; +import org.apache.iceberg.data.avro.DecoderResolver; + +/** + * Iceberg 1.5.0 introduced a breaking change to Avro serialization that the connector uses when encoding + * messages for the control topic, requiring a way to fall back to decoding 1.4.x series messages that may + * be left behind on a control topic when upgrading. + * + * This class should be removed in later releases. + */ +public class DeprecatedGenericAvroReader implements DatumReader, SupportsRowPosition { + + private final Schema readSchema; + private ClassLoader loader = Thread.currentThread().getContextClassLoader(); + private Schema fileSchema = null; + private ValueReader reader = null; + + public static GenericAvroReader create(Schema schema) { + return new GenericAvroReader<>(schema); + } + + DeprecatedGenericAvroReader(Schema readSchema) { + this.readSchema = readSchema; + } + + @SuppressWarnings("unchecked") + private void initReader() { + this.reader = (ValueReader) AvroSchemaVisitor.visit(readSchema, new ReadBuilder(loader)); + } + + @Override + public void setSchema(Schema schema) { + this.fileSchema = Schema.applyAliases(schema, readSchema); + initReader(); + } + + public void setClassLoader(ClassLoader newClassLoader) { + this.loader = newClassLoader; + } + + @Override + public void setRowPositionSupplier(Supplier posSupplier) { + if (reader instanceof SupportsRowPosition) { + ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier); + } + } + + @Override + public T read(T reuse, Decoder decoder) throws IOException { + return DecoderResolver.resolveAndRead(decoder, readSchema, fileSchema, reader, reuse); + } + + private static class ReadBuilder extends AvroSchemaVisitor> { + private final ClassLoader loader; + + private ReadBuilder(ClassLoader loader) { + this.loader = loader; + } + + @Override + @SuppressWarnings("unchecked") + public ValueReader record(Schema record, List names, List> fields) { + try { + Class recordClass = + DynClasses.builder().loader(loader).impl(record.getFullName()).buildChecked(); + if (IndexedRecord.class.isAssignableFrom(recordClass)) { + return ValueReaders.record(fields, (Class) recordClass, record); + } + + return ValueReaders.record(fields, record); + + } catch (ClassNotFoundException e) { + return ValueReaders.record(fields, record); + } + } + + @Override + public ValueReader union(Schema union, List> options) { + return ValueReaders.union(options); + } + + @Override + public ValueReader array(Schema array, ValueReader elementReader) { + if (array.getLogicalType() instanceof LogicalMap) { + ValueReaders.StructReader keyValueReader = (ValueReaders.StructReader) elementReader; + ValueReader keyReader = keyValueReader.reader(0); + ValueReader valueReader 
= keyValueReader.reader(1); + + if (keyReader == ValueReaders.utf8s()) { + return ValueReaders.arrayMap(ValueReaders.strings(), valueReader); + } + + return ValueReaders.arrayMap(keyReader, valueReader); + } + + return ValueReaders.array(elementReader); + } + + @Override + public ValueReader map(Schema map, ValueReader valueReader) { + return ValueReaders.map(ValueReaders.strings(), valueReader); + } + + @Override + public ValueReader primitive(Schema primitive) { + LogicalType logicalType = primitive.getLogicalType(); + if (logicalType != null) { + switch (logicalType.getName()) { + case "date": + // Spark uses the same representation + return ValueReaders.ints(); + + case "time-micros": + return ValueReaders.longs(); + + case "timestamp-millis": + // adjust to microseconds + ValueReader longs = ValueReaders.longs(); + return (ValueReader) (decoder, ignored) -> longs.read(decoder, null) * 1000L; + + case "timestamp-micros": + // Spark uses the same representation + return ValueReaders.longs(); + + case "decimal": + return ValueReaders.decimal( + ValueReaders.decimalBytesReader(primitive), + ((LogicalTypes.Decimal) logicalType).getScale()); + + case "uuid": + return ValueReaders.uuids(); + + default: + throw new IllegalArgumentException("Unknown logical type: " + logicalType); + } + } + + switch (primitive.getType()) { + case NULL: + return ValueReaders.nulls(); + case BOOLEAN: + return ValueReaders.booleans(); + case INT: + return ValueReaders.ints(); + case LONG: + return ValueReaders.longs(); + case FLOAT: + return ValueReaders.floats(); + case DOUBLE: + return ValueReaders.doubles(); + case STRING: + return ValueReaders.utf8s(); + case FIXED: + return ValueReaders.fixed(primitive); + case BYTES: + return ValueReaders.byteBuffers(); + case ENUM: + return ValueReaders.enums(primitive.getEnumSymbols()); + default: + throw new IllegalArgumentException("Unsupported type: " + primitive); + } + } + } +} diff --git a/kafka-connect-events/src/main/java/org/apache/iceberg/avro/DeprecatedTypeToSchema.java b/kafka-connect-events/src/main/java/org/apache/iceberg/avro/DeprecatedTypeToSchema.java new file mode 100644 index 00000000..626ecb7b --- /dev/null +++ b/kafka-connect-events/src/main/java/org/apache/iceberg/avro/DeprecatedTypeToSchema.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.avro; + +import java.util.Deque; +import java.util.List; +import java.util.Map; +import org.apache.avro.JsonProperties; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; + +/** + * Iceberg 1.5.0 introduced a breaking change to Avro serialization that the connector uses when encoding + * messages for the control topic, requiring a way to fall back to decoding 1.4.x series messages that may + * be left behind on a control topic when upgrading. + * + * This class should be removed in later releases. + */ +class DeprecatedTypeToSchema extends TypeUtil.SchemaVisitor { + private static final Schema BOOLEAN_SCHEMA = Schema.create(Schema.Type.BOOLEAN); + private static final Schema INTEGER_SCHEMA = Schema.create(Schema.Type.INT); + private static final Schema LONG_SCHEMA = Schema.create(Schema.Type.LONG); + private static final Schema FLOAT_SCHEMA = Schema.create(Schema.Type.FLOAT); + private static final Schema DOUBLE_SCHEMA = Schema.create(Schema.Type.DOUBLE); + private static final Schema DATE_SCHEMA = + LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); + private static final Schema TIME_SCHEMA = + LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)); + private static final Schema TIMESTAMP_SCHEMA = + LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); + private static final Schema TIMESTAMPTZ_SCHEMA = + LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); + private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING); + private static final Schema UUID_SCHEMA = + LogicalTypes.uuid().addToSchema(Schema.createFixed("uuid_fixed", null, null, 16)); + private static final Schema BINARY_SCHEMA = Schema.create(Schema.Type.BYTES); + + static { + TIMESTAMP_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, false); + TIMESTAMPTZ_SCHEMA.addProp(AvroSchemaUtil.ADJUST_TO_UTC_PROP, true); + } + + private final Deque fieldIds = Lists.newLinkedList(); + private final Map results = Maps.newHashMap(); + private final Map names; + + DeprecatedTypeToSchema(Map names) { + this.names = names; + } + + Map getConversionMap() { + return results; + } + + @Override + public Schema schema(org.apache.iceberg.Schema schema, Schema structSchema) { + return structSchema; + } + + @Override + public void beforeField(Types.NestedField field) { + fieldIds.push(field.fieldId()); + } + + @Override + public void afterField(Types.NestedField field) { + fieldIds.pop(); + } + + @Override + public Schema struct(Types.StructType struct, List fieldSchemas) { + Schema recordSchema = results.get(struct); + if (recordSchema != null) { + return recordSchema; + } + + String recordName = names.get(struct); + if (recordName == null) { + recordName = "r" + fieldIds.peek(); + } + + List structFields = struct.fields(); + List fields = Lists.newArrayListWithExpectedSize(fieldSchemas.size()); + for (int i = 0; i < structFields.size(); i += 1) { + Types.NestedField structField = structFields.get(i); + String origFieldName = structField.name(); + boolean isValidFieldName = AvroSchemaUtil.validAvroName(origFieldName); + String fieldName = isValidFieldName ? 
origFieldName : AvroSchemaUtil.sanitize(origFieldName); + Schema.Field field = + new Schema.Field( + fieldName, + fieldSchemas.get(i), + structField.doc(), + structField.isOptional() ? JsonProperties.NULL_VALUE : null); + if (!isValidFieldName) { + field.addProp(AvroSchemaUtil.ICEBERG_FIELD_NAME_PROP, origFieldName); + } + field.addProp(AvroSchemaUtil.FIELD_ID_PROP, structField.fieldId()); + fields.add(field); + } + + recordSchema = Schema.createRecord(recordName, null, null, false, fields); + + results.put(struct, recordSchema); + + return recordSchema; + } + + @Override + public Schema field(Types.NestedField field, Schema fieldSchema) { + if (field.isOptional()) { + return AvroSchemaUtil.toOption(fieldSchema); + } else { + return fieldSchema; + } + } + + @Override + public Schema list(Types.ListType list, Schema elementSchema) { + Schema listSchema = results.get(list); + if (listSchema != null) { + return listSchema; + } + + if (list.isElementOptional()) { + listSchema = Schema.createArray(AvroSchemaUtil.toOption(elementSchema)); + } else { + listSchema = Schema.createArray(elementSchema); + } + + listSchema.addProp(AvroSchemaUtil.ELEMENT_ID_PROP, list.elementId()); + + results.put(list, listSchema); + + return listSchema; + } + + @Override + public Schema map(Types.MapType map, Schema keySchema, Schema valueSchema) { + Schema mapSchema = results.get(map); + if (mapSchema != null) { + return mapSchema; + } + + if (keySchema.getType() == Schema.Type.STRING) { + // if the map has string keys, use Avro's map type + mapSchema = + Schema.createMap( + map.isValueOptional() ? AvroSchemaUtil.toOption(valueSchema) : valueSchema); + mapSchema.addProp(AvroSchemaUtil.KEY_ID_PROP, map.keyId()); + mapSchema.addProp(AvroSchemaUtil.VALUE_ID_PROP, map.valueId()); + + } else { + mapSchema = + AvroSchemaUtil.createMap( + map.keyId(), + keySchema, + map.valueId(), + map.isValueOptional() ? 
AvroSchemaUtil.toOption(valueSchema) : valueSchema); + } + + results.put(map, mapSchema); + + return mapSchema; + } + + @Override + public Schema primitive(Type.PrimitiveType primitive) { + Schema primitiveSchema; + switch (primitive.typeId()) { + case BOOLEAN: + primitiveSchema = BOOLEAN_SCHEMA; + break; + case INTEGER: + primitiveSchema = INTEGER_SCHEMA; + break; + case LONG: + primitiveSchema = LONG_SCHEMA; + break; + case FLOAT: + primitiveSchema = FLOAT_SCHEMA; + break; + case DOUBLE: + primitiveSchema = DOUBLE_SCHEMA; + break; + case DATE: + primitiveSchema = DATE_SCHEMA; + break; + case TIME: + primitiveSchema = TIME_SCHEMA; + break; + case TIMESTAMP: + if (((Types.TimestampType) primitive).shouldAdjustToUTC()) { + primitiveSchema = TIMESTAMPTZ_SCHEMA; + } else { + primitiveSchema = TIMESTAMP_SCHEMA; + } + break; + case STRING: + primitiveSchema = STRING_SCHEMA; + break; + case UUID: + primitiveSchema = UUID_SCHEMA; + break; + case FIXED: + Types.FixedType fixed = (Types.FixedType) primitive; + primitiveSchema = Schema.createFixed("fixed_" + fixed.length(), null, null, fixed.length()); + break; + case BINARY: + primitiveSchema = BINARY_SCHEMA; + break; + case DECIMAL: + Types.DecimalType decimal = (Types.DecimalType) primitive; + primitiveSchema = + LogicalTypes.decimal(decimal.precision(), decimal.scale()) + .addToSchema( + Schema.createFixed( + "decimal_" + decimal.precision() + "_" + decimal.scale(), + null, + null, + TypeUtil.decimalRequiredBytes(decimal.precision()))); + break; + default: + throw new UnsupportedOperationException("Unsupported type ID: " + primitive.typeId()); + } + + results.put(primitive, primitiveSchema); + + return primitiveSchema; + } +} diff --git a/kafka-connect-events/src/test/java/io/tabular/iceberg/connect/events/EventSerializationTest.java b/kafka-connect-events/src/test/java/io/tabular/iceberg/connect/events/EventSerializationTest.java index 50029d00..e84e3c15 100644 --- a/kafka-connect-events/src/test/java/io/tabular/iceberg/connect/events/EventSerializationTest.java +++ b/kafka-connect-events/src/test/java/io/tabular/iceberg/connect/events/EventSerializationTest.java @@ -33,7 +33,7 @@ public class EventSerializationTest { public void testCommitRequestSerialization() { UUID commitId = UUID.randomUUID(); Event event = - new Event("cg-connector", EventType.COMMIT_REQUEST, new CommitRequestPayload(commitId)); + new Event("cg-connector", EventType.COMMIT_REQUEST, new CommitRequestPayload(commitId)); byte[] data = Event.encode(event); Event result = Event.decode(data); @@ -47,15 +47,15 @@ public void testCommitRequestSerialization() { public void testCommitResponseSerialization() { UUID commitId = UUID.randomUUID(); Event event = - new Event( - "cg-connector", - EventType.COMMIT_RESPONSE, - new CommitResponsePayload( - StructType.of(), - commitId, - new TableName(Collections.singletonList("db"), "tbl"), - Arrays.asList(EventTestUtil.createDataFile(), EventTestUtil.createDataFile()), - Arrays.asList(EventTestUtil.createDeleteFile(), EventTestUtil.createDeleteFile()))); + new Event( + "cg-connector", + EventType.COMMIT_RESPONSE, + new CommitResponsePayload( + StructType.of(), + commitId, + new TableName(Collections.singletonList("db"), "tbl"), + Arrays.asList(EventTestUtil.createDataFile(), EventTestUtil.createDataFile()), + Arrays.asList(EventTestUtil.createDeleteFile(), EventTestUtil.createDeleteFile()))); byte[] data = Event.encode(event); Event result = Event.decode(data); @@ -74,14 +74,14 @@ public void testCommitResponseSerialization() { public void 
testCommitReadySerialization() { UUID commitId = UUID.randomUUID(); Event event = - new Event( - "cg-connector", - EventType.COMMIT_READY, - new CommitReadyPayload( - commitId, - Arrays.asList( - new TopicPartitionOffset("topic", 1, 1L, 1L), - new TopicPartitionOffset("topic", 2, null, null)))); + new Event( + "cg-connector", + EventType.COMMIT_READY, + new CommitReadyPayload( + commitId, + Arrays.asList( + new TopicPartitionOffset("topic", 1, 1L, 1L), + new TopicPartitionOffset("topic", 2, null, null)))); byte[] data = Event.encode(event); Event result = Event.decode(data); @@ -97,11 +97,11 @@ public void testCommitReadySerialization() { public void testCommitTableSerialization() { UUID commitId = UUID.randomUUID(); Event event = - new Event( - "cg-connector", - EventType.COMMIT_TABLE, - new CommitTablePayload( - commitId, new TableName(Collections.singletonList("db"), "tbl"), 1L, 2L)); + new Event( + "cg-connector", + EventType.COMMIT_TABLE, + new CommitTablePayload( + commitId, new TableName(Collections.singletonList("db"), "tbl"), 1L, 2L)); byte[] data = Event.encode(event); Event result = Event.decode(data); @@ -118,8 +118,8 @@ public void testCommitTableSerialization() { public void testCommitCompleteSerialization() { UUID commitId = UUID.randomUUID(); Event event = - new Event( - "cg-connector", EventType.COMMIT_COMPLETE, new CommitCompletePayload(commitId, 2L)); + new Event( + "cg-connector", EventType.COMMIT_COMPLETE, new CommitCompletePayload(commitId, 2L)); byte[] data = Event.encode(event); Event result = Event.decode(data); diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java index 6a51676c..d6fc5e6e 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java @@ -22,12 +22,14 @@ import io.tabular.iceberg.connect.IcebergSinkConfig; import io.tabular.iceberg.connect.data.Offset; -import io.tabular.iceberg.connect.events.Event; import java.time.Duration; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.function.Function; + +import org.apache.iceberg.connect.events.AvroUtil; +import org.apache.iceberg.connect.events.Event; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -55,6 +57,8 @@ public abstract class Channel { private final Map controlTopicOffsets = Maps.newHashMap(); private final String producerId; + private final EventDecoder eventDecoder; + public Channel( String name, String consumerGroupId, @@ -69,8 +73,8 @@ public Channel( this.consumer = clientFactory.createConsumer(consumerGroupId); consumer.subscribe(ImmutableList.of(controlTopic)); this.admin = clientFactory.createAdmin(); - this.producerId = pair.first().toString(); + this.eventDecoder = new EventDecoder(config.catalogName()); } protected void send(Event event) { @@ -89,7 +93,7 @@ protected void send( .map( event -> { LOG.debug("Sending event of type: {}", event.type().name()); - byte[] data = Event.encode(event); + byte[] data = AvroUtil.encode(event); // key by producer ID to keep event order return new ProducerRecord<>(controlTopic, producerId, data); }) @@ -124,12 +128,13 @@ record -> { // so increment the record offset by one controlTopicOffsets.put(record.partition(), 
record.offset() + 1); - Event event = Event.decode(record.value()); - - if (event.groupId().equals(groupId)) { - LOG.debug("Received event of type: {}", event.type().name()); - if (receiveFn.apply(new Envelope(event, record.partition(), record.offset()))) { - LOG.debug("Handled event of type: {}", event.type().name()); + Event event = eventDecoder.decode(record.value()); + if (event != null) { + if (event.groupId().equals(groupId)) { + LOG.debug("Received event of type: {}", event.type().name()); + if (receiveFn.apply(new Envelope(event, record.partition(), record.offset()))) { + LOG.debug("Handled event of type: {}", event.type().name()); + } } } }); diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java index fb1191eb..502a01bd 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java @@ -21,14 +21,18 @@ import static java.util.stream.Collectors.groupingBy; import io.tabular.iceberg.connect.IcebergSinkConfig; -import io.tabular.iceberg.connect.events.CommitReadyPayload; -import io.tabular.iceberg.connect.events.CommitResponsePayload; -import io.tabular.iceberg.connect.events.TopicPartitionOffset; +import java.time.OffsetDateTime; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; import java.util.UUID; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.TopicPartitionOffset; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,11 +40,14 @@ public class CommitState { private static final Logger LOG = LoggerFactory.getLogger(CommitState.class); private final List commitBuffer = new LinkedList<>(); - private final List readyBuffer = new LinkedList<>(); + private final List readyBuffer = new LinkedList<>(); private long startTime; private UUID currentCommitId; private final IcebergSinkConfig config; + private final Comparator dateTimeComparator = + OffsetDateTime::compareTo; + public CommitState(IcebergSinkConfig config) { this.config = config; } @@ -49,17 +56,17 @@ public void addResponse(Envelope envelope) { commitBuffer.add(envelope); if (!isCommitInProgress()) { LOG.debug( - "Received commit response with commit-id={} when no commit in progress, this can happen during recovery", - ((CommitResponsePayload) envelope.event().payload()).commitId()); + "Received data written with commit-id={} when no commit in progress, this can happen during recovery", + ((DataWritten) envelope.event().payload()).commitId()); } } public void addReady(Envelope envelope) { - readyBuffer.add((CommitReadyPayload) envelope.event().payload()); + readyBuffer.add((DataComplete) envelope.event().payload()); if (!isCommitInProgress()) { LOG.debug( - "Received commit ready for commit-id={} when no commit in progress, this can happen during recovery", - ((CommitReadyPayload) envelope.event().payload()).commitId()); + "Received data complete for commit-id={} when no commit in progress, this can happen during recovery", + ((DataComplete) envelope.event().payload()).commitId()); } } @@ -139,26 +146,30 @@ public Map> tableCommitMap() { .collect( groupingBy( envelope -> - ((CommitResponsePayload) 
envelope.event().payload()) - .tableName() - .toIdentifier())); + ((DataWritten) envelope.event().payload()) + .tableReference() + .identifier())); } - public Long vtts(boolean partialCommit) { + public OffsetDateTime vtts(boolean partialCommit) { boolean validVtts = !partialCommit && readyBuffer.stream() .flatMap(event -> event.assignments().stream()) .allMatch(offset -> offset.timestamp() != null); - Long result; - if (validVtts) { - result = - readyBuffer.stream() - .flatMap(event -> event.assignments().stream()) - .mapToLong(TopicPartitionOffset::timestamp) - .min() - .getAsLong(); + OffsetDateTime result; + if (validVtts) { + Optional maybeResult = + readyBuffer.stream() + .flatMap(event -> event.assignments().stream()) + .map(TopicPartitionOffset::timestamp) + .min(dateTimeComparator); + if (maybeResult.isPresent()) { + result = maybeResult.get(); + } else { + throw new NoSuchElementException("no vtts found"); + } } else { result = null; } diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java index d8c299a0..10730d99 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitterImpl.java @@ -23,13 +23,6 @@ import io.tabular.iceberg.connect.IcebergSinkConfig; import io.tabular.iceberg.connect.data.Offset; -import io.tabular.iceberg.connect.events.CommitReadyPayload; -import io.tabular.iceberg.connect.events.CommitRequestPayload; -import io.tabular.iceberg.connect.events.CommitResponsePayload; -import io.tabular.iceberg.connect.events.Event; -import io.tabular.iceberg.connect.events.EventType; -import io.tabular.iceberg.connect.events.TableName; -import io.tabular.iceberg.connect.events.TopicPartitionOffset; import java.io.IOException; import java.time.Duration; import java.util.List; @@ -38,6 +31,13 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.PayloadType; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; +import org.apache.iceberg.connect.events.TopicPartitionOffset; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -129,8 +129,8 @@ private void throwExceptionIfCoordinatorIsTerminated() { } private boolean receive(Envelope envelope, CommittableSupplier committableSupplier) { - if (envelope.event().type() == EventType.COMMIT_REQUEST) { - UUID commitId = ((CommitRequestPayload) envelope.event().payload()).commitId(); + if (envelope.event().type() == PayloadType.START_COMMIT) { + UUID commitId = ((StartCommit) envelope.event().payload()).commitId(); sendCommitResponse(commitId, committableSupplier); return true; } @@ -149,11 +149,10 @@ private void sendCommitResponse(UUID commitId, CommittableSupplier committableSu Event commitResponse = new Event( config.controlGroupId(), - EventType.COMMIT_RESPONSE, - new CommitResponsePayload( + new DataWritten( writerResult.partitionStruct(), commitId, - TableName.of(writerResult.tableIdentifier()), 
+ TableReference.of(config.catalogName(), writerResult.tableIdentifier()), writerResult.dataFiles(), writerResult.deleteFiles())); @@ -180,8 +179,7 @@ private void sendCommitResponse(UUID commitId, CommittableSupplier committableSu Event commitReady = new Event( config.controlGroupId(), - EventType.COMMIT_READY, - new CommitReadyPayload(commitId, assignments)); + new DataComplete(commitId, assignments)); events.add(commitReady); Map offsets = committable.offsetsByTopicPartition(); diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java index 6d8b8fb5..0fcc0bb7 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java @@ -23,15 +23,10 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import io.tabular.iceberg.connect.IcebergSinkConfig; -import io.tabular.iceberg.connect.events.CommitCompletePayload; -import io.tabular.iceberg.connect.events.CommitRequestPayload; -import io.tabular.iceberg.connect.events.CommitTablePayload; -import io.tabular.iceberg.connect.events.Event; -import io.tabular.iceberg.connect.events.EventType; -import io.tabular.iceberg.connect.events.TableName; import java.io.IOException; import java.io.UncheckedIOException; import java.time.Duration; +import java.time.OffsetDateTime; import java.util.Collection; import java.util.List; import java.util.Map; @@ -47,6 +42,11 @@ import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.events.CommitComplete; +import org.apache.iceberg.connect.events.CommitToTable; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -98,10 +98,7 @@ public void process() { // send out begin commit commitState.startNewCommit(); Event event = - new Event( - config.controlGroupId(), - EventType.COMMIT_REQUEST, - new CommitRequestPayload(commitState.currentCommitId())); + new Event(config.controlGroupId(), new StartCommit(commitState.currentCommitId())); send(event); LOG.debug("Started new commit with commit-id={}", commitState.currentCommitId().toString()); } @@ -115,10 +112,10 @@ public void process() { private boolean receive(Envelope envelope) { switch (envelope.event().type()) { - case COMMIT_RESPONSE: + case DATA_WRITTEN: commitState.addResponse(envelope); return true; - case COMMIT_READY: + case DATA_COMPLETE: commitState.addReady(envelope); if (commitState.isCommitReady(totalPartitionCount)) { commit(false); @@ -142,7 +139,7 @@ private void doCommit(boolean partialCommit) { Map> commitMap = commitState.tableCommitMap(); String offsetsJson = offsetsJson(); - Long vtts = commitState.vtts(partialCommit); + OffsetDateTime vtts = commitState.vtts(partialCommit); Tasks.foreach(commitMap.entrySet()) .executeWith(exec) @@ -157,10 +154,7 @@ private void doCommit(boolean partialCommit) { commitState.clearResponses(); Event event = - new Event( - config.controlGroupId(), - EventType.COMMIT_COMPLETE, - new 
CommitCompletePayload(commitState.currentCommitId(), vtts)); + new Event(config.controlGroupId(), new CommitComplete(commitState.currentCommitId(), vtts)); send(event); LOG.info( @@ -179,7 +173,10 @@ private String offsetsJson() { } private void commitToTable( - TableIdentifier tableIdentifier, List envelopeList, String offsetsJson, Long vtts) { + TableIdentifier tableIdentifier, + List envelopeList, + String offsetsJson, + OffsetDateTime vtts) { Table table; try { table = catalog.loadTable(tableIdentifier); @@ -235,7 +232,7 @@ private void commitToTable( if (i == lastIdx) { appendOp.set(snapshotOffsetsProp, offsetsJson); if (vtts != null) { - appendOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts)); + appendOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts.toInstant().toEpochMilli())); } } @@ -249,7 +246,7 @@ private void commitToTable( deltaOp.set(snapshotOffsetsProp, offsetsJson); deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); if (vtts != null) { - deltaOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts)); + deltaOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts.toInstant().toEpochMilli())); } dataFiles.forEach(deltaOp::addRows); deleteFiles.forEach(deltaOp::addDeletes); @@ -260,9 +257,11 @@ private void commitToTable( Event event = new Event( config.controlGroupId(), - EventType.COMMIT_TABLE, - new CommitTablePayload( - commitState.currentCommitId(), TableName.of(tableIdentifier), snapshotId, vtts)); + new CommitToTable( + commitState.currentCommitId(), + TableReference.of(config.catalogName(), tableIdentifier), + snapshotId, + vtts)); send(event); LOG.info( diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Deduplicated.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Deduplicated.java index 67f46f79..fc81030e 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Deduplicated.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Deduplicated.java @@ -20,7 +20,7 @@ import static java.util.stream.Collectors.toList; -import io.tabular.iceberg.connect.events.CommitResponsePayload; +import java.time.OffsetDateTime; import java.util.List; import java.util.Objects; import java.util.UUID; @@ -31,6 +31,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.events.DataWritten; import org.apache.iceberg.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +83,7 @@ public static List dataFiles( tableIdentifier, envelopes, "data", - CommitResponsePayload::dataFiles, + DataWritten::dataFiles, dataFile -> dataFile.path().toString()); } @@ -97,7 +98,7 @@ public static List deleteFiles( tableIdentifier, envelopes, "delete", - CommitResponsePayload::deleteFiles, + DataWritten::deleteFiles, deleteFile -> deleteFile.path().toString()); } @@ -106,14 +107,13 @@ private static List deduplicatedFiles( TableIdentifier tableIdentifier, List envelopes, String fileType, - Function> extractFilesFromPayload, + Function> extractFilesFromPayload, Function extractPathFromFile) { List> filesAndEnvelopes = envelopes.stream() .flatMap( envelope -> { - CommitResponsePayload payload = - (CommitResponsePayload) envelope.event().payload(); + DataWritten payload = (DataWritten) envelope.event().payload(); List files = extractFilesFromPayload.apply(payload); if (files == null) { return Stream.empty(); @@ -207,7 +207,7 @@ private static class SimpleEnvelope { private final long offset; 
private final UUID eventId; private final String eventGroupId; - private final Long eventTimestamp; + private final OffsetDateTime eventTimestamp; private final UUID payloadCommitId; SimpleEnvelope(Envelope envelope) { @@ -216,7 +216,7 @@ private static class SimpleEnvelope { eventId = envelope.event().id(); eventGroupId = envelope.event().groupId(); eventTimestamp = envelope.event().timestamp(); - payloadCommitId = ((CommitResponsePayload) envelope.event().payload()).commitId(); + payloadCommitId = ((DataWritten) envelope.event().payload()).commitId(); } @Override @@ -255,7 +255,7 @@ public String toString() { + eventGroupId + '\'' + ", eventTimestamp=" - + eventTimestamp + + eventTimestamp.toInstant().toEpochMilli() + ", payloadCommitId=" + payloadCommitId + '}'; diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Envelope.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Envelope.java index 3458a3b7..27939897 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Envelope.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Envelope.java @@ -18,7 +18,7 @@ */ package io.tabular.iceberg.connect.channel; -import io.tabular.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.Event; public class Envelope { private final Event event; diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/EventDecoder.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/EventDecoder.java new file mode 100644 index 00000000..7ba4f878 --- /dev/null +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/EventDecoder.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.tabular.iceberg.connect.channel; + +import io.tabular.iceberg.connect.events.CommitCompletePayload; +import io.tabular.iceberg.connect.events.CommitReadyPayload; +import io.tabular.iceberg.connect.events.CommitRequestPayload; +import io.tabular.iceberg.connect.events.CommitResponsePayload; +import io.tabular.iceberg.connect.events.CommitTablePayload; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.avro.Schema; +import org.apache.avro.SchemaParseException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.connect.events.AvroUtil; +import org.apache.iceberg.connect.events.CommitComplete; +import org.apache.iceberg.connect.events.CommitToTable; +import org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.Payload; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; +import org.apache.iceberg.connect.events.TopicPartitionOffset; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +/** + * Iceberg 1.5.0 introduced a breaking change to Avro serialization that the connector uses when encoding + * messages for the control topic, requiring a way to fall back to decoding 1.4.x series messages that may + * be left behind on a control topic when upgrading. + * + * This class should be removed in later revisions. + */ +public class EventDecoder { + + private final String catalogName; + + public EventDecoder(String catalogName) { + this.catalogName = catalogName; + } + + /** + * @deprecated + *

This provides a fallback decoder that can decode the legacy Iceberg 1.4.x Avro schemas in cases where + * the control topic was not fully drained during the upgrade to 1.5.2.

+ */ + @Deprecated + public Event decode(byte[] value) { + try { + return AvroUtil.decode(value); + } catch (SchemaParseException exception) { + io.tabular.iceberg.connect.events.Event event = + io.tabular.iceberg.connect.events.Event.decode(value); + return convertLegacy(event); + } + } + + private Event convertLegacy(io.tabular.iceberg.connect.events.Event event) { + Payload payload = convertPayload(event.payload()); + if (payload == null) { + return null; + } + return new Event(event.groupId(), payload); + } + + private Payload convertPayload(io.tabular.iceberg.connect.events.Payload payload) { + if (payload instanceof CommitRequestPayload) { + CommitRequestPayload pay = (CommitRequestPayload) payload; + return new StartCommit(pay.commitId()); + } else if (payload instanceof CommitResponsePayload) { + CommitResponsePayload pay = (CommitResponsePayload) payload; + return convertCommitResponse(pay); + } else if (payload instanceof CommitReadyPayload) { + CommitReadyPayload pay = (CommitReadyPayload) payload; + List legacyTPO = pay.assignments(); + List converted = + legacyTPO.stream() + .map( + t -> + new TopicPartitionOffset( + t.topic(), + t.partition(), + t.offset(), + t.timestamp() == null + ? null + : OffsetDateTime.ofInstant( + Instant.ofEpochMilli(t.timestamp()), ZoneOffset.UTC))) + .collect(Collectors.toList()); + return new DataComplete(pay.commitId(), converted); + } else if (payload instanceof CommitTablePayload) { + CommitTablePayload pay = (CommitTablePayload) payload; + return new CommitToTable( + pay.commitId(), + TableReference.of(catalogName, pay.tableName().toIdentifier()), + pay.snapshotId(), + pay.vtts() == null ? null : OffsetDateTime.ofInstant(Instant.ofEpochMilli(pay.vtts()), ZoneOffset.UTC)); + } else if (payload instanceof CommitCompletePayload) { + CommitCompletePayload pay = (CommitCompletePayload) payload; + return new CommitComplete( + pay.commitId(), + pay.vtts() == null ? null : OffsetDateTime.ofInstant(Instant.ofEpochMilli(pay.vtts()), ZoneOffset.UTC)); + } else { + throw new IllegalStateException( + String.format("Unknown event payload: %s", payload.getSchema())); + } + } + + private Payload convertCommitResponse(CommitResponsePayload payload) { + List dataFiles = payload.dataFiles(); + List deleteFiles = payload.deleteFiles(); + if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { + return null; + } + + String target = (dataFiles.isEmpty()) ?
"deleteFiles" : "dataFiles"; + List fields = + payload.getSchema().getField(target).schema().getTypes().stream() + .filter(s -> s.getType() != Schema.Type.NULL) + .findFirst() + .get() + .getElementType() + .getField("partition") + .schema() + .getFields(); + + List convertedFields = Lists.newArrayListWithExpectedSize(fields.size()); + + for (Schema.Field f : fields) { + Schema fieldSchema = + f.schema().getTypes().stream() + .filter(s -> s.getType() != Schema.Type.NULL) + .findFirst() + .get(); + Type fieldType = AvroSchemaUtil.convert(fieldSchema); + int fieldId = (int) f.getObjectProp("field-id"); + convertedFields.add(Types.NestedField.of(fieldId, f.schema().isNullable(), f.name(), fieldType)); + } + + Types.StructType convertedStructType = Types.StructType.of(convertedFields); + return new DataWritten( + convertedStructType, + payload.commitId(), + TableReference.of(catalogName, payload.tableName().toIdentifier()), + payload.dataFiles(), + payload.deleteFiles()); + } +} diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Offset.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Offset.java index 6efa09be..7048fc73 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Offset.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Offset.java @@ -18,6 +18,9 @@ */ package io.tabular.iceberg.connect.data; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.Objects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -37,8 +40,11 @@ public Long offset() { return offset; } - public Long timestamp() { - return timestamp; + public OffsetDateTime timestamp() { + if (timestamp == null) { + return null; + } + return OffsetDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC); } @Override diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/ChannelTestBase.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/ChannelTestBase.java index 25f4a68c..5b2bbc74 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/ChannelTestBase.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/ChannelTestBase.java @@ -99,6 +99,7 @@ public void before() { when(config.commitThreads()).thenReturn(1); when(config.controlGroupId()).thenReturn(CONTROL_CONSUMER_GROUP_ID); when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class)); + when(config.catalogName()).thenReturn("catalog"); TopicPartitionInfo partitionInfo = mock(TopicPartitionInfo.class); when(partitionInfo.partition()).thenReturn(0); diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitStateTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitStateTest.java index e06e7f1e..9ee165d9 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitStateTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitStateTest.java @@ -23,15 +23,23 @@ import static org.mockito.Mockito.when; import io.tabular.iceberg.connect.IcebergSinkConfig; -import io.tabular.iceberg.connect.events.CommitReadyPayload; -import io.tabular.iceberg.connect.events.Event; -import io.tabular.iceberg.connect.events.Payload; -import io.tabular.iceberg.connect.events.TopicPartitionOffset; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.UUID; +import 
org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.Payload; +import org.apache.iceberg.connect.events.TopicPartitionOffset; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.junit.jupiter.api.Test; public class CommitStateTest { + + private OffsetDateTime offsetDateTime(Long ts) { + return OffsetDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); + } + @Test public void testIsCommitReady() { TopicPartitionOffset tp = mock(TopicPartitionOffset.class); @@ -39,15 +47,15 @@ public void testIsCommitReady() { CommitState commitState = new CommitState(mock(IcebergSinkConfig.class)); commitState.startNewCommit(); - CommitReadyPayload payload1 = mock(CommitReadyPayload.class); + DataComplete payload1 = mock(DataComplete.class); when(payload1.commitId()).thenReturn(commitState.currentCommitId()); when(payload1.assignments()).thenReturn(ImmutableList.of(tp, tp)); - CommitReadyPayload payload2 = mock(CommitReadyPayload.class); + DataComplete payload2 = mock(DataComplete.class); when(payload2.commitId()).thenReturn(commitState.currentCommitId()); when(payload2.assignments()).thenReturn(ImmutableList.of(tp)); - CommitReadyPayload payload3 = mock(CommitReadyPayload.class); + DataComplete payload3 = mock(DataComplete.class); when(payload3.commitId()).thenReturn(UUID.randomUUID()); when(payload3.assignments()).thenReturn(ImmutableList.of(tp)); @@ -61,16 +69,16 @@ public void testIsCommitReady() { @Test public void testGetVtts() { - CommitReadyPayload payload1 = mock(CommitReadyPayload.class); + DataComplete payload1 = mock(DataComplete.class); TopicPartitionOffset tp1 = mock(TopicPartitionOffset.class); - when(tp1.timestamp()).thenReturn(3L); + when(tp1.timestamp()).thenReturn(offsetDateTime(3L)); TopicPartitionOffset tp2 = mock(TopicPartitionOffset.class); - when(tp2.timestamp()).thenReturn(2L); + when(tp2.timestamp()).thenReturn(offsetDateTime(2L)); when(payload1.assignments()).thenReturn(ImmutableList.of(tp1, tp2)); - CommitReadyPayload payload2 = mock(CommitReadyPayload.class); + DataComplete payload2 = mock(DataComplete.class); TopicPartitionOffset tp3 = mock(TopicPartitionOffset.class); - when(tp3.timestamp()).thenReturn(1L); + when(tp3.timestamp()).thenReturn(offsetDateTime(1L)); when(payload2.assignments()).thenReturn(ImmutableList.of(tp3)); CommitState commitState = new CommitState(mock(IcebergSinkConfig.class)); @@ -79,11 +87,11 @@ public void testGetVtts() { commitState.addReady(wrapInEnvelope(payload1)); commitState.addReady(wrapInEnvelope(payload2)); - assertThat(commitState.vtts(false)).isEqualTo(1L); + assertThat(commitState.vtts(false)).isEqualTo(offsetDateTime(1L)); assertThat(commitState.vtts(true)).isNull(); // null timestamp for one, so should not set a vtts - CommitReadyPayload payload3 = mock(CommitReadyPayload.class); + DataComplete payload3 = mock(DataComplete.class); TopicPartitionOffset tp4 = mock(TopicPartitionOffset.class); when(tp4.timestamp()).thenReturn(null); when(payload3.assignments()).thenReturn(ImmutableList.of(tp4)); diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitterImplTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitterImplTest.java index f87e5324..b3365be3 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitterImplTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitterImplTest.java @@ -18,6 +18,7 @@ */ 
package io.tabular.iceberg.connect.channel; +import static io.tabular.iceberg.connect.fixtures.EventTestUtil.createDataFile; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; @@ -30,14 +31,10 @@ import io.tabular.iceberg.connect.IcebergSinkConfig; import io.tabular.iceberg.connect.data.Offset; import io.tabular.iceberg.connect.data.WriterResult; -import io.tabular.iceberg.connect.events.CommitCompletePayload; -import io.tabular.iceberg.connect.events.CommitReadyPayload; -import io.tabular.iceberg.connect.events.CommitRequestPayload; -import io.tabular.iceberg.connect.events.CommitResponsePayload; -import io.tabular.iceberg.connect.events.Event; -import io.tabular.iceberg.connect.events.EventTestUtil; -import io.tabular.iceberg.connect.events.EventType; import java.io.IOException; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.List; import java.util.Map; import java.util.Optional; @@ -50,6 +47,13 @@ import org.apache.iceberg.DeleteFile; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.connect.events.AvroUtil; +import org.apache.iceberg.connect.events.CommitComplete; +import org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.PayloadType; +import org.apache.iceberg.connect.events.StartCommit; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; @@ -78,6 +82,8 @@ import org.mockito.Mockito; class CommitterImplTest { + + private static final String CATALOG_NAME = "iceberg"; private static final String SOURCE_TOPIC = "source-topic-name"; private static final TopicPartition SOURCE_TP0 = new TopicPartition(SOURCE_TOPIC, 0); private static final TopicPartition SOURCE_TP1 = new TopicPartition(SOURCE_TOPIC, 1); @@ -229,7 +235,7 @@ private static > void assertSameContentFiles( expected.stream().map(CommitterImplTest::toPath).collect(Collectors.toList())); } - private void assertCommitResponse( + private void assertDataWritten( ProducerRecord producerRecord, UUID expectedProducerId, UUID expectedCommitId, @@ -238,27 +244,28 @@ private void assertCommitResponse( List expectedDeleteFiles) { assertThat(producerRecord.key()).isEqualTo(expectedProducerId.toString()); - Event event = Event.decode(producerRecord.value()); - assertThat(event.type()).isEqualTo(EventType.COMMIT_RESPONSE); - assertThat(event.payload()).isInstanceOf(CommitResponsePayload.class); - CommitResponsePayload commitResponsePayload = (CommitResponsePayload) event.payload(); - assertThat(commitResponsePayload.commitId()).isEqualTo(expectedCommitId); - assertThat(commitResponsePayload.tableName().toIdentifier()).isEqualTo(expectedTableIdentifier); - assertSameContentFiles(commitResponsePayload.dataFiles(), expectedDataFiles); - assertSameContentFiles(commitResponsePayload.deleteFiles(), expectedDeleteFiles); + Event event = AvroUtil.decode(producerRecord.value()); + assertThat(event.type()).isEqualTo(PayloadType.DATA_WRITTEN); + assertThat(event.payload()).isInstanceOf(DataWritten.class); + DataWritten payload = (DataWritten) event.payload(); + 
assertThat(payload.commitId()).isEqualTo(expectedCommitId); + assertThat(payload.tableReference().identifier()).isEqualTo(expectedTableIdentifier); + assertThat(payload.tableReference().catalog()).isEqualTo(CATALOG_NAME); + assertSameContentFiles(payload.dataFiles(), expectedDataFiles); + assertSameContentFiles(payload.deleteFiles(), expectedDeleteFiles); } - private void assertCommitReady( + private void assertDataComplete( ProducerRecord producerRecord, UUID expectedProducerId, UUID expectedCommitId, - Map> expectedAssignments) { + Map> expectedAssignments) { assertThat(producerRecord.key()).isEqualTo(expectedProducerId.toString()); - Event event = Event.decode(producerRecord.value()); - assertThat(event.type()).isEqualTo(EventType.COMMIT_READY); - assertThat(event.payload()).isInstanceOf(CommitReadyPayload.class); - CommitReadyPayload commitReadyPayload = (CommitReadyPayload) event.payload(); + Event event = AvroUtil.decode(producerRecord.value()); + assertThat(event.type()).isEqualTo(PayloadType.DATA_COMPLETE); + assertThat(event.payload()).isInstanceOf(DataComplete.class); + DataComplete commitReadyPayload = (DataComplete) event.payload(); assertThat(commitReadyPayload.commitId()).isEqualTo(expectedCommitId); assertThat( commitReadyPayload.assignments().stream() @@ -274,6 +281,10 @@ private void assertCommitReady( .collect(Collectors.toList())); } + private OffsetDateTime offsetDateTime(Long ms) { + return OffsetDateTime.ofInstant(Instant.ofEpochMilli(ms), ZoneOffset.UTC); + } + @Test public void testShouldRewindOffsetsToStableControlGroupConsumerOffsetsForAssignedPartitionsOnConstruction() @@ -386,11 +397,10 @@ public void testCommitShouldDoNothingIfThereIsNoCommitRequestMessage() throws IO CONTROL_TOPIC_PARTITION.partition(), 0, UUID.randomUUID().toString(), - Event.encode( + AvroUtil.encode( new Event( CONFIG.controlGroupId(), - EventType.COMMIT_COMPLETE, - new CommitCompletePayload(UUID.randomUUID(), 100L))))); + new CommitComplete(UUID.randomUUID(), offsetDateTime(100L)))))); committer.commit(committableSupplier); @@ -410,7 +420,7 @@ public void testCommitShouldRespondToCommitRequest() throws IOException { ImmutableMap.of( CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); - List dataFiles = ImmutableList.of(EventTestUtil.createDataFile()); + List dataFiles = ImmutableList.of(createDataFile()); List deleteFiles = ImmutableList.of(); Types.StructType partitionStruct = Types.StructType.of(); Map sourceOffsets = ImmutableMap.of(SOURCE_TP0, new Offset(100L, 200L)); @@ -432,28 +442,27 @@ public void testCommitShouldRespondToCommitRequest() throws IOException { CONTROL_TOPIC_PARTITION.partition(), 0, UUID.randomUUID().toString(), - Event.encode( + AvroUtil.encode( new Event( CONFIG.controlGroupId(), - EventType.COMMIT_REQUEST, - new CommitRequestPayload(commitId))))); + new StartCommit(commitId))))); committer.commit(committableSupplier); assertThat(producer.transactionCommitted()).isTrue(); assertThat(producer.history()).hasSize(2); - assertCommitResponse( + assertDataWritten( producer.history().get(0), producerId, commitId, TABLE_1_IDENTIFIER, dataFiles, deleteFiles); - assertCommitReady( + assertDataComplete( producer.history().get(1), producerId, commitId, - ImmutableMap.of(SOURCE_TP0, Pair.of(100L, 200L))); + ImmutableMap.of(SOURCE_TP0, Pair.of(100L, offsetDateTime(200L)))); assertThat(producer.consumerGroupOffsetsHistory()).hasSize(2); Map expectedConsumerOffset = @@ -491,17 +500,17 @@ public void testCommitWhenCommittableIsEmpty() throws IOException 
{ CONTROL_TOPIC_PARTITION.partition(), 0, UUID.randomUUID().toString(), - Event.encode( + AvroUtil.encode( new Event( CONFIG.controlGroupId(), - EventType.COMMIT_REQUEST, - new CommitRequestPayload(commitId))))); + new StartCommit(commitId))))); + committer.commit(committableSupplier); assertThat(producer.transactionCommitted()).isTrue(); assertThat(producer.history()).hasSize(1); - assertCommitReady( + assertDataComplete( producer.history().get(0), producerId, commitId, @@ -529,7 +538,7 @@ public void testCommitShouldCommitOffsetsOnlyForPartitionsWeMadeProgressOn() thr ImmutableMap.of( CONFIG.controlGroupId(), ImmutableMap.of(SOURCE_TP0, 110L, SOURCE_TP1, 100L))); - List dataFiles = ImmutableList.of(EventTestUtil.createDataFile()); + List dataFiles = ImmutableList.of(createDataFile()); List deleteFiles = ImmutableList.of(); Types.StructType partitionStruct = Types.StructType.of(); CommittableSupplier committableSupplier = @@ -550,30 +559,29 @@ public void testCommitShouldCommitOffsetsOnlyForPartitionsWeMadeProgressOn() thr CONTROL_TOPIC_PARTITION.partition(), 0, UUID.randomUUID().toString(), - Event.encode( + AvroUtil.encode( new Event( CONFIG.controlGroupId(), - EventType.COMMIT_REQUEST, - new CommitRequestPayload(commitId))))); + new StartCommit(commitId))))); committer.commit(committableSupplier); assertThat(producer.transactionCommitted()).isTrue(); assertThat(producer.history()).hasSize(2); - assertCommitResponse( + assertDataWritten( producer.history().get(0), producerId, commitId, TABLE_1_IDENTIFIER, dataFiles, deleteFiles); - assertCommitReady( + assertDataComplete( producer.history().get(1), producerId, commitId, ImmutableMap.of( sourceTp0, Pair.of(null, null), - sourceTp1, Pair.of(100L, 200L))); + sourceTp1, Pair.of(100L, offsetDateTime(200L)))); assertThat(producer.consumerGroupOffsetsHistory()).hasSize(2); Map expectedConsumerOffset = diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java index d263206d..9cd34f1d 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java @@ -21,16 +21,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.when; -import io.tabular.iceberg.connect.events.CommitCompletePayload; -import io.tabular.iceberg.connect.events.CommitReadyPayload; -import io.tabular.iceberg.connect.events.CommitRequestPayload; -import io.tabular.iceberg.connect.events.CommitResponsePayload; -import io.tabular.iceberg.connect.events.CommitTablePayload; -import io.tabular.iceberg.connect.events.Event; -import io.tabular.iceberg.connect.events.EventTestUtil; -import io.tabular.iceberg.connect.events.EventType; -import io.tabular.iceberg.connect.events.TableName; -import io.tabular.iceberg.connect.events.TopicPartitionOffset; +import io.tabular.iceberg.connect.fixtures.EventTestUtil; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.List; import java.util.Map; import java.util.Set; @@ -46,6 +40,16 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.connect.events.AvroUtil; +import org.apache.iceberg.connect.events.CommitComplete; +import org.apache.iceberg.connect.events.CommitToTable; +import 
org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.PayloadType; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; +import org.apache.iceberg.connect.events.TopicPartitionOffset; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; @@ -65,7 +69,7 @@ public class CoordinatorTest extends ChannelTestBase { public void testCommitAppend() { Assertions.assertEquals(0, ImmutableList.copyOf(table.snapshots().iterator()).size()); - long ts = System.currentTimeMillis(); + OffsetDateTime ts = OffsetDateTime.ofInstant(Instant.now(), ZoneOffset.UTC); UUID commitId = coordinatorTest(ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), ts); table.refresh(); @@ -87,12 +91,13 @@ public void testCommitAppend() { Map summary = snapshot.summary(); Assertions.assertEquals(commitId.toString(), summary.get(COMMIT_ID_SNAPSHOT_PROP)); Assertions.assertEquals("{\"0\":3}", summary.get(OFFSETS_SNAPSHOT_PROP)); - Assertions.assertEquals(Long.toString(ts), summary.get(VTTS_SNAPSHOT_PROP)); + Assertions.assertEquals( + Long.toString(ts.toInstant().toEpochMilli()), summary.get(VTTS_SNAPSHOT_PROP)); } @Test public void testCommitDelta() { - long ts = System.currentTimeMillis(); + OffsetDateTime ts = OffsetDateTime.ofInstant(Instant.now(), ZoneOffset.UTC); UUID commitId = coordinatorTest( ImmutableList.of(EventTestUtil.createDataFile()), @@ -116,12 +121,13 @@ public void testCommitDelta() { Map summary = snapshot.summary(); Assertions.assertEquals(commitId.toString(), summary.get(COMMIT_ID_SNAPSHOT_PROP)); Assertions.assertEquals("{\"0\":3}", summary.get(OFFSETS_SNAPSHOT_PROP)); - Assertions.assertEquals(Long.toString(ts), summary.get(VTTS_SNAPSHOT_PROP)); + Assertions.assertEquals( + Long.toString(ts.toInstant().toEpochMilli()), summary.get(VTTS_SNAPSHOT_PROP)); } @Test public void testCommitNoFiles() { - long ts = System.currentTimeMillis(); + OffsetDateTime ts = OffsetDateTime.ofInstant(Instant.now(), ZoneOffset.UTC); UUID commitId = coordinatorTest(ImmutableList.of(), ImmutableList.of(), ts); assertThat(producer.history()).hasSize(2); @@ -146,7 +152,10 @@ public void testCommitError() { .withRecordCount(5) .build(); - coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), 0L); + coordinatorTest( + ImmutableList.of(badDataFile), + ImmutableList.of(), + OffsetDateTime.ofInstant(Instant.ofEpochMilli(0L), ZoneOffset.UTC)); // no commit messages sent assertThat(producer.history()).hasSize(1); @@ -159,7 +168,7 @@ public void testCommitError() { @Test public void testShouldDeduplicateDataFilesBeforeAppending() { - long ts = System.currentTimeMillis(); + OffsetDateTime ts = OffsetDateTime.ofInstant(Instant.now(), ZoneOffset.UTC); DataFile dataFile = EventTestUtil.createDataFile(); UUID commitId = @@ -168,11 +177,10 @@ public void testShouldDeduplicateDataFilesBeforeAppending() { Event commitResponse = new Event( config.controlGroupId(), - EventType.COMMIT_RESPONSE, - new CommitResponsePayload( + new DataWritten( StructType.of(), currentCommitId, - new TableName(ImmutableList.of("db"), "tbl"), + new TableReference("catalog", ImmutableList.of("db"), "tbl"), ImmutableList.of(dataFile, dataFile), // duplicated data files 
ImmutableList.of())); @@ -181,8 +189,7 @@ public void testShouldDeduplicateDataFilesBeforeAppending() { commitResponse, // duplicate commit response new Event( config.controlGroupId(), - EventType.COMMIT_READY, - new CommitReadyPayload( + new DataComplete( currentCommitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts))))); }); @@ -201,7 +208,7 @@ public void testShouldDeduplicateDataFilesBeforeAppending() { @Test public void testShouldDeduplicateDeleteFilesBeforeAppending() { - long ts = System.currentTimeMillis(); + OffsetDateTime ts = OffsetDateTime.ofInstant(Instant.now(), ZoneOffset.UTC); DeleteFile deleteFile = EventTestUtil.createDeleteFile(); UUID commitId = @@ -210,11 +217,10 @@ public void testShouldDeduplicateDeleteFilesBeforeAppending() { Event duplicateCommitResponse = new Event( config.controlGroupId(), - EventType.COMMIT_RESPONSE, - new CommitResponsePayload( + new DataWritten( StructType.of(), currentCommitId, - new TableName(ImmutableList.of("db"), "tbl"), + new TableReference("catalog", ImmutableList.of("db"), "tbl"), ImmutableList.of(), ImmutableList.of(deleteFile, deleteFile))); // duplicate delete files @@ -223,8 +229,7 @@ public void testShouldDeduplicateDeleteFilesBeforeAppending() { duplicateCommitResponse, // duplicate commit response new Event( config.controlGroupId(), - EventType.COMMIT_READY, - new CommitReadyPayload( + new DataComplete( currentCommitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts))))); }); @@ -302,9 +307,9 @@ public void testCommitMultiPartitionSpecAppendDataFiles() { // retrieve commitId from commit request produced by coordinator final byte[] bytes = producer.history().get(0).value(); - final Event commitRequest = Event.decode(bytes); - assert commitRequest.type().equals(EventType.COMMIT_REQUEST); - final UUID commitId = ((CommitRequestPayload) commitRequest.payload()).commitId(); + final Event commitRequest = AvroUtil.decode(bytes); + assert commitRequest.type().equals(PayloadType.START_COMMIT); + final UUID commitId = ((StartCommit) commitRequest.payload()).commitId(); // each worker sends its responses for the commit request Map workerIdToSpecMap = @@ -331,14 +336,13 @@ public void testCommitMultiPartitionSpecAppendDataFiles() { 0, currentControlTopicOffset, "key", - Event.encode( + AvroUtil.encode( new Event( config.controlGroupId(), - EventType.COMMIT_RESPONSE, - new CommitResponsePayload( + new DataWritten( spec.partitionType(), commitId, - TableName.of(TABLE_IDENTIFIER), + TableReference.of("catalog", TABLE_IDENTIFIER), ImmutableList.of(dataFile), ImmutableList.of()))))); currentControlTopicOffset += 1; @@ -349,14 +353,18 @@ public void testCommitMultiPartitionSpecAppendDataFiles() { 0, currentControlTopicOffset, "key", - Event.encode( + AvroUtil.encode( new Event( config.controlGroupId(), - EventType.COMMIT_READY, - new CommitReadyPayload( + new DataComplete( commitId, ImmutableList.of( - new TopicPartitionOffset(SRC_TOPIC_NAME, 0, 100L, 100L))))))); + new TopicPartitionOffset( + SRC_TOPIC_NAME, + 0, + 100L, + OffsetDateTime.ofInstant( + Instant.ofEpochMilli(100L), ZoneOffset.UTC)))))))); currentControlTopicOffset += 1; } @@ -399,45 +407,44 @@ public void testCommitMultiPartitionSpecAppendDataFiles() { "Only the most recent snapshot should include vtts in it's summary"); } - private void assertCommitTable(int idx, UUID commitId, long ts) { + private void assertCommitTable(int idx, UUID commitId, OffsetDateTime ts) { byte[] bytes = producer.history().get(idx).value(); - Event commitTable = 
Event.decode(bytes); - assertThat(commitTable.type()).isEqualTo(EventType.COMMIT_TABLE); - CommitTablePayload commitTablePayload = (CommitTablePayload) commitTable.payload(); + Event commitTable = AvroUtil.decode(bytes); + assertThat(commitTable.type()).isEqualTo(PayloadType.COMMIT_TO_TABLE); + CommitToTable commitTablePayload = (CommitToTable) commitTable.payload(); assertThat(commitTablePayload.commitId()).isEqualTo(commitId); - assertThat(commitTablePayload.tableName().toIdentifier().toString()) + assertThat(commitTablePayload.tableReference().identifier().toString()) .isEqualTo(TABLE_IDENTIFIER.toString()); - assertThat(commitTablePayload.vtts()).isEqualTo(ts); + assertThat(commitTablePayload.validThroughTs()).isEqualTo(ts); } - private void assertCommitComplete(int idx, UUID commitId, long ts) { + private void assertCommitComplete(int idx, UUID commitId, OffsetDateTime ts) { byte[] bytes = producer.history().get(idx).value(); - Event commitComplete = Event.decode(bytes); - assertThat(commitComplete.type()).isEqualTo(EventType.COMMIT_COMPLETE); - CommitCompletePayload commitCompletePayload = (CommitCompletePayload) commitComplete.payload(); + Event commitComplete = AvroUtil.decode(bytes); + assertThat(commitComplete.type()).isEqualTo(PayloadType.COMMIT_COMPLETE); + CommitComplete commitCompletePayload = (CommitComplete) commitComplete.payload(); assertThat(commitCompletePayload.commitId()).isEqualTo(commitId); - assertThat(commitCompletePayload.vtts()).isEqualTo(ts); + assertThat(commitCompletePayload.validThroughTs()).isEqualTo(ts); } - private UUID coordinatorTest(List dataFiles, List deleteFiles, long ts) { + private UUID coordinatorTest( + List dataFiles, List deleteFiles, OffsetDateTime ts) { return coordinatorTest( currentCommitId -> { Event commitResponse = new Event( config.controlGroupId(), - EventType.COMMIT_RESPONSE, - new CommitResponsePayload( + new DataWritten( StructType.of(), currentCommitId, - new TableName(ImmutableList.of("db"), "tbl"), + new TableReference("catalog", ImmutableList.of("db"), "tbl"), dataFiles, deleteFiles)); Event commitReady = new Event( config.controlGroupId(), - EventType.COMMIT_READY, - new CommitReadyPayload( + new DataComplete( currentCommitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts)))); @@ -460,14 +467,14 @@ private UUID coordinatorTest(Function> eventsFn) { assertThat(producer.history()).hasSize(1); byte[] bytes = producer.history().get(0).value(); - Event commitRequest = Event.decode(bytes); - assertThat(commitRequest.type()).isEqualTo(EventType.COMMIT_REQUEST); + Event commitRequest = AvroUtil.decode(bytes); + assertThat(commitRequest.type()).isEqualTo(PayloadType.START_COMMIT); - UUID commitId = ((CommitRequestPayload) commitRequest.payload()).commitId(); + UUID commitId = ((StartCommit) commitRequest.payload()).commitId(); int currentOffset = 1; for (Event event : eventsFn.apply(commitId)) { - bytes = Event.encode(event); + bytes = AvroUtil.encode(event); consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, currentOffset, "key", bytes)); currentOffset += 1; } diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/DeduplicatedTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/DeduplicatedTest.java index 24685b04..3b066ce8 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/DeduplicatedTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/DeduplicatedTest.java @@ -18,12 +18,8 @@ */ package 
io.tabular.iceberg.connect.channel; -import static io.tabular.iceberg.connect.events.EventType.COMMIT_RESPONSE; import static org.assertj.core.api.Assertions.assertThat; -import io.tabular.iceberg.connect.events.CommitResponsePayload; -import io.tabular.iceberg.connect.events.Event; -import io.tabular.iceberg.connect.events.TableName; import java.util.List; import java.util.Set; import java.util.UUID; @@ -35,6 +31,9 @@ import org.apache.iceberg.FileMetadata; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.TableReference; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -54,7 +53,7 @@ class DeduplicatedTest { private static final UUID PAYLOAD_COMMIT_ID = UUID.fromString("4142add7-7c92-4bbe-b864-21ce8ac4bf53"); private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of("db", "tbl"); - private static final TableName TABLE_NAME = TableName.of(TABLE_IDENTIFIER); + private static final TableReference TABLE_NAME = TableReference.of("catalog", TABLE_IDENTIFIER); private static final String GROUP_ID = "some-group"; private static final DataFile DATA_FILE_1 = createDataFile("1"); private static final DataFile DATA_FILE_2 = createDataFile("2"); @@ -124,8 +123,7 @@ private void assertWarnOrHigherLogsContainsEntryMatching(String expectedMessages private Event commitResponseEvent(List dataFiles, List deleteFiles) { return new Event( GROUP_ID, - COMMIT_RESPONSE, - new CommitResponsePayload( + new DataWritten( Types.StructType.of(), PAYLOAD_COMMIT_ID, TABLE_NAME, dataFiles, deleteFiles)); } diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java new file mode 100644 index 00000000..3d1ecab9 --- /dev/null +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/EventDecoderTest.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.tabular.iceberg.connect.channel; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.tabular.iceberg.connect.data.SchemaUtils; +import io.tabular.iceberg.connect.events.CommitCompletePayload; +import io.tabular.iceberg.connect.events.CommitReadyPayload; +import io.tabular.iceberg.connect.events.CommitRequestPayload; +import io.tabular.iceberg.connect.events.CommitResponsePayload; +import io.tabular.iceberg.connect.events.CommitTablePayload; +import io.tabular.iceberg.connect.events.EventTestUtil; +import io.tabular.iceberg.connect.events.EventType; +import io.tabular.iceberg.connect.events.TableName; +import io.tabular.iceberg.connect.events.TopicPartitionOffset; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import org.apache.avro.Schema; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.events.CommitComplete; +import org.apache.iceberg.connect.events.CommitToTable; +import org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.PayloadType; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +public class EventDecoderTest { + private final UUID commitId = UUID.fromString("e0bf874a-0f83-4242-97c9-f0a7b5cb3f45"); + private final String catalogName = "catalog"; + + private final EventDecoder eventDecoder = new EventDecoder(catalogName); + + @Test + public void testCommitRequestBecomesStartCommit() { + io.tabular.iceberg.connect.events.Event event = + new io.tabular.iceberg.connect.events.Event( + "cg-connector", + io.tabular.iceberg.connect.events.EventType.COMMIT_REQUEST, + new CommitRequestPayload(commitId)); + + byte[] data = io.tabular.iceberg.connect.events.Event.encode(event); + + Event result = eventDecoder.decode(data); + + assertThat(result.groupId()).isEqualTo("cg-connector"); + assertThat(result.type()).isEqualTo(PayloadType.START_COMMIT); + assertThat(result.payload()).isInstanceOf(StartCommit.class); + StartCommit payload = (StartCommit) result.payload(); + assertThat(payload.commitId()).isEqualTo(commitId); + } + + @Test + public void testCommitResponseBecomesDataWrittenUnpartitioned() { + io.tabular.iceberg.connect.events.Event event = + new io.tabular.iceberg.connect.events.Event( + "cg-connector", + EventType.COMMIT_RESPONSE, + new CommitResponsePayload( + Types.StructType.of(), + commitId, + new TableName(Collections.singletonList("db"), "tbl"), + Arrays.asList(EventTestUtil.createDataFile(), EventTestUtil.createDataFile()), + Arrays.asList(EventTestUtil.createDeleteFile(), EventTestUtil.createDeleteFile()))); + + byte[] data = io.tabular.iceberg.connect.events.Event.encode(event); + + Event result = eventDecoder.decode(data); + + assertThat(result.groupId()).isEqualTo("cg-connector"); + assertThat(result.type()).isEqualTo(PayloadType.DATA_WRITTEN); + assertThat(result.payload()).isInstanceOf(DataWritten.class); + DataWritten payload = (DataWritten) result.payload(); + assertThat(payload.commitId()).isEqualTo(commitId); + 
assertThat(payload.dataFiles().size()).isEqualTo(2); + assertThat(payload.deleteFiles().size()).isEqualTo(2); + assertThat(payload.tableReference().catalog()).isEqualTo("catalog"); + assertThat(payload.tableReference().identifier()).isEqualTo(TableIdentifier.of("db", "tbl")); + // should have an empty partition spec on the schema + Schema.Field field = + payload.getSchema().getFields().get(2).schema().getTypes().stream() + .filter(s -> s.getType() != Schema.Type.NULL) + .findFirst() + .get() + .getElementType() + .getField("partition"); + assertThat(field.schema().getFields()).isEmpty(); + } + + @Test + public void testCommitResponseBecomesDataWrittenPartitioned() { + + org.apache.iceberg.Schema schemaSpec = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "i", Types.IntegerType.get()), + Types.NestedField.required(2, "s", Types.StringType.get()), + Types.NestedField.required(3, "ts1", Types.TimestampType.withZone()), + Types.NestedField.required(4, "ts2", Types.TimestampType.withZone()), + Types.NestedField.required(5, "ts3", Types.TimestampType.withZone()), + Types.NestedField.required(6, "ts4", Types.TimestampType.withZone())); + + List partitionFields = + ImmutableList.of( + "year(ts1)", + "month(ts2)", + "day(ts3)", + "hour(ts4)", + "bucket(i, 4)", + "truncate(s, 10)", + "s"); + PartitionSpec spec = SchemaUtils.createPartitionSpec(schemaSpec, partitionFields); + + Types.StructType structType = spec.partitionType(); + + io.tabular.iceberg.connect.events.Event event = + new io.tabular.iceberg.connect.events.Event( + "cg-connector", + EventType.COMMIT_RESPONSE, + new CommitResponsePayload( + structType, + commitId, + new TableName(Collections.singletonList("db"), "tbl"), + Arrays.asList(EventTestUtil.createDataFile(), EventTestUtil.createDataFile()), + Arrays.asList(EventTestUtil.createDeleteFile(), EventTestUtil.createDeleteFile()))); + + byte[] data = io.tabular.iceberg.connect.events.Event.encode(event); + + Event result = eventDecoder.decode(data); + + assertThat(event.groupId()).isEqualTo("cg-connector"); + assertThat(result.type()).isEqualTo(PayloadType.DATA_WRITTEN); + assertThat(result.payload()).isInstanceOf(DataWritten.class); + DataWritten payload = (DataWritten) result.payload(); + assertThat(payload.commitId()).isEqualTo(commitId); + assertThat(payload.dataFiles().size()).isEqualTo(2); + assertThat(payload.deleteFiles().size()).isEqualTo(2); + assertThat(payload.tableReference().catalog()).isEqualTo("catalog"); + assertThat(payload.tableReference().identifier()).isEqualTo(TableIdentifier.of("db", "tbl")); + + assertThat(payload.writeSchema()).isEqualTo( + Types.StructType.of( + Types.NestedField.required(10_300, "commit_id", Types.UUIDType.get()), + Types.NestedField.required( + 10_301, "table_reference", TableReference.ICEBERG_SCHEMA), + Types.NestedField.optional( + 10_302, + "data_files", + Types.ListType.ofRequired(10_303, DataFile.getType(spec.partitionType()))), + Types.NestedField.optional( + 10_304, + "delete_files", + Types.ListType.ofRequired(10_304, DataFile.getType(spec.partitionType()))))); + } + + @Test + public void testCommitReadyBecomesDataComplete() { + io.tabular.iceberg.connect.events.Event event = + new io.tabular.iceberg.connect.events.Event( + "cg-connector", + EventType.COMMIT_READY, + new CommitReadyPayload( + commitId, + Arrays.asList( + new TopicPartitionOffset("topic", 1, 1L, 1L), + new TopicPartitionOffset("topic", 2, null, null)))); + + byte[] data = io.tabular.iceberg.connect.events.Event.encode(event); + + Event result = 
eventDecoder.decode(data); + assertThat(event.groupId()).isEqualTo("cg-connector"); + + assertThat(result.type()).isEqualTo(PayloadType.DATA_COMPLETE); + assertThat(result.payload()).isInstanceOf(DataComplete.class); + DataComplete payload = (DataComplete) result.payload(); + + assertThat(payload.commitId()).isEqualTo(commitId); + assertThat(payload.assignments().get(0).topic()).isEqualTo("topic"); + assertThat(payload.assignments().get(0).partition()).isEqualTo(1); + assertThat(payload.assignments().get(0).offset()).isEqualTo(1L); + assertThat(payload.assignments().get(0).timestamp()) + .isEqualTo(OffsetDateTime.ofInstant(Instant.ofEpochMilli(1), ZoneOffset.UTC)); + + assertThat(payload.assignments().get(1).topic()).isEqualTo("topic"); + assertThat(payload.assignments().get(1).partition()).isEqualTo(2); + assertThat(payload.assignments().get(1).offset()).isNull(); + assertThat(payload.assignments().get(1).timestamp()).isNull(); + } + + @Test + public void testCommitTableBecomesCommitToTable() { + io.tabular.iceberg.connect.events.Event event = + new io.tabular.iceberg.connect.events.Event( + "cg-connector", + EventType.COMMIT_TABLE, + new CommitTablePayload( + commitId, new TableName(Collections.singletonList("db"), "tbl"), 1L, 2L)); + + byte[] data = io.tabular.iceberg.connect.events.Event.encode(event); + + Event result = eventDecoder.decode(data); + assertThat(event.groupId()).isEqualTo("cg-connector"); + assertThat(result.type()).isEqualTo(PayloadType.COMMIT_TO_TABLE); + assertThat(result.payload()).isInstanceOf(CommitToTable.class); + CommitToTable payload = (CommitToTable) result.payload(); + + assertThat(payload.commitId()).isEqualTo(commitId); + assertThat(payload.snapshotId()).isEqualTo(1L); + assertThat(payload.validThroughTs()) + .isEqualTo(OffsetDateTime.ofInstant(Instant.ofEpochMilli(2L), ZoneOffset.UTC)); + assertThat(payload.tableReference().catalog()).isEqualTo(catalogName); + assertThat(payload.tableReference().identifier()).isEqualTo(TableIdentifier.of("db", "tbl")); + } + + @Test + public void testCommitCompleteBecomesCommitCompleteSerialization() { + io.tabular.iceberg.connect.events.Event event = + new io.tabular.iceberg.connect.events.Event( + "cg-connector", EventType.COMMIT_COMPLETE, new CommitCompletePayload(commitId, 2L)); + + byte[] data = io.tabular.iceberg.connect.events.Event.encode(event); + + Event result = eventDecoder.decode(data); + assertThat(result.type()).isEqualTo(PayloadType.COMMIT_COMPLETE); + assertThat(result.payload()).isInstanceOf(CommitComplete.class); + CommitComplete payload = (CommitComplete) result.payload(); + assertThat(payload.commitId()).isEqualTo(commitId); + assertThat(payload.validThroughTs()) + .isEqualTo(OffsetDateTime.ofInstant(Instant.ofEpochMilli(2L), ZoneOffset.UTC)); + } +} diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/WorkerTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/WorkerTest.java index 78a627fd..8ce6c0c8 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/WorkerTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/WorkerTest.java @@ -46,6 +46,7 @@ public class WorkerTest { public void testStaticRoute() { IcebergSinkConfig config = mock(IcebergSinkConfig.class); when(config.tables()).thenReturn(ImmutableList.of(TABLE_NAME)); + when(config.catalogName()).thenReturn("catalog"); Map value = ImmutableMap.of(FIELD_NAME, "val"); workerTest(config, value); } @@ -55,6 +56,8 @@ public void testDynamicRoute() { 
IcebergSinkConfig config = mock(IcebergSinkConfig.class); when(config.dynamicTablesEnabled()).thenReturn(true); when(config.tablesRouteField()).thenReturn(FIELD_NAME); + when(config.catalogName()).thenReturn("catalog"); + Map value = ImmutableMap.of(FIELD_NAME, TABLE_NAME); workerTest(config, value); } diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/fixtures/EventTestUtil.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/fixtures/EventTestUtil.java new file mode 100644 index 00000000..77a4a129 --- /dev/null +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/fixtures/EventTestUtil.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.tabular.iceberg.connect.fixtures; + +import java.util.UUID; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.PartitionSpec; + +public class EventTestUtil { + public static DataFile createDataFile() { + return DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(UUID.randomUUID() + ".parquet") + .withFormat(FileFormat.PARQUET) + .withFileSizeInBytes(100L) + .withRecordCount(5) + .build(); + } + + public static DeleteFile createDeleteFile() { + return FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned()) + .ofEqualityDeletes(1) + .withPath(UUID.randomUUID() + ".parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + } + + private EventTestUtil() {} +}
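
Note on the upgrade path covered above: the new EventDecoder falls back to the legacy io.tabular event codec whenever the 1.5.x AvroUtil decoder rejects a message, so a control topic may temporarily hold a mix of 1.4.x and 1.5.2 encodings. The minimal sketch below is not part of this diff; it mirrors EventDecoderTest, and the class name, group id, and catalog name are illustrative only.

package io.tabular.iceberg.connect.channel;

import java.util.UUID;
import org.apache.iceberg.connect.events.AvroUtil;
import org.apache.iceberg.connect.events.Event;
import org.apache.iceberg.connect.events.PayloadType;
import org.apache.iceberg.connect.events.StartCommit;

// Hypothetical sketch showing both decode paths resolving to the new event model.
public class EventDecoderUpgradeSketch {

  public static void main(String[] args) {
    EventDecoder decoder = new EventDecoder("catalog");

    // Message written by a 1.5.2 worker: decoded directly by AvroUtil.
    byte[] current =
        AvroUtil.encode(new Event("cg-connector", new StartCommit(UUID.randomUUID())));

    // Message left behind by a pre-upgrade 1.4.x worker: AvroUtil.decode is expected to fail
    // with a SchemaParseException, and EventDecoder converts the legacy payload instead.
    byte[] legacy =
        io.tabular.iceberg.connect.events.Event.encode(
            new io.tabular.iceberg.connect.events.Event(
                "cg-connector",
                io.tabular.iceberg.connect.events.EventType.COMMIT_REQUEST,
                new io.tabular.iceberg.connect.events.CommitRequestPayload(UUID.randomUUID())));

    // Both byte arrays decode to org.apache.iceberg.connect.events.Event.
    Event fromCurrent = decoder.decode(current);
    Event fromLegacy = decoder.decode(legacy);
    System.out.println(fromCurrent.type() == PayloadType.START_COMMIT); // true
    System.out.println(fromLegacy.type() == PayloadType.START_COMMIT); // true
  }
}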