From 3550fdf0d418f83a87627cc9d7086047bc1437e4 Mon Sep 17 00:00:00 2001 From: yunmiao Date: Fri, 26 Jul 2019 14:26:14 +0800 Subject: [PATCH] modify: replace key&value deserializer to CompatAvroDeserializer --- .../serializers/CompatAvroDeserializer.java | 39 +++++++++++++++++++ .../kafkarest/v2/KafkaConsumerManager.java | 6 ++- 2 files changed, 43 insertions(+), 2 deletions(-) create mode 100644 kafka-rest/src/main/java/io/confluent/kafkarest/serializers/CompatAvroDeserializer.java diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/serializers/CompatAvroDeserializer.java b/kafka-rest/src/main/java/io/confluent/kafkarest/serializers/CompatAvroDeserializer.java new file mode 100644 index 0000000000..07d73bce0d --- /dev/null +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/serializers/CompatAvroDeserializer.java @@ -0,0 +1,39 @@ +/* + * Copyright 2018 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.kafkarest.serializers; + +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import java.nio.ByteBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CompatAvroDeserializer extends KafkaAvroDeserializer { + private static final Logger log = LoggerFactory.getLogger(CompatAvroDeserializer.class); + + @Override + public Object deserialize(String s, byte[] bytes) { + try { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + buffer.get(); + return super.deserialize(s, bytes); + } catch (Exception e) { + log.error("Failed to deserialize {}" + s, e.toString()); + return e.toString(); + } + + } + +} diff --git a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java b/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java index 5289534b83..f707a7b4e4 100644 --- a/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java +++ b/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerManager.java @@ -221,8 +221,10 @@ public String createConsumer(String group, ConsumerInstanceConfig instanceConfig switch (instanceConfig.getFormat()) { case AVRO: - props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); - props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); + props.put("key.deserializer", + "io.confluent.kafkarest.serializers.CompatAvroDeserializer"); + props.put("value.deserializer", + "io.confluent.kafkarest.serializers.CompatAvroDeserializer"); break; case JSON: case BINARY: