From c4a808f8dbc57ef6ac99ac51022af1a842ce277c Mon Sep 17 00:00:00 2001 From: Ke Date: Thu, 30 Jan 2025 18:44:24 -0800 Subject: [PATCH] feat: Collect storage stat for Tables scan (#12210) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/12210 Test Plan: Deployed to a test cluster Java: 20250125_082712_05968_cwjy8 S1-wsInRegionReadBytes 985.81 MB 1232 322.87 KB 1.00 MB CPP: 20250125_095038_00000_sgud6 S1-TableScan.0.wsInRegionReadBytes 908.81 MB 154 5.80 MB 5.97 MB Reviewed By: yuandagits Differential Revision: D68872215 Pulled By: kewang1024 --- velox/common/file/File.cpp | 49 ++++++++++----- velox/common/file/File.h | 59 ++++++++++++++----- velox/common/file/tests/FaultyFile.cpp | 25 +++++--- velox/common/file/tests/FaultyFile.h | 13 ++-- velox/common/io/IoStatistics.cpp | 35 ++++++++++- velox/common/io/IoStatistics.h | 7 +++ velox/connectors/hive/HiveDataSource.cpp | 5 ++ .../storage_adapters/abfs/AbfsFileSystem.cpp | 43 +++++++++----- .../hive/storage_adapters/abfs/AbfsReadFile.h | 18 ++++-- .../storage_adapters/gcs/GcsFileSystem.cpp | 15 +++-- .../storage_adapters/hdfs/HdfsReadFile.cpp | 12 +++- .../hive/storage_adapters/hdfs/HdfsReadFile.h | 12 +++- .../storage_adapters/s3fs/S3FileSystem.cpp | 13 ++-- velox/dwio/common/CachedBufferedInput.h | 3 +- velox/dwio/common/DirectBufferedInput.h | 3 +- velox/dwio/common/InputStream.cpp | 8 +-- velox/dwio/common/tests/TestBufferedInput.cpp | 20 ++++--- velox/dwio/dwrf/test/TestReadFile.h | 12 ++-- velox/dwio/dwrf/test/TestStripeStream.cpp | 7 ++- 19 files changed, 261 insertions(+), 98 deletions(-) diff --git a/velox/common/file/File.cpp b/velox/common/file/File.cpp index 1739f8e8cb52..ab7d2cf3c9d6 100644 --- a/velox/common/file/File.cpp +++ b/velox/common/file/File.cpp @@ -57,17 +57,21 @@ T getAttribute( } } // namespace -std::string ReadFile::pread(uint64_t offset, uint64_t length) const { +std::string ReadFile::pread( + uint64_t offset, + uint64_t length, + io::IoStatistics* stats) const { std::string buf; buf.resize(length); - auto res = pread(offset, length, buf.data()); + auto res = pread(offset, length, buf.data(), stats); buf.resize(res.size()); return buf; } uint64_t ReadFile::preadv( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { auto fileSize = size(); uint64_t numRead = 0; if (offset >= fileSize) { @@ -77,7 +81,7 @@ uint64_t ReadFile::preadv( auto copySize = std::min(range.size(), fileSize - offset); // NOTE: skip the gap in case of coalesce io. if (range.data() != nullptr) { - pread(offset, copySize, range.data()); + pread(offset, copySize, range.data(), stats); } offset += copySize; numRead += copySize; @@ -87,14 +91,15 @@ uint64_t ReadFile::preadv( uint64_t ReadFile::preadv( folly::Range regions, - folly::Range iobufs) const { + folly::Range iobufs, + io::IoStatistics* stats) const { VELOX_CHECK_EQ(regions.size(), iobufs.size()); uint64_t length = 0; for (size_t i = 0; i < regions.size(); ++i) { const auto& region = regions[i]; auto& output = iobufs[i]; output = folly::IOBuf(folly::IOBuf::CREATE, region.length); - pread(region.offset, region.length, output.writableData()); + pread(region.offset, region.length, output.writableData(), stats); output.append(region.length); length += region.length; } @@ -102,14 +107,20 @@ uint64_t ReadFile::preadv( return length; } -std::string_view -InMemoryReadFile::pread(uint64_t offset, uint64_t length, void* buf) const { +std::string_view InMemoryReadFile::pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats) const { bytesRead_ += length; memcpy(buf, file_.data() + offset, length); return {static_cast(buf), length}; } -std::string InMemoryReadFile::pread(uint64_t offset, uint64_t length) const { +std::string InMemoryReadFile::pread( + uint64_t offset, + uint64_t length, + io::IoStatistics* stats) const { bytesRead_ += length; return std::string(file_.data() + offset, length); } @@ -187,15 +198,19 @@ void LocalReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos) folly::errnoStr(errno)); } -std::string_view -LocalReadFile::pread(uint64_t offset, uint64_t length, void* buf) const { +std::string_view LocalReadFile::pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats) const { preadInternal(offset, length, static_cast(buf)); return {static_cast(buf), length}; } uint64_t LocalReadFile::preadv( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { // Dropped bytes sized so that a typical dropped range of 50K is not // too many iovecs. static thread_local std::vector droppedBytes(16 * 1024); @@ -251,16 +266,18 @@ uint64_t LocalReadFile::preadv( folly::SemiFuture LocalReadFile::preadvAsync( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { if (!executor_) { - return ReadFile::preadvAsync(offset, buffers); + return ReadFile::preadvAsync(offset, buffers, stats); } auto [promise, future] = folly::makePromiseContract(); executor_->add([this, _promise = std::move(promise), _offset = offset, - _buffers = buffers]() mutable { - auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers); + _buffers = buffers, + _stats = stats]() mutable { + auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers, _stats); _promise.setTry(std::move(delegateFuture).getTry()); }); return std::move(future); diff --git a/velox/common/file/File.h b/velox/common/file/File.h index 832d098e303a..aeacbe93986d 100644 --- a/velox/common/file/File.h +++ b/velox/common/file/File.h @@ -40,6 +40,7 @@ #include "velox/common/base/Exceptions.h" #include "velox/common/file/Region.h" +#include "velox/common/io/IoStatistics.h" namespace facebook::velox { @@ -51,23 +52,36 @@ class ReadFile { // Reads the data at [offset, offset + length) into the provided pre-allocated // buffer 'buf'. The bytes are returned as a string_view pointing to 'buf'. // + // 'stats' is an IoStatistics pointer passed in by the caller to collect stats + // for this read operation. + // // This method should be thread safe. - virtual std::string_view pread(uint64_t offset, uint64_t length, void* buf) - const = 0; + virtual std::string_view pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats = nullptr) const = 0; // Same as above, but returns owned data directly. // // This method should be thread safe. - virtual std::string pread(uint64_t offset, uint64_t length) const; + virtual std::string pread( + uint64_t offset, + uint64_t length, + io::IoStatistics* stats = nullptr) const; // Reads starting at 'offset' into the memory referenced by the // Ranges in 'buffers'. The buffers are filled left to right. A // buffer with nullptr data will cause its size worth of bytes to be skipped. // + // 'stats' is an IoStatistics pointer passed in by the caller to collect stats + // for this read operation. + // // This method should be thread safe. virtual uint64_t preadv( uint64_t /*offset*/, - const std::vector>& /*buffers*/) const; + const std::vector>& /*buffers*/, + io::IoStatistics* stats = nullptr) const; // Vectorized read API. Implementations can coalesce and parallelize. // The offsets don't need to be sorted. @@ -79,21 +93,29 @@ class ReadFile { // Returns the total number of bytes read, which might be different than the // sum of all buffer sizes (for example, if coalescing was used). // + // 'stats' is an IoStatistics pointer passed in by the caller to collect stats + // for this read operation. + // // This method should be thread safe. virtual uint64_t preadv( folly::Range regions, - folly::Range iobufs) const; + folly::Range iobufs, + io::IoStatistics* stats = nullptr) const; /// Like preadv but may execute asynchronously and returns the read size or /// exception via SemiFuture. Use hasPreadvAsync() to check if the /// implementation is in fact asynchronous. /// + /// 'stats' is an IoStatistics pointer passed in by the caller to collect + /// stats for this read operation. + /// /// This method should be thread safe. virtual folly::SemiFuture preadvAsync( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats = nullptr) const { try { - return folly::SemiFuture(preadv(offset, buffers)); + return folly::SemiFuture(preadv(offset, buffers, stats)); } catch (const std::exception& e) { return folly::makeSemiFuture(e); } @@ -213,10 +235,14 @@ class InMemoryReadFile : public ReadFile { explicit InMemoryReadFile(std::string file) : ownedFile_(std::move(file)), file_(ownedFile_) {} - std::string_view pread(uint64_t offset, uint64_t length, void* buf) - const override; + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats = nullptr) const override; - std::string pread(uint64_t offset, uint64_t length) const override; + std::string pread(uint64_t offset, uint64_t length, io::IoStatistics* stats) + const override; uint64_t size() const final { return file_.size(); @@ -278,18 +304,23 @@ class LocalReadFile final : public ReadFile { ~LocalReadFile(); - std::string_view pread(uint64_t offset, uint64_t length, void* buf) - const final; + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats = nullptr) const final; uint64_t size() const final; uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const final; + const std::vector>& buffers, + io::IoStatistics* stats = nullptr) const final; folly::SemiFuture preadvAsync( uint64_t offset, - const std::vector>& buffers) const override; + const std::vector>& buffers, + io::IoStatistics* stats = nullptr) const override; bool hasPreadvAsync() const override { return executor_ != nullptr; diff --git a/velox/common/file/tests/FaultyFile.cpp b/velox/common/file/tests/FaultyFile.cpp index a5984113f901..dbdb47f493a0 100644 --- a/velox/common/file/tests/FaultyFile.cpp +++ b/velox/common/file/tests/FaultyFile.cpp @@ -30,8 +30,11 @@ FaultyReadFile::FaultyReadFile( VELOX_CHECK_NOT_NULL(delegatedFile_); } -std::string_view -FaultyReadFile::pread(uint64_t offset, uint64_t length, void* buf) const { +std::string_view FaultyReadFile::pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats) const { if (injectionHook_ != nullptr) { FaultFileReadOperation op(path_, offset, length, buf); injectionHook_(&op); @@ -39,12 +42,13 @@ FaultyReadFile::pread(uint64_t offset, uint64_t length, void* buf) const { return std::string_view(static_cast(op.buf), op.length); } } - return delegatedFile_->pread(offset, length, buf); + return delegatedFile_->pread(offset, length, buf, stats); } uint64_t FaultyReadFile::preadv( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { if (injectionHook_ != nullptr) { FaultFileReadvOperation op(path_, offset, buffers); injectionHook_(&op); @@ -52,23 +56,26 @@ uint64_t FaultyReadFile::preadv( return op.readBytes; } } - return delegatedFile_->preadv(offset, buffers); + return delegatedFile_->preadv(offset, buffers, stats); } folly::SemiFuture FaultyReadFile::preadvAsync( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { // TODO: add fault injection for async read later. if (delegatedFile_->hasPreadvAsync() || executor_ == nullptr) { - return delegatedFile_->preadvAsync(offset, buffers); + return delegatedFile_->preadvAsync(offset, buffers, stats); } auto promise = std::make_unique>(); folly::SemiFuture future = promise->getSemiFuture(); executor_->add([this, _promise = std::move(promise), _offset = offset, - _buffers = buffers]() { - auto delegateFuture = delegatedFile_->preadvAsync(_offset, _buffers); + _buffers = buffers, + _stats = stats]() { + auto delegateFuture = + delegatedFile_->preadvAsync(_offset, _buffers, _stats); _promise->setValue(delegateFuture.wait().value()); }); return future; diff --git a/velox/common/file/tests/FaultyFile.h b/velox/common/file/tests/FaultyFile.h index 6d73eaebe625..e4b2bc5336e0 100644 --- a/velox/common/file/tests/FaultyFile.h +++ b/velox/common/file/tests/FaultyFile.h @@ -35,12 +35,16 @@ class FaultyReadFile : public ReadFile { return delegatedFile_->size(); } - std::string_view pread(uint64_t offset, uint64_t length, void* buf) - const override; + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats = nullptr) const override; uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const override; + const std::vector>& buffers, + io::IoStatistics* stats = nullptr) const override; uint64_t memoryUsage() const override { return delegatedFile_->memoryUsage(); @@ -67,7 +71,8 @@ class FaultyReadFile : public ReadFile { folly::SemiFuture preadvAsync( uint64_t offset, - const std::vector>& buffers) const override; + const std::vector>& buffers, + io::IoStatistics* stats = nullptr) const override; private: const std::string path_; diff --git a/velox/common/io/IoStatistics.cpp b/velox/common/io/IoStatistics.cpp index 7dfddc6dc483..69bf03125490 100644 --- a/velox/common/io/IoStatistics.cpp +++ b/velox/common/io/IoStatistics.cpp @@ -108,6 +108,24 @@ IoStatistics::operationStats() const { return operationStats_; } +std::unordered_map IoStatistics::storageStats() + const { + std::lock_guard lock{storageStatsMutex_}; + return storageStats_; +} + +void IoStatistics::addStorageStats( + const std::string& name, + const RuntimeCounter& counter) { + std::lock_guard lock{storageStatsMutex_}; + if (storageStats_.count(name) == 0) { + storageStats_.emplace(name, RuntimeMetric(counter.unit)); + } else { + VELOX_CHECK_EQ(storageStats_.at(name).unit, counter.unit); + } + storageStats_.at(name).addValue(counter.value); +} + void IoStatistics::merge(const IoStatistics& other) { rawBytesRead_ += other.rawBytesRead_; rawBytesWritten_ += other.rawBytesWritten_; @@ -119,9 +137,20 @@ void IoStatistics::merge(const IoStatistics& other) { ramHit_.merge(other.ramHit_); ssdRead_.merge(other.ssdRead_); queryThreadIoLatency_.merge(other.queryThreadIoLatency_); - std::lock_guard l(operationStatsMutex_); - for (auto& item : other.operationStats_) { - operationStats_[item.first].merge(item.second); + { + const auto& otherOperationStats = other.operationStats(); + std::lock_guard l(operationStatsMutex_); + for (auto& item : otherOperationStats) { + operationStats_[item.first].merge(item.second); + } + } + + { + const auto& otherStorageStats = other.storageStats(); + std::lock_guard storageStatsLock(storageStatsMutex_); + for (auto& item : otherStorageStats) { + storageStats_[item.first].merge(item.second); + } } } diff --git a/velox/common/io/IoStatistics.h b/velox/common/io/IoStatistics.h index 2111a8877b47..6d31e0dfef14 100644 --- a/velox/common/io/IoStatistics.h +++ b/velox/common/io/IoStatistics.h @@ -23,6 +23,8 @@ #include #include +#include "velox/common/base/Exceptions.h" +#include "velox/common/base/RuntimeMetrics.h" namespace facebook::velox::io { @@ -140,6 +142,9 @@ class IoStatistics { const uint64_t partialThrottleCount = 0); std::unordered_map operationStats() const; + std::unordered_map storageStats() const; + + void addStorageStats(const std::string& name, const RuntimeCounter& counter); void merge(const IoStatistics& other); @@ -172,7 +177,9 @@ class IoStatistics { IoCounter queryThreadIoLatency_; std::unordered_map operationStats_; + std::unordered_map storageStats_; mutable std::mutex operationStatsMutex_; + mutable std::mutex storageStatsMutex_; }; } // namespace facebook::velox::io diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 0edfb3a0ea7c..dbac54e81889 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -513,6 +513,11 @@ std::unordered_map HiveDataSource::runtimeStats() { if (numBucketConversion_ > 0) { res.insert({"numBucketConversion", RuntimeCounter(numBucketConversion_)}); } + for (const auto& storageStats : ioStats_->storageStats()) { + res.emplace( + storageStats.first, + RuntimeCounter(storageStats.second.sum, storageStats.second.unit)); + } return res; } diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp index 7e63c2df1438..ee54b20a4766 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp @@ -58,12 +58,17 @@ class AbfsReadFile::Impl { VELOX_CHECK_GE(length_, 0); } - std::string_view pread(uint64_t offset, uint64_t length, void* buffer) const { + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buffer, + io::IoStatistics* stats) const { preadInternal(offset, length, static_cast(buffer)); return {static_cast(buffer), length}; } - std::string pread(uint64_t offset, uint64_t length) const { + std::string pread(uint64_t offset, uint64_t length, io::IoStatistics* stats) + const { std::string result(length, 0); preadInternal(offset, length, result.data()); return result; @@ -71,7 +76,8 @@ class AbfsReadFile::Impl { uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { size_t length = 0; auto size = buffers.size(); for (auto& range : buffers) { @@ -92,14 +98,15 @@ class AbfsReadFile::Impl { uint64_t preadv( folly::Range regions, - folly::Range iobufs) const { + folly::Range iobufs, + io::IoStatistics* stats) const { size_t length = 0; VELOX_CHECK_EQ(regions.size(), iobufs.size()); for (size_t i = 0; i < regions.size(); ++i) { const auto& region = regions[i]; auto& output = iobufs[i]; output = folly::IOBuf(folly::IOBuf::CREATE, region.length); - pread(region.offset, region.length, output.writableData()); + pread(region.offset, region.length, output.writableData(), stats); output.append(region.length); length += region.length; } @@ -156,25 +163,33 @@ void AbfsReadFile::initialize(const FileOptions& options) { return impl_->initialize(options); } -std::string_view -AbfsReadFile::pread(uint64_t offset, uint64_t length, void* buffer) const { - return impl_->pread(offset, length, buffer); +std::string_view AbfsReadFile::pread( + uint64_t offset, + uint64_t length, + void* buffer, + io::IoStatistics* stats) const { + return impl_->pread(offset, length, buffer, stats); } -std::string AbfsReadFile::pread(uint64_t offset, uint64_t length) const { - return impl_->pread(offset, length); +std::string AbfsReadFile::pread( + uint64_t offset, + uint64_t length, + io::IoStatistics* stats) const { + return impl_->pread(offset, length, stats); } uint64_t AbfsReadFile::preadv( uint64_t offset, - const std::vector>& buffers) const { - return impl_->preadv(offset, buffers); + const std::vector>& buffers, + io::IoStatistics* stats) const { + return impl_->preadv(offset, buffers, stats); } uint64_t AbfsReadFile::preadv( folly::Range regions, - folly::Range iobufs) const { - return impl_->preadv(regions, iobufs); + folly::Range iobufs, + io::IoStatistics* stats) const { + return impl_->preadv(regions, iobufs, stats); } uint64_t AbfsReadFile::size() const { diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h b/velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h index 99f8b8c68595..6e084da0b9ed 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsReadFile.h @@ -31,18 +31,26 @@ class AbfsReadFile final : public ReadFile { void initialize(const FileOptions& options); - std::string_view pread(uint64_t offset, uint64_t length, void* buf) - const final; + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats = nullptr) const final; - std::string pread(uint64_t offset, uint64_t length) const final; + std::string pread( + uint64_t offset, + uint64_t length, + io::IoStatistics* stats = nullptr) const final; uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const final; + const std::vector>& buffers, + io::IoStatistics* stats = nullptr) const final; uint64_t preadv( folly::Range regions, - folly::Range iobufs) const final; + folly::Range iobufs, + io::IoStatistics* stats = nullptr) const final; uint64_t size() const final; diff --git a/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp b/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp index f5d0830d3de3..4db3f3045d6e 100644 --- a/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp @@ -97,13 +97,19 @@ class GcsReadFile final : public ReadFile { VELOX_CHECK_GE(length_, 0); } - std::string_view pread(uint64_t offset, uint64_t length, void* buffer) - const override { + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buffer, + io::IoStatistics* stats = nullptr) const override { preadInternal(offset, length, static_cast(buffer)); return {static_cast(buffer), length}; } - std::string pread(uint64_t offset, uint64_t length) const override { + std::string pread( + uint64_t offset, + uint64_t length, + io::IoStatistics* stats = nullptr) const override { std::string result(length, 0); char* position = result.data(); preadInternal(offset, length, position); @@ -112,7 +118,8 @@ class GcsReadFile final : public ReadFile { uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const override { + const std::vector>& buffers, + io::IoStatistics* stats = nullptr) const override { // 'buffers' contains Ranges(data, size) with some gaps (data = nullptr) in // between. This call must populate the ranges (except gap ranges) // sequentially starting from 'offset'. If a range pointer is nullptr, the diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp index 7210f8a5d2a8..518d795fa455 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp @@ -159,12 +159,18 @@ HdfsReadFile::HdfsReadFile( HdfsReadFile::~HdfsReadFile() = default; -std::string_view -HdfsReadFile::pread(uint64_t offset, uint64_t length, void* buf) const { +std::string_view HdfsReadFile::pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats) const { return pImpl->pread(offset, length, buf); } -std::string HdfsReadFile::pread(uint64_t offset, uint64_t length) const { +std::string HdfsReadFile::pread( + uint64_t offset, + uint64_t length, + io::IoStatistics* stats) const { return pImpl->pread(offset, length); } diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h index 121fef47212d..8ae5cc5ac6a8 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h @@ -34,10 +34,16 @@ class HdfsReadFile final : public ReadFile { std::string_view path); ~HdfsReadFile() override; - std::string_view pread(uint64_t offset, uint64_t length, void* buf) - const final; + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats = nullptr) const final; - std::string pread(uint64_t offset, uint64_t length) const final; + std::string pread( + uint64_t offset, + uint64_t length, + io::IoStatistics* stats = nullptr) const final; uint64_t size() const final; diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index cd57ce79e846..3af3aa9e0cec 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -103,13 +103,17 @@ class S3ReadFile final : public ReadFile { VELOX_CHECK_GE(length_, 0); } - std::string_view pread(uint64_t offset, uint64_t length, void* buffer) - const override { + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buffer, + io::IoStatistics* stats) const override { preadInternal(offset, length, static_cast(buffer)); return {static_cast(buffer), length}; } - std::string pread(uint64_t offset, uint64_t length) const override { + std::string pread(uint64_t offset, uint64_t length, io::IoStatistics* stats) + const override { std::string result(length, 0); char* position = result.data(); preadInternal(offset, length, position); @@ -118,7 +122,8 @@ class S3ReadFile final : public ReadFile { uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const override { + const std::vector>& buffers, + io::IoStatistics* stats) const override { // 'buffers' contains Ranges(data, size) with some gaps (data = nullptr) in // between. This call must populate the ranges (except gap ranges) // sequentially starting from 'offset'. AWS S3 GetObject does not support diff --git a/velox/dwio/common/CachedBufferedInput.h b/velox/dwio/common/CachedBufferedInput.h index 240ae8354bd8..8f4586dbb038 100644 --- a/velox/dwio/common/CachedBufferedInput.h +++ b/velox/dwio/common/CachedBufferedInput.h @@ -67,7 +67,8 @@ class CachedBufferedInput : public BufferedInput { : BufferedInput( std::move(readFile), readerOptions.memoryPool(), - metricsLog), + metricsLog, + ioStats.get()), cache_(cache), fileNum_(fileNum), tracker_(std::move(tracker)), diff --git a/velox/dwio/common/DirectBufferedInput.h b/velox/dwio/common/DirectBufferedInput.h index 734b3bfeb026..4eaf10396431 100644 --- a/velox/dwio/common/DirectBufferedInput.h +++ b/velox/dwio/common/DirectBufferedInput.h @@ -118,7 +118,8 @@ class DirectBufferedInput : public BufferedInput { : BufferedInput( std::move(readFile), readerOptions.memoryPool(), - metricsLog), + metricsLog, + ioStats.get()), fileNum_(fileNum), tracker_(std::move(tracker)), groupId_(groupId), diff --git a/velox/dwio/common/InputStream.cpp b/velox/dwio/common/InputStream.cpp index e2dd496c16d8..e58fe41f12ae 100644 --- a/velox/dwio/common/InputStream.cpp +++ b/velox/dwio/common/InputStream.cpp @@ -78,7 +78,7 @@ void ReadFileInputStream::read( std::string_view readData; { MicrosecondTimer timer(&readTimeUs); - readData = readFile_->pread(offset, length, buf); + readData = readFile_->pread(offset, length, buf, stats_); } if (stats_) { stats_->incRawBytesRead(length); @@ -101,7 +101,7 @@ void ReadFileInputStream::read( LogType logType) { const int64_t bufferSize = totalBufferSize(buffers); logRead(offset, bufferSize, logType); - const auto size = readFile_->preadv(offset, buffers); + const auto size = readFile_->preadv(offset, buffers, stats_); VELOX_CHECK_EQ( size, bufferSize, @@ -118,7 +118,7 @@ folly::SemiFuture ReadFileInputStream::readAsync( LogType logType) { const int64_t bufferSize = totalBufferSize(buffers); logRead(offset, bufferSize, logType); - return readFile_->preadvAsync(offset, buffers); + return readFile_->preadvAsync(offset, buffers, stats_); } bool ReadFileInputStream::hasReadAsync() const { @@ -137,7 +137,7 @@ void ReadFileInputStream::vread( [&](size_t acc, const auto& r) { return acc + r.length; }); logRead(regions[0].offset, length, purpose); auto readStartMicros = getCurrentTimeMicro(); - readFile_->preadv(regions, iobufs); + readFile_->preadv(regions, iobufs, stats_); if (stats_) { stats_->incRawBytesRead(length); stats_->incTotalScanTime((getCurrentTimeMicro() - readStartMicros) * 1000); diff --git a/velox/dwio/common/tests/TestBufferedInput.cpp b/velox/dwio/common/tests/TestBufferedInput.cpp index c0795e3c0760..4032e375e47a 100644 --- a/velox/dwio/common/tests/TestBufferedInput.cpp +++ b/velox/dwio/common/tests/TestBufferedInput.cpp @@ -32,7 +32,7 @@ class ReadFileMock : public ::facebook::velox::ReadFile { MOCK_METHOD( std::string_view, pread, - (uint64_t offset, uint64_t length, void* buf), + (uint64_t offset, uint64_t length, void* buf, IoStatistics* stats), (const, override)); MOCK_METHOD(bool, shouldCoalesce, (), (const, override)); @@ -43,7 +43,9 @@ class ReadFileMock : public ::facebook::velox::ReadFile { MOCK_METHOD( uint64_t, preadv, - (folly::Range regions, folly::Range iobufs), + (folly::Range regions, + folly::Range iobufs, + IoStatistics* stats), (const, override)); }; @@ -55,11 +57,14 @@ void expectPreads( EXPECT_CALL(file, size()).WillRepeatedly(Return(content.size())); for (auto& read : reads) { ASSERT_GE(content.size(), read.offset + read.length); - EXPECT_CALL(file, pread(read.offset, read.length, _)) + EXPECT_CALL(file, pread(read.offset, read.length, _, nullptr)) .Times(1) .WillOnce( - [content](uint64_t offset, uint64_t length, void* buf) - -> std::string_view { + [content]( + uint64_t offset, + uint64_t length, + void* buf, + IoStatistics* stats) -> std::string_view { memcpy(buf, content.data() + offset, length); return {content.data() + offset, length}; }); @@ -72,12 +77,13 @@ void expectPreadvs( std::vector reads) { EXPECT_CALL(file, getName()).WillRepeatedly(Return("mock_name")); EXPECT_CALL(file, size()).WillRepeatedly(Return(content.size())); - EXPECT_CALL(file, preadv(_, _)) + EXPECT_CALL(file, preadv(_, _, nullptr)) .Times(1) .WillOnce( [content, reads]( folly::Range regions, - folly::Range iobufs) -> uint64_t { + folly::Range iobufs, + IoStatistics* stats) -> uint64_t { EXPECT_EQ(regions.size(), reads.size()); uint64_t length = 0; for (size_t i = 0; i < reads.size(); ++i) { diff --git a/velox/dwio/dwrf/test/TestReadFile.h b/velox/dwio/dwrf/test/TestReadFile.h index 30b35de2b277..7f7ffa49bf7b 100644 --- a/velox/dwio/dwrf/test/TestReadFile.h +++ b/velox/dwio/dwrf/test/TestReadFile.h @@ -38,8 +38,11 @@ class TestReadFile : public velox::ReadFile { return length_; } - std::string_view pread(uint64_t offset, uint64_t length, void* buffer) - const override { + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buffer, + io::IoStatistics* stats = nullptr) const override { const uint64_t content = offset + seed_; const uint64_t available = std::min(length_ - offset, length); int fill; @@ -51,8 +54,9 @@ class TestReadFile : public velox::ReadFile { uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const override { - auto res = ReadFile::preadv(offset, buffers); + const std::vector>& buffers, + io::IoStatistics* stats = nullptr) const override { + auto res = ReadFile::preadv(offset, buffers, stats); ++numIos_; return res; } diff --git a/velox/dwio/dwrf/test/TestStripeStream.cpp b/velox/dwio/dwrf/test/TestStripeStream.cpp index b456b7c30af8..214113f92fd5 100644 --- a/velox/dwio/dwrf/test/TestStripeStream.cpp +++ b/velox/dwio/dwrf/test/TestStripeStream.cpp @@ -38,8 +38,11 @@ class RecordingInputStream : public facebook::velox::InMemoryReadFile { public: RecordingInputStream() : InMemoryReadFile(std::string()) {} - std::string_view pread(uint64_t offset, uint64_t length, void* buf) - const override { + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buf, + IoStatistics* stats = nullptr) const override { reads_.push_back({offset, length}); return {static_cast(buf), length}; }