Skip to content

Commit 8f0c03c

Browse files
committed
buildPipeline
remove parseJson
1 parent 006500a commit 8f0c03c

File tree

3 files changed

+112
-126
lines changed

3 files changed

+112
-126
lines changed

cpp-ch/local-engine/Parser/SerializedPlanParser.cpp

+100-105
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,19 @@ extern const int ILLEGAL_TYPE_OF_ARGUMENT;
9797
extern const int INVALID_JOIN_ON_EXPRESSION;
9898
}
9999
}
100-
100+
namespace
101+
{
102+
DB::NamesAndTypesList blockToNameAndTypeList(const DB::Block & header)
103+
{
104+
DB::NamesAndTypesList types;
105+
for (const auto & name : header.getNames())
106+
{
107+
const auto * column = header.findByName(name);
108+
types.push_back(DB::NameAndTypePair(column->name, column->type));
109+
}
110+
return types;
111+
}
112+
}
101113
namespace local_engine
102114
{
103115
using namespace DB;
@@ -377,90 +389,87 @@ DataTypePtr wrapNullableType(bool nullable, DataTypePtr nested_type)
377389
return nested_type;
378390
}
379391

380-
QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan)
392+
void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel & root_rel)
381393
{
382-
logDebugMessage(plan, "substrait plan");
383-
parseExtensions(plan.extensions());
384-
if (plan.relations_size() == 1)
394+
if (root_rel.root().names_size())
385395
{
386-
auto root_rel = plan.relations().at(0);
387-
if (!root_rel.has_root())
388-
{
389-
throw Exception(ErrorCodes::BAD_ARGUMENTS, "must have root rel!");
390-
}
391-
std::list<const substrait::Rel *> rel_stack;
392-
auto query_plan = parseOp(root_rel.root().input(), rel_stack);
393-
if (root_rel.root().names_size())
394-
{
395-
ActionsDAGPtr actions_dag = std::make_shared<ActionsDAG>(blockToNameAndTypeList(query_plan->getCurrentDataStream().header));
396-
NamesWithAliases aliases;
397-
auto cols = query_plan->getCurrentDataStream().header.getNamesAndTypesList();
398-
if (cols.getNames().size() != static_cast<size_t>(root_rel.root().names_size()))
399-
{
400-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Missmatch result columns size.");
401-
}
402-
for (int i = 0; i < static_cast<int>(cols.getNames().size()); i++)
403-
{
404-
aliases.emplace_back(NameWithAlias(cols.getNames()[i], root_rel.root().names(i)));
405-
}
406-
actions_dag->project(aliases);
407-
auto expression_step = std::make_unique<ExpressionStep>(query_plan->getCurrentDataStream(), actions_dag);
408-
expression_step->setStepDescription("Rename Output");
409-
query_plan->addStep(std::move(expression_step));
410-
}
396+
ActionsDAGPtr actions_dag = std::make_shared<ActionsDAG>(blockToNameAndTypeList(query_plan->getCurrentDataStream().header));
397+
NamesWithAliases aliases;
398+
auto cols = query_plan->getCurrentDataStream().header.getNamesAndTypesList();
399+
if (cols.getNames().size() != static_cast<size_t>(root_rel.root().names_size()))
400+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Missmatch result columns size.");
401+
for (int i = 0; i < static_cast<int>(cols.getNames().size()); i++)
402+
aliases.emplace_back(NameWithAlias(cols.getNames()[i], root_rel.root().names(i)));
403+
actions_dag->project(aliases);
404+
auto expression_step = std::make_unique<ExpressionStep>(query_plan->getCurrentDataStream(), actions_dag);
405+
expression_step->setStepDescription("Rename Output");
406+
query_plan->addStep(std::move(expression_step));
407+
}
411408

412-
// fixes: issue-1874, to keep the nullability as expected.
413-
const auto & output_schema = root_rel.root().output_schema();
414-
if (output_schema.types_size())
409+
// fixes: issue-1874, to keep the nullability as expected.
410+
const auto & output_schema = root_rel.root().output_schema();
411+
if (output_schema.types_size())
412+
{
413+
auto original_header = query_plan->getCurrentDataStream().header;
414+
const auto & original_cols = original_header.getColumnsWithTypeAndName();
415+
if (static_cast<size_t>(output_schema.types_size()) != original_cols.size())
416+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mismatch output schema");
417+
bool need_final_project = false;
418+
ColumnsWithTypeAndName final_cols;
419+
for (int i = 0; i < output_schema.types_size(); ++i)
415420
{
416-
auto original_header = query_plan->getCurrentDataStream().header;
417-
const auto & original_cols = original_header.getColumnsWithTypeAndName();
418-
if (static_cast<size_t>(output_schema.types_size()) != original_cols.size())
419-
{
420-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mismatch output schema");
421-
}
422-
bool need_final_project = false;
423-
ColumnsWithTypeAndName final_cols;
424-
for (int i = 0; i < output_schema.types_size(); ++i)
421+
const auto & col = original_cols[i];
422+
auto type = TypeParser::parseType(output_schema.types(i));
423+
// At present, we only check nullable mismatch.
424+
// intermediate aggregate data is special, no check here.
425+
if (type->isNullable() != col.type->isNullable() && !typeid_cast<const DataTypeAggregateFunction *>(col.type.get()))
425426
{
426-
const auto & col = original_cols[i];
427-
auto type = TypeParser::parseType(output_schema.types(i));
428-
// At present, we only check nullable mismatch.
429-
// intermediate aggregate data is special, no check here.
430-
if (type->isNullable() != col.type->isNullable() && !typeid_cast<const DataTypeAggregateFunction *>(col.type.get()))
427+
if (type->isNullable())
431428
{
432-
if (type->isNullable())
433-
{
434-
auto wrapped = wrapNullableType(true, col.type);
435-
final_cols.emplace_back(type->createColumn(), wrapped, col.name);
436-
need_final_project = !wrapped->equals(*col.type);
437-
}
438-
else
439-
{
440-
final_cols.emplace_back(type->createColumn(), removeNullable(col.type), col.name);
441-
need_final_project = true;
442-
}
429+
auto wrapped = wrapNullableType(true, col.type);
430+
final_cols.emplace_back(type->createColumn(), wrapped, col.name);
431+
need_final_project = !wrapped->equals(*col.type);
443432
}
444433
else
445434
{
446-
final_cols.push_back(col);
435+
final_cols.emplace_back(type->createColumn(), removeNullable(col.type), col.name);
436+
need_final_project = true;
447437
}
448438
}
449-
if (need_final_project)
439+
else
450440
{
451-
ActionsDAGPtr final_project
452-
= ActionsDAG::makeConvertingActions(original_cols, final_cols, ActionsDAG::MatchColumnsMode::Position);
453-
QueryPlanStepPtr final_project_step = std::make_unique<ExpressionStep>(query_plan->getCurrentDataStream(), final_project);
454-
final_project_step->setStepDescription("Project for output schema");
455-
query_plan->addStep(std::move(final_project_step));
441+
final_cols.push_back(col);
456442
}
457443
}
458-
return query_plan;
444+
if (need_final_project)
445+
{
446+
ActionsDAGPtr final_project
447+
= ActionsDAG::makeConvertingActions(original_cols, final_cols, ActionsDAG::MatchColumnsMode::Position);
448+
QueryPlanStepPtr final_project_step = std::make_unique<ExpressionStep>(query_plan->getCurrentDataStream(), final_project);
449+
final_project_step->setStepDescription("Project for output schema");
450+
query_plan->addStep(std::move(final_project_step));
451+
}
459452
}
460-
else
461-
{
453+
}
454+
455+
QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan)
456+
{
457+
logDebugMessage(plan, "substrait plan");
458+
parseExtensions(plan.extensions());
459+
if (plan.relations_size() != 1)
462460
throw Exception(ErrorCodes::BAD_ARGUMENTS, "too many relations found");
463-
}
461+
462+
const substrait::PlanRel & root_rel = plan.relations().at(0);
463+
if (!root_rel.has_root())
464+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "must have root rel!");
465+
466+
if (root_rel.root().input().has_write())
467+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "write pipeline is not supported yet!");
468+
469+
std::list<const substrait::Rel *> rel_stack;
470+
auto query_plan = parseOp(root_rel.root().input(), rel_stack);
471+
adjustOutput(query_plan, root_rel);
472+
return query_plan;
464473
}
465474

466475
QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack)
@@ -553,17 +562,6 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list
553562
return query_plan;
554563
}
555564

556-
NamesAndTypesList SerializedPlanParser::blockToNameAndTypeList(const Block & header)
557-
{
558-
NamesAndTypesList types;
559-
for (const auto & name : header.getNames())
560-
{
561-
const auto * column = header.findByName(name);
562-
types.push_back(NameAndTypePair(column->name, column->type));
563-
}
564-
return types;
565-
}
566-
567565
std::optional<String> SerializedPlanParser::getFunctionSignatureName(UInt32 function_ref) const
568566
{
569567
auto it = function_mapping.find(std::to_string(function_ref));
@@ -1713,14 +1711,11 @@ substrait::ReadRel::LocalFiles SerializedPlanParser::parseLocalFiles(const std::
17131711
return local_files;
17141712
}
17151713

1716-
std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan)
1714+
DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPlan & query_plan)
17171715
{
1718-
Stopwatch stopwatch;
1719-
auto * logger = &Poco::Logger::get("SerializedPlanParser");
17201716
const Settings & settings = context->getSettingsRef();
1721-
17221717
QueryPriorities priorities;
1723-
auto query_status = std::make_shared<QueryStatus>(
1718+
const auto query_status = std::make_shared<QueryStatus>(
17241719
context,
17251720
"",
17261721
context->getClientInfo(),
@@ -1729,26 +1724,35 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
17291724
IAST::QueryKind::Select,
17301725
settings,
17311726
0);
1732-
1733-
QueryPlanOptimizationSettings optimization_settings{.optimize_plan = settings.query_plan_enable_optimizations};
1734-
auto pipeline_builder = query_plan->buildQueryPipeline(
1727+
const QueryPlanOptimizationSettings optimization_settings{.optimize_plan = settings.query_plan_enable_optimizations};
1728+
return query_plan.buildQueryPipeline(
17351729
optimization_settings,
17361730
BuildQueryPipelineSettings{
17371731
.actions_settings
17381732
= ExpressionActionsSettings{.can_compile_expressions = true, .min_count_to_compile_expression = 3, .compile_expressions = CompileExpressions::yes},
17391733
.process_list_element = query_status});
1734+
}
1735+
1736+
std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPlanPtr query_plan)
1737+
{
1738+
Stopwatch stopwatch;
1739+
1740+
const Settings & settings = context->getSettingsRef();
1741+
auto pipeline_builder = buildQueryPipeline(*query_plan);
1742+
17401743
QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder));
1741-
LOG_INFO(logger, "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0);
17421744

1745+
auto * logger = &Poco::Logger::get("SerializedPlanParser");
1746+
LOG_INFO(logger, "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0);
17431747
LOG_DEBUG(
17441748
logger, "clickhouse plan [optimization={}]:\n{}", settings.query_plan_enable_optimizations, PlanUtil::explainPlan(*query_plan));
17451749
LOG_DEBUG(logger, "clickhouse pipeline:\n{}", QueryPipelineUtil::explainPipeline(pipeline));
17461750

1747-
return std::make_unique<LocalExecutor>(
1748-
context, std::move(query_plan), std::move(pipeline), query_plan->getCurrentDataStream().header.cloneEmpty());
1751+
bool dump_pipeline = context->getConfigRef().getBool("dump_pipeline", false);
1752+
return std::make_unique<LocalExecutor>(std::move(query_plan), std::move(pipeline), dump_pipeline);
17491753
}
17501754

1751-
QueryPlanPtr SerializedPlanParser::parse(const std::string_view plan)
1755+
QueryPlanPtr SerializedPlanParser::parse(std::string_view plan)
17521756
{
17531757
substrait::Plan s_plan;
17541758
/// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
@@ -1776,15 +1780,6 @@ QueryPlanPtr SerializedPlanParser::parse(const std::string_view plan)
17761780
return res;
17771781
}
17781782

1779-
QueryPlanPtr SerializedPlanParser::parseJson(const std::string_view & json_plan)
1780-
{
1781-
substrait::Plan plan;
1782-
auto s = google::protobuf::util::JsonStringToMessage(json_plan, &plan);
1783-
if (!s.ok())
1784-
throw Exception(ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from json string failed: {}", s.ToString());
1785-
return parse(plan);
1786-
}
1787-
17881783
SerializedPlanParser::SerializedPlanParser(const ContextPtr & context_) : context(context_)
17891784
{
17901785
}
@@ -2035,7 +2030,7 @@ SharedContextHolder SerializedPlanParser::shared_context;
20352030

20362031
LocalExecutor::~LocalExecutor()
20372032
{
2038-
if (context->getConfigRef().getBool("dump_pipeline", false))
2033+
if (dump_pipeline)
20392034
LOG_INFO(&Poco::Logger::get("LocalExecutor"), "Dump pipeline:\n{}", dumpPipeline());
20402035

20412036
if (spark_buffer)
@@ -2109,11 +2104,11 @@ Block & LocalExecutor::getHeader()
21092104
return header;
21102105
}
21112106

2112-
LocalExecutor::LocalExecutor(const ContextPtr & context_, QueryPlanPtr query_plan, QueryPipeline && pipeline, const Block & header_)
2107+
LocalExecutor::LocalExecutor(QueryPlanPtr query_plan, QueryPipeline && pipeline, bool dump_pipeline_)
21132108
: query_pipeline(std::move(pipeline))
21142109
, executor(std::make_unique<PullingPipelineExecutor>(query_pipeline))
2115-
, header(header_)
2116-
, context(context_)
2110+
, header(query_plan->getCurrentDataStream().header.cloneEmpty())
2111+
, dump_pipeline(dump_pipeline_)
21172112
, ch_column_to_spark_row(std::make_unique<CHColumnToSparkRow>())
21182113
, current_query_plan(std::move(query_plan))
21192114
{

cpp-ch/local-engine/Parser/SerializedPlanParser.h

+10-19
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ static const std::map<std::string, std::string> SCALAR_FUNCTIONS
177177
{"array", "array"},
178178
{"shuffle", "arrayShuffle"},
179179
{"range", "range"}, /// dummy mapping
180-
{"flatten", "sparkArrayFlatten"},
180+
{"flatten", "sparkArrayFlatten"},
181181

182182
// map functions
183183
{"map", "map"},
@@ -259,19 +259,17 @@ class SerializedPlanParser
259259

260260
std::unique_ptr<LocalExecutor> createExecutor(DB::QueryPlanPtr query_plan);
261261

262-
DB::QueryPlanPtr parse(const std::string_view plan);
263-
DB::QueryPlanPtr parse(const substrait::Plan & plan);
262+
DB::QueryPlanPtr parse(std::string_view plan);
264263

265264
public:
266265
explicit SerializedPlanParser(const ContextPtr & context);
267266

268-
/// UT only
269-
DB::QueryPlanPtr parseJson(const std::string_view & json_plan);
270-
std::unique_ptr<LocalExecutor> createExecutor(const substrait::Plan & plan) { return createExecutor(parse((plan))); }
267+
/// visible for UT
268+
DB::QueryPlanPtr parse(const substrait::Plan & plan);
269+
std::unique_ptr<LocalExecutor> createExecutor(const substrait::Plan & plan) { return createExecutor(parse(plan)); }
270+
DB::QueryPipelineBuilderPtr buildQueryPipeline(DB::QueryPlan & query_plan);
271271
///
272-
273-
template <bool JsonPlan>
274-
std::unique_ptr<LocalExecutor> createExecutor(const std::string_view plan);
272+
std::unique_ptr<LocalExecutor> createExecutor(const std::string_view plan) { return createExecutor(parse(plan)); }
275273

276274
DB::QueryPlanStepPtr parseReadRealWithLocalFile(const substrait::ReadRel & rel);
277275
DB::QueryPlanStepPtr parseReadRealWithJavaIter(const substrait::ReadRel & rel);
@@ -312,7 +310,7 @@ class SerializedPlanParser
312310

313311
IQueryPlanStep * addRemoveNullableStep(QueryPlan & plan, const std::set<String> & columns);
314312
IQueryPlanStep * addRollbackFilterHeaderStep(QueryPlanPtr & query_plan, const Block & input_header);
315-
313+
316314
static std::pair<DataTypePtr, Field> parseLiteral(const substrait::Expression_Literal & literal);
317315

318316
static ContextMutablePtr global_context;
@@ -321,7 +319,6 @@ class SerializedPlanParser
321319
std::vector<QueryPlanPtr> extra_plan_holder;
322320

323321
private:
324-
static DB::NamesAndTypesList blockToNameAndTypeList(const DB::Block & header);
325322
DB::QueryPlanPtr parseOp(const substrait::Rel & rel, std::list<const substrait::Rel *> & rel_stack);
326323
void
327324
collectJoinKeys(const substrait::Expression & condition, std::vector<std::pair<int32_t, int32_t>> & join_keys, int32_t right_key_start);
@@ -409,12 +406,6 @@ class SerializedPlanParser
409406
const ActionsDAG::Node * addColumn(DB::ActionsDAGPtr actions_dag, const DataTypePtr & type, const Field & field);
410407
};
411408

412-
template <bool JsonPlan>
413-
std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const std::string_view plan)
414-
{
415-
return createExecutor(JsonPlan ? parseJson(plan) : parse(plan));
416-
}
417-
418409
struct SparkBuffer
419410
{
420411
char * address;
@@ -424,7 +415,7 @@ struct SparkBuffer
424415
class LocalExecutor : public BlockIterator
425416
{
426417
public:
427-
LocalExecutor(const ContextPtr & context_, QueryPlanPtr query_plan, QueryPipeline && pipeline, const Block & header_);
418+
LocalExecutor(QueryPlanPtr query_plan, QueryPipeline && pipeline, bool dump_pipeline_);
428419
~LocalExecutor();
429420

430421
SparkRowInfoPtr next();
@@ -448,7 +439,7 @@ class LocalExecutor : public BlockIterator
448439
QueryPipeline query_pipeline;
449440
std::unique_ptr<PullingPipelineExecutor> executor;
450441
Block header;
451-
ContextPtr context;
442+
bool dump_pipeline;
452443
std::unique_ptr<CHColumnToSparkRow> ch_column_to_spark_row;
453444
std::unique_ptr<SparkBuffer> spark_buffer;
454445
QueryPlanPtr current_query_plan;

cpp-ch/local-engine/local_engine_jni.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
258258
const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan);
259259
const std::string::size_type plan_size = plan_a.length();
260260
local_engine::LocalExecutor * executor
261-
= parser.createExecutor<false>({reinterpret_cast<const char *>(plan_a.elems()), plan_size}).release();
261+
= parser.createExecutor({reinterpret_cast<const char *>(plan_a.elems()), plan_size}).release();
262262
LOG_INFO(&Poco::Logger::get("jni"), "Construct LocalExecutor {}", reinterpret_cast<uintptr_t>(executor));
263263
executor->setMetric(parser.getMetric());
264264
executor->setExtraPlanHolder(parser.extra_plan_holder);
@@ -1259,7 +1259,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
12591259
const auto plan_a = local_engine::getByteArrayElementsSafe(env, plan);
12601260
const std::string::size_type plan_size = plan_a.length();
12611261
local_engine::LocalExecutor * executor
1262-
= parser.createExecutor<false>({reinterpret_cast<const char *>(plan_a.elems()), plan_size}).release();
1262+
= parser.createExecutor({reinterpret_cast<const char *>(plan_a.elems()), plan_size}).release();
12631263
return reinterpret_cast<jlong>(executor);
12641264
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
12651265
}

0 commit comments

Comments
 (0)