Skip to content

Commit

Permalink
[bugfix] Fixed the 'java.lang.UnsupportedOperationException' exceptio…
Browse files Browse the repository at this point in the history
…n caused by getCurrentMetricsData (#2923)

Co-authored-by: bcxc <[email protected]>
Co-authored-by: tomsun28 <[email protected]>
  • Loading branch information
3 people authored Jan 1, 2025
1 parent 8c9b1f3 commit 6bf20ba
Showing 1 changed file with 41 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,67 @@

import io.lettuce.core.codec.RedisCodec;
import io.netty.buffer.Unpooled;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.table.ArrowTable;
import org.apache.hertzbeat.common.entity.message.CollectRep;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
* redis metrics data codec
*/
@Slf4j
public class RedisMetricsDataCodec implements RedisCodec<String, CollectRep.MetricsData> {

private final BufferAllocator allocator;

public RedisMetricsDataCodec() {
this.allocator = new RootAllocator();
}

@Override
public String decodeKey(ByteBuffer byteBuffer) {
return Unpooled.wrappedBuffer(byteBuffer).toString(StandardCharsets.UTF_8);
}

@Override
public CollectRep.MetricsData decodeValue(ByteBuffer byteBuffer) {
try (ByteArrayInputStream in = new ByteArrayInputStream(byteBuffer.array());
ArrowStreamReader reader = new ArrowStreamReader(Channels.newChannel(in), new RootAllocator())) {
VectorSchemaRoot root = reader.getVectorSchemaRoot();
reader.loadNextBatch();
return new CollectRep.MetricsData(new ArrowTable(root));
} catch (IOException e) {
throw new RuntimeException("Failed to deserialize Arrow table", e);
if (byteBuffer == null || !byteBuffer.hasRemaining()) {
return null;
}
try {
byte[] bytes;
if (byteBuffer.hasArray()) {
bytes = Arrays.copyOfRange(byteBuffer.array(),
byteBuffer.position(), byteBuffer.limit());
} else {
bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
}
try (ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ArrowStreamReader reader = new ArrowStreamReader(
Channels.newChannel(in), allocator)) {
reader.loadNextBatch();
VectorSchemaRoot root = reader.getVectorSchemaRoot();
if (root == null || root.getRowCount() == 0) {
log.warn("Empty data received");
return null;
}
return new CollectRep.MetricsData(root);
}
} catch (Exception e) {
log.error("Failed to decode metrics data", e);
return null;
}
}

Expand Down

0 comments on commit 6bf20ba

Please sign in to comment.