22
22
#include " velox/common/base/Fs.h"
23
23
#include " velox/common/encode/Base64.h"
24
24
#include " velox/common/file/FileSystems.h"
25
+ #include " velox/common/file/tests/FaultyFileSystem.h"
25
26
#include " velox/connectors/hive/HiveConnector.h"
26
27
#include " velox/connectors/hive/HiveConnectorSplit.h"
27
28
#include " velox/connectors/hive/TableHandle.h"
36
37
#include " velox/vector/VectorSaver.h"
37
38
#include " velox/vector/fuzzer/VectorFuzzer.h"
38
39
40
+ DEFINE_bool (
41
+ file_system_error_injection,
42
+ true ,
43
+ " When enabled, inject file system write error with certain possibility" );
44
+
39
45
DEFINE_int32 (steps, 10 , " Number of plans to generate and test." );
40
46
41
47
DEFINE_int32 (
@@ -63,6 +69,9 @@ using namespace facebook::velox::test;
63
69
namespace facebook ::velox::exec::test {
64
70
65
71
namespace {
72
+ using facebook::velox::filesystems::FileSystem;
73
+ using tests::utils::FaultyFileSystem;
74
+ using tests::utils::kFaultyFileScheme ;
66
75
67
76
class WriterFuzzer {
68
77
public:
@@ -113,6 +122,11 @@ class WriterFuzzer {
113
122
std::vector<TypePtr> types,
114
123
size_t partitionOffset);
115
124
125
+ // Generates file system write error with certain possibility
126
+ bool injectWriterError (
127
+ const std::shared_ptr<FaultyFileSystem>& faultyFs,
128
+ const std::vector<std::string>& partitionKeys);
129
+
116
130
void verifyWriter (
117
131
const std::vector<RowVectorPtr>& input,
118
132
const std::vector<std::string>& names,
@@ -123,7 +137,9 @@ class WriterFuzzer {
123
137
const std::vector<std::string>& bucketColumns,
124
138
int32_t sortColumnOffset,
125
139
const std::vector<std::shared_ptr<const HiveSortingColumn>>& sortBy,
126
- const std::string& outputDirectoryPath);
140
+ const std::string& outputFaultyDirectoryPath,
141
+ const std::string& outputDirectoryPath,
142
+ const bool writeErrorInjected);
127
143
128
144
// Generates table column handles based on table column properties
129
145
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
@@ -244,6 +260,8 @@ class WriterFuzzer {
244
260
std::shared_ptr<memory::MemoryPool> writerPool_{
245
261
rootPool_->addAggregateChild (" writerFuzzerWriter" )};
246
262
VectorFuzzer vectorFuzzer_;
263
+
264
+ const std::string injectedErrorMsg_ = " Injected Faulty File Error" ;
247
265
};
248
266
} // namespace
249
267
@@ -340,7 +358,11 @@ void WriterFuzzer::go() {
340
358
}
341
359
auto input = generateInputData (names, types, partitionOffset);
342
360
343
- auto tempDirPath = exec::test::TempDirectoryPath::create ();
361
+ auto tempFaultyDirPath = exec::test::TempDirectoryPath::create (true );
362
+ auto faultyFs = std::dynamic_pointer_cast<FaultyFileSystem>(
363
+ filesystems::getFileSystem (tempFaultyDirPath->getPath (), {}));
364
+ bool writeErrorInjected = injectWriterError (faultyFs, partitionKeys);
365
+
344
366
verifyWriter (
345
367
input,
346
368
names,
@@ -351,7 +373,11 @@ void WriterFuzzer::go() {
351
373
bucketColumns,
352
374
sortColumnOffset,
353
375
sortBy,
354
- tempDirPath->getPath ());
376
+ tempFaultyDirPath->getPath (),
377
+ std::string (faultyFs->extractPath (tempFaultyDirPath->getPath ())),
378
+ writeErrorInjected);
379
+
380
+ faultyFs->clearFileFaultInjections ();
355
381
356
382
LOG (INFO) << " ==============================> Done with iteration "
357
383
<< iteration++;
@@ -413,6 +439,24 @@ std::vector<RowVectorPtr> WriterFuzzer::generateInputData(
413
439
return input;
414
440
}
415
441
442
+ bool WriterFuzzer::injectWriterError (
443
+ const std::shared_ptr<FaultyFileSystem>& faultyFs,
444
+ const std::vector<std::string>& partitionKeys) {
445
+ if (FLAGS_file_system_error_injection && partitionKeys.empty () &&
446
+ vectorFuzzer_.coinToss (0.01 )) {
447
+ std::exception_ptr fileError;
448
+ try {
449
+ VELOX_FAIL (injectedErrorMsg_);
450
+ } catch (VeloxRuntimeError&) {
451
+ fileError = std::current_exception ();
452
+ }
453
+ faultyFs->setFileInjectionError (
454
+ fileError, {tests::utils::FaultFileOperation::Type::kWrite });
455
+ return true ;
456
+ }
457
+ return false ;
458
+ }
459
+
416
460
void WriterFuzzer::verifyWriter (
417
461
const std::vector<RowVectorPtr>& input,
418
462
const std::vector<std::string>& names,
@@ -423,11 +467,13 @@ void WriterFuzzer::verifyWriter(
423
467
const std::vector<std::string>& bucketColumns,
424
468
const int32_t sortColumnOffset,
425
469
const std::vector<std::shared_ptr<const HiveSortingColumn>>& sortBy,
426
- const std::string& outputDirectoryPath) {
470
+ const std::string& outputFaultyDirectoryPath,
471
+ const std::string& outputDirectoryPath,
472
+ const bool writeErrorInjected) {
427
473
const auto plan = PlanBuilder ()
428
474
.values (input)
429
475
.tableWrite (
430
- outputDirectoryPath ,
476
+ outputFaultyDirectoryPath ,
431
477
partitionKeys,
432
478
bucketCount,
433
479
bucketColumns,
@@ -436,7 +482,17 @@ void WriterFuzzer::verifyWriter(
436
482
437
483
const auto maxDrivers =
438
484
boost::random ::uniform_int_distribution<int32_t >(1 , 16 )(rng_);
439
- const auto result = veloxToPrestoResult (execute (plan, maxDrivers));
485
+ RowVectorPtr result;
486
+ try {
487
+ result = veloxToPrestoResult (execute (plan, maxDrivers));
488
+ } catch (VeloxRuntimeError& error) {
489
+ if (writeErrorInjected) {
490
+ VELOX_CHECK (
491
+ error.message () == injectedErrorMsg_,
492
+ " write plan failed with different error code" );
493
+ return ;
494
+ }
495
+ }
440
496
441
497
const auto dropSql = " DROP TABLE IF EXISTS tmp_write" ;
442
498
const auto sql = referenceQueryRunner_->toSql (plan).value ();
0 commit comments