Skip to content

Commit

Permalink
feat: Add IoStatistics in ReadFile to collect storage statistics (facebookincubator#12193)
Browse files Browse the repository at this point in the history

Summary: Pull Request resolved: facebookincubator#12193

Differential Revision: D68764345

Pulled By: kewang1024
  • Loading branch information
kewang1024 authored and facebook-github-bot committed Jan 28, 2025
1 parent 9fd0b0f commit 9922a90
Show file tree
Hide file tree
Showing 24 changed files with 155 additions and 51 deletions.
49 changes: 39 additions & 10 deletions velox/common/caching/CachedFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ template <
typename Key,
typename Value,
typename Generator,
typename Stats,
typename Properties = void,
typename Sizer = DefaultSizer<Value>,
typename Comparator = std::equal_to<Key>,
Expand Down Expand Up @@ -178,7 +179,8 @@ class CachedFactory {
/// will probably mess with your memory model, so really try to avoid it.
CachedPtr<Key, Value, Comparator, Hash> generate(
const Key& key,
const Properties* properties = nullptr);
const Properties* properties = nullptr,
std::shared_ptr<Stats> stats = nullptr);

/// Looks up the cache entry of the given key if it exists, otherwise returns
/// null.
Expand Down Expand Up @@ -357,18 +359,29 @@ template <
typename Key,
typename Value,
typename Generator,
typename Stats,
typename Properties,
typename Sizer,
typename Comparator,
typename Hash>
CachedPtr<Key, Value, Comparator, Hash>
CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
generate(const Key& key, const Properties* properties) {
CachedPtr<Key, Value, Comparator, Hash> CachedFactory<
Key,
Value,
Generator,
Stats,
Properties,
Sizer,
Comparator,
Hash>::
generate(
const Key& key,
const Properties* properties,
std::shared_ptr<Stats> stats) {
process::TraceContext trace("CachedFactory::generate");
if (cache_ == nullptr) {
return CachedPtr<Key, Value, Comparator, Hash>{
/*fromCache=*/false,
(*generator_)(key, properties).release(),
(*generator_)(key, properties, stats).release(),
nullptr,
std::make_unique<Key>(key)};
}
Expand Down Expand Up @@ -408,7 +421,7 @@ CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
pendingCv_.notify_all();
};

std::unique_ptr<Value> generatedValue = (*generator_)(key, properties);
std::unique_ptr<Value> generatedValue = (*generator_)(key, properties, stats);
const uint64_t valueSize = Sizer()(*generatedValue);
Value* rawValue = generatedValue.release();
const bool inserted = addCache(key, rawValue, valueSize);
Expand All @@ -432,13 +445,20 @@ template <
typename Key,
typename Value,
typename Generator,
typename Stats,
typename Properties,
typename Sizer,
typename Comparator,
typename Hash>
CachedPtr<Key, Value, Comparator, Hash>
CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::get(
const Key& key) {
CachedPtr<Key, Value, Comparator, Hash> CachedFactory<
Key,
Value,
Generator,
Stats,
Properties,
Sizer,
Comparator,
Hash>::get(const Key& key) {
if (cache_ == nullptr) {
return {};
}
Expand All @@ -459,11 +479,20 @@ template <
typename Key,
typename Value,
typename Generator,
typename Stats,
typename Properties,
typename Sizer,
typename Comparator,
typename Hash>
void CachedFactory<Key, Value, Generator, Properties, Sizer, Comparator, Hash>::
void CachedFactory<
Key,
Value,
Generator,
Stats,
Properties,
Sizer,
Comparator,
Hash>::
retrieveCached(
const std::vector<Key>& keys,
std::vector<std::pair<Key, CachedPtr<Key, Value, Comparator, Hash>>>&
Expand Down
37 changes: 22 additions & 15 deletions velox/common/caching/tests/CachedFactoryTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "folly/synchronization/Latch.h"
#include "gtest/gtest.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/io/IoStatistics.h"

using namespace facebook::velox;

Expand All @@ -30,7 +31,8 @@ namespace {
struct DoublerGenerator {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
const void* properties = nullptr,
const void* stats = nullptr) {
++generated;
return std::make_unique<int>(value * 2);
}
Expand All @@ -40,7 +42,8 @@ struct DoublerGenerator {
struct IdentityGenerator {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
const void* properties = nullptr,
const void* stats = nullptr) {
return std::make_unique<int>(value);
}
};
Expand All @@ -49,7 +52,7 @@ struct IdentityGenerator {
TEST(CachedFactoryTest, basicGeneration) {
auto generator = std::make_unique<DoublerGenerator>();
auto* generated = &generator->generated;
CachedFactory<int, int, DoublerGenerator> factory(
CachedFactory<int, int, DoublerGenerator, io::IoStatistics> factory(
std::make_unique<SimpleLRUCache<int, int>>(1000), std::move(generator));
ASSERT_EQ(factory.maxSize(), 1000);
ASSERT_EQ(factory.currentSize(), 0);
Expand Down Expand Up @@ -113,7 +116,8 @@ TEST(CachedFactoryTest, basicGeneration) {
struct DoublerWithExceptionsGenerator {
std::unique_ptr<int> operator()(
const int& value,
const void* properties = nullptr) {
const void* properties = nullptr,
const void* stats = nullptr) {
if (value == 3) {
VELOX_FAIL("3 is bad");
}
Expand All @@ -125,7 +129,7 @@ struct DoublerWithExceptionsGenerator {

TEST(CachedFactoryTest, clearCache) {
auto generator = std::make_unique<DoublerGenerator>();
CachedFactory<int, int, DoublerGenerator> factory(
CachedFactory<int, int, DoublerGenerator, io::IoStatistics> factory(
std::make_unique<SimpleLRUCache<int, int>>(1000), std::move(generator));
ASSERT_EQ(factory.maxSize(), 1000);
{
Expand Down Expand Up @@ -159,8 +163,10 @@ TEST(CachedFactoryTest, clearCache) {
TEST(CachedFactoryTest, basicExceptionHandling) {
auto generator = std::make_unique<DoublerWithExceptionsGenerator>();
int* generated = &generator->generated;
CachedFactory<int, int, DoublerWithExceptionsGenerator> factory(
std::make_unique<SimpleLRUCache<int, int>>(1000), std::move(generator));
CachedFactory<int, int, DoublerWithExceptionsGenerator, io::IoStatistics>
factory(
std::make_unique<SimpleLRUCache<int, int>>(1000),
std::move(generator));
auto val1 = factory.generate(1);
ASSERT_EQ(*val1, 2);
ASSERT_EQ(*generated, 1);
Expand All @@ -177,7 +183,7 @@ TEST(CachedFactoryTest, basicExceptionHandling) {
TEST(CachedFactoryTest, multiThreadedGeneration) {
auto generator = std::make_unique<DoublerGenerator>();
auto* generated = &generator->generated;
CachedFactory<int, int, DoublerGenerator> factory(
CachedFactory<int, int, DoublerGenerator, io::IoStatistics> factory(
std::make_unique<SimpleLRUCache<int, int>>(1000), std::move(generator));
folly::EDFThreadPoolExecutor pool(
100, std::make_shared<folly::NamedThreadFactory>("test_pool"));
Expand All @@ -202,7 +208,7 @@ TEST(CachedFactoryTest, multiThreadedGeneration) {
TEST(CachedFactoryTest, multiThreadedGenerationAgain) {
auto generator = std::make_unique<DoublerGenerator>();
auto* generated = &generator->generated;
CachedFactory<int, int, DoublerGenerator> factory(
CachedFactory<int, int, DoublerGenerator, io::IoStatistics> factory(
std::make_unique<SimpleLRUCache<int, int>>(1000), std::move(generator));
folly::EDFThreadPoolExecutor pool(
100, std::make_shared<folly::NamedThreadFactory>("test_pool"));
Expand All @@ -229,7 +235,7 @@ TEST(CachedFactoryTest, multiThreadedGenerationAgain) {

TEST(CachedFactoryTest, lruCacheEviction) {
auto generator = std::make_unique<DoublerGenerator>();
CachedFactory<int, int, DoublerGenerator> factory(
CachedFactory<int, int, DoublerGenerator, io::IoStatistics> factory(
std::make_unique<SimpleLRUCache<int, int>>(3), std::move(generator));
ASSERT_EQ(factory.maxSize(), 3);
ASSERT_EQ(factory.currentSize(), 0);
Expand Down Expand Up @@ -298,7 +304,7 @@ TEST(CachedFactoryTest, lruCacheEviction) {

TEST(CachedFactoryTest, cacheExpiration) {
auto generator = std::make_unique<DoublerGenerator>();
CachedFactory<int, int, DoublerGenerator> factory(
CachedFactory<int, int, DoublerGenerator, io::IoStatistics> factory(
std::make_unique<SimpleLRUCache<int, int>>(3, 1'000),
std::move(generator));
ASSERT_EQ(factory.maxSize(), 3);
Expand Down Expand Up @@ -359,7 +365,7 @@ TEST(CachedFactoryTest, cacheExpiration) {
TEST(CachedFactoryTest, retrievedCached) {
auto generator = std::make_unique<DoublerGenerator>();
auto* generated = &generator->generated;
CachedFactory<int, int, DoublerGenerator> factory(
CachedFactory<int, int, DoublerGenerator, io::IoStatistics> factory(
std::make_unique<SimpleLRUCache<int, int>>(1000), std::move(generator));
for (int i = 0; i < 10; i += 2) {
factory.generate(i);
Expand Down Expand Up @@ -399,7 +405,7 @@ TEST(CachedFactoryTest, retrievedCached) {

TEST(CachedFactoryTest, clearCacheWithManyEntries) {
auto generator = std::make_unique<DoublerGenerator>();
CachedFactory<int, int, DoublerGenerator> factory(
CachedFactory<int, int, DoublerGenerator, io::IoStatistics> factory(
std::make_unique<SimpleLRUCache<int, int>>(1000), std::move(generator));
for (auto i = 0; i < 1000; ++i) {
factory.generate(i);
Expand Down Expand Up @@ -429,7 +435,8 @@ TEST(CachedFactoryTest, clearCacheWithManyEntries) {
TEST(CachedFactoryTest, disableCache) {
auto generator = std::make_unique<DoublerGenerator>();
auto* generated = &generator->generated;
CachedFactory<int, int, DoublerGenerator> factory(std::move(generator));
CachedFactory<int, int, DoublerGenerator, io::IoStatistics> factory(
std::move(generator));

auto val1 = factory.generate(1);
ASSERT_FALSE(val1.fromCache());
Expand Down Expand Up @@ -474,7 +481,7 @@ TEST(CachedFactoryTest, fuzzer) {
for (const bool expireCache : {false, true}) {
SCOPED_TRACE(fmt::format("expireCache: {}", expireCache));
auto generator = std::make_unique<IdentityGenerator>();
CachedFactory<int, int, IdentityGenerator> factory(
CachedFactory<int, int, IdentityGenerator, io::IoStatistics> factory(
std::make_unique<SimpleLRUCache<int, int>>(
128, expireCache ? expirationDurationMs : 0),
std::move(generator));
Expand Down
3 changes: 2 additions & 1 deletion velox/common/file/FileSystems.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class LocalFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options) override {
const FileOptions& options,
std::shared_ptr<io::IoStatistics> ioStats) override {
return std::make_unique<LocalReadFile>(
extractPath(path), executor_.get(), options.bufferIo);
}
Expand Down
4 changes: 3 additions & 1 deletion velox/common/file/FileSystems.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#pragma once

#include "velox/common/base/Exceptions.h"
#include "velox/common/io/IoStatistics.h"
#include "velox/common/memory/MemoryPool.h"

#include <functional>
Expand Down Expand Up @@ -103,7 +104,8 @@ class FileSystem {
/// Returns a ReadFile handle for a given file path
virtual std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) = 0;
const FileOptions& options = {},
std::shared_ptr<io::IoStatistics> ioStats = nullptr) = 0;

/// Returns a WriteFile handle for a given file path
virtual std::unique_ptr<WriteFile> openFileForWrite(
Expand Down
3 changes: 2 additions & 1 deletion velox/common/file/tests/FaultyFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ fileSystemGenerator() {

std::unique_ptr<ReadFile> FaultyFileSystem::openFileForRead(
std::string_view path,
const FileOptions& options) {
const FileOptions& options,
std::shared_ptr<io::IoStatistics> ioStats) {
const std::string delegatedPath = std::string(extractPath(path));
auto delegatedFile = getFileSystem(delegatedPath, config_)
->openFileForRead(delegatedPath, options);
Expand Down
3 changes: 2 additions & 1 deletion velox/common/file/tests/FaultyFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class FaultyFileSystem : public FileSystem {

std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& options = {}) override;
const FileOptions& options = {},
std::shared_ptr<io::IoStatistics> ioStats = nullptr) override;

std::unique_ptr<WriteFile> openFileForWrite(
std::string_view path,
Expand Down
38 changes: 35 additions & 3 deletions velox/common/io/IoStatistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ void IoStatistics::incOperationCounters(
const uint64_t fullThrottleCount,
const uint64_t partialThrottleCount) {
std::lock_guard<std::mutex> lock{operationStatsMutex_};
if (operationStats_.count(operation) == 0) {
operationStats_.emplace(operation, OperationCounters());
}
operationStats_[operation].localThrottleCount += localThrottleCount;
operationStats_[operation].resourceThrottleCount += resourceThrottleCount;
operationStats_[operation].networkThrottleCount += networkThrottleCount;
Expand All @@ -108,6 +111,24 @@ IoStatistics::operationStats() const {
return operationStats_;
}

/// Returns a by-value copy of the storage statistics map, keyed by statistic
/// name. The copy is made while storageStatsMutex_ is held.
std::unordered_map<std::string, RuntimeMetric> IoStatistics::storageStats()
    const {
  std::lock_guard<std::mutex> guard(storageStatsMutex_);
  // Copy while the guard is alive, then hand the copy back to the caller.
  auto snapshot = storageStats_;
  return snapshot;
}

/// Records one sample of a named storage statistic.
///
/// The first sample seen for 'name' creates its metric from the unit carried
/// by 'counter'. Any later sample for the same name must carry the same unit;
/// a mismatch trips the VELOX_CHECK_EQ below. The sample's value is then added
/// to the metric through RuntimeMetric::addValue(). The whole update runs
/// while storageStatsMutex_ is held.
///
/// @param name Key of the statistic in the storage stats map.
/// @param counter Sample to record; supplies both the unit and the value.
void IoStatistics::addStorageStats(
    const std::string& name,
    const RuntimeCounter& counter) {
  std::lock_guard<std::mutex> lock{storageStatsMutex_};
  // One hash lookup instead of count() + emplace()/at() + at(): try_emplace
  // constructs a metric only when 'name' is new, and otherwise returns the
  // existing entry untouched.
  auto [it, inserted] = storageStats_.try_emplace(name, counter.unit);
  if (!inserted) {
    VELOX_CHECK_EQ(it->second.unit, counter.unit);
  }
  it->second.addValue(counter.value);
}

void IoStatistics::merge(const IoStatistics& other) {
rawBytesRead_ += other.rawBytesRead_;
rawBytesWritten_ += other.rawBytesWritten_;
Expand All @@ -119,9 +140,20 @@ void IoStatistics::merge(const IoStatistics& other) {
ramHit_.merge(other.ramHit_);
ssdRead_.merge(other.ssdRead_);
queryThreadIoLatency_.merge(other.queryThreadIoLatency_);
std::lock_guard<std::mutex> l(operationStatsMutex_);
for (auto& item : other.operationStats_) {
operationStats_[item.first].merge(item.second);
{
const auto& otherOperationStats = other.operationStats();
std::lock_guard<std::mutex> l(operationStatsMutex_);
for (auto& item : otherOperationStats) {
operationStats_[item.first].merge(item.second);
}
}

{
const auto& otherStorageStats = other.storageStats();
std::lock_guard<std::mutex> storageStatsLock(storageStatsMutex_);
for (auto& item : otherStorageStats) {
storageStats_[item.first].merge(item.second);
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions velox/common/io/IoStatistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <unordered_map>

#include <folly/dynamic.h>
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/RuntimeMetrics.h"

namespace facebook::velox::io {

Expand Down Expand Up @@ -140,6 +142,9 @@ class IoStatistics {
const uint64_t partialThrottleCount = 0);

std::unordered_map<std::string, OperationCounters> operationStats() const;
std::unordered_map<std::string, RuntimeMetric> storageStats() const;

void addStorageStats(const std::string& name, const RuntimeCounter& counter);

void merge(const IoStatistics& other);

Expand Down Expand Up @@ -172,7 +177,9 @@ class IoStatistics {
IoCounter queryThreadIoLatency_;

std::unordered_map<std::string, OperationCounters> operationStats_;
std::unordered_map<std::string, RuntimeMetric> storageStats_;
mutable std::mutex operationStatsMutex_;
mutable std::mutex storageStatsMutex_;
};

} // namespace facebook::velox::io
Loading

0 comments on commit 9922a90

Please sign in to comment.