Skip to content

Commit

Permalink
feat: Add storage statistics in IoStatistics
Browse files Browse the repository at this point in the history
  • Loading branch information
kewang1024 committed Jan 29, 2025
1 parent ce273fa commit fbaeab4
Show file tree
Hide file tree
Showing 18 changed files with 194 additions and 92 deletions.
6 changes: 3 additions & 3 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ void SsdFile::read(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) {
process::TraceContext trace("SsdFile::read");
readFile_->preadv(offset, buffers);
readFile_->preadv(offset, buffers, nullptr);
}

std::optional<std::pair<uint64_t, int32_t>> SsdFile::getSpace(
Expand Down Expand Up @@ -467,7 +467,7 @@ void SsdFile::verifyWrite(AsyncDataCacheEntry& entry, SsdRun ssdRun) {
process::TraceContext trace("SsdFile::verifyWrite");
auto testData = std::make_unique<char[]>(entry.size());
const auto rc =
readFile_->pread(ssdRun.offset(), entry.size(), testData.get());
readFile_->pread(ssdRun.offset(), entry.size(), testData.get(), nullptr);
VELOX_CHECK_EQ(rc.size(), entry.size());
if (entry.tinyData() != nullptr) {
if (::memcmp(testData.get(), entry.tinyData(), entry.size()) != 0) {
Expand Down Expand Up @@ -1008,7 +1008,7 @@ void SsdFile::readCheckpoint() {
const auto logSize = evictLogReadFile->size();
std::vector<uint32_t> evicted(logSize / sizeof(uint32_t));
try {
evictLogReadFile->pread(0, logSize, evicted.data());
evictLogReadFile->pread(0, logSize, evicted.data(), nullptr);
} catch (const std::exception& e) {
++stats_.readCheckpointErrors;
VELOX_FAIL("Failed to read eviction log: {}", e.what());
Expand Down
49 changes: 33 additions & 16 deletions velox/common/file/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,21 @@ T getAttribute(
}
} // namespace

std::string ReadFile::pread(uint64_t offset, uint64_t length) const {
std::string ReadFile::pread(
uint64_t offset,
uint64_t length,
io::IoStatistics* stats) const {
std::string buf;
buf.resize(length);
auto res = pread(offset, length, buf.data());
auto res = pread(offset, length, buf.data(), stats);
buf.resize(res.size());
return buf;
}

uint64_t ReadFile::preadv(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const {
auto fileSize = size();
uint64_t numRead = 0;
if (offset >= fileSize) {
Expand All @@ -77,7 +81,7 @@ uint64_t ReadFile::preadv(
auto copySize = std::min<size_t>(range.size(), fileSize - offset);
// NOTE: skip the gap in case of coalesce io.
if (range.data() != nullptr) {
pread(offset, copySize, range.data());
pread(offset, copySize, range.data(), stats);
}
offset += copySize;
numRead += copySize;
Expand All @@ -87,29 +91,36 @@ uint64_t ReadFile::preadv(

uint64_t ReadFile::preadv(
folly::Range<const common::Region*> regions,
folly::Range<folly::IOBuf*> iobufs) const {
folly::Range<folly::IOBuf*> iobufs,
io::IoStatistics* stats) const {
VELOX_CHECK_EQ(regions.size(), iobufs.size());
uint64_t length = 0;
for (size_t i = 0; i < regions.size(); ++i) {
const auto& region = regions[i];
auto& output = iobufs[i];
output = folly::IOBuf(folly::IOBuf::CREATE, region.length);
pread(region.offset, region.length, output.writableData());
pread(region.offset, region.length, output.writableData(), stats);
output.append(region.length);
length += region.length;
}

return length;
}

std::string_view
InMemoryReadFile::pread(uint64_t offset, uint64_t length, void* buf) const {
std::string_view InMemoryReadFile::pread(
uint64_t offset,
uint64_t length,
void* buf,
io::IoStatistics* stats) const {
bytesRead_ += length;
memcpy(buf, file_.data() + offset, length);
return {static_cast<char*>(buf), length};
}

std::string InMemoryReadFile::pread(uint64_t offset, uint64_t length) const {
std::string InMemoryReadFile::pread(
uint64_t offset,
uint64_t length,
io::IoStatistics* stats) const {
bytesRead_ += length;
return std::string(file_.data() + offset, length);
}
Expand Down Expand Up @@ -187,15 +198,19 @@ void LocalReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos)
folly::errnoStr(errno));
}

std::string_view
LocalReadFile::pread(uint64_t offset, uint64_t length, void* buf) const {
std::string_view LocalReadFile::pread(
uint64_t offset,
uint64_t length,
void* buf,
io::IoStatistics* stats) const {
preadInternal(offset, length, static_cast<char*>(buf));
return {static_cast<char*>(buf), length};
}

uint64_t LocalReadFile::preadv(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const {
// Dropped bytes sized so that a typical dropped range of 50K is not
// too many iovecs.
static thread_local std::vector<char> droppedBytes(16 * 1024);
Expand Down Expand Up @@ -251,16 +266,18 @@ uint64_t LocalReadFile::preadv(

folly::SemiFuture<uint64_t> LocalReadFile::preadvAsync(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const {
if (!executor_) {
return ReadFile::preadvAsync(offset, buffers);
return ReadFile::preadvAsync(offset, buffers, stats);
}
auto [promise, future] = folly::makePromiseContract<uint64_t>();
executor_->add([this,
_promise = std::move(promise),
_offset = offset,
_buffers = buffers]() mutable {
auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers);
_buffers = buffers,
_stats = stats]() mutable {
auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers, _stats);
_promise.setTry(std::move(delegateFuture).getTry());
});
return std::move(future);
Expand Down
45 changes: 31 additions & 14 deletions velox/common/file/File.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

#include "velox/common/base/Exceptions.h"
#include "velox/common/file/Region.h"
#include "velox/common/io/IoStatistics.h"

namespace facebook::velox {

Expand All @@ -52,13 +53,17 @@ class ReadFile {
// buffer 'buf'. The bytes are returned as a string_view pointing to 'buf'.
//
// This method should be thread safe.
virtual std::string_view pread(uint64_t offset, uint64_t length, void* buf)
const = 0;
virtual std::string_view pread(
uint64_t offset,
uint64_t length,
void* buf,
io::IoStatistics* stats) const = 0;

// Same as above, but returns owned data directly.
//
// This method should be thread safe.
virtual std::string pread(uint64_t offset, uint64_t length) const;
virtual std::string
pread(uint64_t offset, uint64_t length, io::IoStatistics* stats) const;

// Reads starting at 'offset' into the memory referenced by the
// Ranges in 'buffers'. The buffers are filled left to right. A
Expand All @@ -67,7 +72,8 @@ class ReadFile {
// This method should be thread safe.
virtual uint64_t preadv(
uint64_t /*offset*/,
const std::vector<folly::Range<char*>>& /*buffers*/) const;
const std::vector<folly::Range<char*>>& /*buffers*/,
io::IoStatistics* stats) const;

// Vectorized read API. Implementations can coalesce and parallelize.
// The offsets don't need to be sorted.
Expand All @@ -82,7 +88,8 @@ class ReadFile {
// This method should be thread safe.
virtual uint64_t preadv(
folly::Range<const common::Region*> regions,
folly::Range<folly::IOBuf*> iobufs) const;
folly::Range<folly::IOBuf*> iobufs,
io::IoStatistics* stats) const;

/// Like preadv but may execute asynchronously and returns the read size or
/// exception via SemiFuture. Use hasPreadvAsync() to check if the
Expand All @@ -91,9 +98,10 @@ class ReadFile {
/// This method should be thread safe.
virtual folly::SemiFuture<uint64_t> preadvAsync(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const {
try {
return folly::SemiFuture<uint64_t>(preadv(offset, buffers));
return folly::SemiFuture<uint64_t>(preadv(offset, buffers, stats));
} catch (const std::exception& e) {
return folly::makeSemiFuture<uint64_t>(e);
}
Expand Down Expand Up @@ -213,10 +221,14 @@ class InMemoryReadFile : public ReadFile {
explicit InMemoryReadFile(std::string file)
: ownedFile_(std::move(file)), file_(ownedFile_) {}

std::string_view pread(uint64_t offset, uint64_t length, void* buf)
const override;
std::string_view pread(
uint64_t offset,
uint64_t length,
void* buf,
io::IoStatistics* stats) const override;

std::string pread(uint64_t offset, uint64_t length) const override;
std::string pread(uint64_t offset, uint64_t length, io::IoStatistics* stats)
const override;

uint64_t size() const final {
return file_.size();
Expand Down Expand Up @@ -278,18 +290,23 @@ class LocalReadFile final : public ReadFile {

~LocalReadFile();

std::string_view pread(uint64_t offset, uint64_t length, void* buf)
const final;
std::string_view pread(
uint64_t offset,
uint64_t length,
void* buf,
io::IoStatistics* stats) const final;

uint64_t size() const final;

uint64_t preadv(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const final;
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const final;

folly::SemiFuture<uint64_t> preadvAsync(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const override;
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const override;

bool hasPreadvAsync() const override {
return executor_ != nullptr;
Expand Down
5 changes: 3 additions & 2 deletions velox/common/file/FileInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ void FileInputStream::readNextRange() {
VELOX_CHECK_LT(
0, readBytes, "Read past end of FileInputStream {}", fileSize_);
NanosecondTimer timer_2{&readTimeNs};
file_->pread(fileOffset_, readBytes, buffer()->asMutable<char>());
file_->pread(
fileOffset_, readBytes, buffer()->asMutable<char>(), nullptr);
}
}

Expand Down Expand Up @@ -218,7 +219,7 @@ void FileInputStream::maybeIssueReadahead() {
}
std::vector<folly::Range<char*>> ranges;
ranges.emplace_back(nextBuffer()->asMutable<char>(), size);
readAheadWait_ = file_->preadvAsync(fileOffset_, ranges);
readAheadWait_ = file_->preadvAsync(fileOffset_, ranges, nullptr);
VELOX_CHECK(readAheadWait_.valid());
}

Expand Down
25 changes: 16 additions & 9 deletions velox/common/file/tests/FaultyFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,45 +30,52 @@ FaultyReadFile::FaultyReadFile(
VELOX_CHECK_NOT_NULL(delegatedFile_);
}

std::string_view
FaultyReadFile::pread(uint64_t offset, uint64_t length, void* buf) const {
std::string_view FaultyReadFile::pread(
uint64_t offset,
uint64_t length,
void* buf,
io::IoStatistics* stats) const {
if (injectionHook_ != nullptr) {
FaultFileReadOperation op(path_, offset, length, buf);
injectionHook_(&op);
if (!op.delegate) {
return std::string_view(static_cast<char*>(op.buf), op.length);
}
}
return delegatedFile_->pread(offset, length, buf);
return delegatedFile_->pread(offset, length, buf, stats);
}

uint64_t FaultyReadFile::preadv(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const {
if (injectionHook_ != nullptr) {
FaultFileReadvOperation op(path_, offset, buffers);
injectionHook_(&op);
if (!op.delegate) {
return op.readBytes;
}
}
return delegatedFile_->preadv(offset, buffers);
return delegatedFile_->preadv(offset, buffers, stats);
}

folly::SemiFuture<uint64_t> FaultyReadFile::preadvAsync(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const {
// TODO: add fault injection for async read later.
if (delegatedFile_->hasPreadvAsync() || executor_ == nullptr) {
return delegatedFile_->preadvAsync(offset, buffers);
return delegatedFile_->preadvAsync(offset, buffers, stats);
}
auto promise = std::make_unique<folly::Promise<uint64_t>>();
folly::SemiFuture<uint64_t> future = promise->getSemiFuture();
executor_->add([this,
_promise = std::move(promise),
_offset = offset,
_buffers = buffers]() {
auto delegateFuture = delegatedFile_->preadvAsync(_offset, _buffers);
_buffers = buffers,
_stats = stats]() {
auto delegateFuture =
delegatedFile_->preadvAsync(_offset, _buffers, _stats);
_promise->setValue(delegateFuture.wait().value());
});
return future;
Expand Down
13 changes: 9 additions & 4 deletions velox/common/file/tests/FaultyFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@ class FaultyReadFile : public ReadFile {
return delegatedFile_->size();
}

std::string_view pread(uint64_t offset, uint64_t length, void* buf)
const override;
std::string_view pread(
uint64_t offset,
uint64_t length,
void* buf,
io::IoStatistics* stats) const override;

uint64_t preadv(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const override;
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const override;

uint64_t memoryUsage() const override {
return delegatedFile_->memoryUsage();
Expand All @@ -67,7 +71,8 @@ class FaultyReadFile : public ReadFile {

folly::SemiFuture<uint64_t> preadvAsync(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const override;
const std::vector<folly::Range<char*>>& buffers,
io::IoStatistics* stats) const override;

private:
const std::string path_;
Expand Down
Loading

0 comments on commit fbaeab4

Please sign in to comment.