From 55d3dd3e98e57c81967773ead6aedfd018bce986 Mon Sep 17 00:00:00 2001 From: Ke Date: Tue, 28 Jan 2025 23:26:35 -0800 Subject: [PATCH] feat: Add storage statistics in IoStatistics --- velox/common/caching/SsdFile.cpp | 2 +- velox/common/file/File.cpp | 19 ++++++++----- velox/common/file/File.h | 18 ++++++++---- velox/common/file/FileInputStream.cpp | 2 +- velox/common/file/tests/FaultyFile.cpp | 16 +++++++---- velox/common/file/tests/FaultyFile.h | 6 ++-- velox/common/file/tests/FileTest.cpp | 12 ++++---- velox/common/io/IoStatistics.cpp | 35 ++++++++++++++++++++++-- velox/common/io/IoStatistics.h | 7 +++++ velox/connectors/hive/HiveDataSource.cpp | 5 ++++ velox/dwio/common/InputStream.cpp | 6 ++-- 11 files changed, 93 insertions(+), 35 deletions(-) diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index d65b4e139cb1d..36a852b40cedc 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -244,7 +244,7 @@ void SsdFile::read( uint64_t offset, const std::vector>& buffers) { process::TraceContext trace("SsdFile::read"); - readFile_->preadv(offset, buffers); + readFile_->preadv(offset, buffers, nullptr); } std::optional> SsdFile::getSpace( diff --git a/velox/common/file/File.cpp b/velox/common/file/File.cpp index 1739f8e8cb52d..69bbcd7172ff6 100644 --- a/velox/common/file/File.cpp +++ b/velox/common/file/File.cpp @@ -67,7 +67,8 @@ std::string ReadFile::pread(uint64_t offset, uint64_t length) const { uint64_t ReadFile::preadv( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { auto fileSize = size(); uint64_t numRead = 0; if (offset >= fileSize) { @@ -87,7 +88,8 @@ uint64_t ReadFile::preadv( uint64_t ReadFile::preadv( folly::Range regions, - folly::Range iobufs) const { + folly::Range iobufs, + io::IoStatistics* stats) const { VELOX_CHECK_EQ(regions.size(), iobufs.size()); uint64_t length = 0; for (size_t i = 0; i < regions.size(); ++i) { @@ -195,7 +197,8 @@ LocalReadFile::pread(uint64_t offset, uint64_t length, void* buf) const { uint64_t LocalReadFile::preadv( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { // Dropped bytes sized so that a typical dropped range of 50K is not // too many iovecs. static thread_local std::vector droppedBytes(16 * 1024); @@ -251,16 +254,18 @@ uint64_t LocalReadFile::preadv( folly::SemiFuture LocalReadFile::preadvAsync( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { if (!executor_) { - return ReadFile::preadvAsync(offset, buffers); + return ReadFile::preadvAsync(offset, buffers, stats); } auto [promise, future] = folly::makePromiseContract(); executor_->add([this, _promise = std::move(promise), _offset = offset, - _buffers = buffers]() mutable { - auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers); + _buffers = buffers, + _stats = stats]() mutable { + auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers, _stats); _promise.setTry(std::move(delegateFuture).getTry()); }); return std::move(future); diff --git a/velox/common/file/File.h b/velox/common/file/File.h index 832d098e303a2..7f5bdbaee60bb 100644 --- a/velox/common/file/File.h +++ b/velox/common/file/File.h @@ -40,6 +40,7 @@ #include "velox/common/base/Exceptions.h" #include "velox/common/file/Region.h" +#include "velox/common/io/IoStatistics.h" namespace facebook::velox { @@ -67,7 +68,8 @@ class ReadFile { // This method should be thread safe. virtual uint64_t preadv( uint64_t /*offset*/, - const std::vector>& /*buffers*/) const; + const std::vector>& /*buffers*/, + io::IoStatistics* stats) const; // Vectorized read API. Implementations can coalesce and parallelize. // The offsets don't need to be sorted. @@ -82,7 +84,8 @@ class ReadFile { // This method should be thread safe. virtual uint64_t preadv( folly::Range regions, - folly::Range iobufs) const; + folly::Range iobufs, + io::IoStatistics* stats) const; /// Like preadv but may execute asynchronously and returns the read size or /// exception via SemiFuture. Use hasPreadvAsync() to check if the @@ -91,9 +94,10 @@ class ReadFile { /// This method should be thread safe. virtual folly::SemiFuture preadvAsync( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { try { - return folly::SemiFuture(preadv(offset, buffers)); + return folly::SemiFuture(preadv(offset, buffers, stats)); } catch (const std::exception& e) { return folly::makeSemiFuture(e); } @@ -285,11 +289,13 @@ class LocalReadFile final : public ReadFile { uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const final; + const std::vector>& buffers, + io::IoStatistics* stats) const final; folly::SemiFuture preadvAsync( uint64_t offset, - const std::vector>& buffers) const override; + const std::vector>& buffers, + io::IoStatistics* stats) const override; bool hasPreadvAsync() const override { return executor_ != nullptr; diff --git a/velox/common/file/FileInputStream.cpp b/velox/common/file/FileInputStream.cpp index 56efdc4e1c3a0..65930dcc25208 100644 --- a/velox/common/file/FileInputStream.cpp +++ b/velox/common/file/FileInputStream.cpp @@ -218,7 +218,7 @@ void FileInputStream::maybeIssueReadahead() { } std::vector> ranges; ranges.emplace_back(nextBuffer()->asMutable(), size); - readAheadWait_ = file_->preadvAsync(fileOffset_, ranges); + readAheadWait_ = file_->preadvAsync(fileOffset_, ranges, nullptr); VELOX_CHECK(readAheadWait_.valid()); } diff --git a/velox/common/file/tests/FaultyFile.cpp b/velox/common/file/tests/FaultyFile.cpp index a5984113f901f..774b9d13e1d63 100644 --- a/velox/common/file/tests/FaultyFile.cpp +++ b/velox/common/file/tests/FaultyFile.cpp @@ -44,7 +44,8 @@ FaultyReadFile::pread(uint64_t offset, uint64_t length, void* buf) const { uint64_t FaultyReadFile::preadv( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { if (injectionHook_ != nullptr) { FaultFileReadvOperation op(path_, offset, buffers); injectionHook_(&op); @@ -52,23 +53,26 @@ uint64_t FaultyReadFile::preadv( return op.readBytes; } } - return delegatedFile_->preadv(offset, buffers); + return delegatedFile_->preadv(offset, buffers, stats); } folly::SemiFuture FaultyReadFile::preadvAsync( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { // TODO: add fault injection for async read later. if (delegatedFile_->hasPreadvAsync() || executor_ == nullptr) { - return delegatedFile_->preadvAsync(offset, buffers); + return delegatedFile_->preadvAsync(offset, buffers, stats); } auto promise = std::make_unique>(); folly::SemiFuture future = promise->getSemiFuture(); executor_->add([this, _promise = std::move(promise), _offset = offset, - _buffers = buffers]() { - auto delegateFuture = delegatedFile_->preadvAsync(_offset, _buffers); + _buffers = buffers, + _stats = stats]() { + auto delegateFuture = + delegatedFile_->preadvAsync(_offset, _buffers, _stats); _promise->setValue(delegateFuture.wait().value()); }); return future; diff --git a/velox/common/file/tests/FaultyFile.h b/velox/common/file/tests/FaultyFile.h index 6d73eaebe6256..6d560cfc98076 100644 --- a/velox/common/file/tests/FaultyFile.h +++ b/velox/common/file/tests/FaultyFile.h @@ -40,7 +40,8 @@ class FaultyReadFile : public ReadFile { uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const override; + const std::vector>& buffers, + io::IoStatistics* stats) const override; uint64_t memoryUsage() const override { return delegatedFile_->memoryUsage(); @@ -67,7 +68,8 @@ class FaultyReadFile : public ReadFile { folly::SemiFuture preadvAsync( uint64_t offset, - const std::vector>& buffers) const override; + const std::vector>& buffers, + io::IoStatistics* stats) const override; private: const std::string path_; diff --git a/velox/common/file/tests/FileTest.cpp b/velox/common/file/tests/FileTest.cpp index 9ea54ceb95959..7b3289b5532cc 100644 --- a/velox/common/file/tests/FileTest.cpp +++ b/velox/common/file/tests/FileTest.cpp @@ -105,7 +105,7 @@ void readData( (char*)(uint64_t)(15 + kOneMB - 500000 - sizeof(head) - sizeof(middle) - sizeof(tail))), folly::Range(tail, sizeof(tail))}; - ASSERT_EQ(15 + kOneMB, readFile->preadv(0, buffers)); + ASSERT_EQ(15 + kOneMB, readFile->preadv(0, buffers, nullptr)); ASSERT_EQ(std::string_view(head, sizeof(head)), "aaaaabbbbbcc"); ASSERT_EQ(std::string_view(middle, sizeof(middle)), "cccc"); ASSERT_EQ(std::string_view(tail, sizeof(tail)), "ccddddd"); @@ -113,7 +113,7 @@ void readData( std::vector> buffers1 = { folly::Range(head, sizeof(head)), folly::Range(nullptr, (char*)(uint64_t)500000)}; - auto future1 = readFile->preadvAsync(0, buffers1); + auto future1 = readFile->preadvAsync(0, buffers1, nullptr); const auto offset1 = sizeof(head) + 500000; std::vector> buffers2 = { folly::Range(middle, sizeof(middle)), @@ -121,11 +121,11 @@ void readData( nullptr, (char*)(uint64_t)(15 + kOneMB - offset1 - sizeof(middle) - sizeof(tail)))}; - auto future2 = readFile->preadvAsync(offset1, buffers2); + auto future2 = readFile->preadvAsync(offset1, buffers2, nullptr); std::vector> buffers3 = { folly::Range(tail, sizeof(tail))}; const auto offset2 = 15 + kOneMB - sizeof(tail); - auto future3 = readFile->preadvAsync(offset2, buffers3); + auto future3 = readFile->preadvAsync(offset2, buffers3, nullptr); ASSERT_EQ(offset1, future1.wait().value()); ASSERT_EQ(offset2 - offset1, future2.wait().value()); ASSERT_EQ(sizeof(tail), future3.wait().value()); @@ -164,7 +164,7 @@ TEST(InMemoryFile, preadv) { {5 + 5 + kOneMB + 2, 3UL, {}}}; std::vector iobufs(readRegions.size()); - readFile.preadv(readRegions, {iobufs.data(), iobufs.size()}); + readFile.preadv(readRegions, {iobufs.data(), iobufs.size()}, nullptr); std::vector values; values.reserve(iobufs.size()); for (auto& iobuf : iobufs) { @@ -526,7 +526,7 @@ class FaultyFsTest : public ::testing::Test { } else { std::vector> buffers; buffers.push_back(folly::Range(readBuf, buffer_.size())); - file->preadv(0, buffers); + file->preadv(0, buffers, nullptr); } for (int i = 0; i < buffer_.size(); ++i) { if (buffer_[i] != readBuf[i]) { diff --git a/velox/common/io/IoStatistics.cpp b/velox/common/io/IoStatistics.cpp index 7dfddc6dc4831..69bf031254903 100644 --- a/velox/common/io/IoStatistics.cpp +++ b/velox/common/io/IoStatistics.cpp @@ -108,6 +108,24 @@ IoStatistics::operationStats() const { return operationStats_; } +std::unordered_map IoStatistics::storageStats() + const { + std::lock_guard lock{storageStatsMutex_}; + return storageStats_; +} + +void IoStatistics::addStorageStats( + const std::string& name, + const RuntimeCounter& counter) { + std::lock_guard lock{storageStatsMutex_}; + if (storageStats_.count(name) == 0) { + storageStats_.emplace(name, RuntimeMetric(counter.unit)); + } else { + VELOX_CHECK_EQ(storageStats_.at(name).unit, counter.unit); + } + storageStats_.at(name).addValue(counter.value); +} + void IoStatistics::merge(const IoStatistics& other) { rawBytesRead_ += other.rawBytesRead_; rawBytesWritten_ += other.rawBytesWritten_; @@ -119,9 +137,20 @@ void IoStatistics::merge(const IoStatistics& other) { ramHit_.merge(other.ramHit_); ssdRead_.merge(other.ssdRead_); queryThreadIoLatency_.merge(other.queryThreadIoLatency_); - std::lock_guard l(operationStatsMutex_); - for (auto& item : other.operationStats_) { - operationStats_[item.first].merge(item.second); + { + const auto& otherOperationStats = other.operationStats(); + std::lock_guard l(operationStatsMutex_); + for (auto& item : otherOperationStats) { + operationStats_[item.first].merge(item.second); + } + } + + { + const auto& otherStorageStats = other.storageStats(); + std::lock_guard storageStatsLock(storageStatsMutex_); + for (auto& item : otherStorageStats) { + storageStats_[item.first].merge(item.second); + } } } diff --git a/velox/common/io/IoStatistics.h b/velox/common/io/IoStatistics.h index 2111a8877b475..6d31e0dfef144 100644 --- a/velox/common/io/IoStatistics.h +++ b/velox/common/io/IoStatistics.h @@ -23,6 +23,8 @@ #include #include +#include "velox/common/base/Exceptions.h" +#include "velox/common/base/RuntimeMetrics.h" namespace facebook::velox::io { @@ -140,6 +142,9 @@ class IoStatistics { const uint64_t partialThrottleCount = 0); std::unordered_map operationStats() const; + std::unordered_map storageStats() const; + + void addStorageStats(const std::string& name, const RuntimeCounter& counter); void merge(const IoStatistics& other); @@ -172,7 +177,9 @@ class IoStatistics { IoCounter queryThreadIoLatency_; std::unordered_map operationStats_; + std::unordered_map storageStats_; mutable std::mutex operationStatsMutex_; + mutable std::mutex storageStatsMutex_; }; } // namespace facebook::velox::io diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 0edfb3a0ea7cb..dbac54e81889d 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -513,6 +513,11 @@ std::unordered_map HiveDataSource::runtimeStats() { if (numBucketConversion_ > 0) { res.insert({"numBucketConversion", RuntimeCounter(numBucketConversion_)}); } + for (const auto& storageStats : ioStats_->storageStats()) { + res.emplace( + storageStats.first, + RuntimeCounter(storageStats.second.sum, storageStats.second.unit)); + } return res; } diff --git a/velox/dwio/common/InputStream.cpp b/velox/dwio/common/InputStream.cpp index e2dd496c16d88..f019fd841c0c5 100644 --- a/velox/dwio/common/InputStream.cpp +++ b/velox/dwio/common/InputStream.cpp @@ -101,7 +101,7 @@ void ReadFileInputStream::read( LogType logType) { const int64_t bufferSize = totalBufferSize(buffers); logRead(offset, bufferSize, logType); - const auto size = readFile_->preadv(offset, buffers); + const auto size = readFile_->preadv(offset, buffers, stats_); VELOX_CHECK_EQ( size, bufferSize, @@ -118,7 +118,7 @@ folly::SemiFuture ReadFileInputStream::readAsync( LogType logType) { const int64_t bufferSize = totalBufferSize(buffers); logRead(offset, bufferSize, logType); - return readFile_->preadvAsync(offset, buffers); + return readFile_->preadvAsync(offset, buffers, stats_); } bool ReadFileInputStream::hasReadAsync() const { @@ -137,7 +137,7 @@ void ReadFileInputStream::vread( [&](size_t acc, const auto& r) { return acc + r.length; }); logRead(regions[0].offset, length, purpose); auto readStartMicros = getCurrentTimeMicro(); - readFile_->preadv(regions, iobufs); + readFile_->preadv(regions, iobufs, stats_); if (stats_) { stats_->incRawBytesRead(length); stats_->incTotalScanTime((getCurrentTimeMicro() - readStartMicros) * 1000);