From 75e5be6f539db1717a24c15bf3f17a9582955ea5 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Mon, 29 Dec 2025 16:32:57 +0800 Subject: [PATCH 01/11] Support lz4 compression for segment --- .../storage/hdfs/HdfsDataSegmentPuller.java | 24 +-- .../storage/hdfs/HdfsDataSegmentPusher.java | 42 ++-- .../hdfs/HdfsDataSegmentPusherConfig.java | 14 ++ .../apache/druid/utils/CompressionUtils.java | 194 +++++++++++++++++- 4 files changed, 241 insertions(+), 33 deletions(-) diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java index 2c6de8e28bbf..b39e1b6e75be 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java @@ -235,26 +235,22 @@ FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) thr catch (Exception e) { throw new RuntimeException(e); } - } else if (CompressionUtils.isZip(path.getName())) { - - // -------- zip --------- + } - final FileUtils.FileCopyResult result = CompressionUtils.unzip( - new ByteSource() - { - @Override - public InputStream openStream() throws IOException - { - return getInputStream(path); - } - }, outDir, shouldRetryPredicate(), false + // Try to detect format from file extension and decompress + final CompressionUtils.Format format = CompressionUtils.Format.fromFileName(path.getName()); + if (format != null && (format == CompressionUtils.Format.ZIP || format == CompressionUtils.Format.LZ4)) { + final FileUtils.FileCopyResult result = format.decompressDirectory( + getInputStream(path), + outDir ); log.info( - "Unzipped %d bytes from [%s] to [%s]", + "Decompressed %d bytes from [%s] to [%s] using %s", result.size(), path.toString(), - outDir.getAbsolutePath() + outDir.getAbsolutePath(), + format.name() ); 
return result; diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java index 6bcf96d6819b..fa33351f8f52 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -35,7 +35,6 @@ import org.apache.druid.segment.SegmentUtils; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -55,6 +54,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher { private static final Logger log = new Logger(HdfsDataSegmentPusher.class); + private final HdfsDataSegmentPusherConfig config; private final Configuration hadoopConfig; // We lazily initialize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA. @@ -68,6 +68,7 @@ public HdfsDataSegmentPusher( ObjectMapper jsonMapper ) { + this.config = config; this.hadoopConfig = hadoopConfig; Path storageDir = new Path(config.getStorageDirectory()); this.fullyQualifiedStorageDirectory = Suppliers.memoize( @@ -105,28 +106,34 @@ public DataSegment push(final File inDir, final DataSegment segment, final boole // '{partitionNum}_index.zip' without unique paths and '{partitionNum}_{UUID}_index.zip' with unique paths. final String storageDir = this.getStorageDir(segment, false); - final String uniquePrefix = useUniquePath ? 
DataSegmentPusher.generateUniquePath() + "_" : ""; - final String outIndexFilePathSuffix = StringUtils.format( - "%s/%s/%d_%sindex.zip", + final String outIndexFilePath = StringUtils.format( + "%s/%s/%d_%sindex%s", fullyQualifiedStorageDirectory.get(), storageDir, segment.getShardSpec().getPartitionNum(), - uniquePrefix + uniquePrefix, + config.getCompressionFormat().getSuffix() ); - return pushToFilePathWithRetry(inDir, segment, outIndexFilePathSuffix); + return pushToFilePathWithRetry(inDir, segment, outIndexFilePath); } @Override public DataSegment pushToPath(File inDir, DataSegment segment, String storageDirSuffix) throws IOException { - String outIndexFilePath = StringUtils.format( - "%s/%s/%d_index.zip", - fullyQualifiedStorageDirectory.get(), - storageDirSuffix.replace(':', '_'), - segment.getShardSpec().getPartitionNum() - ); + final String outIndexFilePath; + if (storageDirSuffix.endsWith("index.zip") || storageDirSuffix.endsWith("index.lz4")) { + outIndexFilePath = StringUtils.format("%s/%s", fullyQualifiedStorageDirectory.get(), storageDirSuffix); + } else { + outIndexFilePath = StringUtils.format( + "%s/%s/%d_index%s", + fullyQualifiedStorageDirectory.get(), + storageDirSuffix.replace(':', '_'), + segment.getShardSpec().getPartitionNum(), + config.getCompressionFormat().getSuffix() + ); + } return pushToFilePathWithRetry(inDir, segment, outIndexFilePath); } @@ -152,18 +159,17 @@ private DataSegment pushToFilePathWithRetry(File inDir, DataSegment segment, Str private DataSegment pushToFilePath(File inDir, DataSegment segment, String outIndexFilePath) throws IOException { log.debug( - "Copying segment[%s] to HDFS at location[%s/%s]", + "Copying segment[%s] to HDFS at location[%s]", segment.getId(), - fullyQualifiedStorageDirectory.get(), outIndexFilePath ); - Path tmpIndexFile = new Path(StringUtils.format( - "%s/%s/%s/%s_index.zip", + "%s/%s/%s/%s_index%s", fullyQualifiedStorageDirectory.get(), segment.getDataSource(), UUIDUtils.generateUuid(), - 
segment.getShardSpec().getPartitionNum() + segment.getShardSpec().getPartitionNum(), + config.getCompressionFormat().getSuffix() )); FileSystem fs = tmpIndexFile.getFileSystem(hadoopConfig); @@ -174,7 +180,7 @@ private DataSegment pushToFilePath(File inDir, DataSegment segment, String outIn final DataSegment dataSegment; try { try (FSDataOutputStream out = fs.create(tmpIndexFile)) { - size = CompressionUtils.zip(inDir, out); + size = config.getCompressionFormat().compressDirectory(inDir, out); } final Path outIndexFile = new Path(outIndexFilePath); dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri())) diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java index 7692be131179..a2b656cc13d7 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherConfig.java @@ -20,6 +20,7 @@ package org.apache.druid.storage.hdfs; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.utils.CompressionUtils; /** */ @@ -28,6 +29,9 @@ public class HdfsDataSegmentPusherConfig @JsonProperty private String storageDirectory = ""; + @JsonProperty + private CompressionUtils.Format compressionFormat = CompressionUtils.Format.ZIP; + public void setStorageDirectory(String storageDirectory) { this.storageDirectory = storageDirectory; @@ -37,4 +41,14 @@ public String getStorageDirectory() { return storageDirectory; } + + public void setCompressionFormat(CompressionUtils.Format compressionFormat) + { + this.compressionFormat = compressionFormat; + } + + public CompressionUtils.Format getCompressionFormat() + { + return compressionFormat; + } } diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java 
b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java index 01b45c5089af..2bb79bc65aac 100644 --- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java +++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java @@ -19,6 +19,7 @@ package org.apache.druid.utils; +import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.base.Throwables; @@ -27,6 +28,9 @@ import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; import com.google.common.io.Files; +import net.jpountz.lz4.LZ4BlockInputStream; +import net.jpountz.lz4.LZ4BlockOutputStream; +import net.jpountz.lz4.LZ4Factory; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorInputStream; import org.apache.commons.compress.compressors.xz.XZCompressorInputStream; @@ -46,7 +50,10 @@ import javax.annotation.Nullable; import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.File; +import java.io.FileInputStream; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; @@ -74,9 +81,34 @@ public enum Format { BZ2(".bz2", "bz2"), GZ(".gz", "gz"), + LZ4(".lz4", "lz4") { + @Override + public long compressDirectory(File directory, OutputStream out) throws IOException + { + return lz4CompressDirectory(directory, out); + } + + @Override + public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException + { + return lz4DecompressDirectory(in, outDir); + } + }, SNAPPY(".sz", "sz"), XZ(".xz", "xz"), - ZIP(".zip", "zip"), + ZIP(".zip", "zip") { + @Override + public long compressDirectory(File directory, OutputStream out) throws IOException + { + return zip(directory, out); + } + + @Override + public 
FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException + { + return unzip(in, outDir); + } + }, ZSTD(".zst", "zst"); private static final Map EXTENSION_TO_COMPRESSION_FORMAT; @@ -85,6 +117,7 @@ public enum Format ImmutableMap.Builder builder = ImmutableMap.builder(); builder.put(BZ2.getExtension(), BZ2); builder.put(GZ.getExtension(), GZ); + builder.put(LZ4.getExtension(), LZ4); builder.put(SNAPPY.getExtension(), SNAPPY); builder.put(XZ.getExtension(), XZ); builder.put(ZIP.getExtension(), ZIP); @@ -110,6 +143,16 @@ public String getExtension() return extension; } + @Nullable + @JsonCreator + public static Format fromString(@Nullable String name) + { + if (Strings.isNullOrEmpty(name)) { + return null; + } + return valueOf(name.toUpperCase()); + } + @Nullable public static Format fromFileName(@Nullable String fileName) { @@ -119,6 +162,36 @@ public static Format fromFileName(@Nullable String fileName) } return EXTENSION_TO_COMPRESSION_FORMAT.get(extension); } + + /** + * Compresses a directory to the output stream. Default implementation throws UnsupportedOperationException. + * Override this method for formats that support directory compression. + * + * @param directory The directory to compress + * @param out The output stream to write compressed data to + * @return The number of bytes (uncompressed) read from the input directory + * @throws IOException if an I/O error occurs + * @throws UnsupportedOperationException if the format doesn't support directory compression + */ + public long compressDirectory(File directory, OutputStream out) throws IOException + { + throw new UnsupportedOperationException("Directory compression not supported for " + this.name()); + } + + /** + * Decompresses a directory from the input stream. Default implementation throws UnsupportedOperationException. + * Override this method for formats that support directory decompression. 
+ * + * @param in The input stream containing compressed data + * @param outDir The output directory to extract files to + * @return A FileCopyResult containing information about extracted files + * @throws IOException if an I/O error occurs + * @throws UnsupportedOperationException if the format doesn't support directory decompression + */ + public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException + { + throw new UnsupportedOperationException("Directory decompression not supported for " + this.name()); + } } public static final int COMPRESSED_TEXT_WEIGHT_FACTOR = 4; @@ -212,6 +285,108 @@ public static long zip(File directory, OutputStream out) throws IOException return totalSize; } + /** + * Compresses directory contents using LZ4 block compression with a simple archive format. + * Format: [file_count:4 bytes][file1_name_length:4][file1_name:bytes][file1_size:8][file1_data:bytes]... + * + * @param directory The directory whose contents should be compressed + * @param out The output stream to write compressed data to + * @return The number of bytes (uncompressed) read from the input directory + */ + public static long lz4CompressDirectory(File directory, OutputStream out) throws IOException + { + if (!directory.isDirectory()) { + throw new IOE("directory[%s] is not a directory", directory); + } + + // Use fast compressor for better performance (lower CPU, faster compression) + final LZ4BlockOutputStream lz4Out = new LZ4BlockOutputStream( + out, + 64 * 1024, // Block size + LZ4Factory.fastestInstance().fastCompressor() + ); + // Use DataOutputStream for structured writing + final DataOutputStream dataOut = new DataOutputStream(lz4Out); + final File[] files = directory.listFiles(); + + if (files == null) { + throw new IOE("Cannot list files in directory[%s]", directory); + } + + // Sort for consistency + final File[] sortedFiles = Arrays.stream(files).sorted().toArray(File[]::new); + + dataOut.writeInt(sortedFiles.length); + 
+ long totalSize = 0; + + for (File file : sortedFiles) { + if (file.isDirectory()) { + continue; // Skip subdirectories like ZIP does + } + + log.debug("Compressing file[%s] with size[%,d]. Total size so far[%,d]", file, file.length(), totalSize); + + final String fileName = file.getName(); + final byte[] fileNameBytes = fileName.getBytes(StandardCharsets.UTF_8); + dataOut.writeInt(fileNameBytes.length); + dataOut.write(fileNameBytes); + + final long fileSize = file.length(); + if (fileSize > Integer.MAX_VALUE) { + throw new IOE("file[%s] too large [%,d]", file, fileSize); + } + + dataOut.writeLong(fileSize); + totalSize += fileSize; + + // Copy file content to dataOut + try (FileInputStream fileIn = new FileInputStream(file)) { + ByteStreams.copy(fileIn, dataOut); + } + } + + dataOut.flush(); + lz4Out.finish(); + return totalSize; + } + + /** + * Decompresses LZ4-compressed directory archive + */ + public static FileUtils.FileCopyResult lz4DecompressDirectory(InputStream in, File outDir) throws IOException + { + if (!(outDir.exists() && outDir.isDirectory())) { + throw new ISE("outDir[%s] must exist and be a directory", outDir); + } + + final LZ4BlockInputStream lz4In = new LZ4BlockInputStream(in); + final DataInputStream dataIn = new DataInputStream(lz4In); + + final int fileCount = dataIn.readInt(); + final FileUtils.FileCopyResult result = new FileUtils.FileCopyResult(); + + for (int i = 0; i < fileCount; i++) { + final int fileNameLength = dataIn.readInt(); + final byte[] fileNameBytes = new byte[fileNameLength]; + dataIn.readFully(fileNameBytes); + final String fileName = new String(fileNameBytes, StandardCharsets.UTF_8); + + final long fileSize = dataIn.readLong(); + + // Write to file + final File outFile = new File(outDir, fileName); + validateZipOutputFile("", outFile, outDir); + + try (OutputStream fileOut = java.nio.file.Files.newOutputStream(outFile.toPath())) { + ByteStreams.copy(ByteStreams.limit(dataIn, fileSize), fileOut); + } + 
result.addFile(outFile); + } + + return result; + } + /** * Unzip the byteSource to the output directory. If cacheLocally is true, the byteSource is cached to local disk before unzipping. * This may cause more predictable behavior than trying to unzip a large file directly off a network stream, for example. @@ -597,6 +772,21 @@ public static boolean isGz(String fName) return fName.endsWith(Format.GZ.getSuffix()) && fName.length() > Format.GZ.getSuffix().length(); } + /** + * Checks to see if fName is a valid name for a "*.lz4" file + * + * @param fName The name of the file in question + * + * @return True if fName is a properly named .lz4 file, false otherwise + */ + public static boolean isLz4(String fName) + { + if (Strings.isNullOrEmpty(fName)) { + return false; + } + return fName.endsWith(Format.LZ4.getSuffix()); + } + /** * Get the file name without the .gz extension * @@ -622,6 +812,8 @@ public static InputStream decompress(final InputStream in, final String fileName { if (fileName.endsWith(Format.GZ.getSuffix())) { return gzipInputStream(in); + } else if (fileName.endsWith(Format.LZ4.getSuffix())) { + return new LZ4BlockInputStream(in); } else if (fileName.endsWith(Format.BZ2.getSuffix())) { return new BZip2CompressorInputStream(in, true); } else if (fileName.endsWith(Format.XZ.getSuffix())) { From 1fa9666719b60d39e25e65331563b62af1730d5d Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Wed, 4 Feb 2026 10:13:28 +0800 Subject: [PATCH 02/11] Add pull metrics --- .../storage/hdfs/HdfsDataSegmentPuller.java | 38 +++++++++++++++++-- .../apache/druid/utils/CompressionUtils.java | 4 +- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java index b39e1b6e75be..e7d06a8c820f 100644 --- 
a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java @@ -30,6 +30,8 @@ import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.io.NativeIO; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.loading.URIDataPuller; import org.apache.druid.utils.CompressionUtils; @@ -40,6 +42,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import javax.annotation.Nullable; import javax.tools.FileObject; import java.io.File; import java.io.IOException; @@ -177,11 +180,21 @@ public boolean delete() private static final Logger log = new Logger(HdfsDataSegmentPuller.class); protected final Configuration config; + private final ServiceEmitter emitter; - @Inject public HdfsDataSegmentPuller(@Hdfs final Configuration config) + { + this(config, null); + } + + @Inject + public HdfsDataSegmentPuller( + @Hdfs final Configuration config, + @Nullable final ServiceEmitter emitter + ) { this.config = config; + this.emitter = emitter; } FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) throws SegmentLoadingException @@ -239,18 +252,24 @@ FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) thr // Try to detect format from file extension and decompress final CompressionUtils.Format format = CompressionUtils.Format.fromFileName(path.getName()); - if (format != null && (format == CompressionUtils.Format.ZIP || format == CompressionUtils.Format.LZ4)) { + if ((format == CompressionUtils.Format.ZIP || format == CompressionUtils.Format.LZ4)) { + long startTime = System.currentTimeMillis(); + final 
FileUtils.FileCopyResult result = format.decompressDirectory( getInputStream(path), outDir ); + long duration = System.currentTimeMillis() - startTime; + emitMetrics(result.size(), duration); + log.info( - "Decompressed %d bytes from [%s] to [%s] using %s", + "Decompressed %d bytes from [%s] to [%s] using %s in [%d] millis", result.size(), path.toString(), outDir.getAbsolutePath(), - format.name() + format.name(), + duration ); return result; @@ -288,6 +307,17 @@ public InputStream openStream() throws IOException } } + private void emitMetrics(long size, long duration) + { + if (emitter == null) { + return; + } + ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder(); + + emitter.emit(metricBuilder.build("hdfs/pull/size", size)); + emitter.emit(metricBuilder.build("hdfs/pull/duration", duration)); + } + public InputStream getInputStream(Path path) throws IOException { return buildFileObject(path.toUri(), config).openInputStream(); diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java index 2bb79bc65aac..b10ced0478b6 100644 --- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java +++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java @@ -378,9 +378,7 @@ public static FileUtils.FileCopyResult lz4DecompressDirectory(InputStream in, Fi final File outFile = new File(outDir, fileName); validateZipOutputFile("", outFile, outDir); - try (OutputStream fileOut = java.nio.file.Files.newOutputStream(outFile.toPath())) { - ByteStreams.copy(ByteStreams.limit(dataIn, fileSize), fileOut); - } + NativeIO.chunkedCopy(ByteStreams.limit(dataIn, fileSize), outFile); result.addFile(outFile); } From cb2b03b1df51e92d677eee7bae0acbb9ba49e932 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Wed, 4 Feb 2026 10:48:12 +0800 Subject: [PATCH 03/11] Add tests --- .../storage/hdfs/HdfsDataSegmentPuller.java | 2 +- 
.../apache/druid/utils/CompressionUtils.java | 46 ++++--------------- .../util/common/CompressionUtilsTest.java | 41 +++++++++++++++++ 3 files changed, 52 insertions(+), 37 deletions(-) diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java index e7d06a8c820f..53f2c9e4def1 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java @@ -255,7 +255,7 @@ FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) thr if ((format == CompressionUtils.Format.ZIP || format == CompressionUtils.Format.LZ4)) { long startTime = System.currentTimeMillis(); - final FileUtils.FileCopyResult result = format.decompressDirectory( + final FileUtils.FileCopyResult result = format.decompressToDirectory( getInputStream(path), outDir ); diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java index b10ced0478b6..947bb173cae6 100644 --- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java +++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java @@ -64,6 +64,7 @@ import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.Enumeration; +import java.util.Locale; import java.util.Map; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; @@ -89,9 +90,9 @@ public long compressDirectory(File directory, OutputStream out) throws IOExcepti } @Override - public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException + public FileUtils.FileCopyResult decompressToDirectory(InputStream in, File outDir) throws IOException { - return lz4DecompressDirectory(in, 
outDir); + return lz4DecompressToDirectory(in, outDir); } }, SNAPPY(".sz", "sz"), @@ -104,7 +105,7 @@ public long compressDirectory(File directory, OutputStream out) throws IOExcepti } @Override - public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException + public FileUtils.FileCopyResult decompressToDirectory(InputStream in, File outDir) throws IOException { return unzip(in, outDir); } @@ -150,7 +151,7 @@ public static Format fromString(@Nullable String name) if (Strings.isNullOrEmpty(name)) { return null; } - return valueOf(name.toUpperCase()); + return valueOf(name.toUpperCase(Locale.ROOT)); } @Nullable @@ -188,7 +189,7 @@ public long compressDirectory(File directory, OutputStream out) throws IOExcepti * @throws IOException if an I/O error occurs * @throws UnsupportedOperationException if the format doesn't support directory decompression */ - public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException + public FileUtils.FileCopyResult decompressToDirectory(InputStream in, File outDir) throws IOException { throw new UnsupportedOperationException("Directory decompression not supported for " + this.name()); } @@ -200,7 +201,7 @@ public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) private static final int GZIP_BUFFER_SIZE = 8192; // Default is 512 /** - * Zip the contents of directory into the file indicated by outputZipFile. Sub directories are skipped + * Zip the contents of directory into the file indicated by outputZipFile. Subdirectories are skipped * * @param directory The directory whose contents should be added to the zip in the output stream. * @param outputZipFile The output file to write the zipped data to @@ -208,7 +209,6 @@ public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) * * @return The number of bytes (uncompressed) read from the input directory. 
* - * @throws IOException */ public static long zip(File directory, File outputZipFile, boolean fsync) throws IOException { @@ -233,14 +233,13 @@ public static long zip(File directory, File outputZipFile, boolean fsync) throws } /** - * Zip the contents of directory into the file indicated by outputZipFile. Sub directories are skipped + * Zip the contents of directory into the file indicated by outputZipFile. Subdirectories are skipped * * @param directory The directory whose contents should be added to the zip in the output stream. * @param outputZipFile The output file to write the zipped data to * * @return The number of bytes (uncompressed) read from the input directory. * - * @throws IOException */ public static long zip(File directory, File outputZipFile) throws IOException { @@ -248,14 +247,13 @@ public static long zip(File directory, File outputZipFile) throws IOException } /** - * Zips the contents of the input directory to the output stream. Sub directories are skipped + * Zips the contents of the input directory to the output stream. Subdirectories are skipped * * @param directory The directory whose contents should be added to the zip in the output stream. * @param out The output stream to write the zip data to. Caller is responsible for closing this stream. * * @return The number of bytes (uncompressed) read from the input directory. 
* - * @throws IOException */ public static long zip(File directory, OutputStream out) throws IOException { @@ -354,7 +352,7 @@ public static long lz4CompressDirectory(File directory, OutputStream out) throws /** * Decompresses LZ4-compressed directory archive */ - public static FileUtils.FileCopyResult lz4DecompressDirectory(InputStream in, File outDir) throws IOException + public static FileUtils.FileCopyResult lz4DecompressToDirectory(InputStream in, File outDir) throws IOException { if (!(outDir.exists() && outDir.isDirectory())) { throw new ISE("outDir[%s] must exist and be a directory", outDir); @@ -399,7 +397,6 @@ public static FileUtils.FileCopyResult lz4DecompressDirectory(InputStream in, Fi * * @return A FileCopyResult containing the result of writing the zip entries to disk * - * @throws IOException */ public static FileUtils.FileCopyResult unzip( final ByteSource byteSource, @@ -449,7 +446,6 @@ public static FileUtils.FileCopyResult unzip( * * @return a FileCopyResult of the files which were written to disk * - * @throws IOException */ public static FileUtils.FileCopyResult unzip(final File pulledFile, final File outDir) throws IOException { @@ -515,7 +511,6 @@ public static void validateZipOutputFile( * * @return The FileUtils.FileCopyResult containing information on all the files which were written * - * @throws IOException */ public static FileUtils.FileCopyResult unzip(InputStream in, File outDir) throws IOException { @@ -551,7 +546,6 @@ public static FileUtils.FileCopyResult unzip(InputStream in, File outDir) throws * * @return The result of the file copy * - * @throws IOException */ public static FileUtils.FileCopyResult gunzip(final File pulledFile, File outFile) { @@ -564,7 +558,6 @@ public static FileUtils.FileCopyResult gunzip(final File pulledFile, File outFil * @param in The input stream to run through the gunzip filter. 
This stream is closed * @param outFile The file to output to * - * @throws IOException */ public static FileUtils.FileCopyResult gunzip(InputStream in, File outFile) throws IOException { @@ -609,7 +602,6 @@ public int available() throws IOException * * @return The number of bytes written to the output stream. * - * @throws IOException */ public static long gunzip(InputStream in, OutputStream out) throws IOException { @@ -676,7 +668,6 @@ public static FileUtils.FileCopyResult gunzip(final ByteSource in, File outFile) * * @return The size of the data copied * - * @throws IOException */ public static long gzip(InputStream inputStream, OutputStream out) throws IOException { @@ -699,7 +690,6 @@ public static long gzip(InputStream inputStream, OutputStream out) throws IOExce * * @return The result of the file copy * - * @throws IOException */ public static FileUtils.FileCopyResult gzip(final File inFile, final File outFile, Predicate shouldRetry) { @@ -733,7 +723,6 @@ public OutputStream openStream() throws IOException * * @return A FileCopyResult of the resulting file at outFile * - * @throws IOException */ public static FileUtils.FileCopyResult gzip(final File inFile, final File outFile) { @@ -770,21 +759,6 @@ public static boolean isGz(String fName) return fName.endsWith(Format.GZ.getSuffix()) && fName.length() > Format.GZ.getSuffix().length(); } - /** - * Checks to see if fName is a valid name for a "*.lz4" file - * - * @param fName The name of the file in question - * - * @return True if fName is a properly named .lz4 file, false otherwise - */ - public static boolean isLz4(String fName) - { - if (Strings.isNullOrEmpty(fName)) { - return false; - } - return fName.endsWith(Format.LZ4.getSuffix()); - } - /** * Get the file name without the .gz extension * diff --git a/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java b/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java index 
27377a53a772..33e1925d255d 100644 --- a/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java @@ -26,6 +26,7 @@ import com.google.common.io.ByteStreams; import com.google.common.io.CountingInputStream; import com.google.common.io.Files; +import net.jpountz.lz4.LZ4BlockOutputStream; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.snappy.FramedSnappyCompressorOutputStream; import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream; @@ -407,6 +408,46 @@ public void testDecompressZipWithManyFiles() throws IOException } } + @Test + public void testGoodLz4CompressUncompressDirectory() throws IOException + { + final File tmpDir = temporaryFolder.newFolder("testGoodLz4CompressUncompressDirectory"); + final File lz4File = new File(tmpDir, "compressionUtilTest.lz4"); + + try (final OutputStream out = new FileOutputStream(lz4File)) { + CompressionUtils.Format.LZ4.compressDirectory(testDir, out); + } + + final File newDir = new File(tmpDir, "newDir"); + Assert.assertTrue(newDir.mkdir()); + + final FileUtils.FileCopyResult result; + try (final InputStream in = new FileInputStream(lz4File)) { + result = CompressionUtils.Format.LZ4.decompressToDirectory(in, newDir); + } + + verifyUnzip(newDir, result, ImmutableMap.of(testFile.getName(), StringUtils.toUtf8(CONTENT))); + } + + @Test + public void testDecompressLz4() throws IOException + { + final File tmpDir = temporaryFolder.newFolder("testDecompressLz4"); + final File lz4File = new File(tmpDir, testFile.getName() + ".lz4"); + Assert.assertFalse(lz4File.exists()); + + try ( + final OutputStream out = new LZ4BlockOutputStream(new FileOutputStream(lz4File)); + final InputStream in = new FileInputStream(testFile) + ) { + ByteStreams.copy(in, out); + } + + try (final InputStream inputStream = 
CompressionUtils.decompress(new FileInputStream(lz4File), lz4File.getName())) { + assertGoodDataStream(inputStream); + } + } + @Test public void testGoodGZStream() throws IOException { From 10299cba19411ad71f6258b4f90bfefa4bcad565 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Wed, 4 Feb 2026 10:54:36 +0800 Subject: [PATCH 04/11] Add test Signed-off-by: Frank Chen --- .../apache/druid/utils/CompressionUtils.java | 43 +++++++++++++++---- .../util/common/CompressionUtilsTest.java | 2 +- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java index 947bb173cae6..cd83755968e3 100644 --- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java +++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java @@ -90,9 +90,9 @@ public long compressDirectory(File directory, OutputStream out) throws IOExcepti } @Override - public FileUtils.FileCopyResult decompressToDirectory(InputStream in, File outDir) throws IOException + public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException { - return lz4DecompressToDirectory(in, outDir); + return lz4DecompressDirectory(in, outDir); } }, SNAPPY(".sz", "sz"), @@ -105,7 +105,7 @@ public long compressDirectory(File directory, OutputStream out) throws IOExcepti } @Override - public FileUtils.FileCopyResult decompressToDirectory(InputStream in, File outDir) throws IOException + public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException { return unzip(in, outDir); } @@ -189,7 +189,7 @@ public long compressDirectory(File directory, OutputStream out) throws IOExcepti * @throws IOException if an I/O error occurs * @throws UnsupportedOperationException if the format doesn't support directory decompression */ - public FileUtils.FileCopyResult decompressToDirectory(InputStream in, File 
outDir) throws IOException + public FileUtils.FileCopyResult decompressDirectory(InputStream in, File outDir) throws IOException { throw new UnsupportedOperationException("Directory decompression not supported for " + this.name()); } @@ -201,7 +201,7 @@ public FileUtils.FileCopyResult decompressToDirectory(InputStream in, File outDi private static final int GZIP_BUFFER_SIZE = 8192; // Default is 512 /** - * Zip the contents of directory into the file indicated by outputZipFile. Subdirectories are skipped + * Zip the contents of directory into the file indicated by outputZipFile. Subdirectories are skipped * * @param directory The directory whose contents should be added to the zip in the output stream. * @param outputZipFile The output file to write the zipped data to * * @return The number of bytes (uncompressed) read from the input directory. * + * @throws IOException */ public static long zip(File directory, File outputZipFile, boolean fsync) throws IOException { @@ -233,13 +234,14 @@ public static long zip(File directory, File outputZipFile, boolean fsync) throws } /** - * Zip the contents of directory into the file indicated by outputZipFile. Subdirectories are skipped + * Zip the contents of directory into the file indicated by outputZipFile. Subdirectories are skipped * * @param directory The directory whose contents should be added to the zip in the output stream. * @param outputZipFile The output file to write the zipped data to * * @return The number of bytes (uncompressed) read from the input directory. * + * @throws IOException */ public static long zip(File directory, File outputZipFile) throws IOException { @@ -247,13 +249,14 @@ public static long zip(File directory, File outputZipFile) throws IOException { } /** - * Zips the contents of the input directory to the output stream. 
Subdirectories are skipped + * Zips the contents of the input directory to the output stream. Subdirectories are skipped * * @param directory The directory whose contents should be added to the zip in the output stream. * @param out The output stream to write the zip data to. Caller is responsible for closing this stream. * * @return The number of bytes (uncompressed) read from the input directory. * + * @throws IOException */ public static long zip(File directory, OutputStream out) throws IOException { @@ -352,7 +355,7 @@ public static long lz4CompressDirectory(File directory, OutputStream out) throws /** * Decompresses LZ4-compressed directory archive */ - public static FileUtils.FileCopyResult lz4DecompressToDirectory(InputStream in, File outDir) throws IOException + public static FileUtils.FileCopyResult lz4DecompressDirectory(InputStream in, File outDir) throws IOException { if (!(outDir.exists() && outDir.isDirectory())) { throw new ISE("outDir[%s] must exist and be a directory", outDir); @@ -397,6 +400,7 @@ public static FileUtils.FileCopyResult lz4DecompressToDirectory(InputStream in, * * @return A FileCopyResult containing the result of writing the zip entries to disk * + * @throws IOException */ public static FileUtils.FileCopyResult unzip( final ByteSource byteSource, @@ -446,6 +450,7 @@ public static FileUtils.FileCopyResult unzip( * * @return a FileCopyResult of the files which were written to disk * + * @throws IOException */ public static FileUtils.FileCopyResult unzip(final File pulledFile, final File outDir) throws IOException { @@ -511,6 +516,7 @@ public static void validateZipOutputFile( * * @return The FileUtils.FileCopyResult containing information on all the files which were written * + * @throws IOException */ public static FileUtils.FileCopyResult unzip(InputStream in, File outDir) throws IOException { @@ -546,6 +552,7 @@ public static FileUtils.FileCopyResult unzip(InputStream in, File outDir) throws * * @return The result of the file copy 
* + * @throws IOException */ public static FileUtils.FileCopyResult gunzip(final File pulledFile, File outFile) { @@ -558,6 +565,7 @@ public static FileUtils.FileCopyResult gunzip(final File pulledFile, File outFil * @param in The input stream to run through the gunzip filter. This stream is closed * @param outFile The file to output to * + * @throws IOException */ public static FileUtils.FileCopyResult gunzip(InputStream in, File outFile) throws IOException { @@ -602,6 +610,7 @@ public int available() throws IOException * * @return The number of bytes written to the output stream. * + * @throws IOException */ public static long gunzip(InputStream in, OutputStream out) throws IOException { @@ -668,6 +677,7 @@ public static FileUtils.FileCopyResult gunzip(final ByteSource in, File outFile) * * @return The size of the data copied * + * @throws IOException */ public static long gzip(InputStream inputStream, OutputStream out) throws IOException { @@ -690,6 +700,7 @@ public static long gzip(InputStream inputStream, OutputStream out) throws IOExce * * @return The result of the file copy * + * @throws IOException */ public static FileUtils.FileCopyResult gzip(final File inFile, final File outFile, Predicate shouldRetry) { @@ -723,6 +734,7 @@ public OutputStream openStream() throws IOException * * @return A FileCopyResult of the resulting file at outFile * + * @throws IOException */ public static FileUtils.FileCopyResult gzip(final File inFile, final File outFile) { @@ -759,6 +771,21 @@ public static boolean isGz(String fName) return fName.endsWith(Format.GZ.getSuffix()) && fName.length() > Format.GZ.getSuffix().length(); } + /** + * Checks to see if fName is a valid name for a "*.lz4" file + * + * @param fName The name of the file in question + * + * @return True if fName is a properly named .lz4 file, false otherwise + */ + public static boolean isLz4(String fName) + { + if (Strings.isNullOrEmpty(fName)) { + return false; + } + return 
fName.endsWith(Format.LZ4.getSuffix()); + } + /** * Get the file name without the .gz extension * diff --git a/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java b/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java index 33e1925d255d..6d1c41e4ee58 100644 --- a/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java +++ b/processing/src/test/java/org/apache/druid/java/util/common/CompressionUtilsTest.java @@ -423,7 +423,7 @@ public void testGoodLz4CompressUncompressDirectory() throws IOException final FileUtils.FileCopyResult result; try (final InputStream in = new FileInputStream(lz4File)) { - result = CompressionUtils.Format.LZ4.decompressToDirectory(in, newDir); + result = CompressionUtils.Format.LZ4.decompressDirectory(in, newDir); } verifyUnzip(newDir, result, ImmutableMap.of(testFile.getName(), StringUtils.toUtf8(CONTENT))); From 66b594959ffd68e715d7b9e6968f73c7feccd613 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Wed, 4 Feb 2026 11:17:20 +0800 Subject: [PATCH 05/11] Clean --- .../storage/hdfs/HdfsDataSegmentPuller.java | 25 +++-------- .../storage/hdfs/HdfsDataSegmentPusher.java | 41 +++++++++++++++++-- .../hdfs/HdfsDataSegmentPusherTest.java | 10 ++--- 3 files changed, 49 insertions(+), 27 deletions(-) diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java index 53f2c9e4def1..34a358bdd902 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPuller.java @@ -255,22 +255,9 @@ FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) thr if ((format == CompressionUtils.Format.ZIP || format == CompressionUtils.Format.LZ4)) { long 
startTime = System.currentTimeMillis(); - final FileUtils.FileCopyResult result = format.decompressToDirectory( - getInputStream(path), - outDir - ); + final FileUtils.FileCopyResult result = format.decompressDirectory(getInputStream(path), outDir); - long duration = System.currentTimeMillis() - startTime; - emitMetrics(result.size(), duration); - - log.info( - "Decompressed %d bytes from [%s] to [%s] using %s in [%d] millis", - result.size(), - path.toString(), - outDir.getAbsolutePath(), - format.name(), - duration - ); + emitMetrics(format, result.size(), System.currentTimeMillis() - startTime); return result; } else if (CompressionUtils.isGz(path.getName())) { @@ -307,15 +294,15 @@ public InputStream openStream() throws IOException } } - private void emitMetrics(long size, long duration) + private void emitMetrics(CompressionUtils.Format format, long size, long duration) { if (emitter == null) { return; } ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder(); - - emitter.emit(metricBuilder.build("hdfs/pull/size", size)); - emitter.emit(metricBuilder.build("hdfs/pull/duration", duration)); + metricBuilder.setDimension("format", format); + emitter.emit(metricBuilder.setMetric("hdfs/pull/size", size)); + emitter.emit(metricBuilder.setMetric("hdfs/pull/duration", duration)); } public InputStream getInputStream(Path path) throws IOException diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java index fa33351f8f52..2afd068a768d 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -19,7 +19,6 @@ package org.apache.druid.storage.hdfs; -import com.fasterxml.jackson.databind.ObjectMapper; import 
com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; @@ -32,6 +31,8 @@ import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.timeline.DataSegment; @@ -42,6 +43,7 @@ import org.apache.hadoop.fs.Path; import org.joda.time.format.ISODateTimeFormat; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.net.URI; @@ -56,20 +58,31 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher private final HdfsDataSegmentPusherConfig config; private final Configuration hadoopConfig; + @Nullable + private final ServiceEmitter emitter; // We lazily initialize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA. // Please see https://github.com/apache/druid/pull/5684 private final Supplier fullyQualifiedStorageDirectory; + public HdfsDataSegmentPusher( + HdfsDataSegmentPusherConfig config, + @Hdfs Configuration hadoopConfig + ) + { + this(config, hadoopConfig, null); + } + @Inject public HdfsDataSegmentPusher( HdfsDataSegmentPusherConfig config, @Hdfs Configuration hadoopConfig, - ObjectMapper jsonMapper + @Nullable ServiceEmitter emitter ) { this.config = config; this.hadoopConfig = hadoopConfig; + this.emitter = emitter; Path storageDir = new Path(config.getStorageDirectory()); this.fullyQualifiedStorageDirectory = Suppliers.memoize( () -> { @@ -143,11 +156,17 @@ private DataSegment pushToFilePathWithRetry(File inDir, DataSegment segment, Str { // Retry any HDFS errors that occur, up to 5 times. 
try { - return RetryUtils.retry( + final long startTime = System.currentTimeMillis(); + + final DataSegment result = RetryUtils.retry( () -> pushToFilePath(inDir, segment, outIndexFilePath), exception -> exception instanceof Exception, 5 ); + + emitMetrics(result.getSize(), System.currentTimeMillis() - startTime); + + return result; } catch (Exception e) { Throwables.throwIfInstanceOf(e, IOException.class); @@ -156,6 +175,18 @@ private DataSegment pushToFilePathWithRetry(File inDir, DataSegment segment, Str } } + private void emitMetrics(long size, long duration) + { + if (emitter == null) { + return; + } + + final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder(); + metricBuilder.setDimension("format", config.getCompressionFormat()); + emitter.emit(metricBuilder.setMetric("hdfs/push/size", size)); + emitter.emit(metricBuilder.setMetric("hdfs/push/duration", duration)); + } + private DataSegment pushToFilePath(File inDir, DataSegment segment, String outIndexFilePath) throws IOException { log.debug( @@ -191,6 +222,10 @@ private DataSegment pushToFilePath(File inDir, DataSegment segment, String outIn fs.mkdirs(outIndexFile.getParent()); copyFilesWithChecks(fs, tmpIndexFile, outIndexFile); } + catch (IOException e) { + log.error(e, "Failed to push segment[%s] to HDFS at location[%s]", segment.getId(), outIndexFilePath); + throw e; + } finally { try { if (fs.exists(tmpIndexFile.getParent()) && !fs.delete(tmpIndexFile.getParent(), true)) { diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java index 7bdf8c06980f..028b0ab80865 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java @@ -97,7 +97,7 @@ public void 
setUp() { HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConf = new HdfsDataSegmentPusherConfig(); hdfsDataSegmentPusherConf.setStorageDirectory("path/to/"); - hdfsDataSegmentPusher = new HdfsDataSegmentPusher(hdfsDataSegmentPusherConf, new Configuration(true), objectMapper); + hdfsDataSegmentPusher = new HdfsDataSegmentPusher(hdfsDataSegmentPusherConf, new Configuration(true)); } @Test @@ -146,7 +146,7 @@ public void testUsingUniqueFilePath() throws Exception final File storageDirectory = tempFolder.newFolder(); config.setStorageDirectory(StringUtils.format("file://%s", storageDirectory.getAbsolutePath())); - HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper()); + HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf); DataSegment segmentToPush = new DataSegment( "foo", @@ -187,7 +187,7 @@ public void testPushToPath() throws Exception final File storageDirectory = tempFolder.newFolder(); config.setStorageDirectory(StringUtils.format("file://%s", storageDirectory.getAbsolutePath())); - HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper()); + HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf); DataSegment segmentToPush = new DataSegment( "foo", @@ -231,7 +231,7 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n ? StringUtils.format("%s://%s", scheme, storageDirectory.getAbsolutePath()) : storageDirectory.getAbsolutePath() ); - HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper()); + HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf); for (int i = 0; i < numberOfSegments; i++) { segments[i] = new DataSegment( @@ -320,7 +320,7 @@ private void testUsingScheme(final String scheme) throws Exception ? 
StringUtils.format("%s://%s", scheme, storageDirectory.getAbsolutePath()) : storageDirectory.getAbsolutePath() ); - HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf, new DefaultObjectMapper()); + HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf); DataSegment segmentToPush = new DataSegment( "foo", From 338eba12750c5e18b796262be94b11f1c4907e42 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Wed, 4 Feb 2026 11:35:21 +0800 Subject: [PATCH 06/11] Clean --- .../org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java index 028b0ab80865..62fb7c82ccf3 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java @@ -36,7 +36,6 @@ import org.apache.druid.indexer.HadoopDruidIndexerConfig; import org.apache.druid.indexer.HadoopIngestionSpec; import org.apache.druid.indexer.JobHelper; -import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.GranularityModule; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; From c72fbba5c2250e464f5df45fe045891524fe55af Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Wed, 4 Feb 2026 11:52:55 +0800 Subject: [PATCH 07/11] Add test --- .../hdfs/HdfsDataSegmentPusherTest.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java index 62fb7c82ccf3..e4b455e48122 
100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusherTest.java @@ -45,6 +45,7 @@ import org.apache.druid.timeline.DataSegment.PruneSpecsHolder; import org.apache.druid.timeline.partition.NoneShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.apache.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; @@ -125,7 +126,8 @@ public void testPushWithoutScheme() throws Exception @Test public void testPushWithMultipleSegments() throws Exception { - testUsingSchemeForMultipleSegments("file", 3); + testUsingSchemeForMultipleSegments("file", 3, CompressionUtils.Format.ZIP); + testUsingSchemeForMultipleSegments("file", 3, CompressionUtils.Format.LZ4); } @Test @@ -209,7 +211,7 @@ public void testPushToPath() throws Exception ); } - private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments) throws Exception + private void testUsingSchemeForMultipleSegments(final String scheme, final int numberOfSegments, CompressionUtils.Format format) throws Exception { Configuration conf = new Configuration(true); DataSegment[] segments = new DataSegment[numberOfSegments]; @@ -225,11 +227,13 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig(); final File storageDirectory = tempFolder.newFolder(); + config.setCompressionFormat(format); config.setStorageDirectory( scheme != null ? 
StringUtils.format("%s://%s", scheme, storageDirectory.getAbsolutePath()) : storageDirectory.getAbsolutePath() ); + HdfsDataSegmentPusher pusher = new HdfsDataSegmentPusher(config, conf); for (int i = 0; i < numberOfSegments; i++) { @@ -250,10 +254,11 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n final DataSegment pushedSegment = pusher.push(segmentDir, segments[i], false); String indexUri = StringUtils.format( - "%s/%s/%d_index.zip", + "%s/%s/%d_index.%s", FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(), pusher.getStorageDir(segments[i], false), - segments[i].getShardSpec().getPartitionNum() + segments[i].getShardSpec().getPartitionNum(), + format.getExtension() ); Assert.assertEquals(segments[i].getSize(), pushedSegment.getSize()); @@ -268,10 +273,11 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n String segmentPath = pusher.getStorageDir(pushedSegment, false); File indexFile = new File(StringUtils.format( - "%s/%s/%d_index.zip", + "%s/%s/%d_index.%s", storageDirectory, segmentPath, - pushedSegment.getShardSpec().getPartitionNum() + pushedSegment.getShardSpec().getPartitionNum(), + format.getExtension() )); Assert.assertTrue(indexFile.exists()); @@ -279,10 +285,11 @@ private void testUsingSchemeForMultipleSegments(final String scheme, final int n Assert.assertEquals(segments[i], pushedSegment); indexFile = new File(StringUtils.format( - "%s/%s/%d_index.zip", + "%s/%s/%d_index.%s", storageDirectory, segmentPath, - pushedSegment.getShardSpec().getPartitionNum() + pushedSegment.getShardSpec().getPartitionNum(), + format.getExtension() )); Assert.assertTrue(indexFile.exists()); From e7b4c270a52ade912c484496805f2079327aee31 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Wed, 4 Feb 2026 11:56:27 +0800 Subject: [PATCH 08/11] Update doc --- docs/configuration/index.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/configuration/index.md 
b/docs/configuration/index.md index a1d2d3070f61..d17ad479bfab 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -440,6 +440,7 @@ This deep storage is used to interface with HDFS. You must load the `druid-hdfs- |Property|Description|Default| |--------|-----------|-------| |`druid.storage.storageDirectory`|HDFS directory to use as deep storage.|none| +|`druid.storage.compressionFormat`|The compression format applied to the segments uploaded to HDFS. Only `zip` and `lz4` are supported. |zip| #### Cassandra deep storage From edf9a28a0b8c8a6ab4e088ccebf6d60b38f289f7 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Wed, 4 Feb 2026 14:28:07 +0800 Subject: [PATCH 09/11] Fix test --- .../org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java | 2 +- .../apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java index 2afd068a768d..bcbff961e521 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsDataSegmentPusher.java @@ -77,7 +77,7 @@ public HdfsDataSegmentPusher( public HdfsDataSegmentPusher( HdfsDataSegmentPusherConfig config, @Hdfs Configuration hadoopConfig, - @Nullable ServiceEmitter emitter + ServiceEmitter emitter ) { this.config = config; diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java index 482c47acee2b..a5111d2159c0 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java +++ 
b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModuleTest.java @@ -28,6 +28,8 @@ import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LifecycleModule; import org.apache.druid.inputsource.hdfs.HdfsInputSourceConfig; +import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.segment.loading.OmniDataSegmentKiller; import org.junit.Assert; import org.junit.Test; @@ -85,6 +87,7 @@ private Injector makeInjectorWithProperties(final Properties props) binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator()); binder.bind(JsonConfigurator.class).in(LazySingleton.class); binder.bind(Properties.class).toInstance(props); + binder.bind(ServiceEmitter.class).toInstance(new ServiceEmitter("test", "localhost", new NoopEmitter())); }, new HdfsStorageDruidModule() ) From b556296c7dd60cf32b9fdad9ca20b8db40e8d8f2 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Wed, 4 Feb 2026 14:35:52 +0800 Subject: [PATCH 10/11] Remove unused --- .../org/apache/druid/utils/CompressionUtils.java | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java index cd83755968e3..eabec58ef15e 100644 --- a/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java +++ b/processing/src/main/java/org/apache/druid/utils/CompressionUtils.java @@ -771,21 +771,6 @@ public static boolean isGz(String fName) return fName.endsWith(Format.GZ.getSuffix()) && fName.length() > Format.GZ.getSuffix().length(); } - /** - * Checks to see if fName is a valid name for a "*.lz4" file - * - * @param fName The name of the file in question - * - * @return True if fName is a properly named .lz4 file, false otherwise - */ - public static boolean isLz4(String fName) - { - if 
(Strings.isNullOrEmpty(fName)) { - return false; - } - return fName.endsWith(Format.LZ4.getSuffix()); - } - /** * Get the file name without the .gz extension * From 06b4775ecea9c8658dd965306af5b3054e11cea4 Mon Sep 17 00:00:00 2001 From: Frank Chen Date: Thu, 12 Feb 2026 15:45:48 +0800 Subject: [PATCH 11/11] Add doc for metrics --- docs/operations/metrics.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index aa1c25a693ad..fc0126db000d 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -616,3 +616,14 @@ These metrics are available on operating systems with the cgroup kernel feature. |`cgroup/cpuset/effective_cpu_count`|Total number of active CPUs available to the process. Derived from `cpuset.effective_cpus`.||Varies| |`cgroup/cpuset/mems_count`|Total number of memory nodes available to the process. Derived from `cpuset.mems`.||Varies| |`cgroup/cpuset/effective_mems_count`|Total number of active memory nodes available to the process. Derived from `cpuset.effective_mems`.||Varies| + +## HDFS + +These metrics are available when the `druid-hdfs-storage` extension is used to push/pull segment files. + +| Metric | Description | Dimensions | Normal value | +|----------------------|-------------------------------------------------------------------------------|---------------------------------------------|--------------| +| `hdfs/pull/size` | Total bytes of decompressed segment files pulled from HDFS. | `format`: compression format (`ZIP`, `LZ4`) | Varies | +| `hdfs/pull/duration` | Time in milliseconds spent decompressing and pulling segment files from HDFS. | `format`: compression format (`ZIP`, `LZ4`) | Varies | +| `hdfs/push/size` | Total bytes of compressed segment files pushed to HDFS. | `format`: compression format (`ZIP`, `LZ4`) | Varies | +| `hdfs/push/duration` | Time in milliseconds spent compressing and pushing segment files to HDFS. 
| `format`: compression format (`ZIP`, `LZ4`) | Varies |