diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/serialize/RedisMetricsDataCodec.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/serialize/RedisMetricsDataCodec.java index bf79bf0e578..37c5ff3798c 100644 --- a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/serialize/RedisMetricsDataCodec.java +++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/serialize/RedisMetricsDataCodec.java @@ -19,26 +19,34 @@ import io.lettuce.core.codec.RedisCodec; import io.netty.buffer.Unpooled; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.charset.StandardCharsets; import lombok.extern.slf4j.Slf4j; +import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.ipc.ArrowStreamWriter; -import org.apache.arrow.vector.table.ArrowTable; import org.apache.hertzbeat.common.entity.message.CollectRep; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + /** * redis metrics data codec */ @Slf4j public class RedisMetricsDataCodec implements RedisCodec { + private final BufferAllocator allocator; + + public RedisMetricsDataCodec() { + this.allocator = new RootAllocator(); + } + @Override public String decodeKey(ByteBuffer byteBuffer) { return Unpooled.wrappedBuffer(byteBuffer).toString(StandardCharsets.UTF_8); @@ -46,13 +54,32 @@ public String decodeKey(ByteBuffer byteBuffer) { @Override public CollectRep.MetricsData decodeValue(ByteBuffer byteBuffer) { - try (ByteArrayInputStream in = new ByteArrayInputStream(byteBuffer.array()); - ArrowStreamReader reader = new ArrowStreamReader(Channels.newChannel(in), new RootAllocator())) { - VectorSchemaRoot root = reader.getVectorSchemaRoot(); - reader.loadNextBatch(); - return new CollectRep.MetricsData(new ArrowTable(root)); - } catch (IOException e) { - throw new RuntimeException("Failed to deserialize Arrow table", e); + if (byteBuffer == null || !byteBuffer.hasRemaining()) { + return null; + } + try { + byte[] bytes; + if (byteBuffer.hasArray()) { + bytes = Arrays.copyOfRange(byteBuffer.array(), + byteBuffer.position(), byteBuffer.limit()); + } else { + bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + } + try (ByteArrayInputStream in = new ByteArrayInputStream(bytes); + ArrowStreamReader reader = new ArrowStreamReader( + Channels.newChannel(in), allocator)) { + reader.loadNextBatch(); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + if (root == null || root.getRowCount() == 0) { + log.warn("Empty data received"); + return null; + } + return new CollectRep.MetricsData(root); + } + } catch (Exception e) { + log.error("Failed to decode metrics data", e); + return null; } }