Skip to content

Commit

Permalink
feat: Add storage statistics in IoStatistics
Browse files Browse the repository at this point in the history
  • Loading branch information
kewang1024 committed Jan 29, 2025
1 parent ce273fa commit 2f4a29c
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 27 deletions.
19 changes: 12 additions & 7 deletions velox/common/file/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ std::string ReadFile::pread(uint64_t offset, uint64_t length) const {

uint64_t ReadFile::preadv(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const {
auto fileSize = size();
uint64_t numRead = 0;
if (offset >= fileSize) {
Expand All @@ -87,7 +88,8 @@ uint64_t ReadFile::preadv(

uint64_t ReadFile::preadv(
folly::Range<const common::Region*> regions,
folly::Range<folly::IOBuf*> iobufs) const {
folly::Range<folly::IOBuf*> iobufs,
io::IoStatistics* stats) const {
VELOX_CHECK_EQ(regions.size(), iobufs.size());
uint64_t length = 0;
for (size_t i = 0; i < regions.size(); ++i) {
Expand Down Expand Up @@ -195,7 +197,8 @@ LocalReadFile::pread(uint64_t offset, uint64_t length, void* buf) const {

uint64_t LocalReadFile::preadv(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const {
// Dropped bytes sized so that a typical dropped range of 50K is not
// too many iovecs.
static thread_local std::vector<char> droppedBytes(16 * 1024);
Expand Down Expand Up @@ -251,16 +254,18 @@ uint64_t LocalReadFile::preadv(

folly::SemiFuture<uint64_t> LocalReadFile::preadvAsync(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const {
if (!executor_) {
return ReadFile::preadvAsync(offset, buffers);
return ReadFile::preadvAsync(offset, buffers, stats);
}
auto [promise, future] = folly::makePromiseContract<uint64_t>();
executor_->add([this,
_promise = std::move(promise),
_offset = offset,
_buffers = buffers]() mutable {
auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers);
_buffers = buffers,
_stats = stats]() mutable {
auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers, _stats);
_promise.setTry(std::move(delegateFuture).getTry());
});
return std::move(future);
Expand Down
18 changes: 12 additions & 6 deletions velox/common/file/File.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

#include "velox/common/base/Exceptions.h"
#include "velox/common/file/Region.h"
#include "velox/common/io/IoStatistics.h"

namespace facebook::velox {

Expand Down Expand Up @@ -67,7 +68,8 @@ class ReadFile {
// This method should be thread safe.
virtual uint64_t preadv(
uint64_t /*offset*/,
const std::vector<folly::Range<char*>>& /*buffers*/) const;
const std::vector<folly::Range<char*>>& /*buffers*/,
io::IoStatistics* stats) const;

// Vectorized read API. Implementations can coalesce and parallelize.
// The offsets don't need to be sorted.
Expand All @@ -82,7 +84,8 @@ class ReadFile {
// This method should be thread safe.
virtual uint64_t preadv(
folly::Range<const common::Region*> regions,
folly::Range<folly::IOBuf*> iobufs) const;
folly::Range<folly::IOBuf*> iobufs,
io::IoStatistics* stats) const;

/// Like preadv but may execute asynchronously and returns the read size or
/// exception via SemiFuture. Use hasPreadvAsync() to check if the
Expand All @@ -91,9 +94,10 @@ class ReadFile {
/// This method should be thread safe.
virtual folly::SemiFuture<uint64_t> preadvAsync(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const {
try {
return folly::SemiFuture<uint64_t>(preadv(offset, buffers));
return folly::SemiFuture<uint64_t>(preadv(offset, buffers, stats));
} catch (const std::exception& e) {
return folly::makeSemiFuture<uint64_t>(e);
}
Expand Down Expand Up @@ -285,11 +289,13 @@ class LocalReadFile final : public ReadFile {

uint64_t preadv(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const final;
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const final;

folly::SemiFuture<uint64_t> preadvAsync(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const override;
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const override;

bool hasPreadvAsync() const override {
return executor_ != nullptr;
Expand Down
16 changes: 10 additions & 6 deletions velox/common/file/tests/FaultyFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,35 @@ FaultyReadFile::pread(uint64_t offset, uint64_t length, void* buf) const {

uint64_t FaultyReadFile::preadv(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const {
if (injectionHook_ != nullptr) {
FaultFileReadvOperation op(path_, offset, buffers);
injectionHook_(&op);
if (!op.delegate) {
return op.readBytes;
}
}
return delegatedFile_->preadv(offset, buffers);
return delegatedFile_->preadv(offset, buffers, stats);
}

folly::SemiFuture<uint64_t> FaultyReadFile::preadvAsync(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const {
// TODO: add fault injection for async read later.
if (delegatedFile_->hasPreadvAsync() || executor_ == nullptr) {
return delegatedFile_->preadvAsync(offset, buffers);
return delegatedFile_->preadvAsync(offset, buffers, stats);
}
auto promise = std::make_unique<folly::Promise<uint64_t>>();
folly::SemiFuture<uint64_t> future = promise->getSemiFuture();
executor_->add([this,
_promise = std::move(promise),
_offset = offset,
_buffers = buffers]() {
auto delegateFuture = delegatedFile_->preadvAsync(_offset, _buffers);
_buffers = buffers,
_stats = stats]() {
auto delegateFuture =
delegatedFile_->preadvAsync(_offset, _buffers, _stats);
_promise->setValue(delegateFuture.wait().value());
});
return future;
Expand Down
6 changes: 4 additions & 2 deletions velox/common/file/tests/FaultyFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ class FaultyReadFile : public ReadFile {

uint64_t preadv(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const override;
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const override;

uint64_t memoryUsage() const override {
return delegatedFile_->memoryUsage();
Expand All @@ -67,7 +68,8 @@ class FaultyReadFile : public ReadFile {

folly::SemiFuture<uint64_t> preadvAsync(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const override;
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const override;

private:
const std::string path_;
Expand Down
35 changes: 32 additions & 3 deletions velox/common/io/IoStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ IoStatistics::operationStats() const {
return operationStats_;
}

std::unordered_map<std::string, RuntimeMetric> IoStatistics::storageStats()
const {
std::lock_guard<std::mutex> lock{storageStatsMutex_};
return storageStats_;
}

void IoStatistics::addStorageStats(
const std::string& name,
const RuntimeCounter& counter) {
std::lock_guard<std::mutex> lock{storageStatsMutex_};
if (storageStats_.count(name) == 0) {
storageStats_.emplace(name, RuntimeMetric(counter.unit));
} else {
VELOX_CHECK_EQ(storageStats_.at(name).unit, counter.unit);
}
storageStats_.at(name).addValue(counter.value);
}

void IoStatistics::merge(const IoStatistics& other) {
rawBytesRead_ += other.rawBytesRead_;
rawBytesWritten_ += other.rawBytesWritten_;
Expand All @@ -119,9 +137,20 @@ void IoStatistics::merge(const IoStatistics& other) {
ramHit_.merge(other.ramHit_);
ssdRead_.merge(other.ssdRead_);
queryThreadIoLatency_.merge(other.queryThreadIoLatency_);
std::lock_guard<std::mutex> l(operationStatsMutex_);
for (auto& item : other.operationStats_) {
operationStats_[item.first].merge(item.second);
{
const auto& otherOperationStats = other.operationStats();
std::lock_guard<std::mutex> l(operationStatsMutex_);
for (auto& item : otherOperationStats) {
operationStats_[item.first].merge(item.second);
}
}

{
const auto& otherStorageStats = other.storageStats();
std::lock_guard<std::mutex> storageStatsLock(storageStatsMutex_);
for (auto& item : otherStorageStats) {
storageStats_[item.first].merge(item.second);
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions velox/common/io/IoStatistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <unordered_map>

#include <folly/dynamic.h>
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/RuntimeMetrics.h"

namespace facebook::velox::io {

Expand Down Expand Up @@ -140,6 +142,9 @@ class IoStatistics {
const uint64_t partialThrottleCount = 0);

std::unordered_map<std::string, OperationCounters> operationStats() const;
std::unordered_map<std::string, RuntimeMetric> storageStats() const;

void addStorageStats(const std::string& name, const RuntimeCounter& counter);

void merge(const IoStatistics& other);

Expand Down Expand Up @@ -172,7 +177,9 @@ class IoStatistics {
IoCounter queryThreadIoLatency_;

std::unordered_map<std::string, OperationCounters> operationStats_;
std::unordered_map<std::string, RuntimeMetric> storageStats_;
mutable std::mutex operationStatsMutex_;
mutable std::mutex storageStatsMutex_;
};

} // namespace facebook::velox::io
5 changes: 5 additions & 0 deletions velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,11 @@ std::unordered_map<std::string, RuntimeCounter> HiveDataSource::runtimeStats() {
if (numBucketConversion_ > 0) {
res.insert({"numBucketConversion", RuntimeCounter(numBucketConversion_)});
}
for (const auto& storageStats : ioStats_->storageStats()) {
res.emplace(
storageStats.first,
RuntimeCounter(storageStats.second.sum, storageStats.second.unit));
}
return res;
}

Expand Down
6 changes: 3 additions & 3 deletions velox/dwio/common/InputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ void ReadFileInputStream::read(
LogType logType) {
const int64_t bufferSize = totalBufferSize(buffers);
logRead(offset, bufferSize, logType);
const auto size = readFile_->preadv(offset, buffers);
const auto size = readFile_->preadv(offset, buffers, stats_);
VELOX_CHECK_EQ(
size,
bufferSize,
Expand All @@ -118,7 +118,7 @@ folly::SemiFuture<uint64_t> ReadFileInputStream::readAsync(
LogType logType) {
const int64_t bufferSize = totalBufferSize(buffers);
logRead(offset, bufferSize, logType);
return readFile_->preadvAsync(offset, buffers);
return readFile_->preadvAsync(offset, buffers, stats_);
}

bool ReadFileInputStream::hasReadAsync() const {
Expand All @@ -137,7 +137,7 @@ void ReadFileInputStream::vread(
[&](size_t acc, const auto& r) { return acc + r.length; });
logRead(regions[0].offset, length, purpose);
auto readStartMicros = getCurrentTimeMicro();
readFile_->preadv(regions, iobufs);
readFile_->preadv(regions, iobufs, stats_);
if (stats_) {
stats_->incRawBytesRead(length);
stats_->incTotalScanTime((getCurrentTimeMicro() - readStartMicros) * 1000);
Expand Down

0 comments on commit 2f4a29c

Please sign in to comment.