Skip to content

Commit e56e14b

Browse files
authoredApr 17, 2025
HBASE-29193: Allow ZstdByteBuffDecompressor to take direct ByteBuffer as input and heap ByteBuffer as output, or vice versa (#6806)
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
1 parent 644cdbc commit e56e14b

File tree

3 files changed

+55
-46
lines changed

3 files changed

+55
-46
lines changed
 

‎hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java

+26-41
Original file line numberDiff line numberDiff line change
@@ -55,20 +55,8 @@ public class ZstdByteBuffDecompressor implements ByteBuffDecompressor, CanReinit
5555

5656
@Override
5757
public boolean canDecompress(ByteBuff output, ByteBuff input) {
58-
if (!allowByteBuffDecompression) {
59-
return false;
60-
}
61-
if (output instanceof SingleByteBuff && input instanceof SingleByteBuff) {
62-
ByteBuffer nioOutput = output.nioByteBuffers()[0];
63-
ByteBuffer nioInput = input.nioByteBuffers()[0];
64-
if (nioOutput.isDirect() && nioInput.isDirect()) {
65-
return true;
66-
} else if (!nioOutput.isDirect() && !nioInput.isDirect()) {
67-
return true;
68-
}
69-
}
70-
71-
return false;
58+
return allowByteBuffDecompression && output instanceof SingleByteBuff
59+
&& input instanceof SingleByteBuff;
7260
}
7361

7462
@Override
@@ -80,38 +68,35 @@ private int decompressRaw(ByteBuff output, ByteBuff input, int inputLen) throws
8068
if (output instanceof SingleByteBuff && input instanceof SingleByteBuff) {
8169
ByteBuffer nioOutput = output.nioByteBuffers()[0];
8270
ByteBuffer nioInput = input.nioByteBuffers()[0];
71+
int origOutputPos = nioOutput.position();
72+
int n;
8373
if (nioOutput.isDirect() && nioInput.isDirect()) {
84-
return decompressDirectByteBuffers(nioOutput, nioInput, inputLen);
74+
n = ctx.decompressDirectByteBuffer(nioOutput, nioOutput.position(),
75+
nioOutput.limit() - nioOutput.position(), nioInput, nioInput.position(), inputLen);
8576
} else if (!nioOutput.isDirect() && !nioInput.isDirect()) {
86-
return decompressHeapByteBuffers(nioOutput, nioInput, inputLen);
77+
n = ctx.decompressByteArray(nioOutput.array(),
78+
nioOutput.arrayOffset() + nioOutput.position(), nioOutput.limit() - nioOutput.position(),
79+
nioInput.array(), nioInput.arrayOffset() + nioInput.position(), inputLen);
80+
} else if (nioOutput.isDirect() && !nioInput.isDirect()) {
81+
n = ctx.decompressByteArrayToDirectByteBuffer(nioOutput, nioOutput.position(),
82+
nioOutput.limit() - nioOutput.position(), nioInput.array(),
83+
nioInput.arrayOffset() + nioInput.position(), inputLen);
84+
} else if (!nioOutput.isDirect() && nioInput.isDirect()) {
85+
n = ctx.decompressDirectByteBufferToByteArray(nioOutput.array(),
86+
nioOutput.arrayOffset() + nioOutput.position(), nioOutput.limit() - nioOutput.position(),
87+
nioInput, nioInput.position(), inputLen);
88+
} else {
89+
throw new IllegalStateException("Unreachable line");
8790
}
88-
}
89-
90-
throw new IllegalStateException("One buffer is direct and the other is not, "
91-
+ "or one or more not SingleByteBuffs. This is not supported");
92-
}
9391

94-
private int decompressDirectByteBuffers(ByteBuffer output, ByteBuffer input, int inputLen) {
95-
int origOutputPos = output.position();
92+
nioOutput.position(origOutputPos + n);
93+
nioInput.position(input.position() + inputLen);
9694

97-
int n = ctx.decompressDirectByteBuffer(output, output.position(),
98-
output.limit() - output.position(), input, input.position(), inputLen);
99-
100-
output.position(origOutputPos + n);
101-
input.position(input.position() + inputLen);
102-
return n;
103-
}
104-
105-
private int decompressHeapByteBuffers(ByteBuffer output, ByteBuffer input, int inputLen) {
106-
int origOutputPos = output.position();
107-
108-
int n = ctx.decompressByteArray(output.array(), output.arrayOffset() + output.position(),
109-
output.limit() - output.position(), input.array(), input.arrayOffset() + input.position(),
110-
inputLen);
111-
112-
output.position(origOutputPos + n);
113-
input.position(input.position() + inputLen);
114-
return n;
95+
return n;
96+
} else {
97+
throw new IllegalStateException(
98+
"At least one buffer is not a SingleByteBuff, this is not supported");
99+
}
115100
}
116101

117102
@Override

‎hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdByteBuffDecompressor.java

+28-4
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ public void testCapabilities() {
6262
try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) {
6363
assertTrue(decompressor.canDecompress(emptySingleHeapBuff, emptySingleHeapBuff));
6464
assertTrue(decompressor.canDecompress(emptySingleDirectBuff, emptySingleDirectBuff));
65-
assertFalse(decompressor.canDecompress(emptySingleHeapBuff, emptySingleDirectBuff));
66-
assertFalse(decompressor.canDecompress(emptySingleDirectBuff, emptySingleHeapBuff));
65+
assertTrue(decompressor.canDecompress(emptySingleHeapBuff, emptySingleDirectBuff));
66+
assertTrue(decompressor.canDecompress(emptySingleDirectBuff, emptySingleHeapBuff));
6767
assertFalse(decompressor.canDecompress(emptyMultiHeapBuff, emptyMultiHeapBuff));
6868
assertFalse(decompressor.canDecompress(emptyMultiDirectBuff, emptyMultiDirectBuff));
6969
assertFalse(decompressor.canDecompress(emptySingleHeapBuff, emptyMultiHeapBuff));
@@ -72,7 +72,7 @@ public void testCapabilities() {
7272
}
7373

7474
@Test
75-
public void testDecompressHeap() throws IOException {
75+
public void testDecompressHeapToHeap() throws IOException {
7676
try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) {
7777
ByteBuff output = new SingleByteBuff(ByteBuffer.allocate(64));
7878
ByteBuff input = new SingleByteBuff(ByteBuffer.wrap(COMPRESSED_PAYLOAD));
@@ -83,7 +83,7 @@ public void testDecompressHeap() throws IOException {
8383
}
8484

8585
@Test
86-
public void testDecompressDirect() throws IOException {
86+
public void testDecompressDirectToDirect() throws IOException {
8787
try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) {
8888
ByteBuff output = new SingleByteBuff(ByteBuffer.allocateDirect(64));
8989
ByteBuff input = new SingleByteBuff(ByteBuffer.allocateDirect(COMPRESSED_PAYLOAD.length));
@@ -95,4 +95,28 @@ public void testDecompressDirect() throws IOException {
9595
}
9696
}
9797

98+
@Test
99+
public void testDecompressDirectToHeap() throws IOException {
100+
try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) {
101+
ByteBuff output = new SingleByteBuff(ByteBuffer.allocate(64));
102+
ByteBuff input = new SingleByteBuff(ByteBuffer.allocateDirect(COMPRESSED_PAYLOAD.length));
103+
input.put(COMPRESSED_PAYLOAD);
104+
input.rewind();
105+
int decompressedSize = decompressor.decompress(output, input, COMPRESSED_PAYLOAD.length);
106+
assertEquals("HBase is fun to use and very fast",
107+
Bytes.toString(output.toBytes(0, decompressedSize)));
108+
}
109+
}
110+
111+
@Test
112+
public void testDecompressHeapToDirect() throws IOException {
113+
try (ZstdByteBuffDecompressor decompressor = new ZstdByteBuffDecompressor(null)) {
114+
ByteBuff output = new SingleByteBuff(ByteBuffer.allocateDirect(64));
115+
ByteBuff input = new SingleByteBuff(ByteBuffer.wrap(COMPRESSED_PAYLOAD));
116+
int decompressedSize = decompressor.decompress(output, input, COMPRESSED_PAYLOAD.length);
117+
assertEquals("HBase is fun to use and very fast",
118+
Bytes.toString(output.toBytes(0, decompressedSize)));
119+
}
120+
}
121+
98122
}

‎pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -931,7 +931,7 @@
931931
<brotli4j.version>1.11.0</brotli4j.version>
932932
<lz4.version>1.8.0</lz4.version>
933933
<snappy.version>1.1.10.4</snappy.version>
934-
<zstd-jni.version>1.5.5-2</zstd-jni.version>
934+
<zstd-jni.version>1.5.7-2</zstd-jni.version>
935935
<!--
936936
Note that the version of protobuf shipped in hbase-thirdparty must match the version used
937937
in hbase-protocol-shaded and hbase-examples. The version of jackson-[annotations,core,

0 commit comments

Comments
 (0)