26
26
#include " velox/exec/tests/utils/QueryAssertions.h"
27
27
#include " velox/serializers/PrestoSerializer.h"
28
28
#include " velox/type/parser/TypeParser.h"
29
+ #include " velox/velox/connectors/hive/HiveDataSink.h"
30
+ #include " velox/velox/connectors/hive/TableHandle.h"
29
31
30
32
#include < utility>
31
33
@@ -35,16 +37,6 @@ namespace facebook::velox::exec::test {
35
37
36
38
namespace {
37
39
38
- template <typename T>
39
- T extractSingleValue (const std::vector<RowVectorPtr>& data) {
40
- VELOX_CHECK_EQ (1 , data.size ());
41
- VELOX_CHECK_EQ (1 , data[0 ]->childrenSize ());
42
-
43
- auto simpleVector = data[0 ]->childAt (0 )->as <SimpleVector<T>>();
44
- VELOX_CHECK (!simpleVector->isNullAt (0 ));
45
- return simpleVector->valueAt (0 );
46
- }
47
-
48
40
void writeToFile (
49
41
const std::string& path,
50
42
const std::vector<RowVectorPtr>& data,
@@ -174,6 +166,11 @@ std::optional<std::string> PrestoQueryRunner::toSql(
174
166
return toSql (aggregationNode);
175
167
}
176
168
169
+ if (auto tableWriteNode =
170
+ std::dynamic_pointer_cast<const core::TableWriteNode>(plan)) {
171
+ return toSql (tableWriteNode);
172
+ }
173
+
177
174
VELOX_NYI ();
178
175
}
179
176
@@ -500,6 +497,40 @@ std::optional<std::string> PrestoQueryRunner::toSql(
500
497
return sql.str ();
501
498
}
502
499
500
+ std::optional<std::string> PrestoQueryRunner::toSql (
501
+ const std::shared_ptr<const core::TableWriteNode>& tableWriteNode) {
502
+ std::vector<std::string> partitionKeys;
503
+ auto insertTableHandle =
504
+ std::dynamic_pointer_cast<connector::hive::HiveInsertTableHandle>(
505
+ tableWriteNode->insertTableHandle ()->connectorInsertTableHandle ());
506
+
507
+ // Returns a CREATE sql with specified table properties from TableWriteNode,
508
+ // example sql:
509
+ // CREATE TABLE tmp_write (c0 integer, c1 varchar, p0 varchar)
510
+ // WITH (PARTITIONED_BY = ARRAY['p0']);
511
+ std::stringstream sql;
512
+ sql << " CREATE TABLE tmp_write ( " ;
513
+ for (auto i = 0 ; i < tableWriteNode->columnNames ().size (); ++i) {
514
+ appendComma (i, sql);
515
+ sql << tableWriteNode->columnNames ()[i] << " "
516
+ << toTypeSql (tableWriteNode->columns ()->childAt (i));
517
+ if (insertTableHandle->inputColumns ()[i]->isPartitionKey ()) {
518
+ partitionKeys.push_back (insertTableHandle->inputColumns ()[i]->name ());
519
+ }
520
+ }
521
+ sql << " )" ;
522
+
523
+ if (insertTableHandle->isPartitioned ()) {
524
+ sql << " WITH (PARTITIONED_BY = ARRAY[" ;
525
+ for (int i = 0 ; i < partitionKeys.size (); ++i) {
526
+ appendComma (i, sql);
527
+ sql << " '" << partitionKeys[i] << " '" ;
528
+ }
529
+ sql << " ])" ;
530
+ }
531
+ return sql.str ();
532
+ }
533
+
503
534
std::multiset<std::vector<variant>> PrestoQueryRunner::execute (
504
535
const std::string& sql,
505
536
const std::vector<RowVectorPtr>& input,
0 commit comments