22
22
#include " velox/common/base/Fs.h"
23
23
#include " velox/common/encode/Base64.h"
24
24
#include " velox/common/file/FileSystems.h"
25
+ #include " velox/common/file/tests/FaultyFileSystem.h"
25
26
#include " velox/connectors/hive/HiveConnector.h"
26
27
#include " velox/connectors/hive/HiveConnectorSplit.h"
27
28
#include " velox/connectors/hive/TableHandle.h"
36
37
#include " velox/vector/VectorSaver.h"
37
38
#include " velox/vector/fuzzer/VectorFuzzer.h"
38
39
40
+ DEFINE_bool (
41
+ file_system_error_injection,
42
+ true ,
43
+ " When enabled, inject file system write error with certain possibility" );
44
+
39
45
DEFINE_int32 (steps, 10 , " Number of plans to generate and test." );
40
46
41
47
DEFINE_int32 (
@@ -63,6 +69,9 @@ using namespace facebook::velox::test;
63
69
namespace facebook ::velox::exec::test {
64
70
65
71
namespace {
72
+ using facebook::velox::filesystems::FileSystem;
73
+ using tests::utils::FaultFileOperation;
74
+ using tests::utils::FaultyFileSystem;
66
75
67
76
class WriterFuzzer {
68
77
public:
@@ -123,7 +132,7 @@ class WriterFuzzer {
123
132
const std::vector<std::string>& bucketColumns,
124
133
int32_t sortColumnOffset,
125
134
const std::vector<std::shared_ptr<const HiveSortingColumn>>& sortBy,
126
- const std::string & outputDirectoryPath);
135
+ const std::shared_ptr<TempDirectoryPath> & outputDirectoryPath);
127
136
128
137
// Generates table column handles based on table column properties
129
138
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
@@ -235,6 +244,12 @@ class WriterFuzzer {
235
244
BIGINT (),
236
245
VARCHAR ()};
237
246
247
+ const std::shared_ptr<FaultyFileSystem> faultyFs_ =
248
+ std::dynamic_pointer_cast<FaultyFileSystem>(
249
+ filesystems::getFileSystem (" faulty:/tmp" , {}));
250
+ const std::string injectedErrorMsg_{" Injected Faulty File Error" };
251
+ std::atomic<uint64_t > injectedErrorCount_{0 };
252
+
238
253
FuzzerGenerator rng_;
239
254
size_t currentSeed_{0 };
240
255
std::unique_ptr<ReferenceQueryRunner> referenceQueryRunner_;
@@ -292,6 +307,16 @@ void WriterFuzzer::go() {
292
307
auto startTime = std::chrono::system_clock::now ();
293
308
size_t iteration = 0 ;
294
309
310
+ // Faulty fs will generate file system write error with certain possibility
311
+ if (FLAGS_file_system_error_injection) {
312
+ faultyFs_->setFileInjectionHook ([&](FaultFileOperation* op) {
313
+ if (vectorFuzzer_.coinToss (0.01 )) {
314
+ ++injectedErrorCount_;
315
+ VELOX_FAIL (injectedErrorMsg_);
316
+ }
317
+ });
318
+ }
319
+
295
320
while (!isDone (iteration, startTime)) {
296
321
LOG (INFO) << " ==============================> Started iteration "
297
322
<< iteration << " (seed: " << currentSeed_ << " )" ;
@@ -340,7 +365,9 @@ void WriterFuzzer::go() {
340
365
}
341
366
auto input = generateInputData (names, types, partitionOffset);
342
367
343
- auto tempDirPath = exec::test::TempDirectoryPath::create ();
368
+ const auto outputDirPath = exec::test::TempDirectoryPath::create (
369
+ FLAGS_file_system_error_injection);
370
+
344
371
verifyWriter (
345
372
input,
346
373
names,
@@ -351,7 +378,7 @@ void WriterFuzzer::go() {
351
378
bucketColumns,
352
379
sortColumnOffset,
353
380
sortBy,
354
- tempDirPath-> getPath () );
381
+ outputDirPath );
355
382
356
383
LOG (INFO) << " ==============================> Done with iteration "
357
384
<< iteration++;
@@ -423,11 +450,11 @@ void WriterFuzzer::verifyWriter(
423
450
const std::vector<std::string>& bucketColumns,
424
451
const int32_t sortColumnOffset,
425
452
const std::vector<std::shared_ptr<const HiveSortingColumn>>& sortBy,
426
- const std::string & outputDirectoryPath) {
453
+ const std::shared_ptr<TempDirectoryPath> & outputDirectoryPath) {
427
454
const auto plan = PlanBuilder ()
428
455
.values (input)
429
456
.tableWrite (
430
- outputDirectoryPath,
457
+ outputDirectoryPath-> getPath () ,
431
458
partitionKeys,
432
459
bucketCount,
433
460
bucketColumns,
@@ -436,7 +463,22 @@ void WriterFuzzer::verifyWriter(
436
463
437
464
const auto maxDrivers =
438
465
boost::random ::uniform_int_distribution<int32_t >(1 , 16 )(rng_);
439
- const auto result = veloxToPrestoResult (execute (plan, maxDrivers));
466
+ RowVectorPtr result;
467
+ const uint64_t prevInjectedErrorCount = injectedErrorCount_;
468
+ try {
469
+ result = veloxToPrestoResult (execute (plan, maxDrivers));
470
+ } catch (VeloxRuntimeError& error) {
471
+ if (injectedErrorCount_ == prevInjectedErrorCount) {
472
+ throw error;
473
+ }
474
+ VELOX_CHECK_GT (
475
+ injectedErrorCount_,
476
+ prevInjectedErrorCount,
477
+ " Unexpected writer fuzzer failure: {}" ,
478
+ error.message ());
479
+ VELOX_CHECK_EQ (
480
+ error.message (), injectedErrorMsg_, " Unexpected writer fuzzer failure" );
481
+ }
440
482
441
483
const auto dropSql = " DROP TABLE IF EXISTS tmp_write" ;
442
484
const auto sql = referenceQueryRunner_->toSql (plan).value ();
@@ -465,11 +507,13 @@ void WriterFuzzer::verifyWriter(
465
507
const auto referencedOutputDirectoryPath =
466
508
getReferenceOutputDirectoryPath (partitionKeys.size ());
467
509
comparePartitionAndBucket (
468
- outputDirectoryPath, referencedOutputDirectoryPath, bucketCount);
510
+ outputDirectoryPath->getDelegatePath (),
511
+ referencedOutputDirectoryPath,
512
+ bucketCount);
469
513
}
470
514
471
515
// 3. Verifies data itself.
472
- auto splits = makeSplits (outputDirectoryPath);
516
+ auto splits = makeSplits (outputDirectoryPath-> getDelegatePath () );
473
517
auto columnHandles =
474
518
getTableColumnHandles (names, types, partitionOffset, bucketCount);
475
519
const auto rowType = generateOutputType (names, types, bucketCount);
@@ -502,7 +546,8 @@ void WriterFuzzer::verifyWriter(
502
546
types.begin () + sortColumnOffset,
503
547
types.begin () + sortColumnOffset + sortBy.size ()};
504
548
505
- // Read from each file and check if data is sorted as presto sorted result.
549
+ // Read from each file and check if data is sorted as presto sorted
550
+ // result.
506
551
for (const auto & split : splits) {
507
552
auto splitReadPlan = PlanBuilder ()
508
553
.tableScan (generateOutputType (
0 commit comments