Skip to content

Commit

Permalink
Add fault injection for write operation
Browse files Browse the repository at this point in the history
  • Loading branch information
kewang1024 committed Apr 19, 2024
1 parent d796cfc commit 2f86af5
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 20 deletions.
12 changes: 11 additions & 1 deletion velox/common/file/tests/FaultyFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,24 @@ uint64_t FaultyReadFile::preadv(
}

FaultyWriteFile::FaultyWriteFile(
const std::string& path,
std::shared_ptr<WriteFile> delegatedFile,
FileFaultInjectionHook injectionHook)
: delegatedFile_(std::move(delegatedFile)),
: path_(path),
delegatedFile_(std::move(delegatedFile)),
injectionHook_(std::move(injectionHook)) {
VELOX_CHECK_NOT_NULL(delegatedFile_);
}

void FaultyWriteFile::append(std::string_view data) {
if (injectionHook_ != nullptr) {
FaultFileWriteOperation op(path_, data);
injectionHook_(&op);
if (op.delegate) {
delegatedFile_->append(op.data);
}
return;
}
delegatedFile_->append(data);
}

Expand Down
14 changes: 14 additions & 0 deletions velox/common/file/tests/FaultyFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct FaultFileOperation {
/// Injects faults for file read operations.
kRead,
kReadv,
kWrite,
/// TODO: add to support fault injections for the other file operation
/// types.
};
Expand Down Expand Up @@ -77,6 +78,17 @@ struct FaultFileReadvOperation : FaultFileOperation {
buffers(_buffers) {}
};

/// Fault injection parameters for file write API.
struct FaultFileWriteOperation : FaultFileOperation {
std::string_view data;

FaultFileWriteOperation(
const std::string& _path,
const std::string_view& _data)
: FaultFileOperation(FaultFileOperation::Type::kWrite, _path),
data(_data) {}
};

/// The fault injection hook on the file operation path.
using FileFaultInjectionHook = std::function<void(FaultFileOperation*)>;

Expand Down Expand Up @@ -125,6 +137,7 @@ class FaultyReadFile : public ReadFile {
class FaultyWriteFile : public WriteFile {
public:
FaultyWriteFile(
const std::string& path,
std::shared_ptr<WriteFile> delegatedFile,
FileFaultInjectionHook injectionHook);

Expand All @@ -143,6 +156,7 @@ class FaultyWriteFile : public WriteFile {
}

private:
const std::string path_;
const std::shared_ptr<WriteFile> delegatedFile_;
const FileFaultInjectionHook injectionHook_;
};
Expand Down
5 changes: 3 additions & 2 deletions velox/common/file/tests/FaultyFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ std::unique_ptr<WriteFile> FaultyFileSystem::openFileForWrite(
auto delegatedFile = getFileSystem(delegatedPath, config_)
->openFileForWrite(delegatedPath, options);
return std::make_unique<FaultyWriteFile>(
std::move(delegatedFile),
[&](FaultFileOperation* op) { maybeInjectFileFault(op); });
std::string(path), std::move(delegatedFile), [&](FaultFileOperation* op) {
maybeInjectFileFault(op);
});
}

void FaultyFileSystem::remove(std::string_view path) {
Expand Down
125 changes: 108 additions & 17 deletions velox/common/file/tests/FileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ class FaultyFsTest : public ::testing::Test {
filesystems::getFileSystem(dir_->getPath(), {}));
VELOX_CHECK_NOT_NULL(fs_);
filePath_ = fmt::format("{}/faultyTestFile", dir_->getPath());
writeFilePath_ = fmt::format("{}/faultyTestWriteFile", dir_->getPath());
const int bufSize = 1024;
buffer_.resize(bufSize);
for (int i = 0; i < bufSize; ++i) {
Expand Down Expand Up @@ -412,6 +413,7 @@ class FaultyFsTest : public ::testing::Test {

std::shared_ptr<exec::test::TempDirectoryPath> dir_;
std::string filePath_;
std::string writeFilePath_;
std::shared_ptr<tests::utils::FaultyFileSystem> fs_;
std::string buffer_;
std::exception_ptr fileError_;
Expand Down Expand Up @@ -440,7 +442,7 @@ TEST_F(FaultyFsTest, fileReadErrorInjection) {
}
{
auto readFile = fs_->openFileForRead(filePath_, {});
// We only inject error for pread API so preadv should be fine.
// We only inject error for preadv API so pread should be fine.
readData(readFile.get(), false);
}

Expand All @@ -450,13 +452,10 @@ TEST_F(FaultyFsTest, fileReadErrorInjection) {
VELOX_ASSERT_THROW(readData(readFile.get(), true), "InjectedFaultFileError");
VELOX_ASSERT_THROW(readData(readFile.get(), false), "InjectedFaultFileError");
fs_->remove(filePath_);
// Check there is no interference on write as we don't support it for now.
auto writeFile = fs_->openFileForWrite(filePath_, {});
writeData(writeFile.get());
}

TEST_F(FaultyFsTest, fileReadDelayInjection) {
// Set 3 seconds delay.
// Set 2 seconds delay.
const uint64_t injectDelay{2'000'000};
fs_->setFileInjectionDelay(injectDelay, {FaultFileOperation::Type::kRead});
{
Expand Down Expand Up @@ -523,16 +522,6 @@ TEST_F(FaultyFsTest, fileReadDelayInjection) {
}
ASSERT_GE(readDurationUs, injectDelay);
}

fs_->remove(filePath_);
// Check there is no interference on write as we don't support it for now.
auto writeFile = fs_->openFileForWrite(filePath_, {});
uint64_t writeDurationUs{0};
{
MicrosecondTimer writeTimer(&writeDurationUs);
writeData(writeFile.get());
}
ASSERT_LT(writeDurationUs, injectDelay);
}

TEST_F(FaultyFsTest, fileReadFaultHookInjection) {
Expand Down Expand Up @@ -571,8 +560,7 @@ TEST_F(FaultyFsTest, fileReadFaultHookInjection) {
auto readFile = fs_->openFileForRead(path2, {});
// Verify only throw for readv.
readData(readFile.get(), false);
VELOX_ASSERT_THROW(
readData(readFile.get(), true), "inject hook read failure");
VELOX_ASSERT_THROW(readData(readFile.get(), true), "inject hook read failure");
}

// Set to return fake data.
Expand Down Expand Up @@ -605,3 +593,106 @@ TEST_F(FaultyFsTest, fileReadFaultHookInjection) {
VELOX_ASSERT_THROW(readData(readFile.get(), false), "Data Mismatch");
}
}

TEST_F(FaultyFsTest, fileWriteErrorInjection) {
// Set write error.
fs_->setFileInjectionError(fileError_, {FaultFileOperation::Type::kWrite});
{
auto writeFile = fs_->openFileForWrite(writeFilePath_, {});
VELOX_ASSERT_THROW(writeFile->append("hello"), "InjectedFaultFileError");
fs_->remove(writeFilePath_);
}
// Set error for all kinds of operations.
fs_->setFileInjectionError(fileError_);
{
auto writeFile = fs_->openFileForWrite(writeFilePath_, {});
VELOX_ASSERT_THROW(writeFile->append("hello"), "InjectedFaultFileError");
fs_->remove(writeFilePath_);
}
}

TEST_F(FaultyFsTest, fileWriteDelayInjection) {
// Set 2 seconds delay.
const uint64_t injectDelay{2'000'000};
fs_->setFileInjectionDelay(injectDelay, {FaultFileOperation::Type::kWrite});
{
auto writeFile = fs_->openFileForWrite(writeFilePath_, {});
uint64_t readDurationUs{0};
{
MicrosecondTimer readTimer(&readDurationUs);
writeFile->append("hello");
}
ASSERT_GE(readDurationUs, injectDelay);
fs_->remove(writeFilePath_);
}
}

TEST_F(FaultyFsTest, fileWriteFaultHookInjection) {
const std::string path1 = fmt::format("{}/hookFile1", dir_->getPath());
const std::string path2 = fmt::format("{}/hookFile2", dir_->getPath());
// Set to write fake data.
fs_->setFileInjectionHook([&](FaultFileOperation* op) {
// Only inject for write.
if (op->type != FaultFileOperation::Type::kWrite) {
return;
}
// Only inject for path2.
if (op->path != path2) {
return;
}
auto* writeOp = static_cast<FaultFileWriteOperation*>(op);
writeOp->data = "Error data";
});
{
auto writeFile = fs_->openFileForWrite(path1, {});
writeFile->append("hello");
writeFile->close();
auto readFile = fs_->openFileForRead(path1, {});
char buffer[5];
ASSERT_EQ(readFile->size(), 5);
ASSERT_EQ(readFile->pread(0, 5, &buffer), "hello");
fs_->remove(path1);
}
{
auto writeFile = fs_->openFileForWrite(path2, {});
writeFile->append("hello");
writeFile->close();
auto readFile = fs_->openFileForRead(path2, {});
char buffer[10];
ASSERT_EQ(readFile->size(), 10);
ASSERT_EQ(readFile->pread(0, 10, &buffer), "Error data");
fs_->remove(path2);
}

// Set to not delegate.
fs_->setFileInjectionHook([&](FaultFileOperation* op) {
// Only inject for write.
if (op->type != FaultFileOperation::Type::kWrite) {
return;
}
// Only inject for path2.
if (op->path != path2) {
return;
}
auto* writeOp = static_cast<FaultFileWriteOperation*>(op);
writeOp->delegate = false;
});
{
auto writeFile = fs_->openFileForWrite(path1, {});
writeFile->append("hello");
writeFile->close();
auto readFile = fs_->openFileForRead(path1, {});
char buffer[5];
ASSERT_EQ(readFile->size(), 5);
ASSERT_EQ(readFile->pread(0, 5, &buffer), "hello");
fs_->remove(path1);
}
{
auto writeFile = fs_->openFileForWrite(path2, {});
writeFile->append("hello");
writeFile->close();
auto readFile = fs_->openFileForRead(path2, {});
ASSERT_EQ(readFile->size(), 0);
fs_->remove(path2);
}
}

0 comments on commit 2f86af5

Please sign in to comment.