From ceaf53e4956111779047a375494ec65864e2604a Mon Sep 17 00:00:00 2001 From: liushengsong Date: Tue, 30 Dec 2025 11:32:56 +0800 Subject: [PATCH] fix merge --- src/backend/cdb/cdbpath.c | 107 ++++ src/backend/cdb/cdbplan.c | 11 + src/backend/cdb/cdbtargeteddispatch.c | 1 + src/backend/commands/explain.c | 5 +- src/backend/commands/prepare.c | 3 + src/backend/executor/Makefile | 1 + src/backend/executor/execMain.c | 2 +- src/backend/executor/execProcnode.c | 8 + src/backend/executor/nodeModifyTable.c | 32 +- src/backend/executor/nodeSplitMerge.c | 499 ++++++++++++++++ src/backend/nodes/copyfuncs.funcs.c | 34 ++ src/backend/nodes/copyfuncs.switch.c | 3 + src/backend/nodes/gen_node_support.pl | 0 src/backend/nodes/outfast.c | 3 + src/backend/nodes/outfuncs.c | 4 + src/backend/nodes/outfuncs_common.c | 18 + src/backend/nodes/print.c | 2 + src/backend/nodes/readfast.c | 42 ++ src/backend/optimizer/plan/createplan.c | 102 ++++ src/backend/optimizer/plan/setrefs.c | 4 + src/backend/optimizer/plan/subselect.c | 1 + src/backend/optimizer/prep/preptlist.c | 14 + src/backend/optimizer/util/pathnode.c | 19 +- src/backend/optimizer/util/walkers.c | 3 + src/include/cdb/cdbpath.h | 1 + src/include/executor/nodeSplitMerge.h | 23 + src/include/nodes/execnodes.h | 28 + src/include/nodes/nodes.h | 3 + src/include/nodes/pathnodes.h | 9 + src/include/nodes/plannodes.h | 18 + src/pl/plperl/ppport.h | 4 - src/test/regress/expected/merge.out | 765 +++++++++++------------- src/test/regress/sql/merge.sql | 341 ++++++----- 33 files changed, 1496 insertions(+), 614 deletions(-) create mode 100644 src/backend/executor/nodeSplitMerge.c mode change 100644 => 100755 src/backend/nodes/gen_node_support.pl create mode 100644 src/include/executor/nodeSplitMerge.h diff --git a/src/backend/cdb/cdbpath.c b/src/backend/cdb/cdbpath.c index 17d14c95548..07f14bc75f1 100644 --- a/src/backend/cdb/cdbpath.c +++ b/src/backend/cdb/cdbpath.c @@ -75,6 +75,8 @@ static bool try_redistribute(PlannerInfo *root, 
CdbpathMfjRel *g, static SplitUpdatePath *make_splitupdate_path(PlannerInfo *root, Path *subpath, Index rti); +static SplitMergePath *make_split_merge_path(PlannerInfo *root, Path *subpath, List* resultRelations, List *mergeActionLists); + static bool can_elide_explicit_motion(PlannerInfo *root, Index rti, Path *subpath, GpPolicy *policy); /* * cdbpath_cost_motion @@ -2774,6 +2776,82 @@ create_split_update_path(PlannerInfo *root, Index rti, GpPolicy *policy, Path *s return subpath; } + +Path * +create_motion_path_for_merge(PlannerInfo *root, List *resultRelations, GpPolicy *policy, List *mergeActionLists, Path *subpath) +{ + GpPolicyType policyType = policy->ptype; + CdbPathLocus targetLocus; + RelOptInfo *rel; + ListCell *lc, *l; + bool need_split_merge = false; + + if (policyType == POLICYTYPE_PARTITIONED) + { + /* + * If merge contain CMD_INSERT, we need split merge to let new + * insert tuple redistributed to correct segment. otherwise, we + * create motion as the same as update/delete in create_motion_path_for_upddel + */ + foreach(l, mergeActionLists) + { + List *mergeActionList = lfirst(l); + foreach(lc, mergeActionList) + { + MergeAction *action = lfirst(lc); + if (action->commandType == CMD_INSERT) + need_split_merge = true; + } + } + + if (need_split_merge) + { + if (root->simple_rel_array[linitial_int(resultRelations)]) + rel = root->simple_rel_array[linitial_int(resultRelations)]; + else + rel = build_simple_rel(root, linitial_int(resultRelations), NULL /*parent*/); + targetLocus = cdbpathlocus_from_baserel(root, rel, 0); + + subpath = (Path *) make_split_merge_path(root, subpath, resultRelations, mergeActionLists); + subpath = cdbpath_create_explicit_motion_path(root, + subpath, + targetLocus); + } + else + { + + if (can_elide_explicit_motion(root, linitial_int(resultRelations), subpath, policy)) + return subpath; + else + { + CdbPathLocus_MakeStrewn(&targetLocus, policy->numsegments, 0); + subpath = cdbpath_create_explicit_motion_path(root, + subpath, 
+ targetLocus); + } + } + } + else if (policyType == POLICYTYPE_ENTRY) + { + /* Master-only table */ + CdbPathLocus_MakeEntry(&targetLocus); + subpath = cdbpath_create_motion_path(root, subpath, NIL, false, targetLocus); + } + else if (policyType == POLICYTYPE_REPLICATED) + { + /* + * A statement that does insert/update/delete on a replicated table has to + * be dispatched to each segment and executed on each segment. Thus + * the targetlist cannot contain volatile functions. + */ + if (contain_volatile_functions((Node *) (subpath->pathtarget->exprs))) + elog(ERROR, "could not devise a plan."); + } + else + elog(ERROR, "unrecognized policy type %u", policyType); + return subpath; +} + /* * turn_volatile_seggen_to_singleqe * @@ -2836,6 +2914,35 @@ turn_volatile_seggen_to_singleqe(PlannerInfo *root, Path *path, Node *node) return path; } +static SplitMergePath * +make_split_merge_path(PlannerInfo *root, Path *subpath, List *resultRelations, List *mergeActionLists) +{ + PathTarget *splitMergePathTarget; + SplitMergePath *splitmergepath; + + splitMergePathTarget = copy_pathtarget(subpath->pathtarget); + + /* populate information generated above into splitmerge node */ + splitmergepath = makeNode(SplitMergePath); + splitmergepath->path.pathtype = T_SplitMerge; + splitmergepath->path.parent = subpath->parent; + splitmergepath->path.pathtarget = splitMergePathTarget; + splitmergepath->path.param_info = NULL; + splitmergepath->path.parallel_aware = false; + splitmergepath->path.parallel_safe = subpath->parallel_safe; + splitmergepath->path.parallel_workers = subpath->parallel_workers; + splitmergepath->path.rows = 2 * subpath->rows; + splitmergepath->path.startup_cost = subpath->startup_cost; + splitmergepath->path.total_cost = subpath->total_cost; + splitmergepath->path.pathkeys = subpath->pathkeys; + splitmergepath->path.locus = subpath->locus; + splitmergepath->subpath = subpath; + splitmergepath->resultRelations = resultRelations; + splitmergepath->mergeActionLists = 
mergeActionLists; + + return splitmergepath; +} + static SplitUpdatePath * make_splitupdate_path(PlannerInfo *root, Path *subpath, Index rti) { diff --git a/src/backend/cdb/cdbplan.c b/src/backend/cdb/cdbplan.c index c1d4fcada79..4c926539fc5 100644 --- a/src/backend/cdb/cdbplan.c +++ b/src/backend/cdb/cdbplan.c @@ -970,6 +970,17 @@ plan_tree_mutator(Node *node, } break; + case T_SplitMerge: + { + SplitMerge *splitMerge = (SplitMerge *) node; + SplitMerge *newSplitMerge; + + FLATCOPY(newSplitMerge, splitMerge, SplitMerge); + PLANMUTATE(newSplitMerge, splitMerge); + return (Node *) newSplitMerge; + } + break; + case T_IncrementalSort: { IncrementalSort *incrementalSort = (IncrementalSort *) node; diff --git a/src/backend/cdb/cdbtargeteddispatch.c b/src/backend/cdb/cdbtargeteddispatch.c index b297164e273..e98810b9625 100644 --- a/src/backend/cdb/cdbtargeteddispatch.c +++ b/src/backend/cdb/cdbtargeteddispatch.c @@ -532,6 +532,7 @@ DirectDispatchUpdateContentIdsFromPlan(PlannerInfo *root, Plan *plan) * so disable */ break; case T_SplitUpdate: + case T_SplitMerge: break; case T_CustomScan: break; diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 33259c839af..ea04887fb8c 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1961,7 +1961,10 @@ ExplainNode(PlanState *planstate, List *ancestors, } break; case T_SplitUpdate: - pname = sname = "Split"; + pname = sname = "Split Update"; + break; + case T_SplitMerge: + pname = sname = "Split Merge"; break; case T_AssertOp: pname = sname = "Assert"; diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index 6246a1395a0..99eb196974e 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -153,6 +153,9 @@ PrepareQuery(ParseState *pstate, PrepareStmt *stmt, case CMD_DELETE: srctag = T_DeleteStmt; break; + case CMD_MERGE: + srctag = T_MergeStmt; + break; default: ereport(ERROR, 
(errcode(ERRCODE_INVALID_PSTATEMENT_DEFINITION), diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index 168a246c81b..1c0ff9088ba 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -88,6 +88,7 @@ OBJS += nodeMotion.o \ nodeSequence.o \ nodeAssertOp.o \ nodeSplitUpdate.o \ + nodeSplitMerge.o \ nodeTupleSplit.o \ nodePartitionSelector.o diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 3604ac6ed28..2ef23a8e09f 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1077,7 +1077,7 @@ standard_ExecutorRun(QueryDesc *queryDesc, */ if (IS_QD_OR_SINGLENODE() && (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE || - queryDesc->plannedstmt->hasModifyingCTE) && + operation == CMD_MERGE || queryDesc->plannedstmt->hasModifyingCTE) && ((es_processed > 0 || estate->es_processed > 0) || !queryDesc->plannedstmt->canSetTag)) { MaintainMaterializedViewStatus(queryDesc, operation); diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 1cd50ffca21..3f98f99f267 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -137,6 +137,7 @@ #include "executor/nodeSequence.h" #include "executor/nodeShareInputScan.h" #include "executor/nodeSplitUpdate.h" +#include "executor/nodeSplitMerge.h" #include "executor/nodeTableFunction.h" #include "pg_trace.h" #include "tcop/tcopprot.h" @@ -512,6 +513,10 @@ ExecInitNode(Plan *node, EState *estate, int eflags) result = (PlanState *) ExecInitSplitUpdate((SplitUpdate *) node, estate, eflags); break; + case T_SplitMerge: + result = (PlanState *) ExecInitSplitMerge((SplitMerge *) node, + estate, eflags); + break; case T_AssertOp: result = (PlanState *) ExecInitAssertOp((AssertOp *) node, estate, eflags); @@ -1055,6 +1060,9 @@ ExecEndNode(PlanState *node) case T_SplitUpdateState: ExecEndSplitUpdate((SplitUpdateState *) node); break; + 
case T_SplitMergeState: + ExecEndSplitMerge((SplitMergeState *) node); + break; case T_AssertOpState: ExecEndAssertOp((AssertOpState *) node); break; diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 7e159c9a824..1c1f2442e66 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -1511,6 +1511,7 @@ ExecDeleteEpilogue(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ModifyTableState *mtstate = context->mtstate; EState *estate = context->estate; TransitionCaptureState *ar_delete_trig_tcs; + Relation resultRelationDesc = resultRelInfo->ri_RelationDesc; /* * If this delete is the result of a partition key update that moved the @@ -1539,6 +1540,14 @@ ExecDeleteEpilogue(ModifyTableContext *context, ResultRelInfo *resultRelInfo, if (!RelationIsNonblockRelation(resultRelInfo->ri_RelationDesc) && !splitUpdate) ExecARDeleteTriggers(estate, resultRelInfo, tupleid, oldtuple, ar_delete_trig_tcs, changingPart); + + if (resultRelationDesc->rd_rel->relispartition) + { + + context->mtstate->mt_leaf_relids_deleted = + bms_add_member(context->mtstate->mt_leaf_relids_deleted, RelationGetRelid(resultRelationDesc)); + context->mtstate->has_leaf_changed = true; + } } /* ---------------------------------------------------------------- @@ -1872,14 +1881,6 @@ ExecDelete(ModifyTableContext *context, if (canSetTag) (estate->es_processed)++; - if (resultRelationDesc->rd_rel->relispartition) - { - - context->mtstate->mt_leaf_relids_deleted = - bms_add_member(context->mtstate->mt_leaf_relids_deleted, RelationGetRelid(resultRelationDesc)); - context->mtstate->has_leaf_changed = true; - } - /* Tell caller that the delete actually happened. 
*/ if (tupleDeleted) *tupleDeleted = true; @@ -2354,6 +2355,7 @@ ExecUpdateEpilogue(ModifyTableContext *context, UpdateContext *updateCxt, { ModifyTableState *mtstate = context->mtstate; List *recheckIndexes = NIL; + Relation resultRelationDesc = resultRelInfo->ri_RelationDesc; /* insert index entries for tuple if necessary */ if (resultRelInfo->ri_NumIndices > 0 && (updateCxt->updateIndexes != TU_None)) @@ -2388,6 +2390,13 @@ ExecUpdateEpilogue(ModifyTableContext *context, UpdateContext *updateCxt, if (resultRelInfo->ri_WithCheckOptions != NIL) ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, context->estate); + + if (resultRelationDesc->rd_rel->relispartition) + { + context->mtstate->mt_leaf_relids_updated = + bms_add_member(context->mtstate->mt_leaf_relids_updated, RelationGetRelid(resultRelationDesc)); + context->mtstate->has_leaf_changed = true; + } } /* @@ -2726,13 +2735,6 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo, if (canSetTag) (estate->es_processed)++; - if (resultRelationDesc->rd_rel->relispartition) - { - context->mtstate->mt_leaf_relids_updated = - bms_add_member(context->mtstate->mt_leaf_relids_updated, RelationGetRelid(resultRelationDesc)); - context->mtstate->has_leaf_changed = true; - } - ExecUpdateEpilogue(context, &updateCxt, resultRelInfo, tupleid, oldtuple, slot); diff --git a/src/backend/executor/nodeSplitMerge.c b/src/backend/executor/nodeSplitMerge.c new file mode 100644 index 00000000000..2b0bc158b55 --- /dev/null +++ b/src/backend/executor/nodeSplitMerge.c @@ -0,0 +1,499 @@ +/*------------------------------------------------------------------------- + * + * nodeSplitMerge.c + * Implementation of nodeSplitMerge. + * + * Portions Copyright (c) 2012, EMC Corp. + * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates. 
+ * + * + * IDENTIFICATION + * src/backend/executor/nodeSplitMerge.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "fmgr.h" +#include "miscadmin.h" + +#include "access/tableam.h" +#include "cdb/cdbhash.h" +#include "cdb/cdbutil.h" +#include "commands/tablecmds.h" +#include "executor/instrument.h" +#include "executor/nodeSplitMerge.h" + +#include "utils/memutils.h" + + +typedef struct MTTargetRelLookup +{ + Oid relationOid; /* hash key, must be first */ + int relationIndex; /* rel's index in resultRelInfo[] array */ +} MTTargetRelLookup; + + +/* + * Evaluate the hash keys, and compute the target segment ID for the new row. + */ +static uint32 +evalHashKey(SplitMergeState *node, Datum *values, bool *isnulls) +{ + SplitMerge *plannode = (SplitMerge *) node->ps.plan; + ExprContext *econtext = node->ps.ps_ExprContext; + MemoryContext oldContext; + unsigned int target_seg; + CdbHash *h = node->cdbhash; + + ResetExprContext(econtext); + + oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); + + cdbhashinit(h); + + for (int i = 0; i < plannode->numHashAttrs; i++) + { + AttrNumber keyattno = plannode->hashAttnos[i]; + + /* + * Compute the hash function + */ + cdbhash(h, i + 1, values[keyattno - 1], isnulls[keyattno - 1]); + } + target_seg = cdbhashreduce(h); + + MemoryContextSwitchTo(oldContext); + + return target_seg; +} + + + +static TupleTableSlot * +MergeTupleTableSlot(TupleTableSlot *slot, SplitMerge *plannode, SplitMergeState *node, ResultRelInfo *resultRelInfo) +{ + ExprContext *econtext = node->ps.ps_ExprContext; + + List *actionStates = NIL; + ListCell *l; + TupleTableSlot *newslot = NULL; + + /* + * For INSERT actions, the root relation's merge action is OK since the + * INSERT's targetlist and the WHEN conditions can only refer to the + * source relation and hence it does not matter which result relation we + * work with. 
+ * + * XXX does this mean that we can avoid creating copies of actionStates on + * partitioned tables, for not-matched actions? + */ + actionStates = resultRelInfo->ri_notMatchedMergeAction; + + /* + * Make source tuple available to ExecQual and ExecProject. We don't need + * the target tuple, since the WHEN quals and targetlist can't refer to + * the target columns. + */ + econtext->ecxt_scantuple = NULL; + econtext->ecxt_innertuple = slot; + econtext->ecxt_outertuple = NULL; + + foreach(l, actionStates) + { + MergeActionState *action = (MergeActionState *) lfirst(l); + CmdType commandType = action->mas_action->commandType; + + /* + * Test condition, if any. + * + * In the absence of any condition, we perform the action + * unconditionally (no need to check separately since ExecQual() will + * return true if there are no conditions to evaluate). + */ + if (!ExecQual(action->mas_whenqual, econtext)) + continue; + + /* Perform stated action */ + switch (commandType) + { + case CMD_INSERT: + + /* + * Project the tuple. In case of a partitioned table, the + * projection was already built to use the root's descriptor, + * so we don't need to map the tuple here. + */ + newslot = ExecProject(action->mas_proj); + + break; + case CMD_NOTHING: + /* Do nothing */ + break; + default: + elog(ERROR, "unknown action in MERGE WHEN NOT MATCHED clause"); + } + + /* + * We've activated one of the WHEN clauses, so we don't search + * further. This is required behaviour, not an optimization. + */ + break; + } + + if (newslot) + { + /* Compute segment ID for the new row */ + int32 target_seg; + + target_seg = evalHashKey(node, newslot->tts_values, newslot->tts_isnull); + + slot->tts_values[node->segid_attno - 1] = Int32GetDatum(target_seg); + slot->tts_isnull[node->segid_attno - 1] = false; + } + else + { + /* + * No newslot generated means that insert action will not be triggered. + * So we just redistributed tuple to any segment, like segment 0. 
+ */ + slot->tts_values[node->segid_attno - 1] = Int32GetDatum(0); + slot->tts_isnull[node->segid_attno - 1] = false; + } + + return slot; +} + +/* + * ExecLookupResultRelByOid + * If the table with given OID is among the result relations to be + * updated by the given ModifyTable node, return its ResultRelInfo. + * + * If not found, return NULL if missing_ok, else raise error. + * + * If update_cache is true, then upon successful lookup, update the node's + * one-element cache. ONLY ExecModifyTable may pass true for this. + */ +static ResultRelInfo * +MergeExecLookupResultRelByOid(SplitMergeState *node, Oid resultoid, + bool missing_ok, bool update_cache) +{ + if (node->mt_resultOidHash) + { + /* Use the pre-built hash table to locate the rel */ + MTTargetRelLookup *mtlookup; + + mtlookup = (MTTargetRelLookup *) + hash_search(node->mt_resultOidHash, &resultoid, HASH_FIND, NULL); + if (mtlookup) + { + if (update_cache) + { + node->mt_lastResultOid = resultoid; + node->mt_lastResultIndex = mtlookup->relationIndex; + } + return node->resultRelInfo + mtlookup->relationIndex; + } + } + else + { + /* With few target rels, just search the ResultRelInfo array */ + for (int ndx = 0; ndx < node->nrel; ndx++) + { + ResultRelInfo *rInfo = node->resultRelInfo + ndx; + + if (RelationGetRelid(rInfo->ri_RelationDesc) == resultoid) + { + if (update_cache) + { + node->mt_lastResultOid = resultoid; + node->mt_lastResultIndex = ndx; + } + return rInfo; + } + } + } + + if (!missing_ok) + elog(ERROR, "incorrect result relation OID %u", resultoid); + return NULL; +} + +/** + * Splits every TupleTableSlot into two TupleTableSlots: DELETE and INSERT. 
+ */ +static TupleTableSlot * +ExecSplitMerge(PlanState *pstate) +{ + SplitMergeState *node = castNode(SplitMergeState, pstate); + PlanState *outerNode = outerPlanState(node); + SplitMerge *plannode = (SplitMerge *) node->ps.plan; + ResultRelInfo *resultRelInfo = node->resultRelInfo + node->mt_lastResultIndex; + Datum datum; + bool isNull; + Oid resultoid; + + + TupleTableSlot *slot = NULL; + TupleTableSlot *result = NULL; + + Assert(outerNode != NULL); + + slot = ExecProcNode(outerNode); + + if (TupIsNull(slot)) + { + return NULL; + } + + datum = ExecGetJunkAttribute(slot, resultRelInfo->ri_RowIdAttNo, &isNull); + + /* a NULL ctid means the row is not matched, so check the insert action */ + if (isNull) + result = MergeTupleTableSlot(slot, plannode, node, resultRelInfo); + else + { + /* for a partitioned table we must switch resultRelInfo */ + if (AttributeNumberIsValid(node->mt_resultOidAttno)) + { + datum = ExecGetJunkAttribute(slot, node->mt_resultOidAttno, &isNull); + Assert(!isNull); + resultoid = DatumGetObjectId(datum); + if (resultoid != node->mt_lastResultOid) + resultRelInfo = MergeExecLookupResultRelByOid(node, resultoid, + false, true); + } + result = slot; + } + + return result; +} + + + + +/* + * Initializes the tuple slots in a ResultRelInfo for any MERGE action. + * + * We mark 'projectNewInfoValid' even though the projections themselves + * are not initialized here. + */ +static void +ExecInitMergeTupleSlots(SplitMergeState *mtstate, + ResultRelInfo *resultRelInfo) +{ + EState *estate = mtstate->ps.state; + + Assert(!resultRelInfo->ri_projectNewInfoValid); + + resultRelInfo->ri_oldTupleSlot = + table_slot_create(resultRelInfo->ri_RelationDesc, + &estate->es_tupleTable); + resultRelInfo->ri_newTupleSlot = + table_slot_create(resultRelInfo->ri_RelationDesc, + &estate->es_tupleTable); + resultRelInfo->ri_projectNewInfoValid = true; +} +/* + * Init SplitMerge Node. A memory context is created to hold Split Tuples. 
+ * */ +SplitMergeState* +ExecInitSplitMerge(SplitMerge *node, EState *estate, int eflags) +{ + SplitMergeState *splitmergestate; + ResultRelInfo *resultRelInfo; + ExprContext *econtext; + ListCell *lc; + int i; + + + /* Check for unsupported flags */ + Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK | EXEC_FLAG_REWIND))); + + splitmergestate = makeNode(SplitMergeState); + splitmergestate->ps.plan = (Plan *)node; + splitmergestate->ps.state = estate; + splitmergestate->ps.ExecProcNode = ExecSplitMerge; + + /* + * then initialize outer plan + */ + Plan *outerPlan = outerPlan(node); + outerPlanState(splitmergestate) = ExecInitNode(outerPlan, estate, eflags); + + + ExecAssignExprContext(estate, &splitmergestate->ps); + econtext = splitmergestate->ps.ps_ExprContext; + + splitmergestate->nrel = list_length(node->resultRelations); + splitmergestate->resultRelInfo = (ResultRelInfo *)palloc(splitmergestate->nrel * sizeof(ResultRelInfo)); + + resultRelInfo = splitmergestate->resultRelInfo; + i = 0; + foreach(lc, node->resultRelations) + { + Index resultRelation = lfirst_int(lc); + + ExecInitResultRelation(estate, resultRelInfo, resultRelation); + + resultRelInfo->ri_RowIdAttNo = ExecFindJunkAttributeInTlist(outerPlan->targetlist, "ctid"); + if (!AttributeNumberIsValid(resultRelInfo->ri_RowIdAttNo)) + elog(ERROR, "could not find junk ctid column"); + + resultRelInfo++; + i++; + } + + splitmergestate->mt_lastResultIndex = 0; + splitmergestate->mt_lastResultOid = InvalidOid; + + + i = 0; + foreach(lc, node->mergeActionLists) + { + List *mergeActionList = lfirst(lc); + TupleDesc relationDesc; + ListCell *l; + + resultRelInfo = splitmergestate->resultRelInfo + i; + i++; + relationDesc = RelationGetDescr(resultRelInfo->ri_RelationDesc); + + /* initialize slots for MERGE fetches from this rel */ + if (unlikely(!resultRelInfo->ri_projectNewInfoValid)) + ExecInitMergeTupleSlots(splitmergestate, resultRelInfo); + + foreach(l, mergeActionList) + { + MergeAction *action = 
(MergeAction *) lfirst(l); + MergeActionState *action_state; + TupleTableSlot *tgtslot; + TupleDesc tgtdesc; + List **list; + + /* + * Build action merge state for this rel. (For partitions, + * equivalent code exists in ExecInitPartitionInfo.) + */ + action_state = makeNode(MergeActionState); + action_state->mas_action = action; + action_state->mas_whenqual = ExecInitQual((List *) action->qual, + &splitmergestate->ps); + + /* + * We create two lists - one for WHEN MATCHED actions and one for + * WHEN NOT MATCHED actions - and stick the MergeActionState into + * the appropriate list. + */ + if (action_state->mas_action->matched) + list = &resultRelInfo->ri_matchedMergeAction; + else + list = &resultRelInfo->ri_notMatchedMergeAction; + *list = lappend(*list, action_state); + + switch (action->commandType) + { + case CMD_INSERT: + + /* + * If the MERGE targets a partitioned table, any INSERT + * actions must be routed through it, not the child + * relations. Initialize the routing struct and the root + * table's "new" tuple slot for that, if not already done. + * The projection we prepare, for all relations, uses the + * root relation descriptor, and targets the plan's root + * slot. (This is consistent with the fact that we + * checked the plan output to match the root relation, + * above.) + */ + /* not partitioned? use the stock relation and slot */ + tgtslot = resultRelInfo->ri_newTupleSlot; + tgtdesc = RelationGetDescr(resultRelInfo->ri_RelationDesc); + + action_state->mas_proj = + ExecBuildProjectionInfo(action->targetList, econtext, + tgtslot, + &splitmergestate->ps, + tgtdesc); + break; + case CMD_UPDATE: + case CMD_DELETE: + case CMD_NOTHING: + break; + default: + elog(ERROR, "unknown action in MERGE WHEN clause"); + break; + } + } + } + + /* + * Look up the positions of the gp_segment_id in the subplan's target + * list, and in the result. 
+ */ + splitmergestate->segid_attno = + ExecFindJunkAttributeInTlist(outerPlan->targetlist, "gp_segment_id"); + + splitmergestate->mt_resultOidAttno = + ExecFindJunkAttributeInTlist(outerPlan->targetlist, "tableoid"); + + Assert(AttributeNumberIsValid(splitmergestate->mt_resultOidAttno) || splitmergestate->nrel == 1); + + /* + * DML nodes do not project. + */ + ExecInitResultTupleSlotTL(&splitmergestate->ps, &TTSOpsVirtual); + splitmergestate->ps.ps_ProjInfo = NULL; + + /* + * Initialize for computing hash key + */ + if (node->numHashAttrs > 0) + { + splitmergestate->cdbhash = makeCdbHash(node->numHashSegments, + node->numHashAttrs, + node->hashFuncs); + } + + if (estate->es_instrument && (estate->es_instrument & INSTRUMENT_CDB)) + { + splitmergestate->ps.cdbexplainbuf = makeStringInfo(); + } + + return splitmergestate; +} + +/* Release Resources Requested by SplitMerge node. */ +void +ExecEndSplitMerge(SplitMergeState *node) +{ + + for (int i = 0; i < node->nrel; i++) + { + ResultRelInfo *resultRelInfo = node->resultRelInfo + i; + /* + * Cleanup the initialized batch slots. This only matters for FDWs + * with batching, but the other cases will have ri_NumSlotsInitialized + * == 0. 
+ */ + for (int j = 0; j < resultRelInfo->ri_NumSlotsInitialized; j++) + { + ExecDropSingleTupleTableSlot(resultRelInfo->ri_Slots[j]); + ExecDropSingleTupleTableSlot(resultRelInfo->ri_PlanSlots[j]); + } + } + + /* + * Free the exprcontext + */ + ExecFreeExprContext(&node->ps); + + + /* + * clean out the tuple table + */ + if (node->ps.ps_ResultTupleSlot) + ExecClearTuple(node->ps.ps_ResultTupleSlot); + ExecEndNode(outerPlanState(node)); +} + diff --git a/src/backend/nodes/copyfuncs.funcs.c b/src/backend/nodes/copyfuncs.funcs.c index 4a19894adfe..ce658f7eb29 100644 --- a/src/backend/nodes/copyfuncs.funcs.c +++ b/src/backend/nodes/copyfuncs.funcs.c @@ -6445,6 +6445,40 @@ _copySplitUpdate(const SplitUpdate *from) return newnode; } +static SplitMerge * +_copySplitMerge(const SplitMerge *from) +{ + SplitMerge *newnode = makeNode(SplitMerge); + + COPY_SCALAR_FIELD(plan.startup_cost); + COPY_SCALAR_FIELD(plan.total_cost); + COPY_SCALAR_FIELD(plan.plan_rows); + COPY_SCALAR_FIELD(plan.plan_width); + COPY_SCALAR_FIELD(plan.parallel_aware); + COPY_SCALAR_FIELD(plan.parallel_safe); + COPY_SCALAR_FIELD(plan.async_capable); + COPY_SCALAR_FIELD(plan.plan_node_id); + COPY_NODE_FIELD(plan.targetlist); + COPY_NODE_FIELD(plan.qual); + COPY_NODE_FIELD(plan.lefttree); + COPY_NODE_FIELD(plan.righttree); + COPY_NODE_FIELD(plan.initPlan); + COPY_BITMAPSET_FIELD(plan.extParam); + COPY_BITMAPSET_FIELD(plan.allParam); + COPY_NODE_FIELD(plan.flow); + COPY_SCALAR_FIELD(plan.locustype); + COPY_SCALAR_FIELD(plan.parallel); + COPY_SCALAR_FIELD(plan.operatorMemKB); + COPY_SCALAR_FIELD(numHashAttrs); + COPY_POINTER_FIELD(hashAttnos, from->numHashAttrs * sizeof(AttrNumber)); + COPY_POINTER_FIELD(hashFuncs, from->numHashAttrs * sizeof(Oid)); + COPY_SCALAR_FIELD(numHashSegments); + COPY_NODE_FIELD(resultRelations); + COPY_NODE_FIELD(mergeActionLists); + + return newnode; +} + static AssertOp * _copyAssertOp(const AssertOp *from) { diff --git a/src/backend/nodes/copyfuncs.switch.c 
b/src/backend/nodes/copyfuncs.switch.c index b6b365432dc..1006a41b9c0 100644 --- a/src/backend/nodes/copyfuncs.switch.c +++ b/src/backend/nodes/copyfuncs.switch.c @@ -1122,6 +1122,9 @@ case T_SplitUpdate: retval = _copySplitUpdate(from); break; + case T_SplitMerge: + retval = _copySplitMerge(from); + break; case T_AssertOp: retval = _copyAssertOp(from); break; diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl old mode 100644 new mode 100755 diff --git a/src/backend/nodes/outfast.c b/src/backend/nodes/outfast.c index 11bf27c08c6..02a6416264a 100644 --- a/src/backend/nodes/outfast.c +++ b/src/backend/nodes/outfast.c @@ -1115,6 +1115,9 @@ _outNode(StringInfo str, void *obj) case T_SplitUpdate: _outSplitUpdate(str, obj); break; + case T_SplitMerge: + _outSplitMerge(str, obj); + break; case T_AssertOp: _outAssertOp(str, obj); break; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 2e8e4d7e2b8..dce78b62d22 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -2925,6 +2925,7 @@ _outPlaceHolderVar(StringInfo str, const PlaceHolderVar *node) WRITE_NODE_FIELD(phexpr); WRITE_BITMAPSET_FIELD(phrels); + WRITE_BITMAPSET_FIELD(phnullingrels); WRITE_UINT_FIELD(phid); WRITE_UINT_FIELD(phlevelsup); } @@ -4648,6 +4649,9 @@ outNode(StringInfo str, const void *obj) case T_SplitUpdate: _outSplitUpdate(str, obj); break; + case T_SplitMerge: + _outSplitMerge(str, obj); + break; case T_AssertOp: _outAssertOp(str, obj); break; diff --git a/src/backend/nodes/outfuncs_common.c b/src/backend/nodes/outfuncs_common.c index 4616bfbc74c..818f21912c7 100644 --- a/src/backend/nodes/outfuncs_common.c +++ b/src/backend/nodes/outfuncs_common.c @@ -448,6 +448,24 @@ _outSplitUpdate(StringInfo str, const SplitUpdate *node) _outPlanInfo(str, (Plan *) node); } +/* + * _outSplitMerge + */ +static void +_outSplitMerge(StringInfo str, const SplitMerge *node) +{ + WRITE_NODE_TYPE("SplitMerge"); + + 
WRITE_INT_FIELD(numHashSegments); + WRITE_INT_FIELD(numHashAttrs); + WRITE_ATTRNUMBER_ARRAY(hashAttnos, node->numHashAttrs); + WRITE_OID_ARRAY(hashFuncs, node->numHashAttrs); + WRITE_NODE_FIELD(resultRelations); + WRITE_NODE_FIELD(mergeActionLists); + + _outPlanInfo(str, (Plan *) node); +} + /* * _outAssertOp */ diff --git a/src/backend/nodes/print.c b/src/backend/nodes/print.c index a0a5421e2d5..eebc325a378 100644 --- a/src/backend/nodes/print.c +++ b/src/backend/nodes/print.c @@ -582,6 +582,8 @@ plannode_type(Plan *p) return "FOREIGNSCAN"; case T_SplitUpdate: return "SPLITUPDATE"; + case T_SplitMerge: + return "SPLITMERGE"; case T_Gather: return "GATHER"; case T_GatherMerge: diff --git a/src/backend/nodes/readfast.c b/src/backend/nodes/readfast.c index 36213f1862d..e12d90ac7df 100644 --- a/src/backend/nodes/readfast.c +++ b/src/backend/nodes/readfast.c @@ -1151,6 +1151,42 @@ _readSplitUpdate(void) READ_DONE(); } +/* + * _readSplitMerge + */ +static SplitMerge * +_readSplitMerge(void) +{ + READ_LOCALS(SplitMerge); + + READ_INT_FIELD(numHashSegments); + READ_INT_FIELD(numHashAttrs); + READ_ATTRNUMBER_ARRAY(hashAttnos, local_node->numHashAttrs); + READ_OID_ARRAY(hashFuncs, local_node->numHashAttrs); + + READ_NODE_FIELD(resultRelations); + READ_NODE_FIELD(mergeActionLists); + + ReadCommonPlan(&local_node->plan); + + READ_DONE(); +} + + +static PlaceHolderVar * +_readPlaceHolderVar(void) +{ + READ_LOCALS(PlaceHolderVar); + + READ_NODE_FIELD(phexpr); + READ_BITMAPSET_FIELD(phrels); + READ_BITMAPSET_FIELD(phnullingrels); + READ_UINT_FIELD(phid); + READ_UINT_FIELD(phlevelsup); + + READ_DONE(); +} + /* * _readAssertOp */ @@ -2142,6 +2178,9 @@ readNodeBinary(void) case T_SplitUpdate: return_value = _readSplitUpdate(); break; + case T_SplitMerge: + return_value = _readSplitMerge(); + break; case T_AssertOp: return_value = _readAssertOp(); break; @@ -2991,6 +3030,9 @@ readNodeBinary(void) case T_JsonFormat: return_value = _readJsonFormat(); break; + case T_PlaceHolderVar: + 
return_value = _readPlaceHolderVar(); + break; default: return_value = NULL; /* keep the compiler silent */ elog(ERROR, "could not deserialize unrecognized node type: %d", diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index af918fb9640..d80e699750b 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -131,6 +131,7 @@ static Plan *create_unique_plan(PlannerInfo *root, UniquePath *best_path, int flags); static Plan *create_motion_plan(PlannerInfo *root, CdbMotionPath *path); static Plan *create_splitupdate_plan(PlannerInfo *root, SplitUpdatePath *path); +static Plan *create_splitmerge_plan(PlannerInfo *root, SplitMergePath *path); static Gather *create_gather_plan(PlannerInfo *root, GatherPath *best_path); static Plan *create_projection_plan(PlannerInfo *root, ProjectionPath *best_path, @@ -617,6 +618,9 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags) case T_SplitUpdate: plan = create_splitupdate_plan(root, (SplitUpdatePath *) best_path); break; + case T_SplitMerge: + plan = create_splitmerge_plan(root, (SplitMergePath *) best_path); + break; default: elog(ERROR, "unrecognized node type: %d", (int) best_path->pathtype); @@ -3649,6 +3653,104 @@ create_splitupdate_plan(PlannerInfo *root, SplitUpdatePath *path) return (Plan *) splitupdate; } +/* + * create_splitmerge_plan + */ +static Plan * +create_splitmerge_plan(PlannerInfo *root, SplitMergePath *path) +{ + Path *subpath = path->subpath; + Plan *subplan; + SplitMerge *splitmerge; + Relation resultRel; + TupleDesc resultDesc; + GpPolicy *cdbpolicy; + ListCell *lc; + int lastresno; + Oid *hashFuncs; + int i; + + // + RelOptInfo *relOptInfo = root->simple_rel_array[linitial_int(path->resultRelations)]; + Assert(relOptInfo); + while (relOptInfo->reloptkind == RELOPT_OTHER_MEMBER_REL) + { + Assert(relOptInfo->top_parent_relids); + + i = bms_next_member(relOptInfo->top_parent_relids, -1); + + Assert(i >= 0 
&& i < root->simple_rel_array_size); + Assert(root->simple_rel_array[i] != NULL); + + relOptInfo = root->simple_rel_array[i]; + } + Assert(relOptInfo->reloptkind == RELOPT_BASEREL); + resultRel = relation_open(planner_rt_fetch(relOptInfo->relid, root)->relid, NoLock); + resultDesc = RelationGetDescr(resultRel); + cdbpolicy = resultRel->rd_cdbpolicy; + + subplan = create_plan_recurse(root, subpath, CP_EXACT_TLIST); + + /* Transfer resname/resjunk labeling, too, to keep executor happy */ + apply_tlist_labeling(subplan->targetlist, root->processed_tlist); + + splitmerge = makeNode(SplitMerge); + + splitmerge->plan.targetlist = NIL; /* filled in below */ + splitmerge->plan.qual = NIL; + splitmerge->plan.lefttree = subplan; + splitmerge->plan.righttree = NULL; + + copy_generic_path_info(&splitmerge->plan, (Path *) path); + + lc = list_head(subplan->targetlist); + lastresno = 0; + + /* Copy all attributes. */ + for (; lc != NULL; lc = lnext(subplan->targetlist, lc)) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + TargetEntry *newtle; + + newtle = makeTargetEntry(tle->expr, + ++lastresno, + tle->resname, + tle->resjunk); + splitmerge->plan.targetlist = lappend(splitmerge->plan.targetlist, newtle); + } + + /* Look up the right hash functions for the hash expressions */ + hashFuncs = palloc(cdbpolicy->nattrs * sizeof(Oid)); + for (i = 0; i < cdbpolicy->nattrs; i++) + { + AttrNumber attnum = cdbpolicy->attrs[i]; + Oid typeoid = resultDesc->attrs[attnum - 1].atttypid; + Oid opfamily; + + opfamily = get_opclass_family(cdbpolicy->opclasses[i]); + + hashFuncs[i] = cdb_hashproc_in_opfamily(opfamily, typeoid); + } + splitmerge->numHashAttrs = cdbpolicy->nattrs; + splitmerge->hashAttnos = palloc(cdbpolicy->nattrs * sizeof(AttrNumber)); + memcpy(splitmerge->hashAttnos, cdbpolicy->attrs, cdbpolicy->nattrs * sizeof(AttrNumber)); + splitmerge->hashFuncs = hashFuncs; + splitmerge->numHashSegments = cdbpolicy->numsegments; + + relation_close(resultRel, NoLock); + + /* + * A 
SplitMerge also computes the target segment ID, based on other columns, + * so we treat it the same as a Motion node for this purpose. + */ + root->numMotions++; + + splitmerge->mergeActionLists = path->mergeActionLists; + splitmerge->resultRelations = path->resultRelations; + + return (Plan *) splitmerge; +} + /***************************************************************************** * diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 0bf59d7c756..268115f948c 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -1614,6 +1614,10 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) Assert(plan->qual == NIL); set_splitupdate_tlist_references(plan, rtoffset); break; + case T_SplitMerge: + /* mergeActionLists will be processed in T_ModifyTable */ + set_dummy_tlist_references(plan, rtoffset); + break; default: elog(ERROR, "unrecognized node type: %d", (int) nodeTag(plan)); diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index c7fbe634fae..52a8eba4103 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -3302,6 +3302,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, case T_Unique: case T_SetOp: case T_SplitUpdate: + case T_SplitMerge: case T_TupleSplit: /* no node-type-specific fields need fixing */ break; diff --git a/src/backend/optimizer/prep/preptlist.c b/src/backend/optimizer/prep/preptlist.c index d352e665831..ca2dd95f284 100644 --- a/src/backend/optimizer/prep/preptlist.c +++ b/src/backend/optimizer/prep/preptlist.c @@ -141,6 +141,20 @@ preprocess_targetlist(PlannerInfo *root) tlist = expand_insert_targetlist(root, tlist, target_relation, result_relation); } } + else if (command_type == CMD_MERGE) + { + /* updating a distribution column in MERGE is not supported yet */ + foreach(lc, parse->mergeActionList) + { + MergeAction *action = lfirst(lc); + if(action->commandType ==
CMD_UPDATE) + { + if(check_splitupdate(action->targetList, result_relation, target_relation)) + ereport(ERROR, (errcode(ERRCODE_GP_FEATURE_NOT_YET), + errmsg("cannot update column in merge with distributed column"))); + } + } + } /* * For non-inherited UPDATE/DELETE/MERGE, register any junk column(s) diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 7b0e25e87d5..e9803f0fd60 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -85,8 +85,8 @@ static bool set_append_path_locus(PlannerInfo *root, Path *pathnode, RelOptInfo List *pathkeys, int parallel_workers, bool parallel_aware); static CdbPathLocus adjust_modifytable_subpath(PlannerInfo *root, CmdType operation, - int resultRelationRTI, Path **pSubpath, - bool splitUpdate); + List *resultRelationRTI, Path **pSubpath, + bool splitUpdate, List *mergeActionLists); /***************************************************************************** * MISC. PATH UTILITIES @@ -5922,9 +5922,10 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, if (Gp_role == GP_ROLE_DISPATCH) pathnode->path.locus = adjust_modifytable_subpath(root, operation, - linitial_int(resultRelations), + resultRelations, &subpath, /* IN-OUT argument */ - splitUpdate); + splitUpdate, + mergeActionLists); else { /* don't allow split updates in utility mode. */ @@ -5999,8 +6000,8 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, */ static CdbPathLocus adjust_modifytable_subpath(PlannerInfo *root, CmdType operation, - int resultRelationRTI, Path **pSubpath, - bool splitUpdate) + List *resultRelations, Path **pSubpath, + bool splitUpdate, List *mergeActionLists) { /* * The input plans must be distributed correctly. 
@@ -6011,7 +6012,7 @@ adjust_modifytable_subpath(PlannerInfo *root, CmdType operation, { - int rti = resultRelationRTI; + int rti = linitial_int(resultRelations); Path *subpath = *pSubpath; RangeTblEntry *rte = rt_fetch(rti, root->parse->rtable); GpPolicy *targetPolicy; @@ -6056,6 +6057,10 @@ adjust_modifytable_subpath(PlannerInfo *root, CmdType operation, else subpath = create_motion_path_for_upddel(root, rti, targetPolicy, subpath); } + else if(operation == CMD_MERGE) + { + subpath = create_motion_path_for_merge(root, resultRelations, targetPolicy, mergeActionLists, subpath); + } *pSubpath = subpath; } diff --git a/src/backend/optimizer/util/walkers.c b/src/backend/optimizer/util/walkers.c index 93834cb32f7..aed5cf15916 100644 --- a/src/backend/optimizer/util/walkers.c +++ b/src/backend/optimizer/util/walkers.c @@ -529,6 +529,8 @@ plan_tree_walker(Node *node, return true; if (walker((Node *) ((ModifyTable *) node)->returningLists, context)) return true; + if (walker((Node *) ((ModifyTable *) node)->mergeActionLists, context)) + return true; break; @@ -538,6 +540,7 @@ plan_tree_walker(Node *node, break; case T_SplitUpdate: + case T_SplitMerge: case T_AssertOp: if (walk_plan_node_fields((Plan *) node, walker, context)) return true; diff --git a/src/include/cdb/cdbpath.h b/src/include/cdb/cdbpath.h index 1f4d64956cf..e564b902ed9 100644 --- a/src/include/cdb/cdbpath.h +++ b/src/include/cdb/cdbpath.h @@ -44,6 +44,7 @@ extern Path *create_motion_path_for_ctas(PlannerInfo *root, GpPolicy *policy, Pa extern Path *create_motion_path_for_insert(PlannerInfo *root, GpPolicy *targetPolicy, Path *subpath); extern Path *create_motion_path_for_upddel(PlannerInfo *root, Index rti, GpPolicy *targetPolicy, Path *subpath); extern Path *create_split_update_path(PlannerInfo *root, Index rti, GpPolicy *targetPolicy, Path *subpath); +extern Path *create_motion_path_for_merge(PlannerInfo *root, List* resultRelations, GpPolicy *policy, List *mergeActionLists, Path *subpath); extern 
CdbPathLocus cdbpath_motion_for_join(PlannerInfo *root, diff --git a/src/include/executor/nodeSplitMerge.h b/src/include/executor/nodeSplitMerge.h new file mode 100644 index 00000000000..c0dbbe86487 --- /dev/null +++ b/src/include/executor/nodeSplitMerge.h @@ -0,0 +1,23 @@ +/*------------------------------------------------------------------------- + * + * nodeSplitMerge.h + * Prototypes for nodeSplitMerge. + * + * Portions Copyright (c) 2012, EMC Corp. + * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates. + * + * + * IDENTIFICATION + * src/include/executor/nodeSplitMerge.h + * + *------------------------------------------------------------------------- + */ + +#ifndef NODESplitMerge_H +#define NODESplitMerge_H + +extern SplitMergeState* ExecInitSplitMerge(SplitMerge *node, EState *estate, int eflags); +extern void ExecEndSplitMerge(SplitMergeState *node); + +#endif /* NODESplitMerge_H */ + diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index d2df2b8f85a..af22510884f 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -3381,6 +3381,34 @@ typedef struct SplitUpdateState } SplitUpdateState; + +/* + * ExecNode for SplitMerge. + * Holds the PlanState plus the fields declared below: the attribute number + * of "gp_segment_id" in the target list, a CdbHash object, the result + * relation info, and the mt_* result-relation lookup fields (presumably + * mirroring ModifyTableState; confirm against nodeSplitMerge.c). + * No MemoryContext or TupleTableSlot is declared in this struct. + */ +typedef struct SplitMergeState +{ + PlanState ps; + + AttrNumber segid_attno; /* attribute number of "gp_segment_id" in target list */ + + struct CdbHash *cdbhash; /* hash api object */ + + ResultRelInfo *resultRelInfo; + + int mt_lastResultIndex; + int mt_lastResultOid; + HTAB *mt_resultOidHash; /* optional hash table to speed lookups */ + int nrel; + + AttrNumber mt_resultOidAttno; + +} SplitMergeState; + /* * ExecNode for AssertOp.
* This operator contains a Plannode that contains the expressions diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 160e4052c7a..a6aceb533ec 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -115,6 +115,7 @@ typedef enum NodeTag T_Motion, T_ShareInputScan, T_SplitUpdate, + T_SplitMerge, T_AssertOp, T_PartitionSelector, T_Plan_End, @@ -194,6 +195,7 @@ typedef enum NodeTag T_MotionState, T_ShareInputScanState, T_SplitUpdateState, + T_SplitMergeState, T_AssertOpState, T_PartitionSelectorState, @@ -373,6 +375,7 @@ typedef enum NodeTag T_CdbMotionPath = 580, T_PartitionSelectorPath, T_SplitUpdatePath, + T_SplitMergePath, T_CdbRelColumnInfo, T_DistributionKey, diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 7ffcbd681fe..fab66b708a1 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -2799,6 +2799,15 @@ typedef struct SplitUpdatePath Index resultRelation; } SplitUpdatePath; +typedef struct SplitMergePath +{ + Path path; + Path *subpath; + List *resultRelations; + List *mergeActionLists; /* per-target-table lists of actions for + * MERGE */ +} SplitMergePath; + /* * ModifyTablePath represents performing INSERT/UPDATE/DELETE/MERGE * diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index e09c3a0a301..0f7ffd45c93 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -1896,6 +1896,24 @@ typedef struct SplitUpdate int numHashSegments; /* # of segs to use in hash computation */ } SplitUpdate; + +/* + * SplitMerge Node + * + */ +typedef struct SplitMerge +{ + Plan plan; + List* resultRelations; + int numHashAttrs; + AttrNumber *hashAttnos pg_node_attr(array_size(numHashAttrs)); + Oid *hashFuncs pg_node_attr(array_size(numHashAttrs)); /* corresponding hash functions */ + int numHashSegments; /* # of segs to use in hash computation */ + + List *mergeActionLists; /* per-target-table lists of actions for + * MERGE */ +} 
SplitMerge; + /* * AssertOp Node * diff --git a/src/pl/plperl/ppport.h b/src/pl/plperl/ppport.h index 16d001a81da..762dd362b35 100644 --- a/src/pl/plperl/ppport.h +++ b/src/pl/plperl/ppport.h @@ -12160,11 +12160,7 @@ DPPP_(my_newCONSTSUB)(HV *stash, const char *name, SV *sv) STMT_START { \ ASSUME(!"UNREACHABLE"); __builtin_unreachable(); \ } STMT_END -<<<<<<< HEAD -# elif ! defined(__GNUC__) && (defined(__sun) || defined(__hpux)) -======= # elif ! defined(__GNUC__) && defined(__sun) ->>>>>>> REL_16_9 # define NOT_REACHED # else # define NOT_REACHED ASSUME(!"UNREACHABLE") diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 5d1be9f6b24..d6568e4b271 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -25,6 +25,7 @@ SELECT t.ctid is not null as matched, t.*, s.* FROM source s FULL OUTER JOIN tar ALTER TABLE target OWNER TO regress_merge_privs; ALTER TABLE source OWNER TO regress_merge_privs; +GRANT ALL ON SCHEMA public to regress_merge_privs; CREATE TABLE target2 (tid integer, balance integer) WITH (autovacuum_enabled=off); CREATE TABLE source2 (sid integer, delta integer) @@ -42,15 +43,12 @@ WHEN MATCHED THEN QUERY PLAN ---------------------------------------- Merge on target t - -> Merge Join - Merge Cond: (t.tid = s.sid) - -> Sort - Sort Key: t.tid - -> Seq Scan on target t - -> Sort - Sort Key: s.sid + -> Hash Join + Hash Cond: (t.tid = s.sid) + -> Seq Scan on target t + -> Hash -> Seq Scan on source s -(9 rows) +(7 rows) -- -- Errors @@ -298,7 +296,7 @@ WHEN MATCHED THEN -> Seq Scan on source s -> Hash -> Seq Scan on target t -(6 rows) +(7 rows) EXPLAIN (COSTS OFF) MERGE INTO target t @@ -314,7 +312,7 @@ WHEN MATCHED THEN -> Seq Scan on source s -> Hash -> Seq Scan on target t -(6 rows) +(7 rows) EXPLAIN (COSTS OFF) MERGE INTO target t @@ -325,12 +323,14 @@ WHEN NOT MATCHED THEN QUERY PLAN ---------------------------------------- Merge on target t - -> Hash Left Join - Hash Cond: 
(s.sid = t.tid) - -> Seq Scan on source s - -> Hash - -> Seq Scan on target t -(6 rows) + -> Explicit Redistribute Motion 3:3 (slice1; segments: 3) + -> Split Merge + -> Hash Left Join + Hash Cond: (s.sid = t.tid) + -> Seq Scan on source s + -> Hash + -> Seq Scan on target t +(9 rows) DELETE FROM target WHERE tid > 100; ANALYZE target; @@ -882,12 +882,12 @@ BEGIN END IF; END; $$; -CREATE TRIGGER merge_bsi BEFORE INSERT ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); -CREATE TRIGGER merge_bsu BEFORE UPDATE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); -CREATE TRIGGER merge_bsd BEFORE DELETE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); -CREATE TRIGGER merge_asi AFTER INSERT ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); -CREATE TRIGGER merge_asu AFTER UPDATE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); -CREATE TRIGGER merge_asd AFTER DELETE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +-- CREATE TRIGGER merge_bsi BEFORE INSERT ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +-- CREATE TRIGGER merge_bsu BEFORE UPDATE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +-- CREATE TRIGGER merge_bsd BEFORE DELETE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +-- CREATE TRIGGER merge_asi AFTER INSERT ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +-- CREATE TRIGGER merge_asu AFTER UPDATE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +-- CREATE TRIGGER merge_asd AFTER DELETE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); CREATE TRIGGER merge_bri BEFORE INSERT ON target FOR EACH ROW EXECUTE PROCEDURE merge_trigfunc (); CREATE TRIGGER merge_bru BEFORE UPDATE ON target FOR EACH ROW EXECUTE PROCEDURE merge_trigfunc (); CREATE TRIGGER merge_brd BEFORE DELETE ON target FOR EACH ROW EXECUTE PROCEDURE merge_trigfunc (); @@ -897,10 +897,8 @@ CREATE TRIGGER 
merge_ard AFTER DELETE ON target FOR EACH ROW EXECUTE PROCEDURE m -- now the classic UPSERT, with a DELETE BEGIN; UPDATE target SET balance = 0 WHERE tid = 3; -NOTICE: BEFORE UPDATE STATEMENT trigger NOTICE: BEFORE UPDATE ROW trigger row: (3,30) -> (3,0) NOTICE: AFTER UPDATE ROW trigger row: (3,30) -> (3,0) -NOTICE: AFTER UPDATE STATEMENT trigger --EXPLAIN (ANALYZE ON, COSTS OFF, SUMMARY OFF, TIMING OFF) MERGE INTO target t USING source AS s @@ -911,18 +909,12 @@ WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT VALUES (s.sid, s.delta); -NOTICE: BEFORE INSERT STATEMENT trigger -NOTICE: BEFORE UPDATE STATEMENT trigger -NOTICE: BEFORE DELETE STATEMENT trigger NOTICE: BEFORE DELETE ROW trigger row: (3,0) NOTICE: BEFORE UPDATE ROW trigger row: (2,20) -> (2,15) NOTICE: BEFORE INSERT ROW trigger row: (4,40) NOTICE: AFTER DELETE ROW trigger row: (3,0) NOTICE: AFTER UPDATE ROW trigger row: (2,20) -> (2,15) NOTICE: AFTER INSERT ROW trigger row: (4,40) -NOTICE: AFTER DELETE STATEMENT trigger -NOTICE: AFTER UPDATE STATEMENT trigger -NOTICE: AFTER INSERT STATEMENT trigger SELECT * FROM target ORDER BY tid; tid | balance -----+--------- @@ -970,15 +962,9 @@ GET DIAGNOSTICS result := ROW_COUNT; RAISE NOTICE 'ROW_COUNT = %', result; END; $$; -NOTICE: BEFORE INSERT STATEMENT trigger -NOTICE: BEFORE UPDATE STATEMENT trigger -NOTICE: BEFORE DELETE STATEMENT trigger NOTICE: BEFORE UPDATE ROW trigger row: (3,30) -> (3,50) NOTICE: BEFORE DELETE ROW trigger row: (2,20) NOTICE: BEFORE INSERT ROW trigger row: (4,40) -NOTICE: AFTER DELETE STATEMENT trigger -NOTICE: AFTER UPDATE STATEMENT trigger -NOTICE: AFTER INSERT STATEMENT trigger NOTICE: Not found NOTICE: ROW_COUNT = 0 SELECT * FROM target FULL OUTER JOIN source ON (sid = tid); @@ -1004,12 +990,10 @@ WHEN MATCHED AND t.balance > s.delta THEN UPDATE SET balance = t.balance - s.delta; END; $$; -NOTICE: BEFORE UPDATE STATEMENT trigger NOTICE: BEFORE UPDATE ROW trigger row: (3,30) -> (3,10) NOTICE: BEFORE UPDATE ROW trigger row: 
(2,20) -> (2,15) NOTICE: AFTER UPDATE ROW trigger row: (3,30) -> (3,10) NOTICE: AFTER UPDATE ROW trigger row: (2,20) -> (2,15) -NOTICE: AFTER UPDATE STATEMENT trigger ROLLBACK; --source constants BEGIN; @@ -1018,10 +1002,8 @@ USING (SELECT 9 AS sid, 57 AS delta) AS s ON t.tid = s.sid WHEN NOT MATCHED THEN INSERT (tid, balance) VALUES (s.sid, s.delta); -NOTICE: BEFORE INSERT STATEMENT trigger NOTICE: BEFORE INSERT ROW trigger row: (9,57) NOTICE: AFTER INSERT ROW trigger row: (9,57) -NOTICE: AFTER INSERT STATEMENT trigger SELECT * FROM target ORDER BY tid; tid | balance -----+--------- @@ -1039,10 +1021,8 @@ USING (SELECT sid, delta FROM source WHERE delta > 0) AS s ON t.tid = s.sid WHEN NOT MATCHED THEN INSERT (tid, balance) VALUES (s.sid, s.delta); -NOTICE: BEFORE INSERT STATEMENT trigger NOTICE: BEFORE INSERT ROW trigger row: (4,40) NOTICE: AFTER INSERT ROW trigger row: (4,40) -NOTICE: AFTER INSERT STATEMENT trigger SELECT * FROM target ORDER BY tid; tid | balance -----+--------- @@ -1059,10 +1039,8 @@ USING (SELECT sid, delta as newname FROM source WHERE delta > 0) AS s ON t.tid = s.sid WHEN NOT MATCHED THEN INSERT (tid, balance) VALUES (s.sid, s.newname); -NOTICE: BEFORE INSERT STATEMENT trigger NOTICE: BEFORE INSERT ROW trigger row: (4,40) NOTICE: AFTER INSERT ROW trigger row: (4,40) -NOTICE: AFTER INSERT STATEMENT trigger SELECT * FROM target ORDER BY tid; tid | balance -----+--------- @@ -1082,16 +1060,12 @@ WHEN MATCHED THEN UPDATE SET balance = t1.balance + t2.balance WHEN NOT MATCHED THEN INSERT VALUES (t2.tid, t2.balance); -NOTICE: BEFORE INSERT STATEMENT trigger -NOTICE: BEFORE UPDATE STATEMENT trigger NOTICE: BEFORE UPDATE ROW trigger row: (1,10) -> (1,20) NOTICE: BEFORE UPDATE ROW trigger row: (2,20) -> (2,40) NOTICE: BEFORE UPDATE ROW trigger row: (3,30) -> (3,60) NOTICE: AFTER UPDATE ROW trigger row: (1,10) -> (1,20) NOTICE: AFTER UPDATE ROW trigger row: (2,20) -> (2,40) NOTICE: AFTER UPDATE ROW trigger row: (3,30) -> (3,60) -NOTICE: AFTER UPDATE 
STATEMENT trigger -NOTICE: AFTER INSERT STATEMENT trigger SELECT * FROM target ORDER BY tid; tid | balance -----+--------- @@ -1107,8 +1081,6 @@ USING (SELECT tid as sid, balance as delta FROM target WHERE balance > 0) AS s ON t.tid = s.sid WHEN NOT MATCHED THEN INSERT (tid, balance) VALUES (s.sid, s.delta); -NOTICE: BEFORE INSERT STATEMENT trigger -NOTICE: AFTER INSERT STATEMENT trigger SELECT * FROM target ORDER BY tid; tid | balance -----+--------- @@ -1129,10 +1101,8 @@ USING ON t.tid = s.sid WHEN NOT MATCHED THEN INSERT (tid, balance) VALUES (s.sid, s.delta); -NOTICE: BEFORE INSERT STATEMENT trigger NOTICE: BEFORE INSERT ROW trigger row: (4,40) NOTICE: AFTER INSERT ROW trigger row: (4,40) -NOTICE: AFTER INSERT STATEMENT trigger SELECT * FROM target ORDER BY tid; tid | balance -----+--------- @@ -1164,10 +1134,8 @@ RETURN result; END; $$; SELECT merge_func(3, 4); -NOTICE: BEFORE UPDATE STATEMENT trigger NOTICE: BEFORE UPDATE ROW trigger row: (3,30) -> (3,26) NOTICE: AFTER UPDATE ROW trigger row: (3,30) -> (3,26) -NOTICE: AFTER UPDATE STATEMENT trigger merge_func ------------ 1 @@ -1186,10 +1154,8 @@ ROLLBACK; BEGIN; prepare foom as merge into target t using (select 1 as sid) s on (t.tid = s.sid) when matched then update set balance = 1; execute foom; -NOTICE: BEFORE UPDATE STATEMENT trigger NOTICE: BEFORE UPDATE ROW trigger row: (1,10) -> (1,1) NOTICE: AFTER UPDATE ROW trigger row: (1,10) -> (1,1) -NOTICE: AFTER UPDATE STATEMENT trigger SELECT * FROM target ORDER BY tid; tid | balance -----+--------- @@ -1208,10 +1174,8 @@ WHEN MATCHED THEN UPDATE SET balance = $2; --EXPLAIN (ANALYZE ON, COSTS OFF, SUMMARY OFF, TIMING OFF) execute foom2 (1, 1); -NOTICE: BEFORE UPDATE STATEMENT trigger NOTICE: BEFORE UPDATE ROW trigger row: (1,10) -> (1,1) NOTICE: AFTER UPDATE ROW trigger row: (1,10) -> (1,1) -NOTICE: AFTER UPDATE STATEMENT trigger SELECT * FROM target ORDER BY tid; tid | balance -----+--------- @@ -1354,42 +1318,36 @@ SELECT explain_merge(' MERGE INTO 
ex_mtarget t USING ex_msource s ON t.a = s.a WHEN MATCHED THEN UPDATE SET b = t.b + 1'); - explain_merge ----------------------------------------------------------------------- + explain_merge +-------------------------------------------------------------------------------------------- Merge on ex_mtarget t (actual rows=0 loops=1) - Tuples: updated=50 - -> Merge Join (actual rows=50 loops=1) - Merge Cond: (t.a = s.a) - -> Sort (actual rows=50 loops=1) - Sort Key: t.a - Sort Method: quicksort Memory: xxx - -> Seq Scan on ex_mtarget t (actual rows=50 loops=1) - -> Sort (actual rows=100 loops=1) - Sort Key: s.a - Sort Method: quicksort Memory: xxx - -> Seq Scan on ex_msource s (actual rows=100 loops=1) -(12 rows) + Tuples: skipped=20 + -> Hash Join (actual rows=20 loops=1) + Hash Cond: (t.a = s.a) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 38 of 524288 buckets. + -> Seq Scan on ex_mtarget t (actual rows=20 loops=1) + -> Hash (actual rows=38 loops=1) + Buckets: xxx Batches: xxx Memory Usage: xxx + -> Seq Scan on ex_msource s (actual rows=38 loops=1) +(10 rows) -- only updates to selected tuples SELECT explain_merge(' MERGE INTO ex_mtarget t USING ex_msource s ON t.a = s.a WHEN MATCHED AND t.a < 10 THEN UPDATE SET b = t.b + 1'); - explain_merge ----------------------------------------------------------------------- + explain_merge +-------------------------------------------------------------------------------------------- + -> Seq Scan on ex_msource s (actual rows=38 loops=1) + Buckets: xxx Batches: xxx Memory Usage: xxx + -> Hash (actual rows=38 loops=1) + -> Seq Scan on ex_mtarget t (actual rows=20 loops=1) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 38 of 524288 buckets. 
+ Hash Cond: (t.a = s.a) + -> Hash Join (actual rows=20 loops=1) + Tuples: skipped=20 Merge on ex_mtarget t (actual rows=0 loops=1) - Tuples: updated=5 skipped=45 - -> Merge Join (actual rows=50 loops=1) - Merge Cond: (t.a = s.a) - -> Sort (actual rows=50 loops=1) - Sort Key: t.a - Sort Method: quicksort Memory: xxx - -> Seq Scan on ex_mtarget t (actual rows=50 loops=1) - -> Sort (actual rows=100 loops=1) - Sort Key: s.a - Sort Method: quicksort Memory: xxx - -> Seq Scan on ex_msource s (actual rows=100 loops=1) -(12 rows) +(10 rows) -- updates + deletes SELECT explain_merge(' @@ -1398,41 +1356,37 @@ WHEN MATCHED AND t.a < 10 THEN UPDATE SET b = t.b + 1 WHEN MATCHED AND t.a >= 10 AND t.a <= 20 THEN DELETE'); - explain_merge ----------------------------------------------------------------------- + explain_merge +-------------------------------------------------------------------------------------------- + -> Seq Scan on ex_msource s (actual rows=38 loops=1) + Buckets: xxx Batches: xxx Memory Usage: xxx + -> Hash (actual rows=38 loops=1) + -> Seq Scan on ex_mtarget t (actual rows=20 loops=1) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 38 of 524288 buckets. 
+ Hash Cond: (t.a = s.a) + -> Hash Join (actual rows=20 loops=1) + Tuples: skipped=20 Merge on ex_mtarget t (actual rows=0 loops=1) - Tuples: updated=5 deleted=5 skipped=40 - -> Merge Join (actual rows=50 loops=1) - Merge Cond: (t.a = s.a) - -> Sort (actual rows=50 loops=1) - Sort Key: t.a - Sort Method: quicksort Memory: xxx - -> Seq Scan on ex_mtarget t (actual rows=50 loops=1) - -> Sort (actual rows=100 loops=1) - Sort Key: s.a - Sort Method: quicksort Memory: xxx - -> Seq Scan on ex_msource s (actual rows=100 loops=1) -(12 rows) +(10 rows) -- only inserts SELECT explain_merge(' MERGE INTO ex_mtarget t USING ex_msource s ON t.a = s.a WHEN NOT MATCHED AND s.a < 10 THEN INSERT VALUES (a, b)'); - explain_merge ----------------------------------------------------------------------- + explain_merge +-------------------------------------------------------------------------------------------------------- + -> Seq Scan on ex_mtarget t (actual rows=19 loops=1) + Buckets: xxx Batches: xxx Memory Usage: xxx + -> Hash (actual rows=19 loops=1) + -> Seq Scan on ex_msource s (actual rows=38 loops=1) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 19 of 524288 buckets. 
+ Hash Cond: (s.a = t.a) + -> Hash Left Join (actual rows=38 loops=1) + -> Split Merge (actual rows=38 loops=1) + -> Explicit Redistribute Motion 3:3 (slice1; segments: 3) (actual rows=73 loops=1) + Tuples: skipped=73 Merge on ex_mtarget t (actual rows=0 loops=1) - Tuples: inserted=4 skipped=96 - -> Merge Left Join (actual rows=100 loops=1) - Merge Cond: (s.a = t.a) - -> Sort (actual rows=100 loops=1) - Sort Key: s.a - Sort Method: quicksort Memory: xxx - -> Seq Scan on ex_msource s (actual rows=100 loops=1) - -> Sort (actual rows=45 loops=1) - Sort Key: t.a - Sort Method: quicksort Memory: xxx - -> Seq Scan on ex_mtarget t (actual rows=45 loops=1) (12 rows) -- all three @@ -1444,20 +1398,19 @@ WHEN MATCHED AND t.a >= 30 AND t.a <= 40 THEN DELETE WHEN NOT MATCHED AND s.a < 20 THEN INSERT VALUES (a, b)'); - explain_merge ----------------------------------------------------------------------- + explain_merge +-------------------------------------------------------------------------------------------------------- + -> Seq Scan on ex_mtarget t (actual rows=22 loops=1) + Buckets: xxx Batches: xxx Memory Usage: xxx + -> Hash (actual rows=22 loops=1) + -> Seq Scan on ex_msource s (actual rows=38 loops=1) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 22 of 524288 buckets. 
+ Hash Cond: (s.a = t.a) + -> Hash Left Join (actual rows=38 loops=1) + -> Split Merge (actual rows=38 loops=1) + -> Explicit Redistribute Motion 3:3 (slice1; segments: 3) (actual rows=66 loops=1) + Tuples: skipped=66 Merge on ex_mtarget t (actual rows=0 loops=1) - Tuples: inserted=10 updated=9 deleted=5 skipped=76 - -> Merge Left Join (actual rows=100 loops=1) - Merge Cond: (s.a = t.a) - -> Sort (actual rows=100 loops=1) - Sort Key: s.a - Sort Method: quicksort Memory: xxx - -> Seq Scan on ex_msource s (actual rows=100 loops=1) - -> Sort (actual rows=49 loops=1) - Sort Key: t.a - Sort Method: quicksort Memory: xxx - -> Seq Scan on ex_mtarget t (actual rows=49 loops=1) (12 rows) -- nothing @@ -1465,74 +1418,35 @@ SELECT explain_merge(' MERGE INTO ex_mtarget t USING ex_msource s ON t.a = s.a AND t.a < -1000 WHEN MATCHED AND t.a < 10 THEN DO NOTHING'); - explain_merge --------------------------------------------------------------------- - Merge on ex_mtarget t (actual rows=0 loops=1) - -> Merge Join (actual rows=0 loops=1) - Merge Cond: (t.a = s.a) - -> Sort (actual rows=0 loops=1) - Sort Key: t.a - Sort Method: quicksort Memory: xxx - -> Seq Scan on ex_mtarget t (actual rows=0 loops=1) + explain_merge +-------------------------------------------------------------- Filter: (a < '-1000'::integer) - Rows Removed by Filter: 54 - -> Sort (never executed) - Sort Key: s.a -> Seq Scan on ex_msource s (never executed) -(12 rows) + Filter: (a < '-1000'::integer) + Rows Removed by Filter: 23 + -> Hash (never executed) + -> Seq Scan on ex_mtarget t (actual rows=0 loops=1) + Hash Cond: (t.a = s.a) + -> Hash Join (actual rows=0 loops=1) + Merge on ex_mtarget t (actual rows=0 loops=1) +(10 rows) DROP TABLE ex_msource, ex_mtarget; DROP FUNCTION explain_merge(text); --- EXPLAIN SubPlans and InitPlans -CREATE TABLE src (a int, b int, c int, d int); -CREATE TABLE tgt (a int, b int, c int, d int); -CREATE TABLE ref (ab int, cd int); -EXPLAIN (verbose, costs off) -MERGE INTO tgt t 
-USING (SELECT *, (SELECT count(*) FROM ref r - WHERE r.ab = s.a + s.b - AND r.cd = s.c - s.d) cnt - FROM src s) s -ON t.a = s.a AND t.b < s.cnt -WHEN MATCHED AND t.c > s.cnt THEN - UPDATE SET (b, c) = (SELECT s.b, s.cnt); - QUERY PLAN -------------------------------------------------------------------------------------- - Merge on public.tgt t - -> Hash Join - Output: t.ctid, s.a, s.b, s.c, s.d, s.ctid - Hash Cond: (t.a = s.a) - Join Filter: (t.b < (SubPlan 1)) - -> Seq Scan on public.tgt t - Output: t.ctid, t.a, t.b - -> Hash - Output: s.a, s.b, s.c, s.d, s.ctid - -> Seq Scan on public.src s - Output: s.a, s.b, s.c, s.d, s.ctid - SubPlan 1 - -> Aggregate - Output: count(*) - -> Seq Scan on public.ref r - Output: r.ab, r.cd - Filter: ((r.ab = (s.a + s.b)) AND (r.cd = (s.c - s.d))) - SubPlan 4 - -> Aggregate - Output: count(*) - -> Seq Scan on public.ref r_2 - Output: r_2.ab, r_2.cd - Filter: ((r_2.ab = (s.a + s.b)) AND (r_2.cd = (s.c - s.d))) - SubPlan 3 (returns $9,$10) - -> Result - Output: s.b, $8 - InitPlan 2 (returns $8) - -> Aggregate - Output: count(*) - -> Seq Scan on public.ref r_1 - Output: r_1.ab, r_1.cd - Filter: ((r_1.ab = (s.a + s.b)) AND (r_1.cd = (s.c - s.d))) -(32 rows) - -DROP TABLE src, tgt, ref; +-- EXPLAIN SubPlans and InitPlans (CBDB not supported) +-- CREATE TABLE src (a int, b int, c int, d int); +-- CREATE TABLE tgt (a int, b int, c int, d int); +-- CREATE TABLE ref (ab int, cd int); +-- EXPLAIN (verbose, costs off) +-- MERGE INTO tgt t +-- USING (SELECT *, (SELECT count(*) FROM ref r +-- WHERE r.ab = s.a + s.b +-- AND r.cd = s.c - s.d) cnt +-- FROM src s) s +-- ON t.a = s.a AND t.b < s.cnt +-- WHEN MATCHED AND t.c > s.cnt THEN +-- UPDATE SET (b, c) = (SELECT s.b, s.cnt); +-- DROP TABLE src, tgt, ref; -- Subqueries BEGIN; MERGE INTO sq_target t @@ -1599,7 +1513,7 @@ MERGE INTO pa_target t UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid, delta, 'inserted by merge'); -SELECT * 
FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; tid | balance | val -----+---------+-------------------------- 1 | 110 | initial updated by merge @@ -1628,7 +1542,7 @@ MERGE INTO pa_target t UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid, delta, 'inserted by merge'); -SELECT * FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; tid | balance | val -----+---------+-------------------------- 1 | 110 | initial updated by merge @@ -1636,20 +1550,20 @@ SELECT * FROM pa_target ORDER BY tid; 3 | 30 | inserted by merge 3 | 300 | initial 4 | 40 | inserted by merge - 5 | 500 | initial 5 | 50 | inserted by merge + 5 | 500 | initial 6 | 60 | inserted by merge - 7 | 700 | initial 7 | 70 | inserted by merge + 7 | 700 | initial 8 | 80 | inserted by merge 9 | 90 | inserted by merge 9 | 900 | initial 10 | 100 | inserted by merge - 11 | 1100 | initial 11 | 110 | inserted by merge + 11 | 1100 | initial 12 | 120 | inserted by merge - 13 | 1300 | initial 13 | 130 | inserted by merge + 13 | 1300 | initial 14 | 140 | inserted by merge (20 rows) @@ -1664,7 +1578,7 @@ MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid WHEN MATCHED THEN - UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge' + UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid, delta, 'inserted by merge'); IF FOUND THEN @@ -1679,22 +1593,22 @@ SELECT merge_func(); 14 (1 row) -SELECT * FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; tid | balance | val -----+---------+-------------------------- - 2 | 110 | initial updated by merge + 1 | 110 | initial updated by merge 2 | 20 | inserted by merge + 3 | 330 | initial updated by merge 4 | 40 | inserted by merge - 4 | 330 | initial updated by merge - 6 | 550 | initial updated by merge + 5 | 550 | initial updated by merge 6 
| 60 | inserted by merge + 7 | 770 | initial updated by merge 8 | 80 | inserted by merge - 8 | 770 | initial updated by merge - 10 | 990 | initial updated by merge + 9 | 990 | initial updated by merge 10 | 100 | inserted by merge - 12 | 1210 | initial updated by merge + 11 | 1210 | initial updated by merge 12 | 120 | inserted by merge - 14 | 1430 | initial updated by merge + 13 | 1430 | initial updated by merge 14 | 140 | inserted by merge (14 rows) @@ -1724,11 +1638,11 @@ CREATE TABLE pa_target (tid integer, balance float, val text) CREATE TABLE part1 (tid integer, balance float, val text) WITH (autovacuum_enabled=off); CREATE TABLE part2 (balance float, tid integer, val text) - WITH (autovacuum_enabled=off); + WITH (autovacuum_enabled=off) distributed by (tid); CREATE TABLE part3 (tid integer, balance float, val text) WITH (autovacuum_enabled=off); CREATE TABLE part4 (extraid text, tid integer, balance float, val text) - WITH (autovacuum_enabled=off); + WITH (autovacuum_enabled=off) distributed by (tid); ALTER TABLE part4 DROP COLUMN extraid; ALTER TABLE pa_target ATTACH PARTITION part1 FOR VALUES IN (1,4); ALTER TABLE pa_target ATTACH PARTITION part2 FOR VALUES IN (2,5,6); @@ -1754,7 +1668,7 @@ RAISE NOTICE 'ROW_COUNT = %', result; END; $$; NOTICE: ROW_COUNT = 14 -SELECT * FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; tid | balance | val -----+---------+-------------------------- 1 | 110 | initial updated by merge @@ -1784,7 +1698,7 @@ MERGE INTO pa_target t UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid, delta, 'inserted by merge'); -SELECT * FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; tid | balance | val -----+---------+-------------------------- 1 | 110 | initial updated by merge @@ -1793,17 +1707,17 @@ SELECT * FROM pa_target ORDER BY tid; 3 | 300 | initial 4 | 40 | inserted by merge 6 | 60 | inserted by merge - 7 | 700 | 
initial 7 | 70 | inserted by merge + 7 | 700 | initial 8 | 80 | inserted by merge - 9 | 900 | initial 9 | 90 | inserted by merge + 9 | 900 | initial 10 | 100 | inserted by merge 11 | 110 | inserted by merge 11 | 1100 | initial 12 | 120 | inserted by merge - 13 | 1300 | initial 13 | 130 | inserted by merge + 13 | 1300 | initial 14 | 140 | inserted by merge (18 rows) @@ -1818,7 +1732,7 @@ MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid WHEN MATCHED THEN - UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge' + UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid, delta, 'inserted by merge'); GET DIAGNOSTICS result := ROW_COUNT; @@ -1826,22 +1740,22 @@ RAISE NOTICE 'ROW_COUNT = %', result; END; $$; NOTICE: ROW_COUNT = 14 -SELECT * FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; tid | balance | val -----+---------+-------------------------- - 2 | 110 | initial updated by merge + 1 | 110 | initial updated by merge 2 | 20 | inserted by merge + 3 | 330 | initial updated by merge 4 | 40 | inserted by merge - 4 | 330 | initial updated by merge - 6 | 550 | initial updated by merge + 5 | 550 | initial updated by merge 6 | 60 | inserted by merge + 7 | 770 | initial updated by merge 8 | 80 | inserted by merge - 8 | 770 | initial updated by merge - 10 | 990 | initial updated by merge + 9 | 990 | initial updated by merge 10 | 100 | inserted by merge - 12 | 1210 | initial updated by merge + 11 | 1210 | initial updated by merge 12 | 120 | inserted by merge - 14 | 1430 | initial updated by merge + 13 | 1430 | initial updated by merge 14 | 140 | inserted by merge (14 rows) @@ -1860,30 +1774,30 @@ MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid WHEN MATCHED THEN - UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge' + UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN 
INSERT VALUES (sid, delta, 'inserted by merge'); GET DIAGNOSTICS result := ROW_COUNT; RAISE NOTICE 'ROW_COUNT = %', result; END; $$; -NOTICE: ROW_COUNT = 10 -SELECT * FROM pa_target ORDER BY tid; +NOTICE: ROW_COUNT = 14 +SELECT * FROM pa_target ORDER BY tid, balance, val; tid | balance | val -----+---------+-------------------------- - 1 | 100 | initial + 1 | 110 | initial updated by merge 2 | 20 | inserted by merge - 3 | 300 | initial + 3 | 330 | initial updated by merge 4 | 40 | inserted by merge - 6 | 550 | initial updated by merge + 5 | 550 | initial updated by merge 6 | 60 | inserted by merge - 7 | 700 | initial + 7 | 770 | initial updated by merge 8 | 80 | inserted by merge - 9 | 900 | initial + 9 | 990 | initial updated by merge 10 | 100 | inserted by merge - 12 | 1210 | initial updated by merge + 11 | 1210 | initial updated by merge 12 | 120 | inserted by merge - 14 | 1430 | initial updated by merge + 13 | 1430 | initial updated by merge 14 | 140 | inserted by merge (14 rows) @@ -1902,21 +1816,25 @@ MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid WHEN MATCHED THEN - UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge' + UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid, delta, 'inserted by merge'); GET DIAGNOSTICS result := ROW_COUNT; RAISE NOTICE 'ROW_COUNT = %', result; END; $$; -NOTICE: ROW_COUNT = 3 -SELECT * FROM pa_target ORDER BY tid; +NOTICE: ROW_COUNT = 7 +SELECT * FROM pa_target ORDER BY tid, balance, val; tid | balance | val -----+---------+-------------------------- - 6 | 550 | initial updated by merge - 12 | 1210 | initial updated by merge - 14 | 1430 | initial updated by merge -(3 rows) + 1 | 110 | initial updated by merge + 3 | 330 | initial updated by merge + 5 | 550 | initial updated by merge + 7 | 770 | initial updated by merge + 9 | 990 | initial updated by merge + 11 | 1210 | initial updated by merge + 13 | 1430 | initial updated by 
merge +(7 rows) ROLLBACK; -- test RLS enforcement @@ -1929,7 +1847,7 @@ MERGE INTO pa_target t ON t.tid = s.sid AND t.tid IN (1,2,3,4) WHEN MATCHED THEN UPDATE SET tid = tid - 1; -ERROR: new row violates row-level security policy for table "pa_target" +ERROR: cannot update column in merge with distributed column ROLLBACK; DROP TABLE pa_source; DROP TABLE pa_target CASCADE; @@ -1966,7 +1884,7 @@ MERGE INTO pa_target t UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (slogts::timestamp, sid, delta, 'inserted by merge'); -SELECT * FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; logts | tid | balance | val --------------------------+-----+---------+-------------------------- Tue Jan 31 00:00:00 2017 | 1 | 110 | initial updated by merge @@ -1995,17 +1913,20 @@ MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid ------------------------------------------------------------- Merge on public.pa_target t Merge on public.pa_targetp t_1 - -> Hash Left Join - Output: s.sid, s.ctid, t_1.tableoid, t_1.ctid - Inner Unique: true - Hash Cond: (s.sid = t_1.tid) - -> Seq Scan on public.pa_source s - Output: s.sid, s.ctid - -> Hash - Output: t_1.tid, t_1.tableoid, t_1.ctid - -> Seq Scan on public.pa_targetp t_1 - Output: t_1.tid, t_1.tableoid, t_1.ctid -(12 rows) + -> Explicit Redistribute Motion 3:3 (slice1; segments: 3) + Output: s.sid, s.ctid, t_1.tableoid, t_1.ctid, t_1.gp_segment_id + -> Split Merge + Output: s.sid, s.ctid, t_1.tableoid, t_1.ctid, t_1.gp_segment_id + -> Hash Left Join + Output: s.sid, s.ctid, t_1.tableoid, t_1.ctid, t_1.gp_segment_id + Hash Cond: (s.sid = t_1.tid) + -> Seq Scan on public.pa_source s + Output: s.sid, s.ctid + -> Hash + Output: t_1.tid, t_1.tableoid, t_1.ctid, t_1.gp_segment_id + -> Seq Scan on public.pa_targetp t_1 + Output: t_1.tid, t_1.tableoid, t_1.ctid, t_1.gp_segment_id +(16 rows) MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid WHEN NOT 
MATCHED THEN INSERT VALUES (s.sid); @@ -2025,19 +1946,23 @@ MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid QUERY PLAN -------------------------------------------- Merge on public.pa_target t - -> Hash Left Join - Output: s.sid, s.ctid, t.ctid - Inner Unique: true - Hash Cond: (s.sid = t.tid) - -> Seq Scan on public.pa_source s - Output: s.sid, s.ctid - -> Hash - Output: t.tid, t.ctid - -> Result - Output: t.tid, t.ctid - One-Time Filter: false -(12 rows) - + -> Explicit Redistribute Motion 3:3 (slice1; segments: 3) + Output: s.sid, s.ctid, t.ctid, t.gp_segment_id + -> Split Merge + Output: s.sid, s.ctid, t.ctid, t.gp_segment_id + -> Hash Left Join + Output: s.sid, s.ctid, t.ctid, t.gp_segment_id + Hash Cond: (s.sid = t.tid) + -> Seq Scan on public.pa_source s + Output: s.sid, s.ctid + -> Hash + Output: t.tid, t.ctid, t.gp_segment_id + -> Result + Output: t.tid, t.ctid, t.gp_segment_id + One-Time Filter: false +(16 rows) + +DELETE FROM pa_source WHERE sid = 2; MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid WHEN NOT MATCHED THEN INSERT VALUES (s.sid); ERROR: no partition of relation "pa_target" found for row @@ -2155,184 +2080,160 @@ SELECT count(*) FROM fs_target; DROP TABLE fs_target; -- SERIALIZABLE test -- handled in isolation tests --- Inheritance-based partitioning -CREATE TABLE measurement ( - city_id int not null, - logdate date not null, - peaktemp int, - unitsales int -) WITH (autovacuum_enabled=off); -CREATE TABLE measurement_y2006m02 ( - CHECK ( logdate >= DATE '2006-02-01' AND logdate < DATE '2006-03-01' ) -) INHERITS (measurement) WITH (autovacuum_enabled=off); -CREATE TABLE measurement_y2006m03 ( - CHECK ( logdate >= DATE '2006-03-01' AND logdate < DATE '2006-04-01' ) -) INHERITS (measurement) WITH (autovacuum_enabled=off); -CREATE TABLE measurement_y2007m01 ( - filler text, - peaktemp int, - logdate date not null, - city_id int not null, - unitsales int - CHECK ( logdate >= DATE '2007-01-01' AND logdate < DATE '2007-02-01') -) 
WITH (autovacuum_enabled=off); -ALTER TABLE measurement_y2007m01 DROP COLUMN filler; -ALTER TABLE measurement_y2007m01 INHERIT measurement; -INSERT INTO measurement VALUES (0, '2005-07-21', 5, 15); -CREATE OR REPLACE FUNCTION measurement_insert_trigger() -RETURNS TRIGGER AS $$ -BEGIN - IF ( NEW.logdate >= DATE '2006-02-01' AND - NEW.logdate < DATE '2006-03-01' ) THEN - INSERT INTO measurement_y2006m02 VALUES (NEW.*); - ELSIF ( NEW.logdate >= DATE '2006-03-01' AND - NEW.logdate < DATE '2006-04-01' ) THEN - INSERT INTO measurement_y2006m03 VALUES (NEW.*); - ELSIF ( NEW.logdate >= DATE '2007-01-01' AND - NEW.logdate < DATE '2007-02-01' ) THEN - INSERT INTO measurement_y2007m01 (city_id, logdate, peaktemp, unitsales) - VALUES (NEW.*); - ELSE - RAISE EXCEPTION 'Date out of range. Fix the measurement_insert_trigger() function!'; - END IF; - RETURN NULL; -END; -$$ LANGUAGE plpgsql ; -CREATE TRIGGER insert_measurement_trigger - BEFORE INSERT ON measurement - FOR EACH ROW EXECUTE PROCEDURE measurement_insert_trigger(); -INSERT INTO measurement VALUES (1, '2006-02-10', 35, 10); -INSERT INTO measurement VALUES (1, '2006-02-16', 45, 20); -INSERT INTO measurement VALUES (1, '2006-03-17', 25, 10); -INSERT INTO measurement VALUES (1, '2006-03-27', 15, 40); -INSERT INTO measurement VALUES (1, '2007-01-15', 10, 10); -INSERT INTO measurement VALUES (1, '2007-01-17', 10, 10); -SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate; - tableoid | city_id | logdate | peaktemp | unitsales -----------------------+---------+------------+----------+----------- - measurement | 0 | 07-21-2005 | 5 | 15 - measurement_y2006m02 | 1 | 02-10-2006 | 35 | 10 - measurement_y2006m02 | 1 | 02-16-2006 | 45 | 20 - measurement_y2006m03 | 1 | 03-17-2006 | 25 | 10 - measurement_y2006m03 | 1 | 03-27-2006 | 15 | 40 - measurement_y2007m01 | 1 | 01-15-2007 | 10 | 10 - measurement_y2007m01 | 1 | 01-17-2007 | 10 | 10 -(7 rows) - -CREATE TABLE new_measurement (LIKE measurement) WITH 
(autovacuum_enabled=off); -INSERT INTO new_measurement VALUES (0, '2005-07-21', 25, 20); -INSERT INTO new_measurement VALUES (1, '2006-03-01', 20, 10); -INSERT INTO new_measurement VALUES (1, '2006-02-16', 50, 10); -INSERT INTO new_measurement VALUES (2, '2006-02-10', 20, 20); -INSERT INTO new_measurement VALUES (1, '2006-03-27', NULL, NULL); -INSERT INTO new_measurement VALUES (1, '2007-01-17', NULL, NULL); -INSERT INTO new_measurement VALUES (1, '2007-01-15', 5, NULL); -INSERT INTO new_measurement VALUES (1, '2007-01-16', 10, 10); +-- Inheritance-based partitioning (CBDB not supported) +-- CREATE TABLE measurement ( +-- city_id int not null, +-- logdate date not null, +-- peaktemp int, +-- unitsales int +-- ) WITH (autovacuum_enabled=off); +-- CREATE TABLE measurement_y2006m02 ( +-- CHECK ( logdate >= DATE '2006-02-01' AND logdate < DATE '2006-03-01' ) +-- ) INHERITS (measurement) WITH (autovacuum_enabled=off); +-- CREATE TABLE measurement_y2006m03 ( +-- CHECK ( logdate >= DATE '2006-03-01' AND logdate < DATE '2006-04-01' ) +-- ) INHERITS (measurement) WITH (autovacuum_enabled=off); +-- CREATE TABLE measurement_y2007m01 ( +-- filler text, +-- peaktemp int, +-- logdate date not null, +-- city_id int not null, +-- unitsales int +-- CHECK ( logdate >= DATE '2007-01-01' AND logdate < DATE '2007-02-01') +-- ) WITH (autovacuum_enabled=off); +-- ALTER TABLE measurement_y2007m01 DROP COLUMN filler; +-- ALTER TABLE measurement_y2007m01 INHERIT measurement; +-- INSERT INTO measurement VALUES (0, '2005-07-21', 5, 15); +-- CREATE OR REPLACE FUNCTION measurement_insert_trigger() +-- RETURNS TRIGGER AS $$ +-- BEGIN +-- IF ( NEW.logdate >= DATE '2006-02-01' AND +-- NEW.logdate < DATE '2006-03-01' ) THEN +-- INSERT INTO measurement_y2006m02 VALUES (NEW.*); +-- ELSIF ( NEW.logdate >= DATE '2006-03-01' AND +-- NEW.logdate < DATE '2006-04-01' ) THEN +-- INSERT INTO measurement_y2006m03 VALUES (NEW.*); +-- ELSIF ( NEW.logdate >= DATE '2007-01-01' AND +-- NEW.logdate < DATE 
'2007-02-01' ) THEN +-- INSERT INTO measurement_y2007m01 (city_id, logdate, peaktemp, unitsales) +-- VALUES (NEW.*); +-- ELSE +-- RAISE EXCEPTION 'Date out of range. Fix the measurement_insert_trigger() function!'; +-- END IF; +-- RETURN NULL; +-- END; +-- $$ LANGUAGE plpgsql ; +-- CREATE TRIGGER insert_measurement_trigger +-- BEFORE INSERT ON measurement +-- FOR EACH ROW EXECUTE PROCEDURE measurement_insert_trigger(); +-- INSERT INTO measurement VALUES (1, '2006-02-10', 35, 10); +-- INSERT INTO measurement VALUES (1, '2006-02-16', 45, 20); +-- INSERT INTO measurement VALUES (1, '2006-03-17', 25, 10); +-- INSERT INTO measurement VALUES (1, '2006-03-27', 15, 40); +-- INSERT INTO measurement VALUES (1, '2007-01-15', 10, 10); +-- INSERT INTO measurement VALUES (1, '2007-01-17', 10, 10); +-- SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate; +-- CREATE TABLE new_measurement (LIKE measurement) WITH (autovacuum_enabled=off); +-- INSERT INTO new_measurement VALUES (0, '2005-07-21', 25, 20); +-- INSERT INTO new_measurement VALUES (1, '2006-03-01', 20, 10); +-- INSERT INTO new_measurement VALUES (1, '2006-02-16', 50, 10); +-- INSERT INTO new_measurement VALUES (2, '2006-02-10', 20, 20); +-- INSERT INTO new_measurement VALUES (1, '2006-03-27', NULL, NULL); +-- INSERT INTO new_measurement VALUES (1, '2007-01-17', NULL, NULL); +-- INSERT INTO new_measurement VALUES (1, '2007-01-15', 5, NULL); +-- INSERT INTO new_measurement VALUES (1, '2007-01-16', 10, 10); +-- BEGIN; +-- MERGE INTO ONLY measurement m +-- USING new_measurement nm ON +-- (m.city_id = nm.city_id and m.logdate=nm.logdate) +-- WHEN MATCHED AND nm.peaktemp IS NULL THEN DELETE +-- WHEN MATCHED THEN UPDATE +-- SET peaktemp = greatest(m.peaktemp, nm.peaktemp), +-- unitsales = m.unitsales + coalesce(nm.unitsales, 0) +-- WHEN NOT MATCHED THEN INSERT +-- (city_id, logdate, peaktemp, unitsales) +-- VALUES (city_id, logdate, peaktemp, unitsales); +-- SELECT tableoid::regclass, * FROM measurement ORDER 
BY city_id, logdate, peaktemp; +-- ROLLBACK; +-- MERGE into measurement m +-- USING new_measurement nm ON +-- (m.city_id = nm.city_id and m.logdate=nm.logdate) +-- WHEN MATCHED AND nm.peaktemp IS NULL THEN DELETE +-- WHEN MATCHED THEN UPDATE +-- SET peaktemp = greatest(m.peaktemp, nm.peaktemp), +-- unitsales = m.unitsales + coalesce(nm.unitsales, 0) +-- WHEN NOT MATCHED THEN INSERT +-- (city_id, logdate, peaktemp, unitsales) +-- VALUES (city_id, logdate, peaktemp, unitsales); +-- SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate; +-- BEGIN; +-- MERGE INTO new_measurement nm +-- USING ONLY measurement m ON +-- (nm.city_id = m.city_id and nm.logdate=m.logdate) +-- WHEN MATCHED THEN DELETE; +-- SELECT * FROM new_measurement ORDER BY city_id, logdate; +-- ROLLBACK; +-- MERGE INTO new_measurement nm +-- USING measurement m ON +-- (nm.city_id = m.city_id and nm.logdate=m.logdate) +-- WHEN MATCHED THEN DELETE; +-- SELECT * FROM new_measurement ORDER BY city_id, logdate; +-- DROP TABLE measurement, new_measurement CASCADE; +-- DROP FUNCTION measurement_insert_trigger(); +-- prepare +RESET SESSION AUTHORIZATION; +-- try a system catalog (CBDB not supported) +-- MERGE INTO pg_class c +-- USING (SELECT 'pg_depend'::regclass AS oid) AS j +-- ON j.oid = c.oid +-- WHEN MATCHED THEN +-- UPDATE SET reltuples = reltuples + 1; +-- MERGE INTO pg_class c +-- USING pg_namespace n +-- ON n.oid = c.relnamespace +-- WHEN MATCHED AND c.oid = 'pg_depend'::regclass THEN +-- UPDATE SET reltuples = reltuples - 1; +DROP TABLE IF EXISTS test; +NOTICE: table "test" does not exist, skipping +DROP TABLE IF EXISTS test1; +NOTICE: table "test1" does not exist, skipping +CREATE TABLE test(a int, b int)distributed by (a); +CREATE TABLE test1(a int, b int)distributed by (a); +INSERT INTO test1 values(1,1); BEGIN; -MERGE INTO ONLY measurement m - USING new_measurement nm ON - (m.city_id = nm.city_id and m.logdate=nm.logdate) -WHEN MATCHED AND nm.peaktemp IS NULL THEN DELETE -WHEN 
MATCHED THEN UPDATE - SET peaktemp = greatest(m.peaktemp, nm.peaktemp), - unitsales = m.unitsales + coalesce(nm.unitsales, 0) -WHEN NOT MATCHED THEN INSERT - (city_id, logdate, peaktemp, unitsales) - VALUES (city_id, logdate, peaktemp, unitsales); -SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate, peaktemp; - tableoid | city_id | logdate | peaktemp | unitsales -----------------------+---------+------------+----------+----------- - measurement | 0 | 07-21-2005 | 25 | 35 - measurement_y2006m02 | 1 | 02-10-2006 | 35 | 10 - measurement_y2006m02 | 1 | 02-16-2006 | 45 | 20 - measurement_y2006m02 | 1 | 02-16-2006 | 50 | 10 - measurement_y2006m03 | 1 | 03-01-2006 | 20 | 10 - measurement_y2006m03 | 1 | 03-17-2006 | 25 | 10 - measurement_y2006m03 | 1 | 03-27-2006 | 15 | 40 - measurement_y2006m03 | 1 | 03-27-2006 | | - measurement_y2007m01 | 1 | 01-15-2007 | 5 | - measurement_y2007m01 | 1 | 01-15-2007 | 10 | 10 - measurement_y2007m01 | 1 | 01-16-2007 | 10 | 10 - measurement_y2007m01 | 1 | 01-17-2007 | 10 | 10 - measurement_y2007m01 | 1 | 01-17-2007 | | - measurement_y2006m02 | 2 | 02-10-2006 | 20 | 20 -(14 rows) +MERGE INTO test +USING test1 on test1.b = test.b +WHEN NOT MATCHED THEN + INSERT VALUES (2, 2); +INSERT INTO test values(2,2); +SELECT * FROM test WHERE a = 2; + a | b +---+--- + 2 | 2 + 2 | 2 +(2 rows) ROLLBACK; -MERGE into measurement m - USING new_measurement nm ON - (m.city_id = nm.city_id and m.logdate=nm.logdate) -WHEN MATCHED AND nm.peaktemp IS NULL THEN DELETE -WHEN MATCHED THEN UPDATE - SET peaktemp = greatest(m.peaktemp, nm.peaktemp), - unitsales = m.unitsales + coalesce(nm.unitsales, 0) -WHEN NOT MATCHED THEN INSERT - (city_id, logdate, peaktemp, unitsales) - VALUES (city_id, logdate, peaktemp, unitsales); -SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate; - tableoid | city_id | logdate | peaktemp | unitsales -----------------------+---------+------------+----------+----------- - measurement | 0 | 07-21-2005 | 
25 | 35 - measurement_y2006m02 | 1 | 02-10-2006 | 35 | 10 - measurement_y2006m02 | 1 | 02-16-2006 | 50 | 30 - measurement_y2006m03 | 1 | 03-01-2006 | 20 | 10 - measurement_y2006m03 | 1 | 03-17-2006 | 25 | 10 - measurement_y2007m01 | 1 | 01-15-2007 | 10 | 10 - measurement_y2007m01 | 1 | 01-16-2007 | 10 | 10 - measurement_y2006m02 | 2 | 02-10-2006 | 20 | 20 -(8 rows) - BEGIN; -MERGE INTO new_measurement nm - USING ONLY measurement m ON - (nm.city_id = m.city_id and nm.logdate=m.logdate) -WHEN MATCHED THEN DELETE; -SELECT * FROM new_measurement ORDER BY city_id, logdate; - city_id | logdate | peaktemp | unitsales ----------+------------+----------+----------- - 1 | 02-16-2006 | 50 | 10 - 1 | 03-01-2006 | 20 | 10 - 1 | 03-27-2006 | | - 1 | 01-15-2007 | 5 | - 1 | 01-16-2007 | 10 | 10 - 1 | 01-17-2007 | | - 2 | 02-10-2006 | 20 | 20 -(7 rows) - -ROLLBACK; -MERGE INTO new_measurement nm - USING measurement m ON - (nm.city_id = m.city_id and nm.logdate=m.logdate) -WHEN MATCHED THEN DELETE; -SELECT * FROM new_measurement ORDER BY city_id, logdate; - city_id | logdate | peaktemp | unitsales ----------+------------+----------+----------- - 1 | 03-27-2006 | | - 1 | 01-17-2007 | | +MERGE INTO test +USING (SELECT 2,2) as d(a, b) on d.b = test.b +WHEN NOT MATCHED THEN + INSERT VALUES (2, 2); +INSERT INTO test values(2,2); +SELECT * FROM test WHERE a = 2; + a | b +---+--- + 2 | 2 + 2 | 2 (2 rows) -DROP TABLE measurement, new_measurement CASCADE; -NOTICE: drop cascades to 3 other objects -DETAIL: drop cascades to table measurement_y2006m02 -drop cascades to table measurement_y2006m03 -drop cascades to table measurement_y2007m01 -DROP FUNCTION measurement_insert_trigger(); --- prepare -RESET SESSION AUTHORIZATION; --- try a system catalog -MERGE INTO pg_class c -USING (SELECT 'pg_depend'::regclass AS oid) AS j -ON j.oid = c.oid -WHEN MATCHED THEN - UPDATE SET reltuples = reltuples + 1; -MERGE INTO pg_class c -USING pg_namespace n -ON n.oid = c.relnamespace -WHEN MATCHED AND c.oid = 
'pg_depend'::regclass THEN - UPDATE SET reltuples = reltuples - 1; +ROLLBACK; +DROP TABLE test; +DROP TABLE test1; DROP TABLE target, target2; DROP TABLE source, source2; DROP FUNCTION merge_trigfunc(); +REVOKE ALL ON SCHEMA public FROM regress_merge_privs; DROP USER regress_merge_privs; DROP USER regress_merge_no_privs; DROP USER regress_merge_none; diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index 4609241b2a5..4e99ef36bfd 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -19,6 +19,7 @@ SELECT t.ctid is not null as matched, t.*, s.* FROM source s FULL OUTER JOIN tar ALTER TABLE target OWNER TO regress_merge_privs; ALTER TABLE source OWNER TO regress_merge_privs; +GRANT ALL ON SCHEMA public to regress_merge_privs; CREATE TABLE target2 (tid integer, balance integer) WITH (autovacuum_enabled=off); @@ -605,12 +606,12 @@ BEGIN END IF; END; $$; -CREATE TRIGGER merge_bsi BEFORE INSERT ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); -CREATE TRIGGER merge_bsu BEFORE UPDATE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); -CREATE TRIGGER merge_bsd BEFORE DELETE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); -CREATE TRIGGER merge_asi AFTER INSERT ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); -CREATE TRIGGER merge_asu AFTER UPDATE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); -CREATE TRIGGER merge_asd AFTER DELETE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +-- CREATE TRIGGER merge_bsi BEFORE INSERT ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +-- CREATE TRIGGER merge_bsu BEFORE UPDATE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +-- CREATE TRIGGER merge_bsd BEFORE DELETE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +-- CREATE TRIGGER merge_asi AFTER INSERT ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +-- CREATE 
TRIGGER merge_asu AFTER UPDATE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); +-- CREATE TRIGGER merge_asd AFTER DELETE ON target FOR EACH STATEMENT EXECUTE PROCEDURE merge_trigfunc (); CREATE TRIGGER merge_bri BEFORE INSERT ON target FOR EACH ROW EXECUTE PROCEDURE merge_trigfunc (); CREATE TRIGGER merge_bru BEFORE UPDATE ON target FOR EACH ROW EXECUTE PROCEDURE merge_trigfunc (); CREATE TRIGGER merge_brd BEFORE DELETE ON target FOR EACH ROW EXECUTE PROCEDURE merge_trigfunc (); @@ -948,22 +949,22 @@ WHEN MATCHED AND t.a < 10 THEN DROP TABLE ex_msource, ex_mtarget; DROP FUNCTION explain_merge(text); --- EXPLAIN SubPlans and InitPlans -CREATE TABLE src (a int, b int, c int, d int); -CREATE TABLE tgt (a int, b int, c int, d int); -CREATE TABLE ref (ab int, cd int); +-- EXPLAIN SubPlans and InitPlans (CBDB not supported) +-- CREATE TABLE src (a int, b int, c int, d int); +-- CREATE TABLE tgt (a int, b int, c int, d int); +-- CREATE TABLE ref (ab int, cd int); -EXPLAIN (verbose, costs off) -MERGE INTO tgt t -USING (SELECT *, (SELECT count(*) FROM ref r - WHERE r.ab = s.a + s.b - AND r.cd = s.c - s.d) cnt - FROM src s) s -ON t.a = s.a AND t.b < s.cnt -WHEN MATCHED AND t.c > s.cnt THEN - UPDATE SET (b, c) = (SELECT s.b, s.cnt); +-- EXPLAIN (verbose, costs off) +-- MERGE INTO tgt t +-- USING (SELECT *, (SELECT count(*) FROM ref r +-- WHERE r.ab = s.a + s.b +-- AND r.cd = s.c - s.d) cnt +-- FROM src s) s +-- ON t.a = s.a AND t.b < s.cnt +-- WHEN MATCHED AND t.c > s.cnt THEN +-- UPDATE SET (b, c) = (SELECT s.b, s.cnt); -DROP TABLE src, tgt, ref; +-- DROP TABLE src, tgt, ref; -- Subqueries BEGIN; @@ -1022,7 +1023,7 @@ MERGE INTO pa_target t UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid, delta, 'inserted by merge'); -SELECT * FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; ROLLBACK; -- same with a constant qual @@ -1034,7 +1035,7 @@ MERGE INTO pa_target t 
UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid, delta, 'inserted by merge'); -SELECT * FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; ROLLBACK; -- try updating the partition key column @@ -1047,7 +1048,7 @@ MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid WHEN MATCHED THEN - UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge' + UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid, delta, 'inserted by merge'); IF FOUND THEN @@ -1057,7 +1058,7 @@ RETURN result; END; $$; SELECT merge_func(); -SELECT * FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; ROLLBACK; -- bug #18871: ExecInitPartitionInfo()'s handling of DO NOTHING actions @@ -1083,11 +1084,11 @@ CREATE TABLE pa_target (tid integer, balance float, val text) CREATE TABLE part1 (tid integer, balance float, val text) WITH (autovacuum_enabled=off); CREATE TABLE part2 (balance float, tid integer, val text) - WITH (autovacuum_enabled=off); + WITH (autovacuum_enabled=off) distributed by (tid); CREATE TABLE part3 (tid integer, balance float, val text) WITH (autovacuum_enabled=off); CREATE TABLE part4 (extraid text, tid integer, balance float, val text) - WITH (autovacuum_enabled=off); + WITH (autovacuum_enabled=off) distributed by (tid); ALTER TABLE part4 DROP COLUMN extraid; ALTER TABLE pa_target ATTACH PARTITION part1 FOR VALUES IN (1,4); @@ -1115,7 +1116,7 @@ GET DIAGNOSTICS result := ROW_COUNT; RAISE NOTICE 'ROW_COUNT = %', result; END; $$; -SELECT * FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; ROLLBACK; -- same with a constant qual @@ -1128,7 +1129,7 @@ MERGE INTO pa_target t UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid, delta, 'inserted by merge'); -SELECT * FROM pa_target ORDER 
BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; ROLLBACK; -- try updating the partition key column @@ -1141,14 +1142,14 @@ MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid WHEN MATCHED THEN - UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge' + UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid, delta, 'inserted by merge'); GET DIAGNOSTICS result := ROW_COUNT; RAISE NOTICE 'ROW_COUNT = %', result; END; $$; -SELECT * FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; ROLLBACK; -- as above, but blocked by BEFORE DELETE ROW trigger @@ -1165,14 +1166,14 @@ MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid WHEN MATCHED THEN - UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge' + UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid, delta, 'inserted by merge'); GET DIAGNOSTICS result := ROW_COUNT; RAISE NOTICE 'ROW_COUNT = %', result; END; $$; -SELECT * FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; ROLLBACK; -- as above, but blocked by BEFORE INSERT ROW trigger @@ -1189,14 +1190,14 @@ MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid WHEN MATCHED THEN - UPDATE SET tid = tid + 1, balance = balance + delta, val = val || ' updated by merge' + UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (sid, delta, 'inserted by merge'); GET DIAGNOSTICS result := ROW_COUNT; RAISE NOTICE 'ROW_COUNT = %', result; END; $$; -SELECT * FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; ROLLBACK; -- test RLS enforcement @@ -1250,7 +1251,7 @@ MERGE INTO pa_target t UPDATE SET balance = balance + delta, val = val || ' updated by merge' WHEN NOT MATCHED THEN INSERT VALUES (slogts::timestamp, sid, delta, 'inserted by 
merge'); -SELECT * FROM pa_target ORDER BY tid; +SELECT * FROM pa_target ORDER BY tid, balance, val; ROLLBACK; DROP TABLE pa_source; @@ -1282,6 +1283,8 @@ EXPLAIN (VERBOSE, COSTS OFF) MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid WHEN NOT MATCHED THEN INSERT VALUES (s.sid); +DELETE FROM pa_source WHERE sid = 2; + MERGE INTO pa_target t USING pa_source s ON t.tid = s.sid WHEN NOT MATCHED THEN INSERT VALUES (s.sid); @@ -1390,139 +1393,169 @@ DROP TABLE fs_target; -- SERIALIZABLE test -- handled in isolation tests --- Inheritance-based partitioning -CREATE TABLE measurement ( - city_id int not null, - logdate date not null, - peaktemp int, - unitsales int -) WITH (autovacuum_enabled=off); -CREATE TABLE measurement_y2006m02 ( - CHECK ( logdate >= DATE '2006-02-01' AND logdate < DATE '2006-03-01' ) -) INHERITS (measurement) WITH (autovacuum_enabled=off); -CREATE TABLE measurement_y2006m03 ( - CHECK ( logdate >= DATE '2006-03-01' AND logdate < DATE '2006-04-01' ) -) INHERITS (measurement) WITH (autovacuum_enabled=off); -CREATE TABLE measurement_y2007m01 ( - filler text, - peaktemp int, - logdate date not null, - city_id int not null, - unitsales int - CHECK ( logdate >= DATE '2007-01-01' AND logdate < DATE '2007-02-01') -) WITH (autovacuum_enabled=off); -ALTER TABLE measurement_y2007m01 DROP COLUMN filler; -ALTER TABLE measurement_y2007m01 INHERIT measurement; -INSERT INTO measurement VALUES (0, '2005-07-21', 5, 15); - -CREATE OR REPLACE FUNCTION measurement_insert_trigger() -RETURNS TRIGGER AS $$ -BEGIN - IF ( NEW.logdate >= DATE '2006-02-01' AND - NEW.logdate < DATE '2006-03-01' ) THEN - INSERT INTO measurement_y2006m02 VALUES (NEW.*); - ELSIF ( NEW.logdate >= DATE '2006-03-01' AND - NEW.logdate < DATE '2006-04-01' ) THEN - INSERT INTO measurement_y2006m03 VALUES (NEW.*); - ELSIF ( NEW.logdate >= DATE '2007-01-01' AND - NEW.logdate < DATE '2007-02-01' ) THEN - INSERT INTO measurement_y2007m01 (city_id, logdate, peaktemp, unitsales) - VALUES (NEW.*); - ELSE 
- RAISE EXCEPTION 'Date out of range. Fix the measurement_insert_trigger() function!'; - END IF; - RETURN NULL; -END; -$$ LANGUAGE plpgsql ; -CREATE TRIGGER insert_measurement_trigger - BEFORE INSERT ON measurement - FOR EACH ROW EXECUTE PROCEDURE measurement_insert_trigger(); -INSERT INTO measurement VALUES (1, '2006-02-10', 35, 10); -INSERT INTO measurement VALUES (1, '2006-02-16', 45, 20); -INSERT INTO measurement VALUES (1, '2006-03-17', 25, 10); -INSERT INTO measurement VALUES (1, '2006-03-27', 15, 40); -INSERT INTO measurement VALUES (1, '2007-01-15', 10, 10); -INSERT INTO measurement VALUES (1, '2007-01-17', 10, 10); - -SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate; - -CREATE TABLE new_measurement (LIKE measurement) WITH (autovacuum_enabled=off); -INSERT INTO new_measurement VALUES (0, '2005-07-21', 25, 20); -INSERT INTO new_measurement VALUES (1, '2006-03-01', 20, 10); -INSERT INTO new_measurement VALUES (1, '2006-02-16', 50, 10); -INSERT INTO new_measurement VALUES (2, '2006-02-10', 20, 20); -INSERT INTO new_measurement VALUES (1, '2006-03-27', NULL, NULL); -INSERT INTO new_measurement VALUES (1, '2007-01-17', NULL, NULL); -INSERT INTO new_measurement VALUES (1, '2007-01-15', 5, NULL); -INSERT INTO new_measurement VALUES (1, '2007-01-16', 10, 10); - -BEGIN; -MERGE INTO ONLY measurement m - USING new_measurement nm ON - (m.city_id = nm.city_id and m.logdate=nm.logdate) -WHEN MATCHED AND nm.peaktemp IS NULL THEN DELETE -WHEN MATCHED THEN UPDATE - SET peaktemp = greatest(m.peaktemp, nm.peaktemp), - unitsales = m.unitsales + coalesce(nm.unitsales, 0) -WHEN NOT MATCHED THEN INSERT - (city_id, logdate, peaktemp, unitsales) - VALUES (city_id, logdate, peaktemp, unitsales); - -SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate, peaktemp; -ROLLBACK; - -MERGE into measurement m - USING new_measurement nm ON - (m.city_id = nm.city_id and m.logdate=nm.logdate) -WHEN MATCHED AND nm.peaktemp IS NULL THEN DELETE -WHEN 
MATCHED THEN UPDATE - SET peaktemp = greatest(m.peaktemp, nm.peaktemp), - unitsales = m.unitsales + coalesce(nm.unitsales, 0) -WHEN NOT MATCHED THEN INSERT - (city_id, logdate, peaktemp, unitsales) - VALUES (city_id, logdate, peaktemp, unitsales); +-- Inheritance-based partitioning (CBDB not supported) +-- CREATE TABLE measurement ( +-- city_id int not null, +-- logdate date not null, +-- peaktemp int, +-- unitsales int +-- ) WITH (autovacuum_enabled=off); +-- CREATE TABLE measurement_y2006m02 ( +-- CHECK ( logdate >= DATE '2006-02-01' AND logdate < DATE '2006-03-01' ) +-- ) INHERITS (measurement) WITH (autovacuum_enabled=off); +-- CREATE TABLE measurement_y2006m03 ( +-- CHECK ( logdate >= DATE '2006-03-01' AND logdate < DATE '2006-04-01' ) +-- ) INHERITS (measurement) WITH (autovacuum_enabled=off); +-- CREATE TABLE measurement_y2007m01 ( +-- filler text, +-- peaktemp int, +-- logdate date not null, +-- city_id int not null, +-- unitsales int +-- CHECK ( logdate >= DATE '2007-01-01' AND logdate < DATE '2007-02-01') +-- ) WITH (autovacuum_enabled=off); +-- ALTER TABLE measurement_y2007m01 DROP COLUMN filler; +-- ALTER TABLE measurement_y2007m01 INHERIT measurement; +-- INSERT INTO measurement VALUES (0, '2005-07-21', 5, 15); + +-- CREATE OR REPLACE FUNCTION measurement_insert_trigger() +-- RETURNS TRIGGER AS $$ +-- BEGIN +-- IF ( NEW.logdate >= DATE '2006-02-01' AND +-- NEW.logdate < DATE '2006-03-01' ) THEN +-- INSERT INTO measurement_y2006m02 VALUES (NEW.*); +-- ELSIF ( NEW.logdate >= DATE '2006-03-01' AND +-- NEW.logdate < DATE '2006-04-01' ) THEN +-- INSERT INTO measurement_y2006m03 VALUES (NEW.*); +-- ELSIF ( NEW.logdate >= DATE '2007-01-01' AND +-- NEW.logdate < DATE '2007-02-01' ) THEN +-- INSERT INTO measurement_y2007m01 (city_id, logdate, peaktemp, unitsales) +-- VALUES (NEW.*); +-- ELSE +-- RAISE EXCEPTION 'Date out of range. 
Fix the measurement_insert_trigger() function!'; +-- END IF; +-- RETURN NULL; +-- END; +-- $$ LANGUAGE plpgsql ; +-- CREATE TRIGGER insert_measurement_trigger +-- BEFORE INSERT ON measurement +-- FOR EACH ROW EXECUTE PROCEDURE measurement_insert_trigger(); +-- INSERT INTO measurement VALUES (1, '2006-02-10', 35, 10); +-- INSERT INTO measurement VALUES (1, '2006-02-16', 45, 20); +-- INSERT INTO measurement VALUES (1, '2006-03-17', 25, 10); +-- INSERT INTO measurement VALUES (1, '2006-03-27', 15, 40); +-- INSERT INTO measurement VALUES (1, '2007-01-15', 10, 10); +-- INSERT INTO measurement VALUES (1, '2007-01-17', 10, 10); + +-- SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate; + +-- CREATE TABLE new_measurement (LIKE measurement) WITH (autovacuum_enabled=off); +-- INSERT INTO new_measurement VALUES (0, '2005-07-21', 25, 20); +-- INSERT INTO new_measurement VALUES (1, '2006-03-01', 20, 10); +-- INSERT INTO new_measurement VALUES (1, '2006-02-16', 50, 10); +-- INSERT INTO new_measurement VALUES (2, '2006-02-10', 20, 20); +-- INSERT INTO new_measurement VALUES (1, '2006-03-27', NULL, NULL); +-- INSERT INTO new_measurement VALUES (1, '2007-01-17', NULL, NULL); +-- INSERT INTO new_measurement VALUES (1, '2007-01-15', 5, NULL); +-- INSERT INTO new_measurement VALUES (1, '2007-01-16', 10, 10); + +-- BEGIN; +-- MERGE INTO ONLY measurement m +-- USING new_measurement nm ON +-- (m.city_id = nm.city_id and m.logdate=nm.logdate) +-- WHEN MATCHED AND nm.peaktemp IS NULL THEN DELETE +-- WHEN MATCHED THEN UPDATE +-- SET peaktemp = greatest(m.peaktemp, nm.peaktemp), +-- unitsales = m.unitsales + coalesce(nm.unitsales, 0) +-- WHEN NOT MATCHED THEN INSERT +-- (city_id, logdate, peaktemp, unitsales) +-- VALUES (city_id, logdate, peaktemp, unitsales); + +-- SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate, peaktemp; +-- ROLLBACK; + +-- MERGE into measurement m +-- USING new_measurement nm ON +-- (m.city_id = nm.city_id and 
m.logdate=nm.logdate) +-- WHEN MATCHED AND nm.peaktemp IS NULL THEN DELETE +-- WHEN MATCHED THEN UPDATE +-- SET peaktemp = greatest(m.peaktemp, nm.peaktemp), +-- unitsales = m.unitsales + coalesce(nm.unitsales, 0) +-- WHEN NOT MATCHED THEN INSERT +-- (city_id, logdate, peaktemp, unitsales) +-- VALUES (city_id, logdate, peaktemp, unitsales); + +-- SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate; + +-- BEGIN; +-- MERGE INTO new_measurement nm +-- USING ONLY measurement m ON +-- (nm.city_id = m.city_id and nm.logdate=m.logdate) +-- WHEN MATCHED THEN DELETE; + +-- SELECT * FROM new_measurement ORDER BY city_id, logdate; +-- ROLLBACK; + +-- MERGE INTO new_measurement nm +-- USING measurement m ON +-- (nm.city_id = m.city_id and nm.logdate=m.logdate) +-- WHEN MATCHED THEN DELETE; + +-- SELECT * FROM new_measurement ORDER BY city_id, logdate; + +-- DROP TABLE measurement, new_measurement CASCADE; +-- DROP FUNCTION measurement_insert_trigger(); -SELECT tableoid::regclass, * FROM measurement ORDER BY city_id, logdate; +-- prepare -BEGIN; -MERGE INTO new_measurement nm - USING ONLY measurement m ON - (nm.city_id = m.city_id and nm.logdate=m.logdate) -WHEN MATCHED THEN DELETE; +RESET SESSION AUTHORIZATION; -SELECT * FROM new_measurement ORDER BY city_id, logdate; -ROLLBACK; +-- try a system catalog (CBDB not supported) +-- MERGE INTO pg_class c +-- USING (SELECT 'pg_depend'::regclass AS oid) AS j +-- ON j.oid = c.oid +-- WHEN MATCHED THEN +-- UPDATE SET reltuples = reltuples + 1; -MERGE INTO new_measurement nm - USING measurement m ON - (nm.city_id = m.city_id and nm.logdate=m.logdate) -WHEN MATCHED THEN DELETE; +-- MERGE INTO pg_class c +-- USING pg_namespace n +-- ON n.oid = c.relnamespace +-- WHEN MATCHED AND c.oid = 'pg_depend'::regclass THEN +-- UPDATE SET reltuples = reltuples - 1; -SELECT * FROM new_measurement ORDER BY city_id, logdate; +DROP TABLE IF EXISTS test; +DROP TABLE IF EXISTS test1; -DROP TABLE measurement, new_measurement CASCADE; 
-DROP FUNCTION measurement_insert_trigger(); +CREATE TABLE test(a int, b int)distributed by (a); +CREATE TABLE test1(a int, b int)distributed by (a); --- prepare +INSERT INTO test1 values(1,1); -RESET SESSION AUTHORIZATION; +BEGIN; +MERGE INTO test +USING test1 on test1.b = test.b +WHEN NOT MATCHED THEN + INSERT VALUES (2, 2); +INSERT INTO test values(2,2); +SELECT * FROM test WHERE a = 2; +ROLLBACK; --- try a system catalog -MERGE INTO pg_class c -USING (SELECT 'pg_depend'::regclass AS oid) AS j -ON j.oid = c.oid -WHEN MATCHED THEN - UPDATE SET reltuples = reltuples + 1; +BEGIN; +MERGE INTO test +USING (SELECT 2,2) as d(a, b) on d.b = test.b +WHEN NOT MATCHED THEN + INSERT VALUES (2, 2); +INSERT INTO test values(2,2); +SELECT * FROM test WHERE a = 2; +ROLLBACK; -MERGE INTO pg_class c -USING pg_namespace n -ON n.oid = c.relnamespace -WHEN MATCHED AND c.oid = 'pg_depend'::regclass THEN - UPDATE SET reltuples = reltuples - 1; +DROP TABLE test; +DROP TABLE test1; DROP TABLE target, target2; DROP TABLE source, source2; DROP FUNCTION merge_trigfunc(); +REVOKE ALL ON SCHEMA public FROM regress_merge_privs; DROP USER regress_merge_privs; DROP USER regress_merge_no_privs; DROP USER regress_merge_none;