Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADBDEV-1938-optimize-2 #628

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion src/backend/cdb/cdbmutate.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ typedef struct ApplyMotionState
plan_tree_base_prefix base; /* Required prefix for
* plan_tree_walker/mutator */
int nextMotionID;
int numsegments;
int sliceDepth;
bool containMotionNodes;
Node *from;
HTAB *planid_subplans; /* hash table for InitPlanItem */
} ApplyMotionState;

Expand Down Expand Up @@ -367,6 +369,49 @@ get_partitioned_policy_from_flow(Plan *plan)
}


static bool
rows_number_walker(Node *node, ApplyMotionState *context)
{
if (node == NULL)
return false;

if (is_plan_node(node))
{
Plan *plan = (Plan *) node;

if (plan->flow != NULL && plan->flow->req_move == MOVEMENT_BROADCAST)
return true;

plan->plan_rows *= context->numsegments;
}

return plan_tree_walker(node, rows_number_walker, context);
}


static bool
broadcast_motion_walker(Node *node, ApplyMotionState *context)
{
if (node == NULL)
return false;

if (is_plan_node(node))
{
Plan *plan = (Plan *) node;

if (plan->flow != NULL && plan->flow->req_move == MOVEMENT_BROADCAST)
{
context->numsegments = plan->flow->numsegments;
(void )rows_number_walker(context->from, context);
}
}
else if (IsA(node, Motion) || IsA(node, SubPlan))
context->from = node;

return plan_tree_walker(node, broadcast_motion_walker, context);
}


/* -------------------------------------------------------------------------
* Function apply_motion() and apply_motion_mutator() add motion nodes to a
* top-level Plan tree as directed by the Flow nodes in the plan.
Expand Down Expand Up @@ -403,8 +448,10 @@ apply_motion(PlannerInfo *root, Plan *plan, Query *query)
* descend into subplan
* plan */
state.nextMotionID = 1; /* Start at 1 so zero will mean "unassigned". */
state.numsegments = numsegments;
state.sliceDepth = 0;
state.containMotionNodes = false;
state.from = (Node *) plan;
memset(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(int);
ctl.entrysize = sizeof(InitPlanItem);
Expand Down Expand Up @@ -684,6 +731,8 @@ apply_motion(PlannerInfo *root, Plan *plan, Query *query)
Insist(focusPlan(plan, false, false));
}

(void) plan_tree_walker((Node *) plan, broadcast_motion_walker, &state);

result = (Plan *) apply_motion_mutator((Node *) plan, &state);

if (needToAssignDirectDispatchContentIds)
Expand Down Expand Up @@ -1103,7 +1152,7 @@ add_slice_to_motion(Motion *motion,
/* broadcast */
motion->plan.flow = makeFlow(FLOW_REPLICATED, numsegments);
motion->plan.flow->locustype = CdbLocusType_Replicated;

motion->plan.plan_rows *= numsegments;
}
else
{
Expand Down
144 changes: 47 additions & 97 deletions src/backend/commands/explain.c
Original file line number Diff line number Diff line change
Expand Up @@ -1221,64 +1221,15 @@ ExplainNode(PlanState *planstate, List *ancestors,
parentplanstate = es->parentPlanState;
es->parentPlanState = planstate;

if (Gp_role == GP_ROLE_DISPATCH)
if (save_currentSlice != NULL && save_currentSlice->gangSize > 0)
{
/*
* Estimates will have to be scaled down to be per-segment (except in a
* few cases).
*/
if ((plan->directDispatch).isDirectDispatch)
{
scaleFactor = 1.0;
}
else if (plan->flow != NULL && CdbPathLocus_IsBottleneck(*(plan->flow)))
{
/*
* Data is unified in one place (singleQE or QD), or executed on a
* single segment. We scale up estimates to make it global. We
* will later amend this for Motion nodes.
*/
scaleFactor = 1.0;
}
else if (plan->flow != NULL && CdbPathLocus_IsSegmentGeneral(*(plan->flow)))
{
/* Replicated table has full data on every segment */
scaleFactor = 1.0;
}
else if (plan->flow != NULL && es->pstmt->planGen == PLANGEN_PLANNER)
{
/*
* The plan node is executed on multiple nodes, so scale down the
* number of rows seen by each segment
*/
scaleFactor = CdbPathLocus_NumSegments(*(plan->flow));
}
else
{
/*
* The plan node is executed on multiple nodes, so scale down the
* number of rows seen by each segment
*/
scaleFactor = getgpsegmentCount();
}
scaleFactor = (float) save_currentSlice->gangSize;
}

/*
* If this is a Motion node, we're descending into a new slice.
*/
if (IsA(plan, Motion))
if (plan->flow != NULL && CdbPathLocus_IsSegmentGeneral(*(plan->flow)))
{
Motion *pMotion = (Motion *) plan;
SliceTable *sliceTable = planstate->state->es_sliceTable;

if (sliceTable)
{
es->currentSlice = (Slice *) list_nth(sliceTable->slices,
pMotion->motionID);
parentSlice = es->currentSlice->parentIndex == -1 ? NULL :
(Slice *) list_nth(sliceTable->slices,
es->currentSlice->parentIndex);
}
/* Replicated table has full data on every segment */
scaleFactor = 1.0;
}

switch (nodeTag(plan))
Expand All @@ -1288,6 +1239,12 @@ ExplainNode(PlanState *planstate, List *ancestors,
break;
case T_ModifyTable:
sname = "ModifyTable";

if (plan->flow != NULL && CdbPathLocus_IsReplicated(*(plan->flow)))
{
scaleFactor = 1.0;
}

switch (((ModifyTable *) plan)->operation)
{
case CMD_INSERT:
Expand Down Expand Up @@ -1467,81 +1424,74 @@ ExplainNode(PlanState *planstate, List *ancestors,
case T_Motion:
{
Motion *pMotion = (Motion *) plan;
SliceTable *sliceTable = planstate->state->es_sliceTable;

Assert(plan->lefttree);
Assert(plan->lefttree->flow);

motion_snd = es->currentSlice->gangSize;
motion_recv = (parentSlice == NULL ? 1 : parentSlice->gangSize);
/* Descending into a new slice. */
if (sliceTable)
{
es->currentSlice = (Slice *) list_nth(sliceTable->slices,
pMotion->motionID);
}

/* scale the number of rows by the number of segments sending data */
scaleFactor = motion_snd;
/* Size of the child slice's gang */
motion_snd = es->currentSlice->gangSize;
/* Size of the current slice's gang (save_currentSlice->gangSize) */
motion_recv = scaleFactor;

switch (pMotion->motionType)
{
case MOTIONTYPE_HASH:
sname = "Redistribute Motion";

/*
* scale the number of rows by the number of segments
* sending data
*/
scaleFactor = motion_snd;
break;
case MOTIONTYPE_FIXED:
if (pMotion->isBroadcast)
{
sname = "Broadcast Motion";

/*
* Scale the number of rows by the number of
* segments receiving data, because they were
* multiplied by this number at the planning
* stage. We don't use segments count because the
* number of receivers can be less if we are
* expanding a cluster.
*/
scaleFactor = motion_recv;
}
else if (plan->lefttree->flow->locustype == CdbLocusType_Replicated)
{
sname = "Explicit Gather Motion";
scaleFactor = 1;
motion_recv = 1;
Assert(scaleFactor == 1);
}
else
{
sname = "Gather Motion";
scaleFactor = 1;
motion_recv = 1;
Assert(scaleFactor == 1);
}
break;
case MOTIONTYPE_EXPLICIT:
sname = "Explicit Redistribute Motion";
motion_recv = getgpsegmentCount();

/*
* scale the number of rows by the number of segments
* sending data
*/
scaleFactor = motion_snd;
break;
default:
sname = "???";
break;
}

if (es->pstmt->planGen == PLANGEN_PLANNER)
{
Slice *slice = es->currentSlice;

if (slice->directDispatch.isDirectDispatch)
{
/* Special handling on direct dispatch */
motion_snd = list_length(slice->directDispatch.contentIds);
}
else if (plan->lefttree->flow->flotype == FLOW_SINGLETON)
{
/* For SINGLETON we always display sender size as 1 */
motion_snd = 1;
}
else
{
/* Otherwise find out sender size from outer plan */
motion_snd = plan->lefttree->flow->numsegments;
}

if (pMotion->motionType == MOTIONTYPE_FIXED &&
!pMotion->isBroadcast)
{
/* In Gather Motion always display receiver size as 1 */
motion_recv = 1;
}
else
{
/* Otherwise find out receiver size from plan */
motion_recv = plan->flow->numsegments;
}
}

pname = psprintf("%s %d:%d", sname, motion_snd, motion_recv);
}
break;
Expand Down Expand Up @@ -1850,7 +1800,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
{
ExplainPropertyFloat("Startup Cost", plan->startup_cost, 2, es);
ExplainPropertyFloat("Total Cost", plan->total_cost, 2, es);
ExplainPropertyFloat("Plan Rows", plan->plan_rows, 0, es);
ExplainPropertyFloat("Plan Rows", ceil(plan->plan_rows / scaleFactor), 0, es);
ExplainPropertyInteger("Plan Width", plan->plan_width, es);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@
<dxl:SortingColumnList/>
<dxl:TableScan>
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="431.000023" Rows="3.000000" Width="12"/>
<dxl:Cost StartupCost="0" TotalCost="431.000023" Rows="1.000000" Width="12"/>
</dxl:Properties>
<dxl:ProjList>
<dxl:ProjElem ColId="0" Alias="a">
Expand All @@ -316,7 +316,7 @@
</dxl:ProjElem>
</dxl:ProjList>
<dxl:Filter/>
<dxl:TableDescriptor Mdid="6.24577.1.0" TableName="test_replicated" LockMode="1">
<dxl:TableDescriptor Mdid="6.24577.1.0" TableName="test_replicated">
<dxl:Columns>
<dxl:Column ColId="0" Attno="1" ColName="a" TypeMdid="0.25.1.0" ColWidth="8"/>
<dxl:Column ColId="1" Attno="2" ColName="b" TypeMdid="0.23.1.0" ColWidth="4"/>
Expand Down
2 changes: 1 addition & 1 deletion src/backend/gporca/data/dxl/minidump/CTE15HAReplicated.mdp
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@
</dxl:HashExprList>
<dxl:TableScan>
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="431.016335" Rows="300.000000" Width="267"/>
<dxl:Cost StartupCost="0" TotalCost="431.016335" Rows="100.000000" Width="267"/>
</dxl:Properties>
<dxl:ProjList>
<dxl:ProjElem ColId="40" Alias="a">
Expand Down
2 changes: 1 addition & 1 deletion src/backend/gporca/data/dxl/minidump/CTE15Replicated.mdp
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@
</dxl:HashExprList>
<dxl:TableScan>
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="431.016335" Rows="300.000000" Width="267"/>
<dxl:Cost StartupCost="0" TotalCost="431.016335" Rows="100.000000" Width="267"/>
</dxl:Properties>
<dxl:ProjList>
<dxl:ProjElem ColId="40" Alias="a">
Expand Down
2 changes: 1 addition & 1 deletion src/backend/gporca/data/dxl/minidump/CTE2HAReplicated.mdp
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@
</dxl:HashExprList>
<dxl:TableScan>
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="431.000161" Rows="3.000000" Width="263"/>
<dxl:Cost StartupCost="0" TotalCost="431.000161" Rows="1.000000" Width="263"/>
</dxl:Properties>
<dxl:ProjList>
<dxl:ProjElem ColId="40" Alias="a">
Expand Down
2 changes: 1 addition & 1 deletion src/backend/gporca/data/dxl/minidump/CTE2Replicated.mdp
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@
</dxl:HashExprList>
<dxl:TableScan>
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="431.000161" Rows="3.000000" Width="263"/>
<dxl:Cost StartupCost="0" TotalCost="431.000161" Rows="1.000000" Width="263"/>
</dxl:Properties>
<dxl:ProjList>
<dxl:ProjElem ColId="40" Alias="a">
Expand Down
4 changes: 2 additions & 2 deletions src/backend/gporca/data/dxl/minidump/FullJoin-Replicated.mdp
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@
</dxl:HashExprList>
<dxl:TableScan>
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="431.000021" Rows="3.000000" Width="8"/>
<dxl:Cost StartupCost="0" TotalCost="431.000021" Rows="1.000000" Width="8"/>
</dxl:Properties>
<dxl:ProjList>
<dxl:ProjElem ColId="0" Alias="a">
Expand Down Expand Up @@ -408,7 +408,7 @@
</dxl:HashExprList>
<dxl:TableScan>
<dxl:Properties>
<dxl:Cost StartupCost="0" TotalCost="431.000021" Rows="3.000000" Width="8"/>
<dxl:Cost StartupCost="0" TotalCost="431.000021" Rows="1.000000" Width="8"/>
</dxl:Properties>
<dxl:ProjList>
<dxl:ProjElem ColId="9" Alias="c">
Expand Down
Loading
Loading