@@ -138,30 +138,41 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
138
138
}
139
139
140
140
/*
141
- * Make sure that we started local transaction.
141
+ * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
142
142
*
143
- * Also switches to ApplyMessageContext as necessary.
143
+ * Start a transaction, if this is the first step (else we keep using the
144
+ * existing transaction).
145
+ * Also provide a global snapshot and ensure we run in ApplyMessageContext.
144
146
*/
145
- static bool
146
- ensure_transaction (void )
147
+ static void
148
+ begin_replication_step (void )
147
149
{
148
- if (IsTransactionState ())
149
- {
150
- SetCurrentStatementStartTimestamp ();
151
-
152
- if (CurrentMemoryContext != ApplyMessageContext )
153
- MemoryContextSwitchTo (ApplyMessageContext );
150
+ SetCurrentStatementStartTimestamp ();
154
151
155
- return false;
152
+ if (!IsTransactionState ())
153
+ {
154
+ StartTransactionCommand ();
155
+ maybe_reread_subscription ();
156
156
}
157
157
158
- SetCurrentStatementStartTimestamp ();
159
- StartTransactionCommand ();
160
-
161
- maybe_reread_subscription ();
158
+ PushActiveSnapshot (GetTransactionSnapshot ());
162
159
163
160
MemoryContextSwitchTo (ApplyMessageContext );
164
- return true;
161
+ }
162
+
163
+ /*
164
+ * Finish up one step of a replication transaction.
165
+ * Callers of begin_replication_step() must also call this.
166
+ *
167
+ * We don't close out the transaction here, but we should increment
168
+ * the command counter to make the effects of this step visible.
169
+ */
170
+ static void
171
+ end_replication_step (void )
172
+ {
173
+ PopActiveSnapshot ();
174
+
175
+ CommandCounterIncrement ();
165
176
}
166
177
167
178
@@ -178,13 +189,6 @@ create_estate_for_relation(LogicalRepRelMapEntry *rel)
178
189
ResultRelInfo * resultRelInfo ;
179
190
RangeTblEntry * rte ;
180
191
181
- /*
182
- * Input functions may need an active snapshot, as may AFTER triggers
183
- * invoked during finish_estate. For safety, ensure an active snapshot
184
- * exists throughout all our usage of the executor.
185
- */
186
- PushActiveSnapshot (GetTransactionSnapshot ());
187
-
188
192
estate = CreateExecutorState ();
189
193
190
194
rte = makeNode (RangeTblEntry );
@@ -222,7 +226,6 @@ finish_estate(EState *estate)
222
226
/* Cleanup. */
223
227
ExecResetTupleTable (estate -> es_tupleTable , false);
224
228
FreeExecutorState (estate );
225
- PopActiveSnapshot ();
226
229
}
227
230
228
231
/*
@@ -612,7 +615,7 @@ apply_handle_insert(StringInfo s)
612
615
TupleTableSlot * remoteslot ;
613
616
MemoryContext oldctx ;
614
617
615
- ensure_transaction ();
618
+ begin_replication_step ();
616
619
617
620
relid = logicalrep_read_insert (s , & newtup );
618
621
rel = logicalrep_rel_open (relid , RowExclusiveLock );
@@ -623,6 +626,7 @@ apply_handle_insert(StringInfo s)
623
626
* transaction so it's safe to unlock it.
624
627
*/
625
628
logicalrep_rel_close (rel , RowExclusiveLock );
629
+ end_replication_step ();
626
630
return ;
627
631
}
628
632
@@ -650,7 +654,7 @@ apply_handle_insert(StringInfo s)
650
654
651
655
logicalrep_rel_close (rel , NoLock );
652
656
653
- CommandCounterIncrement ();
657
+ end_replication_step ();
654
658
}
655
659
656
660
/*
@@ -708,7 +712,7 @@ apply_handle_update(StringInfo s)
708
712
bool found ;
709
713
MemoryContext oldctx ;
710
714
711
- ensure_transaction ();
715
+ begin_replication_step ();
712
716
713
717
relid = logicalrep_read_update (s , & has_oldtup , & oldtup ,
714
718
& newtup );
@@ -720,6 +724,7 @@ apply_handle_update(StringInfo s)
720
724
* transaction so it's safe to unlock it.
721
725
*/
722
726
logicalrep_rel_close (rel , RowExclusiveLock );
727
+ end_replication_step ();
723
728
return ;
724
729
}
725
730
@@ -825,7 +830,7 @@ apply_handle_update(StringInfo s)
825
830
826
831
logicalrep_rel_close (rel , NoLock );
827
832
828
- CommandCounterIncrement ();
833
+ end_replication_step ();
829
834
}
830
835
831
836
/*
@@ -847,7 +852,7 @@ apply_handle_delete(StringInfo s)
847
852
bool found ;
848
853
MemoryContext oldctx ;
849
854
850
- ensure_transaction ();
855
+ begin_replication_step ();
851
856
852
857
relid = logicalrep_read_delete (s , & oldtup );
853
858
rel = logicalrep_rel_open (relid , RowExclusiveLock );
@@ -858,6 +863,7 @@ apply_handle_delete(StringInfo s)
858
863
* transaction so it's safe to unlock it.
859
864
*/
860
865
logicalrep_rel_close (rel , RowExclusiveLock );
866
+ end_replication_step ();
861
867
return ;
862
868
}
863
869
@@ -920,7 +926,7 @@ apply_handle_delete(StringInfo s)
920
926
921
927
logicalrep_rel_close (rel , NoLock );
922
928
923
- CommandCounterIncrement ();
929
+ end_replication_step ();
924
930
}
925
931
926
932
/*
@@ -941,7 +947,7 @@ apply_handle_truncate(StringInfo s)
941
947
ListCell * lc ;
942
948
LOCKMODE lockmode = AccessExclusiveLock ;
943
949
944
- ensure_transaction ();
950
+ begin_replication_step ();
945
951
946
952
remote_relids = logicalrep_read_truncate (s , & cascade , & restart_seqs );
947
953
@@ -982,7 +988,7 @@ apply_handle_truncate(StringInfo s)
982
988
logicalrep_rel_close (rel , NoLock );
983
989
}
984
990
985
- CommandCounterIncrement ();
991
+ end_replication_step ();
986
992
}
987
993
988
994
0 commit comments