Skip to content

Commit f6089d8

Browse files
committed
feat: Add storage stats into IoStatistics
1 parent 9002fc9 commit f6089d8

20 files changed

+83
-26
lines changed

velox/common/caching/CachedFactory.h

+37-10
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ template <
146146
typename Value,
147147
typename Generator,
148148
typename Properties = void,
149+
typename Stats = void,
149150
typename Sizer = DefaultSizer<Value>,
150151
typename Comparator = std::equal_to<Key>,
151152
typename Hash = std::hash<Key>>
@@ -178,7 +179,8 @@ class CachedFactory {
178179
/// will probably mess with your memory model, so really try to avoid it.
179180
CachedPtr<Key, Value, Comparator, Hash> generate(
180181
const Key& key,
181-
const Properties* properties = nullptr);
182+
const Properties* properties = nullptr,
183+
Stats* ioStats = nullptr);
182184

183185
/// Looks up the cache entry of the given key if it exists, otherwise returns
184186
/// null.
@@ -358,17 +360,25 @@ template <
358360
typename Value,
359361
typename Generator,
360362
typename Properties,
363+
typename Stats,
361364
typename Sizer,
362365
typename Comparator,
363366
typename Hash>
364-
CachedPtr<Key, Value, Comparator, Hash>
365-
CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
366-
generate(const Key& key, const Properties* properties) {
367+
CachedPtr<Key, Value, Comparator, Hash> CachedFactory<
368+
Key,
369+
Value,
370+
Generator,
371+
Properties,
372+
Stats,
373+
Sizer,
374+
Comparator,
375+
Hash>::
376+
generate(const Key& key, const Properties* properties, Stats* ioStats) {
367377
process::TraceContext trace("CachedFactory::generate");
368378
if (cache_ == nullptr) {
369379
return CachedPtr<Key, Value, Comparator, Hash>{
370380
/*fromCache=*/false,
371-
(*generator_)(key, properties).release(),
381+
(*generator_)(key, properties, ioStats).release(),
372382
nullptr,
373383
std::make_unique<Key>(key)};
374384
}
@@ -408,7 +418,8 @@ CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
408418
pendingCv_.notify_all();
409419
};
410420

411-
std::unique_ptr<Value> generatedValue = (*generator_)(key, properties);
421+
std::unique_ptr<Value> generatedValue =
422+
(*generator_)(key, properties, ioStats);
412423
const uint64_t valueSize = Sizer()(*generatedValue);
413424
Value* rawValue = generatedValue.release();
414425
const bool inserted = addCache(key, rawValue, valueSize);
@@ -433,12 +444,19 @@ template <
433444
typename Value,
434445
typename Generator,
435446
typename Properties,
447+
typename Stats,
436448
typename Sizer,
437449
typename Comparator,
438450
typename Hash>
439-
CachedPtr<Key, Value, Comparator, Hash>
440-
CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::get(
441-
const Key& key) {
451+
CachedPtr<Key, Value, Comparator, Hash> CachedFactory<
452+
Key,
453+
Value,
454+
Generator,
455+
Properties,
456+
Stats,
457+
Sizer,
458+
Comparator,
459+
Hash>::get(const Key& key) {
442460
if (cache_ == nullptr) {
443461
return {};
444462
}
@@ -460,10 +478,19 @@ template <
460478
typename Value,
461479
typename Generator,
462480
typename Properties,
481+
typename Stats,
463482
typename Sizer,
464483
typename Comparator,
465484
typename Hash>
466-
void CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
485+
void CachedFactory<
486+
Key,
487+
Value,
488+
Generator,
489+
Properties,
490+
Stats,
491+
Sizer,
492+
Comparator,
493+
Hash>::
467494
retrieveCached(
468495
const std::vector<Key>& keys,
469496
std::vector<std::pair<Key, CachedPtr<Key, Value, Comparator, Hash>>>&

velox/common/file/FileSystems.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ class LocalFileSystem : public FileSystem {
115115

116116
std::unique_ptr<ReadFile> openFileForRead(
117117
std::string_view path,
118-
const FileOptions& options) override {
118+
const FileOptions& options,
119+
io::IoStatistics* ioStats) override {
119120
return std::make_unique<LocalReadFile>(
120121
extractPath(path), executor_.get(), options.bufferIo);
121122
}

velox/common/file/FileSystems.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#pragma once
1717

1818
#include "velox/common/base/Exceptions.h"
19+
#include "velox/common/io/IoStatistics.h"
1920
#include "velox/common/memory/MemoryPool.h"
2021

2122
#include <functional>
@@ -103,7 +104,8 @@ class FileSystem {
103104
/// Returns a ReadFile handle for a given file path
104105
virtual std::unique_ptr<ReadFile> openFileForRead(
105106
std::string_view path,
106-
const FileOptions& options = {}) = 0;
107+
const FileOptions& options = {},
108+
io::IoStatistics* ioStats = {}) = 0;
107109

108110
/// Returns a WriteFile handle for a given file path
109111
virtual std::unique_ptr<WriteFile> openFileForWrite(

velox/common/file/tests/FaultyFileSystem.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ class FaultyFileSystem : public FileSystem {
5454

5555
std::unique_ptr<ReadFile> openFileForRead(
5656
std::string_view path,
57-
const FileOptions& options = {}) override;
57+
const FileOptions& options = {},
58+
io::IoStatistics* ioStats = {}) override;
5859

5960
std::unique_ptr<WriteFile> openFileForWrite(
6061
std::string_view path,

velox/common/io/IoStatistics.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ IoStatistics::operationStats() const {
108108
return operationStats_;
109109
}
110110

111+
std::unordered_map<std::string, RuntimeCounter>* IoStatistics::storageStats() {
112+
std::lock_guard<std::mutex> lock{storageStatsMutex_};
113+
return &storageStats_;
114+
}
115+
111116
void IoStatistics::merge(const IoStatistics& other) {
112117
rawBytesRead_ += other.rawBytesRead_;
113118
rawBytesWritten_ += other.rawBytesWritten_;

velox/common/io/IoStatistics.h

+4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <unordered_map>
2424

2525
#include <folly/dynamic.h>
26+
#include "velox/common/base/RuntimeMetrics.h"
2627

2728
namespace facebook::velox::io {
2829

@@ -140,6 +141,7 @@ class IoStatistics {
140141
const uint64_t partialThrottleCount = 0);
141142

142143
std::unordered_map<std::string, OperationCounters> operationStats() const;
144+
std::unordered_map<std::string, RuntimeCounter>* storageStats();
143145

144146
void merge(const IoStatistics& other);
145147

@@ -172,7 +174,9 @@ class IoStatistics {
172174
IoCounter queryThreadIoLatency_;
173175

174176
std::unordered_map<std::string, OperationCounters> operationStats_;
177+
std::unordered_map<std::string, RuntimeCounter> storageStats_;
175178
mutable std::mutex operationStatsMutex_;
179+
mutable std::mutex storageStatsMutex_;
176180
};
177181

178182
} // namespace facebook::velox::io

velox/connectors/hive/FileHandle.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ std::string groupName(const std::string& filename) {
4141

4242
std::unique_ptr<FileHandle> FileHandleGenerator::operator()(
4343
const std::string& filename,
44-
const FileProperties* properties) {
44+
const FileProperties* properties,
45+
io::IoStatistics* ioStats) {
4546
// We have seen cases where drivers are stuck when creating file handles.
4647
// Adding a trace here to spot this more easily in future.
4748
process::TraceContext trace("FileHandleGenerator::operator()");
@@ -55,7 +56,7 @@ std::unique_ptr<FileHandle> FileHandleGenerator::operator()(
5556
options.fileSize = properties->fileSize;
5657
}
5758
fileHandle->file = filesystems::getFileSystem(filename, properties_)
58-
->openFileForRead(filename, options);
59+
->openFileForRead(filename, options, ioStats);
5960
fileHandle->uuid = StringIdLease(fileIds(), filename);
6061
fileHandle->groupId = StringIdLease(fileIds(), groupName(filename));
6162
VLOG(1) << "Generating file handle for: " << filename

velox/connectors/hive/FileHandle.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "velox/common/caching/FileIds.h"
3030
#include "velox/common/config/Config.h"
3131
#include "velox/common/file/File.h"
32+
#include "velox/common/io/IoStatistics.h"
3233
#include "velox/connectors/hive/FileProperties.h"
3334

3435
namespace facebook::velox {
@@ -69,7 +70,8 @@ class FileHandleGenerator {
6970
: properties_(std::move(properties)) {}
7071
std::unique_ptr<FileHandle> operator()(
7172
const std::string& filename,
72-
const FileProperties* properties);
73+
const FileProperties* properties,
74+
io::IoStatistics* ioStats);
7375

7476
private:
7577
const std::shared_ptr<const config::ConfigBase> properties_;
@@ -80,6 +82,7 @@ using FileHandleFactory = CachedFactory<
8082
FileHandle,
8183
FileHandleGenerator,
8284
FileProperties,
85+
io::IoStatistics,
8386
FileHandleSizer>;
8487

8588
using FileHandleCachedPtr = CachedPtr<std::string, FileHandle>;

velox/connectors/hive/HiveDataSource.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,9 @@ std::unordered_map<std::string, RuntimeCounter> HiveDataSource::runtimeStats() {
513513
if (numBucketConversion_ > 0) {
514514
res.insert({"numBucketConversion", RuntimeCounter(numBucketConversion_)});
515515
}
516+
for (const auto& storageStats : *ioStats_->storageStats()) {
517+
res.insert({storageStats.first, storageStats.second});
518+
}
516519
return res;
517520
}
518521

velox/connectors/hive/SplitReader.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,8 @@ RowTypePtr SplitReader::createReader() {
229229
try {
230230
fileHandleCachePtr = fileHandleFactory_->generate(
231231
hiveSplit_->filePath,
232-
hiveSplit_->properties.has_value() ? &*hiveSplit_->properties
233-
: nullptr);
232+
hiveSplit_->properties.has_value() ? &*hiveSplit_->properties : nullptr,
233+
ioStats_.get());
234234
VELOX_CHECK_NOT_NULL(fileHandleCachePtr.get());
235235
} catch (const VeloxRuntimeError& e) {
236236
if (e.errorCode() == error_code::kFileNotFound &&

velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,8 @@ std::string AbfsFileSystem::name() const {
208208

209209
std::unique_ptr<ReadFile> AbfsFileSystem::openFileForRead(
210210
std::string_view path,
211-
const FileOptions& options) {
211+
const FileOptions& options,
212+
io::IoStatistics* ioStats) {
212213
auto abfsfile = std::make_unique<AbfsReadFile>(path, *config_);
213214
abfsfile->initialize(options);
214215
return abfsfile;

velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ class AbfsFileSystem : public FileSystem {
4040

4141
std::unique_ptr<ReadFile> openFileForRead(
4242
std::string_view path,
43-
const FileOptions& options = {}) override;
43+
const FileOptions& options = {},
44+
io::IoStatistics* ioStats = {}) override;
4445

4546
std::unique_ptr<WriteFile> openFileForWrite(
4647
std::string_view path,

velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,8 @@ void GcsFileSystem::initializeClient() {
340340

341341
std::unique_ptr<ReadFile> GcsFileSystem::openFileForRead(
342342
std::string_view path,
343-
const FileOptions& options) {
343+
const FileOptions& options,
344+
io::IoStatistics* ioStats) {
344345
const auto gcspath = gcsPath(path);
345346
auto gcsfile = std::make_unique<GcsReadFile>(gcspath, impl_->getClient());
346347
gcsfile->initialize(options);

velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ class GcsFileSystem : public FileSystem {
4141
/// [[https://cloud.google.com/storage/docs/samples/storage-stream-file-download]].
4242
std::unique_ptr<ReadFile> openFileForRead(
4343
std::string_view path,
44-
const FileOptions& options = {}) override;
44+
const FileOptions& options = {},
45+
io::IoStatistics* ioStats = {}) override;
4546

4647
/// Initialize a WriteFile
4748
/// First the method google::cloud::storage::Client::GetObjectMetadata

velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ std::string HdfsFileSystem::name() const {
9090

9191
std::unique_ptr<ReadFile> HdfsFileSystem::openFileForRead(
9292
std::string_view path,
93-
const FileOptions& /*unused*/) {
93+
const FileOptions& /*unused*/,
94+
io::IoStatistics* ioStats) {
9495
// Only remove the schema for hdfs path.
9596
if (path.find(kScheme) == 0) {
9697
path.remove_prefix(kScheme.length());

velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ class HdfsFileSystem : public FileSystem {
5555

5656
std::unique_ptr<ReadFile> openFileForRead(
5757
std::string_view path,
58-
const FileOptions& options = {}) override;
58+
const FileOptions& options = {},
59+
io::IoStatistics* ioStats = {}) override;
5960

6061
std::unique_ptr<WriteFile> openFileForWrite(
6162
std::string_view path,

velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -740,7 +740,8 @@ std::string S3FileSystem::getLogLevelName() const {
740740

741741
std::unique_ptr<ReadFile> S3FileSystem::openFileForRead(
742742
std::string_view s3Path,
743-
const FileOptions& options) {
743+
const FileOptions& options,
744+
io::IoStatistics* ioStats) {
744745
const auto path = getPath(s3Path);
745746
auto s3file = std::make_unique<S3ReadFile>(path, impl_->s3Client());
746747
s3file->initialize(options);

velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ class S3FileSystem : public FileSystem {
3737

3838
std::unique_ptr<ReadFile> openFileForRead(
3939
std::string_view s3Path,
40-
const FileOptions& options = {}) override;
40+
const FileOptions& options = {},
41+
io::IoStatistics* ioStats = {}) override;
4142

4243
std::unique_ptr<WriteFile> openFileForWrite(
4344
std::string_view s3Path,

velox/dwio/common/Throttler.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ uint64_t Throttler::calculateBackoffDurationAndUpdateThrottleCache(
270270
std::unique_ptr<Throttler::ThrottleSignal>
271271
Throttler::ThrottleSignalGenerator::operator()(
272272
const std::string& /*unused*/,
273+
const void* /*unused*/,
273274
const void* /*unused*/) {
274275
return std::unique_ptr<ThrottleSignal>(new ThrottleSignal{1});
275276
}

velox/dwio/common/Throttler.h

+1
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ class Throttler {
175175

176176
std::unique_ptr<ThrottleSignal> operator()(
177177
const std::string& /*unused*/,
178+
const void* /*unused*/,
178179
const void* /*unused*/);
179180
};
180181

0 commit comments

Comments
 (0)