22
22
#include " velox/common/base/Fs.h"
23
23
#include " velox/common/encode/Base64.h"
24
24
#include " velox/common/file/FileSystems.h"
25
+ #include " velox/common/file/tests/FaultyFileSystem.h"
25
26
#include " velox/connectors/hive/HiveConnector.h"
26
27
#include " velox/connectors/hive/HiveConnectorSplit.h"
27
28
#include " velox/connectors/hive/TableHandle.h"
36
37
#include " velox/vector/VectorSaver.h"
37
38
#include " velox/vector/fuzzer/VectorFuzzer.h"
38
39
40
+ DEFINE_bool (
41
+ file_system_error_injection,
42
+ true ,
43
+ " When enabled, inject file system write error with certain possibility" );
44
+
39
45
DEFINE_int32 (steps, 10 , " Number of plans to generate and test." );
40
46
41
47
DEFINE_int32 (
@@ -63,6 +69,9 @@ using namespace facebook::velox::test;
63
69
namespace facebook ::velox::exec::test {
64
70
65
71
namespace {
72
+ using facebook::velox::filesystems::FileSystem;
73
+ using tests::utils::FaultFileOperation;
74
+ using tests::utils::FaultyFileSystem;
66
75
67
76
class WriterFuzzer {
68
77
public:
@@ -123,7 +132,9 @@ class WriterFuzzer {
123
132
const std::vector<std::string>& bucketColumns,
124
133
int32_t sortColumnOffset,
125
134
const std::vector<std::shared_ptr<const HiveSortingColumn>>& sortBy,
126
- const std::string& outputDirectoryPath);
135
+ const std::string& outputFaultyDirectoryPath,
136
+ const std::string& outputDirectoryPath,
137
+ const uint64_t prevInjectedErrorCount);
127
138
128
139
// Generates table column handles based on table column properties
129
140
std::unordered_map<std::string, std::shared_ptr<connector::ColumnHandle>>
@@ -244,6 +255,9 @@ class WriterFuzzer {
244
255
std::shared_ptr<memory::MemoryPool> writerPool_{
245
256
rootPool_->addAggregateChild (" writerFuzzerWriter" )};
246
257
VectorFuzzer vectorFuzzer_;
258
+
259
+ std::atomic<uint64_t > injectedErrorCount_{0 };
260
+ const std::string injectedErrorMsg_ = " Injected Faulty File Error" ;
247
261
};
248
262
} // namespace
249
263
@@ -293,6 +307,7 @@ void WriterFuzzer::go() {
293
307
size_t iteration = 0 ;
294
308
295
309
while (!isDone (iteration, startTime)) {
310
+ uint64_t prevInjectedErrorCount = injectedErrorCount_;
296
311
LOG (INFO) << " ==============================> Started iteration "
297
312
<< iteration << " (seed: " << currentSeed_ << " )" ;
298
313
@@ -340,7 +355,20 @@ void WriterFuzzer::go() {
340
355
}
341
356
auto input = generateInputData (names, types, partitionOffset);
342
357
343
- auto tempDirPath = exec::test::TempDirectoryPath::create ();
358
+ auto tempFaultyDirPath = exec::test::TempDirectoryPath::create (true );
359
+
360
+ // Injects file system write errors with a certain probability.
361
+ auto faultyFs = std::dynamic_pointer_cast<FaultyFileSystem>(
362
+ filesystems::getFileSystem (tempFaultyDirPath->getPath (), {}));
363
+ if (FLAGS_file_system_error_injection && partitionKeys.empty ()) {
364
+ faultyFs->setFileInjectionHook ([&](FaultFileOperation* op) {
365
+ if (vectorFuzzer_.coinToss (1 )) {
366
+ injectedErrorCount_++;
367
+ VELOX_FAIL (injectedErrorMsg_);
368
+ }
369
+ });
370
+ }
371
+
344
372
verifyWriter (
345
373
input,
346
374
names,
@@ -351,7 +379,11 @@ void WriterFuzzer::go() {
351
379
bucketColumns,
352
380
sortColumnOffset,
353
381
sortBy,
354
- tempDirPath->getPath ());
382
+ tempFaultyDirPath->getPath (),
383
+ std::string (faultyFs->extractPath (tempFaultyDirPath->getPath ())),
384
+ prevInjectedErrorCount);
385
+
386
+ faultyFs->clearFileFaultInjections ();
355
387
356
388
LOG (INFO) << " ==============================> Done with iteration "
357
389
<< iteration++;
@@ -423,11 +455,13 @@ void WriterFuzzer::verifyWriter(
423
455
const std::vector<std::string>& bucketColumns,
424
456
const int32_t sortColumnOffset,
425
457
const std::vector<std::shared_ptr<const HiveSortingColumn>>& sortBy,
426
- const std::string& outputDirectoryPath) {
458
+ const std::string& outputFaultyDirectoryPath,
459
+ const std::string& outputDirectoryPath,
460
+ const uint64_t prevInjectedErrorCount) {
427
461
const auto plan = PlanBuilder ()
428
462
.values (input)
429
463
.tableWrite (
430
- outputDirectoryPath ,
464
+ outputFaultyDirectoryPath ,
431
465
partitionKeys,
432
466
bucketCount,
433
467
bucketColumns,
@@ -436,7 +470,17 @@ void WriterFuzzer::verifyWriter(
436
470
437
471
const auto maxDrivers =
438
472
boost::random ::uniform_int_distribution<int32_t >(1 , 16 )(rng_);
439
- const auto result = veloxToPrestoResult (execute (plan, maxDrivers));
473
+ RowVectorPtr result;
474
+ try {
475
+ result = veloxToPrestoResult (execute (plan, maxDrivers));
476
+ } catch (VeloxRuntimeError& error) {
477
+ if (injectedErrorCount_ > prevInjectedErrorCount) {
478
+ VELOX_CHECK (
479
+ error.message () == injectedErrorMsg_,
480
+ " write plan failed with different error code" );
481
+ return ;
482
+ }
483
+ }
440
484
441
485
const auto dropSql = " DROP TABLE IF EXISTS tmp_write" ;
442
486
const auto sql = referenceQueryRunner_->toSql (plan).value ();
@@ -502,7 +546,8 @@ void WriterFuzzer::verifyWriter(
502
546
types.begin () + sortColumnOffset,
503
547
types.begin () + sortColumnOffset + sortBy.size ()};
504
548
505
- // Read from each file and check if data is sorted as presto sorted result.
549
+ // Read from each file and check if data is sorted as presto sorted
550
+ // result.
506
551
for (const auto & split : splits) {
507
552
auto splitReadPlan = PlanBuilder ()
508
553
.tableScan (generateOutputType (
0 commit comments