22
22
#include " velox/common/base/Fs.h"
23
23
#include " velox/common/encode/Base64.h"
24
24
#include " velox/common/file/FileSystems.h"
25
+ #include " velox/common/file/tests/FaultyFileSystem.h"
25
26
#include " velox/connectors/hive/HiveConnector.h"
26
27
#include " velox/connectors/hive/HiveConnectorSplit.h"
27
28
#include " velox/connectors/hive/TableHandle.h"
36
37
#include " velox/vector/VectorSaver.h"
37
38
#include " velox/vector/fuzzer/VectorFuzzer.h"
38
39
40
// Command-line switch (default: true) for random write-fault injection.
// WriterFuzzer::injectWriterError reads it and only injects an error when, in
// addition, the table has no partition keys and a 1% coin toss succeeds.
+ DEFINE_bool (
41
+ file_system_error_injection,
42
+ true ,
43
+ " When enabled, inject file system write error with certain possibility" );
44
+
39
45
DEFINE_int32 (steps, 10 , " Number of plans to generate and test." );
40
46
41
47
DEFINE_int32 (
@@ -63,6 +69,8 @@ using namespace facebook::velox::test;
63
69
namespace facebook ::velox::exec::test {
64
70
65
71
namespace {
72
+ using tests::utils::FaultyFileSystem;
73
+ using facebook::velox::filesystems::FileSystem;
66
74
67
75
class WriterFuzzer {
68
76
public:
@@ -113,6 +121,11 @@ class WriterFuzzer {
113
121
std::vector<TypePtr> types,
114
122
size_t partitionOffset);
115
123
124
+ // Injects a file system write error with a certain probability. Returns true if an error was injected.
125
+ bool injectWriterError (
126
+ const std::shared_ptr<TempDirectoryPath>& tempDirPath,
127
+ const std::vector<std::string>& partitionKeys);
128
+
116
129
void verifyWriter (
117
130
const std::vector<RowVectorPtr>& input,
118
131
const std::vector<std::string>& names,
@@ -123,7 +136,8 @@ class WriterFuzzer {
123
136
const std::vector<std::string>& bucketColumns,
124
137
int32_t sortColumnOffset,
125
138
const std::vector<std::shared_ptr<const HiveSortingColumn>>& sortBy,
126
- const std::string& outputDirectoryPath);
139
+ const std::string& outputDirectoryPath,
140
+ const bool writeErrorInjected);
127
141
128
142
// Generates table column handles based on table column properties
129
143
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
@@ -244,6 +258,11 @@ class WriterFuzzer {
244
258
std::shared_ptr<memory::MemoryPool> writerPool_{
245
259
rootPool_->addAggregateChild (" writerFuzzerWriter" )};
246
260
VectorFuzzer vectorFuzzer_;
261
+
262
+ const std::shared_ptr<FaultyFileSystem> faultyFs_ =
263
+ std::dynamic_pointer_cast<FaultyFileSystem>(
264
+ filesystems::getFileSystem (" /" , nullptr ));
265
+ const std::string injectedErrorMsg_ = " Injected Faulty File Error" ;
247
266
};
248
267
} // namespace
249
268
@@ -254,9 +273,10 @@ void writerFuzzer(
254
273
writerFuzzer.go ();
255
274
}
256
275
257
- std::vector<std::string> listFolders (std::string_view path) {
276
+ std::vector<std::string> listFolders (
277
+ const std::shared_ptr<FileSystem>& fileSystem,
278
+ std::string_view path) {
258
279
std::vector<std::string> folders;
259
- auto fileSystem = filesystems::getFileSystem (" /" , nullptr );
260
280
for (auto & p : std::filesystem::recursive_directory_iterator (
261
281
fileSystem->extractPath (path))) {
262
282
if (p.is_directory ())
@@ -340,7 +360,9 @@ void WriterFuzzer::go() {
340
360
}
341
361
auto input = generateInputData (names, types, partitionOffset);
342
362
343
- auto tempDirPath = exec::test::TempDirectoryPath::create ();
363
+ auto tempDirPath = exec::test::TempDirectoryPath::create (true );
364
+ bool writeErrorInjected = injectWriterError (tempDirPath, partitionKeys);
365
+
344
366
verifyWriter (
345
367
input,
346
368
names,
@@ -351,7 +373,10 @@ void WriterFuzzer::go() {
351
373
bucketColumns,
352
374
sortColumnOffset,
353
375
sortBy,
354
- tempDirPath->getPath ());
376
+ tempDirPath->getPath (),
377
+ writeErrorInjected);
378
+
379
+ faultyFs_->clearFileFaultInjections ();
355
380
356
381
LOG (INFO) << " ==============================> Done with iteration "
357
382
<< iteration++;
@@ -413,6 +438,24 @@ std::vector<RowVectorPtr> WriterFuzzer::generateInputData(
413
438
return input;
414
439
}
415
440
441
+ bool WriterFuzzer::injectWriterError (
442
+ const std::shared_ptr<TempDirectoryPath>& tempDirPath,
443
+ const std::vector<std::string>& partitionKeys) {
444
+ if (FLAGS_file_system_error_injection && partitionKeys.empty () &&
445
+ vectorFuzzer_.coinToss (0.01 )) {
446
+ std::exception_ptr fileError;
447
+ try {
448
+ VELOX_FAIL (injectedErrorMsg_);
449
+ } catch (VeloxRuntimeError&) {
450
+ fileError = std::current_exception ();
451
+ }
452
+ faultyFs_->setFileInjectionError (
453
+ fileError, {tests::utils::FaultFileOperation::Type::kWrite });
454
+ return true ;
455
+ }
456
+ return false ;
457
+ }
458
+
416
459
void WriterFuzzer::verifyWriter (
417
460
const std::vector<RowVectorPtr>& input,
418
461
const std::vector<std::string>& names,
@@ -423,7 +466,8 @@ void WriterFuzzer::verifyWriter(
423
466
const std::vector<std::string>& bucketColumns,
424
467
const int32_t sortColumnOffset,
425
468
const std::vector<std::shared_ptr<const HiveSortingColumn>>& sortBy,
426
- const std::string& outputDirectoryPath) {
469
+ const std::string& outputDirectoryPath,
470
+ const bool writeErrorInjected) {
427
471
const auto plan = PlanBuilder ()
428
472
.values (input)
429
473
.tableWrite (
@@ -436,7 +480,17 @@ void WriterFuzzer::verifyWriter(
436
480
437
481
const auto maxDrivers =
438
482
boost::random ::uniform_int_distribution<int32_t >(1 , 16 )(rng_);
439
- const auto result = veloxToPrestoResult (execute (plan, maxDrivers));
483
+ RowVectorPtr result;
484
+ try {
485
+ result = veloxToPrestoResult (execute (plan, maxDrivers));
486
+ } catch (VeloxRuntimeError& error) {
487
+ if (writeErrorInjected) {
488
+ VELOX_CHECK (
489
+ error.message () == injectedErrorMsg_,
490
+ " write plan failed with different error code" );
491
+ return ;
492
+ }
493
+ }
440
494
441
495
const auto dropSql = " DROP TABLE IF EXISTS tmp_write" ;
442
496
const auto sql = referenceQueryRunner_->toSql (plan).value ();
@@ -661,8 +715,7 @@ void WriterFuzzer::comparePartitionAndBucket(
661
715
// static
662
716
std::map<std::string, int32_t > WriterFuzzer::getPartitionNameAndFilecount (
663
717
const std::string& tableDirectoryPath) {
664
- auto fileSystem = filesystems::getFileSystem (" /" , nullptr );
665
- auto directories = listFolders (tableDirectoryPath);
718
+ auto directories = listFolders (faultyFs_, tableDirectoryPath);
666
719
std::map<std::string, int32_t > partitionNameAndFileCount;
667
720
668
721
for (std::string directory : directories) {
@@ -672,20 +725,20 @@ std::map<std::string, int32_t> WriterFuzzer::getPartitionNameAndFilecount(
672
725
}
673
726
674
727
// Count non-empty non-hidden files
675
- const auto files = fileSystem ->list (directory);
728
+ const auto files = faultyFs_ ->list (directory);
676
729
int32_t fileCount = 0 ;
677
730
for (const auto & file : files) {
678
731
// Presto query runner sometime creates empty files, ignore those.
679
732
if (file.find (" /." ) == std::string::npos &&
680
- fileSystem ->openFileForRead (file)->size () > 0 ) {
733
+ faultyFs_ ->openFileForRead (file)->size () > 0 ) {
681
734
fileCount++;
682
735
}
683
736
}
684
737
685
738
// Remove the path prefix to get the partition name
686
739
// For example: /test/tmp_write/p0=1/p1=2020
687
740
// partition name is /p0=1/p1=2020
688
- directory.erase (0 , fileSystem ->extractPath (tableDirectoryPath).length ());
741
+ directory.erase (0 , faultyFs_ ->extractPath (tableDirectoryPath).length ());
689
742
690
743
partitionNameAndFileCount.emplace (directory, fileCount);
691
744
}
0 commit comments