diff --git a/velox/common/file/tests/FaultyFileSystem.h b/velox/common/file/tests/FaultyFileSystem.h index b55266d41b9a..38451c692e2a 100644 --- a/velox/common/file/tests/FaultyFileSystem.h +++ b/velox/common/file/tests/FaultyFileSystem.h @@ -26,7 +26,6 @@ namespace facebook::velox::tests::utils { using namespace filesystems; - /// Implements faulty filesystem for io fault injection in unit test. It is a /// wrapper on top of a real file system, and by default it delegates the the /// file operation to the real file system underneath. @@ -55,11 +54,11 @@ class FaultyFileSystem : public FileSystem { std::unique_ptr openFileForRead( std::string_view path, - const FileOptions& options) override; + const FileOptions& options = {}) override; std::unique_ptr openFileForWrite( std::string_view path, - const FileOptions& options) override; + const FileOptions& options = {}) override; void remove(std::string_view path) override; diff --git a/velox/dwio/common/FileSink.cpp b/velox/dwio/common/FileSink.cpp index dcd9540035ec..45f5362d0e52 100644 --- a/velox/dwio/common/FileSink.cpp +++ b/velox/dwio/common/FileSink.cpp @@ -145,6 +145,17 @@ LocalFileSink::LocalFileSink(const std::string& name, const Options& options) writeFile_ = fs->openFileForWrite(name_); } +LocalFileSink::LocalFileSink( + const std::string& name, + const Options& options, + bool initializeWriter) + : FileSink{name, options}, writeFile_() { + const auto dir = fs::path(name_).parent_path(); + if (!fs::exists(dir)) { + VELOX_CHECK(velox::common::generateFileDirectory(dir.c_str())); + } +} + void LocalFileSink::doClose() { LOG(INFO) << "closing file: " << name() << ", total size: " << succinctBytes(size_); diff --git a/velox/dwio/common/FileSink.h b/velox/dwio/common/FileSink.h index bd6fab7f3ac0..b209ab2424b2 100644 --- a/velox/dwio/common/FileSink.h +++ b/velox/dwio/common/FileSink.h @@ -166,9 +166,14 @@ class LocalFileSink : public FileSink { static void registerFactory(); protected: + // 'initializeWriter' 
is false if it is used by FaultyFileSink which sets up + // the write file through the faulty filesystem. + LocalFileSink( + const std::string& name, + const Options& options, + bool initializeWriter); void doClose() override; - private: std::unique_ptr writeFile_; }; diff --git a/velox/dwio/common/tests/CMakeLists.txt b/velox/dwio/common/tests/CMakeLists.txt index f413dcdab12e..3db9799d18ce 100644 --- a/velox/dwio/common/tests/CMakeLists.txt +++ b/velox/dwio/common/tests/CMakeLists.txt @@ -13,6 +13,9 @@ # limitations under the License. add_subdirectory(utils) +velox_add_library(velox_dwio_faulty_file_sink FaultyFileSink.cpp) +velox_link_libraries(velox_dwio_faulty_file_sink velox_file_test_utils + velox_dwio_common) add_executable( velox_dwio_common_test diff --git a/velox/dwio/common/tests/FaultyFileSink.cpp b/velox/dwio/common/tests/FaultyFileSink.cpp new file mode 100644 index 000000000000..6d103df07586 --- /dev/null +++ b/velox/dwio/common/tests/FaultyFileSink.cpp @@ -0,0 +1,51 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "velox/dwio/common/tests/FaultyFileSink.h" +#include "velox/common/base/Fs.h" +#include "velox/common/file/FileSystems.h" +#include "velox/common/file/tests/FaultyFileSystem.h" +#include "velox/dwio/common/FileSink.h" +#include "velox/dwio/common/exception/Exception.h" + +namespace facebook::velox::dwio::common { +namespace { +using tests::utils::FaultyFileSystem; + +std::unique_ptr createFaultyFileSink( + const std::string& filePath, + const FileSink::Options& options) { + if (filePath.find("faulty:") == 0) { + return std::make_unique(filePath, options); + } + return nullptr; +} +} // namespace + +FaultyFileSink::FaultyFileSink( + const std::string& faultyFilePath, + const Options& options) + : LocalFileSink{faultyFilePath.substr(7), options, false}, + faultyFilePath_(faultyFilePath) { + auto fs = filesystems::getFileSystem(faultyFilePath_, nullptr); + writeFile_ = fs->openFileForWrite(faultyFilePath_); +} + +void registerFaultyFileSinks() { + facebook::velox::dwio::common::FileSink::registerFactory( + (createFaultyFileSink)); +} +} // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/tests/FaultyFileSink.h b/velox/dwio/common/tests/FaultyFileSink.h new file mode 100644 index 000000000000..227a006498a9 --- /dev/null +++ b/velox/dwio/common/tests/FaultyFileSink.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include + +#include "velox/common/config/Config.h" +#include "velox/common/file/File.h" +#include "velox/common/file/tests/FaultyFile.h" +#include "velox/common/io/IoStatistics.h" +#include "velox/dwio/common/Closeable.h" +#include "velox/dwio/common/DataBuffer.h" +#include "velox/dwio/common/FileSink.h" +#include "velox/dwio/common/MetricsLog.h" + +namespace facebook::velox::dwio::common { +using namespace facebook::velox::io; + +class FaultyFileSink : public LocalFileSink { + public: + FaultyFileSink(const std::string& faultyFilePath, const Options& options); + + private: + const std::string faultyFilePath_; +}; + +void registerFaultyFileSinks(); + +} // namespace facebook::velox::dwio::common diff --git a/velox/exec/fuzzer/CMakeLists.txt b/velox/exec/fuzzer/CMakeLists.txt index 8a688f981b6d..856373b54fb4 100644 --- a/velox/exec/fuzzer/CMakeLists.txt +++ b/velox/exec/fuzzer/CMakeLists.txt @@ -29,7 +29,8 @@ target_link_libraries( velox_hive_connector velox_dwio_dwrf_reader velox_dwio_dwrf_writer - velox_dwio_catalog_fbhive) + velox_dwio_catalog_fbhive + velox_dwio_faulty_file_sink) add_library(velox_aggregation_fuzzer_base AggregationFuzzerBase.cpp) @@ -102,7 +103,9 @@ target_link_libraries( velox_exec_test_lib velox_expression_test_utility velox_temp_path - velox_vector_test_lib) + velox_vector_test_lib + velox_dwio_faulty_file_sink + velox_file_test_utils) add_library(velox_memory_arbitration_fuzzer MemoryArbitrationFuzzer.cpp) diff --git a/velox/exec/fuzzer/WriterFuzzer.cpp b/velox/exec/fuzzer/WriterFuzzer.cpp index f6b58411d18f..d3d60e49aaf2 100644 --- a/velox/exec/fuzzer/WriterFuzzer.cpp +++ b/velox/exec/fuzzer/WriterFuzzer.cpp @@ -22,6 +22,7 @@ #include "velox/common/base/Fs.h" #include "velox/common/encode/Base64.h" #include "velox/common/file/FileSystems.h" +#include "velox/common/file/tests/FaultyFileSystem.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include 
"velox/connectors/hive/TableHandle.h" @@ -36,6 +37,11 @@ #include "velox/vector/VectorSaver.h" #include "velox/vector/fuzzer/VectorFuzzer.h" +DEFINE_bool( + file_system_error_injection, + true, + "When enabled, inject file system write error with certain possibility"); + DEFINE_int32(steps, 10, "Number of plans to generate and test."); DEFINE_int32( @@ -63,6 +69,9 @@ using namespace facebook::velox::test; namespace facebook::velox::exec::test { namespace { +using facebook::velox::filesystems::FileSystem; +using tests::utils::FaultFileOperation; +using tests::utils::FaultyFileSystem; class WriterFuzzer { public: @@ -123,7 +132,7 @@ class WriterFuzzer { const std::vector& bucketColumns, int32_t sortColumnOffset, const std::vector>& sortBy, - const std::string& outputDirectoryPath); + const std::shared_ptr& outputDirectoryPath); // Generates table column handles based on table column properties std::unordered_map> @@ -235,6 +244,12 @@ class WriterFuzzer { BIGINT(), VARCHAR()}; + const std::shared_ptr faultyFs_ = + std::dynamic_pointer_cast( + filesystems::getFileSystem("faulty:/tmp", {})); + const std::string injectedErrorMsg_{"Injected Faulty File Error"}; + std::atomic injectedErrorCount_{0}; + FuzzerGenerator rng_; size_t currentSeed_{0}; std::unique_ptr referenceQueryRunner_; @@ -292,6 +307,16 @@ void WriterFuzzer::go() { auto startTime = std::chrono::system_clock::now(); size_t iteration = 0; + // Faulty fs will generate file system write error with certain possibility + if (FLAGS_file_system_error_injection) { + faultyFs_->setFileInjectionHook([&](FaultFileOperation* op) { + if (vectorFuzzer_.coinToss(0.01)) { + ++injectedErrorCount_; + VELOX_FAIL(injectedErrorMsg_); + } + }); + } + while (!isDone(iteration, startTime)) { LOG(INFO) << "==============================> Started iteration " << iteration << " (seed: " << currentSeed_ << ")"; @@ -340,7 +365,9 @@ void WriterFuzzer::go() { } auto input = generateInputData(names, types, partitionOffset); - auto 
tempDirPath = exec::test::TempDirectoryPath::create(); + const auto outputDirPath = exec::test::TempDirectoryPath::create( + FLAGS_file_system_error_injection); + verifyWriter( input, names, @@ -351,7 +378,7 @@ void WriterFuzzer::go() { bucketColumns, sortColumnOffset, sortBy, - tempDirPath->getPath()); + outputDirPath); LOG(INFO) << "==============================> Done with iteration " << iteration++; @@ -423,11 +450,11 @@ void WriterFuzzer::verifyWriter( const std::vector& bucketColumns, const int32_t sortColumnOffset, const std::vector>& sortBy, - const std::string& outputDirectoryPath) { + const std::shared_ptr& outputDirectoryPath) { const auto plan = PlanBuilder() .values(input) .tableWrite( - outputDirectoryPath, + outputDirectoryPath->getPath(), partitionKeys, bucketCount, bucketColumns, @@ -436,7 +463,23 @@ void WriterFuzzer::verifyWriter( const auto maxDrivers = boost::random::uniform_int_distribution(1, 16)(rng_); - const auto result = veloxToPrestoResult(execute(plan, maxDrivers)); + RowVectorPtr result; + const uint64_t prevInjectedErrorCount = injectedErrorCount_; + try { + result = veloxToPrestoResult(execute(plan, maxDrivers)); + } catch (VeloxRuntimeError& error) { + if (injectedErrorCount_ == prevInjectedErrorCount) { + throw error; + } + VELOX_CHECK_GT( + injectedErrorCount_, + prevInjectedErrorCount, + "Unexpected writer fuzzer failure: {}", + error.message()); + VELOX_CHECK_EQ( + error.message(), injectedErrorMsg_, "Unexpected writer fuzzer failure"); + return; + } const auto dropSql = "DROP TABLE IF EXISTS tmp_write"; const auto sql = referenceQueryRunner_->toSql(plan).value(); @@ -465,11 +508,13 @@ void WriterFuzzer::verifyWriter( const auto referencedOutputDirectoryPath = getReferenceOutputDirectoryPath(partitionKeys.size()); comparePartitionAndBucket( - outputDirectoryPath, referencedOutputDirectoryPath, bucketCount); + outputDirectoryPath->getDelegatePath(), + referencedOutputDirectoryPath, + bucketCount); } // 3. Verifies data itself. 
- auto splits = makeSplits(outputDirectoryPath); + auto splits = makeSplits(outputDirectoryPath->getDelegatePath()); auto columnHandles = getTableColumnHandles(names, types, partitionOffset, bucketCount); const auto rowType = generateOutputType(names, types, bucketCount); @@ -502,7 +547,8 @@ void WriterFuzzer::verifyWriter( types.begin() + sortColumnOffset, types.begin() + sortColumnOffset + sortBy.size()}; - // Read from each file and check if data is sorted as presto sorted result. + // Read from each file and check if data is sorted as presto sorted + // result. for (const auto& split : splits) { auto splitReadPlan = PlanBuilder() .tableScan(generateOutputType( diff --git a/velox/exec/fuzzer/WriterFuzzerRunner.h b/velox/exec/fuzzer/WriterFuzzerRunner.h index 80d92eaa760c..527b11bbef1f 100644 --- a/velox/exec/fuzzer/WriterFuzzerRunner.h +++ b/velox/exec/fuzzer/WriterFuzzerRunner.h @@ -23,8 +23,10 @@ #include #include "velox/common/file/FileSystems.h" +#include "velox/common/file/tests/FaultyFileSystem.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/dwio/common/FileSink.h" +#include "velox/dwio/common/tests/FaultyFileSink.h" #include "velox/dwio/dwrf/RegisterDwrfReader.h" #include "velox/dwio/dwrf/RegisterDwrfWriter.h" #include "velox/exec/fuzzer/FuzzerUtil.h" @@ -74,6 +76,7 @@ class WriterFuzzerRunner { size_t seed, std::unique_ptr referenceQueryRunner) { filesystems::registerLocalFileSystem(); + tests::utils::registerFaultyFileSystem(); connector::registerConnectorFactory( std::make_shared()); auto hiveConnector = @@ -87,6 +90,7 @@ class WriterFuzzerRunner { dwrf::registerDwrfReaderFactory(); dwrf::registerDwrfWriterFactory(); dwio::common::registerFileSinks(); + dwio::common::registerFaultyFileSinks(); facebook::velox::exec::test::writerFuzzer( seed, std::move(referenceQueryRunner)); // Calling gtest here so that it can be recognized as tests in CI systems. 
diff --git a/velox/exec/tests/utils/TempDirectoryPath.h b/velox/exec/tests/utils/TempDirectoryPath.h index 5c5124f53680..6050a841cfdc 100644 --- a/velox/exec/tests/utils/TempDirectoryPath.h +++ b/velox/exec/tests/utils/TempDirectoryPath.h @@ -45,6 +45,11 @@ class TempDirectoryPath { return path_; } + /// The actual file path if fault injection is enabled. + const std::string& getDelegatePath() const { + return tempPath_; + } + private: static std::string createTempDirectory();