From 7103ebb7aae8ab8076b7e85f335ceb8fe799097c Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Mon, 28 Mar 2022 16:45:58 +0200 Subject: Add support for MERGE SQL command MERGE performs actions that modify rows in the target table using a source table or query. MERGE provides a single SQL statement that can conditionally INSERT/UPDATE/DELETE rows -- a task that would otherwise require multiple PL statements. For example, MERGE INTO target AS t USING source AS s ON t.tid = s.sid WHEN MATCHED AND t.balance > s.delta THEN UPDATE SET balance = t.balance - s.delta WHEN MATCHED THEN DELETE WHEN NOT MATCHED AND s.delta > 0 THEN INSERT VALUES (s.sid, s.delta) WHEN NOT MATCHED THEN DO NOTHING; MERGE works with regular tables, partitioned tables and inheritance hierarchies, including column and row security enforcement, as well as support for row and statement triggers and transition tables therein. MERGE is optimized for OLTP and is parameterizable, though also useful for large scale ETL/ELT. MERGE is not intended to be used in preference to existing single SQL commands for INSERT, UPDATE or DELETE since there is some overhead. MERGE can be used from PL/pgSQL. MERGE does not support targetting updatable views or foreign tables, and RETURNING clauses are not allowed either. These limitations are likely fixable with sufficient effort. Rewrite rules are also not supported, but it's not clear that we'd want to support them. Author: Pavan Deolasee Author: Álvaro Herrera Author: Amit Langote Author: Simon Riggs Reviewed-by: Peter Eisentraut Reviewed-by: Andres Freund (earlier versions) Reviewed-by: Peter Geoghegan (earlier versions) Reviewed-by: Robert Haas (earlier versions) Reviewed-by: Japin Li Reviewed-by: Justin Pryzby Reviewed-by: Tomas Vondra Reviewed-by: Zhihong Yu Discussion: https://postgr.es/m/CANP8+jKitBSrB7oTgT9CY2i1ObfOt36z0XMraQc+Xrz8QB0nXA@mail.gmail.com Discussion: https://postgr.es/m/CAH2-WzkJdBuxj9PO=2QaO9-3h3xGbQPZ34kJH=HukRekwM-GZg@mail.gmail.com Discussion: https://postgr.es/m/20201231134736.GA25392@alvherre.pgsql --- src/backend/catalog/sql_features.txt | 6 +- src/backend/commands/explain.c | 35 ++ src/backend/commands/trigger.c | 126 ++-- src/backend/executor/README | 41 +- src/backend/executor/execMain.c | 16 + src/backend/executor/execPartition.c | 113 +++- src/backend/executor/execReplication.c | 2 +- src/backend/executor/nodeModifyTable.c | 936 ++++++++++++++++++++++++++++-- src/backend/executor/spi.c | 3 + src/backend/nodes/copyfuncs.c | 55 ++ src/backend/nodes/equalfuncs.c | 49 ++ src/backend/nodes/nodeFuncs.c | 59 +- src/backend/nodes/outfuncs.c | 36 ++ src/backend/nodes/readfuncs.c | 43 ++ src/backend/optimizer/plan/createplan.c | 15 +- src/backend/optimizer/plan/planner.c | 64 +- src/backend/optimizer/plan/setrefs.c | 64 +- src/backend/optimizer/prep/prepjointree.c | 91 +++ src/backend/optimizer/prep/preptlist.c | 37 ++ src/backend/optimizer/util/appendinfo.c | 19 +- src/backend/optimizer/util/pathnode.c | 11 +- src/backend/optimizer/util/plancat.c | 4 + src/backend/parser/Makefile | 1 + src/backend/parser/analyze.c | 20 +- src/backend/parser/gram.y | 180 +++++- src/backend/parser/parse_agg.c | 10 + src/backend/parser/parse_collate.c | 1 + src/backend/parser/parse_expr.c | 4 + src/backend/parser/parse_func.c | 3 + src/backend/parser/parse_merge.c | 415 +++++++++++++ src/backend/parser/parse_relation.c | 23 +- src/backend/parser/parse_target.c | 3 +- src/backend/rewrite/rewriteHandler.c | 41 +- src/backend/rewrite/rowsecurity.c | 106 +++- src/backend/tcop/pquery.c | 3 + src/backend/tcop/utility.c | 16 + src/backend/utils/adt/ruleutils.c | 9 +- 37 files changed, 2532 insertions(+), 128 deletions(-) create mode 100644 src/backend/parser/parse_merge.c (limited to 'src/backend') diff --git a/src/backend/catalog/sql_features.txt b/src/backend/catalog/sql_features.txt index 097d9c4784b..4c3e29111de 100644 --- a/src/backend/catalog/sql_features.txt +++ b/src/backend/catalog/sql_features.txt @@ -240,9 +240,9 @@ F311 Schema definition statement 02 CREATE TABLE for persistent base tables YES F311 Schema definition statement 03 CREATE VIEW YES F311 Schema definition statement 04 CREATE VIEW: WITH CHECK OPTION YES F311 Schema definition statement 05 GRANT statement YES -F312 MERGE statement NO consider INSERT ... ON CONFLICT DO UPDATE -F313 Enhanced MERGE statement NO -F314 MERGE statement with DELETE branch NO +F312 MERGE statement YES +F313 Enhanced MERGE statement YES +F314 MERGE statement with DELETE branch YES F321 User authorization YES F341 Usage tables YES F361 Subprogram support YES diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 9f632285b62..cb13227db1f 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1188,6 +1188,9 @@ ExplainNode(PlanState *planstate, List *ancestors, case CMD_DELETE: pname = operation = "Delete"; break; + case CMD_MERGE: + pname = operation = "Merge"; + break; default: pname = "???"; break; @@ -3877,6 +3880,11 @@ show_modifytable_info(ModifyTableState *mtstate, List *ancestors, operation = "Delete"; foperation = "Foreign Delete"; break; + case CMD_MERGE: + operation = "Merge"; + /* XXX unsupported for now, but avoid compiler noise */ + foperation = "Foreign Merge"; + break; default: operation = "???"; foperation = "Foreign ???"; @@ -3999,6 +4007,33 @@ show_modifytable_info(ModifyTableState *mtstate, List *ancestors, other_path, 0, es); } } + else if (node->operation == CMD_MERGE) + { + /* EXPLAIN ANALYZE display of tuples processed */ + if (es->analyze && mtstate->ps.instrument) + { + double total; + double insert_path; + double update_path; + double delete_path; + double skipped_path; + + InstrEndLoop(outerPlanState(mtstate)->instrument); + + /* count the number of source rows */ + total = outerPlanState(mtstate)->instrument->ntuples; + insert_path = mtstate->mt_merge_inserted; + update_path = mtstate->mt_merge_updated; + delete_path = mtstate->mt_merge_deleted; + skipped_path = total - insert_path - update_path - delete_path; + Assert(skipped_path >= 0); + + ExplainPropertyFloat("Tuples Inserted", NULL, insert_path, 0, es); + ExplainPropertyFloat("Tuples Updated", NULL, update_path, 0, es); + ExplainPropertyFloat("Tuples Deleted", NULL, delete_path, 0, es); + ExplainPropertyFloat("Tuples Skipped", NULL, skipped_path, 0, es); + } + } if (labeltargets) ExplainCloseGroup("Target Tables", "Target Tables", false, es); diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index fce79b02a57..13cb516752b 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -84,7 +84,8 @@ static bool GetTupleForTrigger(EState *estate, ItemPointer tid, LockTupleMode lockmode, TupleTableSlot *oldslot, - TupleTableSlot **newSlot); + TupleTableSlot **newSlot, + TM_FailureData *tmfpd); static bool TriggerEnabled(EState *estate, ResultRelInfo *relinfo, Trigger *trigger, TriggerEvent event, Bitmapset *modifiedCols, @@ -2713,7 +2714,8 @@ ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, TupleTableSlot *epqslot_candidate = NULL; if (!GetTupleForTrigger(estate, epqstate, relinfo, tupleid, - LockTupleExclusive, slot, &epqslot_candidate)) + LockTupleExclusive, slot, &epqslot_candidate, + NULL)) return false; /* @@ -2728,7 +2730,6 @@ ExecBRDeleteTriggers(EState *estate, EPQState *epqstate, } trigtuple = ExecFetchSlotHeapTuple(slot, true, &should_free); - } else { @@ -2804,6 +2805,7 @@ ExecARDeleteTriggers(EState *estate, tupleid, LockTupleExclusive, slot, + NULL, NULL); else ExecForceStoreHeapTuple(fdw_trigtuple, slot, false); @@ -2944,7 +2946,8 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, ResultRelInfo *relinfo, ItemPointer tupleid, HeapTuple fdw_trigtuple, - TupleTableSlot *newslot) + TupleTableSlot *newslot, + TM_FailureData *tmfd) { TriggerDesc *trigdesc = relinfo->ri_TrigDesc; TupleTableSlot *oldslot = ExecGetTriggerOldSlot(estate, relinfo); @@ -2967,7 +2970,8 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate, /* get a copy of the on-disk tuple we are planning to update */ if (!GetTupleForTrigger(estate, epqstate, relinfo, tupleid, - lockmode, oldslot, &epqslot_candidate)) + lockmode, oldslot, &epqslot_candidate, + tmfd)) return false; /* cancel the update action */ /* @@ -3121,6 +3125,7 @@ ExecARUpdateTriggers(EState *estate, ResultRelInfo *relinfo, tupleid, LockTupleExclusive, oldslot, + NULL, NULL); else if (fdw_trigtuple != NULL) ExecForceStoreHeapTuple(fdw_trigtuple, oldslot, false); @@ -3275,7 +3280,8 @@ GetTupleForTrigger(EState *estate, ItemPointer tid, LockTupleMode lockmode, TupleTableSlot *oldslot, - TupleTableSlot **epqslot) + TupleTableSlot **epqslot, + TM_FailureData *tmfdp) { Relation relation = relinfo->ri_RelationDesc; @@ -3301,6 +3307,10 @@ GetTupleForTrigger(EState *estate, lockflags, &tmfd); + /* Let the caller know about the status of this operation */ + if (tmfdp) + *tmfdp = tmfd; + switch (test) { case TM_SelfModified: @@ -3821,8 +3831,23 @@ struct AfterTriggersTableData bool before_trig_done; /* did we already queue BS triggers? */ bool after_trig_done; /* did we already queue AS triggers? */ AfterTriggerEventList after_trig_events; /* if so, saved list pointer */ - Tuplestorestate *old_tuplestore; /* "old" transition table, if any */ - Tuplestorestate *new_tuplestore; /* "new" transition table, if any */ + + /* + * We maintain separate transition tables for UPDATE/INSERT/DELETE since + * MERGE can run all three actions in a single statement. Note that UPDATE + * needs both old and new transition tables whereas INSERT needs only new, + * and DELETE needs only old. + */ + + /* "old" transition table for UPDATE, if any */ + Tuplestorestate *old_upd_tuplestore; + /* "new" transition table for UPDATE, if any */ + Tuplestorestate *new_upd_tuplestore; + /* "old" transition table for DELETE, if any */ + Tuplestorestate *old_del_tuplestore; + /* "new" transition table for INSERT, if any */ + Tuplestorestate *new_ins_tuplestore; + TupleTableSlot *storeslot; /* for converting to tuplestore's format */ }; @@ -4374,13 +4399,19 @@ AfterTriggerExecute(EState *estate, { if (LocTriggerData.tg_trigger->tgoldtable) { - LocTriggerData.tg_oldtable = evtshared->ats_table->old_tuplestore; + if (TRIGGER_FIRED_BY_UPDATE(evtshared->ats_event)) + LocTriggerData.tg_oldtable = evtshared->ats_table->old_upd_tuplestore; + else + LocTriggerData.tg_oldtable = evtshared->ats_table->old_del_tuplestore; evtshared->ats_table->closed = true; } if (LocTriggerData.tg_trigger->tgnewtable) { - LocTriggerData.tg_newtable = evtshared->ats_table->new_tuplestore; + if (TRIGGER_FIRED_BY_INSERT(evtshared->ats_event)) + LocTriggerData.tg_newtable = evtshared->ats_table->new_ins_tuplestore; + else + LocTriggerData.tg_newtable = evtshared->ats_table->new_upd_tuplestore; evtshared->ats_table->closed = true; } } @@ -4794,8 +4825,10 @@ TransitionCaptureState * MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType) { TransitionCaptureState *state; - bool need_old, - need_new; + bool need_old_upd, + need_new_upd, + need_old_del, + need_new_ins; AfterTriggersTableData *table; MemoryContext oldcxt; ResourceOwner saveResourceOwner; @@ -4807,23 +4840,31 @@ MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType) switch (cmdType) { case CMD_INSERT: - need_old = false; - need_new = trigdesc->trig_insert_new_table; + need_old_upd = need_old_del = need_new_upd = false; + need_new_ins = trigdesc->trig_insert_new_table; break; case CMD_UPDATE: - need_old = trigdesc->trig_update_old_table; - need_new = trigdesc->trig_update_new_table; + need_old_upd = trigdesc->trig_update_old_table; + need_new_upd = trigdesc->trig_update_new_table; + need_old_del = need_new_ins = false; break; case CMD_DELETE: - need_old = trigdesc->trig_delete_old_table; - need_new = false; + need_old_del = trigdesc->trig_delete_old_table; + need_old_upd = need_new_upd = need_new_ins = false; + break; + case CMD_MERGE: + need_old_upd = trigdesc->trig_update_old_table; + need_new_upd = trigdesc->trig_update_new_table; + need_old_del = trigdesc->trig_delete_old_table; + need_new_ins = trigdesc->trig_insert_new_table; break; default: elog(ERROR, "unexpected CmdType: %d", (int) cmdType); - need_old = need_new = false; /* keep compiler quiet */ + /* keep compiler quiet */ + need_old_upd = need_new_upd = need_old_del = need_new_ins = false; break; } - if (!need_old && !need_new) + if (!need_old_upd && !need_new_upd && !need_new_ins && !need_old_del) return NULL; /* Check state, like AfterTriggerSaveEvent. */ @@ -4853,10 +4894,14 @@ MakeTransitionCaptureState(TriggerDesc *trigdesc, Oid relid, CmdType cmdType) saveResourceOwner = CurrentResourceOwner; CurrentResourceOwner = CurTransactionResourceOwner; - if (need_old && table->old_tuplestore == NULL) - table->old_tuplestore = tuplestore_begin_heap(false, false, work_mem); - if (need_new && table->new_tuplestore == NULL) - table->new_tuplestore = tuplestore_begin_heap(false, false, work_mem); + if (need_old_upd && table->old_upd_tuplestore == NULL) + table->old_upd_tuplestore = tuplestore_begin_heap(false, false, work_mem); + if (need_new_upd && table->new_upd_tuplestore == NULL) + table->new_upd_tuplestore = tuplestore_begin_heap(false, false, work_mem); + if (need_old_del && table->old_del_tuplestore == NULL) + table->old_del_tuplestore = tuplestore_begin_heap(false, false, work_mem); + if (need_new_ins && table->new_ins_tuplestore == NULL) + table->new_ins_tuplestore = tuplestore_begin_heap(false, false, work_mem); CurrentResourceOwner = saveResourceOwner; MemoryContextSwitchTo(oldcxt); @@ -5045,12 +5090,20 @@ AfterTriggerFreeQuery(AfterTriggersQueryData *qs) { AfterTriggersTableData *table = (AfterTriggersTableData *) lfirst(lc); - ts = table->old_tuplestore; - table->old_tuplestore = NULL; + ts = table->old_upd_tuplestore; + table->old_upd_tuplestore = NULL; + if (ts) + tuplestore_end(ts); + ts = table->new_upd_tuplestore; + table->new_upd_tuplestore = NULL; + if (ts) + tuplestore_end(ts); + ts = table->old_del_tuplestore; + table->old_del_tuplestore = NULL; if (ts) tuplestore_end(ts); - ts = table->new_tuplestore; - table->new_tuplestore = NULL; + ts = table->new_ins_tuplestore; + table->new_ins_tuplestore = NULL; if (ts) tuplestore_end(ts); if (table->storeslot) @@ -5356,17 +5409,17 @@ GetAfterTriggersTransitionTable(int event, { Assert(TupIsNull(newslot)); if (event == TRIGGER_EVENT_DELETE && delete_old_table) - tuplestore = transition_capture->tcs_private->old_tuplestore; + tuplestore = transition_capture->tcs_private->old_del_tuplestore; else if (event == TRIGGER_EVENT_UPDATE && update_old_table) - tuplestore = transition_capture->tcs_private->old_tuplestore; + tuplestore = transition_capture->tcs_private->old_upd_tuplestore; } else if (!TupIsNull(newslot)) { Assert(TupIsNull(oldslot)); if (event == TRIGGER_EVENT_INSERT && insert_new_table) - tuplestore = transition_capture->tcs_private->new_tuplestore; + tuplestore = transition_capture->tcs_private->new_ins_tuplestore; else if (event == TRIGGER_EVENT_UPDATE && update_new_table) - tuplestore = transition_capture->tcs_private->new_tuplestore; + tuplestore = transition_capture->tcs_private->new_upd_tuplestore; } return tuplestore; @@ -5980,6 +6033,7 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo, */ if (row_trigger && transition_capture != NULL) { + TupleTableSlot *original_insert_tuple = transition_capture->tcs_original_insert_tuple; /* * Capture the old tuple in the appropriate transition table based on @@ -6010,17 +6064,15 @@ AfterTriggerSaveEvent(EState *estate, ResultRelInfo *relinfo, newslot, transition_capture); TransitionTableAddTuple(estate, transition_capture, relinfo, - newslot, - transition_capture->tcs_original_insert_tuple, - new_tuplestore); + newslot, original_insert_tuple, new_tuplestore); } /* * If transition tables are the only reason we're here, return. As * mentioned above, we can also be here during update tuple routing in * presence of transition tables, in which case this function is - * called separately for oldtup and newtup, so we expect exactly one - * of them to be NULL. + * called separately for OLD and NEW, so we expect exactly one of them + * to be NULL. */ if (trigdesc == NULL || (event == TRIGGER_EVENT_DELETE && !trigdesc->trig_delete_after_row) || diff --git a/src/backend/executor/README b/src/backend/executor/README index bf5e70860d5..0b5183fc4a4 100644 --- a/src/backend/executor/README +++ b/src/backend/executor/README @@ -39,7 +39,7 @@ columns, combine the values into a new row, and apply the update. (For a heap table, the row-identity junk column is a CTID, but other things may be used for other table types.) For DELETE, the plan tree need only deliver junk row-identity column(s), and the ModifyTable node visits each of those -rows and marks the row deleted. +rows and marks the row deleted. MERGE is described below. XXX a great deal more documentation needs to be written here... @@ -223,6 +223,45 @@ fast-path step types (EEOP_ASSIGN_*_VAR) to handle targetlist entries that are simple Vars using only one step instead of two. +MERGE +----- + +MERGE is a multiple-table, multiple-action command: It specifies a target +table and a source relation, and can contain multiple WHEN MATCHED and +WHEN NOT MATCHED clauses, each of which specifies one UPDATE, INSERT, +UPDATE, or DO NOTHING actions. The target table is modified by MERGE, +and the source relation supplies additional data for the actions. Each action +optionally specifies a qualifying expression that is evaluated for each tuple. + +In the planner, transform_MERGE_to_join constructs a join between the target +table and the source relation, with row-identifying junk columns from the target +table. This join is an outer join if the MERGE command contains any WHEN NOT +MATCHED clauses; the ModifyTable node fetches tuples from the plan tree of that +join. If the row-identifying columns in the fetched tuple are NULL, then the +source relation contains a tuple that is not matched by any tuples in the +target table, so the qualifying expression for each WHEN NOT MATCHED clause is +evaluated given that tuple as returned by the plan. If the expression returns +true, the action indicated by the clause is executed, and no further clauses +are evaluated. On the other hand, if the row-identifying columns are not +NULL, then the matching tuple from the target table can be fetched; qualifying +expression of each WHEN MATCHED clause is evaluated given both the fetched +tuple and the tuple returned by the plan. + +If no WHEN NOT MATCHED clauses are present, then the join constructed by +the planner is an inner join, and the row-identifying junk columns are +always non NULL. + +If WHEN MATCHED ends up processing a row that is concurrently updated or deleted, +EvalPlanQual (see below) is used to find the latest version of the row, and +that is re-fetched; if it exists, the search for a matching WHEN MATCHED clause +to use starts at the top. + +MERGE does not allow its own type of triggers, but instead fires UPDATE, DELETE, +and INSERT triggers: row triggers are fired for each row when an action is +executed for that row. Statement triggers are fired always, regardless of +whether any rows match the corresponding clauses. + + Memory Management ----------------- diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 473d2e00a2f..ef2fd46092e 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -233,6 +233,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) case CMD_INSERT: case CMD_DELETE: case CMD_UPDATE: + case CMD_MERGE: estate->es_output_cid = GetCurrentCommandId(true); break; @@ -1244,6 +1245,8 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo, resultRelInfo->ri_ReturningSlot = NULL; resultRelInfo->ri_TrigOldSlot = NULL; resultRelInfo->ri_TrigNewSlot = NULL; + resultRelInfo->ri_matchedMergeAction = NIL; + resultRelInfo->ri_notMatchedMergeAction = NIL; /* * Only ExecInitPartitionInfo() and ExecInitPartitionDispatchInfo() pass @@ -2142,6 +2145,19 @@ ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo, errmsg("new row violates row-level security policy for table \"%s\"", wco->relname))); break; + case WCO_RLS_MERGE_UPDATE_CHECK: + case WCO_RLS_MERGE_DELETE_CHECK: + if (wco->polname != NULL) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("target row violates row-level security policy \"%s\" (USING expression) for table \"%s\"", + wco->polname, wco->relname))); + else + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("target row violates row-level security policy (USING expression) for table \"%s\"", + wco->relname))); + break; case WCO_RLS_CONFLICT_CHECK: if (wco->polname != NULL) ereport(ERROR, diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index 90ed1485d17..aca42ca5b8c 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -20,6 +20,7 @@ #include "catalog/pg_type.h" #include "executor/execPartition.h" #include "executor/executor.h" +#include "executor/nodeModifyTable.h" #include "foreign/fdwapi.h" #include "mb/pg_wchar.h" #include "miscadmin.h" @@ -182,6 +183,7 @@ static char *ExecBuildSlotPartitionKeyDescription(Relation rel, bool *isnull, int maxfieldlen); static List *adjust_partition_colnos(List *colnos, ResultRelInfo *leaf_part_rri); +static List *adjust_partition_colnos_using_map(List *colnos, AttrMap *attrMap); static void ExecInitPruningContext(PartitionPruneContext *context, List *pruning_steps, PartitionDesc partdesc, @@ -853,6 +855,99 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate, lappend(estate->es_tuple_routing_result_relations, leaf_part_rri); + /* + * Initialize information about this partition that's needed to handle + * MERGE. We take the "first" result relation's mergeActionList as + * reference and make copy for this relation, converting stuff that + * references attribute numbers to match this relation's. + * + * This duplicates much of the logic in ExecInitMerge(), so something + * changes there, look here too. + */ + if (node && node->operation == CMD_MERGE) + { + List *firstMergeActionList = linitial(node->mergeActionLists); + ListCell *lc; + ExprContext *econtext = mtstate->ps.ps_ExprContext; + + if (part_attmap == NULL) + part_attmap = + build_attrmap_by_name(RelationGetDescr(partrel), + RelationGetDescr(firstResultRel)); + + if (unlikely(!leaf_part_rri->ri_projectNewInfoValid)) + ExecInitMergeTupleSlots(mtstate, leaf_part_rri); + + foreach(lc, firstMergeActionList) + { + /* Make a copy for this relation to be safe. */ + MergeAction *action = copyObject(lfirst(lc)); + MergeActionState *action_state; + List **list; + + /* Generate the action's state for this relation */ + action_state = makeNode(MergeActionState); + action_state->mas_action = action; + + /* And put the action in the appropriate list */ + if (action->matched) + list = &leaf_part_rri->ri_matchedMergeAction; + else + list = &leaf_part_rri->ri_notMatchedMergeAction; + *list = lappend(*list, action_state); + + switch (action->commandType) + { + case CMD_INSERT: + + /* + * ExecCheckPlanOutput() already done on the targetlist + * when "first" result relation initialized and it is same + * for all result relations. + */ + action_state->mas_proj = + ExecBuildProjectionInfo(action->targetList, econtext, + leaf_part_rri->ri_newTupleSlot, + &mtstate->ps, + RelationGetDescr(partrel)); + break; + case CMD_UPDATE: + + /* + * Convert updateColnos from "first" result relation + * attribute numbers to this result rel's. + */ + if (part_attmap) + action->updateColnos = + adjust_partition_colnos_using_map(action->updateColnos, + part_attmap); + action_state->mas_proj = + ExecBuildUpdateProjection(action->targetList, + true, + action->updateColnos, + RelationGetDescr(leaf_part_rri->ri_RelationDesc), + econtext, + leaf_part_rri->ri_newTupleSlot, + NULL); + break; + case CMD_DELETE: + break; + + default: + elog(ERROR, "unknown action in MERGE WHEN clause"); + } + + /* found_whole_row intentionally ignored. */ + action->qual = + map_variable_attnos(action->qual, + firstVarno, 0, + part_attmap, + RelationGetForm(partrel)->reltype, + &found_whole_row); + action_state->mas_whenqual = + ExecInitQual((List *) action->qual, &mtstate->ps); + } + } MemoryContextSwitchTo(oldcxt); return leaf_part_rri; @@ -1433,13 +1528,23 @@ ExecBuildSlotPartitionKeyDescription(Relation rel, static List * adjust_partition_colnos(List *colnos, ResultRelInfo *leaf_part_rri) { - List *new_colnos = NIL; TupleConversionMap *map = ExecGetChildToRootMap(leaf_part_rri); - AttrMap *attrMap; + + return adjust_partition_colnos_using_map(colnos, map->attrMap); +} + +/* + * adjust_partition_colnos_using_map + * Like adjust_partition_colnos, but uses a caller-supplied map instead + * of assuming to map from the "root" result relation. + */ +static List * +adjust_partition_colnos_using_map(List *colnos, AttrMap *attrMap) +{ + List *new_colnos = NIL; ListCell *lc; - Assert(map != NULL); /* else we shouldn't be here */ - attrMap = map->attrMap; + Assert(attrMap != NULL); /* else we shouldn't be here */ foreach(lc, colnos) { diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 1a4fbdc38c6..228e3547012 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -486,7 +486,7 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo, resultRelInfo->ri_TrigDesc->trig_update_before_row) { if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo, - tid, NULL, slot)) + tid, NULL, slot, NULL)) skip_tuple = true; /* "do nothing" */ } diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 701fe052967..171575cd73b 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -24,11 +24,20 @@ * values plus row-locating info for UPDATE and MERGE cases, or just the * row-locating info for DELETE cases. * + * MERGE runs a join between the source relation and the target + * table; if any WHEN NOT MATCHED clauses are present, then the + * join is an outer join. In this case, any unmatched tuples will + * have NULL row-locating info, and only INSERT can be run. But for + * matched tuples, then row-locating info is used to determine the + * tuple to UPDATE or DELETE. When all clauses are WHEN MATCHED, + * then an inner join is used, so all tuples contain row-locating info. + * * If the query specifies RETURNING, then the ModifyTable returns a * RETURNING tuple after completing each row insert, update, or delete. * It must be called again to continue the operation. Without RETURNING, * we just loop within the node until all the work is done, then - * return NULL. This avoids useless call/return overhead. + * return NULL. This avoids useless call/return overhead. (MERGE does + * not support RETURNING.) */ #include "postgres.h" @@ -78,6 +87,17 @@ typedef struct ModifyTableContext */ TupleTableSlot *planSlot; + /* + * During EvalPlanQual, project and return the new version of the new + * tuple + */ + TupleTableSlot *(*GetUpdateNewTuple) (ResultRelInfo *resultRelInfo, + TupleTableSlot *epqslot, + TupleTableSlot *oldSlot, + MergeActionState *relaction); + + /* MERGE specific */ + MergeActionState *relaction; /* MERGE action in progress */ /* * Information about the changes that were made concurrently to a tuple @@ -140,6 +160,28 @@ static TupleTableSlot *ExecPrepareTupleRouting(ModifyTableState *mtstate, ResultRelInfo *targetRelInfo, TupleTableSlot *slot, ResultRelInfo **partRelInfo); +static TupleTableSlot *internalGetUpdateNewTuple(ResultRelInfo *relinfo, + TupleTableSlot *planSlot, + TupleTableSlot *oldSlot, + MergeActionState *relaction); + +static TupleTableSlot *ExecMerge(ModifyTableContext *context, + ResultRelInfo *resultRelInfo, + ItemPointer tupleid, + bool canSetTag); +static void ExecInitMerge(ModifyTableState *mtstate, EState *estate); +static bool ExecMergeMatched(ModifyTableContext *context, + ResultRelInfo *resultRelInfo, + ItemPointer tupleid, + bool canSetTag); +static void ExecMergeNotMatched(ModifyTableContext *context, + ResultRelInfo *resultRelInfo, + bool canSetTag); +static TupleTableSlot *mergeGetUpdateNewTuple(ResultRelInfo *relinfo, + TupleTableSlot *planSlot, + TupleTableSlot *oldSlot, + MergeActionState *relaction); + /* * Verify that the tuples to be produced by INSERT match the @@ -616,21 +658,32 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo, TupleTableSlot *planSlot, TupleTableSlot *oldSlot) { - ProjectionInfo *newProj = relinfo->ri_projectNew; - ExprContext *econtext; - /* Use a few extra Asserts to protect against outside callers */ Assert(relinfo->ri_projectNewInfoValid); Assert(planSlot != NULL && !TTS_EMPTY(planSlot)); Assert(oldSlot != NULL && !TTS_EMPTY(oldSlot)); + return internalGetUpdateNewTuple(relinfo, planSlot, oldSlot, NULL); +} + +/* + * Callback for ModifyTableState->GetUpdateNewTuple for use by regular UPDATE. + */ +static TupleTableSlot * +internalGetUpdateNewTuple(ResultRelInfo *relinfo, + TupleTableSlot *planSlot, + TupleTableSlot *oldSlot, + MergeActionState *relaction) +{ + ProjectionInfo *newProj = relinfo->ri_projectNew; + ExprContext *econtext; + econtext = newProj->pi_exprContext; econtext->ecxt_outertuple = planSlot; econtext->ecxt_scantuple = oldSlot; return ExecProject(newProj); } - /* ---------------------------------------------------------------- * ExecInsert * @@ -847,9 +900,17 @@ ExecInsert(ModifyTableContext *context, * partition, we should instead check UPDATE policies, because we are * executing policies defined on the target table, and not those * defined on the child partitions. + * + * If we're running MERGE, we refer to the action that we're executing + * to know if we're doing an INSERT or UPDATE to a partition table. */ - wco_kind = (mtstate->operation == CMD_UPDATE) ? - WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK; + if (mtstate->operation == CMD_UPDATE) + wco_kind = WCO_RLS_UPDATE_CHECK; + else if (mtstate->operation == CMD_MERGE) + wco_kind = (context->relaction->mas_action->commandType == CMD_UPDATE) ? + WCO_RLS_UPDATE_CHECK : WCO_RLS_INSERT_CHECK; + else + wco_kind = WCO_RLS_INSERT_CHECK; /* * ExecWithCheckOptions() will skip any WCOs which are not of the kind @@ -1453,7 +1514,13 @@ ldelete:; ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to concurrent delete"))); - /* tuple already deleted; nothing to do */ + + /* + * tuple already deleted; nothing to do. But MERGE might want + * to handle it differently. We've already filled-in + * actionInfo with sufficient information for MERGE to look + * at. + */ return NULL; default: @@ -1659,7 +1726,8 @@ ExecCrossPartitionUpdate(ModifyTableContext *context, elog(ERROR, "failed to fetch tuple being updated"); /* and project the new tuple to retry the UPDATE with */ context->cpUpdateRetrySlot = - ExecGetUpdateNewTuple(resultRelInfo, epqslot, oldSlot); + context->GetUpdateNewTuple(resultRelInfo, epqslot, oldSlot, + context->relaction); return false; } } @@ -1718,7 +1786,8 @@ ExecUpdatePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo, if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_before_row) return ExecBRUpdateTriggers(context->estate, context->epqstate, - resultRelInfo, tupleid, oldtuple, slot); + resultRelInfo, tupleid, oldtuple, slot, + &context->tmfd); return true; } @@ -1864,6 +1933,13 @@ lreplace:; return TM_Ok; } + /* + * No luck, a retry is needed. If running MERGE, we do not do so + * here; instead let it handle that on its own rules. + */ + if (context->relaction != NULL) + return TM_Updated; + /* * ExecCrossPartitionUpdate installed an updated version of the new * tuple in the retry slot; start over. @@ -2109,8 +2185,8 @@ redo_act: /* * If ExecUpdateAct reports that a cross-partition update was done, - * then the returning tuple has been projected and there's nothing - * else for us to do. + * then the RETURNING tuple (if any) has been projected and there's + * nothing else for us to do. */ if (updateCxt.crossPartUpdate) return context->cpUpdateReturningSlot; @@ -2337,9 +2413,9 @@ ExecOnConflictUpdate(ModifyTableContext *context, * to break. * * It is the user's responsibility to prevent this situation from - * occurring. These problems are why SQL-2003 similarly specifies - * that for SQL MERGE, an exception must be raised in the event of - * an attempt to update the same row twice. + * occurring. These problems are why the SQL standard similarly + * specifies that for SQL MERGE, an exception must be raised in + * the event of an attempt to update the same row twice. */ xminDatum = slot_getsysattr(existing, MinTransactionIdAttributeNumber, @@ -2350,7 +2426,9 @@ ExecOnConflictUpdate(ModifyTableContext *context, if (TransactionIdIsCurrentTransactionId(xmin)) ereport(ERROR, (errcode(ERRCODE_CARDINALITY_VIOLATION), - errmsg("ON CONFLICT DO UPDATE command cannot affect row a second time"), + /* translator: %s is a SQL command name */ + errmsg("%s command cannot affect row a second time", + "ON CONFLICT DO UPDATE"), errhint("Ensure that no rows proposed for insertion within the same command have duplicate constrained values."))); /* This shouldn't happen */ @@ -2490,6 +2568,705 @@ ExecOnConflictUpdate(ModifyTableContext *context, return true; } +/* + * Perform MERGE. + */ +static TupleTableSlot * +ExecMerge(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + ItemPointer tupleid, bool canSetTag) +{ + bool matched; + + /*----- + * If we are dealing with a WHEN MATCHED case (tupleid is valid), we + * execute the first action for which the additional WHEN MATCHED AND + * quals pass. If an action without quals is found, that action is + * executed. + * + * Similarly, if we are dealing with WHEN NOT MATCHED case, we look at + * the given WHEN NOT MATCHED actions in sequence until one passes. + * + * Things get interesting in case of concurrent update/delete of the + * target tuple. Such concurrent update/delete is detected while we are + * executing a WHEN MATCHED action. + * + * A concurrent update can: + * + * 1. modify the target tuple so that it no longer satisfies the + * additional quals attached to the current WHEN MATCHED action + * + * In this case, we are still dealing with a WHEN MATCHED case. + * We recheck the list of WHEN MATCHED actions from the start and + * choose the first one that satisfies the new target tuple. + * + * 2. modify the target tuple so that the join quals no longer pass and + * hence the source tuple no longer has a match. + * + * In this case, the source tuple no longer matches the target tuple, + * so we now instead find a qualifying WHEN NOT MATCHED action to + * execute. + * + * XXX Hmmm, what if the updated tuple would now match one that was + * considered NOT MATCHED so far? + * + * A concurrent delete changes a WHEN MATCHED case to WHEN NOT MATCHED. + * + * ExecMergeMatched takes care of following the update chain and + * re-finding the qualifying WHEN MATCHED action, as long as the updated + * target tuple still satisfies the join quals, i.e., it remains a WHEN + * MATCHED case. If the tuple gets deleted or the join quals fail, it + * returns and we try ExecMergeNotMatched. Given that ExecMergeMatched + * always make progress by following the update chain and we never switch + * from ExecMergeNotMatched to ExecMergeMatched, there is no risk of a + * livelock. + */ + matched = tupleid != NULL; + if (matched) + matched = ExecMergeMatched(context, resultRelInfo, tupleid, canSetTag); + + /* + * Either we were dealing with a NOT MATCHED tuple or ExecMergeMatched() + * returned "false", indicating the previously MATCHED tuple no longer + * matches. + */ + if (!matched) + ExecMergeNotMatched(context, resultRelInfo, canSetTag); + + /* No RETURNING support yet */ + return NULL; +} + +/* + * Check and execute the first qualifying MATCHED action. The current target + * tuple is identified by tupleid. + * + * We start from the first WHEN MATCHED action and check if the WHEN quals + * pass, if any. If the WHEN quals for the first action do not pass, we + * check the second, then the third and so on. If we reach to the end, no + * action is taken and we return true, indicating that no further action is + * required for this tuple. + * + * If we do find a qualifying action, then we attempt to execute the action. + * + * If the tuple is concurrently updated, EvalPlanQual is run with the updated + * tuple to recheck the join quals. Note that the additional quals associated + * with individual actions are evaluated by this routine via ExecQual, while + * EvalPlanQual checks for the join quals. If EvalPlanQual tells us that the + * updated tuple still passes the join quals, then we restart from the first + * action to look for a qualifying action. Otherwise, we return false -- + * meaning that a NOT MATCHED action must now be executed for the current + * source tuple. + */ +static bool +ExecMergeMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + ItemPointer tupleid, bool canSetTag) +{ + ModifyTableState *mtstate = context->mtstate; + TupleTableSlot *newslot; + EState *estate = context->estate; + ExprContext *econtext = mtstate->ps.ps_ExprContext; + bool isNull; + EPQState *epqstate = &mtstate->mt_epqstate; + ListCell *l; + + /* + * If there are no WHEN MATCHED actions, we are done. + */ + if (resultRelInfo->ri_matchedMergeAction == NIL) + return true; + + /* + * Make tuple and any needed join variables available to ExecQual and + * ExecProject. The target's existing tuple is installed in the scantuple. + * Again, this target relation's slot is required only in the case of a + * MATCHED tuple and UPDATE/DELETE actions. + */ + econtext->ecxt_scantuple = resultRelInfo->ri_oldTupleSlot; + econtext->ecxt_innertuple = context->planSlot; + econtext->ecxt_outertuple = NULL; + +lmerge_matched:; + + /* + * This routine is only invoked for matched rows, and we must have found + * the tupleid of the target row in that case; fetch that tuple. + * + * We use SnapshotAny for this because we might get called again after + * EvalPlanQual returns us a new tuple, which may not be visible to our + * MVCC snapshot. + */ + + if (!table_tuple_fetch_row_version(resultRelInfo->ri_RelationDesc, + tupleid, + SnapshotAny, + resultRelInfo->ri_oldTupleSlot)) + elog(ERROR, "failed to fetch the target tuple"); + + foreach(l, resultRelInfo->ri_matchedMergeAction) + { + MergeActionState *relaction = (MergeActionState *) lfirst(l); + CmdType commandType = relaction->mas_action->commandType; + List *recheckIndexes = NIL; + TM_Result result; + UpdateContext updateCxt = {0}; + + /* + * Test condition, if any. + * + * In the absence of any condition, we perform the action + * unconditionally (no need to check separately since ExecQual() will + * return true if there are no conditions to evaluate). + */ + if (!ExecQual(relaction->mas_whenqual, econtext)) + continue; + + /* + * Check if the existing target tuple meets the USING checks of + * UPDATE/DELETE RLS policies. If those checks fail, we throw an + * error. + * + * The WITH CHECK quals are applied in ExecUpdate() and hence we need + * not do anything special to handle them. + * + * NOTE: We must do this after WHEN quals are evaluated, so that we + * check policies only when they matter. + */ + if (resultRelInfo->ri_WithCheckOptions) + { + ExecWithCheckOptions(commandType == CMD_UPDATE ? + WCO_RLS_MERGE_UPDATE_CHECK : WCO_RLS_MERGE_DELETE_CHECK, + resultRelInfo, + resultRelInfo->ri_oldTupleSlot, + context->mtstate->ps.state); + } + + /* Perform stated action */ + switch (commandType) + { + case CMD_UPDATE: + + /* + * Project the output tuple, and use that to update the table. + * We don't need to filter out junk attributes, because the + * UPDATE action's targetlist doesn't have any. + */ + newslot = ExecProject(relaction->mas_proj); + + context->relaction = relaction; + context->GetUpdateNewTuple = mergeGetUpdateNewTuple; + context->cpUpdateRetrySlot = NULL; + + if (!ExecUpdatePrologue(context, resultRelInfo, + tupleid, NULL, newslot)) + { + result = TM_Ok; + break; + } + ExecUpdatePrepareSlot(resultRelInfo, newslot, context->estate); + result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL, + newslot, mtstate->canSetTag, &updateCxt); + if (result == TM_Ok && updateCxt.updated) + { + ExecUpdateEpilogue(context, &updateCxt, resultRelInfo, + tupleid, NULL, newslot, recheckIndexes); + mtstate->mt_merge_updated += 1; + } + + break; + + case CMD_DELETE: + context->relaction = relaction; + if (!ExecDeletePrologue(context, resultRelInfo, tupleid, + NULL, NULL)) + { + result = TM_Ok; + break; + } + result = ExecDeleteAct(context, resultRelInfo, tupleid, false); + if (result == TM_Ok) + { + ExecDeleteEpilogue(context, resultRelInfo, tupleid, NULL, + false); + mtstate->mt_merge_deleted += 1; + } + break; + + case CMD_NOTHING: + /* Doing nothing is always OK */ + result = TM_Ok; + break; + + default: + elog(ERROR, "unknown action in MERGE WHEN MATCHED clause"); + } + + switch (result) + { + case TM_Ok: + /* all good; perform final actions */ + if (canSetTag) + (estate->es_processed)++; + + break; + + case TM_SelfModified: + + /* + * The SQL standard disallows this for MERGE. + */ + if (TransactionIdIsCurrentTransactionId(context->tmfd.xmax)) + ereport(ERROR, + (errcode(ERRCODE_CARDINALITY_VIOLATION), + /* translator: %s is a SQL command name */ + errmsg("%s command cannot affect row a second time", + "MERGE"), + errhint("Ensure that not more than one source row matches any one target row."))); + /* This shouldn't happen */ + elog(ERROR, "attempted to update or delete invisible tuple"); + break; + + case TM_Deleted: + if (IsolationUsesXactSnapshot()) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("could not serialize access due to concurrent delete"))); + + /* + * If the tuple was already deleted, return to let caller + * handle it under NOT MATCHED clauses. + */ + return false; + + case TM_Updated: + { + Relation resultRelationDesc; + TupleTableSlot *epqslot, + *inputslot; + LockTupleMode lockmode; + + /* + * The target tuple was concurrently updated by some other + * transaction. + */ + + /* + * If cpUpdateRetrySlot is set, ExecCrossPartitionUpdate() + * must have detected that the tuple was concurrently + * updated, so we restart the search for an appropriate + * WHEN MATCHED clause to process the updated tuple. + * + * In this case, ExecDelete() would already have performed + * EvalPlanQual() on the latest version of the tuple, + * which in turn would already have been loaded into + * ri_oldTupleSlot, so no need to do either of those + * things. + * + * XXX why do we not check the WHEN NOT MATCHED list in + * this case? + */ + if (!TupIsNull(context->cpUpdateRetrySlot)) + goto lmerge_matched; + + /* + * Otherwise, we run the EvalPlanQual() with the new + * version of the tuple. If EvalPlanQual() does not return + * a tuple, then we switch to the NOT MATCHED list of + * actions. If it does return a tuple and the join qual is + * still satisfied, then we just need to recheck the + * MATCHED actions, starting from the top, and execute the + * first qualifying action. + */ + resultRelationDesc = resultRelInfo->ri_RelationDesc; + lockmode = ExecUpdateLockMode(estate, resultRelInfo); + + inputslot = EvalPlanQualSlot(epqstate, resultRelationDesc, + resultRelInfo->ri_RangeTableIndex); + + result = table_tuple_lock(resultRelationDesc, tupleid, + estate->es_snapshot, + inputslot, estate->es_output_cid, + lockmode, LockWaitBlock, + TUPLE_LOCK_FLAG_FIND_LAST_VERSION, + &context->tmfd); + switch (result) + { + case TM_Ok: + epqslot = EvalPlanQual(epqstate, + resultRelationDesc, + resultRelInfo->ri_RangeTableIndex, + inputslot); + + /* + * If we got no tuple, or the tuple we get has a + * NULL ctid, go back to caller: this one is not a + * MATCHED tuple anymore, so they can retry with + * NOT MATCHED actions. + */ + if (TupIsNull(epqslot)) + return false; + + (void) ExecGetJunkAttribute(epqslot, + resultRelInfo->ri_RowIdAttNo, + &isNull); + if (isNull) + return false; + + /* + * When a tuple was updated and migrated to + * another partition concurrently, the current + * MERGE implementation can't follow. There's + * probably a better way to handle this case, but + * it'd require recognizing the relation to which + * the tuple moved, and setting our current + * resultRelInfo to that. + */ + if (ItemPointerIndicatesMovedPartitions(&context->tmfd.ctid)) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("tuple to be deleted was already moved to another partition due to concurrent update"))); + + /* + * A non-NULL ctid means that we are still dealing + * with MATCHED case. Restart the loop so that we + * apply all the MATCHED rules again, to ensure + * that the first qualifying WHEN MATCHED action + * is executed. + * + * Update tupleid to that of the new tuple, for + * the refetch we do at the top. + */ + ItemPointerCopy(&context->tmfd.ctid, tupleid); + goto lmerge_matched; + + case TM_Deleted: + + /* + * tuple already deleted; tell caller to run NOT + * MATCHED actions + */ + return false; + + case TM_SelfModified: + + /* + * This can be reached when following an update + * chain from a tuple updated by another session, + * reaching a tuple that was already updated in + * this transaction. If previously modified by + * this command, ignore the redundant update, + * otherwise error out. + * + * See also response to TM_SelfModified in + * ExecUpdate(). + */ + if (context->tmfd.cmax != estate->es_output_cid) + ereport(ERROR, + (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), + errmsg("tuple to be updated or deleted was already modified by an operation triggered by the current command"), + errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows."))); + return false; + + default: + /* see table_tuple_lock call in ExecDelete() */ + elog(ERROR, "unexpected table_tuple_lock status: %u", + result); + return false; + } + } + + case TM_Invisible: + case TM_WouldBlock: + case TM_BeingModified: + /* these should not occur */ + elog(ERROR, "unexpected tuple operation result: %d", result); + break; + } + + /* + * We've activated one of the WHEN clauses, so we don't search + * further. This is required behaviour, not an optimization. + */ + break; + } + + /* + * Successfully executed an action or no qualifying action was found. + */ + return true; +} + +/* + * Execute the first qualifying NOT MATCHED action. + */ +static void +ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo, + bool canSetTag) +{ + ModifyTableState *mtstate = context->mtstate; + ExprContext *econtext = mtstate->ps.ps_ExprContext; + List *actionStates = NIL; + ListCell *l; + + /* + * For INSERT actions, the root relation's merge action is OK since the + * INSERT's targetlist and the WHEN conditions can only refer to the + * source relation and hence it does not matter which result relation we + * work with. + * + * XXX does this mean that we can avoid creating copies of actionStates on + * partitioned tables, for not-matched actions? + */ + actionStates = resultRelInfo->ri_notMatchedMergeAction; + + /* + * Make source tuple available to ExecQual and ExecProject. We don't need + * the target tuple, since the WHEN quals and targetlist can't refer to + * the target columns. + */ + econtext->ecxt_scantuple = NULL; + econtext->ecxt_innertuple = context->planSlot; + econtext->ecxt_outertuple = NULL; + + foreach(l, actionStates) + { + MergeActionState *action = (MergeActionState *) lfirst(l); + CmdType commandType = action->mas_action->commandType; + TupleTableSlot *newslot; + + /* + * Test condition, if any. + * + * In the absence of any condition, we perform the action + * unconditionally (no need to check separately since ExecQual() will + * return true if there are no conditions to evaluate). + */ + if (!ExecQual(action->mas_whenqual, econtext)) + continue; + + /* Perform stated action */ + switch (commandType) + { + case CMD_INSERT: + + /* + * Project the tuple. In case of a partitioned table, the + * projection was already built to use the root's descriptor, + * so we don't need to map the tuple here. + */ + newslot = ExecProject(action->mas_proj); + context->relaction = action; + + (void) ExecInsert(context, mtstate->rootResultRelInfo, newslot, + canSetTag, NULL, NULL); + mtstate->mt_merge_inserted += 1; + break; + case CMD_NOTHING: + /* Do nothing */ + break; + default: + elog(ERROR, "unknown action in MERGE WHEN NOT MATCHED clause"); + } + + /* + * We've activated one of the WHEN clauses, so we don't search + * further. This is required behaviour, not an optimization. + */ + break; + } +} + +/* + * Initialize state for execution of MERGE. + */ +void +ExecInitMerge(ModifyTableState *mtstate, EState *estate) +{ + ModifyTable *node = (ModifyTable *) mtstate->ps.plan; + ResultRelInfo *rootRelInfo = mtstate->rootResultRelInfo; + ResultRelInfo *resultRelInfo; + ExprContext *econtext; + ListCell *lc; + int i; + + if (node->mergeActionLists == NIL) + return; + + mtstate->mt_merge_subcommands = 0; + + if (mtstate->ps.ps_ExprContext == NULL) + ExecAssignExprContext(estate, &mtstate->ps); + econtext = mtstate->ps.ps_ExprContext; + + /* + * Create a MergeActionState for each action on the mergeActionList and + * add it to either a list of matched actions or not-matched actions. + * + * Similar logic appears in ExecInitPartitionInfo(), so if changing + * anything here, do so there too. + */ + i = 0; + foreach(lc, node->mergeActionLists) + { + List *mergeActionList = lfirst(lc); + TupleDesc relationDesc; + ListCell *l; + + resultRelInfo = mtstate->resultRelInfo + i; + i++; + relationDesc = RelationGetDescr(resultRelInfo->ri_RelationDesc); + + /* initialize slots for MERGE fetches from this rel */ + if (unlikely(!resultRelInfo->ri_projectNewInfoValid)) + ExecInitMergeTupleSlots(mtstate, resultRelInfo); + + foreach(l, mergeActionList) + { + MergeAction *action = (MergeAction *) lfirst(l); + MergeActionState *action_state; + TupleTableSlot *tgtslot; + TupleDesc tgtdesc; + List **list; + + /* + * Build action merge state for this rel. (For partitions, + * equivalent code exists in ExecInitPartitionInfo.) + */ + action_state = makeNode(MergeActionState); + action_state->mas_action = action; + action_state->mas_whenqual = ExecInitQual((List *) action->qual, + &mtstate->ps); + + /* + * We create two lists - one for WHEN MATCHED actions and one for + * WHEN NOT MATCHED actions - and stick the MergeActionState into + * the appropriate list. + */ + if (action_state->mas_action->matched) + list = &resultRelInfo->ri_matchedMergeAction; + else + list = &resultRelInfo->ri_notMatchedMergeAction; + *list = lappend(*list, action_state); + + switch (action->commandType) + { + case CMD_INSERT: + ExecCheckPlanOutput(rootRelInfo->ri_RelationDesc, + action->targetList); + + /* + * If the MERGE targets a partitioned table, any INSERT + * actions must be routed through it, not the child + * relations. Initialize the routing struct and the root + * table's "new" tuple slot for that, if not already done. + * The projection we prepare, for all relations, uses the + * root relation descriptor, and targets the plan's root + * slot. (This is consistent with the fact that we + * checked the plan output to match the root relation, + * above.) + */ + if (rootRelInfo->ri_RelationDesc->rd_rel->relkind == + RELKIND_PARTITIONED_TABLE) + { + if (mtstate->mt_partition_tuple_routing == NULL) + { + /* + * Initialize planstate for routing if not already + * done. + * + * Note that the slot is managed as a standalone + * slot belonging to ModifyTableState, so we pass + * NULL for the 2nd argument. + */ + mtstate->mt_root_tuple_slot = + table_slot_create(rootRelInfo->ri_RelationDesc, + NULL); + mtstate->mt_partition_tuple_routing = + ExecSetupPartitionTupleRouting(estate, + rootRelInfo->ri_RelationDesc); + } + tgtslot = mtstate->mt_root_tuple_slot; + tgtdesc = RelationGetDescr(rootRelInfo->ri_RelationDesc); + } + else + { + /* not partitioned? use the stock relation and slot */ + tgtslot = resultRelInfo->ri_newTupleSlot; + tgtdesc = RelationGetDescr(resultRelInfo->ri_RelationDesc); + } + + action_state->mas_proj = + ExecBuildProjectionInfo(action->targetList, econtext, + tgtslot, + &mtstate->ps, + tgtdesc); + + mtstate->mt_merge_subcommands |= MERGE_INSERT; + break; + case CMD_UPDATE: + action_state->mas_proj = + ExecBuildUpdateProjection(action->targetList, + true, + action->updateColnos, + relationDesc, + econtext, + resultRelInfo->ri_newTupleSlot, + &mtstate->ps); + mtstate->mt_merge_subcommands |= MERGE_UPDATE; + break; + case CMD_DELETE: + mtstate->mt_merge_subcommands |= MERGE_DELETE; + break; + case CMD_NOTHING: + break; + default: + elog(ERROR, "unknown operation"); + break; + } + } + } +} + +/* + * Initializes the tuple slots in a ResultRelInfo for any MERGE action. + * + * We mark 'projectNewInfoValid' even though the projections themselves + * are not initialized here. + */ +void +ExecInitMergeTupleSlots(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo) +{ + EState *estate = mtstate->ps.state; + + Assert(!resultRelInfo->ri_projectNewInfoValid); + + resultRelInfo->ri_oldTupleSlot = + table_slot_create(resultRelInfo->ri_RelationDesc, + &estate->es_tupleTable); + resultRelInfo->ri_newTupleSlot = + table_slot_create(resultRelInfo->ri_RelationDesc, + &estate->es_tupleTable); + resultRelInfo->ri_projectNewInfoValid = true; +} + +/* + * Callback for ModifyTableContext->GetUpdateNewTuple for use by MERGE. It + * computes the updated tuple by projecting from the current merge action's + * projection. + */ +static TupleTableSlot * +mergeGetUpdateNewTuple(ResultRelInfo *relinfo, + TupleTableSlot *planSlot, + TupleTableSlot *oldSlot, + MergeActionState *relaction) +{ + ExprContext *econtext = relaction->mas_proj->pi_exprContext; + + econtext->ecxt_scantuple = oldSlot; + econtext->ecxt_innertuple = planSlot; + + return ExecProject(relaction->mas_proj); +} /* * Process BEFORE EACH STATEMENT triggers @@ -2514,6 +3291,14 @@ fireBSTriggers(ModifyTableState *node) case CMD_DELETE: ExecBSDeleteTriggers(node->ps.state, resultRelInfo); break; + case CMD_MERGE: + if (node->mt_merge_subcommands & MERGE_INSERT) + ExecBSInsertTriggers(node->ps.state, resultRelInfo); + if (node->mt_merge_subcommands & MERGE_UPDATE) + ExecBSUpdateTriggers(node->ps.state, resultRelInfo); + if (node->mt_merge_subcommands & MERGE_DELETE) + ExecBSDeleteTriggers(node->ps.state, resultRelInfo); + break; default: elog(ERROR, "unknown operation"); break; @@ -2547,6 +3332,17 @@ fireASTriggers(ModifyTableState *node) ExecASDeleteTriggers(node->ps.state, resultRelInfo, node->mt_transition_capture); break; + case CMD_MERGE: + if (node->mt_merge_subcommands & MERGE_DELETE) + ExecASDeleteTriggers(node->ps.state, resultRelInfo, + node->mt_transition_capture); + if (node->mt_merge_subcommands & MERGE_UPDATE) + ExecASUpdateTriggers(node->ps.state, resultRelInfo, + node->mt_transition_capture); + if (node->mt_merge_subcommands & MERGE_INSERT) + ExecASInsertTriggers(node->ps.state, resultRelInfo, + node->mt_transition_capture); + break; default: elog(ERROR, "unknown operation"); break; @@ -2749,7 +3545,28 @@ ExecModifyTable(PlanState *pstate) datum = ExecGetJunkAttribute(planSlot, node->mt_resultOidAttno, &isNull); if (isNull) + { + /* + * For commands other than MERGE, any tuples having InvalidOid + * for tableoid are errors. For MERGE, we may need to handle + * them as WHEN NOT MATCHED clauses if any, so do that. + * + * Note that we use the node's toplevel resultRelInfo, not any + * specific partition's. + */ + if (operation == CMD_MERGE) + { + EvalPlanQualSetSlot(&node->mt_epqstate, planSlot); + + context.planSlot = planSlot; + context.lockmode = 0; + + ExecMerge(&context, node->resultRelInfo, NULL, node->canSetTag); + continue; /* no RETURNING support yet */ + } + elog(ERROR, "tableoid is NULL"); + } resultoid = DatumGetObjectId(datum); /* If it's not the same as last time, we need to locate the rel */ @@ -2784,13 +3601,14 @@ ExecModifyTable(PlanState *pstate) oldtuple = NULL; /* - * For UPDATE/DELETE, fetch the row identity info for the tuple to be - * updated/deleted. For a heap relation, that's a TID; otherwise we - * may have a wholerow junk attr that carries the old tuple in toto. - * Keep this in step with the part of ExecInitModifyTable that sets up - * ri_RowIdAttNo. + * For UPDATE/DELETE/MERGE, fetch the row identity info for the tuple + * to be updated/deleted/merged. For a heap relation, that's a TID; + * otherwise we may have a wholerow junk attr that carries the old + * tuple in toto. Keep this in step with the part of + * ExecInitModifyTable that sets up ri_RowIdAttNo. */ - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE || + operation == CMD_MERGE) { char relkind; Datum datum; @@ -2806,9 +3624,30 @@ ExecModifyTable(PlanState *pstate) datum = ExecGetJunkAttribute(slot, resultRelInfo->ri_RowIdAttNo, &isNull); - /* shouldn't ever get a null result... */ + + /* + * For commands other than MERGE, any tuples having a null row + * identifier are errors. For MERGE, we may need to handle + * them as WHEN NOT MATCHED clauses if any, so do that. + * + * Note that we use the node's toplevel resultRelInfo, not any + * specific partition's. + */ if (isNull) + { + if (operation == CMD_MERGE) + { + EvalPlanQualSetSlot(&node->mt_epqstate, planSlot); + + context.planSlot = planSlot; + context.lockmode = 0; + + ExecMerge(&context, node->resultRelInfo, NULL, node->canSetTag); + continue; /* no RETURNING support yet */ + } + elog(ERROR, "ctid is NULL"); + } tupleid = (ItemPointer) DatumGetPointer(datum); tuple_ctid = *tupleid; /* be sure we don't free ctid!! */ @@ -2898,8 +3737,10 @@ ExecModifyTable(PlanState *pstate) oldSlot)) elog(ERROR, "failed to fetch tuple being updated"); } - slot = ExecGetUpdateNewTuple(resultRelInfo, planSlot, - oldSlot); + slot = internalGetUpdateNewTuple(resultRelInfo, planSlot, + oldSlot, NULL); + context.GetUpdateNewTuple = internalGetUpdateNewTuple; + context.relaction = NULL; /* Now apply the update. */ slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple, @@ -2911,6 +3752,10 @@ ExecModifyTable(PlanState *pstate) true, false, node->canSetTag, NULL, NULL); break; + case CMD_MERGE: + slot = ExecMerge(&context, resultRelInfo, tupleid, node->canSetTag); + break; + default: elog(ERROR, "unknown operation"); break; @@ -3044,6 +3889,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) mtstate->resultRelInfo = (ResultRelInfo *) palloc(nrels * sizeof(ResultRelInfo)); + mtstate->mt_merge_inserted = 0; + mtstate->mt_merge_updated = 0; + mtstate->mt_merge_deleted = 0; + /*---------- * Resolve the target relation. This is the same as: * @@ -3147,12 +3996,13 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) } /* - * For UPDATE/DELETE, find the appropriate junk attr now, either a - * 'ctid' or 'wholerow' attribute depending on relkind. For foreign + * For UPDATE/DELETE/MERGE, find the appropriate junk attr now, either + * a 'ctid' or 'wholerow' attribute depending on relkind. For foreign * tables, the FDW might have created additional junk attr(s), but * those are no concern of ours. */ - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE || + operation == CMD_MERGE) { char relkind; @@ -3168,20 +4018,29 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) } else if (relkind == RELKIND_FOREIGN_TABLE) { + /* + * We don't support MERGE with foreign tables for now. (It's + * problematic because the implementation uses CTID.) + */ + Assert(operation != CMD_MERGE); + /* * When there is a row-level trigger, there should be a * wholerow attribute. We also require it to be present in - * UPDATE, so we can get the values of unchanged columns. + * UPDATE and MERGE, so we can get the values of unchanged + * columns. */ resultRelInfo->ri_RowIdAttNo = ExecFindJunkAttributeInTlist(subplan->targetlist, "wholerow"); - if (mtstate->operation == CMD_UPDATE && + if ((mtstate->operation == CMD_UPDATE || mtstate->operation == CMD_MERGE) && !AttributeNumberIsValid(resultRelInfo->ri_RowIdAttNo)) elog(ERROR, "could not find junk wholerow column"); } else { + /* No support for MERGE */ + Assert(operation != CMD_MERGE); /* Other valid target relkinds must provide wholerow */ resultRelInfo->ri_RowIdAttNo = ExecFindJunkAttributeInTlist(subplan->targetlist, @@ -3193,10 +4052,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) } /* - * If this is an inherited update/delete, there will be a junk attribute - * named "tableoid" present in the subplan's targetlist. It will be used - * to identify the result relation for a given tuple to be - * updated/deleted. + * If this is an inherited update/delete/merge, there will be a junk + * attribute named "tableoid" present in the subplan's targetlist. It + * will be used to identify the result relation for a given tuple to be + * updated/deleted/merged. */ mtstate->mt_resultOidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, "tableoid"); @@ -3209,8 +4068,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) /* * Build state for tuple routing if it's a partitioned INSERT. An UPDATE - * might need this too, but only if it actually moves tuples between - * partitions; in that case setup is done by ExecCrossPartitionUpdate. + * or MERGE might need this too, but only if it actually moves tuples + * between partitions; in that case setup is done by + * ExecCrossPartitionUpdate. */ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && operation == CMD_INSERT) @@ -3379,6 +4239,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) arowmarks = lappend(arowmarks, aerm); } + /* For a MERGE command, initialize its state */ + if (mtstate->operation == CMD_MERGE) + ExecInitMerge(mtstate, estate); + EvalPlanQualSetPlan(&mtstate->mt_epqstate, subplan, arowmarks); /* diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index a82e9866670..042a5f8b0a2 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -2881,6 +2881,9 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount) else res = SPI_OK_UPDATE; break; + case CMD_MERGE: + res = SPI_OK_MERGE; + break; default: return SPI_ERROR_OPUNKNOWN; } diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 2cbd8aa0df1..c09172164b9 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -228,6 +228,7 @@ _copyModifyTable(const ModifyTable *from) COPY_NODE_FIELD(onConflictWhere); COPY_SCALAR_FIELD(exclRelRTI); COPY_NODE_FIELD(exclRelTlist); + COPY_NODE_FIELD(mergeActionLists); return newnode; } @@ -2888,6 +2889,35 @@ _copyCommonTableExpr(const CommonTableExpr *from) return newnode; } +static MergeWhenClause * +_copyMergeWhenClause(const MergeWhenClause *from) +{ + MergeWhenClause *newnode = makeNode(MergeWhenClause); + + COPY_SCALAR_FIELD(matched); + COPY_SCALAR_FIELD(commandType); + COPY_SCALAR_FIELD(override); + COPY_NODE_FIELD(condition); + COPY_NODE_FIELD(targetList); + COPY_NODE_FIELD(values); + return newnode; +} + +static MergeAction * +_copyMergeAction(const MergeAction *from) +{ + MergeAction *newnode = makeNode(MergeAction); + + COPY_SCALAR_FIELD(matched); + COPY_SCALAR_FIELD(commandType); + COPY_SCALAR_FIELD(override); + COPY_NODE_FIELD(qual); + COPY_NODE_FIELD(targetList); + COPY_NODE_FIELD(updateColnos); + + return newnode; +} + static A_Expr * _copyA_Expr(const A_Expr *from) { @@ -3394,6 +3424,8 @@ _copyQuery(const Query *from) COPY_NODE_FIELD(setOperations); COPY_NODE_FIELD(constraintDeps); COPY_NODE_FIELD(withCheckOptions); + COPY_NODE_FIELD(mergeActionList); + COPY_SCALAR_FIELD(mergeUseOuterJoin); COPY_LOCATION_FIELD(stmt_location); COPY_SCALAR_FIELD(stmt_len); @@ -3457,6 +3489,20 @@ _copyUpdateStmt(const UpdateStmt *from) return newnode; } +static MergeStmt * +_copyMergeStmt(const MergeStmt *from) +{ + MergeStmt *newnode = makeNode(MergeStmt); + + COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(sourceRelation); + COPY_NODE_FIELD(joinCondition); + COPY_NODE_FIELD(mergeWhenClauses); + COPY_NODE_FIELD(withClause); + + return newnode; +} + static SelectStmt * _copySelectStmt(const SelectStmt *from) { @@ -5662,6 +5708,9 @@ copyObjectImpl(const void *from) case T_UpdateStmt: retval = _copyUpdateStmt(from); break; + case T_MergeStmt: + retval = _copyMergeStmt(from); + break; case T_SelectStmt: retval = _copySelectStmt(from); break; @@ -6136,6 +6185,12 @@ copyObjectImpl(const void *from) case T_CommonTableExpr: retval = _copyCommonTableExpr(from); break; + case T_MergeWhenClause: + retval = _copyMergeWhenClause(from); + break; + case T_MergeAction: + retval = _copyMergeAction(from); + break; case T_ObjectWithArgs: retval = _copyObjectWithArgs(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 9f17e15e150..3fb423be47a 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -1146,6 +1146,8 @@ _equalQuery(const Query *a, const Query *b) COMPARE_NODE_FIELD(setOperations); COMPARE_NODE_FIELD(constraintDeps); COMPARE_NODE_FIELD(withCheckOptions); + COMPARE_NODE_FIELD(mergeActionList); + COMPARE_SCALAR_FIELD(mergeUseOuterJoin); COMPARE_LOCATION_FIELD(stmt_location); COMPARE_SCALAR_FIELD(stmt_len); @@ -1201,6 +1203,18 @@ _equalUpdateStmt(const UpdateStmt *a, const UpdateStmt *b) return true; } +static bool +_equalMergeStmt(const MergeStmt *a, const MergeStmt *b) +{ + COMPARE_NODE_FIELD(relation); + COMPARE_NODE_FIELD(sourceRelation); + COMPARE_NODE_FIELD(joinCondition); + COMPARE_NODE_FIELD(mergeWhenClauses); + COMPARE_NODE_FIELD(withClause); + + return true; +} + static bool _equalSelectStmt(const SelectStmt *a, const SelectStmt *b) { @@ -3118,6 +3132,32 @@ _equalCommonTableExpr(const CommonTableExpr *a, const CommonTableExpr *b) return true; } +static bool +_equalMergeWhenClause(const MergeWhenClause *a, const MergeWhenClause *b) +{ + COMPARE_SCALAR_FIELD(matched); + COMPARE_SCALAR_FIELD(commandType); + COMPARE_SCALAR_FIELD(override); + COMPARE_NODE_FIELD(condition); + COMPARE_NODE_FIELD(targetList); + COMPARE_NODE_FIELD(values); + + return true; +} + +static bool +_equalMergeAction(const MergeAction *a, const MergeAction *b) +{ + COMPARE_SCALAR_FIELD(matched); + COMPARE_SCALAR_FIELD(commandType); + COMPARE_SCALAR_FIELD(override); + COMPARE_NODE_FIELD(qual); + COMPARE_NODE_FIELD(targetList); + COMPARE_NODE_FIELD(updateColnos); + + return true; +} + static bool _equalXmlSerialize(const XmlSerialize *a, const XmlSerialize *b) { @@ -3576,6 +3616,9 @@ equal(const void *a, const void *b) case T_UpdateStmt: retval = _equalUpdateStmt(a, b); break; + case T_MergeStmt: + retval = _equalMergeStmt(a, b); + break; case T_SelectStmt: retval = _equalSelectStmt(a, b); break; @@ -4050,6 +4093,12 @@ equal(const void *a, const void *b) case T_CommonTableExpr: retval = _equalCommonTableExpr(a, b); break; + case T_MergeWhenClause: + retval = _equalMergeWhenClause(a, b); + break; + case T_MergeAction: + retval = _equalMergeAction(a, b); + break; case T_ObjectWithArgs: retval = _equalObjectWithArgs(a, b); break; diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c index 25cf282aab2..50898246f96 100644 --- a/src/backend/nodes/nodeFuncs.c +++ b/src/backend/nodes/nodeFuncs.c @@ -2303,6 +2303,16 @@ expression_tree_walker(Node *node, return true; } break; + case T_MergeAction: + { + MergeAction *action = (MergeAction *) node; + + if (walker(action->targetList, context)) + return true; + if (walker(action->qual, context)) + return true; + } + break; case T_PartitionPruneStepOp: { PartitionPruneStepOp *opstep = (PartitionPruneStepOp *) node; @@ -2463,6 +2473,8 @@ query_tree_walker(Query *query, return true; if (walker((Node *) query->onConflict, context)) return true; + if (walker((Node *) query->mergeActionList, context)) + return true; if (walker((Node *) query->returningList, context)) return true; if (walker((Node *) query->jointree, context)) @@ -3252,6 +3264,18 @@ expression_tree_mutator(Node *node, return (Node *) newnode; } break; + case T_MergeAction: + { + MergeAction *action = (MergeAction *) node; + MergeAction *newnode; + + FLATCOPY(newnode, action, MergeAction); + MUTATE(newnode->qual, action->qual, Node *); + MUTATE(newnode->targetList, action->targetList, List *); + + return (Node *) newnode; + } + break; case T_PartitionPruneStepOp: { PartitionPruneStepOp *opstep = (PartitionPruneStepOp *) node; @@ -3464,6 +3488,7 @@ query_tree_mutator(Query *query, MUTATE(query->targetList, query->targetList, List *); MUTATE(query->withCheckOptions, query->withCheckOptions, List *); MUTATE(query->onConflict, query->onConflict, OnConflictExpr *); + MUTATE(query->mergeActionList, query->mergeActionList, List *); MUTATE(query->returningList, query->returningList, List *); MUTATE(query->jointree, query->jointree, FromExpr *); MUTATE(query->setOperations, query->setOperations, Node *); @@ -3656,9 +3681,9 @@ query_or_expression_tree_mutator(Node *node, * boundaries: we descend to everything that's possibly interesting. * * Currently, the node type coverage here extends only to DML statements - * (SELECT/INSERT/UPDATE/DELETE) and nodes that can appear in them, because - * this is used mainly during analysis of CTEs, and only DML statements can - * appear in CTEs. + * (SELECT/INSERT/UPDATE/DELETE/MERGE) and nodes that can appear in them, + * because this is used mainly during analysis of CTEs, and only DML + * statements can appear in CTEs. */ bool raw_expression_tree_walker(Node *node, @@ -3839,6 +3864,34 @@ raw_expression_tree_walker(Node *node, return true; } break; + case T_MergeStmt: + { + MergeStmt *stmt = (MergeStmt *) node; + + if (walker(stmt->relation, context)) + return true; + if (walker(stmt->sourceRelation, context)) + return true; + if (walker(stmt->joinCondition, context)) + return true; + if (walker(stmt->mergeWhenClauses, context)) + return true; + if (walker(stmt->withClause, context)) + return true; + } + break; + case T_MergeWhenClause: + { + MergeWhenClause *mergeWhenClause = (MergeWhenClause *) node; + + if (walker(mergeWhenClause->condition, context)) + return true; + if (walker(mergeWhenClause->targetList, context)) + return true; + if (walker(mergeWhenClause->values, context)) + return true; + } + break; case T_SelectStmt: { SelectStmt *stmt = (SelectStmt *) node; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index c25f0bd684c..0c01f350867 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -429,6 +429,7 @@ _outModifyTable(StringInfo str, const ModifyTable *node) WRITE_NODE_FIELD(onConflictWhere); WRITE_UINT_FIELD(exclRelRTI); WRITE_NODE_FIELD(exclRelTlist); + WRITE_NODE_FIELD(mergeActionLists); } static void @@ -2250,6 +2251,7 @@ _outModifyTablePath(StringInfo str, const ModifyTablePath *node) WRITE_NODE_FIELD(rowMarks); WRITE_NODE_FIELD(onconflict); WRITE_INT_FIELD(epqParam); + WRITE_NODE_FIELD(mergeActionLists); } static void @@ -3143,6 +3145,8 @@ _outQuery(StringInfo str, const Query *node) WRITE_NODE_FIELD(setOperations); WRITE_NODE_FIELD(constraintDeps); WRITE_NODE_FIELD(withCheckOptions); + WRITE_NODE_FIELD(mergeActionList); + WRITE_BOOL_FIELD(mergeUseOuterJoin); WRITE_LOCATION_FIELD(stmt_location); WRITE_INT_FIELD(stmt_len); } @@ -3271,6 +3275,32 @@ _outCommonTableExpr(StringInfo str, const CommonTableExpr *node) WRITE_NODE_FIELD(ctecolcollations); } +static void +_outMergeWhenClause(StringInfo str, const MergeWhenClause *node) +{ + WRITE_NODE_TYPE("MERGEWHENCLAUSE"); + + WRITE_BOOL_FIELD(matched); + WRITE_ENUM_FIELD(commandType, CmdType); + WRITE_ENUM_FIELD(override, OverridingKind); + WRITE_NODE_FIELD(condition); + WRITE_NODE_FIELD(targetList); + WRITE_NODE_FIELD(values); +} + +static void +_outMergeAction(StringInfo str, const MergeAction *node) +{ + WRITE_NODE_TYPE("MERGEACTION"); + + WRITE_BOOL_FIELD(matched); + WRITE_ENUM_FIELD(commandType, CmdType); + WRITE_ENUM_FIELD(override, OverridingKind); + WRITE_NODE_FIELD(qual); + WRITE_NODE_FIELD(targetList); + WRITE_NODE_FIELD(updateColnos); +} + static void _outSetOperationStmt(StringInfo str, const SetOperationStmt *node) { @@ -4480,6 +4510,12 @@ outNode(StringInfo str, const void *obj) case T_CommonTableExpr: _outCommonTableExpr(str, obj); break; + case T_MergeWhenClause: + _outMergeWhenClause(str, obj); + break; + case T_MergeAction: + _outMergeAction(str, obj); + break; case T_SetOperationStmt: _outSetOperationStmt(str, obj); break; diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index e0b3ad1ed20..3ee8ba6f159 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -283,6 +283,8 @@ _readQuery(void) READ_NODE_FIELD(setOperations); READ_NODE_FIELD(constraintDeps); READ_NODE_FIELD(withCheckOptions); + READ_NODE_FIELD(mergeActionList); + READ_BOOL_FIELD(mergeUseOuterJoin); READ_LOCATION_FIELD(stmt_location); READ_INT_FIELD(stmt_len); @@ -472,6 +474,42 @@ _readCommonTableExpr(void) READ_DONE(); } +/* + * _readMergeWhenClause + */ +static MergeWhenClause * +_readMergeWhenClause(void) +{ + READ_LOCALS(MergeWhenClause); + + READ_BOOL_FIELD(matched); + READ_ENUM_FIELD(commandType, CmdType); + READ_NODE_FIELD(condition); + READ_NODE_FIELD(targetList); + READ_NODE_FIELD(values); + READ_ENUM_FIELD(override, OverridingKind); + + READ_DONE(); +} + +/* + * _readMergeAction + */ +static MergeAction * +_readMergeAction(void) +{ + READ_LOCALS(MergeAction); + + READ_BOOL_FIELD(matched); + READ_ENUM_FIELD(commandType, CmdType); + READ_ENUM_FIELD(override, OverridingKind); + READ_NODE_FIELD(qual); + READ_NODE_FIELD(targetList); + READ_NODE_FIELD(updateColnos); + + READ_DONE(); +} + /* * _readSetOperationStmt */ @@ -1765,6 +1803,7 @@ _readModifyTable(void) READ_NODE_FIELD(onConflictWhere); READ_UINT_FIELD(exclRelRTI); READ_NODE_FIELD(exclRelTlist); + READ_NODE_FIELD(mergeActionLists); READ_DONE(); } @@ -2809,6 +2848,10 @@ parseNodeString(void) return_value = _readCTECycleClause(); else if (MATCH("COMMONTABLEEXPR", 15)) return_value = _readCommonTableExpr(); + else if (MATCH("MERGEWHENCLAUSE", 15)) + return_value = _readMergeWhenClause(); + else if (MATCH("MERGEACTION", 11)) + return_value = _readMergeAction(); else if (MATCH("SETOPERATIONSTMT", 16)) return_value = _readSetOperationStmt(); else if (MATCH("ALIAS", 5)) diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index fa069a217c8..179c87c6714 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -310,7 +310,8 @@ static ModifyTable *make_modifytable(PlannerInfo *root, Plan *subplan, List *resultRelations, List *updateColnosLists, List *withCheckOptionLists, List *returningLists, - List *rowMarks, OnConflictExpr *onconflict, int epqParam); + List *rowMarks, OnConflictExpr *onconflict, + List *mergeActionList, int epqParam); static GatherMerge *create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path); @@ -2775,6 +2776,7 @@ create_modifytable_plan(PlannerInfo *root, ModifyTablePath *best_path) best_path->returningLists, best_path->rowMarks, best_path->onconflict, + best_path->mergeActionLists, best_path->epqParam); copy_generic_path_info(&plan->plan, &best_path->path); @@ -6924,7 +6926,8 @@ make_modifytable(PlannerInfo *root, Plan *subplan, List *resultRelations, List *updateColnosLists, List *withCheckOptionLists, List *returningLists, - List *rowMarks, OnConflictExpr *onconflict, int epqParam) + List *rowMarks, OnConflictExpr *onconflict, + List *mergeActionLists, int epqParam) { ModifyTable *node = makeNode(ModifyTable); List *fdw_private_list; @@ -6932,9 +6935,10 @@ make_modifytable(PlannerInfo *root, Plan *subplan, ListCell *lc; int i; - Assert(operation == CMD_UPDATE ? - list_length(resultRelations) == list_length(updateColnosLists) : - updateColnosLists == NIL); + Assert(operation == CMD_MERGE || + (operation == CMD_UPDATE ? + list_length(resultRelations) == list_length(updateColnosLists) : + updateColnosLists == NIL)); Assert(withCheckOptionLists == NIL || list_length(resultRelations) == list_length(withCheckOptionLists)); Assert(returningLists == NIL || @@ -6992,6 +6996,7 @@ make_modifytable(PlannerInfo *root, Plan *subplan, node->withCheckOptionLists = withCheckOptionLists; node->returningLists = returningLists; node->rowMarks = rowMarks; + node->mergeActionLists = mergeActionLists; node->epqParam = epqParam; /* diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index bd09f85aea1..547fda20a23 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -649,6 +649,11 @@ subquery_planner(PlannerGlobal *glob, Query *parse, if (parse->cteList) SS_process_ctes(root); + /* + * If it's a MERGE command, transform the joinlist as appropriate. + */ + transform_MERGE_to_join(parse); + /* * If the FROM clause is empty, replace it with a dummy RTE_RESULT RTE, so * that we don't need so many special cases to deal with that situation. @@ -849,6 +854,20 @@ subquery_planner(PlannerGlobal *glob, Query *parse, /* exclRelTlist contains only Vars, so no preprocessing needed */ } + foreach(l, parse->mergeActionList) + { + MergeAction *action = (MergeAction *) lfirst(l); + + action->targetList = (List *) + preprocess_expression(root, + (Node *) action->targetList, + EXPRKIND_TARGET); + action->qual = + preprocess_expression(root, + (Node *) action->qual, + EXPRKIND_QUAL); + } + root->append_rel_list = (List *) preprocess_expression(root, (Node *) root->append_rel_list, EXPRKIND_APPINFO); @@ -1714,7 +1733,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) } /* - * If this is an INSERT/UPDATE/DELETE, add the ModifyTable node. + * If this is an INSERT/UPDATE/DELETE/MERGE, add the ModifyTable node. */ if (parse->commandType != CMD_SELECT) { @@ -1723,6 +1742,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) List *updateColnosLists = NIL; List *withCheckOptionLists = NIL; List *returningLists = NIL; + List *mergeActionLists = NIL; List *rowMarks; if (bms_membership(root->all_result_relids) == BMS_MULTIPLE) @@ -1789,6 +1809,43 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) returningLists = lappend(returningLists, returningList); } + if (parse->mergeActionList) + { + ListCell *l; + List *mergeActionList = NIL; + + /* + * Copy MergeActions and translate stuff that + * references attribute numbers. + */ + foreach(l, parse->mergeActionList) + { + MergeAction *action = lfirst(l), + *leaf_action = copyObject(action); + + leaf_action->qual = + adjust_appendrel_attrs_multilevel(root, + (Node *) action->qual, + this_result_rel->relids, + top_result_rel->relids); + leaf_action->targetList = (List *) + adjust_appendrel_attrs_multilevel(root, + (Node *) action->targetList, + this_result_rel->relids, + top_result_rel->relids); + if (leaf_action->commandType == CMD_UPDATE) + leaf_action->updateColnos = + adjust_inherited_attnums_multilevel(root, + action->updateColnos, + this_result_rel->relid, + top_result_rel->relid); + mergeActionList = lappend(mergeActionList, + leaf_action); + } + + mergeActionLists = lappend(mergeActionLists, + mergeActionList); + } } if (resultRelations == NIL) @@ -1811,6 +1868,8 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) withCheckOptionLists = list_make1(parse->withCheckOptions); if (parse->returningList) returningLists = list_make1(parse->returningList); + if (parse->mergeActionList) + mergeActionLists = list_make1(parse->mergeActionList); } } else @@ -1823,6 +1882,8 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) withCheckOptionLists = list_make1(parse->withCheckOptions); if (parse->returningList) returningLists = list_make1(parse->returningList); + if (parse->mergeActionList) + mergeActionLists = list_make1(parse->mergeActionList); } /* @@ -1859,6 +1920,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) returningLists, rowMarks, parse->onConflict, + mergeActionLists, assign_special_exec_param(root)); } diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index a7b11b7f03a..bf4c722c028 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -952,6 +952,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) case T_ModifyTable: { ModifyTable *splan = (ModifyTable *) plan; + Plan *subplan = outerPlan(splan); Assert(splan->plan.targetlist == NIL); Assert(splan->plan.qual == NIL); @@ -963,7 +964,6 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) if (splan->returningLists) { List *newRL = NIL; - Plan *subplan = outerPlan(splan); ListCell *lcrl, *lcrr; @@ -1030,6 +1030,68 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) fix_scan_list(root, splan->exclRelTlist, rtoffset, 1); } + /* + * The MERGE statement produces the target rows by performing + * a right join between the target relation and the source + * relation (which could be a plain relation or a subquery). + * The INSERT and UPDATE actions of the MERGE statement + * require access to the columns from the source relation. We + * arrange things so that the source relation attributes are + * available as INNER_VAR and the target relation attributes + * are available from the scan tuple. + */ + if (splan->mergeActionLists != NIL) + { + ListCell *lca, + *lcr; + + /* + * Fix the targetList of individual action nodes so that + * the so-called "source relation" Vars are referenced as + * INNER_VAR. Note that for this to work correctly during + * execution, the ecxt_innertuple must be set to the tuple + * obtained by executing the subplan, which is what + * constitutes the "source relation". + * + * We leave the Vars from the result relation (i.e. the + * target relation) unchanged i.e. those Vars would be + * picked from the scan slot. So during execution, we must + * ensure that ecxt_scantuple is setup correctly to refer + * to the tuple from the target relation. + */ + indexed_tlist *itlist; + + itlist = build_tlist_index(subplan->targetlist); + + forboth(lca, splan->mergeActionLists, + lcr, splan->resultRelations) + { + List *mergeActionList = lfirst(lca); + Index resultrel = lfirst_int(lcr); + + foreach(l, mergeActionList) + { + MergeAction *action = (MergeAction *) lfirst(l); + + /* Fix targetList of each action. */ + action->targetList = fix_join_expr(root, + action->targetList, + NULL, itlist, + resultrel, + rtoffset, + NUM_EXEC_TLIST(plan)); + + /* Fix quals too. */ + action->qual = (Node *) fix_join_expr(root, + (List *) action->qual, + NULL, itlist, + resultrel, + rtoffset, + NUM_EXEC_QUAL(plan)); + } + } + } + splan->nominalRelation += rtoffset; if (splan->rootRelation) splan->rootRelation += rtoffset; diff --git a/src/backend/optimizer/prep/prepjointree.c b/src/backend/optimizer/prep/prepjointree.c index 74823e8437a..0bd99acf836 100644 --- a/src/backend/optimizer/prep/prepjointree.c +++ b/src/backend/optimizer/prep/prepjointree.c @@ -132,6 +132,86 @@ static void fix_append_rel_relids(List *append_rel_list, int varno, static Node *find_jointree_node_for_rel(Node *jtnode, int relid); +/* + * transform_MERGE_to_join + * Replace a MERGE's jointree to also include the target relation. + */ +void +transform_MERGE_to_join(Query *parse) +{ + RangeTblEntry *joinrte; + JoinExpr *joinexpr; + JoinType jointype; + int joinrti; + List *vars; + + if (parse->commandType != CMD_MERGE) + return; + + /* XXX probably bogus */ + vars = NIL; + + /* + * When any WHEN NOT MATCHED THEN INSERT clauses exist, we need to use an + * outer join so that we process all unmatched tuples from the source + * relation. If none exist, we can use an inner join. + */ + if (parse->mergeUseOuterJoin) + jointype = JOIN_RIGHT; + else + jointype = JOIN_INNER; + + /* Manufacture a join RTE to use. */ + joinrte = makeNode(RangeTblEntry); + joinrte->rtekind = RTE_JOIN; + joinrte->jointype = jointype; + joinrte->joinmergedcols = 0; + joinrte->joinaliasvars = vars; + joinrte->joinleftcols = NIL; /* MERGE does not allow JOIN USING */ + joinrte->joinrightcols = NIL; /* ditto */ + joinrte->join_using_alias = NULL; + + joinrte->alias = NULL; + joinrte->eref = makeAlias("*MERGE*", NIL); + joinrte->lateral = false; + joinrte->inh = false; + joinrte->inFromCl = true; + joinrte->requiredPerms = 0; + joinrte->checkAsUser = InvalidOid; + joinrte->selectedCols = NULL; + joinrte->insertedCols = NULL; + joinrte->updatedCols = NULL; + joinrte->extraUpdatedCols = NULL; + joinrte->securityQuals = NIL; + + /* + * Add completed RTE to pstate's range table list, so that we know its + * index. + */ + parse->rtable = lappend(parse->rtable, joinrte); + joinrti = list_length(parse->rtable); + + /* + * Create a JOIN between the target and the source relation. + */ + joinexpr = makeNode(JoinExpr); + joinexpr->jointype = jointype; + joinexpr->isNatural = false; + joinexpr->larg = (Node *) makeNode(RangeTblRef); + ((RangeTblRef *) joinexpr->larg)->rtindex = parse->resultRelation; + joinexpr->rarg = linitial(parse->jointree->fromlist); /* original join */ + joinexpr->usingClause = NIL; + joinexpr->join_using_alias = NULL; + /* The quals are removed from the jointree and into this specific join */ + joinexpr->quals = parse->jointree->quals; + joinexpr->alias = NULL; + joinexpr->rtindex = joinrti; + + /* Make the new join be the sole entry in the query's jointree */ + parse->jointree->fromlist = list_make1(joinexpr); + parse->jointree->quals = NULL; +} + /* * replace_empty_jointree * If the Query's jointree is empty, replace it with a dummy RTE_RESULT @@ -2058,6 +2138,17 @@ perform_pullup_replace_vars(PlannerInfo *root, * can't contain any references to a subquery. */ } + if (parse->mergeActionList) + { + foreach(lc, parse->mergeActionList) + { + MergeAction *action = lfirst(lc); + + action->qual = pullup_replace_vars(action->qual, rvcontext); + action->targetList = (List *) + pullup_replace_vars((Node *) action->targetList, rvcontext); + } + } replace_vars_in_jointree((Node *) parse->jointree, rvcontext, lowest_nulling_outer_join); Assert(parse->setOperations == NULL); diff --git a/src/backend/optimizer/prep/preptlist.c b/src/backend/optimizer/prep/preptlist.c index 95e82cf958f..99ab3d75594 100644 --- a/src/backend/optimizer/prep/preptlist.c +++ b/src/backend/optimizer/prep/preptlist.c @@ -124,6 +124,43 @@ preprocess_targetlist(PlannerInfo *root) tlist = root->processed_tlist; } + /* + * For MERGE we need to handle the target list for the target relation, + * and also target list for each action (only INSERT/UPDATE matter). + */ + if (command_type == CMD_MERGE) + { + ListCell *l; + + /* + * For MERGE, add any junk column(s) needed to allow the executor to + * identify the rows to be inserted or updated. + */ + root->processed_tlist = tlist; + add_row_identity_columns(root, result_relation, + target_rte, target_relation); + + tlist = root->processed_tlist; + + /* + * For MERGE, handle targetlist of each MergeAction separately. Give + * the same treatment to MergeAction->targetList as we would have + * given to a regular INSERT. For UPDATE, collect the column numbers + * being modified. + */ + foreach(l, parse->mergeActionList) + { + MergeAction *action = (MergeAction *) lfirst(l); + + if (action->commandType == CMD_INSERT) + action->targetList = expand_insert_targetlist(action->targetList, + target_relation); + else if (action->commandType == CMD_UPDATE) + action->updateColnos = + extract_update_targetlist_colnos(action->targetList); + } + } + /* * Add necessary junk columns for rowmarked rels. These values are needed * for locking of rels selected FOR UPDATE/SHARE, and to do EvalPlanQual diff --git a/src/backend/optimizer/util/appendinfo.c b/src/backend/optimizer/util/appendinfo.c index 2f06fa743c2..9d4bb470270 100644 --- a/src/backend/optimizer/util/appendinfo.c +++ b/src/backend/optimizer/util/appendinfo.c @@ -774,8 +774,8 @@ add_row_identity_var(PlannerInfo *root, Var *orig_var, Assert(orig_var->varlevelsup == 0); /* - * If we're doing non-inherited UPDATE/DELETE, there's little need for - * ROWID_VAR shenanigans. Just shove the presented Var into the + * If we're doing non-inherited UPDATE/DELETE/MERGE, there's little need + * for ROWID_VAR shenanigans. Just shove the presented Var into the * processed_tlist, and we're done. */ if (rtindex == root->parse->resultRelation) @@ -862,14 +862,16 @@ add_row_identity_columns(PlannerInfo *root, Index rtindex, char relkind = target_relation->rd_rel->relkind; Var *var; - Assert(commandType == CMD_UPDATE || commandType == CMD_DELETE); + Assert(commandType == CMD_UPDATE || commandType == CMD_DELETE || commandType == CMD_MERGE); - if (relkind == RELKIND_RELATION || + if (commandType == CMD_MERGE || + relkind == RELKIND_RELATION || relkind == RELKIND_MATVIEW || relkind == RELKIND_PARTITIONED_TABLE) { /* - * Emit CTID so that executor can find the row to update or delete. + * Emit CTID so that executor can find the row to merge, update or + * delete. */ var = makeVar(rtindex, SelfItemPointerAttributeNumber, @@ -942,8 +944,11 @@ distribute_row_identity_vars(PlannerInfo *root) RelOptInfo *target_rel; ListCell *lc; - /* There's nothing to do if this isn't an inherited UPDATE/DELETE. */ - if (parse->commandType != CMD_UPDATE && parse->commandType != CMD_DELETE) + /* + * There's nothing to do if this isn't an inherited UPDATE/DELETE/MERGE. + */ + if (parse->commandType != CMD_UPDATE && parse->commandType != CMD_DELETE && + parse->commandType != CMD_MERGE) { Assert(root->row_identity_vars == NIL); return; diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 5c32c96b71c..99df76b6b71 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -3620,6 +3620,7 @@ create_lockrows_path(PlannerInfo *root, RelOptInfo *rel, * 'rowMarks' is a list of PlanRowMarks (non-locking only) * 'onconflict' is the ON CONFLICT clause, or NULL * 'epqParam' is the ID of Param for EvalPlanQual re-eval + * 'mergeActionLists' is a list of lists of MERGE actions (one per rel) */ ModifyTablePath * create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, @@ -3631,13 +3632,14 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, List *updateColnosLists, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, - int epqParam) + List *mergeActionLists, int epqParam) { ModifyTablePath *pathnode = makeNode(ModifyTablePath); - Assert(operation == CMD_UPDATE ? - list_length(resultRelations) == list_length(updateColnosLists) : - updateColnosLists == NIL); + Assert(operation == CMD_MERGE || + (operation == CMD_UPDATE ? + list_length(resultRelations) == list_length(updateColnosLists) : + updateColnosLists == NIL)); Assert(withCheckOptionLists == NIL || list_length(resultRelations) == list_length(withCheckOptionLists)); Assert(returningLists == NIL || @@ -3697,6 +3699,7 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, pathnode->rowMarks = rowMarks; pathnode->onconflict = onconflict; pathnode->epqParam = epqParam; + pathnode->mergeActionLists = mergeActionLists; return pathnode; } diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c index a5002ad8955..df97b799174 100644 --- a/src/backend/optimizer/util/plancat.c +++ b/src/backend/optimizer/util/plancat.c @@ -2167,6 +2167,10 @@ has_row_triggers(PlannerInfo *root, Index rti, CmdType event) trigDesc->trig_delete_before_row)) result = true; break; + /* There is no separate event for MERGE, only INSERT/UPDATE/DELETE */ + case CMD_MERGE: + result = false; + break; default: elog(ERROR, "unrecognized CmdType: %d", (int) event); break; diff --git a/src/backend/parser/Makefile b/src/backend/parser/Makefile index 5ddb9a92f05..9f1c4022bbe 100644 --- a/src/backend/parser/Makefile +++ b/src/backend/parser/Makefile @@ -23,6 +23,7 @@ OBJS = \ parse_enr.o \ parse_expr.o \ parse_func.o \ + parse_merge.o \ parse_node.o \ parse_oper.o \ parse_param.o \ diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 61026753a3d..0144284aa35 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -39,6 +39,7 @@ #include "parser/parse_cte.h" #include "parser/parse_expr.h" #include "parser/parse_func.h" +#include "parser/parse_merge.h" #include "parser/parse_oper.h" #include "parser/parse_param.h" #include "parser/parse_relation.h" @@ -60,9 +61,6 @@ post_parse_analyze_hook_type post_parse_analyze_hook = NULL; static Query *transformOptionalSelectInto(ParseState *pstate, Node *parseTree); static Query *transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt); static Query *transformInsertStmt(ParseState *pstate, InsertStmt *stmt); -static List *transformInsertRow(ParseState *pstate, List *exprlist, - List *stmtcols, List *icolumns, List *attrnos, - bool strip_indirection); static OnConflictExpr *transformOnConflictClause(ParseState *pstate, OnConflictClause *onConflictClause); static int count_rowexpr_columns(ParseState *pstate, Node *expr); @@ -76,8 +74,6 @@ static void determineRecursiveColTypes(ParseState *pstate, static Query *transformReturnStmt(ParseState *pstate, ReturnStmt *stmt); static Query *transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt); static List *transformReturningList(ParseState *pstate, List *returningList); -static List *transformUpdateTargetList(ParseState *pstate, - List *targetList); static Query *transformPLAssignStmt(ParseState *pstate, PLAssignStmt *stmt); static Query *transformDeclareCursorStmt(ParseState *pstate, @@ -330,6 +326,7 @@ transformStmt(ParseState *pstate, Node *parseTree) case T_InsertStmt: case T_UpdateStmt: case T_DeleteStmt: + case T_MergeStmt: (void) test_raw_expression_coverage(parseTree, NULL); break; default: @@ -354,6 +351,10 @@ transformStmt(ParseState *pstate, Node *parseTree) result = transformUpdateStmt(pstate, (UpdateStmt *) parseTree); break; + case T_MergeStmt: + result = transformMergeStmt(pstate, (MergeStmt *) parseTree); + break; + case T_SelectStmt: { SelectStmt *n = (SelectStmt *) parseTree; @@ -438,6 +439,7 @@ analyze_requires_snapshot(RawStmt *parseTree) case T_InsertStmt: case T_DeleteStmt: case T_UpdateStmt: + case T_MergeStmt: case T_SelectStmt: case T_PLAssignStmt: result = true; @@ -956,7 +958,7 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) * attrnos: integer column numbers (must be same length as icolumns) * strip_indirection: if true, remove any field/array assignment nodes */ -static List * +List * transformInsertRow(ParseState *pstate, List *exprlist, List *stmtcols, List *icolumns, List *attrnos, bool strip_indirection) @@ -1593,7 +1595,7 @@ transformValuesClause(ParseState *pstate, SelectStmt *stmt) * Generate a targetlist as though expanding "*" */ Assert(pstate->p_next_resno == 1); - qry->targetList = expandNSItemAttrs(pstate, nsitem, 0, -1); + qry->targetList = expandNSItemAttrs(pstate, nsitem, 0, true, -1); /* * The grammar allows attaching ORDER BY, LIMIT, and FOR UPDATE to a @@ -2418,9 +2420,9 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) /* * transformUpdateTargetList - - * handle SET clause in UPDATE/INSERT ... ON CONFLICT UPDATE + * handle SET clause in UPDATE/MERGE/INSERT ... ON CONFLICT UPDATE */ -static List * +List * transformUpdateTargetList(ParseState *pstate, List *origTlist) { List *tlist = NIL; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index c6613af9fe6..9399fff610f 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -278,6 +278,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); struct SelectLimit *selectlimit; SetQuantifier setquantifier; struct GroupClause *groupclause; + MergeWhenClause *mergewhen; struct KeyActions *keyactions; struct KeyAction *keyaction; } @@ -307,7 +308,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); DropTransformStmt DropUserMappingStmt ExplainStmt FetchStmt GrantStmt GrantRoleStmt ImportForeignSchemaStmt IndexStmt InsertStmt - ListenStmt LoadStmt LockStmt NotifyStmt ExplainableStmt PreparableStmt + ListenStmt LoadStmt LockStmt MergeStmt NotifyStmt ExplainableStmt PreparableStmt CreateFunctionStmt AlterFunctionStmt ReindexStmt RemoveAggrStmt RemoveFuncStmt RemoveOperStmt RenameStmt ReturnStmt RevokeStmt RevokeRoleStmt RuleActionStmt RuleActionStmtOrEmpty RuleStmt @@ -433,6 +434,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); any_operator expr_list attrs distinct_clause opt_distinct_clause target_list opt_target_list insert_column_list set_target_list + merge_values_clause set_clause_list set_clause def_list operator_def_list indirection opt_indirection reloption_list TriggerFuncArgs opclass_item_list opclass_drop_list @@ -506,6 +508,10 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type insert_rest %type opt_conf_expr %type opt_on_conflict +%type merge_insert merge_update merge_delete + +%type merge_when_clause opt_merge_when_condition +%type merge_when_list %type generic_set set_rest set_rest_more generic_reset reset_rest SetResetClause FunctionSetResetClause @@ -734,7 +740,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED - MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE + MAPPING MATCH MATCHED MATERIALIZED MAXVALUE MERGE METHOD + MINUTE_P MINVALUE MODE MONTH_P MOVE NAME_P NAMES NATIONAL NATURAL NCHAR NEW NEXT NFC NFD NFKC NFKD NO NONE NORMALIZE NORMALIZED @@ -1061,6 +1068,7 @@ stmt: | RefreshMatViewStmt | LoadStmt | LockStmt + | MergeStmt | NotifyStmt | PrepareStmt | ReassignOwnedStmt @@ -11123,6 +11131,7 @@ ExplainableStmt: | InsertStmt | UpdateStmt | DeleteStmt + | MergeStmt | DeclareCursorStmt | CreateAsStmt | CreateMatViewStmt @@ -11155,7 +11164,8 @@ PreparableStmt: SelectStmt | InsertStmt | UpdateStmt - | DeleteStmt /* by default all are $$=$1 */ + | DeleteStmt + | MergeStmt /* by default all are $$=$1 */ ; /***************************************************************************** @@ -11540,6 +11550,166 @@ set_target_list: ; +/***************************************************************************** + * + * QUERY: + * MERGE + * + *****************************************************************************/ + +MergeStmt: + opt_with_clause MERGE INTO relation_expr_opt_alias + USING table_ref + ON a_expr + merge_when_list + { + MergeStmt *m = makeNode(MergeStmt); + + m->withClause = $1; + m->relation = $4; + m->sourceRelation = $6; + m->joinCondition = $8; + m->mergeWhenClauses = $9; + + $$ = (Node *)m; + } + ; + +merge_when_list: + merge_when_clause { $$ = list_make1($1); } + | merge_when_list merge_when_clause { $$ = lappend($1,$2); } + ; + +merge_when_clause: + WHEN MATCHED opt_merge_when_condition THEN merge_update + { + $5->matched = true; + $5->condition = $3; + + $$ = (Node *) $5; + } + | WHEN MATCHED opt_merge_when_condition THEN merge_delete + { + $5->matched = true; + $5->condition = $3; + + $$ = (Node *) $5; + } + | WHEN NOT MATCHED opt_merge_when_condition THEN merge_insert + { + $6->matched = false; + $6->condition = $4; + + $$ = (Node *) $6; + } + | WHEN MATCHED opt_merge_when_condition THEN DO NOTHING + { + MergeWhenClause *m = makeNode(MergeWhenClause); + + m->matched = true; + m->commandType = CMD_NOTHING; + m->condition = $3; + + $$ = (Node *)m; + } + | WHEN NOT MATCHED opt_merge_when_condition THEN DO NOTHING + { + MergeWhenClause *m = makeNode(MergeWhenClause); + + m->matched = false; + m->commandType = CMD_NOTHING; + m->condition = $4; + + $$ = (Node *)m; + } + ; + +opt_merge_when_condition: + AND a_expr { $$ = $2; } + | { $$ = NULL; } + ; + +merge_update: + UPDATE SET set_clause_list + { + MergeWhenClause *n = makeNode(MergeWhenClause); + n->commandType = CMD_UPDATE; + n->override = OVERRIDING_NOT_SET; + n->targetList = $3; + n->values = NIL; + + $$ = n; + } + ; + +merge_delete: + DELETE_P + { + MergeWhenClause *n = makeNode(MergeWhenClause); + n->commandType = CMD_DELETE; + n->override = OVERRIDING_NOT_SET; + n->targetList = NIL; + n->values = NIL; + + $$ = n; + } + ; + +merge_insert: + INSERT merge_values_clause + { + MergeWhenClause *n = makeNode(MergeWhenClause); + n->commandType = CMD_INSERT; + n->override = OVERRIDING_NOT_SET; + n->targetList = NIL; + n->values = $2; + $$ = n; + } + | INSERT OVERRIDING override_kind VALUE_P merge_values_clause + { + MergeWhenClause *n = makeNode(MergeWhenClause); + n->commandType = CMD_INSERT; + n->override = $3; + n->targetList = NIL; + n->values = $5; + $$ = n; + } + | INSERT '(' insert_column_list ')' merge_values_clause + { + MergeWhenClause *n = makeNode(MergeWhenClause); + n->commandType = CMD_INSERT; + n->override = OVERRIDING_NOT_SET; + n->targetList = $3; + n->values = $5; + $$ = n; + } + | INSERT '(' insert_column_list ')' OVERRIDING override_kind VALUE_P merge_values_clause + { + MergeWhenClause *n = makeNode(MergeWhenClause); + n->commandType = CMD_INSERT; + n->override = $6; + n->targetList = $3; + n->values = $8; + $$ = n; + } + | INSERT DEFAULT VALUES + { + MergeWhenClause *n = makeNode(MergeWhenClause); + n->commandType = CMD_INSERT; + n->override = OVERRIDING_NOT_SET; + n->targetList = NIL; + n->values = NIL; + $$ = n; + } + ; + +merge_values_clause: + VALUES '(' expr_list ')' + { + $$ = $3; + } + ; + /***************************************************************************** * * QUERY: @@ -16155,8 +16325,10 @@ unreserved_keyword: | LOGGED | MAPPING | MATCH + | MATCHED | MATERIALIZED | MAXVALUE + | MERGE | METHOD | MINUTE_P | MINVALUE @@ -16734,8 +16906,10 @@ bare_label_keyword: | LOGGED | MAPPING | MATCH + | MATCHED | MATERIALIZED | MAXVALUE + | MERGE | METHOD | MINVALUE | MODE diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c index ded0a14d723..3ef9e8ee5e1 100644 --- a/src/backend/parser/parse_agg.c +++ b/src/backend/parser/parse_agg.c @@ -433,6 +433,13 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr) case EXPR_KIND_UPDATE_SOURCE: case EXPR_KIND_UPDATE_TARGET: errkind = true; + break; + case EXPR_KIND_MERGE_WHEN: + if (isAgg) + err = _("aggregate functions are not allowed in MERGE WHEN conditions"); + else + err = _("grouping operations are not allowed in MERGE WHEN conditions"); + break; case EXPR_KIND_GROUP_BY: errkind = true; @@ -879,6 +886,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc, case EXPR_KIND_UPDATE_TARGET: errkind = true; break; + case EXPR_KIND_MERGE_WHEN: + err = _("window functions are not allowed in MERGE WHEN conditions"); + break; case EXPR_KIND_GROUP_BY: errkind = true; break; diff --git a/src/backend/parser/parse_collate.c b/src/backend/parser/parse_collate.c index 6c793b72ec7..7582faabb37 100644 --- a/src/backend/parser/parse_collate.c +++ b/src/backend/parser/parse_collate.c @@ -485,6 +485,7 @@ assign_collations_walker(Node *node, assign_collations_context *context) case T_FromExpr: case T_OnConflictExpr: case T_SortGroupClause: + case T_MergeAction: (void) expression_tree_walker(node, assign_collations_walker, (void *) &loccontext); diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c index 781c9709e6d..84be354f714 100644 --- a/src/backend/parser/parse_expr.c +++ b/src/backend/parser/parse_expr.c @@ -513,6 +513,7 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) case EXPR_KIND_INSERT_TARGET: case EXPR_KIND_UPDATE_SOURCE: case EXPR_KIND_UPDATE_TARGET: + case EXPR_KIND_MERGE_WHEN: case EXPR_KIND_GROUP_BY: case EXPR_KIND_ORDER_BY: case EXPR_KIND_DISTINCT_ON: @@ -1748,6 +1749,7 @@ transformSubLink(ParseState *pstate, SubLink *sublink) case EXPR_KIND_INSERT_TARGET: case EXPR_KIND_UPDATE_SOURCE: case EXPR_KIND_UPDATE_TARGET: + case EXPR_KIND_MERGE_WHEN: case EXPR_KIND_GROUP_BY: case EXPR_KIND_ORDER_BY: case EXPR_KIND_DISTINCT_ON: @@ -3075,6 +3077,8 @@ ParseExprKindName(ParseExprKind exprKind) case EXPR_KIND_UPDATE_SOURCE: case EXPR_KIND_UPDATE_TARGET: return "UPDATE"; + case EXPR_KIND_MERGE_WHEN: + return "MERGE WHEN"; case EXPR_KIND_GROUP_BY: return "GROUP BY"; case EXPR_KIND_ORDER_BY: diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c index d91951e1f6c..f71a682cd65 100644 --- a/src/backend/parser/parse_func.c +++ b/src/backend/parser/parse_func.c @@ -2611,6 +2611,9 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location) /* okay, since we process this like a SELECT tlist */ pstate->p_hasTargetSRFs = true; break; + case EXPR_KIND_MERGE_WHEN: + err = _("set-returning functions are not allowed in MERGE WHEN conditions"); + break; case EXPR_KIND_CHECK_CONSTRAINT: case EXPR_KIND_DOMAIN_CHECK: err = _("set-returning functions are not allowed in check constraints"); diff --git a/src/backend/parser/parse_merge.c b/src/backend/parser/parse_merge.c new file mode 100644 index 00000000000..5d0035a12b6 --- /dev/null +++ b/src/backend/parser/parse_merge.c @@ -0,0 +1,415 @@ +/*------------------------------------------------------------------------- + * + * parse_merge.c + * handle merge-statement in parser + * + * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/parser/parse_merge.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/sysattr.h" +#include "miscadmin.h" +#include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" +#include "parser/analyze.h" +#include "parser/parse_collate.h" +#include "parser/parsetree.h" +#include "parser/parser.h" +#include "parser/parse_clause.h" +#include "parser/parse_cte.h" +#include "parser/parse_expr.h" +#include "parser/parse_merge.h" +#include "parser/parse_relation.h" +#include "parser/parse_target.h" +#include "utils/rel.h" +#include "utils/relcache.h" + +static void setNamespaceForMergeWhen(ParseState *pstate, + MergeWhenClause *mergeWhenClause, + Index targetRTI, + Index sourceRTI); +static void setNamespaceVisibilityForRTE(List *namespace, RangeTblEntry *rte, + bool rel_visible, + bool cols_visible); + +/* + * Make appropriate changes to the namespace visibility while transforming + * individual action's quals and targetlist expressions. In particular, for + * INSERT actions we must only see the source relation (since INSERT action is + * invoked for NOT MATCHED tuples and hence there is no target tuple to deal + * with). On the other hand, UPDATE and DELETE actions can see both source and + * target relations. + * + * Also, since the internal join node can hide the source and target + * relations, we must explicitly make the respective relation as visible so + * that columns can be referenced unqualified from these relations. + */ +static void +setNamespaceForMergeWhen(ParseState *pstate, MergeWhenClause *mergeWhenClause, + Index targetRTI, Index sourceRTI) +{ + RangeTblEntry *targetRelRTE, + *sourceRelRTE; + + targetRelRTE = rt_fetch(targetRTI, pstate->p_rtable); + sourceRelRTE = rt_fetch(sourceRTI, pstate->p_rtable); + + if (mergeWhenClause->matched) + { + Assert(mergeWhenClause->commandType == CMD_UPDATE || + mergeWhenClause->commandType == CMD_DELETE || + mergeWhenClause->commandType == CMD_NOTHING); + + /* MATCHED actions can see both target and source relations. */ + setNamespaceVisibilityForRTE(pstate->p_namespace, + targetRelRTE, true, true); + setNamespaceVisibilityForRTE(pstate->p_namespace, + sourceRelRTE, true, true); + } + else + { + /* + * NOT MATCHED actions can't see target relation, but they can see + * source relation. + */ + Assert(mergeWhenClause->commandType == CMD_INSERT || + mergeWhenClause->commandType == CMD_NOTHING); + setNamespaceVisibilityForRTE(pstate->p_namespace, + targetRelRTE, false, false); + setNamespaceVisibilityForRTE(pstate->p_namespace, + sourceRelRTE, true, true); + } +} + +/* + * transformMergeStmt - + * transforms a MERGE statement + */ +Query * +transformMergeStmt(ParseState *pstate, MergeStmt *stmt) +{ + Query *qry = makeNode(Query); + ListCell *l; + AclMode targetPerms = ACL_NO_RIGHTS; + bool is_terminal[2]; + Index sourceRTI; + List *mergeActionList; + Node *joinExpr; + ParseNamespaceItem *nsitem; + + /* There can't be any outer WITH to worry about */ + Assert(pstate->p_ctenamespace == NIL); + + qry->commandType = CMD_MERGE; + qry->hasRecursive = false; + + /* process the WITH clause independently of all else */ + if (stmt->withClause) + { + if (stmt->withClause->recursive) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("WITH RECURSIVE is not supported for MERGE statement"))); + + qry->cteList = transformWithClause(pstate, stmt->withClause); + qry->hasModifyingCTE = pstate->p_hasModifyingCTE; + } + + /* + * Check WHEN clauses for permissions and sanity + */ + is_terminal[0] = false; + is_terminal[1] = false; + foreach(l, stmt->mergeWhenClauses) + { + MergeWhenClause *mergeWhenClause = (MergeWhenClause *) lfirst(l); + int when_type = (mergeWhenClause->matched ? 0 : 1); + + /* + * Collect action types so we can check target permissions + */ + switch (mergeWhenClause->commandType) + { + case CMD_INSERT: + targetPerms |= ACL_INSERT; + break; + case CMD_UPDATE: + targetPerms |= ACL_UPDATE; + break; + case CMD_DELETE: + targetPerms |= ACL_DELETE; + break; + case CMD_NOTHING: + break; + default: + elog(ERROR, "unknown action in MERGE WHEN clause"); + } + + /* + * Check for unreachable WHEN clauses + */ + if (mergeWhenClause->condition == NULL) + is_terminal[when_type] = true; + else if (is_terminal[when_type]) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unreachable WHEN clause specified after unconditional WHEN clause"))); + } + + /* Set up the MERGE target table. */ + qry->resultRelation = setTargetTable(pstate, stmt->relation, + stmt->relation->inh, + false, targetPerms); + + /* + * MERGE is unsupported in various cases + */ + if (pstate->p_target_relation->rd_rel->relkind != RELKIND_RELATION && + pstate->p_target_relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot execute MERGE on relation \"%s\"", + RelationGetRelationName(pstate->p_target_relation)), + errdetail_relkind_not_supported(pstate->p_target_relation->rd_rel->relkind))); + if (pstate->p_target_relation->rd_rel->relhasrules) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot execute MERGE on relation \"%s\"", + RelationGetRelationName(pstate->p_target_relation)), + errdetail("MERGE is not supported for relations with rules."))); + + /* Now transform the source relation to produce the source RTE. */ + transformFromClause(pstate, + list_make1(stmt->sourceRelation)); + sourceRTI = list_length(pstate->p_rtable); + nsitem = GetNSItemByRangeTablePosn(pstate, sourceRTI, 0); + + /* + * Check that the target table doesn't conflict with the source table. + * This would typically be a checkNameSpaceConflicts call, but we want a + * more specific error message. + */ + if (strcmp(pstate->p_target_nsitem->p_names->aliasname, + nsitem->p_names->aliasname) == 0) + ereport(ERROR, + errcode(ERRCODE_DUPLICATE_ALIAS), + errmsg("name \"%s\" specified more than once", + pstate->p_target_nsitem->p_names->aliasname), + errdetail("The name is used both as MERGE target table and data source.")); + + qry->targetList = expandNSItemAttrs(pstate, nsitem, 0, false, + exprLocation(stmt->sourceRelation)); + + qry->rtable = pstate->p_rtable; + + /* + * Transform the join condition. This includes references to the target + * side, so add that to the namespace. + */ + addNSItemToQuery(pstate, pstate->p_target_nsitem, false, true, true); + joinExpr = transformExpr(pstate, stmt->joinCondition, + EXPR_KIND_JOIN_ON); + + /* + * Create the temporary query's jointree using the joinlist we built using + * just the source relation; the target relation is not included. The + * quals we use are the join conditions to the merge target. The join + * will be constructed fully by transform_MERGE_to_join. + */ + qry->jointree = makeFromExpr(pstate->p_joinlist, joinExpr); + + /* + * We now have a good query shape, so now look at the WHEN conditions and + * action targetlists. + * + * Overall, the MERGE Query's targetlist is NIL. + * + * Each individual action has its own targetlist that needs separate + * transformation. These transforms don't do anything to the overall + * targetlist, since that is only used for resjunk columns. + * + * We can reference any column in Target or Source, which is OK because + * both of those already have RTEs. There is nothing like the EXCLUDED + * pseudo-relation for INSERT ON CONFLICT. + */ + mergeActionList = NIL; + foreach(l, stmt->mergeWhenClauses) + { + MergeWhenClause *mergeWhenClause = lfirst_node(MergeWhenClause, l); + MergeAction *action; + + action = makeNode(MergeAction); + action->commandType = mergeWhenClause->commandType; + action->matched = mergeWhenClause->matched; + + /* Use an outer join if any INSERT actions exist in the command. */ + if (action->commandType == CMD_INSERT) + qry->mergeUseOuterJoin = true; + + /* + * Set namespace for the specific action. This must be done before + * analyzing the WHEN quals and the action targetlist. + */ + setNamespaceForMergeWhen(pstate, mergeWhenClause, + qry->resultRelation, + sourceRTI); + + /* + * Transform the WHEN condition. + * + * Note that these quals are NOT added to the join quals; instead they + * are evaluated separately during execution to decide which of the + * WHEN MATCHED or WHEN NOT MATCHED actions to execute. + */ + action->qual = transformWhereClause(pstate, mergeWhenClause->condition, + EXPR_KIND_MERGE_WHEN, "WHEN"); + + /* + * Transform target lists for each INSERT and UPDATE action stmt + */ + switch (action->commandType) + { + case CMD_INSERT: + { + List *exprList = NIL; + ListCell *lc; + RangeTblEntry *rte; + ListCell *icols; + ListCell *attnos; + List *icolumns; + List *attrnos; + + pstate->p_is_insert = true; + + icolumns = checkInsertTargets(pstate, + mergeWhenClause->targetList, + &attrnos); + Assert(list_length(icolumns) == list_length(attrnos)); + + action->override = mergeWhenClause->override; + + /* + * Handle INSERT much like in transformInsertStmt + */ + if (mergeWhenClause->values == NIL) + { + /* + * We have INSERT ... DEFAULT VALUES. We can handle + * this case by emitting an empty targetlist --- all + * columns will be defaulted when the planner expands + * the targetlist. + */ + exprList = NIL; + } + else + { + /* + * Process INSERT ... VALUES with a single VALUES + * sublist. We treat this case separately for + * efficiency. The sublist is just computed directly + * as the Query's targetlist, with no VALUES RTE. So + * it works just like a SELECT without any FROM. + */ + + /* + * Do basic expression transformation (same as a ROW() + * expr, but allow SetToDefault at top level) + */ + exprList = transformExpressionList(pstate, + mergeWhenClause->values, + EXPR_KIND_VALUES_SINGLE, + true); + + /* Prepare row for assignment to target table */ + exprList = transformInsertRow(pstate, exprList, + mergeWhenClause->targetList, + icolumns, attrnos, + false); + } + + /* + * Generate action's target list using the computed list + * of expressions. Also, mark all the target columns as + * needing insert permissions. + */ + rte = pstate->p_target_nsitem->p_rte; + forthree(lc, exprList, icols, icolumns, attnos, attrnos) + { + Expr *expr = (Expr *) lfirst(lc); + ResTarget *col = lfirst_node(ResTarget, icols); + AttrNumber attr_num = (AttrNumber) lfirst_int(attnos); + TargetEntry *tle; + + tle = makeTargetEntry(expr, + attr_num, + col->name, + false); + action->targetList = lappend(action->targetList, tle); + + rte->insertedCols = + bms_add_member(rte->insertedCols, + attr_num - FirstLowInvalidHeapAttributeNumber); + } + } + break; + case CMD_UPDATE: + { + pstate->p_is_insert = false; + action->targetList = + transformUpdateTargetList(pstate, + mergeWhenClause->targetList); + } + break; + case CMD_DELETE: + break; + + case CMD_NOTHING: + action->targetList = NIL; + break; + default: + elog(ERROR, "unknown action in MERGE WHEN clause"); + } + + mergeActionList = lappend(mergeActionList, action); + } + + qry->mergeActionList = mergeActionList; + + /* RETURNING could potentially be added in the future, but not in SQL std */ + qry->returningList = NULL; + + qry->hasTargetSRFs = false; + qry->hasSubLinks = pstate->p_hasSubLinks; + + assign_query_collations(pstate, qry); + + return qry; +} + +static void +setNamespaceVisibilityForRTE(List *namespace, RangeTblEntry *rte, + bool rel_visible, + bool cols_visible) +{ + ListCell *lc; + + foreach(lc, namespace) + { + ParseNamespaceItem *nsitem = (ParseNamespaceItem *) lfirst(lc); + + if (nsitem->p_rte == rte) + { + nsitem->p_rel_visible = rel_visible; + nsitem->p_cols_visible = cols_visible; + break; + } + } +} diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c index cb9e177b5e5..7efa5f15d72 100644 --- a/src/backend/parser/parse_relation.c +++ b/src/backend/parser/parse_relation.c @@ -701,6 +701,17 @@ scanNSItemForColumn(ParseState *pstate, ParseNamespaceItem *nsitem, colname), parser_errposition(pstate, location))); + /* + * In a MERGE WHEN condition, no system column is allowed except tableOid + */ + if (pstate->p_expr_kind == EXPR_KIND_MERGE_WHEN && + attnum < InvalidAttrNumber && attnum != TableOidAttributeNumber) + ereport(ERROR, + (errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot use system column \"%s\" in MERGE WHEN condition", + colname), + parser_errposition(pstate, location))); + /* Found a valid match, so build a Var */ if (attnum > InvalidAttrNumber) { @@ -3095,11 +3106,12 @@ expandNSItemVars(ParseNamespaceItem *nsitem, * for the attributes of the nsitem * * pstate->p_next_resno determines the resnos assigned to the TLEs. - * The referenced columns are marked as requiring SELECT access. + * The referenced columns are marked as requiring SELECT access, if + * caller requests that. */ List * expandNSItemAttrs(ParseState *pstate, ParseNamespaceItem *nsitem, - int sublevels_up, int location) + int sublevels_up, bool require_col_privs, int location) { RangeTblEntry *rte = nsitem->p_rte; List *names, @@ -3133,8 +3145,11 @@ expandNSItemAttrs(ParseState *pstate, ParseNamespaceItem *nsitem, false); te_list = lappend(te_list, te); - /* Require read access to each column */ - markVarForSelectPriv(pstate, varnode); + if (require_col_privs) + { + /* Require read access to each column */ + markVarForSelectPriv(pstate, varnode); + } } Assert(name == NULL && var == NULL); /* lists not the same length? */ diff --git a/src/backend/parser/parse_target.c b/src/backend/parser/parse_target.c index 204d2857733..e6445c7bafe 100644 --- a/src/backend/parser/parse_target.c +++ b/src/backend/parser/parse_target.c @@ -1308,6 +1308,7 @@ ExpandAllTables(ParseState *pstate, int location) expandNSItemAttrs(pstate, nsitem, 0, + true, location)); } @@ -1370,7 +1371,7 @@ ExpandSingleTable(ParseState *pstate, ParseNamespaceItem *nsitem, if (make_target_entry) { /* expandNSItemAttrs handles permissions marking */ - return expandNSItemAttrs(pstate, nsitem, sublevels_up, location); + return expandNSItemAttrs(pstate, nsitem, sublevels_up, true, location); } else { diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c index 4eeed580b16..29ae27e5e32 100644 --- a/src/backend/rewrite/rewriteHandler.c +++ b/src/backend/rewrite/rewriteHandler.c @@ -1643,6 +1643,10 @@ matchLocks(CmdType event, if (rulelocks == NULL) return NIL; + /* No rule support for MERGE */ + if (parsetree->commandType == CMD_MERGE) + return NIL; + if (parsetree->commandType != CMD_SELECT) { if (parsetree->resultRelation != varno) @@ -3671,8 +3675,8 @@ RewriteQuery(Query *parsetree, List *rewrite_events) } /* - * If the statement is an insert, update, or delete, adjust its targetlist - * as needed, and then fire INSERT/UPDATE/DELETE rules on it. + * If the statement is an insert, update, delete, or merge, adjust its + * targetlist as needed, and then fire INSERT/UPDATE/DELETE rules on it. * * SELECT rules are handled later when we have all the queries that should * get executed. Also, utilities aren't rewritten at all (do we still @@ -3770,6 +3774,7 @@ RewriteQuery(Query *parsetree, List *rewrite_events) } else if (event == CMD_UPDATE) { + Assert(parsetree->override == OVERRIDING_NOT_SET); parsetree->targetList = rewriteTargetListIU(parsetree->targetList, parsetree->commandType, @@ -3780,6 +3785,38 @@ RewriteQuery(Query *parsetree, List *rewrite_events) /* Also populate extraUpdatedCols (for generated columns) */ fill_extraUpdatedCols(rt_entry, rt_entry_relation); } + else if (event == CMD_MERGE) + { + Assert(parsetree->override == OVERRIDING_NOT_SET); + + /* + * Rewrite each action targetlist separately + */ + foreach(lc1, parsetree->mergeActionList) + { + MergeAction *action = (MergeAction *) lfirst(lc1); + + switch (action->commandType) + { + case CMD_NOTHING: + case CMD_DELETE: /* Nothing to do here */ + break; + case CMD_UPDATE: + case CMD_INSERT: + /* XXX is it possible to have a VALUES clause? */ + action->targetList = + rewriteTargetListIU(action->targetList, + action->commandType, + action->override, + rt_entry_relation, + NULL, 0, NULL); + break; + default: + elog(ERROR, "unrecognized commandType: %d", action->commandType); + break; + } + } + } else if (event == CMD_DELETE) { /* Nothing to do here */ diff --git a/src/backend/rewrite/rowsecurity.c b/src/backend/rewrite/rowsecurity.c index f0a046d65a6..a233dd47585 100644 --- a/src/backend/rewrite/rowsecurity.c +++ b/src/backend/rewrite/rowsecurity.c @@ -232,15 +232,17 @@ get_row_security_policies(Query *root, RangeTblEntry *rte, int rt_index, hasSubLinks); /* - * Similar to above, during an UPDATE or DELETE, if SELECT rights are also - * required (eg: when a RETURNING clause exists, or the user has provided - * a WHERE clause which involves columns from the relation), we collect up - * CMD_SELECT policies and add them via add_security_quals first. + * Similar to above, during an UPDATE, DELETE, or MERGE, if SELECT rights + * are also required (eg: when a RETURNING clause exists, or the user has + * provided a WHERE clause which involves columns from the relation), we + * collect up CMD_SELECT policies and add them via add_security_quals + * first. * * This way, we filter out any records which are not visible through an * ALL or SELECT USING policy. */ - if ((commandType == CMD_UPDATE || commandType == CMD_DELETE) && + if ((commandType == CMD_UPDATE || commandType == CMD_DELETE || + commandType == CMD_MERGE) && rte->requiredPerms & ACL_SELECT) { List *select_permissive_policies; @@ -380,6 +382,92 @@ get_row_security_policies(Query *root, RangeTblEntry *rte, int rt_index, } } + /* + * FOR MERGE, we fetch policies for UPDATE, DELETE and INSERT (and ALL) + * and set them up so that we can enforce the appropriate policy depending + * on the final action we take. + * + * We already fetched the SELECT policies above. + * + * We don't push the UPDATE/DELETE USING quals to the RTE because we don't + * really want to apply them while scanning the relation since we don't + * know whether we will be doing an UPDATE or a DELETE at the end. We + * apply the respective policy once we decide the final action on the + * target tuple. + * + * XXX We are setting up USING quals as WITH CHECK. If RLS prohibits + * UPDATE/DELETE on the target row, we shall throw an error instead of + * silently ignoring the row. This is different than how normal + * UPDATE/DELETE works and more in line with INSERT ON CONFLICT DO UPDATE + * handling. + */ + if (commandType == CMD_MERGE) + { + List *merge_permissive_policies; + List *merge_restrictive_policies; + + /* + * Fetch the UPDATE policies and set them up to execute on the + * existing target row before doing UPDATE. + */ + get_policies_for_relation(rel, CMD_UPDATE, user_id, + &merge_permissive_policies, + &merge_restrictive_policies); + + /* + * WCO_RLS_MERGE_UPDATE_CHECK is used to check UPDATE USING quals on + * the existing target row. + */ + add_with_check_options(rel, rt_index, + WCO_RLS_MERGE_UPDATE_CHECK, + merge_permissive_policies, + merge_restrictive_policies, + withCheckOptions, + hasSubLinks, + true); + + /* + * Same with DELETE policies. + */ + get_policies_for_relation(rel, CMD_DELETE, user_id, + &merge_permissive_policies, + &merge_restrictive_policies); + + add_with_check_options(rel, rt_index, + WCO_RLS_MERGE_DELETE_CHECK, + merge_permissive_policies, + merge_restrictive_policies, + withCheckOptions, + hasSubLinks, + true); + + /* + * No special handling is required for INSERT policies. They will be + * checked and enforced during ExecInsert(). But we must add them to + * withCheckOptions. + */ + get_policies_for_relation(rel, CMD_INSERT, user_id, + &merge_permissive_policies, + &merge_restrictive_policies); + + add_with_check_options(rel, rt_index, + WCO_RLS_INSERT_CHECK, + merge_permissive_policies, + merge_restrictive_policies, + withCheckOptions, + hasSubLinks, + false); + + /* Enforce the WITH CHECK clauses of the UPDATE policies */ + add_with_check_options(rel, rt_index, + WCO_RLS_UPDATE_CHECK, + merge_permissive_policies, + merge_restrictive_policies, + withCheckOptions, + hasSubLinks, + false); + } + table_close(rel, NoLock); /* @@ -444,6 +532,14 @@ get_policies_for_relation(Relation relation, CmdType cmd, Oid user_id, if (policy->polcmd == ACL_DELETE_CHR) cmd_matches = true; break; + case CMD_MERGE: + + /* + * We do not support a separate policy for MERGE command. + * Instead it derives from the policies defined for other + * commands. + */ + break; default: elog(ERROR, "unrecognized policy command type %d", (int) cmd); diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 5f907831a3a..5aa5a350f38 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -178,6 +178,9 @@ ProcessQuery(PlannedStmt *plan, case CMD_DELETE: SetQueryCompletion(qc, CMDTAG_DELETE, queryDesc->estate->es_processed); break; + case CMD_MERGE: + SetQueryCompletion(qc, CMDTAG_MERGE, queryDesc->estate->es_processed); + break; default: SetQueryCompletion(qc, CMDTAG_UNKNOWN, queryDesc->estate->es_processed); break; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 3780c6e812e..f364a9b88a9 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -113,6 +113,7 @@ CommandIsReadOnly(PlannedStmt *pstmt) case CMD_UPDATE: case CMD_INSERT: case CMD_DELETE: + case CMD_MERGE: return false; case CMD_UTILITY: /* For now, treat all utility commands as read/write */ @@ -2124,6 +2125,8 @@ QueryReturnsTuples(Query *parsetree) case CMD_SELECT: /* returns tuples */ return true; + case CMD_MERGE: + return false; case CMD_INSERT: case CMD_UPDATE: case CMD_DELETE: @@ -2365,6 +2368,10 @@ CreateCommandTag(Node *parsetree) tag = CMDTAG_UPDATE; break; + case T_MergeStmt: + tag = CMDTAG_MERGE; + break; + case T_SelectStmt: tag = CMDTAG_SELECT; break; @@ -3125,6 +3132,9 @@ CreateCommandTag(Node *parsetree) case CMD_DELETE: tag = CMDTAG_DELETE; break; + case CMD_MERGE: + tag = CMDTAG_MERGE; + break; case CMD_UTILITY: tag = CreateCommandTag(stmt->utilityStmt); break; @@ -3185,6 +3195,9 @@ CreateCommandTag(Node *parsetree) case CMD_DELETE: tag = CMDTAG_DELETE; break; + case CMD_MERGE: + tag = CMDTAG_MERGE; + break; case CMD_UTILITY: tag = CreateCommandTag(stmt->utilityStmt); break; @@ -3233,6 +3246,7 @@ GetCommandLogLevel(Node *parsetree) case T_InsertStmt: case T_DeleteStmt: case T_UpdateStmt: + case T_MergeStmt: lev = LOGSTMT_MOD; break; @@ -3682,6 +3696,7 @@ GetCommandLogLevel(Node *parsetree) case CMD_UPDATE: case CMD_INSERT: case CMD_DELETE: + case CMD_MERGE: lev = LOGSTMT_MOD; break; @@ -3712,6 +3727,7 @@ GetCommandLogLevel(Node *parsetree) case CMD_UPDATE: case CMD_INSERT: case CMD_DELETE: + case CMD_MERGE: lev = LOGSTMT_MOD; break; diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index df5c4865014..82dc849a301 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -4940,6 +4940,8 @@ set_deparse_plan(deparse_namespace *dpns, Plan *plan) * For a WorkTableScan, locate the parent RecursiveUnion plan node and use * that as INNER referent. * + * For MERGE, make the inner tlist point to the merge source tlist, which + * is same as the targetlist that the ModifyTable's source plan provides. * For ON CONFLICT .. UPDATE we just need the inner tlist to point to the * excluded expression's tlist. (Similar to the SubqueryScan we don't want * to reuse OUTER, it's used for RETURNING in some modify table cases, @@ -4959,7 +4961,12 @@ set_deparse_plan(deparse_namespace *dpns, Plan *plan) dpns->inner_plan = innerPlan(plan); if (IsA(plan, ModifyTable)) - dpns->inner_tlist = ((ModifyTable *) plan)->exclRelTlist; + { + if (((ModifyTable *) plan)->operation == CMD_MERGE) + dpns->inner_tlist = dpns->outer_plan->targetlist; + else + dpns->inner_tlist = ((ModifyTable *) plan)->exclRelTlist; + } else if (dpns->inner_plan) dpns->inner_tlist = dpns->inner_plan->targetlist; else -- cgit v1.2.3