From bb1f2a21942c5718020c6f96b1e964bfbcc1cdde Mon Sep 17 00:00:00 2001 From: Ke Date: Tue, 28 Jan 2025 23:26:35 -0800 Subject: [PATCH] feat: Add storage statistics in IoStatistics --- velox/common/caching/SsdFile.cpp | 6 +-- velox/common/file/File.cpp | 49 +++++++++++++------ velox/common/file/File.h | 45 +++++++++++------ velox/common/file/FileInputStream.cpp | 5 +- velox/common/file/tests/FaultyFile.cpp | 25 ++++++---- velox/common/file/tests/FaultyFile.h | 13 +++-- velox/common/file/tests/FileTest.cpp | 37 +++++++------- velox/common/io/IoStatistics.cpp | 35 +++++++++++-- velox/common/io/IoStatistics.h | 7 +++ velox/connectors/hive/HiveDataSource.cpp | 5 ++ .../storage_adapters/abfs/AbfsFileSystem.cpp | 33 +++++++++---- .../abfs/tests/AbfsFileSystemTest.cpp | 19 ++++--- .../storage_adapters/gcs/GcsFileSystem.cpp | 13 +++-- .../gcs/tests/GcsFileSystemTest.cpp | 13 ++--- .../storage_adapters/hdfs/HdfsReadFile.cpp | 12 +++-- .../hive/storage_adapters/hdfs/HdfsReadFile.h | 10 ++-- .../storage_adapters/s3fs/S3FileSystem.cpp | 13 +++-- .../connectors/hive/tests/FileHandleTest.cpp | 4 +- velox/dwio/common/InputStream.cpp | 8 +-- velox/dwio/common/tests/TestBufferedInput.cpp | 20 +++++--- velox/dwio/dwrf/test/TestReadFile.h | 12 +++-- velox/dwio/dwrf/test/TestStripeStream.cpp | 7 ++- velox/exec/OperatorTraceReader.cpp | 2 +- velox/exec/TraceUtil.cpp | 4 +- velox/tool/trace/TraceFileToolRunner.cpp | 2 +- 25 files changed, 270 insertions(+), 129 deletions(-) diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index d65b4e139cb1..4e5afff1b311 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -244,7 +244,7 @@ void SsdFile::read( uint64_t offset, const std::vector>& buffers) { process::TraceContext trace("SsdFile::read"); - readFile_->preadv(offset, buffers); + readFile_->preadv(offset, buffers, nullptr); } std::optional> SsdFile::getSpace( @@ -467,7 +467,7 @@ void SsdFile::verifyWrite(AsyncDataCacheEntry& entry, SsdRun ssdRun) { process::TraceContext trace("SsdFile::verifyWrite"); auto testData = std::make_unique(entry.size()); const auto rc = - readFile_->pread(ssdRun.offset(), entry.size(), testData.get()); + readFile_->pread(ssdRun.offset(), entry.size(), testData.get(), nullptr); VELOX_CHECK_EQ(rc.size(), entry.size()); if (entry.tinyData() != nullptr) { if (::memcmp(testData.get(), entry.tinyData(), entry.size()) != 0) { @@ -1008,7 +1008,7 @@ void SsdFile::readCheckpoint() { const auto logSize = evictLogReadFile->size(); std::vector evicted(logSize / sizeof(uint32_t)); try { - evictLogReadFile->pread(0, logSize, evicted.data()); + evictLogReadFile->pread(0, logSize, evicted.data(), nullptr); } catch (const std::exception& e) { ++stats_.readCheckpointErrors; VELOX_FAIL("Failed to read eviction log: {}", e.what()); diff --git a/velox/common/file/File.cpp b/velox/common/file/File.cpp index 1739f8e8cb52..ab7d2cf3c9d6 100644 --- a/velox/common/file/File.cpp +++ b/velox/common/file/File.cpp @@ -57,17 +57,21 @@ T getAttribute( } } // namespace -std::string ReadFile::pread(uint64_t offset, uint64_t length) const { +std::string ReadFile::pread( + uint64_t offset, + uint64_t length, + io::IoStatistics* stats) const { std::string buf; buf.resize(length); - auto res = pread(offset, length, buf.data()); + auto res = pread(offset, length, buf.data(), stats); buf.resize(res.size()); return buf; } uint64_t ReadFile::preadv( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { auto fileSize = size(); uint64_t numRead = 0; if (offset >= fileSize) { @@ -77,7 +81,7 @@ uint64_t ReadFile::preadv( auto copySize = std::min(range.size(), fileSize - offset); // NOTE: skip the gap in case of coalesce io. if (range.data() != nullptr) { - pread(offset, copySize, range.data()); + pread(offset, copySize, range.data(), stats); } offset += copySize; numRead += copySize; @@ -87,14 +91,15 @@ uint64_t ReadFile::preadv( uint64_t ReadFile::preadv( folly::Range regions, - folly::Range iobufs) const { + folly::Range iobufs, + io::IoStatistics* stats) const { VELOX_CHECK_EQ(regions.size(), iobufs.size()); uint64_t length = 0; for (size_t i = 0; i < regions.size(); ++i) { const auto& region = regions[i]; auto& output = iobufs[i]; output = folly::IOBuf(folly::IOBuf::CREATE, region.length); - pread(region.offset, region.length, output.writableData()); + pread(region.offset, region.length, output.writableData(), stats); output.append(region.length); length += region.length; } @@ -102,14 +107,20 @@ uint64_t ReadFile::preadv( return length; } -std::string_view -InMemoryReadFile::pread(uint64_t offset, uint64_t length, void* buf) const { +std::string_view InMemoryReadFile::pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats) const { bytesRead_ += length; memcpy(buf, file_.data() + offset, length); return {static_cast(buf), length}; } -std::string InMemoryReadFile::pread(uint64_t offset, uint64_t length) const { +std::string InMemoryReadFile::pread( + uint64_t offset, + uint64_t length, + io::IoStatistics* stats) const { bytesRead_ += length; return std::string(file_.data() + offset, length); } @@ -187,15 +198,19 @@ void LocalReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos) folly::errnoStr(errno)); } -std::string_view -LocalReadFile::pread(uint64_t offset, uint64_t length, void* buf) const { +std::string_view LocalReadFile::pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats) const { preadInternal(offset, length, static_cast(buf)); return {static_cast(buf), length}; } uint64_t LocalReadFile::preadv( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { // Dropped bytes sized so that a typical dropped range of 50K is not // too many iovecs. static thread_local std::vector droppedBytes(16 * 1024); @@ -251,16 +266,18 @@ uint64_t LocalReadFile::preadv( folly::SemiFuture LocalReadFile::preadvAsync( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { if (!executor_) { - return ReadFile::preadvAsync(offset, buffers); + return ReadFile::preadvAsync(offset, buffers, stats); } auto [promise, future] = folly::makePromiseContract(); executor_->add([this, _promise = std::move(promise), _offset = offset, - _buffers = buffers]() mutable { - auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers); + _buffers = buffers, + _stats = stats]() mutable { + auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers, _stats); _promise.setTry(std::move(delegateFuture).getTry()); }); return std::move(future); diff --git a/velox/common/file/File.h b/velox/common/file/File.h index 832d098e303a..9996ada7f33e 100644 --- a/velox/common/file/File.h +++ b/velox/common/file/File.h @@ -40,6 +40,7 @@ #include "velox/common/base/Exceptions.h" #include "velox/common/file/Region.h" +#include "velox/common/io/IoStatistics.h" namespace facebook::velox { @@ -52,13 +53,17 @@ class ReadFile { // buffer 'buf'. The bytes are returned as a string_view pointing to 'buf'. // // This method should be thread safe. - virtual std::string_view pread(uint64_t offset, uint64_t length, void* buf) - const = 0; + virtual std::string_view pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats) const = 0; // Same as above, but returns owned data directly. // // This method should be thread safe. - virtual std::string pread(uint64_t offset, uint64_t length) const; + virtual std::string + pread(uint64_t offset, uint64_t length, io::IoStatistics* stats) const; // Reads starting at 'offset' into the memory referenced by the // Ranges in 'buffers'. The buffers are filled left to right. A @@ -67,7 +72,8 @@ class ReadFile { // This method should be thread safe. virtual uint64_t preadv( uint64_t /*offset*/, - const std::vector>& /*buffers*/) const; + const std::vector>& /*buffers*/, + io::IoStatistics* stats) const; // Vectorized read API. Implementations can coalesce and parallelize. // The offsets don't need to be sorted. @@ -82,7 +88,8 @@ class ReadFile { // This method should be thread safe. virtual uint64_t preadv( folly::Range regions, - folly::Range iobufs) const; + folly::Range iobufs, + io::IoStatistics* stats) const; /// Like preadv but may execute asynchronously and returns the read size or /// exception via SemiFuture. Use hasPreadvAsync() to check if the @@ -91,9 +98,10 @@ class ReadFile { /// This method should be thread safe. virtual folly::SemiFuture preadvAsync( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { try { - return folly::SemiFuture(preadv(offset, buffers)); + return folly::SemiFuture(preadv(offset, buffers, stats)); } catch (const std::exception& e) { return folly::makeSemiFuture(e); } @@ -213,10 +221,14 @@ class InMemoryReadFile : public ReadFile { explicit InMemoryReadFile(std::string file) : ownedFile_(std::move(file)), file_(ownedFile_) {} - std::string_view pread(uint64_t offset, uint64_t length, void* buf) - const override; + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats) const override; - std::string pread(uint64_t offset, uint64_t length) const override; + std::string pread(uint64_t offset, uint64_t length, io::IoStatistics* stats) + const override; uint64_t size() const final { return file_.size(); @@ -278,18 +290,23 @@ class LocalReadFile final : public ReadFile { ~LocalReadFile(); - std::string_view pread(uint64_t offset, uint64_t length, void* buf) - const final; + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats) const final; uint64_t size() const final; uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const final; + const std::vector>& buffers, + io::IoStatistics* stats) const final; folly::SemiFuture preadvAsync( uint64_t offset, - const std::vector>& buffers) const override; + const std::vector>& buffers, + io::IoStatistics* stats) const override; bool hasPreadvAsync() const override { return executor_ != nullptr; diff --git a/velox/common/file/FileInputStream.cpp b/velox/common/file/FileInputStream.cpp index 56efdc4e1c3a..df01951e9658 100644 --- a/velox/common/file/FileInputStream.cpp +++ b/velox/common/file/FileInputStream.cpp @@ -73,7 +73,8 @@ void FileInputStream::readNextRange() { VELOX_CHECK_LT( 0, readBytes, "Read past end of FileInputStream {}", fileSize_); NanosecondTimer timer_2{&readTimeNs}; - file_->pread(fileOffset_, readBytes, buffer()->asMutable()); + file_->pread( + fileOffset_, readBytes, buffer()->asMutable(), nullptr); } } @@ -218,7 +219,7 @@ void FileInputStream::maybeIssueReadahead() { } std::vector> ranges; ranges.emplace_back(nextBuffer()->asMutable(), size); - readAheadWait_ = file_->preadvAsync(fileOffset_, ranges); + readAheadWait_ = file_->preadvAsync(fileOffset_, ranges, nullptr); VELOX_CHECK(readAheadWait_.valid()); } diff --git a/velox/common/file/tests/FaultyFile.cpp b/velox/common/file/tests/FaultyFile.cpp index a5984113f901..dbdb47f493a0 100644 --- a/velox/common/file/tests/FaultyFile.cpp +++ b/velox/common/file/tests/FaultyFile.cpp @@ -30,8 +30,11 @@ FaultyReadFile::FaultyReadFile( VELOX_CHECK_NOT_NULL(delegatedFile_); } -std::string_view -FaultyReadFile::pread(uint64_t offset, uint64_t length, void* buf) const { +std::string_view FaultyReadFile::pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats) const { if (injectionHook_ != nullptr) { FaultFileReadOperation op(path_, offset, length, buf); injectionHook_(&op); @@ -39,12 +42,13 @@ FaultyReadFile::pread(uint64_t offset, uint64_t length, void* buf) const { return std::string_view(static_cast(op.buf), op.length); } } - return delegatedFile_->pread(offset, length, buf); + return delegatedFile_->pread(offset, length, buf, stats); } uint64_t FaultyReadFile::preadv( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { if (injectionHook_ != nullptr) { FaultFileReadvOperation op(path_, offset, buffers); injectionHook_(&op); @@ -52,23 +56,26 @@ uint64_t FaultyReadFile::preadv( return op.readBytes; } } - return delegatedFile_->preadv(offset, buffers); + return delegatedFile_->preadv(offset, buffers, stats); } folly::SemiFuture FaultyReadFile::preadvAsync( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { // TODO: add fault injection for async read later. if (delegatedFile_->hasPreadvAsync() || executor_ == nullptr) { - return delegatedFile_->preadvAsync(offset, buffers); + return delegatedFile_->preadvAsync(offset, buffers, stats); } auto promise = std::make_unique>(); folly::SemiFuture future = promise->getSemiFuture(); executor_->add([this, _promise = std::move(promise), _offset = offset, - _buffers = buffers]() { - auto delegateFuture = delegatedFile_->preadvAsync(_offset, _buffers); + _buffers = buffers, + _stats = stats]() { + auto delegateFuture = + delegatedFile_->preadvAsync(_offset, _buffers, _stats); _promise->setValue(delegateFuture.wait().value()); }); return future; diff --git a/velox/common/file/tests/FaultyFile.h b/velox/common/file/tests/FaultyFile.h index 6d73eaebe625..27c28d3e7bde 100644 --- a/velox/common/file/tests/FaultyFile.h +++ b/velox/common/file/tests/FaultyFile.h @@ -35,12 +35,16 @@ class FaultyReadFile : public ReadFile { return delegatedFile_->size(); } - std::string_view pread(uint64_t offset, uint64_t length, void* buf) - const override; + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats) const override; uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const override; + const std::vector>& buffers, + io::IoStatistics* stats) const override; uint64_t memoryUsage() const override { return delegatedFile_->memoryUsage(); @@ -67,7 +71,8 @@ class FaultyReadFile : public ReadFile { folly::SemiFuture preadvAsync( uint64_t offset, - const std::vector>& buffers) const override; + const std::vector>& buffers, + io::IoStatistics* stats) const override; private: const std::string path_; diff --git a/velox/common/file/tests/FileTest.cpp b/velox/common/file/tests/FileTest.cpp index 9ea54ceb9595..b6f2f4601a22 100644 --- a/velox/common/file/tests/FileTest.cpp +++ b/velox/common/file/tests/FileTest.cpp @@ -75,19 +75,20 @@ void readData( ASSERT_EQ(readFile->size(), 15 + kOneMB); } char buffer1[5]; - ASSERT_EQ(readFile->pread(10 + kOneMB, 5, &buffer1), "ddddd"); + ASSERT_EQ(readFile->pread(10 + kOneMB, 5, &buffer1, nullptr), "ddddd"); char buffer2[10]; - ASSERT_EQ(readFile->pread(0, 10, &buffer2), "aaaaabbbbb"); + ASSERT_EQ(readFile->pread(0, 10, &buffer2, nullptr), "aaaaabbbbb"); char buffer3[kOneMB]; - ASSERT_EQ(readFile->pread(10, kOneMB, &buffer3), std::string(kOneMB, 'c')); + ASSERT_EQ( + readFile->pread(10, kOneMB, &buffer3, nullptr), std::string(kOneMB, 'c')); if (checkFileSize) { ASSERT_EQ(readFile->size(), 15 + kOneMB); } char buffer4[10]; - const std::string_view arf = readFile->pread(5, 10, &buffer4); - const std::string zarf = readFile->pread(kOneMB, 15); + const std::string_view arf = readFile->pread(5, 10, &buffer4, nullptr); + const std::string zarf = readFile->pread(kOneMB, 15, nullptr); auto buf = std::make_unique(8); - const std::string_view warf = readFile->pread(4, 8, buf.get()); + const std::string_view warf = readFile->pread(4, 8, buf.get(), nullptr); const std::string_view warfFromBuf(buf.get(), 8); ASSERT_EQ(arf, "bbbbbccccc"); ASSERT_EQ(zarf, "ccccccccccddddd"); @@ -105,7 +106,7 @@ void readData( (char*)(uint64_t)(15 + kOneMB - 500000 - sizeof(head) - sizeof(middle) - sizeof(tail))), folly::Range(tail, sizeof(tail))}; - ASSERT_EQ(15 + kOneMB, readFile->preadv(0, buffers)); + ASSERT_EQ(15 + kOneMB, readFile->preadv(0, buffers, nullptr)); ASSERT_EQ(std::string_view(head, sizeof(head)), "aaaaabbbbbcc"); ASSERT_EQ(std::string_view(middle, sizeof(middle)), "cccc"); ASSERT_EQ(std::string_view(tail, sizeof(tail)), "ccddddd"); @@ -113,7 +114,7 @@ void readData( std::vector> buffers1 = { folly::Range(head, sizeof(head)), folly::Range(nullptr, (char*)(uint64_t)500000)}; - auto future1 = readFile->preadvAsync(0, buffers1); + auto future1 = readFile->preadvAsync(0, buffers1, nullptr); const auto offset1 = sizeof(head) + 500000; std::vector> buffers2 = { folly::Range(middle, sizeof(middle)), @@ -121,11 +122,11 @@ void readData( nullptr, (char*)(uint64_t)(15 + kOneMB - offset1 - sizeof(middle) - sizeof(tail)))}; - auto future2 = readFile->preadvAsync(offset1, buffers2); + auto future2 = readFile->preadvAsync(offset1, buffers2, nullptr); std::vector> buffers3 = { folly::Range(tail, sizeof(tail))}; const auto offset2 = 15 + kOneMB - sizeof(tail); - auto future3 = readFile->preadvAsync(offset2, buffers3); + auto future3 = readFile->preadvAsync(offset2, buffers3, nullptr); ASSERT_EQ(offset1, future1.wait().value()); ASSERT_EQ(offset2 - offset1, future2.wait().value()); ASSERT_EQ(sizeof(tail), future3.wait().value()); @@ -164,7 +165,7 @@ TEST(InMemoryFile, preadv) { {5 + 5 + kOneMB + 2, 3UL, {}}}; std::vector iobufs(readRegions.size()); - readFile.preadv(readRegions, {iobufs.data(), iobufs.size()}); + readFile.preadv(readRegions, {iobufs.data(), iobufs.size()}, nullptr); std::vector values; values.reserve(iobufs.size()); for (auto& iobuf : iobufs) { @@ -245,7 +246,7 @@ TEST_P(LocalFileTest, viaRegistry) { auto readFile = fs->openFileForRead(filename); ASSERT_EQ(readFile->size(), 5); char buffer1[5]; - ASSERT_EQ(readFile->pread(0, 5, &buffer1), "snarf"); + ASSERT_EQ(readFile->pread(0, 5, &buffer1, nullptr), "snarf"); fs->remove(filename); } @@ -273,7 +274,7 @@ TEST_P(LocalFileTest, rename) { localFs->rename(b, newA, true); auto readFile = localFs->openFileForRead(newA); char buffer[5]; - ASSERT_EQ(readFile->pread(0, 5, &buffer), data); + ASSERT_EQ(readFile->pread(0, 5, &buffer, nullptr), data); } TEST_P(LocalFileTest, exists) { @@ -522,11 +523,11 @@ class FaultyFsTest : public ::testing::Test { void readData(ReadFile* file, bool useReadv = false) { char readBuf[buffer_.size()]; if (!useReadv) { - file->pread(0, buffer_.size(), readBuf); + file->pread(0, buffer_.size(), readBuf, nullptr); } else { std::vector> buffers; buffers.push_back(folly::Range(readBuf, buffer_.size())); - file->preadv(0, buffers); + file->preadv(0, buffers, nullptr); } for (int i = 0; i < buffer_.size(); ++i) { if (buffer_[i] != readBuf[i]) { @@ -782,7 +783,7 @@ TEST_F(FaultyFsTest, fileWriteFaultHookInjection) { auto readFile = fs_->openFileForRead(path1, {}); char buffer[5]; ASSERT_EQ(readFile->size(), 5); - ASSERT_EQ(readFile->pread(0, 5, &buffer), "hello"); + ASSERT_EQ(readFile->pread(0, 5, &buffer, nullptr), "hello"); fs_->remove(path1); } { @@ -792,7 +793,7 @@ TEST_F(FaultyFsTest, fileWriteFaultHookInjection) { auto readFile = fs_->openFileForRead(path2, {}); char buffer[10]; ASSERT_EQ(readFile->size(), 10); - ASSERT_EQ(readFile->pread(0, 10, &buffer), "Error data"); + ASSERT_EQ(readFile->pread(0, 10, &buffer, nullptr), "Error data"); fs_->remove(path2); } @@ -816,7 +817,7 @@ TEST_F(FaultyFsTest, fileWriteFaultHookInjection) { auto readFile = fs_->openFileForRead(path1, {}); char buffer[5]; ASSERT_EQ(readFile->size(), 5); - ASSERT_EQ(readFile->pread(0, 5, &buffer), "hello"); + ASSERT_EQ(readFile->pread(0, 5, &buffer, nullptr), "hello"); fs_->remove(path1); } { diff --git a/velox/common/io/IoStatistics.cpp b/velox/common/io/IoStatistics.cpp index 7dfddc6dc483..69bf03125490 100644 --- a/velox/common/io/IoStatistics.cpp +++ b/velox/common/io/IoStatistics.cpp @@ -108,6 +108,24 @@ IoStatistics::operationStats() const { return operationStats_; } +std::unordered_map IoStatistics::storageStats() + const { + std::lock_guard lock{storageStatsMutex_}; + return storageStats_; +} + +void IoStatistics::addStorageStats( + const std::string& name, + const RuntimeCounter& counter) { + std::lock_guard lock{storageStatsMutex_}; + if (storageStats_.count(name) == 0) { + storageStats_.emplace(name, RuntimeMetric(counter.unit)); + } else { + VELOX_CHECK_EQ(storageStats_.at(name).unit, counter.unit); + } + storageStats_.at(name).addValue(counter.value); +} + void IoStatistics::merge(const IoStatistics& other) { rawBytesRead_ += other.rawBytesRead_; rawBytesWritten_ += other.rawBytesWritten_; @@ -119,9 +137,20 @@ void IoStatistics::merge(const IoStatistics& other) { ramHit_.merge(other.ramHit_); ssdRead_.merge(other.ssdRead_); queryThreadIoLatency_.merge(other.queryThreadIoLatency_); - std::lock_guard l(operationStatsMutex_); - for (auto& item : other.operationStats_) { - operationStats_[item.first].merge(item.second); + { + const auto& otherOperationStats = other.operationStats(); + std::lock_guard l(operationStatsMutex_); + for (auto& item : otherOperationStats) { + operationStats_[item.first].merge(item.second); + } + } + + { + const auto& otherStorageStats = other.storageStats(); + std::lock_guard storageStatsLock(storageStatsMutex_); + for (auto& item : otherStorageStats) { + storageStats_[item.first].merge(item.second); + } } } diff --git a/velox/common/io/IoStatistics.h b/velox/common/io/IoStatistics.h index 2111a8877b47..6d31e0dfef14 100644 --- a/velox/common/io/IoStatistics.h +++ b/velox/common/io/IoStatistics.h @@ -23,6 +23,8 @@ #include #include +#include "velox/common/base/Exceptions.h" +#include "velox/common/base/RuntimeMetrics.h" namespace facebook::velox::io { @@ -140,6 +142,9 @@ class IoStatistics { const uint64_t partialThrottleCount = 0); std::unordered_map operationStats() const; + std::unordered_map storageStats() const; + + void addStorageStats(const std::string& name, const RuntimeCounter& counter); void merge(const IoStatistics& other); @@ -172,7 +177,9 @@ class IoStatistics { IoCounter queryThreadIoLatency_; std::unordered_map operationStats_; + std::unordered_map storageStats_; mutable std::mutex operationStatsMutex_; + mutable std::mutex storageStatsMutex_; }; } // namespace facebook::velox::io diff --git a/velox/connectors/hive/HiveDataSource.cpp b/velox/connectors/hive/HiveDataSource.cpp index 0edfb3a0ea7c..dbac54e81889 100644 --- a/velox/connectors/hive/HiveDataSource.cpp +++ b/velox/connectors/hive/HiveDataSource.cpp @@ -513,6 +513,11 @@ std::unordered_map HiveDataSource::runtimeStats() { if (numBucketConversion_ > 0) { res.insert({"numBucketConversion", RuntimeCounter(numBucketConversion_)}); } + for (const auto& storageStats : ioStats_->storageStats()) { + res.emplace( + storageStats.first, + RuntimeCounter(storageStats.second.sum, storageStats.second.unit)); + } return res; } diff --git a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp index 7e63c2df1438..75b53e5adc9f 100644 --- a/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp @@ -58,12 +58,17 @@ class AbfsReadFile::Impl { VELOX_CHECK_GE(length_, 0); } - std::string_view pread(uint64_t offset, uint64_t length, void* buffer) const { + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buffer, + io::IoStatistics* stats) const { preadInternal(offset, length, static_cast(buffer)); return {static_cast(buffer), length}; } - std::string pread(uint64_t offset, uint64_t length) const { + std::string pread(uint64_t offset, uint64_t length, io::IoStatistics* stats) + const { std::string result(length, 0); preadInternal(offset, length, result.data()); return result; @@ -71,7 +76,8 @@ class AbfsReadFile::Impl { uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { size_t length = 0; auto size = buffers.size(); for (auto& range : buffers) { @@ -92,7 +98,8 @@ class AbfsReadFile::Impl { uint64_t preadv( folly::Range regions, - folly::Range iobufs) const { + folly::Range iobufs, + io::IoStatistics* stats) const { size_t length = 0; VELOX_CHECK_EQ(regions.size(), iobufs.size()); for (size_t i = 0; i < regions.size(); ++i) { @@ -156,24 +163,32 @@ void AbfsReadFile::initialize(const FileOptions& options) { return impl_->initialize(options); } -std::string_view -AbfsReadFile::pread(uint64_t offset, uint64_t length, void* buffer) const { +std::string_view AbfsReadFile::pread( + uint64_t offset, + uint64_t length, + void* buffer, + io::IoStatistics* stats) const { return impl_->pread(offset, length, buffer); } -std::string AbfsReadFile::pread(uint64_t offset, uint64_t length) const { +std::string AbfsReadFile::pread( + uint64_t offset, + uint64_t length, + io::IoStatistics* stats) const { return impl_->pread(offset, length); } uint64_t AbfsReadFile::preadv( uint64_t offset, - const std::vector>& buffers) const { + const std::vector>& buffers, + io::IoStatistics* stats) const { return impl_->preadv(offset, buffers); } uint64_t AbfsReadFile::preadv( folly::Range regions, - folly::Range iobufs) const { + folly::Range iobufs, + io::IoStatistics* stats) const { return impl_->preadv(regions, iobufs); } diff --git a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp index aef53c2dd68d..9d6772d7982e 100644 --- a/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/abfs/tests/AbfsFileSystemTest.cpp @@ -96,17 +96,18 @@ class AbfsFileSystemTest : public testing::Test { void readData(ReadFile* readFile) { ASSERT_EQ(readFile->size(), 15 + kOneMB); char buffer1[5]; - ASSERT_EQ(readFile->pread(10 + kOneMB, 5, &buffer1), "ddddd"); + ASSERT_EQ(readFile->pread(10 + kOneMB, 5, &buffer1, nullptr), "ddddd"); char buffer2[10]; - ASSERT_EQ(readFile->pread(0, 10, &buffer2), "aaaaabbbbb"); + ASSERT_EQ(readFile->pread(0, 10, &buffer2, nullptr), "aaaaabbbbb"); char buffer3[kOneMB]; - ASSERT_EQ(readFile->pread(10, kOneMB, buffer3), std::string(kOneMB, 'c')); + ASSERT_EQ( + readFile->pread(10, kOneMB, buffer3, nullptr), std::string(kOneMB, 'c')); ASSERT_EQ(readFile->size(), 15 + kOneMB); char buffer4[10]; - const std::string_view arf = readFile->pread(5, 10, &buffer4); - const std::string zarf = readFile->pread(kOneMB, 15); + const std::string_view arf = readFile->pread(5, 10, &buffer4, nullptr); + const std::string zarf = readFile->pread(kOneMB, 15, nullptr); auto buf = std::make_unique(8); - const std::string_view warf = readFile->pread(4, 8, buf.get()); + const std::string_view warf = readFile->pread(4, 8, buf.get(), nullptr); const std::string_view warfFromBuf(buf.get(), 8); ASSERT_EQ(arf, "bbbbbccccc"); ASSERT_EQ(zarf, "ccccccccccddddd"); @@ -119,7 +120,7 @@ void readData(ReadFile* readFile) { folly::Range(buff1, 10), folly::Range(nullptr, kOneMB - 5), folly::Range(buff2, 10)}; - ASSERT_EQ(10 + kOneMB - 5 + 10, readFile->preadv(0, buffers)); + ASSERT_EQ(10 + kOneMB - 5 + 10, readFile->preadv(0, buffers, nullptr)); ASSERT_EQ(std::string_view(buff1, sizeof(buff1)), "aaaaabbbbb"); ASSERT_EQ(std::string_view(buff2, sizeof(buff2)), "cccccddddd"); @@ -128,7 +129,9 @@ void readData(ReadFile* readFile) { ASSERT_EQ( 10 + 5, readFile->preadv( - {regions.data(), regions.size()}, {iobufs.data(), iobufs.size()})); + {regions.data(), regions.size()}, + {iobufs.data(), iobufs.size()}, + nullptr)); ASSERT_EQ( std::string_view( reinterpret_cast(iobufs[0].writableData()), diff --git a/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp b/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp index f5d0830d3de3..147882305147 100644 --- a/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/gcs/GcsFileSystem.cpp @@ -97,13 +97,17 @@ class GcsReadFile final : public ReadFile { VELOX_CHECK_GE(length_, 0); } - std::string_view pread(uint64_t offset, uint64_t length, void* buffer) - const override { + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buffer, + io::IoStatistics* stats) const override { preadInternal(offset, length, static_cast(buffer)); return {static_cast(buffer), length}; } - std::string pread(uint64_t offset, uint64_t length) const override { + std::string pread(uint64_t offset, uint64_t length, io::IoStatistics* stats) + const override { std::string result(length, 0); char* position = result.data(); preadInternal(offset, length, position); @@ -112,7 +116,8 @@ class GcsReadFile final : public ReadFile { uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const override { + const std::vector>& buffers, + io::IoStatistics* stats) const override { // 'buffers' contains Ranges(data, size) with some gaps (data = nullptr) in // between. This call must populate the ranges (except gap ranges) // sequentially starting from 'offset'. If a range pointer is nullptr, the diff --git a/velox/connectors/hive/storage_adapters/gcs/tests/GcsFileSystemTest.cpp b/velox/connectors/hive/storage_adapters/gcs/tests/GcsFileSystemTest.cpp index a80842208f89..400f7d9d3551 100644 --- a/velox/connectors/hive/storage_adapters/gcs/tests/GcsFileSystemTest.cpp +++ b/velox/connectors/hive/storage_adapters/gcs/tests/GcsFileSystemTest.cpp @@ -46,17 +46,18 @@ TEST_F(GcsFileSystemTest, readFile) { std::int64_t size = readFile->size(); std::int64_t ref_size = kLoremIpsum.length(); EXPECT_EQ(size, ref_size); - EXPECT_EQ(readFile->pread(0, size), kLoremIpsum); + EXPECT_EQ(readFile->pread(0, size, nullptr), kLoremIpsum); char buffer1[size]; - ASSERT_EQ(readFile->pread(0, size, &buffer1), kLoremIpsum); + ASSERT_EQ(readFile->pread(0, size, &buffer1, nullptr), kLoremIpsum); ASSERT_EQ(readFile->size(), ref_size); char buffer2[50]; - ASSERT_EQ(readFile->pread(10, 50, &buffer2), kLoremIpsum.substr(10, 50)); + ASSERT_EQ( + readFile->pread(10, 50, &buffer2, nullptr), kLoremIpsum.substr(10, 50)); ASSERT_EQ(readFile->size(), ref_size); - EXPECT_EQ(readFile->pread(10, size - 10), kLoremIpsum.substr(10)); + EXPECT_EQ(readFile->pread(10, size - 10, nullptr), kLoremIpsum.substr(10)); char buff1[10]; char buff2[20]; @@ -67,7 +68,7 @@ TEST_F(GcsFileSystemTest, readFile) { folly::Range(buff2, 20), folly::Range(nullptr, 30), folly::Range(buff3, 30)}; - ASSERT_EQ(10 + 20 + 20 + 30 + 30, readFile->preadv(0, buffers)); + ASSERT_EQ(10 + 20 + 20 + 30 + 30, readFile->preadv(0, buffers, nullptr)); ASSERT_EQ(std::string_view(buff1, sizeof(buff1)), kLoremIpsum.substr(0, 10)); ASSERT_EQ(std::string_view(buff2, sizeof(buff2)), kLoremIpsum.substr(30, 20)); ASSERT_EQ(std::string_view(buff3, sizeof(buff3)), kLoremIpsum.substr(80, 30)); @@ -100,7 +101,7 @@ TEST_F(GcsFileSystemTest, writeAndReadFile) { auto readFile = gcfs.openFileForRead(gcsFile); std::int64_t size = readFile->size(); EXPECT_EQ(readFile->size(), contentSize); - EXPECT_EQ(readFile->pread(0, size), kDataContent); + EXPECT_EQ(readFile->pread(0, size, nullptr), kDataContent); // Opening an existing file for write must be an error. filesystems::GcsFileSystem newGcfs(emulator_->hiveConfig()); diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp index 7210f8a5d2a8..518d795fa455 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp @@ -159,12 +159,18 @@ HdfsReadFile::HdfsReadFile( HdfsReadFile::~HdfsReadFile() = default; -std::string_view -HdfsReadFile::pread(uint64_t offset, uint64_t length, void* buf) const { +std::string_view HdfsReadFile::pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats) const { return pImpl->pread(offset, length, buf); } -std::string HdfsReadFile::pread(uint64_t offset, uint64_t length) const { +std::string HdfsReadFile::pread( + uint64_t offset, + uint64_t length, + io::IoStatistics* stats) const { return pImpl->pread(offset, length); } diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h index 121fef47212d..eb98c09190ab 100644 --- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h +++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h @@ -34,10 +34,14 @@ class HdfsReadFile final : public ReadFile { std::string_view path); ~HdfsReadFile() override; - std::string_view pread(uint64_t offset, uint64_t length, void* buf) - const final; + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buf, + io::IoStatistics* stats) const final; - std::string pread(uint64_t offset, uint64_t length) const final; + std::string pread(uint64_t offset, uint64_t length, io::IoStatistics* stats) + const final; uint64_t size() const final; diff --git a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp index cd57ce79e846..3af3aa9e0cec 100644 --- a/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp +++ b/velox/connectors/hive/storage_adapters/s3fs/S3FileSystem.cpp @@ -103,13 +103,17 @@ class S3ReadFile final : public ReadFile { VELOX_CHECK_GE(length_, 0); } - std::string_view pread(uint64_t offset, uint64_t length, void* buffer) - const override { + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buffer, + io::IoStatistics* stats) const override { preadInternal(offset, length, static_cast(buffer)); return {static_cast(buffer), length}; } - std::string pread(uint64_t offset, uint64_t length) const override { + std::string pread(uint64_t offset, uint64_t length, io::IoStatistics* stats) + const override { std::string result(length, 0); char* position = result.data(); preadInternal(offset, length, position); @@ -118,7 +122,8 @@ class S3ReadFile final : public ReadFile { uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const override { + const std::vector>& buffers, + io::IoStatistics* stats) const override { // 'buffers' contains Ranges(data, size) with some gaps (data = nullptr) in // between. This call must populate the ranges (except gap ranges) // sequentially starting from 'offset'. AWS S3 GetObject does not support diff --git a/velox/connectors/hive/tests/FileHandleTest.cpp b/velox/connectors/hive/tests/FileHandleTest.cpp index e641cb391627..ef81cc895d4c 100644 --- a/velox/connectors/hive/tests/FileHandleTest.cpp +++ b/velox/connectors/hive/tests/FileHandleTest.cpp @@ -42,7 +42,7 @@ TEST(FileHandleTest, localFile) { auto fileHandle = factory.generate(filename); ASSERT_EQ(fileHandle->file->size(), 3); char buffer[3]; - ASSERT_EQ(fileHandle->file->pread(0, 3, &buffer), "foo"); + ASSERT_EQ(fileHandle->file->pread(0, 3, &buffer, nullptr), "foo"); // Clean up remove(filename.c_str()); @@ -68,7 +68,7 @@ TEST(FileHandleTest, localFileWithProperties) { auto fileHandle = factory.generate(filename, &properties); ASSERT_EQ(fileHandle->file->size(), 3); char buffer[3]; - ASSERT_EQ(fileHandle->file->pread(0, 3, &buffer), "foo"); + ASSERT_EQ(fileHandle->file->pread(0, 3, &buffer, nullptr), "foo"); // Clean up remove(filename.c_str()); diff --git a/velox/dwio/common/InputStream.cpp b/velox/dwio/common/InputStream.cpp index e2dd496c16d8..e58fe41f12ae 100644 --- a/velox/dwio/common/InputStream.cpp +++ b/velox/dwio/common/InputStream.cpp @@ -78,7 +78,7 @@ void ReadFileInputStream::read( std::string_view readData; { MicrosecondTimer timer(&readTimeUs); - readData = readFile_->pread(offset, length, buf); + readData = readFile_->pread(offset, length, buf, stats_); } if (stats_) { stats_->incRawBytesRead(length); @@ -101,7 +101,7 @@ void ReadFileInputStream::read( LogType logType) { const int64_t bufferSize = totalBufferSize(buffers); logRead(offset, bufferSize, logType); - const auto size = readFile_->preadv(offset, buffers); + const auto size = readFile_->preadv(offset, buffers, stats_); VELOX_CHECK_EQ( size, bufferSize, @@ -118,7 +118,7 @@ folly::SemiFuture ReadFileInputStream::readAsync( LogType logType) { const int64_t bufferSize = totalBufferSize(buffers); logRead(offset, bufferSize, logType); - return readFile_->preadvAsync(offset, buffers); + return readFile_->preadvAsync(offset, buffers, stats_); } bool ReadFileInputStream::hasReadAsync() const { @@ -137,7 +137,7 @@ void ReadFileInputStream::vread( [&](size_t acc, const auto& r) { return acc + r.length; }); logRead(regions[0].offset, length, purpose); auto readStartMicros = getCurrentTimeMicro(); - readFile_->preadv(regions, iobufs); + readFile_->preadv(regions, iobufs, stats_); if (stats_) { stats_->incRawBytesRead(length); stats_->incTotalScanTime((getCurrentTimeMicro() - readStartMicros) * 1000); diff --git a/velox/dwio/common/tests/TestBufferedInput.cpp b/velox/dwio/common/tests/TestBufferedInput.cpp index c0795e3c0760..4032e375e47a 100644 --- a/velox/dwio/common/tests/TestBufferedInput.cpp +++ b/velox/dwio/common/tests/TestBufferedInput.cpp @@ -32,7 +32,7 @@ class ReadFileMock : public ::facebook::velox::ReadFile { MOCK_METHOD( std::string_view, pread, - (uint64_t offset, uint64_t length, void* buf), + (uint64_t offset, uint64_t length, void* buf, IoStatistics* stats), (const, override)); MOCK_METHOD(bool, shouldCoalesce, (), (const, override)); @@ -43,7 +43,9 @@ class ReadFileMock : public ::facebook::velox::ReadFile { MOCK_METHOD( uint64_t, preadv, - (folly::Range regions, folly::Range iobufs), + (folly::Range regions, + folly::Range iobufs, + IoStatistics* stats), (const, override)); }; @@ -55,11 +57,14 @@ void expectPreads( EXPECT_CALL(file, size()).WillRepeatedly(Return(content.size())); for (auto& read : reads) { ASSERT_GE(content.size(), read.offset + read.length); - EXPECT_CALL(file, pread(read.offset, read.length, _)) + EXPECT_CALL(file, pread(read.offset, read.length, _, nullptr)) .Times(1) .WillOnce( - [content](uint64_t offset, uint64_t length, void* buf) - -> std::string_view { + [content]( + uint64_t offset, + uint64_t length, + void* buf, + IoStatistics* stats) -> std::string_view { memcpy(buf, content.data() + offset, length); return {content.data() + offset, length}; }); @@ -72,12 +77,13 @@ void expectPreadvs( std::vector reads) { EXPECT_CALL(file, getName()).WillRepeatedly(Return("mock_name")); EXPECT_CALL(file, size()).WillRepeatedly(Return(content.size())); - EXPECT_CALL(file, preadv(_, _)) + EXPECT_CALL(file, preadv(_, _, nullptr)) .Times(1) .WillOnce( [content, reads]( folly::Range regions, - folly::Range iobufs) -> uint64_t { + folly::Range iobufs, + IoStatistics* stats) -> uint64_t { EXPECT_EQ(regions.size(), reads.size()); uint64_t length = 0; for (size_t i = 0; i < reads.size(); ++i) { diff --git a/velox/dwio/dwrf/test/TestReadFile.h b/velox/dwio/dwrf/test/TestReadFile.h index 30b35de2b277..2a52b9462933 100644 --- a/velox/dwio/dwrf/test/TestReadFile.h +++ b/velox/dwio/dwrf/test/TestReadFile.h @@ -38,8 +38,11 @@ class TestReadFile : public velox::ReadFile { return length_; } - std::string_view pread(uint64_t offset, uint64_t length, void* buffer) - const override { + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buffer, + io::IoStatistics* stats) const override { const uint64_t content = offset + seed_; const uint64_t available = std::min(length_ - offset, length); int fill; @@ -51,8 +54,9 @@ class TestReadFile : public velox::ReadFile { uint64_t preadv( uint64_t offset, - const std::vector>& buffers) const override { - auto res = ReadFile::preadv(offset, buffers); + const std::vector>& buffers, + io::IoStatistics* stats) const override { + auto res = ReadFile::preadv(offset, buffers, stats); ++numIos_; return res; } diff --git a/velox/dwio/dwrf/test/TestStripeStream.cpp b/velox/dwio/dwrf/test/TestStripeStream.cpp index b456b7c30af8..6c4245ce56a4 100644 --- a/velox/dwio/dwrf/test/TestStripeStream.cpp +++ b/velox/dwio/dwrf/test/TestStripeStream.cpp @@ -38,8 +38,11 @@ class RecordingInputStream : public facebook::velox::InMemoryReadFile { public: RecordingInputStream() : InMemoryReadFile(std::string()) {} - std::string_view pread(uint64_t offset, uint64_t length, void* buf) - const override { + std::string_view pread( + uint64_t offset, + uint64_t length, + void* buf, + IoStatistics* stats) const override { reads_.push_back({offset, length}); return {static_cast(buf), length}; } diff --git a/velox/exec/OperatorTraceReader.cpp b/velox/exec/OperatorTraceReader.cpp index 0e64111d6886..d508723376ef 100644 --- a/velox/exec/OperatorTraceReader.cpp +++ b/velox/exec/OperatorTraceReader.cpp @@ -74,7 +74,7 @@ OperatorTraceSummaryReader::OperatorTraceSummaryReader( OperatorTraceSummary OperatorTraceSummaryReader::read() const { VELOX_CHECK_NOT_NULL(summaryFile_); - const auto summaryStr = summaryFile_->pread(0, summaryFile_->size()); + const auto summaryStr = summaryFile_->pread(0, summaryFile_->size(), nullptr); VELOX_CHECK(!summaryStr.empty()); folly::dynamic summaryObj = folly::parseJson(summaryStr); diff --git a/velox/exec/TraceUtil.cpp b/velox/exec/TraceUtil.cpp index 28cac1b77f03..69793b4293ba 100644 --- a/velox/exec/TraceUtil.cpp +++ b/velox/exec/TraceUtil.cpp @@ -150,7 +150,7 @@ folly::dynamic getTaskMetadata( try { const auto file = fs->openFileForRead(taskMetaFilePath); VELOX_CHECK_NOT_NULL(file); - const auto taskMeta = file->pread(0, file->size()); + const auto taskMeta = file->pread(0, file->size(), nullptr); VELOX_USER_CHECK(!taskMeta.empty()); return folly::parseJson(taskMeta); } catch (const std::exception& e) { @@ -169,7 +169,7 @@ std::string getNodeName( try { const auto file = fs->openFileForRead(taskMetaFilePath); VELOX_CHECK_NOT_NULL(file); - const auto taskMeta = file->pread(0, file->size()); + const auto taskMeta = file->pread(0, file->size(), nullptr); VELOX_USER_CHECK(!taskMeta.empty()); folly::dynamic metaObj = folly::parseJson(taskMeta); const auto planFragment = ISerializable::deserialize( diff --git a/velox/tool/trace/TraceFileToolRunner.cpp b/velox/tool/trace/TraceFileToolRunner.cpp index fcea11603341..d94d9c71a8ae 100644 --- a/velox/tool/trace/TraceFileToolRunner.cpp +++ b/velox/tool/trace/TraceFileToolRunner.cpp @@ -94,7 +94,7 @@ void TraceFileToolRunner::copyFiles() const { while (offset < fileSize) { const auto curLen = std::min(fileSize - offset, batchSize); const auto dataView = - readFile->pread(offset, curLen, ioBuf->writableData()); + readFile->pread(offset, curLen, ioBuf->writableData(), nullptr); writeFile->append(dataView); ioBuf->append(curLen); offset += curLen;