diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index a1d2d3070f61..d17ad479bfab 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -440,6 +440,7 @@ This deep storage is used to interface with HDFS. You must load the `druid-hdfs-
 |Property|Description|Default|
 |--------|-----------|-------|
 |`druid.storage.storageDirectory`|HDFS directory to use as deep storage.|none|
+|`druid.storage.compressionFormat`|The compression format applied to segments uploaded to HDFS. Only `zip` and `lz4` are supported.|zip|
 
 #### Cassandra deep storage
 
diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md
index aa1c25a693ad..fc0126db000d 100644
--- a/docs/operations/metrics.md
+++ b/docs/operations/metrics.md
@@ -616,3 +616,14 @@ These metrics are available on operating systems with the cgroup kernel feature.
 |`cgroup/cpuset/effective_cpu_count`|Total number of active CPUs available to the process. Derived from `cpuset.effective_cpus`.||Varies|
 |`cgroup/cpuset/mems_count`|Total number of memory nodes available to the process. Derived from `cpuset.mems`.||Varies|
 |`cgroup/cpuset/effective_mems_count`|Total number of active memory nodes available to the process. Derived from `cpuset.effective_mems`.||Varies|
+
+## HDFS
+
+These metrics are available when the `druid-hdfs-storage` extension is used to push/pull segment files.
+
+| Metric | Description | Dimensions | Normal value |
+|----------------------|--------------------------------------------------------------------------------|---------------------------------------------|--------------|
+| `hdfs/pull/size` | Total bytes of decompressed segment files pulled from HDFS. | `format`: compression format (`ZIP`, `LZ4`) | Varies |
+| `hdfs/pull/duration` | Time in milliseconds spent decompressing and pulling segment files from HDFS. | `format`: compression format (`ZIP`, `LZ4`) | Varies |
+| `hdfs/push/size` | Total bytes of compressed segment files pushed to HDFS. | `format`: compression format (`ZIP`, `LZ4`) | Varies |
+| `hdfs/push/duration` | Time in milliseconds spent compressing and pushing segment files to HDFS. | `format`: compression format (`ZIP`, `LZ4`) | Varies |
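For anyone trying the new property out, a minimal deep-storage configuration exercising it could look like the snippet below. This is an illustrative sketch only (the directory path is made up); the property names and the `zip`/`lz4` values come from the table added above.

```properties
# common.runtime.properties (illustrative values)
druid.extensions.loadList=["druid-hdfs-storage"]
druid.storage.type=hdfs
druid.storage.storageDirectory=/druid/segments
# New in this patch; defaults to zip when omitted
druid.storage.compressionFormat=lz4
```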
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java
index 2c6de8e28bbf..34a358bdd902 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java
@@ -30,6 +30,8 @@
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.io.NativeIO;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.segment.loading.URIDataPuller;
 import org.apache.druid.utils.CompressionUtils;
@@ -40,6 +42,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 
+import javax.annotation.Nullable;
 import javax.tools.FileObject;
 import java.io.File;
 import java.io.IOException;
@@ -177,11 +180,21 @@ public boolean delete()
 
   private static final Logger log = new Logger(HdfsDataSegmentPuller.class);
   protected final Configuration config;
+  private final ServiceEmitter emitter;
 
-  @Inject
   public HdfsDataSegmentPuller(@Hdfs final Configuration config)
+  {
+    this(config, null);
+  }
+
+  @Inject
+  public HdfsDataSegmentPuller(
+      @Hdfs final Configuration config,
+      @Nullable final ServiceEmitter emitter
+  )
   {
     this.config = config;
+    this.emitter = emitter;
   }
 
   FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) throws SegmentLoadingException
@@ -235,27 +248,16 @@ FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) thr
       catch (Exception e) {
         throw new RuntimeException(e);
       }
-    } else if (CompressionUtils.isZip(path.getName())) {
+    }
 
-      // -------- zip ---------
+    // Try to detect format from file extension and decompress
+    final CompressionUtils.Format format = CompressionUtils.Format.fromFileName(path.getName());
+    if ((format == CompressionUtils.Format.ZIP || format == CompressionUtils.Format.LZ4)) {
+      long startTime = System.currentTimeMillis();
 
-      final FileUtils.FileCopyResult result = CompressionUtils.unzip(
-          new ByteSource()
-          {
-            @Override
-            public InputStream openStream() throws IOException
-            {
-              return getInputStream(path);
-            }
-          }, outDir, shouldRetryPredicate(), false
-      );
+      final FileUtils.FileCopyResult result = format.decompressDirectory(getInputStream(path), outDir);
 
-      log.info(
-          "Unzipped %d bytes from [%s] to [%s]",
-          result.size(),
-          path.toString(),
-          outDir.getAbsolutePath()
-      );
+      emitMetrics(format, result.size(), System.currentTimeMillis() - startTime);
 
       return result;
     } else if (CompressionUtils.isGz(path.getName())) {
@@ -292,6 +294,17 @@ public InputStream openStream() throws IOException
     }
   }
 
+  private void emitMetrics(CompressionUtils.Format format, long size, long duration)
+  {
+    if (emitter == null) {
+      return;
+    }
+    ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
+    metricBuilder.setDimension("format", format);
+    emitter.emit(metricBuilder.setMetric("hdfs/pull/size", size));
+    emitter.emit(metricBuilder.setMetric("hdfs/pull/duration", duration));
+  }
+
   public InputStream getInputStream(Path path) throws IOException
   {
     return buildFileObject(path.toUri(), config).openInputStream();
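A compact sketch of the dispatch the puller now performs, using only names that appear in the hunks above (the file names are made-up examples):

```java
import org.apache.druid.utils.CompressionUtils;

class SegmentArchiveFormatSketch
{
  // Mirrors HdfsDataSegmentPuller.getSegmentFiles(): pick the archive format from the
  // file extension; only ZIP and LZ4 are treated as whole-directory archives.
  static boolean isDirectoryArchive(String segmentFileName)
  {
    final CompressionUtils.Format format = CompressionUtils.Format.fromFileName(segmentFileName);
    return format == CompressionUtils.Format.ZIP || format == CompressionUtils.Format.LZ4;
  }

  public static void main(String[] args)
  {
    System.out.println(isDirectoryArchive("0_index.lz4")); // true
    System.out.println(isDirectoryArchive("0_index.gz"));  // false, falls back to the gz path
  }
}
```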
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
index 6bcf96d6819b..bcbff961e521 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.storage.hdfs;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
@@ -32,10 +31,11 @@
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.segment.SegmentUtils;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.utils.CompressionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +43,7 @@
 import org.apache.hadoop.fs.Path;
 import org.joda.time.format.ISODateTimeFormat;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
@@ -55,20 +56,33 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
 {
   private static final Logger log = new Logger(HdfsDataSegmentPusher.class);
 
+  private final HdfsDataSegmentPusherConfig config;
   private final Configuration hadoopConfig;
+  @Nullable
+  private final ServiceEmitter emitter;
 
   // We lazily initialize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA.
   // Please see https://github.com/apache/druid/pull/5684
   private final Supplier<String> fullyQualifiedStorageDirectory;
 
+  public HdfsDataSegmentPusher(
+      HdfsDataSegmentPusherConfig config,
+      @Hdfs Configuration hadoopConfig
+  )
+  {
+    this(config, hadoopConfig, null);
+  }
+
   @Inject
   public HdfsDataSegmentPusher(
       HdfsDataSegmentPusherConfig config,
       @Hdfs Configuration hadoopConfig,
-      ObjectMapper jsonMapper
+      ServiceEmitter emitter
   )
   {
+    this.config = config;
     this.hadoopConfig = hadoopConfig;
+    this.emitter = emitter;
     Path storageDir = new Path(config.getStorageDirectory());
     this.fullyQualifiedStorageDirectory = Suppliers.memoize(
         () -> {
@@ -105,28 +119,34 @@ public DataSegment push(final File inDir, final DataSegment segment, final boole
     // '{partitionNum}_index.zip' without unique paths and '{partitionNum}_{UUID}_index.zip' with unique paths.
     final String storageDir = this.getStorageDir(segment, false);
-
     final String uniquePrefix = useUniquePath ? DataSegmentPusher.generateUniquePath() + "_" : "";
-    final String outIndexFilePathSuffix = StringUtils.format(
-        "%s/%s/%d_%sindex.zip",
+    final String outIndexFilePath = StringUtils.format(
+        "%s/%s/%d_%sindex%s",
         fullyQualifiedStorageDirectory.get(),
         storageDir,
         segment.getShardSpec().getPartitionNum(),
-        uniquePrefix
+        uniquePrefix,
+        config.getCompressionFormat().getSuffix()
     );
 
-    return pushToFilePathWithRetry(inDir, segment, outIndexFilePathSuffix);
+    return pushToFilePathWithRetry(inDir, segment, outIndexFilePath);
   }
 
   @Override
   public DataSegment pushToPath(File inDir, DataSegment segment, String storageDirSuffix) throws IOException
   {
-    String outIndexFilePath = StringUtils.format(
-        "%s/%s/%d_index.zip",
-        fullyQualifiedStorageDirectory.get(),
-        storageDirSuffix.replace(':', '_'),
-        segment.getShardSpec().getPartitionNum()
-    );
+    final String outIndexFilePath;
+    if (storageDirSuffix.endsWith("index.zip") || storageDirSuffix.endsWith("index.lz4")) {
+      outIndexFilePath = StringUtils.format("%s/%s", fullyQualifiedStorageDirectory.get(), storageDirSuffix);
+    } else {
+      outIndexFilePath = StringUtils.format(
+          "%s/%s/%d_index%s",
+          fullyQualifiedStorageDirectory.get(),
+          storageDirSuffix.replace(':', '_'),
+          segment.getShardSpec().getPartitionNum(),
+          config.getCompressionFormat().getSuffix()
+      );
+    }
 
     return pushToFilePathWithRetry(inDir, segment, outIndexFilePath);
   }
 
@@ -136,11 +156,17 @@ private DataSegment pushToFilePathWithRetry(File inDir, DataSegment segment, Str
   {
     // Retry any HDFS errors that occur, up to 5 times.
     try {
-      return RetryUtils.retry(
+      final long startTime = System.currentTimeMillis();
+
+      final DataSegment result = RetryUtils.retry(
           () -> pushToFilePath(inDir, segment, outIndexFilePath),
          exception -> exception instanceof Exception,
          5
      );
+
+      emitMetrics(result.getSize(), System.currentTimeMillis() - startTime);
+
+      return result;
     }
     catch (Exception e) {
       Throwables.throwIfInstanceOf(e, IOException.class);
@@ -149,21 +175,32 @@ private DataSegment pushToFilePathWithRetry(File inDir, DataSegment segment, Str
     }
   }
 
+  private void emitMetrics(long size, long duration)
+  {
+    if (emitter == null) {
+      return;
+    }
+
+    final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
+    metricBuilder.setDimension("format", config.getCompressionFormat());
+    emitter.emit(metricBuilder.setMetric("hdfs/push/size", size));
+    emitter.emit(metricBuilder.setMetric("hdfs/push/duration", duration));
+  }
+
   private DataSegment pushToFilePath(File inDir, DataSegment segment, String outIndexFilePath) throws IOException
   {
     log.debug(
-        "Copying segment[%s] to HDFS at location[%s/%s]",
+        "Copying segment[%s] to HDFS at location[%s]",
         segment.getId(),
-        fullyQualifiedStorageDirectory.get(),
         outIndexFilePath
     );
-
     Path tmpIndexFile = new Path(StringUtils.format(
-        "%s/%s/%s/%s_index.zip",
+        "%s/%s/%s/%s_index%s",
         fullyQualifiedStorageDirectory.get(),
         segment.getDataSource(),
         UUIDUtils.generateUuid(),
-        segment.getShardSpec().getPartitionNum()
+        segment.getShardSpec().getPartitionNum(),
+        config.getCompressionFormat().getSuffix()
     ));
 
     FileSystem fs = tmpIndexFile.getFileSystem(hadoopConfig);
@@ -174,7 +211,7 @@ private DataSegment pushToFilePath(File inDir, DataSegment segment, String outIn
     final DataSegment dataSegment;
     try {
       try (FSDataOutputStream out = fs.create(tmpIndexFile)) {
-        size = CompressionUtils.zip(inDir, out);
+        size = config.getCompressionFormat().compressDirectory(inDir, out);
       }
       final Path outIndexFile = new Path(outIndexFilePath);
       dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri()))
@@ -185,6 +222,10 @@ private DataSegment pushToFilePath(File inDir, DataSegment segment, String outIn
       fs.mkdirs(outIndexFile.getParent());
       copyFilesWithChecks(fs, tmpIndexFile, outIndexFile);
     }
+    catch (IOException e) {
+      log.error(e, "Failed to push segment[%s] to HDFS at location[%s]", segment.getId(), outIndexFilePath);
+      throw e;
+    }
     finally {
       try {
         if (fs.exists(tmpIndexFile.getParent()) && !fs.delete(tmpIndexFile.getParent(), true)) {
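To make the naming change above concrete: with the default `zip` format the pusher keeps writing `{storageDirectory}/{segment path}/{partitionNum}_index.zip` (or `{partitionNum}_{UUID}_index.zip` with unique paths), while a pusher configured for `lz4` writes the same layout with a `_index.lz4` name. The suffix comes from `CompressionUtils.Format.getSuffix()`, so nothing else about the directory structure changes.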
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java
index 7692be131179..a2b656cc13d7 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java
@@ -20,6 +20,7 @@
 package org.apache.druid.storage.hdfs;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.utils.CompressionUtils;
 
 /**
  */
@@ -28,6 +29,9 @@ public class HdfsDataSegmentPusherConfig
   @JsonProperty
   private String storageDirectory = "";
 
+  @JsonProperty
+  private CompressionUtils.Format compressionFormat = CompressionUtils.Format.ZIP;
+
   public void setStorageDirectory(String storageDirectory)
   {
     this.storageDirectory = storageDirectory;
@@ -37,4 +41,14 @@ public String getStorageDirectory()
   {
     return storageDirectory;
   }
+
+  public void setCompressionFormat(CompressionUtils.Format compressionFormat)
+  {
+    this.compressionFormat = compressionFormat;
+  }
+
+  public CompressionUtils.Format getCompressionFormat()
+  {
+    return compressionFormat;
+  }
 }
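Because the new `compressionFormat` field is bound through Jackson, the lowercase values documented above (`zip`, `lz4`) rely on the `@JsonCreator` factory added to `CompressionUtils.Format` further down in this patch. A minimal sketch of the behaviour that binding assumes:

```java
import org.apache.druid.utils.CompressionUtils;

class CompressionFormatParsingSketch
{
  public static void main(String[] args)
  {
    // fromString() upper-cases its input, so the lowercase spellings used in
    // runtime properties resolve to the enum constants.
    System.out.println(CompressionUtils.Format.fromString("lz4")); // LZ4
    System.out.println(CompressionUtils.Format.fromString("ZIP")); // ZIP
    System.out.println(CompressionUtils.Format.fromString(null));  // null for blank/absent input
  }
}
```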
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
index 7bdf8c06980f..e4b455e48122 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java
@@ -36,7 +36,6 @@
 import org.apache.druid.indexer.HadoopDruidIndexerConfig;
 import org.apache.druid.indexer.HadoopIngestionSpec;
 import org.apache.druid.indexer.JobHelper;
-import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.jackson.GranularityModule;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
@@ -46,6 +45,7 @@
 import org.apache.druid.timeline.DataSegment.PruneSpecsHolder;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.utils.CompressionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -97,7 +97,7 @@ public void setUp()
   {
     HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConf = new HdfsDataSegmentPusherConfig();
     hdfsDataSegmentPusherConf.setStorageDirectory("path/to/");
-    hdfsDataSegmentPusher = new HdfsDataSegmentPusher(hdfsDataSegmentPusherConf, new Configuration(true), objectMapper);
+    hdfsDataSegmentPusher = new HdfsDataSegmentPusher(hdfsDataSegmentPusherConf, new Configuration(true));
   }
 
   @Test
@@ -126,7 +126,8 @@ public void testPushWithoutScheme() throws Exception
   @Test
   public void testPushWithMultipleSegments() throws Exception
   {
-    testUsingSchemeForMultipleSegments("file", 3);
+    testUsingSchemeForMultipleSegments("file", 3, CompressionUtils.Format.ZIP);
+    testUsingSchemeForMultipleSegments("file", 3, CompressionUtils.Format.LZ4);
   }
 
   @Test
@@ -146,7 +147,7 @@ public void testUsingUniqueFilePath() throws Exception
 
     final File storageDirectory = tempFolder.newFolder();
     config.setStorageDirectory(StringUtils.format("file://%s", storageDirectory.getAbsolutePath()));
-    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
+    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf);
 
     DataSegment segmentToPush = new DataSegment(
         "foo",
@@ -187,7 +188,7 @@ public void testPushToPath() throws Exception
 
     final File storageDirectory = tempFolder.newFolder();
     config.setStorageDirectory(StringUtils.format("file://%s", storageDirectory.getAbsolutePath()));
-    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
+    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf);
 
     DataSegment segmentToPush = new DataSegment(
         "foo",
@@ -210,7 +211,7 @@ public void testPushToPath() throws Exception
     );
   }
 
-  private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments) throws Exception
+  private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments, CompressionUtils.Format format) throws Exception
   {
     Configuration conf = new Configuration(true);
     DataSegment[] segments = new DataSegment[numberOfSegments];
@@ -226,12 +227,14 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n
     HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
 
     final File storageDirectory = tempFolder.newFolder();
+    config.setCompressionFormat(format);
     config.setStorageDirectory(
         scheme != null
         ? StringUtils.format("%s://%s", scheme, storageDirectory.getAbsolutePath())
         : storageDirectory.getAbsolutePath()
     );
-    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper());
+
+    HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf);
 
     for (int i = 0; i < numberOfSegments; i++) {
       segments[i] = new DataSegment(
StringUtils.format("%s://%s", scheme, storageDirectory.getAbsolutePath()) : storageDirectory.getAbsolutePath() ); - HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper()); + + HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf); for (int i = 0; i < numberOfSegments; i++) { segments[i] = new DataSegment( @@ -251,10 +254,11 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n final DataSegment pushedSegment = pusher.push(segmentDir, segments[i], false); String indexUri = StringUtils.format( - "%s/%s/%d_index.zip", + "%s/%s/%d_index.%s", FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(), pusher.getStorageDir(segments[i], false), - segments[i].getShardSpec().getPartitionNum() + segments[i].getShardSpec().getPartitionNum(), + format.getExtension() ); Assert.assertEquals(segments[i].getSize(), pushedSegment.getSize()); @@ -269,10 +273,11 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n String segmentPath = pusher.getStorageDir(pushedSegment, false); File indexFile = new File(StringUtils.format( - "%s/%s/%d_index.zip", + "%s/%s/%d_index.%s", storageDirectory, segmentPath, - pushedSegment.getShardSpec().getPartitionNum() + pushedSegment.getShardSpec().getPartitionNum(), + format.getExtension() )); Assert.assertTrue(indexFile.exists()); @@ -280,10 +285,11 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n Assert.assertEquals(segments[i], pushedSegment); indexFile = new File(StringUtils.format( - "%s/%s/%d_index.zip", + "%s/%s/%d_index.%s", storageDirectory, segmentPath, - pushedSegment.getShardSpec().getPartitionNum() + pushedSegment.getShardSpec().getPartitionNum(), + format.getExtension() )); Assert.assertTrue(indexFile.exists()); @@ -320,7 +326,7 @@ private void testUsingScheme(final String scheme) throws Exception ? 
StringUtils.format("%s://%s", scheme, storageDirectory.getAbsolutePath()) : storageDirectory.getAbsolutePath() ); - HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper()); + HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf); DataSegment segmentToPush = new DataSegment( "foo", diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java index 482c47acee2b..a5111d2159c0 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java @@ -28,6 +28,8 @@ import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig; +import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.segment.loading.OmniDataSegmentKiller; import org.junit.Assert; import org.junit.Test; @@ -85,6 +87,7 @@ private Injector makeInjectorWithProperties(final Properties props) binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator()); binder.bind(JsonConfigurator.class).in(LazySingleton.class); binder.bind(Properties.class).toInstance(props); + binder.bind(ServiceEmitter.class).toInstance(new ServiceEmitter("test", "localhost", new NoopEmitter())); }, new HdfsStorageDruidModule() ) diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java index 01b45c5089af..eabec58ef15e 100644 --- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java +++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java @@ -19,6 +19,7 @@ package org.apache.druid.utils; +import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.base.Throwables; @@ -27,6 +28,9 @@ import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; import com.google.common.io.Files; +import net.jpountz.lz4.LZ4BlockInputStream; +import net.jpountz.lz4.LZ4BlockOutputStream; +import net.jpountz.lz4.LZ4Factory; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorInputStream; import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; @@ -46,7 +50,10 @@ import javax.annotation.Nullable; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.File; +import java.io.FileInputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; @@ -57,6 +64,7 @@ import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.Enumeration; +import java.util.Locale; import java.util.Map; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; @@ -74,9 +82,34 @@ public enum Format { BZ2(".bz2", "bz2"), GZ(".gz", "gz"), + LZ4(".lz4", "lz4") { + @Override + public long compressDirectory(File directory, OutputStream out) throws IOException + { + return 
@@ -74,9 +82,34 @@ public enum Format
 {
   BZ2(".bz2", "bz2"),
   GZ(".gz", "gz"),
+  LZ4(".lz4", "lz4") {
+    @Override
+    public long compressDirectory(File directory, OutputStream out) throws IOException
+    {
+      return lz4CompressDirectory(directory, out);
+    }
+
+    @Override
+    public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException
+    {
+      return lz4DecompressDirectory(in, outDir);
+    }
+  },
   SNAPPY(".sz", "sz"),
   XZ(".xz", "xz"),
-  ZIP(".zip", "zip"),
+  ZIP(".zip", "zip") {
+    @Override
+    public long compressDirectory(File directory, OutputStream out) throws IOException
+    {
+      return zip(directory, out);
+    }
+
+    @Override
+    public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException
+    {
+      return unzip(in, outDir);
+    }
+  },
   ZSTD(".zst", "zst");
 
   private static final Map<String, Format> EXTENSION_TO_COMPRESSION_FORMAT;
@@ -85,6 +118,7 @@ public enum Format
     ImmutableMap.Builder<String, Format> builder = ImmutableMap.builder();
     builder.put(BZ2.getExtension(), BZ2);
     builder.put(GZ.getExtension(), GZ);
+    builder.put(LZ4.getExtension(), LZ4);
     builder.put(SNAPPY.getExtension(), SNAPPY);
     builder.put(XZ.getExtension(), XZ);
     builder.put(ZIP.getExtension(), ZIP);
@@ -110,6 +144,16 @@ public String getExtension()
     return extension;
   }
 
+  @Nullable
+  @JsonCreator
+  public static Format fromString(@Nullable String name)
+  {
+    if (Strings.isNullOrEmpty(name)) {
+      return null;
+    }
+    return valueOf(name.toUpperCase(Locale.ROOT));
+  }
+
   @Nullable
   public static Format fromFileName(@Nullable String fileName)
   {
@@ -119,6 +163,36 @@ public static Format fromFileName(@Nullable String fileName)
     }
     return EXTENSION_TO_COMPRESSION_FORMAT.get(extension);
   }
+
+  /**
+   * Compresses a directory to the output stream. Default implementation throws UnsupportedOperationException.
+   * Override this method for formats that support directory compression.
+   *
+   * @param directory The directory to compress
+   * @param out The output stream to write compressed data to
+   * @return The number of bytes (uncompressed) read from the input directory
+   * @throws IOException if an I/O error occurs
+   * @throws UnsupportedOperationException if the format doesn't support directory compression
+   */
+  public long compressDirectory(File directory, OutputStream out) throws IOException
+  {
+    throw new UnsupportedOperationException("Directory compression not supported for " + this.name());
+  }
+
+  /**
+   * Decompresses a directory from the input stream. Default implementation throws UnsupportedOperationException.
+   * Override this method for formats that support directory decompression.
+   *
+   * @param in The input stream containing compressed data
+   * @param outDir The output directory to extract files to
+   * @return A FileCopyResult containing information about extracted files
+   * @throws IOException if an I/O error occurs
+   * @throws UnsupportedOperationException if the format doesn't support directory decompression
+   */
+  public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException
+  {
+    throw new UnsupportedOperationException("Directory decompression not supported for " + this.name());
+  }
 }
 
 public static final int COMPRESSED_TEXT_WEIGHT_FACTOR = 4;
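The two default methods above give every `Format` a uniform directory-archive API, with ZIP and LZ4 overriding them. A minimal round-trip sketch of that API, with placeholder paths and assuming the output directory already exists (which is what both the pusher and the puller guarantee):

```java
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.utils.CompressionUtils;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class Lz4ArchiveRoundTripSketch
{
  public static void main(String[] args) throws IOException
  {
    final File segmentDir = new File("/tmp/segment-to-push");   // placeholder input directory
    final File archive = new File("/tmp/0_index.lz4");          // placeholder archive path
    final File restoredDir = new File("/tmp/segment-restored"); // must already exist

    // Compress the directory into one LZ4 archive, as HdfsDataSegmentPusher does.
    try (OutputStream out = new FileOutputStream(archive)) {
      final long uncompressedBytes = CompressionUtils.Format.LZ4.compressDirectory(segmentDir, out);
      System.out.println("archived " + uncompressedBytes + " uncompressed bytes");
    }

    // Expand it again, as HdfsDataSegmentPuller does.
    try (InputStream in = new FileInputStream(archive)) {
      final FileUtils.FileCopyResult result = CompressionUtils.Format.LZ4.decompressDirectory(in, restoredDir);
      System.out.println("restored " + result.size() + " bytes");
    }
  }
}
```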
@@ -212,6 +286,106 @@ public static long zip(File directory, OutputStream out) throws IOException
     return totalSize;
   }
 
+  /**
+   * Compresses directory contents using LZ4 block compression with a simple archive format.
+   * Format: [file_count:4 bytes][file1_name_length:4][file1_name:bytes][file1_size:8][file1_data:bytes]...
+   *
+   * @param directory The directory whose contents should be compressed
+   * @param out The output stream to write compressed data to
+   * @return The number of bytes (uncompressed) read from the input directory
+   */
+  public static long lz4CompressDirectory(File directory, OutputStream out) throws IOException
+  {
+    if (!directory.isDirectory()) {
+      throw new IOE("directory[%s] is not a directory", directory);
+    }
+
+    // Use fast compressor for better performance (lower CPU, faster compression)
+    final LZ4BlockOutputStream lz4Out = new LZ4BlockOutputStream(
+        out,
+        64 * 1024, // Block size
+        LZ4Factory.fastestInstance().fastCompressor()
+    );
+    // Use DataOutputStream for structured writing
+    final DataOutputStream dataOut = new DataOutputStream(lz4Out);
+    final File[] files = directory.listFiles();
+
+    if (files == null) {
+      throw new IOE("Cannot list files in directory[%s]", directory);
+    }
+
+    // Sort for consistency
+    final File[] sortedFiles = Arrays.stream(files).sorted().toArray(File[]::new);
+
+    dataOut.writeInt(sortedFiles.length);
+
+    long totalSize = 0;
+
+    for (File file : sortedFiles) {
+      if (file.isDirectory()) {
+        continue; // Skip subdirectories like ZIP does
+      }
+
+      log.debug("Compressing file[%s] with size[%,d]. Total size so far[%,d]", file, file.length(), totalSize);
+
+      final String fileName = file.getName();
+      final byte[] fileNameBytes = fileName.getBytes(StandardCharsets.UTF_8);
+      dataOut.writeInt(fileNameBytes.length);
+      dataOut.write(fileNameBytes);
+
+      final long fileSize = file.length();
+      if (fileSize > Integer.MAX_VALUE) {
+        throw new IOE("file[%s] too large [%,d]", file, fileSize);
+      }
+
+      dataOut.writeLong(fileSize);
+      totalSize += fileSize;
+
+      // Copy file content to dataOut
+      try (FileInputStream fileIn = new FileInputStream(file)) {
+        ByteStreams.copy(fileIn, dataOut);
+      }
+    }
+
+    dataOut.flush();
+    lz4Out.finish();
+    return totalSize;
+  }
+
+  /**
+   * Decompresses an LZ4-compressed directory archive.
+   */
+  public static FileUtils.FileCopyResult lz4DecompressDirectory(InputStream in, File outDir) throws IOException
+  {
+    if (!(outDir.exists() && outDir.isDirectory())) {
+      throw new ISE("outDir[%s] must exist and be a directory", outDir);
+    }
+
+    final LZ4BlockInputStream lz4In = new LZ4BlockInputStream(in);
+    final DataInputStream dataIn = new DataInputStream(lz4In);
+
+    final int fileCount = dataIn.readInt();
+    final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult();
+
+    for (int i = 0; i < fileCount; i++) {
+      final int fileNameLength = dataIn.readInt();
+      final byte[] fileNameBytes = new byte[fileNameLength];
+      dataIn.readFully(fileNameBytes);
+      final String fileName = new String(fileNameBytes, StandardCharsets.UTF_8);
+
+      final long fileSize = dataIn.readLong();
+
+      // Write to file
+      final File outFile = new File(outDir, fileName);
+      validateZipOutputFile("", outFile, outDir);
+
+      NativeIO.chunkedCopy(ByteStreams.limit(dataIn, fileSize), outFile);
+      result.addFile(outFile);
+    }
+
+    return result;
+  }
+
   /**
    * Unzip the byteSource to the output directory. If cacheLocally is true, the byteSource is cached to local disk before unzipping.
    * This may cause more predictable behavior than trying to unzip a large file directly off a network stream, for example.
@@ -622,6 +796,8 @@ public static InputStream decompress(final InputStream in, final String fileName
   {
     if (fileName.endsWith(Format.GZ.getSuffix())) {
       return gzipInputStream(in);
+    } else if (fileName.endsWith(Format.LZ4.getSuffix())) {
+      return new LZ4BlockInputStream(in);
     } else if (fileName.endsWith(Format.BZ2.getSuffix())) {
       return new BZip2CompressorInputStream(in, true);
     } else if (fileName.endsWith(Format.XZ.getSuffix())) {
diff --git a/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java b/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java
index 27377a53a772..6d1c41e4ee58 100644
--- a/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java
@@ -26,6 +26,7 @@
 import com.google.common.io.ByteStreams;
 import com.google.common.io.CountingInputStream;
 import com.google.common.io.Files;
+import net.jpountz.lz4.LZ4BlockOutputStream;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorOutputStream;
 import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream;
@@ -407,6 +408,46 @@ public void testDecompressZipWithManyFiles() throws IOException
     }
   }
 
+  @Test
+  public void testGoodLz4CompressUncompressDirectory() throws IOException
+  {
+    final File tmpDir = temporaryFolder.newFolder("testGoodLz4CompressUncompressDirectory");
+    final File lz4File = new File(tmpDir, "compressionUtilTest.lz4");
+
+    try (final OutputStream out = new FileOutputStream(lz4File)) {
+      CompressionUtils.Format.LZ4.compressDirectory(testDir, out);
+    }
+
+    final File newDir = new File(tmpDir, "newDir");
+    Assert.assertTrue(newDir.mkdir());
+
+    final FileUtils.FileCopyResult result;
+    try (final InputStream in = new FileInputStream(lz4File)) {
+      result = CompressionUtils.Format.LZ4.decompressDirectory(in, newDir);
+    }
+
+    verifyUnzip(newDir, result, ImmutableMap.of(testFile.getName(), StringUtils.toUtf8(CONTENT)));
+  }
+
+  @Test
+  public void testDecompressLz4() throws IOException
+  {
+    final File tmpDir = temporaryFolder.newFolder("testDecompressLz4");
+    final File lz4File = new File(tmpDir, testFile.getName() + ".lz4");
+    Assert.assertFalse(lz4File.exists());
+
+    try (
+        final OutputStream out = new LZ4BlockOutputStream(new FileOutputStream(lz4File));
+        final InputStream in = new FileInputStream(testFile)
+    ) {
+      ByteStreams.copy(in, out);
+    }
+
+    try (final InputStream inputStream = CompressionUtils.decompress(new FileInputStream(lz4File), lz4File.getName())) {
+      assertGoodDataStream(inputStream);
+    }
+  }
+
   @Test
   public void testGoodGZStream() throws IOException
   {