@@ -282,30 +282,41 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
282
282
}
283
283
284
284
/*
285
- * Make sure that we started local transaction.
285
+ * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
286
286
*
287
- * Also switches to ApplyMessageContext as necessary.
287
+ * Start a transaction, if this is the first step (else we keep using the
288
+ * existing transaction).
289
+ * Also provide a global snapshot and ensure we run in ApplyMessageContext.
288
290
*/
289
- static bool
290
- ensure_transaction (void )
291
+ static void
292
+ begin_replication_step (void )
291
293
{
292
- if (IsTransactionState ())
293
- {
294
- SetCurrentStatementStartTimestamp ();
295
-
296
- if (CurrentMemoryContext != ApplyMessageContext )
297
- MemoryContextSwitchTo (ApplyMessageContext );
294
+ SetCurrentStatementStartTimestamp ();
298
295
299
- return false;
296
+ if (!IsTransactionState ())
297
+ {
298
+ StartTransactionCommand ();
299
+ maybe_reread_subscription ();
300
300
}
301
301
302
- SetCurrentStatementStartTimestamp ();
303
- StartTransactionCommand ();
304
-
305
- maybe_reread_subscription ();
302
+ PushActiveSnapshot (GetTransactionSnapshot ());
306
303
307
304
MemoryContextSwitchTo (ApplyMessageContext );
308
- return true;
305
+ }
306
+
307
+ /*
308
+ * Finish up one step of a replication transaction.
309
+ * Callers of begin_replication_step() must also call this.
310
+ *
311
+ * We don't close out the transaction here, but we should increment
312
+ * the command counter to make the effects of this step visible.
313
+ */
314
+ static void
315
+ end_replication_step (void )
316
+ {
317
+ PopActiveSnapshot ();
318
+
319
+ CommandCounterIncrement ();
309
320
}
310
321
311
322
/*
@@ -359,13 +370,6 @@ create_edata_for_relation(LogicalRepRelMapEntry *rel)
359
370
RangeTblEntry * rte ;
360
371
ResultRelInfo * resultRelInfo ;
361
372
362
- /*
363
- * Input functions may need an active snapshot, as may AFTER triggers
364
- * invoked during finish_edata. For safety, ensure an active snapshot
365
- * exists throughout all our usage of the executor.
366
- */
367
- PushActiveSnapshot (GetTransactionSnapshot ());
368
-
369
373
edata = (ApplyExecutionData * ) palloc0 (sizeof (ApplyExecutionData ));
370
374
edata -> targetRel = rel ;
371
375
@@ -433,8 +437,6 @@ finish_edata(ApplyExecutionData *edata)
433
437
ExecResetTupleTable (estate -> es_tupleTable , false);
434
438
FreeExecutorState (estate );
435
439
pfree (edata );
436
-
437
- PopActiveSnapshot ();
438
440
}
439
441
440
442
/*
@@ -831,7 +833,7 @@ apply_handle_stream_start(StringInfo s)
831
833
* transaction for handling the buffile, used for serializing the
832
834
* streaming data and subxact info.
833
835
*/
834
- ensure_transaction ();
836
+ begin_replication_step ();
835
837
836
838
/* notify handle methods we're processing a remote transaction */
837
839
in_streamed_transaction = true;
@@ -861,6 +863,8 @@ apply_handle_stream_start(StringInfo s)
861
863
subxact_info_read (MyLogicalRepWorker -> subid , stream_xid );
862
864
863
865
pgstat_report_activity (STATE_RUNNING , NULL );
866
+
867
+ end_replication_step ();
864
868
}
865
869
866
870
/*
@@ -937,7 +941,7 @@ apply_handle_stream_abort(StringInfo s)
937
941
StreamXidHash * ent ;
938
942
939
943
subidx = -1 ;
940
- ensure_transaction ();
944
+ begin_replication_step ();
941
945
subxact_info_read (MyLogicalRepWorker -> subid , xid );
942
946
943
947
for (i = subxact_data .nsubxacts ; i > 0 ; i -- )
@@ -958,7 +962,7 @@ apply_handle_stream_abort(StringInfo s)
958
962
{
959
963
/* Cleanup the subxact info */
960
964
cleanup_subxact_info ();
961
-
965
+ end_replication_step ();
962
966
CommitTransactionCommand ();
963
967
return ;
964
968
}
@@ -986,6 +990,7 @@ apply_handle_stream_abort(StringInfo s)
986
990
/* write the updated subxact list */
987
991
subxact_info_write (MyLogicalRepWorker -> subid , xid );
988
992
993
+ end_replication_step ();
989
994
CommitTransactionCommand ();
990
995
}
991
996
}
@@ -1013,7 +1018,8 @@ apply_handle_stream_commit(StringInfo s)
1013
1018
1014
1019
elog (DEBUG1 , "received commit for streamed transaction %u" , xid );
1015
1020
1016
- ensure_transaction ();
1021
+ /* Make sure we have an open transaction */
1022
+ begin_replication_step ();
1017
1023
1018
1024
/*
1019
1025
* Allocate file handle and memory required to process all the messages in
@@ -1046,6 +1052,8 @@ apply_handle_stream_commit(StringInfo s)
1046
1052
in_remote_transaction = true;
1047
1053
pgstat_report_activity (STATE_RUNNING , NULL );
1048
1054
1055
+ end_replication_step ();
1056
+
1049
1057
/*
1050
1058
* Read the entries one by one and pass them through the same logic as in
1051
1059
* apply_dispatch.
@@ -1227,7 +1235,7 @@ apply_handle_insert(StringInfo s)
1227
1235
if (handle_streamed_transaction (LOGICAL_REP_MSG_INSERT , s ))
1228
1236
return ;
1229
1237
1230
- ensure_transaction ();
1238
+ begin_replication_step ();
1231
1239
1232
1240
relid = logicalrep_read_insert (s , & newtup );
1233
1241
rel = logicalrep_rel_open (relid , RowExclusiveLock );
@@ -1238,6 +1246,7 @@ apply_handle_insert(StringInfo s)
1238
1246
* transaction so it's safe to unlock it.
1239
1247
*/
1240
1248
logicalrep_rel_close (rel , RowExclusiveLock );
1249
+ end_replication_step ();
1241
1250
return ;
1242
1251
}
1243
1252
@@ -1266,7 +1275,7 @@ apply_handle_insert(StringInfo s)
1266
1275
1267
1276
logicalrep_rel_close (rel , NoLock );
1268
1277
1269
- CommandCounterIncrement ();
1278
+ end_replication_step ();
1270
1279
}
1271
1280
1272
1281
/*
@@ -1346,7 +1355,7 @@ apply_handle_update(StringInfo s)
1346
1355
if (handle_streamed_transaction (LOGICAL_REP_MSG_UPDATE , s ))
1347
1356
return ;
1348
1357
1349
- ensure_transaction ();
1358
+ begin_replication_step ();
1350
1359
1351
1360
relid = logicalrep_read_update (s , & has_oldtup , & oldtup ,
1352
1361
& newtup );
@@ -1358,6 +1367,7 @@ apply_handle_update(StringInfo s)
1358
1367
* transaction so it's safe to unlock it.
1359
1368
*/
1360
1369
logicalrep_rel_close (rel , RowExclusiveLock );
1370
+ end_replication_step ();
1361
1371
return ;
1362
1372
}
1363
1373
@@ -1416,7 +1426,7 @@ apply_handle_update(StringInfo s)
1416
1426
1417
1427
logicalrep_rel_close (rel , NoLock );
1418
1428
1419
- CommandCounterIncrement ();
1429
+ end_replication_step ();
1420
1430
}
1421
1431
1422
1432
/*
@@ -1501,7 +1511,7 @@ apply_handle_delete(StringInfo s)
1501
1511
if (handle_streamed_transaction (LOGICAL_REP_MSG_DELETE , s ))
1502
1512
return ;
1503
1513
1504
- ensure_transaction ();
1514
+ begin_replication_step ();
1505
1515
1506
1516
relid = logicalrep_read_delete (s , & oldtup );
1507
1517
rel = logicalrep_rel_open (relid , RowExclusiveLock );
@@ -1512,6 +1522,7 @@ apply_handle_delete(StringInfo s)
1512
1522
* transaction so it's safe to unlock it.
1513
1523
*/
1514
1524
logicalrep_rel_close (rel , RowExclusiveLock );
1525
+ end_replication_step ();
1515
1526
return ;
1516
1527
}
1517
1528
@@ -1542,7 +1553,7 @@ apply_handle_delete(StringInfo s)
1542
1553
1543
1554
logicalrep_rel_close (rel , NoLock );
1544
1555
1545
- CommandCounterIncrement ();
1556
+ end_replication_step ();
1546
1557
}
1547
1558
1548
1559
/*
@@ -1867,7 +1878,7 @@ apply_handle_truncate(StringInfo s)
1867
1878
if (handle_streamed_transaction (LOGICAL_REP_MSG_TRUNCATE , s ))
1868
1879
return ;
1869
1880
1870
- ensure_transaction ();
1881
+ begin_replication_step ();
1871
1882
1872
1883
remote_relids = logicalrep_read_truncate (s , & cascade , & restart_seqs );
1873
1884
@@ -1958,7 +1969,7 @@ apply_handle_truncate(StringInfo s)
1958
1969
table_close (rel , NoLock );
1959
1970
}
1960
1971
1961
- CommandCounterIncrement ();
1972
+ end_replication_step ();
1962
1973
}
1963
1974
1964
1975
0 commit comments