Parallel DEDUP_SEMI and DEDUP_SEMI_REVERSE Join (a new approach to processing semi-join queries in parallel on MPP) #653

Merged · 8 commits · Oct 16, 2024
Changes from all commits
8 changes: 7 additions & 1 deletion src/backend/access/transam/parallel.c
@@ -142,6 +142,8 @@ typedef struct FixedParallelState
* worker will get a different parallel worker number.
*/
int ParallelWorkerNumber = -1;
int ParallelWorkerNumberOfSlice = -1;
int TotalParallelWorkerNumberOfSlice = 0;

/* Is there a parallel message pending which we need to receive? */
volatile bool ParallelMessagePending = false;
@@ -1746,6 +1748,8 @@ void GpDestroyParallelDSMEntry()
ParallelSession->area = NULL;
}
LWLockRelease(GpParallelDSMHashLock);
ParallelWorkerNumberOfSlice = -1;
TotalParallelWorkerNumberOfSlice = 0;
}

void
@@ -1835,6 +1839,8 @@ GpInsertParallelDSMHash(PlanState *planstate)
entry->tolaunch = parallel_workers - 1;
entry->parallel_workers = parallel_workers;
entry->temp_worker_id = 0;
ParallelWorkerNumberOfSlice = 0; /* The first worker. */
Assert(TotalParallelWorkerNumberOfSlice == parallel_workers);

/* Create a DSA area that can be used by the leader and all workers. */
char *area_space = shm_toc_allocate(entry->toc, dsa_minsize);
@@ -1894,7 +1900,7 @@ GpInsertParallelDSMHash(PlanState *planstate)
.nworkers = parallel_workers,
.worker_id = entry->temp_worker_id,
};

ParallelWorkerNumberOfSlice = ctx.worker_id;
InitializeGpParallelWorkers(planstate, &ctx);
}

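Taken together, the parallel.c hunks give the two new globals a simple lifecycle: InitPlan() publishes the slice's worker count, GpInsertParallelDSMHash() assigns each backend its id within the slice, and GpDestroyParallelDSMEntry() resets both. A minimal standalone sketch of that lifecycle (illustrative values, not patch code):

#include <assert.h>

/* Sketch of the lifecycle of the two globals added in parallel.c. */
static int ParallelWorkerNumberOfSlice = -1;     /* -1: not a slice-parallel worker */
static int TotalParallelWorkerNumberOfSlice = 0; /* 0: no slice parallelism */

/* InitPlan(): publish the slice's worker count (only if really parallel). */
static void init_plan(int slice_parallel_workers)
{
    TotalParallelWorkerNumberOfSlice =
        slice_parallel_workers > 1 ? slice_parallel_workers : 0;
}

/* GpInsertParallelDSMHash(): assign this backend its id within the slice. */
static void insert_dsm_hash(int worker_id)
{
    ParallelWorkerNumberOfSlice = worker_id;
}

/* GpDestroyParallelDSMEntry(): reset both globals. */
static void destroy_dsm_entry(void)
{
    ParallelWorkerNumberOfSlice = -1;
    TotalParallelWorkerNumberOfSlice = 0;
}

int main(void)
{
    init_plan(4);
    insert_dsm_hash(0);                        /* the first worker */
    assert(ParallelWorkerNumberOfSlice >= 0);  /* IsParallelWorkerOfSlice() */
    destroy_dsm_entry();
    assert(ParallelWorkerNumberOfSlice == -1);
    return 0;
}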
112 changes: 108 additions & 4 deletions src/backend/cdb/cdbpath.c
@@ -44,6 +44,7 @@
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"

#include "port/pg_bitutils.h"

typedef struct
{
@@ -2998,6 +2999,8 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
CdbPathLocus_MakeNull(&inner.move_to);
outer.isouter = true;
inner.isouter = false;
int outerParallel = outer.locus.parallel_workers;
int innerParallel = inner.locus.parallel_workers;

Assert(cdbpathlocus_is_valid(outer.locus));
Assert(cdbpathlocus_is_valid(inner.locus));
@@ -3091,6 +3094,9 @@
case JOIN_INNER:
break;
case JOIN_SEMI:
if (!enable_parallel_semi_join)
goto fail;
/* FALLTHROUGH */
case JOIN_ANTI:
case JOIN_LEFT:
case JOIN_LASJ_NOTIN:
@@ -3100,19 +3106,117 @@
case JOIN_UNIQUE_INNER:
case JOIN_RIGHT:
case JOIN_FULL:
case JOIN_DEDUP_SEMI:
case JOIN_DEDUP_SEMI_REVERSE:
/* Join types are not supported in parallel yet. */
goto fail;
case JOIN_DEDUP_SEMI:
if (!enable_parallel_dedup_semi_join)
goto fail;

if (!CdbPathLocus_IsPartitioned(inner.locus))
goto fail;

if (CdbPathLocus_IsPartitioned(outer.locus) ||
CdbPathLocus_IsBottleneck(outer.locus))
{
/* ok */
}
else if (CdbPathLocus_IsGeneral(outer.locus))
{
CdbPathLocus_MakeSingleQE(&outer.locus,
CdbPathLocus_NumSegments(inner.locus));
outer.path->locus = outer.locus;
}
else if (CdbPathLocus_IsSegmentGeneral(outer.locus))
{
CdbPathLocus_MakeSingleQE(&outer.locus,
CdbPathLocus_CommonSegments(inner.locus,
outer.locus));
outer.path->locus = outer.locus;
}
else if (CdbPathLocus_IsSegmentGeneralWorkers(outer.locus))
{
/* CBDB_PARALLEL_FIXME: Consider gather from SegmentGeneralWorkers. */
goto fail;
}
else
goto fail;
inner.ok_to_replicate = false;

/*
 * CBDB_PARALLEL:
 * rowidexpr uses the low 48 bits of a 64-bit int as a row counter.
 * In parallel mode, the upper 16 bits must hold both the segment id
 * and the parallel worker id, so we require:
 *     parallel_bits + seg_bits <= 16
 * where segs is the max dbid across the cluster, since segment dbids
 * may be non-contiguous. This keeps enough headroom to guarantee that
 * no duplicate rowids are produced during execution.
 */
if (outerParallel > 1)
{
int segs = cdbcomponent_get_maxdbid();
int parallel_bits = pg_leftmost_one_pos32(outerParallel) + 1;
int seg_bits = pg_leftmost_one_pos32(segs) + 1;
if (parallel_bits + seg_bits > 16)
goto fail;
}
outer.path = add_rowid_to_path(root, outer.path, p_rowidexpr_id);
*p_outer_path = outer.path;
break;

case JOIN_DEDUP_SEMI_REVERSE:
if (!enable_parallel_dedup_semi_reverse_join)
goto fail;
/* same as JOIN_DEDUP_SEMI, but with inner and outer reversed */
if (!CdbPathLocus_IsPartitioned(outer.locus))
goto fail;
if (CdbPathLocus_IsPartitioned(inner.locus) ||
CdbPathLocus_IsBottleneck(inner.locus))
{
/* ok */
}
else if (CdbPathLocus_IsGeneral(inner.locus))
{
CdbPathLocus_MakeSingleQE(&inner.locus,
CdbPathLocus_NumSegments(outer.locus));
inner.path->locus = inner.locus;
}
else if (CdbPathLocus_IsSegmentGeneral(inner.locus))
{
CdbPathLocus_MakeSingleQE(&inner.locus,
CdbPathLocus_CommonSegments(outer.locus,
inner.locus));
inner.path->locus = inner.locus;
}
else if (CdbPathLocus_IsSegmentGeneralWorkers(inner.locus))
{
/* CBDB_PARALLEL_FIXME: Consider gather from SegmentGeneralWorkers. */
goto fail;
}
else
goto fail;
outer.ok_to_replicate = false;
if (innerParallel > 1)
{
int segs = cdbcomponent_get_maxdbid();
int parallel_bits = pg_leftmost_one_pos32(innerParallel) + 1;
int seg_bits = pg_leftmost_one_pos32(segs) + 1;
if (parallel_bits + seg_bits > 16)
goto fail;
}
inner.path = add_rowid_to_path(root, inner.path, p_rowidexpr_id);
*p_inner_path = inner.path;
break;

default:
elog(ERROR, "unexpected join type %d", jointype);
}

/* Get rel sizes. */
outer.bytes = outer.path->rows * outer.path->pathtarget->width;
inner.bytes = inner.path->rows * inner.path->pathtarget->width;
int outerParallel = outer.locus.parallel_workers;
int innerParallel = inner.locus.parallel_workers;

if (join_quals_contain_outer_references ||
CdbPathLocus_IsOuterQuery(outer.locus) ||
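The bit-budget check above is easy to verify in isolation. A minimal sketch, with a stand-in for pg_leftmost_one_pos32() and example worker/dbid counts that are not from the patch:

#include <stdio.h>

/* Stand-in for pg_leftmost_one_pos32(): position of the highest set bit. */
static int leftmost_one_pos32(unsigned int x)
{
    int pos = -1;

    while (x)
    {
        pos++;
        x >>= 1;
    }
    return pos;
}

/* Mirrors the planner check: do the worker id and segment id fit in 16 bits? */
static int rowid_bits_fit(int nworkers, int maxdbid)
{
    int parallel_bits = leftmost_one_pos32(nworkers) + 1;
    int seg_bits = leftmost_one_pos32(maxdbid) + 1;

    return parallel_bits + seg_bits <= 16;
}

int main(void)
{
    /* 4 workers -> 3 bits, max dbid 64 -> 7 bits: 10 <= 16, plan is allowed. */
    printf("%d\n", rowid_bits_fit(4, 64));
    /* 1024 workers -> 11 bits, max dbid 1024 -> 11 bits: 22 > 16, goto fail. */
    printf("%d\n", rowid_bits_fit(1024, 1024));
    return 0;
}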
17 changes: 16 additions & 1 deletion src/backend/executor/execExpr.c
@@ -58,6 +58,7 @@
#include "cdb/cdbvars.h"
#include "utils/pg_locale.h"

#include "port/pg_bitutils.h"

typedef struct LastAttnumInfo
{
@@ -1139,7 +1140,21 @@ ExecInitExprRec(Expr *node, ExprState *state,
* value.
*/
scratch.opcode = EEOP_ROWIDEXPR;
scratch.d.rowidexpr.rowcounter = ((int64) GpIdentity.dbid) << 48;

/*
 * CBDB_PARALLEL
 * The planner has ensured that there is enough bit space for the number
 * of segments and parallel workers. ParallelWorkerNumberOfSlice is not
 * set yet at this point, so use TotalParallelWorkerNumberOfSlice here
 * and reserve bit space for the worker number.
 */
if (TotalParallelWorkerNumberOfSlice > 0)
{
int parallel_bits = pg_leftmost_one_pos32(TotalParallelWorkerNumberOfSlice) + 1;
/* Planner has checked that there is enough room. */
scratch.d.rowidexpr.rowcounter = ((int64) GpIdentity.dbid) << (48 + parallel_bits);
}
else
scratch.d.rowidexpr.rowcounter = ((int64) GpIdentity.dbid) << 48;

ExprEvalPushStep(state, &scratch);
break;
6 changes: 6 additions & 0 deletions src/backend/executor/execExprInterp.c
@@ -1603,6 +1603,12 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
EEO_CASE(EEOP_ROWIDEXPR)
{
int64 rowcounter = ++op->d.rowidexpr.rowcounter;
/*
 * CBDB_PARALLEL_FIXME
 * Take ParallelWorkerNumberOfSlice into account just once, at
 * initialization, rather than on every row.
 */
if (IsParallelWorkerOfSlice())
rowcounter |= (((int64) ParallelWorkerNumberOfSlice) << (48));

*op->resvalue = Int64GetDatum(rowcounter);
*op->resnull = false;
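Putting execExpr.c and execExprInterp.c together, a parallel rowid has the layout [dbid][worker id][48-bit counter]. The sketch below assembles rowids with hypothetical values (dbid 3, 4 workers so 3 parallel bits, counter 100) to show that rowids from different workers can never collide:

#include <stdint.h>
#include <stdio.h>

/*
 * Parallel rowid layout, combining the execExpr.c initialization and the
 * execExprInterp.c per-row OR:
 *   [ dbid ][ worker id (parallel_bits) ][ 48-bit row counter ]
 */
static int64_t make_rowid(int64_t dbid, int parallel_bits,
                          int64_t worker, int64_t counter)
{
    int64_t rowcounter = dbid << (48 + parallel_bits); /* execExpr.c init */

    rowcounter |= worker << 48;                        /* execExprInterp.c */
    return rowcounter + counter;                       /* per-row increment */
}

int main(void)
{
    /* Hypothetical: dbid 3, 4 workers (3 bits), same counter value 100. */
    printf("worker 1: %#018llx\n", (long long) make_rowid(3, 3, 1, 100));
    printf("worker 2: %#018llx\n", (long long) make_rowid(3, 3, 2, 100));
    /* The worker-id bits differ, so the two rowids are guaranteed distinct. */
    return 0;
}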
9 changes: 8 additions & 1 deletion src/backend/executor/execMain.c
@@ -1925,6 +1925,12 @@ InitPlan(QueryDesc *queryDesc, int eflags)
ExecSlice *sendSlice = &estate->es_sliceTable->slices[m->motionID];
estate->currentSliceId = sendSlice->parentIndex;
estate->useMppParallelMode = sendSlice->useMppParallelMode;
/*
 * CBDB_PARALLEL
 * Remember: parallel_workers is set to at least 1 when the gang is
 * filled, for convenience in Motion execution.
 */
TotalParallelWorkerNumberOfSlice = sendSlice->parallel_workers > 1 ? sendSlice->parallel_workers : 0;
}
/* Compute SubPlans' root plan nodes for SubPlans reachable from this plan root */
estate->locallyExecutableSubplans = getLocallyExecutableSubplans(plannedstmt, start_plan_node);
@@ -1961,9 +1967,10 @@
bool save_useMppParallelMode = estate->useMppParallelMode;

estate->currentSliceId = estate->es_plannedstmt->subplan_sliceIds[subplan_id - 1];
/* FIXME: test whether mpp parallel style exists for subplan case */
/* CBDB_PARALLEL_FIXME: test whether mpp parallel style exists for subplan case */
estate->useMppParallelMode = false;

/* CBDB_PARALLEL_FIXME: update TotalParallelWorkerNumberOfSlice for subplan, could it be possible? */
Plan *subplan = (Plan *) lfirst(l);
subplanstate = ExecInitNode(subplan, estate, sp_eflags);

2 changes: 1 addition & 1 deletion src/backend/optimizer/path/costsize.c
@@ -6784,7 +6784,7 @@ get_parallel_divisor(Path *path)
* parallel plan.
*/
/*
* GPDB parallel: We don't have a leader like upstream.
* CBDB_PARALLEL: We don't have a leader like upstream.
* parallel_divisor is usually used to estimate rows.
* Since we don't have a leader in GP parallel style, set it the same
* as path's parallel_workers which may be 0 sometimes.
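The divisor difference is small but affects every partial-path row estimate. A standalone contrast, assuming upstream's leader-participation formula (the leader contributes 1.0 - 0.3 * workers when that is positive) and 2 workers:

#include <stdio.h>

/*
 * Upstream PostgreSQL: divisor = workers + leader contribution.
 * CBDB parallel: no leader, so divisor = parallel_workers.
 */
static double upstream_divisor(int workers)
{
    double divisor = workers;
    double leader = 1.0 - 0.3 * workers;

    if (leader > 0)
        divisor += leader;
    return divisor;
}

int main(void)
{
    int    workers = 2;
    double rows = 1000.0;

    printf("upstream: %.1f rows per worker\n", rows / upstream_divisor(workers)); /* 1000/2.4 ~ 416.7 */
    printf("cbdb:     %.1f rows per worker\n", rows / workers);                   /* 500.0 */
    return 0;
}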
9 changes: 3 additions & 6 deletions src/backend/optimizer/path/joinpath.c
@@ -1327,8 +1327,6 @@ sort_inner_and_outer(PlannerInfo *root,
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
save_jointype != JOIN_FULL &&
save_jointype != JOIN_DEDUP_SEMI &&
save_jointype != JOIN_DEDUP_SEMI_REVERSE &&
save_jointype != JOIN_RIGHT &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
@@ -1936,8 +1934,6 @@ match_unsorted_outer(PlannerInfo *root,
if (joinrel->consider_parallel &&
save_jointype != JOIN_UNIQUE_OUTER &&
save_jointype != JOIN_FULL &&
save_jointype != JOIN_DEDUP_SEMI &&
save_jointype != JOIN_DEDUP_SEMI_REVERSE &&
save_jointype != JOIN_RIGHT &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
@@ -2032,6 +2028,9 @@ consider_parallel_nestloop(PlannerInfo *root,

if (jointype == JOIN_UNIQUE_INNER)
jointype = JOIN_INNER;

if (jointype == JOIN_DEDUP_SEMI || jointype == JOIN_DEDUP_SEMI_REVERSE)
jointype = JOIN_INNER;

foreach(lc1, outerrel->partial_pathlist)
{
@@ -2309,8 +2308,6 @@ hash_inner_and_outer(PlannerInfo *root,
save_jointype != JOIN_UNIQUE_OUTER &&
save_jointype != JOIN_FULL &&
save_jointype != JOIN_RIGHT &&
save_jointype != JOIN_DEDUP_SEMI &&
save_jointype != JOIN_DEDUP_SEMI_REVERSE &&
outerrel->partial_pathlist != NIL &&
bms_is_empty(joinrel->lateral_relids))
{
33 changes: 33 additions & 0 deletions src/backend/utils/misc/guc_gp.c
@@ -144,6 +144,9 @@ bool gp_appendonly_verify_write_block = false;
bool gp_appendonly_compaction = true;
int gp_appendonly_compaction_threshold = 0;
bool enable_parallel = false;
bool enable_parallel_semi_join = true;
bool enable_parallel_dedup_semi_join = true;
bool enable_parallel_dedup_semi_reverse_join = true;
int gp_appendonly_insert_files = 0;
int gp_appendonly_insert_files_tuples_range = 0;
int gp_random_insert_segments = 0;
@@ -3041,6 +3044,36 @@ struct config_bool ConfigureNamesBool_gp[] =
false,
NULL, NULL, NULL
},
{
{"enable_parallel_semi_join", PGC_USERSET, DEVELOPER_OPTIONS,
gettext_noop("allow to use of parallel semi join."),
NULL,
GUC_EXPLAIN
},
&enable_parallel_semi_join,
true,
NULL, NULL, NULL
},
{
{"enable_parallel_dedup_semi_join", PGC_USERSET, DEVELOPER_OPTIONS,
gettext_noop("allow to use of parallel dedup semi join."),
NULL,
GUC_EXPLAIN
},
&enable_parallel_dedup_semi_join,
true,
NULL, NULL, NULL
},
{
{"enable_parallel_dedup_semi_reverse_join", PGC_USERSET, DEVELOPER_OPTIONS,
gettext_noop("allow to use of parallel dedup semi reverse join."),
NULL,
GUC_EXPLAIN
},
&enable_parallel_dedup_semi_reverse_join,
true,
NULL, NULL, NULL
},
{
{"gp_internal_is_singlenode", PGC_POSTMASTER, UNGROUPED,
gettext_noop("Is in SingleNode mode (no segments). WARNING: user SHOULD NOT set this by any means."),
13 changes: 13 additions & 0 deletions src/include/access/parallel.h
@@ -63,6 +63,10 @@ extern volatile bool ParallelMessagePending;
extern PGDLLIMPORT int ParallelWorkerNumber;
extern PGDLLIMPORT bool InitializingParallelWorker;

/* CBDB_PARALLEL: our worker number within the slice (-1 when not parallel), and the slice's total worker count including ourselves (0 for no parallel) */
extern PGDLLIMPORT int ParallelWorkerNumberOfSlice;
extern PGDLLIMPORT int TotalParallelWorkerNumberOfSlice;

typedef struct ParallelEntryTag
{
int cid;
@@ -90,6 +94,15 @@ typedef struct GpParallelDSMEntry
Barrier build_barrier; /* synchronization for the build dsm phases */
} GpParallelDSMEntry;

/*
 * CBDB_PARALLEL
 * Postgres uses ParallelWorkerNumber for background workers, including
 * parallel workers under a Gather node.
 * To avoid mixing the two (and triggering assertion failures), we use
 * ParallelWorkerNumberOfSlice to identify CBDB-style parallel mode.
 */
#define IsParallelWorkerOfSlice() (ParallelWorkerNumberOfSlice >= 0)

#define IsParallelWorker() (ParallelWorkerNumber >= 0)

extern ParallelContext *CreateParallelContext(const char *library_name,
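A small self-contained illustration of why the two macros must stay independent; the globals and values below are stand-ins, not patch code:

#include <stdio.h>

static int ParallelWorkerNumber = -1;        /* upstream bgworker id */
static int ParallelWorkerNumberOfSlice = 2;  /* CBDB slice-parallel id */

#define IsParallelWorker()        (ParallelWorkerNumber >= 0)
#define IsParallelWorkerOfSlice() (ParallelWorkerNumberOfSlice >= 0)

int main(void)
{
    /* A CBDB slice-parallel worker is not an upstream Gather worker: the
     * two notions stay independent, so upstream assertions keyed on
     * ParallelWorkerNumber are never tripped by slice parallelism. */
    printf("IsParallelWorker:        %d\n", IsParallelWorker());        /* 0 */
    printf("IsParallelWorkerOfSlice: %d\n", IsParallelWorkerOfSlice()); /* 1 */
    return 0;
}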
3 changes: 3 additions & 0 deletions src/include/utils/guc.h
@@ -290,6 +290,9 @@ extern bool gp_appendonly_verify_block_checksums;
extern bool gp_appendonly_verify_write_block;
extern bool gp_appendonly_compaction;
extern bool enable_parallel;
extern bool enable_parallel_semi_join;
extern bool enable_parallel_dedup_semi_join;
extern bool enable_parallel_dedup_semi_reverse_join;
extern int gp_appendonly_insert_files;
extern int gp_appendonly_insert_files_tuples_range;
extern int gp_random_insert_segments;
3 changes: 3 additions & 0 deletions src/include/utils/unsync_guc_name.h
@@ -125,6 +125,9 @@
"enable_nestloop",
"enable_parallel_append",
"enable_parallel_hash",
"enable_parallel_semi_join",
"enable_parallel_dedup_semi_join",
"enable_parallel_dedup_semi_reverse_join",
"enable_partition_pruning",
"enable_partitionwise_aggregate",
"enable_partitionwise_join",