@@ -97,6 +97,18 @@ typedef struct SlotErrCallbackArg
97
97
int remote_attnum ;
98
98
} SlotErrCallbackArg ;
99
99
100
+ typedef struct ApplyExecutionData
101
+ {
102
+ EState * estate ; /* executor state, used to track resources */
103
+
104
+ LogicalRepRelMapEntry * targetRel ; /* replication target rel */
105
+ ResultRelInfo * targetRelInfo ; /* ResultRelInfo for same */
106
+
107
+ /* These fields are used when the target relation is partitioned: */
108
+ ModifyTableState * mtstate ; /* dummy ModifyTable state */
109
+ PartitionTupleRouting * proute ; /* partition routing info */
110
+ } ApplyExecutionData ;
111
+
100
112
static MemoryContext ApplyMessageContext = NULL ;
101
113
MemoryContext ApplyContext = NULL ;
102
114
@@ -127,11 +139,9 @@ static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
127
139
LogicalRepRelation * remoterel ,
128
140
TupleTableSlot * remoteslot ,
129
141
TupleTableSlot * * localslot );
130
- static void apply_handle_tuple_routing (ResultRelInfo * relinfo ,
131
- EState * estate ,
142
+ static void apply_handle_tuple_routing (ApplyExecutionData * edata ,
132
143
TupleTableSlot * remoteslot ,
133
144
LogicalRepTupleData * newtup ,
134
- LogicalRepRelMapEntry * relmapentry ,
135
145
CmdType operation );
136
146
137
147
/*
@@ -188,25 +198,29 @@ ensure_transaction(void)
188
198
189
199
/*
190
200
* Executor state preparation for evaluation of constraint expressions,
191
- * indexes and triggers.
201
+ * indexes and triggers for the specified relation .
192
202
*
193
- * This is based on similar code in copy.c
203
+ * Note that the caller must open and close any indexes to be updated.
194
204
*/
195
- static EState *
196
- create_estate_for_relation (LogicalRepRelMapEntry * rel )
205
+ static ApplyExecutionData *
206
+ create_edata_for_relation (LogicalRepRelMapEntry * rel )
197
207
{
208
+ ApplyExecutionData * edata ;
198
209
EState * estate ;
199
210
ResultRelInfo * resultRelInfo ;
200
211
RangeTblEntry * rte ;
201
212
202
213
/*
203
214
* Input functions may need an active snapshot, as may AFTER triggers
204
- * invoked during finish_estate . For safety, ensure an active snapshot
215
+ * invoked during finish_edata . For safety, ensure an active snapshot
205
216
* exists throughout all our usage of the executor.
206
217
*/
207
218
PushActiveSnapshot (GetTransactionSnapshot ());
208
219
209
- estate = CreateExecutorState ();
220
+ edata = (ApplyExecutionData * ) palloc0 (sizeof (ApplyExecutionData ));
221
+ edata -> targetRel = rel ;
222
+
223
+ edata -> estate = estate = CreateExecutorState ();
210
224
211
225
rte = makeNode (RangeTblEntry );
212
226
rte -> rtekind = RTE_RELATION ;
@@ -215,7 +229,12 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
215
229
rte -> rellockmode = AccessShareLock ;
216
230
ExecInitRangeTable (estate , list_make1 (rte ));
217
231
218
- resultRelInfo = makeNode (ResultRelInfo );
232
+ edata -> targetRelInfo = resultRelInfo = makeNode (ResultRelInfo );
233
+
234
+ /*
235
+ * Use Relation opened by logicalrep_rel_open() instead of opening it
236
+ * again.
237
+ */
219
238
InitResultRelInfo (resultRelInfo , rel -> localrel , 1 , NULL , 0 );
220
239
221
240
estate -> es_result_relations = resultRelInfo ;
@@ -227,22 +246,38 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
227
246
/* Prepare to catch AFTER triggers. */
228
247
AfterTriggerBeginQuery ();
229
248
230
- return estate ;
249
+ /* other fields of edata remain NULL for now */
250
+
251
+ return edata ;
231
252
}
232
253
233
254
/*
234
255
* Finish any operations related to the executor state created by
235
- * create_estate_for_relation ().
256
+ * create_edata_for_relation ().
236
257
*/
237
258
static void
238
- finish_estate ( EState * estate )
259
+ finish_edata ( ApplyExecutionData * edata )
239
260
{
261
+ EState * estate = edata -> estate ;
262
+
240
263
/* Handle any queued AFTER triggers. */
241
264
AfterTriggerEndQuery (estate );
242
265
243
- /* Cleanup. */
266
+ /* Shut down tuple routing, if any was done. */
267
+ if (edata -> proute )
268
+ ExecCleanupTupleRouting (edata -> mtstate , edata -> proute );
269
+
270
+ /*
271
+ * Cleanup. It might seem that we should call ExecCloseResultRelations()
272
+ * here, but we intentionally don't. It would close the rel we added to
273
+ * the estate above, which is wrong because we took no corresponding
274
+ * refcount. We rely on ExecCleanupTupleRouting() to close any other
275
+ * relations opened during execution.
276
+ */
244
277
ExecResetTupleTable (estate -> es_tupleTable , false);
245
278
FreeExecutorState (estate );
279
+ pfree (edata );
280
+
246
281
PopActiveSnapshot ();
247
282
}
248
283
@@ -633,6 +668,7 @@ apply_handle_insert(StringInfo s)
633
668
LogicalRepRelMapEntry * rel ;
634
669
LogicalRepTupleData newtup ;
635
670
LogicalRepRelId relid ;
671
+ ApplyExecutionData * edata ;
636
672
EState * estate ;
637
673
TupleTableSlot * remoteslot ;
638
674
MemoryContext oldctx ;
@@ -652,7 +688,8 @@ apply_handle_insert(StringInfo s)
652
688
}
653
689
654
690
/* Initialize the executor state. */
655
- estate = create_estate_for_relation (rel );
691
+ edata = create_edata_for_relation (rel );
692
+ estate = edata -> estate ;
656
693
remoteslot = ExecInitExtraTupleSlot (estate ,
657
694
RelationGetDescr (rel -> localrel ),
658
695
& TTSOpsVirtual );
@@ -665,13 +702,13 @@ apply_handle_insert(StringInfo s)
665
702
666
703
/* For a partitioned table, insert the tuple into a partition. */
667
704
if (rel -> localrel -> rd_rel -> relkind == RELKIND_PARTITIONED_TABLE )
668
- apply_handle_tuple_routing (estate -> es_result_relation_info , estate ,
669
- remoteslot , NULL , rel , CMD_INSERT );
705
+ apply_handle_tuple_routing (edata ,
706
+ remoteslot , NULL , CMD_INSERT );
670
707
else
671
708
apply_handle_insert_internal (estate -> es_result_relation_info , estate ,
672
709
remoteslot );
673
710
674
- finish_estate ( estate );
711
+ finish_edata ( edata );
675
712
676
713
logicalrep_rel_close (rel , NoLock );
677
714
@@ -735,6 +772,7 @@ apply_handle_update(StringInfo s)
735
772
{
736
773
LogicalRepRelMapEntry * rel ;
737
774
LogicalRepRelId relid ;
775
+ ApplyExecutionData * edata ;
738
776
EState * estate ;
739
777
LogicalRepTupleData oldtup ;
740
778
LogicalRepTupleData newtup ;
@@ -762,7 +800,8 @@ apply_handle_update(StringInfo s)
762
800
check_relation_updatable (rel );
763
801
764
802
/* Initialize the executor state. */
765
- estate = create_estate_for_relation (rel );
803
+ edata = create_edata_for_relation (rel );
804
+ estate = edata -> estate ;
766
805
remoteslot = ExecInitExtraTupleSlot (estate ,
767
806
RelationGetDescr (rel -> localrel ),
768
807
& TTSOpsVirtual );
@@ -800,13 +839,13 @@ apply_handle_update(StringInfo s)
800
839
801
840
/* For a partitioned table, apply update to correct partition. */
802
841
if (rel -> localrel -> rd_rel -> relkind == RELKIND_PARTITIONED_TABLE )
803
- apply_handle_tuple_routing (estate -> es_result_relation_info , estate ,
804
- remoteslot , & newtup , rel , CMD_UPDATE );
842
+ apply_handle_tuple_routing (edata ,
843
+ remoteslot , & newtup , CMD_UPDATE );
805
844
else
806
845
apply_handle_update_internal (estate -> es_result_relation_info , estate ,
807
846
remoteslot , & newtup , rel );
808
847
809
- finish_estate ( estate );
848
+ finish_edata ( edata );
810
849
811
850
logicalrep_rel_close (rel , NoLock );
812
851
@@ -881,6 +920,7 @@ apply_handle_delete(StringInfo s)
881
920
LogicalRepRelMapEntry * rel ;
882
921
LogicalRepTupleData oldtup ;
883
922
LogicalRepRelId relid ;
923
+ ApplyExecutionData * edata ;
884
924
EState * estate ;
885
925
TupleTableSlot * remoteslot ;
886
926
MemoryContext oldctx ;
@@ -903,7 +943,8 @@ apply_handle_delete(StringInfo s)
903
943
check_relation_updatable (rel );
904
944
905
945
/* Initialize the executor state. */
906
- estate = create_estate_for_relation (rel );
946
+ edata = create_edata_for_relation (rel );
947
+ estate = edata -> estate ;
907
948
remoteslot = ExecInitExtraTupleSlot (estate ,
908
949
RelationGetDescr (rel -> localrel ),
909
950
& TTSOpsVirtual );
@@ -915,13 +956,13 @@ apply_handle_delete(StringInfo s)
915
956
916
957
/* For a partitioned table, apply delete to correct partition. */
917
958
if (rel -> localrel -> rd_rel -> relkind == RELKIND_PARTITIONED_TABLE )
918
- apply_handle_tuple_routing (estate -> es_result_relation_info , estate ,
919
- remoteslot , NULL , rel , CMD_DELETE );
959
+ apply_handle_tuple_routing (edata ,
960
+ remoteslot , NULL , CMD_DELETE );
920
961
else
921
962
apply_handle_delete_internal (estate -> es_result_relation_info , estate ,
922
963
remoteslot , & rel -> remoterel );
923
964
924
- finish_estate ( estate );
965
+ finish_edata ( edata );
925
966
926
967
logicalrep_rel_close (rel , NoLock );
927
968
@@ -1004,16 +1045,17 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
1004
1045
* This handles insert, update, delete on a partitioned table.
1005
1046
*/
1006
1047
static void
1007
- apply_handle_tuple_routing (ResultRelInfo * relinfo ,
1008
- EState * estate ,
1048
+ apply_handle_tuple_routing (ApplyExecutionData * edata ,
1009
1049
TupleTableSlot * remoteslot ,
1010
1050
LogicalRepTupleData * newtup ,
1011
- LogicalRepRelMapEntry * relmapentry ,
1012
1051
CmdType operation )
1013
1052
{
1053
+ EState * estate = edata -> estate ;
1054
+ LogicalRepRelMapEntry * relmapentry = edata -> targetRel ;
1055
+ ResultRelInfo * relinfo = edata -> targetRelInfo ;
1014
1056
Relation parentrel = relinfo -> ri_RelationDesc ;
1015
- ModifyTableState * mtstate = NULL ;
1016
- PartitionTupleRouting * proute = NULL ;
1057
+ ModifyTableState * mtstate ;
1058
+ PartitionTupleRouting * proute ;
1017
1059
ResultRelInfo * partrelinfo ;
1018
1060
Relation partrel ;
1019
1061
TupleTableSlot * remoteslot_part ;
@@ -1022,12 +1064,15 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
1022
1064
MemoryContext oldctx ;
1023
1065
1024
1066
/* ModifyTableState is needed for ExecFindPartition(). */
1025
- mtstate = makeNode (ModifyTableState );
1067
+ edata -> mtstate = mtstate = makeNode (ModifyTableState );
1026
1068
mtstate -> ps .plan = NULL ;
1027
1069
mtstate -> ps .state = estate ;
1028
1070
mtstate -> operation = operation ;
1029
1071
mtstate -> resultRelInfo = relinfo ;
1030
- proute = ExecSetupPartitionTupleRouting (estate , mtstate , parentrel );
1072
+
1073
+ /* ... as is PartitionTupleRouting. */
1074
+ edata -> proute = proute = ExecSetupPartitionTupleRouting (estate , mtstate ,
1075
+ parentrel );
1031
1076
1032
1077
/*
1033
1078
* Find the partition to which the "search tuple" belongs.
@@ -1225,8 +1270,6 @@ apply_handle_tuple_routing(ResultRelInfo *relinfo,
1225
1270
elog (ERROR , "unrecognized CmdType: %d" , (int ) operation );
1226
1271
break ;
1227
1272
}
1228
-
1229
- ExecCleanupTupleRouting (mtstate , proute );
1230
1273
}
1231
1274
1232
1275
/*
0 commit comments