diff options
Diffstat (limited to 'src/backend')
37 files changed, 2532 insertions, 128 deletions
diff --git a/src/backend/catalog/sql_features.txt b/src/backend/catalog/sql_features.txt index 097d9c4784b..4c3e29111de 100644 --- a/src/backend/catalog/sql_features.txt +++ b/src/backend/catalog/sql_features.txt @@ -240,9 +240,9 @@ F311 Schema definition statement 02 CREATE TABLE for persistent base tables YES F311 Schema definition statement 03 CREATE VIEW YES F311 Schema definition statement 04 CREATE VIEW: WITH CHECK OPTION YES F311 Schema definition statement 05 GRANT statement YES -F312 MERGE statement NO consider INSERT ... ON CONFLICT DO UPDATE -F313 Enhanced MERGE statement NO -F314 MERGE statement with DELETE branch NO +F312 MERGE statement YES +F313 Enhanced MERGE statement YES +F314 MERGE statement with DELETE branch YES F321 User authorization YES F341 Usage tables YES F361 Subprogram support YES diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 9f632285b62..cb13227db1f 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1188,6 +1188,9 @@ ExplainNode(PlanState *planstate, List *ancestors, case CMD_DELETE: pname = operation = "Delete"; break; + case CMD_MERGE: + pname = operation = "Merge"; + break; default: pname = "???"; break; @@ -3877,6 +3880,11 @@ show_modifytable_info(ModifyTableState *mtstate, List *ancestors, operation = "Delete"; foperation = "Foreign Delete"; break; + case CMD_MERGE: + operation = "Merge"; + /* XXX unsupported for now, but avoid compiler noise */ + foperation = "Foreign Merge"; + break; default: operation = "???"; foperation = "Foreign ???"; @@ -3999,6 +4007,33 @@ show_modifytable_info(ModifyTableState *mtstate, List *ancestors, other_path, 0, es); } } + else if (node->operation == CMD_MERGE) + { + /* EXPLAIN ANALYZE display of tuples processed */ + if (es->analyze && mtstate->ps.instrument) + { + double total; + double insert_path; + double update_path; + double delete_path; + double skipped_path; + + InstrEndLoop(outerPlanState(mtstate)->instrument); + + /* count the number of source rows */ + total = outerPlanState(mtstate)->instrument->ntuples; + insert_path = mtstate->mt_merge_inserted; + update_path = mtstate->mt_merge_updated; + delete_path = mtstate->mt_merge_deleted; + skipped_path = total - insert_path - update_path - delete_path; + Assert(skipped_path >= 0); + + ExplainPropertyFloat("Tuples Inserted", NULL, insert_path, 0, es); + ExplainPropertyFloat("Tuples Updated", NULL, update_path, 0, es); + ExplainPropertyFloat("Tuples Deleted", NULL, delete_path, 0, es); + ExplainPropertyFloat("Tuples Skipped", NULL, skipped_path, 0, es); + } + } if (labeltargets) ExplainCloseGroup("Target Tables", "Target Tables", false, es); diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index fce79b02a57..13cb516752b 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -84,7 +84,8 @@ static bool GetTupleForTrigger(EState *estate, ItemPointer tid, LockTupleMode lockmode, TupleTableSlot *oldslot, - TupleTableSlot **newSlot); + TupleTableSlot **newSlot, + TM_FailureData *tmfpd); static bool TriggerEnabled(EState *estate, ResultRelInfo *relinfo, Trigger *trigger, TriggerEvent event, Bitmapset *modifiedCols, @@ -2713,7 +2714,8 @@ ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, TupleTableSlot *epqslot_candidate = NULL; if (!GetTupleForTrigger(estate, epqstate, relinfo, tupleid, - LockTupleExclusive, slot, &epqslot_candidate)) + LockTupleExclusive, slot, &epqslot_candidate, + NULL)) return false; /* @@ -2728,7 +2730,6 @@ ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, } trigtuple = ExecFetchSlotHeapTuple(slot, true, &should_free); - } else { @@ -2804,6 +2805,7 @@ ExecARDeleteTriggers(EState *estate, tupleid, LockTupleExclusive, slot, + NULL, NULL); else ExecForceStoreHeapTuple(fdw_trigtuple, slot, false); @@ -2944,7 +2946,8 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, - TupleTableSlot *newslot) + TupleTableSlot *newslot, + TM_FailureData *tmfd) { TriggerDesc *trigdesc = relinfo->ri_TrigDesc; TupleTableSlot *oldslot = ExecGetTriggerOldSlot(estate, relinfo); @@ -2967,7 +2970,8 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, /* get a copy of the on-disk tuple we are planning to update */ if (!GetTupleForTrigger(estate, epqstate, relinfo, tupleid, - lockmode, oldslot, &epqslot_candidate)) + lockmode, oldslot, &epqslot_candidate, + tmfd)) return false; /* cancel the update action */ /* @@ -3121,6 +3125,7 @@ ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, tupleid, LockTupleExclusive, oldslot, + NULL, NULL); else if (fdw_trigtuple != NULL) ExecForceStoreHeapTuple(fdw_trigtuple, oldslot, false); @@ -3275,7 +3280,8 @@ GetTupleForTrigger(EState *estate, ItemPointer tid, LockTupleMode lockmode, TupleTableSlot *oldslot, - TupleTableSlot **epqslot) + TupleTableSlot **epqslot, + TM_FailureData *tmfdp) { Relation relation = relinfo->ri_RelationDesc; @@ -3301,6 +3307,10 @@ GetTupleForTrigger(EState *estate, lockflags, &tmfd); + /* Let the caller know about the status of this operation */ + if (tmfdp) + *tmfdp = tmfd; + switch (test) { case TM_SelfModified: @@ -3821,8 +3831,23 @@ struct AfterTriggersTableData bool before_trig_done; /* did we already queue BS triggers? */ bool after_trig_done; /* did we already queue AS triggers? */ AfterTriggerEventList after_trig_events; /* if so, saved list pointer */ - Tuplestorestate *old_tuplestore; /* "old" transition table, if any */ - Tuplestorestate *new_tuplestore; /* "new" transition table, if any */ + + /* + * We maintain separate transition tables for UPDATE/INSERT/DELETE since + * MERGE can run all three actions in a single statement. Note that UPDATE + * needs both old and new transition tables whereas INSERT needs only new, + * and DELETE needs only old. + */ + + /* "old" transition table for UPDATE, if any */ + Tuplestorestate *old_upd_tuplestore; + /* "new" transition table for UPDATE, if any */ + Tuplestorestate *new_upd_tuplestore; + /* "old" transition table for DELETE, if any */ + Tuplestorestate *old_del_tuplestore; + /* "new" transition table for INSERT, if any */ + Tuplestorestate *new_ins_tuplestore; + TupleTableSlot *storeslot; /* for converting to tuplestore's format */ }; @@ -4374,13 +4399,19 @@ AfterTriggerExecute(EState *estate, { if (LocTriggerData.tg_trigger->tgoldtable) { - LocTriggerData.tg_oldtable = evtshared->ats_table->old_tuplestore; + if (TRIGGER_FIRED_BY_UPDATE(evtshared->ats_event)) + LocTriggerData.tg_oldtable = evtshared->ats_table->old_upd_tuplestore; + else + LocTriggerData.tg_oldtable = evtshared->ats_table->old_del_tuplestore; evtshared->ats_table->closed = true; } if (LocTriggerData.tg_trigger->tgnewtable) { - LocTriggerData.tg_newtable = evtshared->ats_table->new_tuplestore; + if (TRIGGER_FIRED_BY_INSERT(evtshared->ats_event)) + LocTriggerData.tg_newtable = evtshared->ats_table->new_ins_tuplestore; + else + LocTriggerData.tg_newtable = evtshared->ats_table->new_upd_tuplestore; evtshared->ats_table->closed = true; } } @@ -4794,8 +4825,10 @@ TransitionCaptureState * MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType) { TransitionCaptureState *state; - bool need_old, - need_new; + bool need_old_upd, + need_new_upd, + need_old_del, + need_new_ins; AfterTriggersTableData *table; MemoryContext oldcxt; ResourceOwner saveResourceOwner; @@ -4807,23 +4840,31 @@ MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType) switch (cmdType) { case CMD_INSERT: - need_old = false; - need_new = trigdesc->trig_insert_new_table; + need_old_upd = need_old_del = need_new_upd = false; + need_new_ins = trigdesc->trig_insert_new_table; break; case CMD_UPDATE: - need_old = trigdesc->trig_update_old_table; - need_new = trigdesc->trig_update_new_table; + need_old_upd = trigdesc->trig_update_old_table; + need_new_upd = trigdesc->trig_update_new_table; + need_old_del = need_new_ins = false; break; case CMD_DELETE: - need_old = trigdesc->trig_delete_old_table; - need_new = false; + need_old_del = trigdesc->trig_delete_old_table; + need_old_upd = need_new_upd = need_new_ins = false; + break; + case CMD_MERGE: + need_old_upd = trigdesc->trig_update_old_table; + need_new_upd = trigdesc->trig_update_new_table; + need_old_del = trigdesc->trig_delete_old_table; + need_new_ins = trigdesc->trig_insert_new_table; break; default: elog(ERROR, "unexpected CmdType: %d", (int) cmdType); - need_old = need_new = false; /* keep compiler quiet */ + /* keep compiler quiet */ + need_old_upd = need_new_upd = need_old_del = need_new_ins = false; break; } - if (!need_old && !need_new) + if (!need_old_upd && !need_new_upd && !need_new_ins && !need_old_del) return NULL; /* Check state, like AfterTriggerSaveEvent. */ @@ -4853,10 +4894,14 @@ MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType) saveResourceOwner = CurrentResourceOwner; CurrentResourceOwner = CurTransactionResourceOwner; - if (need_old && table->old_tuplestore == NULL) - table->old_tuplestore = tuplestore_begin_heap(false, false, work_mem); - if (need_new && table->new_tuplestore == NULL) - table->new_tuplestore = tuplestore_begin_heap(false, false, work_mem); + if (need_old_upd && table->old_upd_tuplestore == NULL) + table->old_upd_tuplestore = tuplestore_begin_heap(false, false, work_mem); + if (need_new_upd && table->new_upd_tuplestore == NULL) + table->new_upd_tuplestore = tuplestore_begin_heap(false, false, work_mem); + if (need_old_del && table->old_del_tuplestore == NULL) + table->old_del_tuplestore = tuplestore_begin_heap(false, false, work_mem); + if (need_new_ins && table->new_ins_tuplestore == NULL) + table->new_ins_tuplestore = tuplestore_begin_heap(false, false, work_mem); CurrentResourceOwner = saveResourceOwner; MemoryContextSwitchTo(oldcxt); @@ -5045,12 +5090,20 @@ AfterTriggerFreeQuery(AfterTriggersQueryData *qs) { AfterTriggersTableData *table = (AfterTriggersTableData *) lfirst(lc); - ts = table->old_tuplestore; - table->old_tuplestore = NULL; + ts = table->old_upd_tuplestore; + table->old_upd_tuplestore = NULL; + if (ts) + tuplestore_end(ts); + ts = table->new_upd_tuplestore; + table->new_upd_tuplestore = NULL; + if (ts) + tuplestore_end(ts); + ts = table->old_del_tuplestore; + table->old_del_tuplestore = NULL; if (ts) tuplestore_end(ts); - ts = table->new_tuplestore; - table->new_tuplestore = NULL; + ts = table->new_ins_tuplestore; + table->new_ins_tuplestore = NULL; if (ts) tuplestore_end(ts); if (table->storeslot) @@ -5356,17 +5409,17 @@ GetAfterTriggersTransitionTable(int event, { Assert(TupIsNull(newslot)); if (event == TRIGGER_EVENT_DELETE && delete_old_table) - tuplestore = transition_capture->tcs_private->old_tuplestore; + tuplestore = transition_capture->tcs_private->old_del_tuplestore; else if (event == TRIGGER_EVENT_UPDATE && update_old_table) - tuplestore = transition_capture->tcs_private->old_tuplestore; + tuplestore = transition_capture->tcs_private->old_upd_tuplestore; } else if (!TupIsNull(newslot)) { Assert(TupIsNull(oldslot)); if (event == TRIGGER_EVENT_INSERT && insert_new_table) - tuplestore = transition_capture->tcs_private->new_tuplestore; + tuplestore = transition_capture->tcs_private->new_ins_tuplestore; else if (event == TRIGGER_EVENT_UPDATE && update_new_table) - tuplestore = transition_capture->tcs_private->new_tuplestore; + tuplestore = transition_capture->tcs_private->new_upd_tuplestore; } return tuplestore; @@ -5980,6 +6033,7 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo, */ if (row_trigger && transition_capture != NULL) { + TupleTableSlot *original_insert_tuple = transition_capture->tcs_original_insert_tuple; /* * Capture the old tuple in the appropriate transition table based on @@ -6010,17 +6064,15 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo, newslot, transition_capture); TransitionTableAddTuple(estate, transition_capture, relinfo, - newslot, - transition_capture->tcs_original_insert_tuple, - new_tuplestore); + newslot, original_insert_tuple, new_tuplestore); } /* * If transition tables are the only reason we're here, return. As * mentioned above, we can also be here during update tuple routing in * presence of transition tables, in which case this function is - * called separately for oldtup and newtup, so we expect exactly one - * of them to be NULL. + * called separately for OLD and NEW, so we expect exactly one of them + * to be NULL. */ if (trigdesc == NULL || (event == TRIGGER_EVENT_DELETE && !trigdesc->trig_delete_after_row) || diff --git a/src/backend/executor/README b/src/backend/executor/README index bf5e70860d5..0b5183fc4a4 100644 --- a/src/backend/executor/README +++ b/src/backend/executor/README @@ -39,7 +39,7 @@ columns, combine the values into a new row, and apply the update. (For a heap table, the row-identity junk column is a CTID, but other things may be used for other table types.) For DELETE, the plan tree need only deliver junk row-identity column(s), and the ModifyTable node visits each of those -rows and marks the row deleted. +rows and marks the row deleted. MERGE is described below. XXX a great deal more documentation needs to be written here... @@ -223,6 +223,45 @@ fast-path step types (EEOP_ASSIGN_*_VAR) to handle targetlist entries that are simple Vars using only one step instead of two. +MERGE +----- + +MERGE is a multiple-table, multiple-action command: It specifies a target +table and a source relation, and can contain multiple WHEN MATCHED and +WHEN NOT MATCHED clauses, each of which specifies one UPDATE, INSERT, +UPDATE, or DO NOTHING actions. The target table is modified by MERGE, +and the source relation supplies additional data for the actions. Each action +optionally specifies a qualifying expression that is evaluated for each tuple. + +In the planner, transform_MERGE_to_join constructs a join between the target +table and the source relation, with row-identifying junk columns from the target +table. This join is an outer join if the MERGE command contains any WHEN NOT +MATCHED clauses; the ModifyTable node fetches tuples from the plan tree of that +join. If the row-identifying columns in the fetched tuple are NULL, then the +source relation contains a tuple that is not matched by any tuples in the +target table, so the qualifying expression for each WHEN NOT MATCHED clause is +evaluated given that tuple as returned by the plan. If the expression returns +true, the action indicated by the clause is executed, and no further clauses +are evaluated. On the other hand, if the row-identifying columns are not +NULL, then the matching tuple from the target table can be fetched; qualifying +expression of each WHEN MATCHED clause is evaluated given both the fetched +tuple and the tuple returned by the plan. + +If no WHEN NOT MATCHED clauses are present, then the join constructed by +the planner is an inner join, and the row-identifying junk columns are +always non NULL. + +If WHEN MATCHED ends up processing a row that is concurrently updated or deleted, +EvalPlanQual (see below) is used to find the latest version of the row, and +that is re-fetched; if it exists, the search for a matching WHEN MATCHED clause +to use starts at the top. + +MERGE does not allow its own type of triggers, but instead fires UPDATE, DELETE, +and INSERT triggers: row triggers are fired for each row when an action is +executed for that row. Statement triggers are fired always, regardless of +whether any rows match the corresponding clauses. + + Memory Management ----------------- diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 473d2e00a2f..ef2fd46092e 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -233,6 +233,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) case CMD_INSERT: case CMD_DELETE: case CMD_UPDATE: + case CMD_MERGE: estate->es_output_cid = GetCurrentCommandId(true); break; @@ -1244,6 +1245,8 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo, resultRelInfo->ri_ReturningSlot = NULL; resultRelInfo->ri_TrigOldSlot = NULL; resultRelInfo->ri_TrigNewSlot = NULL; + resultRelInfo->ri_matchedMergeAction = NIL; + resultRelInfo->ri_notMatchedMergeAction = NIL; /* * Only ExecInitPartitionInfo() and ExecInitPartitionDispatchInfo() pass @@ -2142,6 +2145,19 @@ ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo, errmsg("new row violates row-level security policy for table \"%s\"", wco->relname))); break; + case WCO_RLS_MERGE_UPDATE_CHECK: + case WCO_RLS_MERGE_DELETE_CHECK: + if (wco->polname != NULL) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("target row violates row-level security policy \"%s\" (USING expression) for table \"%s\"", + wco->polname, wco->relname))); + else + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("target row violates row-level security policy (USING expression) for table \"%s\"", + wco->relname))); + break; case WCO_RLS_CONFLICT_CHECK: if (wco->polname != NULL) ereport(ERROR, diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index 90ed1485d17..aca42ca5b8c 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -20,6 +20,7 @@ #include "catalog/pg_type.h" #include "executor/execPartition.h" #include "executor/executor.h" +#include "executor/nodeModifyTable.h" #include "foreign/fdwapi.h" #include "mb/pg_wchar.h" #include "miscadmin.h" @@ -182,6 +183,7 @@ static char *ExecBuildSlotPartitionKeyDescription(Relation rel, bool *isnull, int maxfieldlen); static List *adjust_partition_colnos(List *colnos, ResultRelInfo *leaf_part_rri); +static List *adjust_partition_colnos_using_map(List *colnos, AttrMap *attrMap); static void ExecInitPruningContext(PartitionPruneContext *context, List *pruning_steps, PartitionDesc partdesc, @@ -853,6 +855,99 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate, lappend(estate->es_tuple_routing_result_relations, leaf_part_rri); + /* + * Initialize information about this partition that's needed to handle + * MERGE. We take the "first" result relation's mergeActionList as + * reference and make copy for this relation, converting stuff that + * references attribute numbers to match this relation's. + * + * This duplicates much of the logic in ExecInitMerge(), so something + * changes there, look here too. + */ + if (node && node->operation == CMD_MERGE) + { + List *firstMergeActionList = linitial(node->mergeActionLists); + ListCell *lc; + ExprContext *econtext = mtstate->ps.ps_ExprContext; + + if (part_attmap == NULL) + part_attmap = + build_attrmap_by_name(RelationGetDescr(partrel), + RelationGetDescr(firstResultRel)); + + if (unlikely(!leaf_part_rri->ri_projectNewInfoValid)) + ExecInitMergeTupleSlots(mtstate, leaf_part_rri); + + foreach(lc, firstMergeActionList) + { + /* Make a copy for this relation to be safe. */ + MergeAction *action = copyObject(lfirst(lc)); + MergeActionState *action_state; + List **list; + + /* Generate the action's state for this relation */ + action_state = makeNode(MergeActionState); + action_state->mas_action = action; + + /* And put the action in the appropriate list */ + if (action->matched) + list = &leaf_part_rri->ri_matchedMergeAction; + else + list = &leaf_part_rri->ri_notMatchedMergeAction; + *list = lappend(*list, action_state); + + switch (action->commandType) + { + case CMD_INSERT: + + /* + * ExecCheckPlanOutput() already done on the targetlist + * when "first" result relation initialized and it is same + * for all result relations. + */ + action_state->mas_proj = + ExecBuildProjectionInfo(action->targetList, econtext, + leaf_part_rri->ri_newTupleSlot, + &mtstate->ps, + RelationGetDescr(partrel)); + break; + case CMD_UPDATE: + + /* + * Convert updateColnos from "first" result relation + * attribute numbers to this result rel's. + */ + if (part_attmap) + action->updateColnos = + adjust_partition_colnos_using_map(action->updateColnos, + part_attmap); + action_state->mas_proj = + ExecBuildUpdateProjection(action->targetList, + true, + action->updateColnos, + RelationGetDescr(leaf_part_rri->ri_RelationDesc), + econtext, + leaf_part_rri->ri_newTupleSlot, + NULL); + break; + case CMD_DELETE: + break; + + default: + elog(ERROR, "unknown action in MERGE WHEN clause"); + } + + /* found_whole_row intentionally ignored. */ + action->qual = + map_variable_attnos(action->qual, + firstVarno, 0, + part_attmap, + RelationGetForm(partrel)->reltype, + &found_whole_row); + action_state->mas_whenqual = + ExecInitQual((List *) action->qual, &mtstate->ps); + } + } MemoryContextSwitchTo(oldcxt); return leaf_part_rri; @@ -1433,13 +1528,23 @@ ExecBuildSlotPartitionKeyDescription(Relation rel, static List * adjust_partition_colnos(List *colnos, ResultRelInfo *leaf_part_rri) { - List *new_colnos = NIL; TupleConversionMap *map = ExecGetChildToRootMap(leaf_part_rri); - AttrMap *attrMap; + + return adjust_partition_colnos_using_map(colnos, map->attrMap); +} + +/* + * adjust_partition_colnos_using_map + * Like adjust_partition_colnos, but uses a caller-supplied map instead + * of assuming to map from the "root" result relation. + */ +static List * +adjust_partition_colnos_using_map(List *colnos, AttrMap *attrMap) +{ + List *new_colnos = NIL; ListCell *lc; - Assert(map != NULL); /* else we shouldn't be here */ - attrMap = map->attrMap; + Assert(attrMap != NULL); /* else we shouldn't be here */ foreach(lc, colnos) { diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 1a4fbdc38c6..228e3547012 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -486,7 +486,7 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, resultRelInfo->ri_TrigDesc->trig_update_before_row) { if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo, - tid, NULL, slot)) + tid, NULL, slot, NULL)) skip_tuple = true; /* "do nothing" */ } diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 701fe052967..171575cd73b 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -24,11 +24,20 @@ * values plus row-locating info for UPDATE and MERGE cases, or just the * row-locating info for DELETE cases. * + * MERGE runs a join between the source relation and the target + * table; if any WHEN NOT MATCHED clauses are present, then the + * join is an outer join. In this case, any unmatched tuples will + * have NULL row-locating info, and only INSERT can be run. But for + * matched tuples, then row-locating info is used to determine the + * tuple to UPDATE or DELETE. When all clauses are WHEN MATCHED, + * then an inner join is used, so all tuples contain row-locating info. + * * If the query specifies RETURNING, then the ModifyTable returns a * RETURNING tuple after completing each row insert, update, or delete. * It must be called again to continue the operation. Without RETURNING, * we just loop within the node until all the work is done, then - * return NULL. This avoids useless call/return overhead. + * return NULL. This avoids useless call/return overhead. (MERGE does + * not support RETURNING.) */ #include "postgres.h" @@ -78,6 +87,17 @@ typedef struct ModifyTableContext */ TupleTableSlot *planSlot; + /* + * During EvalPlanQual, project and return the new version of the new + * tuple + */ + TupleTableSlot *(*GetUpdateNewTuple) (ResultRelInfo *resultRelInfo, + TupleTableSlot *epqslot, + TupleTableSlot *oldSlot, + MergeActionState *relaction); + + /* MERGE specific */ + MergeActionState *relaction; /* MERGE action in progress */ /* * Information about the changes that were made concurrently to a tuple @@ -140,6 +160,28 @@ static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate, ResultRelInfo *targetRelInfo, TupleTableSlot *slot, ResultRelInfo **partRelInfo); +static TupleTableSlot *internalGetUpdateNewTuple(ResultRelInfo *relinfo, + TupleTableSlot *planSlot, + TupleTableSlot *oldSlot, + MergeActionState *relaction); + +static TupleTableSlot *ExecMerge(ModifyTableContext *context, + ResultRelInfo *resultRelInfo, + ItemPointer tupleid, + bool canSetTag); +static void ExecInitMerge(ModifyTableState *mtstate, EState *estate); +static bool ExecMergeMatched(ModifyTableContext *context, + ResultRelInfo *resultRelInfo, + ItemPointer tupleid, + bool canSetTag); +static void ExecMergeNotMatched(ModifyTableContext *context, + ResultRelInfo *resultRelInfo, + bool canSetTag); +static TupleTableSlot *mergeGetUpdateNewTuple(ResultRelInfo *relinfo, + TupleTableSlot *planSlot, + TupleTableSlot *oldSlot, + MergeActionState *relaction); + /* * Verify that the tuples to be produced by INSERT match the @@ -616,21 +658,32 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo, TupleTableSlot *planSlot, TupleTableSlot *oldSlot) { - ProjectionInfo *newProj = relinfo->ri_projectNew; - ExprContext *econtext; - /* Use a few extra Asserts to protect against outside callers */ Assert(relinfo->ri_projectNewInfoValid); Assert(planSlot != NULL && !TTS_EMPTY(planSlot)); Assert(oldSlot != NULL && !TTS_EMPTY(oldSlot)); + return internalGetUpdateNewTuple(relinfo, planSlot, oldSlot, NULL); +} + +/* + * Callback for ModifyTableState->GetUpdateNewTuple for use by regular UPDATE. + */ +static TupleTableSlot * +internalGetUpdateNewTuple(ResultRelInfo *relinfo, + TupleTableSlot *planSlot, + TupleTableSlot *oldSlot, + MergeActionState *relaction) +{ + ProjectionInfo *newProj = relinfo->ri_projectNew; + ExprContext *econtext; + econtext = newProj->pi_exprContext; econtext->ecxt_outertuple = planSlot; econtext->ecxt_scantuple = oldSlot; return ExecProject(newProj); } - /* ---------------------------------------------------------------- * ExecInsert * @@ -847,9 +900,17 @@ ExecInsert(ModifyTableContext *context, * partition, we should instead check UPDATE policies, because we are * executing policies defined on the target table, and not those * defined on the child partitions. + * + * If we're running MERGE, we refer to the action that we're executing + * to know if we're doing an INSERT or UPDATE to a partition table. */ - wco_kind = (mtstate->operation == CMD_UPDATE) ? - WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK; + if (mtstate->operation == CMD_UPDATE) + wco_kind = WCO_RLS_UPDATE_CHECK; + else if (mtstate->operation == CMD_MERGE) + wco_kind = (context->relaction->mas_action->commandType == CMD_UPDATE) ? + WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK; + else + wco_kind = WCO_RLS_INSERT_CHECK; /* * ExecWithCheckOptions() will skip any WCOs which are not of the kind @@ -1453,7 +1514,13 @@ ldelete:; ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to concurrent delete"))); - /* tuple already deleted; nothing to do */ + + /* + * tuple already deleted; nothing to do. But MERGE might want + * to handle it differently. We've already filled-in + * actionInfo with sufficient information for MERGE to look + * at. + */ return NULL; default: @@ -1659,7 +1726,8 @@ ExecCrossPartitionUpdate(ModifyTableContext *context, elog(ERROR, "failed to fetch tuple being updated"); /* and project the new tuple to retry the UPDATE with */ context->cpUpdateRetrySlot = - ExecGetUpdateNewTuple(resultRelInfo, epqslot, oldSlot); + context->GetUpdateNewTuple(resultRelInfo, epqslot, oldSlot, + context->relaction); return false; } } @@ -1718,7 +1786,8 @@ ExecUpdatePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo, if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_before_row) return ExecBRUpdateTriggers(context->estate, context->epqstate, - resultRelInfo, tupleid, oldtuple, slot); + resultRelInfo, tupleid, oldtuple, slot, + &context->tmfd); return true; } @@ -1865,6 +1934,13 @@ lreplace:; } /* + * No luck, a retry is needed. If running MERGE, we do not do so + * here; instead let it handle that on its own rules. + */ + if (context->relaction != NULL) + return TM_Updated; + + /* * ExecCrossPartitionUpdate installed an updated version of the new * tuple in the retry slot; start over. */ @@ -2109,8 +2185,8 @@ redo_act: /* * If ExecUpdateAct reports that a cross-partition update was done, - * then the returning tuple has been projected and there's nothing - * else for us to do. + * then the RETURNING tuple (if any) has been projected and there's + * nothing else for us to do. */ if (updateCxt.crossPartUpdate) return context->cpUpdateReturningSlot; @@ -2337,9 +2413,9 @@ ExecOnConflictUpdate(ModifyTableContext *context, * to break. * * It is the user's responsibility to prevent this situation from - * occurring. These problems are why SQL-2003 similarly specifies - * that for SQL MERGE, an exception must be raised in the event of - * an attempt to update the same row twice. + * occurring. These problems are why the SQL standard similarly + * specifies that for SQL MERGE, an exception must be raised in + * the event of an attempt to update the same row twice. */ xminDatum = slot_getsysattr(existing, MinTransactionIdAttributeNumber, @@ -2350,7 +2426,9 @@ ExecOnConflictUpdate(ModifyTableContext *context, if (TransactionIdIsCurrentTransactionId(xmin)) ereport(ERROR, (errcode(ERRCODE_CARDINALITY_VIOLATION), - errmsg("ON CONFLICT DO UPDATE command cannot affect row a second time"), + /* translator: %s is a SQL command name */ + errmsg("%s command cannot affect row a second time", + "ON CONFLICT DO UPDATE"), errhint("Ensure that no rows proposed for insertion within the same command have duplicate constrained values."))); /* This shouldn't happen */ @@ -2490,6 +2568,705 @@ ExecOnConflictUpdate(ModifyTableContext *context, return true; } +/* + * Perform MERGE. + */ +static TupleTableSlot * +ExecMerge(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + ItemPointer tupleid, bool canSetTag) +{ + bool matched; + + /*----- + * If we are dealing with a WHEN MATCHED case (tupleid is valid), we + * execute the first action for which the additional WHEN MATCHED AND + * quals pass. If an action without quals is found, that action is + * executed. + * + * Similarly, if we are dealing with WHEN NOT MATCHED case, we look at + * the given WHEN NOT MATCHED actions in sequence until one passes. + * + * Things get interesting in case of concurrent update/delete of the + * target tuple. Such concurrent update/delete is detected while we are + * executing a WHEN MATCHED action. + * + * A concurrent update can: + * + * 1. modify the target tuple so that it no longer satisfies the + * additional quals attached to the current WHEN MATCHED action + * + * In this case, we are still dealing with a WHEN MATCHED case. + * We recheck the list of WHEN MATCHED actions from the start and + * choose the first one that satisfies the new target tuple. + * + * 2. modify the target tuple so that the join quals no longer pass and + * hence the source tuple no longer has a match. + * + * In this case, the source tuple no longer matches the target tuple, + * so we now instead find a qualifying WHEN NOT MATCHED action to + * execute. + * + * XXX Hmmm, what if the updated tuple would now match one that was + * considered NOT MATCHED so far? + * + * A concurrent delete changes a WHEN MATCHED case to WHEN NOT MATCHED. + * + * ExecMergeMatched takes care of following the update chain and + * re-finding the qualifying WHEN MATCHED action, as long as the updated + * target tuple still satisfies the join quals, i.e., it remains a WHEN + * MATCHED case. If the tuple gets deleted or the join quals fail, it + * returns and we try ExecMergeNotMatched. Given that ExecMergeMatched + * always make progress by following the update chain and we never switch + * from ExecMergeNotMatched to ExecMergeMatched, there is no risk of a + * livelock. + */ + matched = tupleid != NULL; + if (matched) + matched = ExecMergeMatched(context, resultRelInfo, tupleid, canSetTag); + + /* + * Either we were dealing with a NOT MATCHED tuple or ExecMergeMatched() + * returned "false", indicating the previously MATCHED tuple no longer + * matches. + */ + if (!matched) + ExecMergeNotMatched(context, resultRelInfo, canSetTag); + + /* No RETURNING support yet */ + return NULL; +} + +/* + * Check and execute the first qualifying MATCHED action. The current target + * tuple is identified by tupleid. + * + * We start from the first WHEN MATCHED action and check if the WHEN quals + * pass, if any. If the WHEN quals for the first action do not pass, we + * check the second, then the third and so on. If we reach to the end, no + * action is taken and we return true, indicating that no further action is + * required for this tuple. + * + * If we do find a qualifying action, then we attempt to execute the action. + * + * If the tuple is concurrently updated, EvalPlanQual is run with the updated + * tuple to recheck the join quals. Note that the additional quals associated + * with individual actions are evaluated by this routine via ExecQual, while + * EvalPlanQual checks for the join quals. If EvalPlanQual tells us that the + * updated tuple still passes the join quals, then we restart from the first + * action to look for a qualifying action. Otherwise, we return false -- + * meaning that a NOT MATCHED action must now be executed for the current + * source tuple. + */ +static bool +ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + ItemPointer tupleid, bool canSetTag) +{ + ModifyTableState *mtstate = context->mtstate; + TupleTableSlot *newslot; + EState *estate = context->estate; + ExprContext *econtext = mtstate->ps.ps_ExprContext; + bool isNull; + EPQState *epqstate = &mtstate->mt_epqstate; + ListCell *l; + + /* + * If there are no WHEN MATCHED actions, we are done. + */ + if (resultRelInfo->ri_matchedMergeAction == NIL) + return true; + + /* + * Make tuple and any needed join variables available to ExecQual and + * ExecProject. The target's existing tuple is installed in the scantuple. + * Again, this target relation's slot is required only in the case of a + * MATCHED tuple and UPDATE/DELETE actions. + */ + econtext->ecxt_scantuple = resultRelInfo->ri_oldTupleSlot; + econtext->ecxt_innertuple = context->planSlot; + econtext->ecxt_outertuple = NULL; + +lmerge_matched:; + + /* + * This routine is only invoked for matched rows, and we must have found + * the tupleid of the target row in that case; fetch that tuple. + * + * We use SnapshotAny for this because we might get called again after + * EvalPlanQual returns us a new tuple, which may not be visible to our + * MVCC snapshot. + */ + + if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc, + tupleid, + SnapshotAny, + resultRelInfo->ri_oldTupleSlot)) + elog(ERROR, "failed to fetch the target tuple"); + + foreach(l, resultRelInfo->ri_matchedMergeAction) + { + MergeActionState *relaction = (MergeActionState *) lfirst(l); + CmdType commandType = relaction->mas_action->commandType; + List *recheckIndexes = NIL; + TM_Result result; + UpdateContext updateCxt = {0}; + + /* + * Test condition, if any. + * + * In the absence of any condition, we perform the action + * unconditionally (no need to check separately since ExecQual() will + * return true if there are no conditions to evaluate). + */ + if (!ExecQual(relaction->mas_whenqual, econtext)) + continue; + + /* + * Check if the existing target tuple meets the USING checks of + * UPDATE/DELETE RLS policies. If those checks fail, we throw an + * error. + * + * The WITH CHECK quals are applied in ExecUpdate() and hence we need + * not do anything special to handle them. + * + * NOTE: We must do this after WHEN quals are evaluated, so that we + * check policies only when they matter. + */ + if (resultRelInfo->ri_WithCheckOptions) + { + ExecWithCheckOptions(commandType == CMD_UPDATE ? + WCO_RLS_MERGE_UPDATE_CHECK : WCO_RLS_MERGE_DELETE_CHECK, + resultRelInfo, + resultRelInfo->ri_oldTupleSlot, + context->mtstate->ps.state); + } + + /* Perform stated action */ + switch (commandType) + { + case CMD_UPDATE: + + /* + * Project the output tuple, and use that to update the table. + * We don't need to filter out junk attributes, because the + * UPDATE action's targetlist doesn't have any. + */ + newslot = ExecProject(relaction->mas_proj); + + context->relaction = relaction; + context->GetUpdateNewTuple = mergeGetUpdateNewTuple; + context->cpUpdateRetrySlot = NULL; + + if (!ExecUpdatePrologue(context, resultRelInfo, + tupleid, NULL, newslot)) + { + result = TM_Ok; + break; + } + ExecUpdatePrepareSlot(resultRelInfo, newslot, context->estate); + result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL, + newslot, mtstate->canSetTag, &updateCxt); + if (result == TM_Ok && updateCxt.updated) + { + ExecUpdateEpilogue(context, &updateCxt, resultRelInfo, + tupleid, NULL, newslot, recheckIndexes); + mtstate->mt_merge_updated += 1; + } + + break; + + case CMD_DELETE: + context->relaction = relaction; + if (!ExecDeletePrologue(context, resultRelInfo, tupleid, + NULL, NULL)) + { + result = TM_Ok; + break; + } + result = ExecDeleteAct(context, resultRelInfo, tupleid, false); + if (result == TM_Ok) + { + ExecDeleteEpilogue(context, resultRelInfo, tupleid, NULL, + false); + mtstate->mt_merge_deleted += 1; + } + break; + + case CMD_NOTHING: + /* Doing nothing is always OK */ + result = TM_Ok; + break; + + default: + elog(ERROR, "unknown action in MERGE WHEN MATCHED clause"); + } + + switch (result) + { + case TM_Ok: + /* all good; perform final actions */ + if (canSetTag) + (estate->es_processed)++; + + break; + + case TM_SelfModified: + + /* + * The SQL standard disallows this for MERGE. + */ + if (TransactionIdIsCurrentTransactionId(context->tmfd.xmax)) + ereport(ERROR, + (errcode(ERRCODE_CARDINALITY_VIOLATION), + /* translator: %s is a SQL command name */ + errmsg("%s command cannot affect row a second time", + "MERGE"), + errhint("Ensure that not more than one source row matches any one target row."))); + /* This shouldn't happen */ + elog(ERROR, "attempted to update or delete invisible tuple"); + break; + + case TM_Deleted: + if (IsolationUsesXactSnapshot()) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("could not serialize access due to concurrent delete"))); + + /* + * If the tuple was already deleted, return to let caller + * handle it under NOT MATCHED clauses. + */ + return false; + + case TM_Updated: + { + Relation resultRelationDesc; + TupleTableSlot *epqslot, + *inputslot; + LockTupleMode lockmode; + + /* + * The target tuple was concurrently updated by some other + * transaction. + */ + + /* + * If cpUpdateRetrySlot is set, ExecCrossPartitionUpdate() + * must have detected that the tuple was concurrently + * updated, so we restart the search for an appropriate + * WHEN MATCHED clause to process the updated tuple. + * + * In this case, ExecDelete() would already have performed + * EvalPlanQual() on the latest version of the tuple, + * which in turn would already have been loaded into + * ri_oldTupleSlot, so no need to do either of those + * things. + * + * XXX why do we not check the WHEN NOT MATCHED list in + * this case? + */ + if (!TupIsNull(context->cpUpdateRetrySlot)) + goto lmerge_matched; + + /* + * Otherwise, we run the EvalPlanQual() with the new + * version of the tuple. If EvalPlanQual() does not return + * a tuple, then we switch to the NOT MATCHED list of + * actions. If it does return a tuple and the join qual is + * still satisfied, then we just need to recheck the + * MATCHED actions, starting from the top, and execute the + * first qualifying action. + */ + resultRelationDesc = resultRelInfo->ri_RelationDesc; + lockmode = ExecUpdateLockMode(estate, resultRelInfo); + + inputslot = EvalPlanQualSlot(epqstate, resultRelationDesc, + resultRelInfo->ri_RangeTableIndex); + + result = table_tuple_lock(resultRelationDesc, tupleid, + estate->es_snapshot, + inputslot, estate->es_output_cid, + lockmode, LockWaitBlock, + TUPLE_LOCK_FLAG_FIND_LAST_VERSION, + &context->tmfd); + switch (result) + { + case TM_Ok: + epqslot = EvalPlanQual(epqstate, + resultRelationDesc, + resultRelInfo->ri_RangeTableIndex, + inputslot); + + /* + * If we got no tuple, or the tuple we get has a + * NULL ctid, go back to caller: this one is not a + * MATCHED tuple anymore, so they can retry with + * NOT MATCHED actions. + */ + if (TupIsNull(epqslot)) + return false; + + (void) ExecGetJunkAttribute(epqslot, + resultRelInfo->ri_RowIdAttNo, + &isNull); + if (isNull) + return false; + + /* + * When a tuple was updated and migrated to + * another partition concurrently, the current + * MERGE implementation can't follow. There's + * probably a better way to handle this case, but + * it'd require recognizing the relation to which + * the tuple moved, and setting our current + * resultRelInfo to that. + */ + if (ItemPointerIndicatesMovedPartitions(&context->tmfd.ctid)) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("tuple to be deleted was already moved to another partition due to concurrent update"))); + + /* + * A non-NULL ctid means that we are still dealing + * with MATCHED case. Restart the loop so that we + * apply all the MATCHED rules again, to ensure + * that the first qualifying WHEN MATCHED action + * is executed. + * + * Update tupleid to that of the new tuple, for + * the refetch we do at the top. + */ + ItemPointerCopy(&context->tmfd.ctid, tupleid); + goto lmerge_matched; + + case TM_Deleted: + + /* + * tuple already deleted; tell caller to run NOT + * MATCHED actions + */ + return false; + + case TM_SelfModified: + + /* + * This can be reached when following an update + * chain from a tuple updated by another session, + * reaching a tuple that was already updated in + * this transaction. If previously modified by + * this command, ignore the redundant update, + * otherwise error out. + * + * See also response to TM_SelfModified in + * ExecUpdate(). + */ + if (context->tmfd.cmax != estate->es_output_cid) + ereport(ERROR, + (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), + errmsg("tuple to be updated or deleted was already modified by an operation triggered by the current command"), + errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows."))); + return false; + + default: + /* see table_tuple_lock call in ExecDelete() */ + elog(ERROR, "unexpected table_tuple_lock status: %u", + result); + return false; + } + } + + case TM_Invisible: + case TM_WouldBlock: + case TM_BeingModified: + /* these should not occur */ + elog(ERROR, "unexpected tuple operation result: %d", result); + break; + } + + /* + * We've activated one of the WHEN clauses, so we don't search + * further. This is required behaviour, not an optimization. + */ + break; + } + + /* + * Successfully executed an action or no qualifying action was found. + */ + return true; +} + +/* + * Execute the first qualifying NOT MATCHED action. + */ +static void +ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + bool canSetTag) +{ + ModifyTableState *mtstate = context->mtstate; + ExprContext *econtext = mtstate->ps.ps_ExprContext; + List *actionStates = NIL; + ListCell *l; + + /* + * For INSERT actions, the root relation's merge action is OK since the + * INSERT's targetlist and the WHEN conditions can only refer to the + * source relation and hence it does not matter which result relation we + * work with. + * + * XXX does this mean that we can avoid creating copies of actionStates on + * partitioned tables, for not-matched actions? + */ + actionStates = resultRelInfo->ri_notMatchedMergeAction; + + /* + * Make source tuple available to ExecQual and ExecProject. We don't need + * the target tuple, since the WHEN quals and targetlist can't refer to + * the target columns. + */ + econtext->ecxt_scantuple = NULL; + econtext->ecxt_innertuple = context->planSlot; + econtext->ecxt_outertuple = NULL; + + foreach(l, actionStates) + { + MergeActionState *action = (MergeActionState *) lfirst(l); + CmdType commandType = action->mas_action->commandType; + TupleTableSlot *newslot; + + /* + * Test condition, if any. + * + * In the absence of any condition, we perform the action + * unconditionally (no need to check separately since ExecQual() will + * return true if there are no conditions to evaluate). + */ + if (!ExecQual(action->mas_whenqual, econtext)) + continue; + + /* Perform stated action */ + switch (commandType) + { + case CMD_INSERT: + + /* + * Project the tuple. In case of a partitioned table, the + * projection was already built to use the root's descriptor, + * so we don't need to map the tuple here. + */ + newslot = ExecProject(action->mas_proj); + context->relaction = action; + + (void) ExecInsert(context, mtstate->rootResultRelInfo, newslot, + canSetTag, NULL, NULL); + mtstate->mt_merge_inserted += 1; + break; + case CMD_NOTHING: + /* Do nothing */ + break; + default: + elog(ERROR, "unknown action in MERGE WHEN NOT MATCHED clause"); + } + + /* + * We've activated one of the WHEN clauses, so we don't search + * further. This is required behaviour, not an optimization. + */ + break; + } +} + +/* + * Initialize state for execution of MERGE. + */ +void +ExecInitMerge(ModifyTableState *mtstate, EState *estate) +{ + ModifyTable *node = (ModifyTable *) mtstate->ps.plan; + ResultRelInfo *rootRelInfo = mtstate->rootResultRelInfo; + ResultRelInfo *resultRelInfo; + ExprContext *econtext; + ListCell *lc; + int i; + + if (node->mergeActionLists == NIL) + return; + + mtstate->mt_merge_subcommands = 0; + + if (mtstate->ps.ps_ExprContext == NULL) + ExecAssignExprContext(estate, &mtstate->ps); + econtext = mtstate->ps.ps_ExprContext; + + /* + * Create a MergeActionState for each action on the mergeActionList and + * add it to either a list of matched actions or not-matched actions. + * + * Similar logic appears in ExecInitPartitionInfo(), so if changing + * anything here, do so there too. + */ + i = 0; + foreach(lc, node->mergeActionLists) + { + List *mergeActionList = lfirst(lc); + TupleDesc relationDesc; + ListCell *l; + + resultRelInfo = mtstate->resultRelInfo + i; + i++; + relationDesc = RelationGetDescr(resultRelInfo->ri_RelationDesc); + + /* initialize slots for MERGE fetches from this rel */ + if (unlikely(!resultRelInfo->ri_projectNewInfoValid)) + ExecInitMergeTupleSlots(mtstate, resultRelInfo); + + foreach(l, mergeActionList) + { + MergeAction *action = (MergeAction *) lfirst(l); + MergeActionState *action_state; + TupleTableSlot *tgtslot; + TupleDesc tgtdesc; + List **list; + + /* + * Build action merge state for this rel. (For partitions, + * equivalent code exists in ExecInitPartitionInfo.) + */ + action_state = makeNode(MergeActionState); + action_state->mas_action = action; + action_state->mas_whenqual = ExecInitQual((List *) action->qual, + &mtstate->ps); + + /* + * We create two lists - one for WHEN MATCHED actions and one for + * WHEN NOT MATCHED actions - and stick the MergeActionState into + * the appropriate list. + */ + if (action_state->mas_action->matched) + list = &resultRelInfo->ri_matchedMergeAction; + else + list = &resultRelInfo->ri_notMatchedMergeAction; + *list = lappend(*list, action_state); + + switch (action->commandType) + { + case CMD_INSERT: + ExecCheckPlanOutput(rootRelInfo->ri_RelationDesc, + action->targetList); + + /* + * If the MERGE targets a partitioned table, any INSERT + * actions must be routed through it, not the child + * relations. Initialize the routing struct and the root + * table's "new" tuple slot for that, if not already done. + * The projection we prepare, for all relations, uses the + * root relation descriptor, and targets the plan's root + * slot. (This is consistent with the fact that we + * checked the plan output to match the root relation, + * above.) + */ + if (rootRelInfo->ri_RelationDesc->rd_rel->relkind == + RELKIND_PARTITIONED_TABLE) + { + if (mtstate->mt_partition_tuple_routing == NULL) + { + /* + * Initialize planstate for routing if not already + * done. + * + * Note that the slot is managed as a standalone + * slot belonging to ModifyTableState, so we pass + * NULL for the 2nd argument. + */ + mtstate->mt_root_tuple_slot = + table_slot_create(rootRelInfo->ri_RelationDesc, + NULL); + mtstate->mt_partition_tuple_routing = + ExecSetupPartitionTupleRouting(estate, + rootRelInfo->ri_RelationDesc); + } + tgtslot = mtstate->mt_root_tuple_slot; + tgtdesc = RelationGetDescr(rootRelInfo->ri_RelationDesc); + } + else + { + /* not partitioned? use the stock relation and slot */ + tgtslot = resultRelInfo->ri_newTupleSlot; + tgtdesc = RelationGetDescr(resultRelInfo->ri_RelationDesc); + } + + action_state->mas_proj = + ExecBuildProjectionInfo(action->targetList, econtext, + tgtslot, + &mtstate->ps, + tgtdesc); + + mtstate->mt_merge_subcommands |= MERGE_INSERT; + break; + case CMD_UPDATE: + action_state->mas_proj = + ExecBuildUpdateProjection(action->targetList, + true, + action->updateColnos, + relationDesc, + econtext, + resultRelInfo->ri_newTupleSlot, + &mtstate->ps); + mtstate->mt_merge_subcommands |= MERGE_UPDATE; + break; + case CMD_DELETE: + mtstate->mt_merge_subcommands |= MERGE_DELETE; + break; + case CMD_NOTHING: + break; + default: + elog(ERROR, "unknown operation"); + break; + } + } + } +} + +/* + * Initializes the tuple slots in a ResultRelInfo for any MERGE action. + * + * We mark 'projectNewInfoValid' even though the projections themselves + * are not initialized here. + */ +void +ExecInitMergeTupleSlots(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo) +{ + EState *estate = mtstate->ps.state; + + Assert(!resultRelInfo->ri_projectNewInfoValid); + + resultRelInfo->ri_oldTupleSlot = + table_slot_create(resultRelInfo->ri_RelationDesc, + &estate->es_tupleTable); + resultRelInfo->ri_newTupleSlot = + table_slot_create(resultRelInfo->ri_RelationDesc, + &estate->es_tupleTable); + resultRelInfo->ri_projectNewInfoValid = true; +} + +/* + * Callback for ModifyTableContext->GetUpdateNewTuple for use by MERGE. It + * computes the updated tuple by projecting from the current merge action's + * projection. + */ +static TupleTableSlot * +mergeGetUpdateNewTuple(ResultRelInfo *relinfo, + TupleTableSlot *planSlot, + TupleTableSlot *oldSlot, + MergeActionState *relaction) +{ + ExprContext *econtext = relaction->mas_proj->pi_exprContext; + + econtext->ecxt_scantuple = oldSlot; + econtext->ecxt_innertuple = planSlot; + + return ExecProject(relaction->mas_proj); +} /* * Process BEFORE EACH STATEMENT triggers @@ -2514,6 +3291,14 @@ fireBSTriggers(ModifyTableState *node) case CMD_DELETE: ExecBSDeleteTriggers(node->ps.state, resultRelInfo); break; + case CMD_MERGE: + if (node->mt_merge_subcommands & MERGE_INSERT) + ExecBSInsertTriggers(node->ps.state, resultRelInfo); + if (node->mt_merge_subcommands & MERGE_UPDATE) + ExecBSUpdateTriggers(node->ps.state, resultRelInfo); + if (node->mt_merge_subcommands & MERGE_DELETE) + ExecBSDeleteTriggers(node->ps.state, resultRelInfo); + break; default: elog(ERROR, "unknown operation"); break; @@ -2547,6 +3332,17 @@ fireASTriggers(ModifyTableState *node) ExecASDeleteTriggers(node->ps.state, resultRelInfo, node->mt_transition_capture); break; + case CMD_MERGE: + if (node->mt_merge_subcommands & MERGE_DELETE) + ExecASDeleteTriggers(node->ps.state, resultRelInfo, + node->mt_transition_capture); + if (node->mt_merge_subcommands & MERGE_UPDATE) + ExecASUpdateTriggers(node->ps.state, resultRelInfo, + node->mt_transition_capture); + if (node->mt_merge_subcommands & MERGE_INSERT) + ExecASInsertTriggers(node->ps.state, resultRelInfo, + node->mt_transition_capture); + break; default: elog(ERROR, "unknown operation"); break; @@ -2749,7 +3545,28 @@ ExecModifyTable(PlanState *pstate) datum = ExecGetJunkAttribute(planSlot, node->mt_resultOidAttno, &isNull); if (isNull) + { + /* + * For commands other than MERGE, any tuples having InvalidOid + * for tableoid are errors. For MERGE, we may need to handle + * them as WHEN NOT MATCHED clauses if any, so do that. + * + * Note that we use the node's toplevel resultRelInfo, not any + * specific partition's. + */ + if (operation == CMD_MERGE) + { + EvalPlanQualSetSlot(&node->mt_epqstate, planSlot); + + context.planSlot = planSlot; + context.lockmode = 0; + + ExecMerge(&context, node->resultRelInfo, NULL, node->canSetTag); + continue; /* no RETURNING support yet */ + } + elog(ERROR, "tableoid is NULL"); + } resultoid = DatumGetObjectId(datum); /* If it's not the same as last time, we need to locate the rel */ @@ -2784,13 +3601,14 @@ ExecModifyTable(PlanState *pstate) oldtuple = NULL; /* - * For UPDATE/DELETE, fetch the row identity info for the tuple to be - * updated/deleted. For a heap relation, that's a TID; otherwise we - * may have a wholerow junk attr that carries the old tuple in toto. - * Keep this in step with the part of ExecInitModifyTable that sets up - * ri_RowIdAttNo. + * For UPDATE/DELETE/MERGE, fetch the row identity info for the tuple + * to be updated/deleted/merged. For a heap relation, that's a TID; + * otherwise we may have a wholerow junk attr that carries the old + * tuple in toto. Keep this in step with the part of + * ExecInitModifyTable that sets up ri_RowIdAttNo. */ - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE || + operation == CMD_MERGE) { char relkind; Datum datum; @@ -2806,9 +3624,30 @@ ExecModifyTable(PlanState *pstate) datum = ExecGetJunkAttribute(slot, resultRelInfo->ri_RowIdAttNo, &isNull); - /* shouldn't ever get a null result... */ + + /* + * For commands other than MERGE, any tuples having a null row + * identifier are errors. For MERGE, we may need to handle + * them as WHEN NOT MATCHED clauses if any, so do that. + * + * Note that we use the node's toplevel resultRelInfo, not any + * specific partition's. + */ if (isNull) + { + if (operation == CMD_MERGE) + { + EvalPlanQualSetSlot(&node->mt_epqstate, planSlot); + + context.planSlot = planSlot; + context.lockmode = 0; + + ExecMerge(&context, node->resultRelInfo, NULL, node->canSetTag); + continue; /* no RETURNING support yet */ + } + elog(ERROR, "ctid is NULL"); + } tupleid = (ItemPointer) DatumGetPointer(datum); tuple_ctid = *tupleid; /* be sure we don't free ctid!! */ @@ -2898,8 +3737,10 @@ ExecModifyTable(PlanState *pstate) oldSlot)) elog(ERROR, "failed to fetch tuple being updated"); } - slot = ExecGetUpdateNewTuple(resultRelInfo, planSlot, - oldSlot); + slot = internalGetUpdateNewTuple(resultRelInfo, planSlot, + oldSlot, NULL); + context.GetUpdateNewTuple = internalGetUpdateNewTuple; + context.relaction = NULL; /* Now apply the update. */ slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple, @@ -2911,6 +3752,10 @@ ExecModifyTable(PlanState *pstate) true, false, node->canSetTag, NULL, NULL); break; + case CMD_MERGE: + slot = ExecMerge(&context, resultRelInfo, tupleid, node->canSetTag); + break; + default: elog(ERROR, "unknown operation"); break; @@ -3044,6 +3889,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) mtstate->resultRelInfo = (ResultRelInfo *) palloc(nrels * sizeof(ResultRelInfo)); + mtstate->mt_merge_inserted = 0; + mtstate->mt_merge_updated = 0; + mtstate->mt_merge_deleted = 0; + /*---------- * Resolve the target relation. This is the same as: * @@ -3147,12 +3996,13 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) } /* - * For UPDATE/DELETE, find the appropriate junk attr now, either a - * 'ctid' or 'wholerow' attribute depending on relkind. For foreign + * For UPDATE/DELETE/MERGE, find the appropriate junk attr now, either + * a 'ctid' or 'wholerow' attribute depending on relkind. For foreign * tables, the FDW might have created additional junk attr(s), but * those are no concern of ours. */ - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE || + operation == CMD_MERGE) { char relkind; @@ -3169,19 +4019,28 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) else if (relkind == RELKIND_FOREIGN_TABLE) { /* + * We don't support MERGE with foreign tables for now. (It's + * problematic because the implementation uses CTID.) + */ + Assert(operation != CMD_MERGE); + + /* * When there is a row-level trigger, there should be a * wholerow attribute. We also require it to be present in - * UPDATE, so we can get the values of unchanged columns. + * UPDATE and MERGE, so we can get the values of unchanged + * columns. */ resultRelInfo->ri_RowIdAttNo = ExecFindJunkAttributeInTlist(subplan->targetlist, "wholerow"); - if (mtstate->operation == CMD_UPDATE && + if ((mtstate->operation == CMD_UPDATE || mtstate->operation == CMD_MERGE) && !AttributeNumberIsValid(resultRelInfo->ri_RowIdAttNo)) elog(ERROR, "could not find junk wholerow column"); } else { + /* No support for MERGE */ + Assert(operation != CMD_MERGE); /* Other valid target relkinds must provide wholerow */ resultRelInfo->ri_RowIdAttNo = ExecFindJunkAttributeInTlist(subplan->targetlist, @@ -3193,10 +4052,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) } /* - * If this is an inherited update/delete, there will be a junk attribute - * named "tableoid" present in the subplan's targetlist. It will be used - * to identify the result relation for a given tuple to be - * updated/deleted. + * If this is an inherited update/delete/merge, there will be a junk + * attribute named "tableoid" present in the subplan's targetlist. It + * will be used to identify the result relation for a given tuple to be + * updated/deleted/merged. */ mtstate->mt_resultOidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, "tableoid"); @@ -3209,8 +4068,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) /* * Build state for tuple routing if it's a partitioned INSERT. An UPDATE - * might need this too, but only if it actually moves tuples between - * partitions; in that case setup is done by ExecCrossPartitionUpdate. + * or MERGE might need this too, but only if it actually moves tuples + * between partitions; in that case setup is done by + * ExecCrossPartitionUpdate. */ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && operation == CMD_INSERT) @@ -3379,6 +4239,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) arowmarks = lappend(arowmarks, aerm); } + /* For a MERGE command, initialize its state */ + if (mtstate->operation == CMD_MERGE) + ExecInitMerge(mtstate, estate); + EvalPlanQualSetPlan(&mtstate->mt_epqstate, subplan, arowmarks); /* diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index a82e9866670..042a5f8b0a2 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -2881,6 +2881,9 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount) else res = SPI_OK_UPDATE; break; + case CMD_MERGE: + res = SPI_OK_MERGE; + break; default: return SPI_ERROR_OPUNKNOWN; } diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 2cbd8aa0df1..c09172164b9 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -228,6 +228,7 @@ _copyModifyTable(const ModifyTable *from) COPY_NODE_FIELD(onConflictWhere); COPY_SCALAR_FIELD(exclRelRTI); COPY_NODE_FIELD(exclRelTlist); + COPY_NODE_FIELD(mergeActionLists); return newnode; } @@ -2888,6 +2889,35 @@ _copyCommonTableExpr(const CommonTableExpr *from) return newnode; } +static MergeWhenClause * +_copyMergeWhenClause(const MergeWhenClause *from) +{ + MergeWhenClause *newnode = makeNode(MergeWhenClause); + + COPY_SCALAR_FIELD(matched); + COPY_SCALAR_FIELD(commandType); + COPY_SCALAR_FIELD(override); + COPY_NODE_FIELD(condition); + COPY_NODE_FIELD(targetList); + COPY_NODE_FIELD(values); + return newnode; +} + +static MergeAction * +_copyMergeAction(const MergeAction *from) +{ + MergeAction *newnode = makeNode(MergeAction); + + COPY_SCALAR_FIELD(matched); + COPY_SCALAR_FIELD(commandType); + COPY_SCALAR_FIELD(override); + COPY_NODE_FIELD(qual); + COPY_NODE_FIELD(targetList); + COPY_NODE_FIELD(updateColnos); + + return newnode; +} + static A_Expr * _copyA_Expr(const A_Expr *from) { @@ -3394,6 +3424,8 @@ _copyQuery(const Query *from) COPY_NODE_FIELD(setOperations); COPY_NODE_FIELD(constraintDeps); COPY_NODE_FIELD(withCheckOptions); + COPY_NODE_FIELD(mergeActionList); + COPY_SCALAR_FIELD(mergeUseOuterJoin); COPY_LOCATION_FIELD(stmt_location); COPY_SCALAR_FIELD(stmt_len); @@ -3457,6 +3489,20 @@ _copyUpdateStmt(const UpdateStmt *from) return newnode; } +static MergeStmt * +_copyMergeStmt(const MergeStmt *from) +{ + MergeStmt *newnode = makeNode(MergeStmt); + + COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(sourceRelation); + COPY_NODE_FIELD(joinCondition); + COPY_NODE_FIELD(mergeWhenClauses); + COPY_NODE_FIELD(withClause); + + return newnode; +} + static SelectStmt * _copySelectStmt(const SelectStmt *from) { @@ -5662,6 +5708,9 @@ copyObjectImpl(const void *from) case T_UpdateStmt: retval = _copyUpdateStmt(from); break; + case T_MergeStmt: + retval = _copyMergeStmt(from); + break; case T_SelectStmt: retval = _copySelectStmt(from); break; @@ -6136,6 +6185,12 @@ copyObjectImpl(const void *from) case T_CommonTableExpr: retval = _copyCommonTableExpr(from); break; + case T_MergeWhenClause: + retval = _copyMergeWhenClause(from); + break; + case T_MergeAction: + retval = _copyMergeAction(from); + break; case T_ObjectWithArgs: retval = _copyObjectWithArgs(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 9f17e15e150..3fb423be47a 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -1146,6 +1146,8 @@ _equalQuery(const Query *a, const Query *b) COMPARE_NODE_FIELD(setOperations); COMPARE_NODE_FIELD(constraintDeps); COMPARE_NODE_FIELD(withCheckOptions); + COMPARE_NODE_FIELD(mergeActionList); + COMPARE_SCALAR_FIELD(mergeUseOuterJoin); COMPARE_LOCATION_FIELD(stmt_location); COMPARE_SCALAR_FIELD(stmt_len); @@ -1202,6 +1204,18 @@ _equalUpdateStmt(const UpdateStmt *a, const UpdateStmt *b) } static bool +_equalMergeStmt(const MergeStmt *a, const MergeStmt *b) +{ + COMPARE_NODE_FIELD(relation); + COMPARE_NODE_FIELD(sourceRelation); + COMPARE_NODE_FIELD(joinCondition); + COMPARE_NODE_FIELD(mergeWhenClauses); + COMPARE_NODE_FIELD(withClause); + + return true; +} + +static bool _equalSelectStmt(const SelectStmt *a, const SelectStmt *b) { COMPARE_NODE_FIELD(distinctClause); @@ -3119,6 +3133,32 @@ _equalCommonTableExpr(const CommonTableExpr *a, const CommonTableExpr *b) } static bool +_equalMergeWhenClause(const MergeWhenClause *a, const MergeWhenClause *b) +{ + COMPARE_SCALAR_FIELD(matched); + COMPARE_SCALAR_FIELD(commandType); + COMPARE_SCALAR_FIELD(override); + COMPARE_NODE_FIELD(condition); + COMPARE_NODE_FIELD(targetList); + COMPARE_NODE_FIELD(values); + + return true; +} + +static bool +_equalMergeAction(const MergeAction *a, const MergeAction *b) +{ + COMPARE_SCALAR_FIELD(matched); + COMPARE_SCALAR_FIELD(commandType); + COMPARE_SCALAR_FIELD(override); + COMPARE_NODE_FIELD(qual); + COMPARE_NODE_FIELD(targetList); + COMPARE_NODE_FIELD(updateColnos); + + return true; +} + +static bool _equalXmlSerialize(const XmlSerialize *a, const XmlSerialize *b) { COMPARE_SCALAR_FIELD(xmloption); @@ -3576,6 +3616,9 @@ equal(const void *a, const void *b) case T_UpdateStmt: retval = _equalUpdateStmt(a, b); break; + case T_MergeStmt: + retval = _equalMergeStmt(a, b); + break; case T_SelectStmt: retval = _equalSelectStmt(a, b); break; @@ -4050,6 +4093,12 @@ equal(const void *a, const void *b) case T_CommonTableExpr: retval = _equalCommonTableExpr(a, b); break; + case T_MergeWhenClause: + retval = _equalMergeWhenClause(a, b); + break; + case T_MergeAction: + retval = _equalMergeAction(a, b); + break; case T_ObjectWithArgs: retval = _equalObjectWithArgs(a, b); break; diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c index 25cf282aab2..50898246f96 100644 --- a/src/backend/nodes/nodeFuncs.c +++ b/src/backend/nodes/nodeFuncs.c @@ -2303,6 +2303,16 @@ expression_tree_walker(Node *node, return true; } break; + case T_MergeAction: + { + MergeAction *action = (MergeAction *) node; + + if (walker(action->targetList, context)) + return true; + if (walker(action->qual, context)) + return true; + } + break; case T_PartitionPruneStepOp: { PartitionPruneStepOp *opstep = (PartitionPruneStepOp *) node; @@ -2463,6 +2473,8 @@ query_tree_walker(Query *query, return true; if (walker((Node *) query->onConflict, context)) return true; + if (walker((Node *) query->mergeActionList, context)) + return true; if (walker((Node *) query->returningList, context)) return true; if (walker((Node *) query->jointree, context)) @@ -3252,6 +3264,18 @@ expression_tree_mutator(Node *node, return (Node *) newnode; } break; + case T_MergeAction: + { + MergeAction *action = (MergeAction *) node; + MergeAction *newnode; + + FLATCOPY(newnode, action, MergeAction); + MUTATE(newnode->qual, action->qual, Node *); + MUTATE(newnode->targetList, action->targetList, List *); + + return (Node *) newnode; + } + break; case T_PartitionPruneStepOp: { PartitionPruneStepOp *opstep = (PartitionPruneStepOp *) node; @@ -3464,6 +3488,7 @@ query_tree_mutator(Query *query, MUTATE(query->targetList, query->targetList, List *); MUTATE(query->withCheckOptions, query->withCheckOptions, List *); MUTATE(query->onConflict, query->onConflict, OnConflictExpr *); + MUTATE(query->mergeActionList, query->mergeActionList, List *); MUTATE(query->returningList, query->returningList, List *); MUTATE(query->jointree, query->jointree, FromExpr *); MUTATE(query->setOperations, query->setOperations, Node *); @@ -3656,9 +3681,9 @@ query_or_expression_tree_mutator(Node *node, * boundaries: we descend to everything that's possibly interesting. * * Currently, the node type coverage here extends only to DML statements - * (SELECT/INSERT/UPDATE/DELETE) and nodes that can appear in them, because - * this is used mainly during analysis of CTEs, and only DML statements can - * appear in CTEs. + * (SELECT/INSERT/UPDATE/DELETE/MERGE) and nodes that can appear in them, + * because this is used mainly during analysis of CTEs, and only DML + * statements can appear in CTEs. */ bool raw_expression_tree_walker(Node *node, @@ -3839,6 +3864,34 @@ raw_expression_tree_walker(Node *node, return true; } break; + case T_MergeStmt: + { + MergeStmt *stmt = (MergeStmt *) node; + + if (walker(stmt->relation, context)) + return true; + if (walker(stmt->sourceRelation, context)) + return true; + if (walker(stmt->joinCondition, context)) + return true; + if (walker(stmt->mergeWhenClauses, context)) + return true; + if (walker(stmt->withClause, context)) + return true; + } + break; + case T_MergeWhenClause: + { + MergeWhenClause *mergeWhenClause = (MergeWhenClause *) node; + + if (walker(mergeWhenClause->condition, context)) + return true; + if (walker(mergeWhenClause->targetList, context)) + return true; + if (walker(mergeWhenClause->values, context)) + return true; + } + break; case T_SelectStmt: { SelectStmt *stmt = (SelectStmt *) node; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index c25f0bd684c..0c01f350867 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -429,6 +429,7 @@ _outModifyTable(StringInfo str, const ModifyTable *node) WRITE_NODE_FIELD(onConflictWhere); WRITE_UINT_FIELD(exclRelRTI); WRITE_NODE_FIELD(exclRelTlist); + WRITE_NODE_FIELD(mergeActionLists); } static void @@ -2250,6 +2251,7 @@ _outModifyTablePath(StringInfo str, const ModifyTablePath *node) WRITE_NODE_FIELD(rowMarks); WRITE_NODE_FIELD(onconflict); WRITE_INT_FIELD(epqParam); + WRITE_NODE_FIELD(mergeActionLists); } static void @@ -3143,6 +3145,8 @@ _outQuery(StringInfo str, const Query *node) WRITE_NODE_FIELD(setOperations); WRITE_NODE_FIELD(constraintDeps); WRITE_NODE_FIELD(withCheckOptions); + WRITE_NODE_FIELD(mergeActionList); + WRITE_BOOL_FIELD(mergeUseOuterJoin); WRITE_LOCATION_FIELD(stmt_location); WRITE_INT_FIELD(stmt_len); } @@ -3272,6 +3276,32 @@ _outCommonTableExpr(StringInfo str, const CommonTableExpr *node) } static void +_outMergeWhenClause(StringInfo str, const MergeWhenClause *node) +{ + WRITE_NODE_TYPE("MERGEWHENCLAUSE"); + + WRITE_BOOL_FIELD(matched); + WRITE_ENUM_FIELD(commandType, CmdType); + WRITE_ENUM_FIELD(override, OverridingKind); + WRITE_NODE_FIELD(condition); + WRITE_NODE_FIELD(targetList); + WRITE_NODE_FIELD(values); +} + +static void +_outMergeAction(StringInfo str, const MergeAction *node) +{ + WRITE_NODE_TYPE("MERGEACTION"); + + WRITE_BOOL_FIELD(matched); + WRITE_ENUM_FIELD(commandType, CmdType); + WRITE_ENUM_FIELD(override, OverridingKind); + WRITE_NODE_FIELD(qual); + WRITE_NODE_FIELD(targetList); + WRITE_NODE_FIELD(updateColnos); +} + +static void _outSetOperationStmt(StringInfo str, const SetOperationStmt *node) { WRITE_NODE_TYPE("SETOPERATIONSTMT"); @@ -4480,6 +4510,12 @@ outNode(StringInfo str, const void *obj) case T_CommonTableExpr: _outCommonTableExpr(str, obj); break; + case T_MergeWhenClause: + _outMergeWhenClause(str, obj); + break; + case T_MergeAction: + _outMergeAction(str, obj); + break; case T_SetOperationStmt: _outSetOperationStmt(str, obj); break; diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index e0b3ad1ed20..3ee8ba6f159 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -283,6 +283,8 @@ _readQuery(void) READ_NODE_FIELD(setOperations); READ_NODE_FIELD(constraintDeps); READ_NODE_FIELD(withCheckOptions); + READ_NODE_FIELD(mergeActionList); + READ_BOOL_FIELD(mergeUseOuterJoin); READ_LOCATION_FIELD(stmt_location); READ_INT_FIELD(stmt_len); @@ -473,6 +475,42 @@ _readCommonTableExpr(void) } /* + * _readMergeWhenClause + */ +static MergeWhenClause * +_readMergeWhenClause(void) +{ + READ_LOCALS(MergeWhenClause); + + READ_BOOL_FIELD(matched); + READ_ENUM_FIELD(commandType, CmdType); + READ_NODE_FIELD(condition); + READ_NODE_FIELD(targetList); + READ_NODE_FIELD(values); + READ_ENUM_FIELD(override, OverridingKind); + + READ_DONE(); +} + +/* + * _readMergeAction + */ +static MergeAction * +_readMergeAction(void) +{ + READ_LOCALS(MergeAction); + + READ_BOOL_FIELD(matched); + READ_ENUM_FIELD(commandType, CmdType); + READ_ENUM_FIELD(override, OverridingKind); + READ_NODE_FIELD(qual); + READ_NODE_FIELD(targetList); + READ_NODE_FIELD(updateColnos); + + READ_DONE(); +} + +/* * _readSetOperationStmt */ static SetOperationStmt * @@ -1765,6 +1803,7 @@ _readModifyTable(void) READ_NODE_FIELD(onConflictWhere); READ_UINT_FIELD(exclRelRTI); READ_NODE_FIELD(exclRelTlist); + READ_NODE_FIELD(mergeActionLists); READ_DONE(); } @@ -2809,6 +2848,10 @@ parseNodeString(void) return_value = _readCTECycleClause(); else if (MATCH("COMMONTABLEEXPR", 15)) return_value = _readCommonTableExpr(); + else if (MATCH("MERGEWHENCLAUSE", 15)) + return_value = _readMergeWhenClause(); + else if (MATCH("MERGEACTION", 11)) + return_value = _readMergeAction(); else if (MATCH("SETOPERATIONSTMT", 16)) return_value = _readSetOperationStmt(); else if (MATCH("ALIAS", 5)) diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index fa069a217c8..179c87c6714 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -310,7 +310,8 @@ static ModifyTable *make_modifytable(PlannerInfo *root, Plan *subplan, List *resultRelations, List *updateColnosLists, List *withCheckOptionLists, List *returningLists, - List *rowMarks, OnConflictExpr *onconflict, int epqParam); + List *rowMarks, OnConflictExpr *onconflict, + List *mergeActionList, int epqParam); static GatherMerge *create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path); @@ -2775,6 +2776,7 @@ create_modifytable_plan(PlannerInfo *root, ModifyTablePath *best_path) best_path->returningLists, best_path->rowMarks, best_path->onconflict, + best_path->mergeActionLists, best_path->epqParam); copy_generic_path_info(&plan->plan, &best_path->path); @@ -6924,7 +6926,8 @@ make_modifytable(PlannerInfo *root, Plan *subplan, List *resultRelations, List *updateColnosLists, List *withCheckOptionLists, List *returningLists, - List *rowMarks, OnConflictExpr *onconflict, int epqParam) + List *rowMarks, OnConflictExpr *onconflict, + List *mergeActionLists, int epqParam) { ModifyTable *node = makeNode(ModifyTable); List *fdw_private_list; @@ -6932,9 +6935,10 @@ make_modifytable(PlannerInfo *root, Plan *subplan, ListCell *lc; int i; - Assert(operation == CMD_UPDATE ? - list_length(resultRelations) == list_length(updateColnosLists) : - updateColnosLists == NIL); + Assert(operation == CMD_MERGE || + (operation == CMD_UPDATE ? + list_length(resultRelations) == list_length(updateColnosLists) : + updateColnosLists == NIL)); Assert(withCheckOptionLists == NIL || list_length(resultRelations) == list_length(withCheckOptionLists)); Assert(returningLists == NIL || @@ -6992,6 +6996,7 @@ make_modifytable(PlannerInfo *root, Plan *subplan, node->withCheckOptionLists = withCheckOptionLists; node->returningLists = returningLists; node->rowMarks = rowMarks; + node->mergeActionLists = mergeActionLists; node->epqParam = epqParam; /* diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index bd09f85aea1..547fda20a23 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -650,6 +650,11 @@ subquery_planner(PlannerGlobal *glob, Query *parse, SS_process_ctes(root); /* + * If it's a MERGE command, transform the joinlist as appropriate. + */ + transform_MERGE_to_join(parse); + + /* * If the FROM clause is empty, replace it with a dummy RTE_RESULT RTE, so * that we don't need so many special cases to deal with that situation. */ @@ -849,6 +854,20 @@ subquery_planner(PlannerGlobal *glob, Query *parse, /* exclRelTlist contains only Vars, so no preprocessing needed */ } + foreach(l, parse->mergeActionList) + { + MergeAction *action = (MergeAction *) lfirst(l); + + action->targetList = (List *) + preprocess_expression(root, + (Node *) action->targetList, + EXPRKIND_TARGET); + action->qual = + preprocess_expression(root, + (Node *) action->qual, + EXPRKIND_QUAL); + } + root->append_rel_list = (List *) preprocess_expression(root, (Node *) root->append_rel_list, EXPRKIND_APPINFO); @@ -1714,7 +1733,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) } /* - * If this is an INSERT/UPDATE/DELETE, add the ModifyTable node. + * If this is an INSERT/UPDATE/DELETE/MERGE, add the ModifyTable node. */ if (parse->commandType != CMD_SELECT) { @@ -1723,6 +1742,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) List *updateColnosLists = NIL; List *withCheckOptionLists = NIL; List *returningLists = NIL; + List *mergeActionLists = NIL; List *rowMarks; if (bms_membership(root->all_result_relids) == BMS_MULTIPLE) @@ -1789,6 +1809,43 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) returningLists = lappend(returningLists, returningList); } + if (parse->mergeActionList) + { + ListCell *l; + List *mergeActionList = NIL; + + /* + * Copy MergeActions and translate stuff that + * references attribute numbers. + */ + foreach(l, parse->mergeActionList) + { + MergeAction *action = lfirst(l), + *leaf_action = copyObject(action); + + leaf_action->qual = + adjust_appendrel_attrs_multilevel(root, + (Node *) action->qual, + this_result_rel->relids, + top_result_rel->relids); + leaf_action->targetList = (List *) + adjust_appendrel_attrs_multilevel(root, + (Node *) action->targetList, + this_result_rel->relids, + top_result_rel->relids); + if (leaf_action->commandType == CMD_UPDATE) + leaf_action->updateColnos = + adjust_inherited_attnums_multilevel(root, + action->updateColnos, + this_result_rel->relid, + top_result_rel->relid); + mergeActionList = lappend(mergeActionList, + leaf_action); + } + + mergeActionLists = lappend(mergeActionLists, + mergeActionList); + } } if (resultRelations == NIL) @@ -1811,6 +1868,8 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) withCheckOptionLists = list_make1(parse->withCheckOptions); if (parse->returningList) returningLists = list_make1(parse->returningList); + if (parse->mergeActionList) + mergeActionLists = list_make1(parse->mergeActionList); } } else @@ -1823,6 +1882,8 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) withCheckOptionLists = list_make1(parse->withCheckOptions); if (parse->returningList) returningLists = list_make1(parse->returningList); + if (parse->mergeActionList) + mergeActionLists = list_make1(parse->mergeActionList); } /* @@ -1859,6 +1920,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) returningLists, rowMarks, parse->onConflict, + mergeActionLists, assign_special_exec_param(root)); } diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index a7b11b7f03a..bf4c722c028 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -952,6 +952,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) case T_ModifyTable: { ModifyTable *splan = (ModifyTable *) plan; + Plan *subplan = outerPlan(splan); Assert(splan->plan.targetlist == NIL); Assert(splan->plan.qual == NIL); @@ -963,7 +964,6 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) if (splan->returningLists) { List *newRL = NIL; - Plan *subplan = outerPlan(splan); ListCell *lcrl, *lcrr; @@ -1030,6 +1030,68 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) fix_scan_list(root, splan->exclRelTlist, rtoffset, 1); } + /* + * The MERGE statement produces the target rows by performing + * a right join between the target relation and the source + * relation (which could be a plain relation or a subquery). + * The INSERT and UPDATE actions of the MERGE statement + * require access to the columns from the source relation. We + * arrange things so that the source relation attributes are + * available as INNER_VAR and the target relation attributes + * are available from the scan tuple. + */ + if (splan->mergeActionLists != NIL) + { + ListCell *lca, + *lcr; + + /* + * Fix the targetList of individual action nodes so that + * the so-called "source relation" Vars are referenced as + * INNER_VAR. Note that for this to work correctly during + * execution, the ecxt_innertuple must be set to the tuple + * obtained by executing the subplan, which is what + * constitutes the "source relation". + * + * We leave the Vars from the result relation (i.e. the + * target relation) unchanged i.e. those Vars would be + * picked from the scan slot. So during execution, we must + * ensure that ecxt_scantuple is setup correctly to refer + * to the tuple from the target relation. + */ + indexed_tlist *itlist; + + itlist = build_tlist_index(subplan->targetlist); + + forboth(lca, splan->mergeActionLists, + lcr, splan->resultRelations) + { + List *mergeActionList = lfirst(lca); + Index resultrel = lfirst_int(lcr); + + foreach(l, mergeActionList) + { + MergeAction *action = (MergeAction *) lfirst(l); + + /* Fix targetList of each action. */ + action->targetList = fix_join_expr(root, + action->targetList, + NULL, itlist, + resultrel, + rtoffset, + NUM_EXEC_TLIST(plan)); + + /* Fix quals too. */ + action->qual = (Node *) fix_join_expr(root, + (List *) action->qual, + NULL, itlist, + resultrel, + rtoffset, + NUM_EXEC_QUAL(plan)); + } + } + } + splan->nominalRelation += rtoffset; if (splan->rootRelation) splan->rootRelation += rtoffset; diff --git a/src/backend/optimizer/prep/prepjointree.c b/src/backend/optimizer/prep/prepjointree.c index 74823e8437a..0bd99acf836 100644 --- a/src/backend/optimizer/prep/prepjointree.c +++ b/src/backend/optimizer/prep/prepjointree.c @@ -133,6 +133,86 @@ static Node *find_jointree_node_for_rel(Node *jtnode, int relid); /* + * transform_MERGE_to_join + * Replace a MERGE's jointree to also include the target relation. + */ +void +transform_MERGE_to_join(Query *parse) +{ + RangeTblEntry *joinrte; + JoinExpr *joinexpr; + JoinType jointype; + int joinrti; + List *vars; + + if (parse->commandType != CMD_MERGE) + return; + + /* XXX probably bogus */ + vars = NIL; + + /* + * When any WHEN NOT MATCHED THEN INSERT clauses exist, we need to use an + * outer join so that we process all unmatched tuples from the source + * relation. If none exist, we can use an inner join. + */ + if (parse->mergeUseOuterJoin) + jointype = JOIN_RIGHT; + else + jointype = JOIN_INNER; + + /* Manufacture a join RTE to use. */ + joinrte = makeNode(RangeTblEntry); + joinrte->rtekind = RTE_JOIN; + joinrte->jointype = jointype; + joinrte->joinmergedcols = 0; + joinrte->joinaliasvars = vars; + joinrte->joinleftcols = NIL; /* MERGE does not allow JOIN USING */ + joinrte->joinrightcols = NIL; /* ditto */ + joinrte->join_using_alias = NULL; + + joinrte->alias = NULL; + joinrte->eref = makeAlias("*MERGE*", NIL); + joinrte->lateral = false; + joinrte->inh = false; + joinrte->inFromCl = true; + joinrte->requiredPerms = 0; + joinrte->checkAsUser = InvalidOid; + joinrte->selectedCols = NULL; + joinrte->insertedCols = NULL; + joinrte->updatedCols = NULL; + joinrte->extraUpdatedCols = NULL; + joinrte->securityQuals = NIL; + + /* + * Add completed RTE to pstate's range table list, so that we know its + * index. + */ + parse->rtable = lappend(parse->rtable, joinrte); + joinrti = list_length(parse->rtable); + + /* + * Create a JOIN between the target and the source relation. + */ + joinexpr = makeNode(JoinExpr); + joinexpr->jointype = jointype; + joinexpr->isNatural = false; + joinexpr->larg = (Node *) makeNode(RangeTblRef); + ((RangeTblRef *) joinexpr->larg)->rtindex = parse->resultRelation; + joinexpr->rarg = linitial(parse->jointree->fromlist); /* original join */ + joinexpr->usingClause = NIL; + joinexpr->join_using_alias = NULL; + /* The quals are removed from the jointree and into this specific join */ + joinexpr->quals = parse->jointree->quals; + joinexpr->alias = NULL; + joinexpr->rtindex = joinrti; + + /* Make the new join be the sole entry in the query's jointree */ + parse->jointree->fromlist = list_make1(joinexpr); + parse->jointree->quals = NULL; +} + +/* * replace_empty_jointree * If the Query's jointree is empty, replace it with a dummy RTE_RESULT * relation. @@ -2058,6 +2138,17 @@ perform_pullup_replace_vars(PlannerInfo *root, * can't contain any references to a subquery. */ } + if (parse->mergeActionList) + { + foreach(lc, parse->mergeActionList) + { + MergeAction *action = lfirst(lc); + + action->qual = pullup_replace_vars(action->qual, rvcontext); + action->targetList = (List *) + pullup_replace_vars((Node *) action->targetList, rvcontext); + } + } replace_vars_in_jointree((Node *) parse->jointree, rvcontext, lowest_nulling_outer_join); Assert(parse->setOperations == NULL); diff --git a/src/backend/optimizer/prep/preptlist.c b/src/backend/optimizer/prep/preptlist.c index 95e82cf958f..99ab3d75594 100644 --- a/src/backend/optimizer/prep/preptlist.c +++ b/src/backend/optimizer/prep/preptlist.c @@ -125,6 +125,43 @@ preprocess_targetlist(PlannerInfo *root) } /* + * For MERGE we need to handle the target list for the target relation, + * and also target list for each action (only INSERT/UPDATE matter). + */ + if (command_type == CMD_MERGE) + { + ListCell *l; + + /* + * For MERGE, add any junk column(s) needed to allow the executor to + * identify the rows to be inserted or updated. + */ + root->processed_tlist = tlist; + add_row_identity_columns(root, result_relation, + target_rte, target_relation); + + tlist = root->processed_tlist; + + /* + * For MERGE, handle targetlist of each MergeAction separately. Give + * the same treatment to MergeAction->targetList as we would have + * given to a regular INSERT. For UPDATE, collect the column numbers + * being modified. + */ + foreach(l, parse->mergeActionList) + { + MergeAction *action = (MergeAction *) lfirst(l); + + if (action->commandType == CMD_INSERT) + action->targetList = expand_insert_targetlist(action->targetList, + target_relation); + else if (action->commandType == CMD_UPDATE) + action->updateColnos = + extract_update_targetlist_colnos(action->targetList); + } + } + + /* * Add necessary junk columns for rowmarked rels. These values are needed * for locking of rels selected FOR UPDATE/SHARE, and to do EvalPlanQual * rechecking. See comments for PlanRowMark in plannodes.h. If you diff --git a/src/backend/optimizer/util/appendinfo.c b/src/backend/optimizer/util/appendinfo.c index 2f06fa743c2..9d4bb470270 100644 --- a/src/backend/optimizer/util/appendinfo.c +++ b/src/backend/optimizer/util/appendinfo.c @@ -774,8 +774,8 @@ add_row_identity_var(PlannerInfo *root, Var *orig_var, Assert(orig_var->varlevelsup == 0); /* - * If we're doing non-inherited UPDATE/DELETE, there's little need for - * ROWID_VAR shenanigans. Just shove the presented Var into the + * If we're doing non-inherited UPDATE/DELETE/MERGE, there's little need + * for ROWID_VAR shenanigans. Just shove the presented Var into the * processed_tlist, and we're done. */ if (rtindex == root->parse->resultRelation) @@ -862,14 +862,16 @@ add_row_identity_columns(PlannerInfo *root, Index rtindex, char relkind = target_relation->rd_rel->relkind; Var *var; - Assert(commandType == CMD_UPDATE || commandType == CMD_DELETE); + Assert(commandType == CMD_UPDATE || commandType == CMD_DELETE || commandType == CMD_MERGE); - if (relkind == RELKIND_RELATION || + if (commandType == CMD_MERGE || + relkind == RELKIND_RELATION || relkind == RELKIND_MATVIEW || relkind == RELKIND_PARTITIONED_TABLE) { /* - * Emit CTID so that executor can find the row to update or delete. + * Emit CTID so that executor can find the row to merge, update or + * delete. */ var = makeVar(rtindex, SelfItemPointerAttributeNumber, @@ -942,8 +944,11 @@ distribute_row_identity_vars(PlannerInfo *root) RelOptInfo *target_rel; ListCell *lc; - /* There's nothing to do if this isn't an inherited UPDATE/DELETE. */ - if (parse->commandType != CMD_UPDATE && parse->commandType != CMD_DELETE) + /* + * There's nothing to do if this isn't an inherited UPDATE/DELETE/MERGE. + */ + if (parse->commandType != CMD_UPDATE && parse->commandType != CMD_DELETE && + parse->commandType != CMD_MERGE) { Assert(root->row_identity_vars == NIL); return; diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 5c32c96b71c..99df76b6b71 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -3620,6 +3620,7 @@ create_lockrows_path(PlannerInfo *root, RelOptInfo *rel, * 'rowMarks' is a list of PlanRowMarks (non-locking only) * 'onconflict' is the ON CONFLICT clause, or NULL * 'epqParam' is the ID of Param for EvalPlanQual re-eval + * 'mergeActionLists' is a list of lists of MERGE actions (one per rel) */ ModifyTablePath * create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, @@ -3631,13 +3632,14 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, List *updateColnosLists, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, - int epqParam) + List *mergeActionLists, int epqParam) { ModifyTablePath *pathnode = makeNode(ModifyTablePath); - Assert(operation == CMD_UPDATE ? - list_length(resultRelations) == list_length(updateColnosLists) : - updateColnosLists == NIL); + Assert(operation == CMD_MERGE || + (operation == CMD_UPDATE ? + list_length(resultRelations) == list_length(updateColnosLists) : + updateColnosLists == NIL)); Assert(withCheckOptionLists == NIL || list_length(resultRelations) == list_length(withCheckOptionLists)); Assert(returningLists == NIL || @@ -3697,6 +3699,7 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, pathnode->rowMarks = rowMarks; pathnode->onconflict = onconflict; pathnode->epqParam = epqParam; + pathnode->mergeActionLists = mergeActionLists; return pathnode; } diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c index a5002ad8955..df97b799174 100644 --- a/src/backend/optimizer/util/plancat.c +++ b/src/backend/optimizer/util/plancat.c @@ -2167,6 +2167,10 @@ has_row_triggers(PlannerInfo *root, Index rti, CmdType event) trigDesc->trig_delete_before_row)) result = true; break; + /* There is no separate event for MERGE, only INSERT/UPDATE/DELETE */ + case CMD_MERGE: + result = false; + break; default: elog(ERROR, "unrecognized CmdType: %d", (int) event); break; diff --git a/src/backend/parser/Makefile b/src/backend/parser/Makefile index 5ddb9a92f05..9f1c4022bbe 100644 --- a/src/backend/parser/Makefile +++ b/src/backend/parser/Makefile @@ -23,6 +23,7 @@ OBJS = \ parse_enr.o \ parse_expr.o \ parse_func.o \ + parse_merge.o \ parse_node.o \ parse_oper.o \ parse_param.o \ diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 61026753a3d..0144284aa35 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -39,6 +39,7 @@ #include "parser/parse_cte.h" #include "parser/parse_expr.h" #include "parser/parse_func.h" +#include "parser/parse_merge.h" #include "parser/parse_oper.h" #include "parser/parse_param.h" #include "parser/parse_relation.h" @@ -60,9 +61,6 @@ post_parse_analyze_hook_type post_parse_analyze_hook = NULL; static Query *transformOptionalSelectInto(ParseState *pstate, Node *parseTree); static Query *transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt); static Query *transformInsertStmt(ParseState *pstate, InsertStmt *stmt); -static List *transformInsertRow(ParseState *pstate, List *exprlist, - List *stmtcols, List *icolumns, List *attrnos, - bool strip_indirection); static OnConflictExpr *transformOnConflictClause(ParseState *pstate, OnConflictClause *onConflictClause); static int count_rowexpr_columns(ParseState *pstate, Node *expr); @@ -76,8 +74,6 @@ static void determineRecursiveColTypes(ParseState *pstate, static Query *transformReturnStmt(ParseState *pstate, ReturnStmt *stmt); static Query *transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt); static List *transformReturningList(ParseState *pstate, List *returningList); -static List *transformUpdateTargetList(ParseState *pstate, - List *targetList); static Query *transformPLAssignStmt(ParseState *pstate, PLAssignStmt *stmt); static Query *transformDeclareCursorStmt(ParseState *pstate, @@ -330,6 +326,7 @@ transformStmt(ParseState *pstate, Node *parseTree) case T_InsertStmt: case T_UpdateStmt: case T_DeleteStmt: + case T_MergeStmt: (void) test_raw_expression_coverage(parseTree, NULL); break; default: @@ -354,6 +351,10 @@ transformStmt(ParseState *pstate, Node *parseTree) result = transformUpdateStmt(pstate, (UpdateStmt *) parseTree); break; + case T_MergeStmt: + result = transformMergeStmt(pstate, (MergeStmt *) parseTree); + break; + case T_SelectStmt: { SelectStmt *n = (SelectStmt *) parseTree; @@ -438,6 +439,7 @@ analyze_requires_snapshot(RawStmt *parseTree) case T_InsertStmt: case T_DeleteStmt: case T_UpdateStmt: + case T_MergeStmt: case T_SelectStmt: case T_PLAssignStmt: result = true; @@ -956,7 +958,7 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) * attrnos: integer column numbers (must be same length as icolumns) * strip_indirection: if true, remove any field/array assignment nodes */ -static List * +List * transformInsertRow(ParseState *pstate, List *exprlist, List *stmtcols, List *icolumns, List *attrnos, bool strip_indirection) @@ -1593,7 +1595,7 @@ transformValuesClause(ParseState *pstate, SelectStmt *stmt) * Generate a targetlist as though expanding "*" */ Assert(pstate->p_next_resno == 1); - qry->targetList = expandNSItemAttrs(pstate, nsitem, 0, -1); + qry->targetList = expandNSItemAttrs(pstate, nsitem, 0, true, -1); /* * The grammar allows attaching ORDER BY, LIMIT, and FOR UPDATE to a @@ -2418,9 +2420,9 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) /* * transformUpdateTargetList - - * handle SET clause in UPDATE/INSERT ... ON CONFLICT UPDATE + * handle SET clause in UPDATE/MERGE/INSERT ... ON CONFLICT UPDATE */ -static List * +List * transformUpdateTargetList(ParseState *pstate, List *origTlist) { List *tlist = NIL; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index c6613af9fe6..9399fff610f 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -278,6 +278,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); struct SelectLimit *selectlimit; SetQuantifier setquantifier; struct GroupClause *groupclause; + MergeWhenClause *mergewhen; struct KeyActions *keyactions; struct KeyAction *keyaction; } @@ -307,7 +308,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); DropTransformStmt DropUserMappingStmt ExplainStmt FetchStmt GrantStmt GrantRoleStmt ImportForeignSchemaStmt IndexStmt InsertStmt - ListenStmt LoadStmt LockStmt NotifyStmt ExplainableStmt PreparableStmt + ListenStmt LoadStmt LockStmt MergeStmt NotifyStmt ExplainableStmt PreparableStmt CreateFunctionStmt AlterFunctionStmt ReindexStmt RemoveAggrStmt RemoveFuncStmt RemoveOperStmt RenameStmt ReturnStmt RevokeStmt RevokeRoleStmt RuleActionStmt RuleActionStmtOrEmpty RuleStmt @@ -433,6 +434,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); any_operator expr_list attrs distinct_clause opt_distinct_clause target_list opt_target_list insert_column_list set_target_list + merge_values_clause set_clause_list set_clause def_list operator_def_list indirection opt_indirection reloption_list TriggerFuncArgs opclass_item_list opclass_drop_list @@ -506,6 +508,10 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <istmt> insert_rest %type <infer> opt_conf_expr %type <onconflict> opt_on_conflict +%type <mergewhen> merge_insert merge_update merge_delete + +%type <node> merge_when_clause opt_merge_when_condition +%type <list> merge_when_list %type <vsetstmt> generic_set set_rest set_rest_more generic_reset reset_rest SetResetClause FunctionSetResetClause @@ -734,7 +740,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED - MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE + MAPPING MATCH MATCHED MATERIALIZED MAXVALUE MERGE METHOD + MINUTE_P MINVALUE MODE MONTH_P MOVE NAME_P NAMES NATIONAL NATURAL NCHAR NEW NEXT NFC NFD NFKC NFKD NO NONE NORMALIZE NORMALIZED @@ -1061,6 +1068,7 @@ stmt: | RefreshMatViewStmt | LoadStmt | LockStmt + | MergeStmt | NotifyStmt | PrepareStmt | ReassignOwnedStmt @@ -11123,6 +11131,7 @@ ExplainableStmt: | InsertStmt | UpdateStmt | DeleteStmt + | MergeStmt | DeclareCursorStmt | CreateAsStmt | CreateMatViewStmt @@ -11155,7 +11164,8 @@ PreparableStmt: SelectStmt | InsertStmt | UpdateStmt - | DeleteStmt /* by default all are $$=$1 */ + | DeleteStmt + | MergeStmt /* by default all are $$=$1 */ ; /***************************************************************************** @@ -11543,6 +11553,166 @@ set_target_list: /***************************************************************************** * * QUERY: + * MERGE + * + *****************************************************************************/ + +MergeStmt: + opt_with_clause MERGE INTO relation_expr_opt_alias + USING table_ref + ON a_expr + merge_when_list + { + MergeStmt *m = makeNode(MergeStmt); + + m->withClause = $1; + m->relation = $4; + m->sourceRelation = $6; + m->joinCondition = $8; + m->mergeWhenClauses = $9; + + $$ = (Node *)m; + } + ; + +merge_when_list: + merge_when_clause { $$ = list_make1($1); } + | merge_when_list merge_when_clause { $$ = lappend($1,$2); } + ; + +merge_when_clause: + WHEN MATCHED opt_merge_when_condition THEN merge_update + { + $5->matched = true; + $5->condition = $3; + + $$ = (Node *) $5; + } + | WHEN MATCHED opt_merge_when_condition THEN merge_delete + { + $5->matched = true; + $5->condition = $3; + + $$ = (Node *) $5; + } + | WHEN NOT MATCHED opt_merge_when_condition THEN merge_insert + { + $6->matched = false; + $6->condition = $4; + + $$ = (Node *) $6; + } + | WHEN MATCHED opt_merge_when_condition THEN DO NOTHING + { + MergeWhenClause *m = makeNode(MergeWhenClause); + + m->matched = true; + m->commandType = CMD_NOTHING; + m->condition = $3; + + $$ = (Node *)m; + } + | WHEN NOT MATCHED opt_merge_when_condition THEN DO NOTHING + { + MergeWhenClause *m = makeNode(MergeWhenClause); + + m->matched = false; + m->commandType = CMD_NOTHING; + m->condition = $4; + + $$ = (Node *)m; + } + ; + +opt_merge_when_condition: + AND a_expr { $$ = $2; } + | { $$ = NULL; } + ; + +merge_update: + UPDATE SET set_clause_list + { + MergeWhenClause *n = makeNode(MergeWhenClause); + n->commandType = CMD_UPDATE; + n->override = OVERRIDING_NOT_SET; + n->targetList = $3; + n->values = NIL; + + $$ = n; + } + ; + +merge_delete: + DELETE_P + { + MergeWhenClause *n = makeNode(MergeWhenClause); + n->commandType = CMD_DELETE; + n->override = OVERRIDING_NOT_SET; + n->targetList = NIL; + n->values = NIL; + + $$ = n; + } + ; + +merge_insert: + INSERT merge_values_clause + { + MergeWhenClause *n = makeNode(MergeWhenClause); + n->commandType = CMD_INSERT; + n->override = OVERRIDING_NOT_SET; + n->targetList = NIL; + n->values = $2; + $$ = n; + } + | INSERT OVERRIDING override_kind VALUE_P merge_values_clause + { + MergeWhenClause *n = makeNode(MergeWhenClause); + n->commandType = CMD_INSERT; + n->override = $3; + n->targetList = NIL; + n->values = $5; + $$ = n; + } + | INSERT '(' insert_column_list ')' merge_values_clause + { + MergeWhenClause *n = makeNode(MergeWhenClause); + n->commandType = CMD_INSERT; + n->override = OVERRIDING_NOT_SET; + n->targetList = $3; + n->values = $5; + $$ = n; + } + | INSERT '(' insert_column_list ')' OVERRIDING override_kind VALUE_P merge_values_clause + { + MergeWhenClause *n = makeNode(MergeWhenClause); + n->commandType = CMD_INSERT; + n->override = $6; + n->targetList = $3; + n->values = $8; + $$ = n; + } + | INSERT DEFAULT VALUES + { + MergeWhenClause *n = makeNode(MergeWhenClause); + n->commandType = CMD_INSERT; + n->override = OVERRIDING_NOT_SET; + n->targetList = NIL; + n->values = NIL; + $$ = n; + } + ; + +merge_values_clause: + VALUES '(' expr_list ')' + { + $$ = $3; + } + ; + +/***************************************************************************** + * + * QUERY: * CURSOR STATEMENTS * *****************************************************************************/ @@ -16155,8 +16325,10 @@ unreserved_keyword: | LOGGED | MAPPING | MATCH + | MATCHED | MATERIALIZED | MAXVALUE + | MERGE | METHOD | MINUTE_P | MINVALUE @@ -16734,8 +16906,10 @@ bare_label_keyword: | LOGGED | MAPPING | MATCH + | MATCHED | MATERIALIZED | MAXVALUE + | MERGE | METHOD | MINVALUE | MODE diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c index ded0a14d723..3ef9e8ee5e1 100644 --- a/src/backend/parser/parse_agg.c +++ b/src/backend/parser/parse_agg.c @@ -434,6 +434,13 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr) case EXPR_KIND_UPDATE_TARGET: errkind = true; break; + case EXPR_KIND_MERGE_WHEN: + if (isAgg) + err = _("aggregate functions are not allowed in MERGE WHEN conditions"); + else + err = _("grouping operations are not allowed in MERGE WHEN conditions"); + + break; case EXPR_KIND_GROUP_BY: errkind = true; break; @@ -879,6 +886,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc, case EXPR_KIND_UPDATE_TARGET: errkind = true; break; + case EXPR_KIND_MERGE_WHEN: + err = _("window functions are not allowed in MERGE WHEN conditions"); + break; case EXPR_KIND_GROUP_BY: errkind = true; break; diff --git a/src/backend/parser/parse_collate.c b/src/backend/parser/parse_collate.c index 6c793b72ec7..7582faabb37 100644 --- a/src/backend/parser/parse_collate.c +++ b/src/backend/parser/parse_collate.c @@ -485,6 +485,7 @@ assign_collations_walker(Node *node, assign_collations_context *context) case T_FromExpr: case T_OnConflictExpr: case T_SortGroupClause: + case T_MergeAction: (void) expression_tree_walker(node, assign_collations_walker, (void *) &loccontext); diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c index 781c9709e6d..84be354f714 100644 --- a/src/backend/parser/parse_expr.c +++ b/src/backend/parser/parse_expr.c @@ -513,6 +513,7 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) case EXPR_KIND_INSERT_TARGET: case EXPR_KIND_UPDATE_SOURCE: case EXPR_KIND_UPDATE_TARGET: + case EXPR_KIND_MERGE_WHEN: case EXPR_KIND_GROUP_BY: case EXPR_KIND_ORDER_BY: case EXPR_KIND_DISTINCT_ON: @@ -1748,6 +1749,7 @@ transformSubLink(ParseState *pstate, SubLink *sublink) case EXPR_KIND_INSERT_TARGET: case EXPR_KIND_UPDATE_SOURCE: case EXPR_KIND_UPDATE_TARGET: + case EXPR_KIND_MERGE_WHEN: case EXPR_KIND_GROUP_BY: case EXPR_KIND_ORDER_BY: case EXPR_KIND_DISTINCT_ON: @@ -3075,6 +3077,8 @@ ParseExprKindName(ParseExprKind exprKind) case EXPR_KIND_UPDATE_SOURCE: case EXPR_KIND_UPDATE_TARGET: return "UPDATE"; + case EXPR_KIND_MERGE_WHEN: + return "MERGE WHEN"; case EXPR_KIND_GROUP_BY: return "GROUP BY"; case EXPR_KIND_ORDER_BY: diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c index d91951e1f6c..f71a682cd65 100644 --- a/src/backend/parser/parse_func.c +++ b/src/backend/parser/parse_func.c @@ -2611,6 +2611,9 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location) /* okay, since we process this like a SELECT tlist */ pstate->p_hasTargetSRFs = true; break; + case EXPR_KIND_MERGE_WHEN: + err = _("set-returning functions are not allowed in MERGE WHEN conditions"); + break; case EXPR_KIND_CHECK_CONSTRAINT: case EXPR_KIND_DOMAIN_CHECK: err = _("set-returning functions are not allowed in check constraints"); diff --git a/src/backend/parser/parse_merge.c b/src/backend/parser/parse_merge.c new file mode 100644 index 00000000000..5d0035a12b6 --- /dev/null +++ b/src/backend/parser/parse_merge.c @@ -0,0 +1,415 @@ +/*------------------------------------------------------------------------- + * + * parse_merge.c + * handle merge-statement in parser + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/parser/parse_merge.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/sysattr.h" +#include "miscadmin.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "parser/analyze.h" +#include "parser/parse_collate.h" +#include "parser/parsetree.h" +#include "parser/parser.h" +#include "parser/parse_clause.h" +#include "parser/parse_cte.h" +#include "parser/parse_expr.h" +#include "parser/parse_merge.h" +#include "parser/parse_relation.h" +#include "parser/parse_target.h" +#include "utils/rel.h" +#include "utils/relcache.h" + +static void setNamespaceForMergeWhen(ParseState *pstate, + MergeWhenClause *mergeWhenClause, + Index targetRTI, + Index sourceRTI); +static void setNamespaceVisibilityForRTE(List *namespace, RangeTblEntry *rte, + bool rel_visible, + bool cols_visible); + +/* + * Make appropriate changes to the namespace visibility while transforming + * individual action's quals and targetlist expressions. In particular, for + * INSERT actions we must only see the source relation (since INSERT action is + * invoked for NOT MATCHED tuples and hence there is no target tuple to deal + * with). On the other hand, UPDATE and DELETE actions can see both source and + * target relations. + * + * Also, since the internal join node can hide the source and target + * relations, we must explicitly make the respective relation as visible so + * that columns can be referenced unqualified from these relations. + */ +static void +setNamespaceForMergeWhen(ParseState *pstate, MergeWhenClause *mergeWhenClause, + Index targetRTI, Index sourceRTI) +{ + RangeTblEntry *targetRelRTE, + *sourceRelRTE; + + targetRelRTE = rt_fetch(targetRTI, pstate->p_rtable); + sourceRelRTE = rt_fetch(sourceRTI, pstate->p_rtable); + + if (mergeWhenClause->matched) + { + Assert(mergeWhenClause->commandType == CMD_UPDATE || + mergeWhenClause->commandType == CMD_DELETE || + mergeWhenClause->commandType == CMD_NOTHING); + + /* MATCHED actions can see both target and source relations. */ + setNamespaceVisibilityForRTE(pstate->p_namespace, + targetRelRTE, true, true); + setNamespaceVisibilityForRTE(pstate->p_namespace, + sourceRelRTE, true, true); + } + else + { + /* + * NOT MATCHED actions can't see target relation, but they can see + * source relation. + */ + Assert(mergeWhenClause->commandType == CMD_INSERT || + mergeWhenClause->commandType == CMD_NOTHING); + setNamespaceVisibilityForRTE(pstate->p_namespace, + targetRelRTE, false, false); + setNamespaceVisibilityForRTE(pstate->p_namespace, + sourceRelRTE, true, true); + } +} + +/* + * transformMergeStmt - + * transforms a MERGE statement + */ +Query * +transformMergeStmt(ParseState *pstate, MergeStmt *stmt) +{ + Query *qry = makeNode(Query); + ListCell *l; + AclMode targetPerms = ACL_NO_RIGHTS; + bool is_terminal[2]; + Index sourceRTI; + List *mergeActionList; + Node *joinExpr; + ParseNamespaceItem *nsitem; + + /* There can't be any outer WITH to worry about */ + Assert(pstate->p_ctenamespace == NIL); + + qry->commandType = CMD_MERGE; + qry->hasRecursive = false; + + /* process the WITH clause independently of all else */ + if (stmt->withClause) + { + if (stmt->withClause->recursive) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("WITH RECURSIVE is not supported for MERGE statement"))); + + qry->cteList = transformWithClause(pstate, stmt->withClause); + qry->hasModifyingCTE = pstate->p_hasModifyingCTE; + } + + /* + * Check WHEN clauses for permissions and sanity + */ + is_terminal[0] = false; + is_terminal[1] = false; + foreach(l, stmt->mergeWhenClauses) + { + MergeWhenClause *mergeWhenClause = (MergeWhenClause *) lfirst(l); + int when_type = (mergeWhenClause->matched ? 0 : 1); + + /* + * Collect action types so we can check target permissions + */ + switch (mergeWhenClause->commandType) + { + case CMD_INSERT: + targetPerms |= ACL_INSERT; + break; + case CMD_UPDATE: + targetPerms |= ACL_UPDATE; + break; + case CMD_DELETE: + targetPerms |= ACL_DELETE; + break; + case CMD_NOTHING: + break; + default: + elog(ERROR, "unknown action in MERGE WHEN clause"); + } + + /* + * Check for unreachable WHEN clauses + */ + if (mergeWhenClause->condition == NULL) + is_terminal[when_type] = true; + else if (is_terminal[when_type]) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unreachable WHEN clause specified after unconditional WHEN clause"))); + } + + /* Set up the MERGE target table. */ + qry->resultRelation = setTargetTable(pstate, stmt->relation, + stmt->relation->inh, + false, targetPerms); + + /* + * MERGE is unsupported in various cases + */ + if (pstate->p_target_relation->rd_rel->relkind != RELKIND_RELATION && + pstate->p_target_relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot execute MERGE on relation \"%s\"", + RelationGetRelationName(pstate->p_target_relation)), + errdetail_relkind_not_supported(pstate->p_target_relation->rd_rel->relkind))); + if (pstate->p_target_relation->rd_rel->relhasrules) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot execute MERGE on relation \"%s\"", + RelationGetRelationName(pstate->p_target_relation)), + errdetail("MERGE is not supported for relations with rules."))); + + /* Now transform the source relation to produce the source RTE. */ + transformFromClause(pstate, + list_make1(stmt->sourceRelation)); + sourceRTI = list_length(pstate->p_rtable); + nsitem = GetNSItemByRangeTablePosn(pstate, sourceRTI, 0); + + /* + * Check that the target table doesn't conflict with the source table. + * This would typically be a checkNameSpaceConflicts call, but we want a + * more specific error message. + */ + if (strcmp(pstate->p_target_nsitem->p_names->aliasname, + nsitem->p_names->aliasname) == 0) + ereport(ERROR, + errcode(ERRCODE_DUPLICATE_ALIAS), + errmsg("name \"%s\" specified more than once", + pstate->p_target_nsitem->p_names->aliasname), + errdetail("The name is used both as MERGE target table and data source.")); + + qry->targetList = expandNSItemAttrs(pstate, nsitem, 0, false, + exprLocation(stmt->sourceRelation)); + + qry->rtable = pstate->p_rtable; + + /* + * Transform the join condition. This includes references to the target + * side, so add that to the namespace. + */ + addNSItemToQuery(pstate, pstate->p_target_nsitem, false, true, true); + joinExpr = transformExpr(pstate, stmt->joinCondition, + EXPR_KIND_JOIN_ON); + + /* + * Create the temporary query's jointree using the joinlist we built using + * just the source relation; the target relation is not included. The + * quals we use are the join conditions to the merge target. The join + * will be constructed fully by transform_MERGE_to_join. + */ + qry->jointree = makeFromExpr(pstate->p_joinlist, joinExpr); + + /* + * We now have a good query shape, so now look at the WHEN conditions and + * action targetlists. + * + * Overall, the MERGE Query's targetlist is NIL. + * + * Each individual action has its own targetlist that needs separate + * transformation. These transforms don't do anything to the overall + * targetlist, since that is only used for resjunk columns. + * + * We can reference any column in Target or Source, which is OK because + * both of those already have RTEs. There is nothing like the EXCLUDED + * pseudo-relation for INSERT ON CONFLICT. + */ + mergeActionList = NIL; + foreach(l, stmt->mergeWhenClauses) + { + MergeWhenClause *mergeWhenClause = lfirst_node(MergeWhenClause, l); + MergeAction *action; + + action = makeNode(MergeAction); + action->commandType = mergeWhenClause->commandType; + action->matched = mergeWhenClause->matched; + + /* Use an outer join if any INSERT actions exist in the command. */ + if (action->commandType == CMD_INSERT) + qry->mergeUseOuterJoin = true; + + /* + * Set namespace for the specific action. This must be done before + * analyzing the WHEN quals and the action targetlist. + */ + setNamespaceForMergeWhen(pstate, mergeWhenClause, + qry->resultRelation, + sourceRTI); + + /* + * Transform the WHEN condition. + * + * Note that these quals are NOT added to the join quals; instead they + * are evaluated separately during execution to decide which of the + * WHEN MATCHED or WHEN NOT MATCHED actions to execute. + */ + action->qual = transformWhereClause(pstate, mergeWhenClause->condition, + EXPR_KIND_MERGE_WHEN, "WHEN"); + + /* + * Transform target lists for each INSERT and UPDATE action stmt + */ + switch (action->commandType) + { + case CMD_INSERT: + { + List *exprList = NIL; + ListCell *lc; + RangeTblEntry *rte; + ListCell *icols; + ListCell *attnos; + List *icolumns; + List *attrnos; + + pstate->p_is_insert = true; + + icolumns = checkInsertTargets(pstate, + mergeWhenClause->targetList, + &attrnos); + Assert(list_length(icolumns) == list_length(attrnos)); + + action->override = mergeWhenClause->override; + + /* + * Handle INSERT much like in transformInsertStmt + */ + if (mergeWhenClause->values == NIL) + { + /* + * We have INSERT ... DEFAULT VALUES. We can handle + * this case by emitting an empty targetlist --- all + * columns will be defaulted when the planner expands + * the targetlist. + */ + exprList = NIL; + } + else + { + /* + * Process INSERT ... VALUES with a single VALUES + * sublist. We treat this case separately for + * efficiency. The sublist is just computed directly + * as the Query's targetlist, with no VALUES RTE. So + * it works just like a SELECT without any FROM. + */ + + /* + * Do basic expression transformation (same as a ROW() + * expr, but allow SetToDefault at top level) + */ + exprList = transformExpressionList(pstate, + mergeWhenClause->values, + EXPR_KIND_VALUES_SINGLE, + true); + + /* Prepare row for assignment to target table */ + exprList = transformInsertRow(pstate, exprList, + mergeWhenClause->targetList, + icolumns, attrnos, + false); + } + + /* + * Generate action's target list using the computed list + * of expressions. Also, mark all the target columns as + * needing insert permissions. + */ + rte = pstate->p_target_nsitem->p_rte; + forthree(lc, exprList, icols, icolumns, attnos, attrnos) + { + Expr *expr = (Expr *) lfirst(lc); + ResTarget *col = lfirst_node(ResTarget, icols); + AttrNumber attr_num = (AttrNumber) lfirst_int(attnos); + TargetEntry *tle; + + tle = makeTargetEntry(expr, + attr_num, + col->name, + false); + action->targetList = lappend(action->targetList, tle); + + rte->insertedCols = + bms_add_member(rte->insertedCols, + attr_num - FirstLowInvalidHeapAttributeNumber); + } + } + break; + case CMD_UPDATE: + { + pstate->p_is_insert = false; + action->targetList = + transformUpdateTargetList(pstate, + mergeWhenClause->targetList); + } + break; + case CMD_DELETE: + break; + + case CMD_NOTHING: + action->targetList = NIL; + break; + default: + elog(ERROR, "unknown action in MERGE WHEN clause"); + } + + mergeActionList = lappend(mergeActionList, action); + } + + qry->mergeActionList = mergeActionList; + + /* RETURNING could potentially be added in the future, but not in SQL std */ + qry->returningList = NULL; + + qry->hasTargetSRFs = false; + qry->hasSubLinks = pstate->p_hasSubLinks; + + assign_query_collations(pstate, qry); + + return qry; +} + +static void +setNamespaceVisibilityForRTE(List *namespace, RangeTblEntry *rte, + bool rel_visible, + bool cols_visible) +{ + ListCell *lc; + + foreach(lc, namespace) + { + ParseNamespaceItem *nsitem = (ParseNamespaceItem *) lfirst(lc); + + if (nsitem->p_rte == rte) + { + nsitem->p_rel_visible = rel_visible; + nsitem->p_cols_visible = cols_visible; + break; + } + } +} diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c index cb9e177b5e5..7efa5f15d72 100644 --- a/src/backend/parser/parse_relation.c +++ b/src/backend/parser/parse_relation.c @@ -701,6 +701,17 @@ scanNSItemForColumn(ParseState *pstate, ParseNamespaceItem *nsitem, colname), parser_errposition(pstate, location))); + /* + * In a MERGE WHEN condition, no system column is allowed except tableOid + */ + if (pstate->p_expr_kind == EXPR_KIND_MERGE_WHEN && + attnum < InvalidAttrNumber && attnum != TableOidAttributeNumber) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot use system column \"%s\" in MERGE WHEN condition", + colname), + parser_errposition(pstate, location))); + /* Found a valid match, so build a Var */ if (attnum > InvalidAttrNumber) { @@ -3095,11 +3106,12 @@ expandNSItemVars(ParseNamespaceItem *nsitem, * for the attributes of the nsitem * * pstate->p_next_resno determines the resnos assigned to the TLEs. - * The referenced columns are marked as requiring SELECT access. + * The referenced columns are marked as requiring SELECT access, if + * caller requests that. */ List * expandNSItemAttrs(ParseState *pstate, ParseNamespaceItem *nsitem, - int sublevels_up, int location) + int sublevels_up, bool require_col_privs, int location) { RangeTblEntry *rte = nsitem->p_rte; List *names, @@ -3133,8 +3145,11 @@ expandNSItemAttrs(ParseState *pstate, ParseNamespaceItem *nsitem, false); te_list = lappend(te_list, te); - /* Require read access to each column */ - markVarForSelectPriv(pstate, varnode); + if (require_col_privs) + { + /* Require read access to each column */ + markVarForSelectPriv(pstate, varnode); + } } Assert(name == NULL && var == NULL); /* lists not the same length? */ diff --git a/src/backend/parser/parse_target.c b/src/backend/parser/parse_target.c index 204d2857733..e6445c7bafe 100644 --- a/src/backend/parser/parse_target.c +++ b/src/backend/parser/parse_target.c @@ -1308,6 +1308,7 @@ ExpandAllTables(ParseState *pstate, int location) expandNSItemAttrs(pstate, nsitem, 0, + true, location)); } @@ -1370,7 +1371,7 @@ ExpandSingleTable(ParseState *pstate, ParseNamespaceItem *nsitem, if (make_target_entry) { /* expandNSItemAttrs handles permissions marking */ - return expandNSItemAttrs(pstate, nsitem, sublevels_up, location); + return expandNSItemAttrs(pstate, nsitem, sublevels_up, true, location); } else { diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c index 4eeed580b16..29ae27e5e32 100644 --- a/src/backend/rewrite/rewriteHandler.c +++ b/src/backend/rewrite/rewriteHandler.c @@ -1643,6 +1643,10 @@ matchLocks(CmdType event, if (rulelocks == NULL) return NIL; + /* No rule support for MERGE */ + if (parsetree->commandType == CMD_MERGE) + return NIL; + if (parsetree->commandType != CMD_SELECT) { if (parsetree->resultRelation != varno) @@ -3671,8 +3675,8 @@ RewriteQuery(Query *parsetree, List *rewrite_events) } /* - * If the statement is an insert, update, or delete, adjust its targetlist - * as needed, and then fire INSERT/UPDATE/DELETE rules on it. + * If the statement is an insert, update, delete, or merge, adjust its + * targetlist as needed, and then fire INSERT/UPDATE/DELETE rules on it. * * SELECT rules are handled later when we have all the queries that should * get executed. Also, utilities aren't rewritten at all (do we still @@ -3770,6 +3774,7 @@ RewriteQuery(Query *parsetree, List *rewrite_events) } else if (event == CMD_UPDATE) { + Assert(parsetree->override == OVERRIDING_NOT_SET); parsetree->targetList = rewriteTargetListIU(parsetree->targetList, parsetree->commandType, @@ -3780,6 +3785,38 @@ RewriteQuery(Query *parsetree, List *rewrite_events) /* Also populate extraUpdatedCols (for generated columns) */ fill_extraUpdatedCols(rt_entry, rt_entry_relation); } + else if (event == CMD_MERGE) + { + Assert(parsetree->override == OVERRIDING_NOT_SET); + + /* + * Rewrite each action targetlist separately + */ + foreach(lc1, parsetree->mergeActionList) + { + MergeAction *action = (MergeAction *) lfirst(lc1); + + switch (action->commandType) + { + case CMD_NOTHING: + case CMD_DELETE: /* Nothing to do here */ + break; + case CMD_UPDATE: + case CMD_INSERT: + /* XXX is it possible to have a VALUES clause? */ + action->targetList = + rewriteTargetListIU(action->targetList, + action->commandType, + action->override, + rt_entry_relation, + NULL, 0, NULL); + break; + default: + elog(ERROR, "unrecognized commandType: %d", action->commandType); + break; + } + } + } else if (event == CMD_DELETE) { /* Nothing to do here */ diff --git a/src/backend/rewrite/rowsecurity.c b/src/backend/rewrite/rowsecurity.c index f0a046d65a6..a233dd47585 100644 --- a/src/backend/rewrite/rowsecurity.c +++ b/src/backend/rewrite/rowsecurity.c @@ -232,15 +232,17 @@ get_row_security_policies(Query *root, RangeTblEntry *rte, int rt_index, hasSubLinks); /* - * Similar to above, during an UPDATE or DELETE, if SELECT rights are also - * required (eg: when a RETURNING clause exists, or the user has provided - * a WHERE clause which involves columns from the relation), we collect up - * CMD_SELECT policies and add them via add_security_quals first. + * Similar to above, during an UPDATE, DELETE, or MERGE, if SELECT rights + * are also required (eg: when a RETURNING clause exists, or the user has + * provided a WHERE clause which involves columns from the relation), we + * collect up CMD_SELECT policies and add them via add_security_quals + * first. * * This way, we filter out any records which are not visible through an * ALL or SELECT USING policy. */ - if ((commandType == CMD_UPDATE || commandType == CMD_DELETE) && + if ((commandType == CMD_UPDATE || commandType == CMD_DELETE || + commandType == CMD_MERGE) && rte->requiredPerms & ACL_SELECT) { List *select_permissive_policies; @@ -380,6 +382,92 @@ get_row_security_policies(Query *root, RangeTblEntry *rte, int rt_index, } } + /* + * FOR MERGE, we fetch policies for UPDATE, DELETE and INSERT (and ALL) + * and set them up so that we can enforce the appropriate policy depending + * on the final action we take. + * + * We already fetched the SELECT policies above. + * + * We don't push the UPDATE/DELETE USING quals to the RTE because we don't + * really want to apply them while scanning the relation since we don't + * know whether we will be doing an UPDATE or a DELETE at the end. We + * apply the respective policy once we decide the final action on the + * target tuple. + * + * XXX We are setting up USING quals as WITH CHECK. If RLS prohibits + * UPDATE/DELETE on the target row, we shall throw an error instead of + * silently ignoring the row. This is different than how normal + * UPDATE/DELETE works and more in line with INSERT ON CONFLICT DO UPDATE + * handling. + */ + if (commandType == CMD_MERGE) + { + List *merge_permissive_policies; + List *merge_restrictive_policies; + + /* + * Fetch the UPDATE policies and set them up to execute on the + * existing target row before doing UPDATE. + */ + get_policies_for_relation(rel, CMD_UPDATE, user_id, + &merge_permissive_policies, + &merge_restrictive_policies); + + /* + * WCO_RLS_MERGE_UPDATE_CHECK is used to check UPDATE USING quals on + * the existing target row. + */ + add_with_check_options(rel, rt_index, + WCO_RLS_MERGE_UPDATE_CHECK, + merge_permissive_policies, + merge_restrictive_policies, + withCheckOptions, + hasSubLinks, + true); + + /* + * Same with DELETE policies. + */ + get_policies_for_relation(rel, CMD_DELETE, user_id, + &merge_permissive_policies, + &merge_restrictive_policies); + + add_with_check_options(rel, rt_index, + WCO_RLS_MERGE_DELETE_CHECK, + merge_permissive_policies, + merge_restrictive_policies, + withCheckOptions, + hasSubLinks, + true); + + /* + * No special handling is required for INSERT policies. They will be + * checked and enforced during ExecInsert(). But we must add them to + * withCheckOptions. + */ + get_policies_for_relation(rel, CMD_INSERT, user_id, + &merge_permissive_policies, + &merge_restrictive_policies); + + add_with_check_options(rel, rt_index, + WCO_RLS_INSERT_CHECK, + merge_permissive_policies, + merge_restrictive_policies, + withCheckOptions, + hasSubLinks, + false); + + /* Enforce the WITH CHECK clauses of the UPDATE policies */ + add_with_check_options(rel, rt_index, + WCO_RLS_UPDATE_CHECK, + merge_permissive_policies, + merge_restrictive_policies, + withCheckOptions, + hasSubLinks, + false); + } + table_close(rel, NoLock); /* @@ -444,6 +532,14 @@ get_policies_for_relation(Relation relation, CmdType cmd, Oid user_id, if (policy->polcmd == ACL_DELETE_CHR) cmd_matches = true; break; + case CMD_MERGE: + + /* + * We do not support a separate policy for MERGE command. + * Instead it derives from the policies defined for other + * commands. + */ + break; default: elog(ERROR, "unrecognized policy command type %d", (int) cmd); diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 5f907831a3a..5aa5a350f38 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -178,6 +178,9 @@ ProcessQuery(PlannedStmt *plan, case CMD_DELETE: SetQueryCompletion(qc, CMDTAG_DELETE, queryDesc->estate->es_processed); break; + case CMD_MERGE: + SetQueryCompletion(qc, CMDTAG_MERGE, queryDesc->estate->es_processed); + break; default: SetQueryCompletion(qc, CMDTAG_UNKNOWN, queryDesc->estate->es_processed); break; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 3780c6e812e..f364a9b88a9 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -113,6 +113,7 @@ CommandIsReadOnly(PlannedStmt *pstmt) case CMD_UPDATE: case CMD_INSERT: case CMD_DELETE: + case CMD_MERGE: return false; case CMD_UTILITY: /* For now, treat all utility commands as read/write */ @@ -2124,6 +2125,8 @@ QueryReturnsTuples(Query *parsetree) case CMD_SELECT: /* returns tuples */ return true; + case CMD_MERGE: + return false; case CMD_INSERT: case CMD_UPDATE: case CMD_DELETE: @@ -2365,6 +2368,10 @@ CreateCommandTag(Node *parsetree) tag = CMDTAG_UPDATE; break; + case T_MergeStmt: + tag = CMDTAG_MERGE; + break; + case T_SelectStmt: tag = CMDTAG_SELECT; break; @@ -3125,6 +3132,9 @@ CreateCommandTag(Node *parsetree) case CMD_DELETE: tag = CMDTAG_DELETE; break; + case CMD_MERGE: + tag = CMDTAG_MERGE; + break; case CMD_UTILITY: tag = CreateCommandTag(stmt->utilityStmt); break; @@ -3185,6 +3195,9 @@ CreateCommandTag(Node *parsetree) case CMD_DELETE: tag = CMDTAG_DELETE; break; + case CMD_MERGE: + tag = CMDTAG_MERGE; + break; case CMD_UTILITY: tag = CreateCommandTag(stmt->utilityStmt); break; @@ -3233,6 +3246,7 @@ GetCommandLogLevel(Node *parsetree) case T_InsertStmt: case T_DeleteStmt: case T_UpdateStmt: + case T_MergeStmt: lev = LOGSTMT_MOD; break; @@ -3682,6 +3696,7 @@ GetCommandLogLevel(Node *parsetree) case CMD_UPDATE: case CMD_INSERT: case CMD_DELETE: + case CMD_MERGE: lev = LOGSTMT_MOD; break; @@ -3712,6 +3727,7 @@ GetCommandLogLevel(Node *parsetree) case CMD_UPDATE: case CMD_INSERT: case CMD_DELETE: + case CMD_MERGE: lev = LOGSTMT_MOD; break; diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index df5c4865014..82dc849a301 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -4940,6 +4940,8 @@ set_deparse_plan(deparse_namespace *dpns, Plan *plan) * For a WorkTableScan, locate the parent RecursiveUnion plan node and use * that as INNER referent. * + * For MERGE, make the inner tlist point to the merge source tlist, which + * is same as the targetlist that the ModifyTable's source plan provides. * For ON CONFLICT .. UPDATE we just need the inner tlist to point to the * excluded expression's tlist. (Similar to the SubqueryScan we don't want * to reuse OUTER, it's used for RETURNING in some modify table cases, @@ -4959,7 +4961,12 @@ set_deparse_plan(deparse_namespace *dpns, Plan *plan) dpns->inner_plan = innerPlan(plan); if (IsA(plan, ModifyTable)) - dpns->inner_tlist = ((ModifyTable *) plan)->exclRelTlist; + { + if (((ModifyTable *) plan)->operation == CMD_MERGE) + dpns->inner_tlist = dpns->outer_plan->targetlist; + else + dpns->inner_tlist = ((ModifyTable *) plan)->exclRelTlist; + } else if (dpns->inner_plan) dpns->inner_tlist = dpns->inner_plan->targetlist; else |