AvroConverter order of fields declared in input schema affects output #3377

Open
baumac opened this issue Oct 23, 2024 · 0 comments

v7.7.2 of the AvroConverter returns different serialization output depending on the order in which fields are declared in the input schema. As a result, when serializing data using a schema from the schema registry, the same input value produces different serialized output under two syntactically equivalent schemas that differ only in field order.

To better illustrate the issue, I've created PR #3375 with a test case that highlights this behavior.
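To see why declaration order can leak into the output, consider a simplified, hypothetical serializer (plain Java, no Avro or Connect dependencies) that walks a record's fields in iteration order. A naive writer produces different bytes for logically equal records, while one that normalizes field order to a canonical form does not. The class and method names below are illustrative, not part of the AvroConverter API:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: field-order-sensitive vs. normalized serialization.
public class FieldOrderDemo {
  // Naive: concatenate name=value pairs in the map's iteration
  // (i.e. declaration) order, so order leaks into the bytes.
  static byte[] serializeInDeclaredOrder(Map<String, String> record) {
    StringBuilder sb = new StringBuilder();
    record.forEach((k, v) -> sb.append(k).append('=').append(v).append(';'));
    return sb.toString().getBytes(StandardCharsets.UTF_8);
  }

  // Normalized: sort field names first, so declaration order is irrelevant.
  static byte[] serializeNormalized(Map<String, String> record) {
    return serializeInDeclaredOrder(new TreeMap<>(record));
  }

  public static void main(String[] args) {
    Map<String, String> a = new LinkedHashMap<>();
    a.put("foo", "123");
    a.put("id", "456");

    Map<String, String> b = new LinkedHashMap<>();
    b.put("id", "456");
    b.put("foo", "123");

    // Same logical record, different declaration order.
    System.out.println(Arrays.equals(
        serializeInDeclaredOrder(a), serializeInDeclaredOrder(b))); // differs
    System.out.println(Arrays.equals(
        serializeNormalized(a), serializeNormalized(b)));           // matches
  }
}
```

The expected behavior below corresponds to the normalized variant: when the registered schema is the source of truth, the input schema's field order should not influence the output.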

Expected Behavior

When serializing data using a schema from the schema registry, the order in which fields are declared in the input schema should not affect the output of AvroConverter.fromConnectData(...).

Current Behavior

When serializing data using a schema from the schema registry, the order in which fields are declared in the input schema affects the output of AvroConverter.fromConnectData(...).

Steps to Reproduce

PR #3375 contains the test case below, which demonstrates the issue.

  @Test
  public void testSingleFieldSerialization() throws RestClientException, IOException {
    SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
    AvroConverter avroConverter = new AvroConverter(schemaRegistry);
    Map<String, ?> converterConfig = ImmutableMap.of(
            AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost",
            AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false,
            AbstractKafkaSchemaSerDeConfig.LATEST_COMPATIBILITY_STRICT, false,
            AbstractKafkaSchemaSerDeConfig.NORMALIZE_SCHEMAS, true,
            AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true,
            AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicNameStrategy.class.getName());
    avroConverter.configure(converterConfig, false);

    // Register a target schema containing only the "id" field.
    org.apache.avro.Schema registeredSchema = org.apache.avro.SchemaBuilder
            .record("MySchema")
            .fields()
            .requiredString("id")
            .endRecord();

    schemaRegistry.register("topic-value", new AvroSchema(registeredSchema));

    // Two syntactically equivalent input schemas that differ only in field order.
    Schema inputSchema1 = SchemaBuilder.struct()
            .field("foo", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
            .field("id", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
            .build();

    Struct inputValue1 = new Struct(inputSchema1)
            .put("foo", "123")
            .put("id", "456");

    Schema inputSchema2 = SchemaBuilder.struct()
            .field("id", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
            .field("foo", org.apache.kafka.connect.data.Schema.STRING_SCHEMA)
            .build();

    Struct inputValue2 = new Struct(inputSchema2)
            .put("id", "456")
            .put("foo", "123");

    final byte[] bytes1 = avroConverter.fromConnectData("topic", inputSchema1, inputValue1);
    final SchemaAndValue schemaAndValue1 = avroConverter.toConnectData("topic", bytes1);

    final byte[] bytes2 = avroConverter.fromConnectData("topic", inputSchema2, inputValue2);
    final SchemaAndValue schemaAndValue2 = avroConverter.toConnectData("topic", bytes2);
    // Expected to pass; fails on v7.7.2 because the serialized output
    // depends on the input schema's field declaration order.
    assertEquals(schemaAndValue1.value(), schemaAndValue2.value());
  }