Merged
1 change: 1 addition & 0 deletions docs/configuration/index.md
@@ -440,6 +440,7 @@ This deep storage is used to interface with HDFS. You must load the `druid-hdfs-
|Property|Description|Default|
|--------|-----------|-------|
|`druid.storage.storageDirectory`|HDFS directory to use as deep storage.|none|
|`druid.storage.compressionFormat`|The compression format applied to segments uploaded to HDFS. Only `zip` and `lz4` are supported. See the example below.|zip|

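For example, a deep storage configuration that opts into LZ4 compression might look like the following sketch; the storage directory value is illustrative:

```properties
druid.storage.type=hdfs
druid.storage.storageDirectory=/druid/segments
druid.storage.compressionFormat=lz4
```
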
#### Cassandra deep storage

11 changes: 11 additions & 0 deletions docs/operations/metrics.md
@@ -616,3 +616,14 @@ These metrics are available on operating systems with the cgroup kernel feature.
|`cgroup/cpuset/effective_cpu_count`|Total number of active CPUs available to the process. Derived from `cpuset.effective_cpus`.||Varies|
|`cgroup/cpuset/mems_count`|Total number of memory nodes available to the process. Derived from `cpuset.mems`.||Varies|
|`cgroup/cpuset/effective_mems_count`|Total number of active memory nodes available to the process. Derived from `cpuset.effective_mems`.||Varies|

## HDFS

These metrics are available when the `druid-hdfs-storage` extension is used to push or pull segment files. A sketch of how they are emitted follows the table.

| Metric | Description | Dimensions | Normal value |
|----------------------|-------------------------------------------------------------------------------|---------------------------------------------|--------------|
| `hdfs/pull/size` | Total bytes of decompressed segment files pulled from HDFS. | `format`: compression format (`ZIP`, `LZ4`) | Varies |
| `hdfs/pull/duration` | Time in milliseconds spent decompressing and pulling segment files from HDFS. | `format`: compression format (`ZIP`, `LZ4`) | Varies |
| `hdfs/push/size` | Total bytes of compressed segment files pushed to HDFS. | `format`: compression format (`ZIP`, `LZ4`) | Varies |
| `hdfs/push/duration` | Time in milliseconds spent compressing and pushing segment files to HDFS. | `format`: compression format (`ZIP`, `LZ4`) | Varies |
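
For reference, here is a minimal sketch of how these events are produced, mirroring the `emitMetrics` helpers added by this PR; the emitter instance, byte count, and duration values are illustrative placeholders:

```java
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.utils.CompressionUtils;

final class HdfsPullMetricsSketch
{
  // Emit the pull-side metrics, attaching the compression format as the `format` dimension.
  static void emitPullMetrics(ServiceEmitter emitter, long sizeBytes, long durationMillis)
  {
    final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
    builder.setDimension("format", CompressionUtils.Format.LZ4);
    emitter.emit(builder.setMetric("hdfs/pull/size", sizeBytes));
    emitter.emit(builder.setMetric("hdfs/pull/duration", durationMillis));
  }
}
```

The push-side metrics follow the same pattern, with the format taken from `druid.storage.compressionFormat`.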
HdfsDataSegmentPuller.java
@@ -30,6 +30,8 @@
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.io.NativeIO;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.loading.URIDataPuller;
import org.apache.druid.utils.CompressionUtils;
@@ -40,6 +42,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

import javax.annotation.Nullable;
import javax.tools.FileObject;
import java.io.File;
import java.io.IOException;
@@ -177,11 +180,21 @@ public boolean delete()

private static final Logger log = new Logger(HdfsDataSegmentPuller.class);
protected final Configuration config;
private final ServiceEmitter emitter;

@Inject
public HdfsDataSegmentPuller(@Hdfs final Configuration config)
{
this(config, null);
}

@Inject
public HdfsDataSegmentPuller(
@Hdfs final Configuration config,
@Nullable final ServiceEmitter emitter
)
{
this.config = config;
this.emitter = emitter;
}

FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) throws SegmentLoadingException
@@ -235,27 +248,16 @@ FileUtils.FileCopyResult getSegmentFiles(final Path path, final File outDir) thr
catch (Exception e) {
throw new RuntimeException(e);
}
} else if (CompressionUtils.isZip(path.getName())) {
}

// -------- zip ---------
Comment on lines -238 to -240

Contributor:
Out of scope of this PR, but it seems like it may be better to use a factory class to handle this logic... Maybe something to do in the future.

Member (Author):
It's better to do refactoring in a separate PR.

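The factory approach the reviewer suggests is out of scope here, but a rough, purely hypothetical sketch inside `HdfsDataSegmentPuller` could look like the following; it assumes the `CompressionUtils.Format.decompressDirectory(InputStream, File)` call used by the new code below:

```java
// Hypothetical sketch only -- not part of this PR. All names are illustrative.
interface SegmentDecompressor
{
  FileUtils.FileCopyResult decompress(InputStream in, File outDir) throws IOException;
}

private static SegmentDecompressor decompressorFor(CompressionUtils.Format format)
{
  switch (format) {
    case ZIP:
    case LZ4:
      // Delegate to the format's own directory decompression, exactly as the inline branch below does.
      return format::decompressDirectory;
    default:
      throw new UOE("Unsupported segment compression format [%s]", format);
  }
}
```
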
// Try to detect format from file extension and decompress
final CompressionUtils.Format format = CompressionUtils.Format.fromFileName(path.getName());
if ((format == CompressionUtils.Format.ZIP || format == CompressionUtils.Format.LZ4)) {
long startTime = System.currentTimeMillis();

final FileUtils.FileCopyResult result = CompressionUtils.unzip(
new ByteSource()
{
@Override
public InputStream openStream() throws IOException
{
return getInputStream(path);
}
}, outDir, shouldRetryPredicate(), false
);
final FileUtils.FileCopyResult result = format.decompressDirectory(getInputStream(path), outDir);

log.info(
"Unzipped %d bytes from [%s] to [%s]",
result.size(),
path.toString(),
outDir.getAbsolutePath()
);
emitMetrics(format, result.size(), System.currentTimeMillis() - startTime);
Contributor:
Any chance that emitMetrics can also apply for Gz / Directory? Since we are emitting for hdfs/pull/duration + hdfs/pull/size, there is an expectation for metrics to be emitted for non-zip/lz4 too.

Member (Author):
Currently only zip is saved in HDFS; gz and directory are the old code path, so there is no need to support them.


return result;
} else if (CompressionUtils.isGz(path.getName())) {
@@ -292,6 +294,17 @@ public InputStream openStream() throws IOException
}
}

private void emitMetrics(CompressionUtils.Format format, long size, long duration)
{
if (emitter == null) {
return;
}
ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
metricBuilder.setDimension("format", format);
emitter.emit(metricBuilder.setMetric("hdfs/pull/size", size));
emitter.emit(metricBuilder.setMetric("hdfs/pull/duration", duration));
}

public InputStream getInputStream(Path path) throws IOException
{
return buildFileObject(path.toUri(), config).openInputStream();
HdfsDataSegmentPusher.java
@@ -19,7 +19,6 @@

package org.apache.druid.storage.hdfs;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
@@ -32,17 +31,19 @@
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HadoopFsWrapper;
import org.apache.hadoop.fs.Path;
import org.joda.time.format.ISODateTimeFormat;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
@@ -55,20 +56,33 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
{
private static final Logger log = new Logger(HdfsDataSegmentPusher.class);

private final HdfsDataSegmentPusherConfig config;
private final Configuration hadoopConfig;
@Nullable
private final ServiceEmitter emitter;

// We lazily initialize fullQualifiedStorageDirectory to avoid potential issues with Hadoop namenode HA.
// Please see https://github.com/apache/druid/pull/5684
private final Supplier<String> fullyQualifiedStorageDirectory;

public HdfsDataSegmentPusher(
HdfsDataSegmentPusherConfig config,
@Hdfs Configuration hadoopConfig
)
{
this(config, hadoopConfig, null);
}

@Inject
public HdfsDataSegmentPusher(
HdfsDataSegmentPusherConfig config,
@Hdfs Configuration hadoopConfig,
ObjectMapper jsonMapper
ServiceEmitter emitter
)
{
this.config = config;
this.hadoopConfig = hadoopConfig;
this.emitter = emitter;
Path storageDir = new Path(config.getStorageDirectory());
this.fullyQualifiedStorageDirectory = Suppliers.memoize(
() -> {
@@ -105,28 +119,34 @@ public DataSegment push(final File inDir, final DataSegment segment, final boole
// '{partitionNum}_index.zip' without unique paths and '{partitionNum}_{UUID}_index.zip' with unique paths.
final String storageDir = this.getStorageDir(segment, false);


final String uniquePrefix = useUniquePath ? DataSegmentPusher.generateUniquePath() + "_" : "";
final String outIndexFilePathSuffix = StringUtils.format(
"%s/%s/%d_%sindex.zip",
final String outIndexFilePath = StringUtils.format(
"%s/%s/%d_%sindex%s",
fullyQualifiedStorageDirectory.get(),
storageDir,
segment.getShardSpec().getPartitionNum(),
uniquePrefix
uniquePrefix,
config.getCompressionFormat().getSuffix()
);

return pushToFilePathWithRetry(inDir, segment, outIndexFilePathSuffix);
return pushToFilePathWithRetry(inDir, segment, outIndexFilePath);
}

@Override
public DataSegment pushToPath(File inDir, DataSegment segment, String storageDirSuffix) throws IOException
{
String outIndexFilePath = StringUtils.format(
"%s/%s/%d_index.zip",
fullyQualifiedStorageDirectory.get(),
storageDirSuffix.replace(':', '_'),
segment.getShardSpec().getPartitionNum()
);
final String outIndexFilePath;
if (storageDirSuffix.endsWith("index.zip") || storageDirSuffix.endsWith("index.lz4")) {
outIndexFilePath = StringUtils.format("%s/%s", fullyQualifiedStorageDirectory.get(), storageDirSuffix);
} else {
outIndexFilePath = StringUtils.format(
"%s/%s/%d_index%s",
fullyQualifiedStorageDirectory.get(),
storageDirSuffix.replace(':', '_'),
segment.getShardSpec().getPartitionNum(),
config.getCompressionFormat().getSuffix()
);
}

return pushToFilePathWithRetry(inDir, segment, outIndexFilePath);
}
@@ -136,11 +156,17 @@ private DataSegment pushToFilePathWithRetry(File inDir, DataSegment segment, Str
{
// Retry any HDFS errors that occur, up to 5 times.
try {
return RetryUtils.retry(
final long startTime = System.currentTimeMillis();

final DataSegment result = RetryUtils.retry(
() -> pushToFilePath(inDir, segment, outIndexFilePath),
exception -> exception instanceof Exception,
5
);

emitMetrics(result.getSize(), System.currentTimeMillis() - startTime);

return result;
}
catch (Exception e) {
Throwables.throwIfInstanceOf(e, IOException.class);
@@ -149,21 +175,32 @@ private DataSegment pushToFilePathWithRetry(File inDir, DataSegment segment, Str
}
}

private void emitMetrics(long size, long duration)
{
if (emitter == null) {
return;
}

final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
metricBuilder.setDimension("format", config.getCompressionFormat());
emitter.emit(metricBuilder.setMetric("hdfs/push/size", size));
emitter.emit(metricBuilder.setMetric("hdfs/push/duration", duration));
}

private DataSegment pushToFilePath(File inDir, DataSegment segment, String outIndexFilePath) throws IOException
{
log.debug(
"Copying segment[%s] to HDFS at location[%s/%s]",
"Copying segment[%s] to HDFS at location[%s]",
segment.getId(),
fullyQualifiedStorageDirectory.get(),
outIndexFilePath
);

Path tmpIndexFile = new Path(StringUtils.format(
"%s/%s/%s/%s_index.zip",
"%s/%s/%s/%s_index%s",
fullyQualifiedStorageDirectory.get(),
segment.getDataSource(),
UUIDUtils.generateUuid(),
segment.getShardSpec().getPartitionNum()
segment.getShardSpec().getPartitionNum(),
config.getCompressionFormat().getSuffix()
));
FileSystem fs = tmpIndexFile.getFileSystem(hadoopConfig);

@@ -174,7 +211,7 @@ private DataSegment pushToFilePath(File inDir, DataSegment segment, String outIn
final DataSegment dataSegment;
try {
try (FSDataOutputStream out = fs.create(tmpIndexFile)) {
size = CompressionUtils.zip(inDir, out);
size = config.getCompressionFormat().compressDirectory(inDir, out);
}
final Path outIndexFile = new Path(outIndexFilePath);
dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri()))
@@ -185,6 +222,10 @@ private DataSegment pushToFilePath(File inDir, DataSegment segment, String outIn
fs.mkdirs(outIndexFile.getParent());
copyFilesWithChecks(fs, tmpIndexFile, outIndexFile);
}
catch (IOException e) {
log.error(e, "Failed to push segment[%s] to HDFS at location[%s]", segment.getId(), outIndexFilePath);
throw e;
}
finally {
try {
if (fs.exists(tmpIndexFile.getParent()) && !fs.delete(tmpIndexFile.getParent(), true)) {
HdfsDataSegmentPusherConfig.java
@@ -20,6 +20,7 @@
package org.apache.druid.storage.hdfs;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.utils.CompressionUtils;

/**
*/
@@ -28,6 +29,9 @@ public class HdfsDataSegmentPusherConfig
@JsonProperty
private String storageDirectory = "";

@JsonProperty
private CompressionUtils.Format compressionFormat = CompressionUtils.Format.ZIP;

public void setStorageDirectory(String storageDirectory)
{
this.storageDirectory = storageDirectory;
@@ -37,4 +41,14 @@ public String getStorageDirectory()
{
return storageDirectory;
}

public void setCompressionFormat(CompressionUtils.Format compressionFormat)
{
this.compressionFormat = compressionFormat;
}

public CompressionUtils.Format getCompressionFormat()
{
return compressionFormat;
}
}
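
The new `compressionFormat` property can be exercised directly through the setters and getters above; a minimal sketch, with an illustrative storage directory:

```java
import org.apache.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
import org.apache.druid.utils.CompressionUtils;

final class PusherConfigSketch
{
  public static void main(String[] args)
  {
    final HdfsDataSegmentPusherConfig config = new HdfsDataSegmentPusherConfig();
    config.setStorageDirectory("/druid/segments");            // illustrative path
    config.setCompressionFormat(CompressionUtils.Format.LZ4); // default is ZIP when unset
    // The pusher appends this suffix when it builds the segment index file path.
    System.out.println(config.getCompressionFormat().getSuffix());
  }
}
```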