Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions src/backend/cdb/cdbpath.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ static bool try_redistribute(PlannerInfo *root, CdbpathMfjRel *g,

static SplitUpdatePath *make_splitupdate_path(PlannerInfo *root, Path *subpath, Index rti);

static SplitMergePath *make_split_merge_path(PlannerInfo *root, Path *subpath, List* resultRelations, List *mergeActionLists);

static bool can_elide_explicit_motion(PlannerInfo *root, Index rti, Path *subpath, GpPolicy *policy);
/*
* cdbpath_cost_motion
Expand Down Expand Up @@ -2774,6 +2776,82 @@ create_split_update_path(PlannerInfo *root, Index rti, GpPolicy *policy, Path *s
return subpath;
}


Path *
create_motion_path_for_merge(PlannerInfo *root, List *resultRelations, GpPolicy *policy, List *mergeActionLists, Path *subpath)
{
GpPolicyType policyType = policy->ptype;
CdbPathLocus targetLocus;
RelOptInfo *rel;
ListCell *lc, *l;
bool need_split_merge = false;

if (policyType == POLICYTYPE_PARTITIONED)
{
/*
* If merge contain CMD_INSERT, we need split merge to let new
* insert tuple redistributed to correct segment. otherwise, we
* create motion as the same as update/delete in create_motion_path_for_upddel
*/
foreach(l, mergeActionLists)
{
List *mergeActionList = lfirst(l);
foreach(lc, mergeActionList)
{
MergeAction *action = lfirst(lc);
if (action->commandType == CMD_INSERT)
need_split_merge = true;
}
}

if (need_split_merge)
{
if (root->simple_rel_array[linitial_int(resultRelations)])
rel = root->simple_rel_array[linitial_int(resultRelations)];
else
rel = build_simple_rel(root, linitial_int(resultRelations), NULL /*parent*/);
targetLocus = cdbpathlocus_from_baserel(root, rel, 0);

subpath = (Path *) make_split_merge_path(root, subpath, resultRelations, mergeActionLists);
subpath = cdbpath_create_explicit_motion_path(root,
subpath,
targetLocus);
}
else
{

if (can_elide_explicit_motion(root, linitial_int(resultRelations), subpath, policy))
return subpath;
else
{
CdbPathLocus_MakeStrewn(&targetLocus, policy->numsegments, 0);
subpath = cdbpath_create_explicit_motion_path(root,
subpath,
targetLocus);
}
}
}
else if (policyType == POLICYTYPE_ENTRY)
{
/* Master-only table */
CdbPathLocus_MakeEntry(&targetLocus);
subpath = cdbpath_create_motion_path(root, subpath, NIL, false, targetLocus);
}
else if (policyType == POLICYTYPE_REPLICATED)
{
/*
* The statement that insert/update/delete on replicated table has to
* be dispatched to each segment and executed on each segment. Thus
* the targetlist cannot contain volatile functions.
*/
if (contain_volatile_functions((Node *) (subpath->pathtarget->exprs)))
elog(ERROR, "could not devise a plan.");
}
else
elog(ERROR, "unrecognized policy type %u", policyType);
return subpath;
}

/*
* turn_volatile_seggen_to_singleqe
*
Expand Down Expand Up @@ -2836,6 +2914,35 @@ turn_volatile_seggen_to_singleqe(PlannerInfo *root, Path *path, Node *node)
return path;
}

static SplitMergePath *
make_split_merge_path(PlannerInfo *root, Path *subpath, List *resultRelations, List *mergeActionLists)
{
PathTarget *splitMergePathTarget;
SplitMergePath *splitmergepath;

splitMergePathTarget = copy_pathtarget(subpath->pathtarget);

/* populate information generated above into splitupdate node */
splitmergepath = makeNode(SplitMergePath);
splitmergepath->path.pathtype = T_SplitMerge;
splitmergepath->path.parent = subpath->parent;
splitmergepath->path.pathtarget = splitMergePathTarget;
splitmergepath->path.param_info = NULL;
splitmergepath->path.parallel_aware = false;
splitmergepath->path.parallel_safe = subpath->parallel_safe;
splitmergepath->path.parallel_workers = subpath->parallel_workers;
splitmergepath->path.rows = 2 * subpath->rows;
splitmergepath->path.startup_cost = subpath->startup_cost;
splitmergepath->path.total_cost = subpath->total_cost;
splitmergepath->path.pathkeys = subpath->pathkeys;
splitmergepath->path.locus = subpath->locus;
splitmergepath->subpath = subpath;
splitmergepath->resultRelations = resultRelations;
splitmergepath->mergeActionLists = mergeActionLists;

return splitmergepath;
}

static SplitUpdatePath *
make_splitupdate_path(PlannerInfo *root, Path *subpath, Index rti)
{
Expand Down
11 changes: 11 additions & 0 deletions src/backend/cdb/cdbplan.c
Original file line number Diff line number Diff line change
Expand Up @@ -970,6 +970,17 @@ plan_tree_mutator(Node *node,
}
break;

case T_SplitMerge:
{
SplitMerge *splitMerge = (SplitMerge *) node;
SplitMerge *newSplitMerge;

FLATCOPY(newSplitMerge, splitMerge, SplitMerge);
PLANMUTATE(newSplitMerge, splitMerge);
return (Node *) newSplitMerge;
}
break;

case T_IncrementalSort:
{
IncrementalSort *incrementalSort = (IncrementalSort *) node;
Expand Down
1 change: 1 addition & 0 deletions src/backend/cdb/cdbtargeteddispatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ DirectDispatchUpdateContentIdsFromPlan(PlannerInfo *root, Plan *plan)
* so disable */
break;
case T_SplitUpdate:
case T_SplitMerge:
break;
case T_CustomScan:
break;
Expand Down
5 changes: 4 additions & 1 deletion src/backend/commands/explain.c
Original file line number Diff line number Diff line change
Expand Up @@ -1961,7 +1961,10 @@ ExplainNode(PlanState *planstate, List *ancestors,
}
break;
case T_SplitUpdate:
pname = sname = "Split";
pname = sname = "Split Update";
break;
case T_SplitMerge:
pname = sname = "Split Merge";
break;
case T_AssertOp:
pname = sname = "Assert";
Expand Down
3 changes: 3 additions & 0 deletions src/backend/commands/prepare.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ PrepareQuery(ParseState *pstate, PrepareStmt *stmt,
case CMD_DELETE:
srctag = T_DeleteStmt;
break;
case CMD_MERGE:
srctag = T_MergeStmt;
break;
default:
ereport(ERROR,
(errcode(ERRCODE_INVALID_PSTATEMENT_DEFINITION),
Expand Down
1 change: 1 addition & 0 deletions src/backend/executor/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ OBJS += nodeMotion.o \
nodeSequence.o \
nodeAssertOp.o \
nodeSplitUpdate.o \
nodeSplitMerge.o \
nodeTupleSplit.o \
nodePartitionSelector.o

Expand Down
2 changes: 1 addition & 1 deletion src/backend/executor/execMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -1077,7 +1077,7 @@ standard_ExecutorRun(QueryDesc *queryDesc,
*/
if (IS_QD_OR_SINGLENODE() &&
(operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE ||
queryDesc->plannedstmt->hasModifyingCTE) &&
operation == CMD_MERGE || queryDesc->plannedstmt->hasModifyingCTE) &&
((es_processed > 0 || estate->es_processed > 0) || !queryDesc->plannedstmt->canSetTag))
{
MaintainMaterializedViewStatus(queryDesc, operation);
Expand Down
8 changes: 8 additions & 0 deletions src/backend/executor/execProcnode.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
#include "executor/nodeSequence.h"
#include "executor/nodeShareInputScan.h"
#include "executor/nodeSplitUpdate.h"
#include "executor/nodeSplitMerge.h"
#include "executor/nodeTableFunction.h"
#include "pg_trace.h"
#include "tcop/tcopprot.h"
Expand Down Expand Up @@ -512,6 +513,10 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
result = (PlanState *) ExecInitSplitUpdate((SplitUpdate *) node,
estate, eflags);
break;
case T_SplitMerge:
result = (PlanState *) ExecInitSplitMerge((SplitMerge *) node,
estate, eflags);
break;
case T_AssertOp:
result = (PlanState *) ExecInitAssertOp((AssertOp *) node,
estate, eflags);
Expand Down Expand Up @@ -1055,6 +1060,9 @@ ExecEndNode(PlanState *node)
case T_SplitUpdateState:
ExecEndSplitUpdate((SplitUpdateState *) node);
break;
case T_SplitMergeState:
ExecEndSplitMerge((SplitMergeState *) node);
break;
case T_AssertOpState:
ExecEndAssertOp((AssertOpState *) node);
break;
Expand Down
32 changes: 17 additions & 15 deletions src/backend/executor/nodeModifyTable.c
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,7 @@ ExecDeleteEpilogue(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
ModifyTableState *mtstate = context->mtstate;
EState *estate = context->estate;
TransitionCaptureState *ar_delete_trig_tcs;
Relation resultRelationDesc = resultRelInfo->ri_RelationDesc;

/*
* If this delete is the result of a partition key update that moved the
Expand Down Expand Up @@ -1539,6 +1540,14 @@ ExecDeleteEpilogue(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
if (!RelationIsNonblockRelation(resultRelInfo->ri_RelationDesc) && !splitUpdate)
ExecARDeleteTriggers(estate, resultRelInfo, tupleid, oldtuple,
ar_delete_trig_tcs, changingPart);

if (resultRelationDesc->rd_rel->relispartition)
{

context->mtstate->mt_leaf_relids_deleted =
bms_add_member(context->mtstate->mt_leaf_relids_deleted, RelationGetRelid(resultRelationDesc));
context->mtstate->has_leaf_changed = true;
}
}

/* ----------------------------------------------------------------
Expand Down Expand Up @@ -1872,14 +1881,6 @@ ExecDelete(ModifyTableContext *context,
if (canSetTag)
(estate->es_processed)++;

if (resultRelationDesc->rd_rel->relispartition)
{

context->mtstate->mt_leaf_relids_deleted =
bms_add_member(context->mtstate->mt_leaf_relids_deleted, RelationGetRelid(resultRelationDesc));
context->mtstate->has_leaf_changed = true;
}

/* Tell caller that the delete actually happened. */
if (tupleDeleted)
*tupleDeleted = true;
Expand Down Expand Up @@ -2354,6 +2355,7 @@ ExecUpdateEpilogue(ModifyTableContext *context, UpdateContext *updateCxt,
{
ModifyTableState *mtstate = context->mtstate;
List *recheckIndexes = NIL;
Relation resultRelationDesc = resultRelInfo->ri_RelationDesc;

/* insert index entries for tuple if necessary */
if (resultRelInfo->ri_NumIndices > 0 && (updateCxt->updateIndexes != TU_None))
Expand Down Expand Up @@ -2388,6 +2390,13 @@ ExecUpdateEpilogue(ModifyTableContext *context, UpdateContext *updateCxt,
if (resultRelInfo->ri_WithCheckOptions != NIL)
ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo,
slot, context->estate);

if (resultRelationDesc->rd_rel->relispartition)
{
context->mtstate->mt_leaf_relids_updated =
bms_add_member(context->mtstate->mt_leaf_relids_updated, RelationGetRelid(resultRelationDesc));
context->mtstate->has_leaf_changed = true;
}
}

/*
Expand Down Expand Up @@ -2726,13 +2735,6 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
if (canSetTag)
(estate->es_processed)++;

if (resultRelationDesc->rd_rel->relispartition)
{
context->mtstate->mt_leaf_relids_updated =
bms_add_member(context->mtstate->mt_leaf_relids_updated, RelationGetRelid(resultRelationDesc));
context->mtstate->has_leaf_changed = true;
}

ExecUpdateEpilogue(context, &updateCxt, resultRelInfo, tupleid, oldtuple,
slot);

Expand Down
Loading