Skip to content

Commit 7f10dc6

Browse files
committed
feat: Add storage statistics in IoStatistics
1 parent ce273fa commit 7f10dc6

File tree

10 files changed: 87 additions (+87) and 29 deletions (-29)

10 files changed: 87 additions (+87) and 29 deletions (-29)

velox/common/caching/SsdFile.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ void SsdFile::read(
244244
uint64_t offset,
245245
const std::vector<folly::Range<char*>>& buffers) {
246246
process::TraceContext trace("SsdFile::read");
247-
readFile_->preadv(offset, buffers);
247+
readFile_->preadv(offset, buffers, nullptr);
248248
}
249249

250250
std::optional<std::pair<uint64_t, int32_t>> SsdFile::getSpace(

velox/common/file/File.cpp

+12-7
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ std::string ReadFile::pread(uint64_t offset, uint64_t length) const {
6767

6868
uint64_t ReadFile::preadv(
6969
uint64_t offset,
70-
const std::vector<folly::Range<char*>>& buffers) const {
70+
const std::vector<folly::Range<char*>>& buffers,
71+
io::IoStatistics* stats) const {
7172
auto fileSize = size();
7273
uint64_t numRead = 0;
7374
if (offset >= fileSize) {
@@ -87,7 +88,8 @@ uint64_t ReadFile::preadv(
8788

8889
uint64_t ReadFile::preadv(
8990
folly::Range<const common::Region*> regions,
90-
folly::Range<folly::IOBuf*> iobufs) const {
91+
folly::Range<folly::IOBuf*> iobufs,
92+
io::IoStatistics* stats) const {
9193
VELOX_CHECK_EQ(regions.size(), iobufs.size());
9294
uint64_t length = 0;
9395
for (size_t i = 0; i < regions.size(); ++i) {
@@ -195,7 +197,8 @@ LocalReadFile::pread(uint64_t offset, uint64_t length, void* buf) const {
195197

196198
uint64_t LocalReadFile::preadv(
197199
uint64_t offset,
198-
const std::vector<folly::Range<char*>>& buffers) const {
200+
const std::vector<folly::Range<char*>>& buffers,
201+
io::IoStatistics* stats) const {
199202
// Dropped bytes sized so that a typical dropped range of 50K is not
200203
// too many iovecs.
201204
static thread_local std::vector<char> droppedBytes(16 * 1024);
@@ -251,16 +254,18 @@ uint64_t LocalReadFile::preadv(
251254

252255
folly::SemiFuture<uint64_t> LocalReadFile::preadvAsync(
253256
uint64_t offset,
254-
const std::vector<folly::Range<char*>>& buffers) const {
257+
const std::vector<folly::Range<char*>>& buffers,
258+
io::IoStatistics* stats) const {
255259
if (!executor_) {
256-
return ReadFile::preadvAsync(offset, buffers);
260+
return ReadFile::preadvAsync(offset, buffers, stats);
257261
}
258262
auto [promise, future] = folly::makePromiseContract<uint64_t>();
259263
executor_->add([this,
260264
_promise = std::move(promise),
261265
_offset = offset,
262-
_buffers = buffers]() mutable {
263-
auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers);
266+
_buffers = buffers,
267+
_stats = stats]() mutable {
268+
auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers, _stats);
264269
_promise.setTry(std::move(delegateFuture).getTry());
265270
});
266271
return std::move(future);

velox/common/file/File.h

+12-6
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
#include "velox/common/base/Exceptions.h"
4242
#include "velox/common/file/Region.h"
43+
#include "velox/common/io/IoStatistics.h"
4344

4445
namespace facebook::velox {
4546

@@ -67,7 +68,8 @@ class ReadFile {
6768
// This method should be thread safe.
6869
virtual uint64_t preadv(
6970
uint64_t /*offset*/,
70-
const std::vector<folly::Range<char*>>& /*buffers*/) const;
71+
const std::vector<folly::Range<char*>>& /*buffers*/,
72+
io::IoStatistics* stats) const;
7173

7274
// Vectorized read API. Implementations can coalesce and parallelize.
7375
// The offsets don't need to be sorted.
@@ -82,7 +84,8 @@ class ReadFile {
8284
// This method should be thread safe.
8385
virtual uint64_t preadv(
8486
folly::Range<const common::Region*> regions,
85-
folly::Range<folly::IOBuf*> iobufs) const;
87+
folly::Range<folly::IOBuf*> iobufs,
88+
io::IoStatistics* stats) const;
8689

8790
/// Like preadv but may execute asynchronously and returns the read size or
8891
/// exception via SemiFuture. Use hasPreadvAsync() to check if the
@@ -91,9 +94,10 @@ class ReadFile {
9194
/// This method should be thread safe.
9295
virtual folly::SemiFuture<uint64_t> preadvAsync(
9396
uint64_t offset,
94-
const std::vector<folly::Range<char*>>& buffers) const {
97+
const std::vector<folly::Range<char*>>& buffers,
98+
io::IoStatistics* stats) const {
9599
try {
96-
return folly::SemiFuture<uint64_t>(preadv(offset, buffers));
100+
return folly::SemiFuture<uint64_t>(preadv(offset, buffers, stats));
97101
} catch (const std::exception& e) {
98102
return folly::makeSemiFuture<uint64_t>(e);
99103
}
@@ -285,11 +289,13 @@ class LocalReadFile final : public ReadFile {
285289

286290
uint64_t preadv(
287291
uint64_t offset,
288-
const std::vector<folly::Range<char*>>& buffers) const final;
292+
const std::vector<folly::Range<char*>>& buffers,
293+
io::IoStatistics* stats) const final;
289294

290295
folly::SemiFuture<uint64_t> preadvAsync(
291296
uint64_t offset,
292-
const std::vector<folly::Range<char*>>& buffers) const override;
297+
const std::vector<folly::Range<char*>>& buffers,
298+
io::IoStatistics* stats) const override;
293299

294300
bool hasPreadvAsync() const override {
295301
return executor_ != nullptr;

velox/common/file/FileInputStream.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ void FileInputStream::maybeIssueReadahead() {
218218
}
219219
std::vector<folly::Range<char*>> ranges;
220220
ranges.emplace_back(nextBuffer()->asMutable<char>(), size);
221-
readAheadWait_ = file_->preadvAsync(fileOffset_, ranges);
221+
readAheadWait_ = file_->preadvAsync(fileOffset_, ranges, nullptr);
222222
VELOX_CHECK(readAheadWait_.valid());
223223
}
224224

velox/common/file/tests/FaultyFile.cpp

+10-6
Original file line numberDiff line numberDiff line change
@@ -44,31 +44,35 @@ FaultyReadFile::pread(uint64_t offset, uint64_t length, void* buf) const {
4444

4545
uint64_t FaultyReadFile::preadv(
4646
uint64_t offset,
47-
const std::vector<folly::Range<char*>>& buffers) const {
47+
const std::vector<folly::Range<char*>>& buffers,
48+
io::IoStatistics* stats) const {
4849
if (injectionHook_ != nullptr) {
4950
FaultFileReadvOperation op(path_, offset, buffers);
5051
injectionHook_(&op);
5152
if (!op.delegate) {
5253
return op.readBytes;
5354
}
5455
}
55-
return delegatedFile_->preadv(offset, buffers);
56+
return delegatedFile_->preadv(offset, buffers, stats);
5657
}
5758

5859
folly::SemiFuture<uint64_t> FaultyReadFile::preadvAsync(
5960
uint64_t offset,
60-
const std::vector<folly::Range<char*>>& buffers) const {
61+
const std::vector<folly::Range<char*>>& buffers,
62+
io::IoStatistics* stats) const {
6163
// TODO: add fault injection for async read later.
6264
if (delegatedFile_->hasPreadvAsync() || executor_ == nullptr) {
63-
return delegatedFile_->preadvAsync(offset, buffers);
65+
return delegatedFile_->preadvAsync(offset, buffers, stats);
6466
}
6567
auto promise = std::make_unique<folly::Promise<uint64_t>>();
6668
folly::SemiFuture<uint64_t> future = promise->getSemiFuture();
6769
executor_->add([this,
6870
_promise = std::move(promise),
6971
_offset = offset,
70-
_buffers = buffers]() {
71-
auto delegateFuture = delegatedFile_->preadvAsync(_offset, _buffers);
72+
_buffers = buffers,
73+
_stats = stats]() {
74+
auto delegateFuture =
75+
delegatedFile_->preadvAsync(_offset, _buffers, _stats);
7276
_promise->setValue(delegateFuture.wait().value());
7377
});
7478
return future;

velox/common/file/tests/FaultyFile.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ class FaultyReadFile : public ReadFile {
4040

4141
uint64_t preadv(
4242
uint64_t offset,
43-
const std::vector<folly::Range<char*>>& buffers) const override;
43+
const std::vector<folly::Range<char*>>& buffers,
44+
io::IoStatistics* stats) const override;
4445

4546
uint64_t memoryUsage() const override {
4647
return delegatedFile_->memoryUsage();
@@ -67,7 +68,8 @@ class FaultyReadFile : public ReadFile {
6768

6869
folly::SemiFuture<uint64_t> preadvAsync(
6970
uint64_t offset,
70-
const std::vector<folly::Range<char*>>& buffers) const override;
71+
const std::vector<folly::Range<char*>>& buffers,
72+
io::IoStatistics* stats) const override;
7173

7274
private:
7375
const std::string path_;

velox/common/io/IoStatistics.cpp

+32-3
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,24 @@ IoStatistics::operationStats() const {
108108
return operationStats_;
109109
}
110110

111+
std::unordered_map<std::string, RuntimeMetric> IoStatistics::storageStats()
112+
const {
113+
std::lock_guard<std::mutex> lock{storageStatsMutex_};
114+
return storageStats_;
115+
}
116+
117+
void IoStatistics::addStorageStats(
118+
const std::string& name,
119+
const RuntimeCounter& counter) {
120+
std::lock_guard<std::mutex> lock{storageStatsMutex_};
121+
if (storageStats_.count(name) == 0) {
122+
storageStats_.emplace(name, RuntimeMetric(counter.unit));
123+
} else {
124+
VELOX_CHECK_EQ(storageStats_.at(name).unit, counter.unit);
125+
}
126+
storageStats_.at(name).addValue(counter.value);
127+
}
128+
111129
void IoStatistics::merge(const IoStatistics& other) {
112130
rawBytesRead_ += other.rawBytesRead_;
113131
rawBytesWritten_ += other.rawBytesWritten_;
@@ -119,9 +137,20 @@ void IoStatistics::merge(const IoStatistics& other) {
119137
ramHit_.merge(other.ramHit_);
120138
ssdRead_.merge(other.ssdRead_);
121139
queryThreadIoLatency_.merge(other.queryThreadIoLatency_);
122-
std::lock_guard<std::mutex> l(operationStatsMutex_);
123-
for (auto& item : other.operationStats_) {
124-
operationStats_[item.first].merge(item.second);
140+
{
141+
const auto& otherOperationStats = other.operationStats();
142+
std::lock_guard<std::mutex> l(operationStatsMutex_);
143+
for (auto& item : otherOperationStats) {
144+
operationStats_[item.first].merge(item.second);
145+
}
146+
}
147+
148+
{
149+
const auto& otherStorageStats = other.storageStats();
150+
std::lock_guard<std::mutex> storageStatsLock(storageStatsMutex_);
151+
for (auto& item : otherStorageStats) {
152+
storageStats_[item.first].merge(item.second);
153+
}
125154
}
126155
}
127156

velox/common/io/IoStatistics.h

+7
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include <unordered_map>
2424

2525
#include <folly/dynamic.h>
26+
#include "velox/common/base/Exceptions.h"
27+
#include "velox/common/base/RuntimeMetrics.h"
2628

2729
namespace facebook::velox::io {
2830

@@ -140,6 +142,9 @@ class IoStatistics {
140142
const uint64_t partialThrottleCount = 0);
141143

142144
std::unordered_map<std::string, OperationCounters> operationStats() const;
145+
std::unordered_map<std::string, RuntimeMetric> storageStats() const;
146+
147+
void addStorageStats(const std::string& name, const RuntimeCounter& counter);
143148

144149
void merge(const IoStatistics& other);
145150

@@ -172,7 +177,9 @@ class IoStatistics {
172177
IoCounter queryThreadIoLatency_;
173178

174179
std::unordered_map<std::string, OperationCounters> operationStats_;
180+
std::unordered_map<std::string, RuntimeMetric> storageStats_;
175181
mutable std::mutex operationStatsMutex_;
182+
mutable std::mutex storageStatsMutex_;
176183
};
177184

178185
} // namespace facebook::velox::io

velox/connectors/hive/HiveDataSource.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,11 @@ std::unordered_map<std::string, RuntimeCounter> HiveDataSource::runtimeStats() {
513513
if (numBucketConversion_ > 0) {
514514
res.insert({"numBucketConversion", RuntimeCounter(numBucketConversion_)});
515515
}
516+
for (const auto& storageStats : ioStats_->storageStats()) {
517+
res.emplace(
518+
storageStats.first,
519+
RuntimeCounter(storageStats.second.sum, storageStats.second.unit));
520+
}
516521
return res;
517522
}
518523

velox/dwio/common/InputStream.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ void ReadFileInputStream::read(
101101
LogType logType) {
102102
const int64_t bufferSize = totalBufferSize(buffers);
103103
logRead(offset, bufferSize, logType);
104-
const auto size = readFile_->preadv(offset, buffers);
104+
const auto size = readFile_->preadv(offset, buffers, stats_);
105105
VELOX_CHECK_EQ(
106106
size,
107107
bufferSize,
@@ -118,7 +118,7 @@ folly::SemiFuture<uint64_t> ReadFileInputStream::readAsync(
118118
LogType logType) {
119119
const int64_t bufferSize = totalBufferSize(buffers);
120120
logRead(offset, bufferSize, logType);
121-
return readFile_->preadvAsync(offset, buffers);
121+
return readFile_->preadvAsync(offset, buffers, stats_);
122122
}
123123

124124
bool ReadFileInputStream::hasReadAsync() const {
@@ -137,7 +137,7 @@ void ReadFileInputStream::vread(
137137
[&](size_t acc, const auto& r) { return acc + r.length; });
138138
logRead(regions[0].offset, length, purpose);
139139
auto readStartMicros = getCurrentTimeMicro();
140-
readFile_->preadv(regions, iobufs);
140+
readFile_->preadv(regions, iobufs, stats_);
141141
if (stats_) {
142142
stats_->incRawBytesRead(length);
143143
stats_->incTotalScanTime((getCurrentTimeMicro() - readStartMicros) * 1000);

0 commit comments

Comments
 (0)