26
26
#include " velox/exec/tests/utils/QueryAssertions.h"
27
27
#include " velox/serializers/PrestoSerializer.h"
28
28
#include " velox/type/parser/TypeParser.h"
29
+ #include " velox/velox/connectors/hive/HiveDataSink.h"
30
+ #include " velox/velox/connectors/hive/TableHandle.h"
29
31
30
32
#include < utility>
31
33
@@ -35,16 +37,6 @@ namespace facebook::velox::exec::test {
35
37
36
38
namespace {
37
39
38
- template <typename T>
39
- T extractSingleValue (const std::vector<RowVectorPtr>& data) {
40
- VELOX_CHECK_EQ (1 , data.size ());
41
- VELOX_CHECK_EQ (1 , data[0 ]->childrenSize ());
42
-
43
- auto simpleVector = data[0 ]->childAt (0 )->as <SimpleVector<T>>();
44
- VELOX_CHECK (!simpleVector->isNullAt (0 ));
45
- return simpleVector->valueAt (0 );
46
- }
47
-
48
40
void writeToFile (
49
41
const std::string& path,
50
42
const std::vector<RowVectorPtr>& data,
@@ -174,6 +166,11 @@ std::optional<std::string> PrestoQueryRunner::toSql(
174
166
return toSql (aggregationNode);
175
167
}
176
168
169
+ if (auto tableWriteNode =
170
+ std::dynamic_pointer_cast<const core::TableWriteNode>(plan)) {
171
+ return toSql (tableWriteNode);
172
+ }
173
+
177
174
VELOX_NYI ();
178
175
}
179
176
@@ -500,6 +497,37 @@ std::optional<std::string> PrestoQueryRunner::toSql(
500
497
return sql.str ();
501
498
}
502
499
500
+ std::optional<std::string> PrestoQueryRunner::toSql (
501
+ const std::shared_ptr<const core::TableWriteNode>& tableWriteNode) {
502
+ std::vector<std::string> partitionKeys;
503
+ auto insertTableHandle =
504
+ std::dynamic_pointer_cast<connector::hive::HiveInsertTableHandle>(
505
+ tableWriteNode->insertTableHandle ()->connectorInsertTableHandle ());
506
+
507
+ // Returns a CREATE sql with specified table properties from TableWriteNode, example sql:
508
+ // CREATE TABLE tmp_write (c0 integer, c1 varchar, p0 varchar) WITH (PARTITIONED_BY = ARRAY['p0']);
509
+ std::stringstream sql;
510
+ sql << " CREATE TABLE tmp_write ( " ;
511
+ for (auto i = 0 ; i < tableWriteNode->columnNames ().size (); ++i) {
512
+ appendComma (i, sql);
513
+ sql << tableWriteNode->columnNames ()[i] << " " << toTypeSql (tableWriteNode->columns ()->childAt (i));
514
+ if (insertTableHandle->inputColumns ()[i]->isPartitionKey ()) {
515
+ partitionKeys.push_back (insertTableHandle->inputColumns ()[i]->name ());
516
+ }
517
+ }
518
+ sql << " )" ;
519
+
520
+ if (insertTableHandle->isPartitioned ()) {
521
+ sql << " WITH (PARTITIONED_BY = ARRAY[" ;
522
+ for (int i = 0 ; i < partitionKeys.size (); ++i) {
523
+ appendComma (i, sql);
524
+ sql << " '" << partitionKeys[i] << " '" ;
525
+ }
526
+ sql << " ])" ;
527
+ }
528
+ return sql.str ();
529
+ }
530
+
503
531
std::multiset<std::vector<variant>> PrestoQueryRunner::execute (
504
532
const std::string& sql,
505
533
const std::vector<RowVectorPtr>& input,
0 commit comments