Skip to content

Commit

Permalink
Added serde customization support in Fast-Avro
Browse files Browse the repository at this point in the history
We have the following requirements:
For serialization, we would like to validate whether all the map fields are using
the desired map type.
For deserialization, we would like to deserialize the map type into
a special map impelementation for later use.

These customized requirements are not supported in the past because of
the following reasons:
1. Fast classes generated are shared, so it is possible different
   users of the same schema may have different requirement.
2. For the same process, for different schema, the requirements
   can be different too.
3. No way to specify customized logic/data type when generating
   fast classes.

This PR adds a new functionality to specify customized logic and it
is expandable and backward compatible.
DatumReaderCustomization : customization for read
DatumWriterCustomization : customization for write

Currently, these classes only support the requirements mentioned
at the beginning.

How it works internally?
1. Each Fast DatumReader/DatumWriter constructor will take a new param for customization.
2. Each Fast DatumReader/DatumWriter will keep a local vanilla-Avro based implementation
   with customization support since the shared vanilla-Avro based implementation is still
   the default implementation.
3. Each generated Fast class will have a new param for customization in serialize/deserialize APIs.
4. Fast DatumReader/DatumWriter will call this new API with customization param of Fast classes.
5. The read/write API in Fast DatumReader/DatumWriter doesn't change, so it is backward compatible.
  • Loading branch information
gaojieliu committed Oct 19, 2023
1 parent ab40879 commit 856fc4c
Show file tree
Hide file tree
Showing 23 changed files with 862 additions and 262 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package com.linkedin.avro.fastserde;

import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import com.linkedin.avro.fastserde.generated.avro.TestEnum;
import com.linkedin.avro.fastserde.generated.avro.TestRecord;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
Expand Down Expand Up @@ -81,7 +87,7 @@ public void shouldNotCreateSpecificDatumReader() throws IOException, Interrupted
(FastDeserializer<TestRecord>) cache.getFastSpecificDeserializer(TestRecord.SCHEMA$, faultySchema);

Assert.assertNotNull(fastSpecificDeserializer);
Assert.assertEquals(fastSpecificDeserializer.getClass().getDeclaredMethods().length, 1);
Assert.assertEquals(fastSpecificDeserializer.getClass(), FastSerdeUtils.FastDeserializerWithAvroSpecificImplWithDifferentName.class);
}

@Test(groups = {"deserializationTest"})
Expand Down Expand Up @@ -134,4 +140,60 @@ public void shouldCreateGenericDatumReader() throws IOException {
Assert.assertTrue(fastGenericDatumReader.isFastDeserializerUsed(), "FastGenericDatumReader should be using"
+ " Fast Deserializer when the fast deserializer generation is done.");
}

@Test(groups = {"deserializationTest"})
public void testDatumReaderWithCustomization() throws IOException, ExecutionException, InterruptedException {
Schema recordSchema = createRecord("TestSchema",
createField("testInt", Schema.create(Schema.Type.INT)),
createMapFieldSchema("testMap", Schema.create(Schema.Type.STRING)));
/**
* Test with special map type: {@link java.util.concurrent.ConcurrentHashMap}.
*/
DatumReaderCustomization customization = new DatumReaderCustomization.Builder()
.setNewMapOverrideFunc( (reuse, size) -> {
if (reuse instanceof ConcurrentHashMap) {
((ConcurrentHashMap)reuse).clear();
return reuse;
} else {
return new ConcurrentHashMap<>(size);
}
})
.build();
// Check cold datum Reader
GenericRecord record = new GenericData.Record(recordSchema);
record.put("testInt", new Integer(100));
Map<Utf8, Utf8> testMap = new HashMap<>();
testMap.put(new Utf8("key1"), new Utf8("value1"));
testMap.put(new Utf8("key2"), new Utf8("value2"));
record.put("testMap", testMap);
FastGenericDatumReader<GenericRecord> fastGenericDatumReader = new FastGenericDatumReader<>(recordSchema, recordSchema, cache, null, customization);
GenericRecord deserializedRecordByColdDatumReader = fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record));
Assert.assertEquals(deserializedRecordByColdDatumReader.get("testInt"), new Integer(100));
Assert.assertEquals(deserializedRecordByColdDatumReader.get("testMap"), testMap);
Assert.assertTrue(deserializedRecordByColdDatumReader.get("testMap") instanceof ConcurrentHashMap);

// Block the fast deserializer generation
fastGenericDatumReader.getFastDeserializer().get();
// Decode the record by fast datum reader
GenericRecord deserializedRecordByFastDatumReader = fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record));
Assert.assertEquals(deserializedRecordByFastDatumReader.get("testInt"), new Integer(100));
Assert.assertEquals(deserializedRecordByFastDatumReader.get("testMap"), testMap);
Assert.assertTrue(deserializedRecordByFastDatumReader.get("testMap") instanceof ConcurrentHashMap);

// Test with an empty map
GenericRecord recordWithEmptyMap = new GenericData.Record(recordSchema);
recordWithEmptyMap.put("testInt", new Integer(200));
recordWithEmptyMap.put("testMap", Collections.emptyMap());
GenericRecord deserializedRecordWithEmptyMapByFastDatumReader = fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(recordWithEmptyMap));
Assert.assertEquals(deserializedRecordWithEmptyMapByFastDatumReader.get("testInt"), new Integer(200));
Assert.assertEquals(deserializedRecordWithEmptyMapByFastDatumReader.get("testMap"), Collections.emptyMap());
Assert.assertTrue(deserializedRecordWithEmptyMapByFastDatumReader.get("testMap") instanceof ConcurrentHashMap);

// Generate a new fast datum reader with the same schema, but without customization
FastGenericDatumReader<GenericRecord> fastGenericDatumReaderWithoutCustomization = new FastGenericDatumReader<>(recordSchema, cache);
GenericRecord deserializedRecordByFastDatumReaderWithoutCustomization = fastGenericDatumReaderWithoutCustomization.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record));
Assert.assertEquals(deserializedRecordByFastDatumReaderWithoutCustomization.get("testInt"), new Integer(100));
Assert.assertEquals(deserializedRecordByFastDatumReaderWithoutCustomization.get("testMap"), testMap);
Assert.assertFalse(deserializedRecordByFastDatumReaderWithoutCustomization.get("testMap") instanceof ConcurrentHashMap);
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.linkedin.avro.fastserde;

import com.linkedin.avro.fastserde.customized.DatumWriterCustomization;
import com.linkedin.avro.fastserde.generated.avro.TestEnum;
import com.linkedin.avro.fastserde.generated.avro.TestRecord;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -80,4 +85,40 @@ public void shouldCreateGenericDatumWriter() throws IOException {
Assert.assertTrue(fastGenericDatumWriter.isFastSerializerUsed(), "FastGenericDatumWriter should be using"
+ " Fast Serializer when the fast deserializer generation is done.");
}

@Test(groups = {"serializationTest"})
@SuppressWarnings("unchecked")
public void writeWithCustomizationCheck() throws IOException {
Schema recordSchema = createRecord("TestSchema",
createField("testInt", Schema.create(Schema.Type.INT)),
createMapFieldSchema("testMap", Schema.create(Schema.Type.STRING)));
/**
* Check whether the map type is a {@link java.util.LinkedHashMap} or not.
*/
DatumWriterCustomization customization = new DatumWriterCustomization.Builder()
.setCheckMapTypeFunction( o -> {
if (o == null) {
return;
}
if (! (o instanceof LinkedHashMap)) {
throw new IllegalArgumentException("The map type should be 'LinkedHashMap'");
}
}).build();
// Check cold datum Writer
GenericRecord record = new GenericData.Record(recordSchema);
record.put("testInt", new Integer(100));
Map<Utf8, Utf8> testMap = new HashMap<>();
testMap.put(new Utf8("key1"), new Utf8("value1"));
testMap.put(new Utf8("key2"), new Utf8("value2"));
record.put("testMap", testMap);
FastGenericDatumWriter<GenericRecord> fastGenericDatumWriterWithoutCustomization = new FastGenericDatumWriter<>(recordSchema, null, cache, null);
// No exception
fastGenericDatumWriterWithoutCustomization.write(record, AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream(), true, null));

FastGenericDatumWriter<GenericRecord> fastGenericDatumWriterWithCustomization = new FastGenericDatumWriter<>(recordSchema, null, cache, customization);
Assert.expectThrows(IllegalArgumentException.class,
() -> fastGenericDatumWriterWithCustomization.write(record, AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream(), true, null)));


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1592,7 +1592,7 @@ record = new GenericData.Record(recordWithUnionMapOfUnionValuesSchema);

private static <T> T decodeRecordColdFast(Schema writerSchema, Schema readerSchema, Decoder decoder) {
FastDeserializer<T> deserializer =
new FastSerdeCache.FastDeserializerWithAvroGenericImpl<>(writerSchema, readerSchema, GenericData.get());
new FastSerdeUtils.FastDeserializerWithAvroGenericImpl<>(writerSchema, readerSchema, GenericData.get());

return decodeRecordFast(deserializer, decoder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.linkedin.avro.fastserde.FastSerdeTestsSupport.getCodeGenDirectory;

import com.linkedin.avro.fastserde.FastSerdeUtils;
import java.io.File;
import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -136,11 +137,11 @@ protected <T extends SpecificRecordBase> byte[] verifySerializers(T data,
schema, classesDir, classLoader, null, specificData)
.generateSerializer();

FastSerdeCache.FastSerializerWithAvroGenericImpl<T> fastSerializerWithAvroGeneric =
new FastSerdeCache.FastSerializerWithAvroGenericImpl<>(schema, genericData);
FastSerdeUtils.FastSerializerWithAvroGenericImpl<T> fastSerializerWithAvroGeneric =
new FastSerdeUtils.FastSerializerWithAvroGenericImpl<>(schema, genericData, null);

FastSerdeCache.FastSerializerWithAvroSpecificImpl<T> fastSerializerWithAvroSpecific =
new FastSerdeCache.FastSerializerWithAvroSpecificImpl<>(schema, specificData);
FastSerdeUtils.FastSerializerWithAvroSpecificImpl<T> fastSerializerWithAvroSpecific =
new FastSerdeUtils.FastSerializerWithAvroSpecificImpl<>(schema, specificData, null);

GenericDatumWriter<T> genericDatumWriter = new GenericDatumWriter<>(
schema, genericData);
Expand Down Expand Up @@ -188,11 +189,11 @@ protected <T extends SpecificRecordBase> T verifyDeserializers(byte[] bytesWithH
schema, schema, classesDir, classLoader, null, specificData)
.generateDeserializer();

FastSerdeCache.FastDeserializerWithAvroGenericImpl<GenericData.Record> fastDeserializerWithAvroGeneric =
new FastSerdeCache.FastDeserializerWithAvroGenericImpl<>(schema, schema, genericData);
FastSerdeUtils.FastDeserializerWithAvroGenericImpl<GenericData.Record> fastDeserializerWithAvroGeneric =
new FastSerdeUtils.FastDeserializerWithAvroGenericImpl<>(schema, schema, genericData);

FastSerdeCache.FastDeserializerWithAvroSpecificImpl<T> fastDeserializerWithAvroSpecific =
new FastSerdeCache.FastDeserializerWithAvroSpecificImpl<>(schema, schema, specificData);
FastSerdeUtils.FastDeserializerWithAvroSpecificImpl<T> fastDeserializerWithAvroSpecific =
new FastSerdeUtils.FastDeserializerWithAvroSpecificImpl<>(schema, schema, specificData);

GenericDatumReader<GenericData.Record> genericDatumReader = new GenericDatumReader<>(schema, schema, genericData);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.avro.fastserde;

import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import java.io.IOException;
import org.apache.avro.io.Decoder;

Expand All @@ -10,5 +11,9 @@ default T deserialize(Decoder d) throws IOException {
return deserialize(null, d);
}

T deserialize(T reuse, Decoder d) throws IOException;
default T deserialize(T reuse, Decoder d) throws IOException {
return deserialize(reuse, d, null);
}

T deserialize(T reuse, Decoder d, DatumReaderCustomization customization) throws IOException;
}
Loading

0 comments on commit 856fc4c

Please sign in to comment.