From 9922a90555434b9598475f963d058db9df1c1488 Mon Sep 17 00:00:00 2001 From: Ke Date: Tue, 28 Jan 2025 02:37:38 -0800 Subject: [PATCH] feat: Add IoStatistics in ReadFile to collect storage statistics (#12193) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/12193 Differential Revision: D68764345 Pulled By: kewang1024 --- velox/common/caching/CachedFactory.h | 49 +++++++++++++++---- .../caching/tests/CachedFactoryTest.cpp | 37 ++++++++------ velox/common/file/FileSystems.cpp | 3 +- velox/common/file/FileSystems.h | 4 +- velox/common/file/tests/FaultyFileSystem.cpp | 3 +- velox/common/file/tests/FaultyFileSystem.h | 3 +- velox/common/io/IoStatistics.cpp | 38 ++++++++++++-- velox/common/io/IoStatistics.h | 7 +++ velox/connectors/hive/FileHandle.cpp | 5 +- velox/connectors/hive/FileHandle.h | 5 +- velox/connectors/hive/HiveDataSource.cpp | 5 ++ velox/connectors/hive/SplitReader.cpp | 4 +- velox/connectors/hive/SplitReader.h | 2 +- .../storage_adapters/abfs/AbfsFileSystem.cpp | 3 +- .../storage_adapters/abfs/AbfsFileSystem.h | 3 +- .../storage_adapters/gcs/GcsFileSystem.cpp | 3 +- .../hive/storage_adapters/gcs/GcsFileSystem.h | 3 +- .../storage_adapters/hdfs/HdfsFileSystem.cpp | 3 +- .../storage_adapters/hdfs/HdfsFileSystem.h | 3 +- .../storage_adapters/s3fs/S3FileSystem.cpp | 3 +- .../hive/storage_adapters/s3fs/S3FileSystem.h | 3 +- velox/dwio/common/Throttler.cpp | 3 +- velox/dwio/common/Throttler.h | 10 ++-- .../experimental/wave/common/KernelCache.cpp | 4 +- 24 files changed, 155 insertions(+), 51 deletions(-) diff --git a/velox/common/caching/CachedFactory.h b/velox/common/caching/CachedFactory.h index 22d266ec6235e..73074b079a0bb 100644 --- a/velox/common/caching/CachedFactory.h +++ b/velox/common/caching/CachedFactory.h @@ -145,6 +145,7 @@ template < typename Key, typename Value, typename Generator, + typename Stats, typename Properties = void, typename Sizer = DefaultSizer, typename Comparator = std::equal_to, @@ -178,7 +179,8 @@ class CachedFactory { /// will probably mess with your memory model, so really try to avoid it. CachedPtr generate( const Key& key, - const Properties* properties = nullptr); + const Properties* properties = nullptr, + std::shared_ptr stats = nullptr); /// Looks up the cache entry of the given key if it exists, otherwise returns /// null. @@ -357,18 +359,29 @@ template < typename Key, typename Value, typename Generator, + typename Stats, typename Properties, typename Sizer, typename Comparator, typename Hash> -CachedPtr -CachedFactory:: - generate(const Key& key, const Properties* properties) { +CachedPtr CachedFactory< + Key, + Value, + Generator, + Stats, + Properties, + Sizer, + Comparator, + Hash>:: + generate( + const Key& key, + const Properties* properties, + std::shared_ptr stats) { process::TraceContext trace("CachedFactory::generate"); if (cache_ == nullptr) { return CachedPtr{ /*fromCache=*/false, - (*generator_)(key, properties).release(), + (*generator_)(key, properties, stats).release(), nullptr, std::make_unique(key)}; } @@ -408,7 +421,7 @@ CachedFactory:: pendingCv_.notify_all(); }; - std::unique_ptr generatedValue = (*generator_)(key, properties); + std::unique_ptr generatedValue = (*generator_)(key, properties, stats); const uint64_t valueSize = Sizer()(*generatedValue); Value* rawValue = generatedValue.release(); const bool inserted = addCache(key, rawValue, valueSize); @@ -432,13 +445,20 @@ template < typename Key, typename Value, typename Generator, + typename Stats, typename Properties, typename Sizer, typename Comparator, typename Hash> -CachedPtr -CachedFactory::get( - const Key& key) { +CachedPtr CachedFactory< + Key, + Value, + Generator, + Stats, + Properties, + Sizer, + Comparator, + Hash>::get(const Key& key) { if (cache_ == nullptr) { return {}; } @@ -459,11 +479,20 @@ template < typename Key, typename Value, typename Generator, + typename Stats, typename Properties, typename Sizer, typename Comparator, typename Hash> -void CachedFactory:: +void CachedFactory< + Key, + Value, + Generator, + Stats, + Properties, + Sizer, + Comparator, + Hash>:: retrieveCached( const std::vector& keys, std::vector>>& diff --git a/velox/common/caching/tests/CachedFactoryTest.cpp b/velox/common/caching/tests/CachedFactoryTest.cpp index e8161a9792569..6c534b52cca8b 100644 --- a/velox/common/caching/tests/CachedFactoryTest.cpp +++ b/velox/common/caching/tests/CachedFactoryTest.cpp @@ -22,6 +22,7 @@ #include "folly/synchronization/Latch.h" #include "gtest/gtest.h" #include "velox/common/base/tests/GTestUtils.h" +#include "velox/common/io/IoStatistics.h" using namespace facebook::velox; @@ -30,7 +31,8 @@ namespace { struct DoublerGenerator { std::unique_ptr operator()( const int& value, - const void* properties = nullptr) { + const void* properties = nullptr, + const void* stats = nullptr) { ++generated; return std::make_unique(value * 2); } @@ -40,7 +42,8 @@ struct DoublerGenerator { struct IdentityGenerator { std::unique_ptr operator()( const int& value, - const void* properties = nullptr) { + const void* properties = nullptr, + const void* stats = nullptr) { return std::make_unique(value); } }; @@ -49,7 +52,7 @@ struct IdentityGenerator { TEST(CachedFactoryTest, basicGeneration) { auto generator = std::make_unique(); auto* generated = &generator->generated; - CachedFactory factory( + CachedFactory factory( std::make_unique>(1000), std::move(generator)); ASSERT_EQ(factory.maxSize(), 1000); ASSERT_EQ(factory.currentSize(), 0); @@ -113,7 +116,8 @@ TEST(CachedFactoryTest, basicGeneration) { struct DoublerWithExceptionsGenerator { std::unique_ptr operator()( const int& value, - const void* properties = nullptr) { + const void* properties = nullptr, + const void* stats = nullptr) { if (value == 3) { VELOX_FAIL("3 is bad"); } @@ -125,7 +129,7 @@ struct DoublerWithExceptionsGenerator { TEST(CachedFactoryTest, clearCache) { auto generator = std::make_unique(); - CachedFactory factory( + CachedFactory factory( std::make_unique>(1000), std::move(generator)); ASSERT_EQ(factory.maxSize(), 1000); { @@ -159,8 +163,10 @@ TEST(CachedFactoryTest, clearCache) { TEST(CachedFactoryTest, basicExceptionHandling) { auto generator = std::make_unique(); int* generated = &generator->generated; - CachedFactory factory( - std::make_unique>(1000), std::move(generator)); + CachedFactory + factory( + std::make_unique>(1000), + std::move(generator)); auto val1 = factory.generate(1); ASSERT_EQ(*val1, 2); ASSERT_EQ(*generated, 1); @@ -177,7 +183,7 @@ TEST(CachedFactoryTest, basicExceptionHandling) { TEST(CachedFactoryTest, multiThreadedGeneration) { auto generator = std::make_unique(); auto* generated = &generator->generated; - CachedFactory factory( + CachedFactory factory( std::make_unique>(1000), std::move(generator)); folly::EDFThreadPoolExecutor pool( 100, std::make_shared("test_pool")); @@ -202,7 +208,7 @@ TEST(CachedFactoryTest, multiThreadedGeneration) { TEST(CachedFactoryTest, multiThreadedGenerationAgain) { auto generator = std::make_unique(); auto* generated = &generator->generated; - CachedFactory factory( + CachedFactory factory( std::make_unique>(1000), std::move(generator)); folly::EDFThreadPoolExecutor pool( 100, std::make_shared("test_pool")); @@ -229,7 +235,7 @@ TEST(CachedFactoryTest, multiThreadedGenerationAgain) { TEST(CachedFactoryTest, lruCacheEviction) { auto generator = std::make_unique(); - CachedFactory factory( + CachedFactory factory( std::make_unique>(3), std::move(generator)); ASSERT_EQ(factory.maxSize(), 3); ASSERT_EQ(factory.currentSize(), 0); @@ -298,7 +304,7 @@ TEST(CachedFactoryTest, lruCacheEviction) { TEST(CachedFactoryTest, cacheExpiration) { auto generator = std::make_unique(); - CachedFactory factory( + CachedFactory factory( std::make_unique>(3, 1'000), std::move(generator)); ASSERT_EQ(factory.maxSize(), 3); @@ -359,7 +365,7 @@ TEST(CachedFactoryTest, cacheExpiration) { TEST(CachedFactoryTest, retrievedCached) { auto generator = std::make_unique(); auto* generated = &generator->generated; - CachedFactory factory( + CachedFactory factory( std::make_unique>(1000), std::move(generator)); for (int i = 0; i < 10; i += 2) { factory.generate(i); @@ -399,7 +405,7 @@ TEST(CachedFactoryTest, retrievedCached) { TEST(CachedFactoryTest, clearCacheWithManyEntries) { auto generator = std::make_unique(); - CachedFactory factory( + CachedFactory factory( std::make_unique>(1000), std::move(generator)); for (auto i = 0; i < 1000; ++i) { factory.generate(i); @@ -429,7 +435,8 @@ TEST(CachedFactoryTest, clearCacheWithManyEntries) { TEST(CachedFactoryTest, disableCache) { auto generator = std::make_unique(); auto* generated = &generator->generated; - CachedFactory factory(std::move(generator)); + CachedFactory factory( + std::move(generator)); auto val1 = factory.generate(1); ASSERT_FALSE(val1.fromCache()); @@ -474,7 +481,7 @@ TEST(CachedFactoryTest, fuzzer) { for (const bool expireCache : {false, true}) { SCOPED_TRACE(fmt::format("expireCache: {}", expireCache)); auto generator = std::make_unique(); - CachedFactory factory( + CachedFactory factory( std::make_unique>( 128, expireCache ? expirationDurationMs : 0), std::move(generator)); diff --git a/velox/common/file/FileSystems.cpp b/velox/common/file/FileSystems.cpp index aa738b6a43ba4..c3b791af5f159 100644 --- a/velox/common/file/FileSystems.cpp +++ b/velox/common/file/FileSystems.cpp @@ -115,7 +115,8 @@ class LocalFileSystem : public FileSystem { std::unique_ptr openFileForRead( std::string_view path, - const FileOptions& options) override { + const FileOptions& options, + std::shared_ptr ioStats) override { return std::make_unique( extractPath(path), executor_.get(), options.bufferIo); } diff --git a/velox/common/file/FileSystems.h b/velox/common/file/FileSystems.h index 1829215b625bc..c346274e15c63 100644 --- a/velox/common/file/FileSystems.h +++ b/velox/common/file/FileSystems.h @@ -16,6 +16,7 @@ #pragma once #include "velox/common/base/Exceptions.h" +#include "velox/common/io/IoStatistics.h" #include "velox/common/memory/MemoryPool.h" #include @@ -103,7 +104,8 @@ class FileSystem { /// Returns a ReadFile handle for a given file path virtual std::unique_ptr openFileForRead( std::string_view path, - const FileOptions& options = {}) = 0; + const FileOptions& options = {}, + std::shared_ptr ioStats = nullptr) = 0; /// Returns a WriteFile handle for a given file path virtual std::unique_ptr openFileForWrite( diff --git a/velox/common/file/tests/FaultyFileSystem.cpp b/velox/common/file/tests/FaultyFileSystem.cpp index 339e267c09716..0cbaab088322b 100644 --- a/velox/common/file/tests/FaultyFileSystem.cpp +++ b/velox/common/file/tests/FaultyFileSystem.cpp @@ -55,7 +55,8 @@ fileSystemGenerator() { std::unique_ptr FaultyFileSystem::openFileForRead( std::string_view path, - const FileOptions& options) { + const FileOptions& options, + std::shared_ptr ioStats) { const std::string delegatedPath = std::string(extractPath(path)); auto delegatedFile = getFileSystem(delegatedPath, config_) ->openFileForRead(delegatedPath, options); diff --git a/velox/common/file/tests/FaultyFileSystem.h b/velox/common/file/tests/FaultyFileSystem.h index f85314c75e8f3..c6b2f7bd5aa06 100644 --- a/velox/common/file/tests/FaultyFileSystem.h +++ b/velox/common/file/tests/FaultyFileSystem.h @@ -54,7 +54,8 @@ class FaultyFileSystem : public FileSystem { std::unique_ptr openFileForRead( std::string_view path, - const FileOptions& options = {}) override; + const FileOptions& options = {}, + std::shared_ptr ioStats = nullptr) override; std::unique_ptr openFileForWrite( std::string_view path, diff --git a/velox/common/io/IoStatistics.cpp b/velox/common/io/IoStatistics.cpp index 7dfddc6dc4831..e32deaa8fa422 100644 --- a/velox/common/io/IoStatistics.cpp +++ b/velox/common/io/IoStatistics.cpp @@ -90,6 +90,9 @@ void IoStatistics::incOperationCounters( const uint64_t fullThrottleCount, const uint64_t partialThrottleCount) { std::lock_guard lock{operationStatsMutex_}; + if (operationStats_.count(operation) == 0) { + operationStats_.emplace(operation, OperationCounters()); + } operationStats_[operation].localThrottleCount += localThrottleCount; operationStats_[operation].resourceThrottleCount += resourceThrottleCount; operationStats_[operation].networkThrottleCount += networkThrottleCount; @@ -108,6 +111,24 @@ IoStatistics::operationStats() const { return operationStats_; } +std::unordered_map IoStatistics::storageStats() + const { + std::lock_guard lock{storageStatsMutex_}; + return storageStats_; +} + +void IoStatistics::addStorageStats( + const std::string& name, + const RuntimeCounter& counter) { + std::lock_guard lock{storageStatsMutex_}; + if (storageStats_.count(name) == 0) { + storageStats_.emplace(name, RuntimeMetric(counter.unit)); + } else { + VELOX_CHECK_EQ(storageStats_.at(name).unit, counter.unit); + } + storageStats_.at(name).addValue(counter.value); +} + void IoStatistics::merge(const IoStatistics& other) { rawBytesRead_ += other.rawBytesRead_; rawBytesWritten_ += other.rawBytesWritten_; @@ -119,9 +140,20 @@ void IoStatistics::merge(const IoStatistics& other) { ramHit_.merge(other.ramHit_); ssdRead_.merge(other.ssdRead_); queryThreadIoLatency_.merge(other.queryThreadIoLatency_); - std::lock_guard l(operationStatsMutex_); - for (auto& item : other.operationStats_) { - operationStats_[item.first].merge(item.second); + { + const auto& otherOperationStats = other.operationStats(); + std::lock_guard l(operationStatsMutex_); + for (auto& item : otherOperationStats) { + operationStats_[item.first].merge(item.second); + } + } + + { + const auto& otherStorageStats = other.storageStats(); + std::lock_guard storageStatsLock(storageStatsMutex_); + for (auto& item : otherStorageStats) { + storageStats_[item.first].merge(item.second); + } } } diff --git a/velox/common/io/IoStatistics.h b/velox/common/io/IoStatistics.h index 2111a8877b475..6d31e0dfef144 100644 --- a/velox/common/io/IoStatistics.h +++ b/velox/common/io/IoStatistics.h @@ -23,6 +23,8 @@ #include #include +#include "velox/common/base/Exceptions.h" +#include "velox/common/base/RuntimeMetrics.h" namespace facebook::velox::io { @@ -140,6 +142,9 @@ class IoStatistics { const uint64_t partialThrottleCount = 0); std::unordered_map operationStats() const; + std::unordered_map storageStats() const; + + void addStorageStats(const std::string& name, const RuntimeCounter& counter); void merge(const IoStatistics& other); @@ -172,7 +177,9 @@ class IoStatistics { IoCounter queryThreadIoLatency_; std::unordered_map operationStats_; + std::unordered_map storageStats_; mutable std::mutex operationStatsMutex_; + mutable std::mutex storageStatsMutex_; }; } // namespace facebook::velox::io diff --git a/velox/connectors/hive/FileHandle.cpp b/velox/connectors/hive/FileHandle.cpp index 7678fb7a6c35c..a7ab1701511e9 100644 --- a/velox/connectors/hive/FileHandle.cpp +++ b/velox/connectors/hive/FileHandle.cpp @@ -41,7 +41,8 @@ std::string groupName(const std::string& filename) { std::unique_ptr FileHandleGenerator::operator()( const std::string& filename, - const FileProperties* properties) { + const FileProperties* properties, + std::shared_ptr ioStats) { // We have seen cases where drivers are stuck when creating file handles. // Adding a trace here to spot this more easily in future. process::TraceContext trace("FileHandleGenerator::operator()"); @@ -55,7 +56,7 @@ std::unique_ptr FileHandleGenerator::operator()( options.fileSize = properties->fileSize; } fileHandle->file = filesystems::getFileSystem(filename, properties_) - ->openFileForRead(filename, options); + ->openFileForRead(filename, options, ioStats); fileHandle->uuid = StringIdLease(fileIds(), filename); fileHandle->groupId = StringIdLease(fileIds(), groupName(filename)); VLOG(1) << "Generating file handle for: " << filename diff --git a/velox/connectors/hive/FileHandle.h b/velox/connectors/hive/FileHandle.h index 5db30b1d7f4c3..e121ba769f7a7 100644 --- a/velox/connectors/hive/FileHandle.h +++ b/velox/connectors/hive/FileHandle.h @@ -29,6 +29,7 @@ #include "velox/common/caching/FileIds.h" #include "velox/common/config/Config.h" #include "velox/common/file/File.h" +#include "velox/common/io/IoStatistics.h" #include "velox/connectors/hive/FileProperties.h" namespace facebook::velox { @@ -69,7 +70,8 @@ class FileHandleGenerator { : properties_(std::move(properties)) {} std::unique_ptr operator()( const std::string& filename, - const FileProperties* properties); + const FileProperties* properties, + std::shared_ptr ioStats); private: const std::shared_ptr properties_; @@ -79,6 +81,7 @@ using FileHandleFactory = CachedFactory< std::string, FileHandle, FileHandleGenerator, + io::IoStatistics, FileProperties, FileHandleSizer>; diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 0edfb3a0ea7cb..dbac54e81889d 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -513,6 +513,11 @@ std::unordered_map HiveDataSource::runtimeStats() { if (numBucketConversion_ > 0) { res.insert({"numBucketConversion", RuntimeCounter(numBucketConversion_)}); } + for (const auto& storageStats : ioStats_->storageStats()) { + res.emplace( + storageStats.first, + RuntimeCounter(storageStats.second.sum, storageStats.second.unit)); + } return res; } diff --git a/velox/connectors/hive/SplitReader.cpp b/velox/connectors/hive/SplitReader.cpp index ebbb543e0e545..3fcb7640f88e8 100644 --- a/velox/connectors/hive/SplitReader.cpp +++ b/velox/connectors/hive/SplitReader.cpp @@ -229,8 +229,8 @@ RowTypePtr SplitReader::createReader() { try { fileHandleCachePtr = fileHandleFactory_->generate( hiveSplit_->filePath, - hiveSplit_->properties.has_value() ? &*hiveSplit_->properties - : nullptr); + hiveSplit_->properties.has_value() ? &*hiveSplit_->properties : nullptr, + ioStats_); VELOX_CHECK_NOT_NULL(fileHandleCachePtr.get()); } catch (const VeloxRuntimeError& e) { if (e.errorCode() == error_code::kFileNotFound && diff --git a/velox/connectors/hive/SplitReader.h b/velox/connectors/hive/SplitReader.h index 5466107ca6206..d30b265a8eef6 100644 --- a/velox/connectors/hive/SplitReader.h +++ b/velox/connectors/hive/SplitReader.h @@ -156,7 +156,7 @@ class SplitReader { const std::shared_ptr hiveConfig_; const RowTypePtr readerOutputType_; - const std::shared_ptr ioStats_; + std::shared_ptr ioStats_; FileHandleFactory* const fileHandleFactory_; folly::Executor* const executor_; memory::MemoryPool* const pool_; diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp index 7e63c2df1438f..6cde61d0c702e 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp @@ -208,7 +208,8 @@ std::string AbfsFileSystem::name() const { std::unique_ptr AbfsFileSystem::openFileForRead( std::string_view path, - const FileOptions& options) { + const FileOptions& options, + std::shared_ptr ioStats) { auto abfsfile = std::make_unique(path, *config_); abfsfile->initialize(options); return abfsfile; diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h index c0d3d60ccdee5..864cefc6ee833 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h @@ -40,7 +40,8 @@ class AbfsFileSystem : public FileSystem { std::unique_ptr openFileForRead( std::string_view path, - const FileOptions& options = {}) override; + const FileOptions& options = {}, + std::shared_ptr ioStats = nullptr) override; std::unique_ptr openFileForWrite( std::string_view path, diff --git a/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp b/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp index f5d0830d3de35..424bbfcd29a54 100644 --- a/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp @@ -340,7 +340,8 @@ void GcsFileSystem::initializeClient() { std::unique_ptr GcsFileSystem::openFileForRead( std::string_view path, - const FileOptions& options) { + const FileOptions& options, + std::shared_ptr ioStats) { const auto gcspath = gcsPath(path); auto gcsfile = std::make_unique(gcspath, impl_->getClient()); gcsfile->initialize(options); diff --git a/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.h b/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.h index 34daff8d6c64f..5ceffde1b9c3c 100644 --- a/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.h +++ b/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.h @@ -41,7 +41,8 @@ class GcsFileSystem : public FileSystem { /// [[https://cloud.google.com/storage/docs/samples/storage-stream-file-download]]. std::unique_ptr openFileForRead( std::string_view path, - const FileOptions& options = {}) override; + const FileOptions& options = {}, + std::shared_ptr ioStats = nullptr) override; /// Initialize a WriteFile /// First the method google::cloud::storage::Client::GetObjectMetadata diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp index 856f2b2526dea..2a99b3e6b81fe 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp @@ -90,7 +90,8 @@ std::string HdfsFileSystem::name() const { std::unique_ptr HdfsFileSystem::openFileForRead( std::string_view path, - const FileOptions& /*unused*/) { + const FileOptions& /*unused*/, + std::shared_ptr ioStats /*unused*/) { // Only remove the schema for hdfs path. if (path.find(kScheme) == 0) { path.remove_prefix(kScheme.length()); diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h index b541ec629baf1..947a4e19f1308 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h @@ -55,7 +55,8 @@ class HdfsFileSystem : public FileSystem { std::unique_ptr openFileForRead( std::string_view path, - const FileOptions& options = {}) override; + const FileOptions& options = {}, + std::shared_ptr ioStats = nullptr) override; std::unique_ptr openFileForWrite( std::string_view path, diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index cd57ce79e8467..379311998cef2 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -740,7 +740,8 @@ std::string S3FileSystem::getLogLevelName() const { std::unique_ptr S3FileSystem::openFileForRead( std::string_view s3Path, - const FileOptions& options) { + const FileOptions& options, + std::shared_ptr ioStats) { const auto path = getPath(s3Path); auto s3file = std::make_unique(path, impl_->s3Client()); s3file->initialize(options); diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h index c1e73198d48f1..7f6b1e9a32812 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.h @@ -37,7 +37,8 @@ class S3FileSystem : public FileSystem { std::unique_ptr openFileForRead( std::string_view s3Path, - const FileOptions& options = {}) override; + const FileOptions& options = {}, + std::shared_ptr ioStats = nullptr) override; std::unique_ptr openFileForWrite( std::string_view s3Path, diff --git a/velox/dwio/common/Throttler.cpp b/velox/dwio/common/Throttler.cpp index 2e1431bdc5c89..3e3272806fa66 100644 --- a/velox/dwio/common/Throttler.cpp +++ b/velox/dwio/common/Throttler.cpp @@ -270,7 +270,8 @@ uint64_t Throttler::calculateBackoffDurationAndUpdateThrottleCache( std::unique_ptr Throttler::ThrottleSignalGenerator::operator()( const std::string& /*unused*/, - const void* /*unused*/) { + const void* /*unused*/, + std::shared_ptr /*unused*/) { return std::unique_ptr(new ThrottleSignal{1}); } diff --git a/velox/dwio/common/Throttler.h b/velox/dwio/common/Throttler.h index 0ebf1e0882057..e9b9664e0d208 100644 --- a/velox/dwio/common/Throttler.h +++ b/velox/dwio/common/Throttler.h @@ -175,13 +175,17 @@ class Throttler { std::unique_ptr operator()( const std::string& /*unused*/, - const void* /*unused*/); + const void* /*unused*/, + std::shared_ptr /*unused*/); }; using CachedThrottleSignalPtr = CachedPtr; - using ThrottleSignalFactory = facebook::velox:: - CachedFactory; + using ThrottleSignalFactory = facebook::velox::CachedFactory< + std::string, + ThrottleSignal, + ThrottleSignalGenerator, + io::IoStatistics>; struct ThrottleSignalCache { std::unique_ptr throttleCache; diff --git a/velox/experimental/wave/common/KernelCache.cpp b/velox/experimental/wave/common/KernelCache.cpp index 0604512b69df5..7987c25426357 100644 --- a/velox/experimental/wave/common/KernelCache.cpp +++ b/velox/experimental/wave/common/KernelCache.cpp @@ -19,6 +19,7 @@ #include #include +#include "velox/common/io/IoStatistics.h" namespace facebook::velox::wave { @@ -96,7 +97,8 @@ class KernelGenerator { public: std::unique_ptr operator()( const std::string, - const KernelGenFunc* gen) { + const KernelGenFunc* gen, + std::shared_ptr stats) { using ModulePromise = folly::Promise; struct PromiseHolder { ModulePromise promise;