diff --git a/java/core/pom.xml b/java/core/pom.xml index 99364d09ab..da27896d83 100644 --- a/java/core/pom.xml +++ b/java/core/pom.xml @@ -52,6 +52,10 @@ io.airlift aircompressor + + at.yawk.lz4 + lz4-java + com.github.luben zstd-jni diff --git a/java/core/src/java/org/apache/orc/impl/Lz4Codec.java b/java/core/src/java/org/apache/orc/impl/Lz4Codec.java new file mode 100644 index 0000000000..cc9f89cf07 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/Lz4Codec.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4SafeDecompressor; +import org.apache.orc.CompressionCodec; +import org.apache.orc.CompressionKind; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class Lz4Codec implements CompressionCodec, DirectDecompressionCodec { + private static final LZ4Factory lz4Factory = LZ4Factory.fastestInstance(); + private static final ThreadLocal threadBuffer = ThreadLocal.withInitial(() -> null); + + public Lz4Codec() {} + + protected static byte[] getBuffer(int size) { + byte[] result = threadBuffer.get(); + if (result == null || result.length < size || result.length > size * 2) { + result = new byte[size]; + threadBuffer.set(result); + } + return result; + } + + @Override + public Options getDefaultOptions() { + return CompressionCodec.NullOptions.INSTANCE; + } + + @Override + public boolean compress(ByteBuffer in, ByteBuffer out, + ByteBuffer overflow, + Options options) throws IOException { + int inBytes = in.remaining(); + // Skip with minimum size check similar to ZstdCodec + if (inBytes < 10) return false; + + LZ4Compressor compressor = lz4Factory.fastCompressor(); + int maxOutputLength = compressor.maxCompressedLength(inBytes); + byte[] compressed = getBuffer(maxOutputLength); + + int outBytes = compressor.compress(in.array(), in.arrayOffset() + in.position(), inBytes, + compressed, 0, maxOutputLength); + + if (outBytes < inBytes) { + int remaining = out.remaining(); + if (remaining >= outBytes) { + System.arraycopy(compressed, 0, out.array(), out.arrayOffset() + + out.position(), outBytes); + out.position(out.position() + outBytes); + } else { + System.arraycopy(compressed, 0, out.array(), out.arrayOffset() + + out.position(), remaining); + out.position(out.limit()); + System.arraycopy(compressed, remaining, overflow.array(), + overflow.arrayOffset(), outBytes - remaining); + overflow.position(outBytes - remaining); + } + return true; + } else { + return false; + } + } + + @Override + public void decompress(ByteBuffer in, ByteBuffer out) throws IOException { + if (in.isDirect() && out.isDirect()) { + directDecompress(in, out); + return; + } + + int srcOffset = in.arrayOffset() + in.position(); + int srcSize = in.remaining(); + int dstOffset = out.arrayOffset() + out.position(); + int dstSize = out.remaining(); + + LZ4SafeDecompressor decompressor = lz4Factory.safeDecompressor(); + int decompressedBytes = decompressor.decompress(in.array(), srcOffset, srcSize, out.array(), + dstOffset, dstSize); + + in.position(in.limit()); + out.position(dstOffset + decompressedBytes); + out.flip(); + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException { + LZ4SafeDecompressor decompressor = lz4Factory.safeDecompressor(); + decompressor.decompress(in, out); + out.flip(); + } + + @Override + public void reset() { + } + + @Override + public void destroy() { + } + + @Override + public CompressionKind getKind() { + return CompressionKind.LZ4; + } + + @Override + public void close() { + OrcCodecPool.returnCodec(CompressionKind.LZ4, this); + } +} diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java index f6b08cde90..aa9d1ffa45 100644 --- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java +++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java @@ -300,8 +300,12 @@ public static CompressionCodec createCodec(CompressionKind kind) { return new AircompressorCodec(kind, new LzoCompressor(), new LzoDecompressor()); case LZ4: - return new AircompressorCodec(kind, new Lz4Compressor(), - new Lz4Decompressor()); + if ("aircompressor".equalsIgnoreCase(System.getProperty("orc.compress.lz4.impl"))) { + return new AircompressorCodec(kind, new Lz4Compressor(), + new Lz4Decompressor()); + } else { + return new Lz4Codec(); + } case ZSTD: if ("java".equalsIgnoreCase(System.getProperty("orc.compression.zstd.impl"))) { return new AircompressorCodec(kind, new ZstdCompressor(), diff --git a/java/core/src/test/org/apache/orc/impl/TestLz4.java b/java/core/src/test/org/apache/orc/impl/TestLz4.java new file mode 100644 index 0000000000..dae862eb17 --- /dev/null +++ b/java/core/src/test/org/apache/orc/impl/TestLz4.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import net.jpountz.lz4.LZ4Exception; +import org.apache.orc.CompressionCodec; +import org.junit.jupiter.api.Test; + +import java.nio.ByteBuffer; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +public class TestLz4 { + + @Test + public void testNoOverflow() throws Exception { + ByteBuffer in = ByteBuffer.allocate(10); + ByteBuffer out = ByteBuffer.allocate(10); + in.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 10}); + in.flip(); + CompressionCodec codec = new Lz4Codec(); + assertFalse(codec.compress(in, out, null, + codec.getDefaultOptions())); + } + + @Test + public void testCorrupt() throws Exception { + ByteBuffer buf = ByteBuffer.allocate(1000); + buf.put(new byte[] {127, 125, 1, 99, 98, 1}); + buf.flip(); + CompressionCodec codec = new Lz4Codec(); + ByteBuffer out = ByteBuffer.allocate(1000); + try { + codec.decompress(buf, out); + fail(); + } catch (LZ4Exception ioe) { + // EXPECTED + } + } + + @Test + public void testLz4CompressDecompress() throws Exception { + int inputSize = 10000; + CompressionCodec codec = new Lz4Codec(); + + ByteBuffer in = ByteBuffer.allocate(inputSize); + ByteBuffer out = ByteBuffer.allocate(inputSize); + ByteBuffer compressed = ByteBuffer.allocate(inputSize * 2); // Ample space for compressed data + ByteBuffer decompressed = ByteBuffer.allocate(inputSize); + + for (int i = 0; i < inputSize; i++) { + in.put((byte) i); + } + in.flip(); + + // Compress + assertTrue(codec.compress(in, compressed, null, codec.getDefaultOptions())); + compressed.flip(); + + // Decompress + codec.decompress(compressed, decompressed); + + assertArrayEquals(in.array(), decompressed.array()); + } + + @Test + public void testLz4DirectDecompress() { + ByteBuffer in = ByteBuffer.allocate(10000); + ByteBuffer out = ByteBuffer.allocate(10000); // Heap buffer for initial compression + ByteBuffer directOut = ByteBuffer.allocateDirect(10000); + ByteBuffer directResult = ByteBuffer.allocateDirect(10000); + for (int i = 0; i < 10000; i++) { + in.put((byte) i); + } + in.flip(); + try (Lz4Codec codec = new Lz4Codec()) { + assertTrue(codec.compress(in, out, null, codec.getDefaultOptions())); + out.flip(); + directOut.put(out); + directOut.flip(); + + codec.decompress(directOut, directResult); + + // copy result from direct buffer to heap. + byte[] heapBytes = new byte[in.array().length]; + directResult.get(heapBytes, 0, directResult.limit()); + + assertArrayEquals(in.array(), heapBytes); + } catch (Exception e) { + fail(e); + } + } +} diff --git a/java/pom.xml b/java/pom.xml index 220725cb8f..d266c496df 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -166,6 +166,11 @@ aircompressor 2.0.2 + + at.yawk.lz4 + lz4-java + 1.10.3 + com.github.luben zstd-jni