@@ -79,7 +79,19 @@ typedef struct LogicalRepWorkerId
79
79
Oid relid ;
80
80
} LogicalRepWorkerId ;
81
81
82
- static List * on_commit_stop_workers = NIL ;
82
+ typedef struct StopWorkersData
83
+ {
84
+ int nestDepth ; /* Sub-transaction nest level */
85
+ List * workers ; /* List of LogicalRepWorkerId */
86
+ struct StopWorkersData * parent ; /* This need not be an immediate
87
+ * subtransaction parent */
88
+ } StopWorkersData ;
89
+
90
+ /*
91
+ * Stack of StopWorkersData elements. Each stack element contains the workers
92
+ * to be stopped for that subtransaction.
93
+ */
94
+ static StopWorkersData * on_commit_stop_workers = NULL ;
83
95
84
96
static void ApplyLauncherWakeup (void );
85
97
static void logicalrep_launcher_onexit (int code , Datum arg );
@@ -559,17 +571,41 @@ logicalrep_worker_stop(Oid subid, Oid relid)
559
571
void
560
572
logicalrep_worker_stop_at_commit (Oid subid , Oid relid )
561
573
{
574
+ int nestDepth = GetCurrentTransactionNestLevel ();
562
575
LogicalRepWorkerId * wid ;
563
576
MemoryContext oldctx ;
564
577
565
578
/* Make sure we store the info in context that survives until commit. */
566
579
oldctx = MemoryContextSwitchTo (TopTransactionContext );
567
580
581
+ /* Check that previous transactions were properly cleaned up. */
582
+ Assert (on_commit_stop_workers == NULL ||
583
+ nestDepth >= on_commit_stop_workers -> nestDepth );
584
+
585
+ /*
586
+ * Push a new stack element if we don't already have one for the current
587
+ * nestDepth.
588
+ */
589
+ if (on_commit_stop_workers == NULL ||
590
+ nestDepth > on_commit_stop_workers -> nestDepth )
591
+ {
592
+ StopWorkersData * newdata = palloc (sizeof (StopWorkersData ));
593
+
594
+ newdata -> nestDepth = nestDepth ;
595
+ newdata -> workers = NIL ;
596
+ newdata -> parent = on_commit_stop_workers ;
597
+ on_commit_stop_workers = newdata ;
598
+ }
599
+
600
+ /*
601
+ * Finally add a new worker into the worker list of the current
602
+ * subtransaction.
603
+ */
568
604
wid = palloc (sizeof (LogicalRepWorkerId ));
569
605
wid -> subid = subid ;
570
606
wid -> relid = relid ;
571
-
572
- on_commit_stop_workers = lappend (on_commit_stop_workers , wid );
607
+ on_commit_stop_workers -> workers =
608
+ lappend (on_commit_stop_workers -> workers , wid );
573
609
574
610
MemoryContextSwitchTo (oldctx );
575
611
}
@@ -823,7 +859,7 @@ ApplyLauncherShmemInit(void)
823
859
bool
824
860
XactManipulatesLogicalReplicationWorkers (void )
825
861
{
826
- return (on_commit_stop_workers != NIL );
862
+ return (on_commit_stop_workers != NULL );
827
863
}
828
864
829
865
/*
@@ -832,15 +868,25 @@ XactManipulatesLogicalReplicationWorkers(void)
832
868
void
833
869
AtEOXact_ApplyLauncher (bool isCommit )
834
870
{
871
+
872
+ Assert (on_commit_stop_workers == NULL ||
873
+ (on_commit_stop_workers -> nestDepth == 1 &&
874
+ on_commit_stop_workers -> parent == NULL ));
875
+
835
876
if (isCommit )
836
877
{
837
878
ListCell * lc ;
838
879
839
- foreach ( lc , on_commit_stop_workers )
880
+ if ( on_commit_stop_workers != NULL )
840
881
{
841
- LogicalRepWorkerId * wid = lfirst (lc );
882
+ List * workers = on_commit_stop_workers -> workers ;
883
+
884
+ foreach (lc , workers )
885
+ {
886
+ LogicalRepWorkerId * wid = lfirst (lc );
842
887
843
- logicalrep_worker_stop (wid -> subid , wid -> relid );
888
+ logicalrep_worker_stop (wid -> subid , wid -> relid );
889
+ }
844
890
}
845
891
846
892
if (on_commit_launcher_wakeup )
@@ -851,10 +897,64 @@ AtEOXact_ApplyLauncher(bool isCommit)
851
897
* No need to pfree on_commit_stop_workers. It was allocated in
852
898
* transaction memory context, which is going to be cleaned soon.
853
899
*/
854
- on_commit_stop_workers = NIL ;
900
+ on_commit_stop_workers = NULL ;
855
901
on_commit_launcher_wakeup = false;
856
902
}
857
903
904
+ /*
905
+ * On commit, merge the current on_commit_stop_workers list into the
906
+ * immediate parent, if present.
907
+ * On rollback, discard the current on_commit_stop_workers list.
908
+ * Pop out the stack.
909
+ */
910
+ void
911
+ AtEOSubXact_ApplyLauncher (bool isCommit , int nestDepth )
912
+ {
913
+ StopWorkersData * parent ;
914
+
915
+ /* Exit immediately if there's no work to do at this level. */
916
+ if (on_commit_stop_workers == NULL ||
917
+ on_commit_stop_workers -> nestDepth < nestDepth )
918
+ return ;
919
+
920
+ Assert (on_commit_stop_workers -> nestDepth == nestDepth );
921
+
922
+ parent = on_commit_stop_workers -> parent ;
923
+
924
+ if (isCommit )
925
+ {
926
+ /*
927
+ * If the upper stack element is not an immediate parent
928
+ * subtransaction, just decrement the notional nesting depth without
929
+ * doing any real work. Else, we need to merge the current workers
930
+ * list into the parent.
931
+ */
932
+ if (!parent || parent -> nestDepth < nestDepth - 1 )
933
+ {
934
+ on_commit_stop_workers -> nestDepth -- ;
935
+ return ;
936
+ }
937
+
938
+ parent -> workers =
939
+ list_concat (parent -> workers , on_commit_stop_workers -> workers );
940
+ }
941
+ else
942
+ {
943
+ /*
944
+ * Abandon everything that was done at this nesting level. Explicitly
945
+ * free memory to avoid a transaction-lifespan leak.
946
+ */
947
+ list_free_deep (on_commit_stop_workers -> workers );
948
+ }
949
+
950
+ /*
951
+ * We have taken care of the current subtransaction workers list for both
952
+ * abort or commit. So we are ready to pop the stack.
953
+ */
954
+ pfree (on_commit_stop_workers );
955
+ on_commit_stop_workers = parent ;
956
+ }
957
+
858
958
/*
859
959
* Request wakeup of the launcher on commit of the transaction.
860
960
*
0 commit comments