@@ -224,6 +224,8 @@ static void maybe_reread_subscription(void);
224
224
/* prototype needed because of stream_commit */
225
225
static void apply_dispatch (StringInfo s );
226
226
227
+ static void apply_handle_commit_internal (StringInfo s ,
228
+ LogicalRepCommitData * commit_data );
227
229
static void apply_handle_insert_internal (ResultRelInfo * relinfo ,
228
230
EState * estate , TupleTableSlot * remoteslot );
229
231
static void apply_handle_update_internal (ResultRelInfo * relinfo ,
@@ -709,29 +711,7 @@ apply_handle_commit(StringInfo s)
709
711
710
712
Assert (commit_data .commit_lsn == remote_final_lsn );
711
713
712
- /* The synchronization worker runs in single transaction. */
713
- if (IsTransactionState () && !am_tablesync_worker ())
714
- {
715
- /*
716
- * Update origin state so we can restart streaming from correct
717
- * position in case of crash.
718
- */
719
- replorigin_session_origin_lsn = commit_data .end_lsn ;
720
- replorigin_session_origin_timestamp = commit_data .committime ;
721
-
722
- CommitTransactionCommand ();
723
- pgstat_report_stat (false);
724
-
725
- store_flush_position (commit_data .end_lsn );
726
- }
727
- else
728
- {
729
- /* Process any invalidation messages that might have accumulated. */
730
- AcceptInvalidationMessages ();
731
- maybe_reread_subscription ();
732
- }
733
-
734
- in_remote_transaction = false;
714
+ apply_handle_commit_internal (s , & commit_data );
735
715
736
716
/* Process any tables that are being synchronized in parallel. */
737
717
process_syncing_tables (commit_data .end_lsn );
@@ -772,8 +752,10 @@ apply_handle_stream_start(StringInfo s)
772
752
773
753
/*
774
754
* Start a transaction on stream start, this transaction will be committed
775
- * on the stream stop. We need the transaction for handling the buffile,
776
- * used for serializing the streaming data and subxact info.
755
+ * on the stream stop unless it is a tablesync worker in which case it will
756
+ * be committed after processing all the messages. We need the transaction
757
+ * for handling the buffile, used for serializing the streaming data and
758
+ * subxact info.
777
759
*/
778
760
ensure_transaction ();
779
761
@@ -825,8 +807,12 @@ apply_handle_stream_stop(StringInfo s)
825
807
/* We must be in a valid transaction state */
826
808
Assert (IsTransactionState ());
827
809
828
- /* Commit the per-stream transaction */
829
- CommitTransactionCommand ();
810
+ /* The synchronization worker runs in single transaction. */
811
+ if (!am_tablesync_worker ())
812
+ {
813
+ /* Commit the per-stream transaction */
814
+ CommitTransactionCommand ();
815
+ }
830
816
831
817
in_streamed_transaction = false;
832
818
@@ -902,7 +888,10 @@ apply_handle_stream_abort(StringInfo s)
902
888
{
903
889
/* Cleanup the subxact info */
904
890
cleanup_subxact_info ();
905
- CommitTransactionCommand ();
891
+
892
+ /* The synchronization worker runs in single transaction */
893
+ if (!am_tablesync_worker ())
894
+ CommitTransactionCommand ();
906
895
return ;
907
896
}
908
897
@@ -928,7 +917,9 @@ apply_handle_stream_abort(StringInfo s)
928
917
929
918
/* write the updated subxact list */
930
919
subxact_info_write (MyLogicalRepWorker -> subid , xid );
931
- CommitTransactionCommand ();
920
+
921
+ if (!am_tablesync_worker ())
922
+ CommitTransactionCommand ();
932
923
}
933
924
}
934
925
@@ -1048,35 +1039,54 @@ apply_handle_stream_commit(StringInfo s)
1048
1039
1049
1040
BufFileClose (fd );
1050
1041
1051
- /*
1052
- * Update origin state so we can restart streaming from correct position
1053
- * in case of crash.
1054
- */
1055
- replorigin_session_origin_lsn = commit_data .end_lsn ;
1056
- replorigin_session_origin_timestamp = commit_data .committime ;
1057
-
1058
1042
pfree (buffer );
1059
1043
pfree (s2 .data );
1060
1044
1061
- CommitTransactionCommand ();
1062
- pgstat_report_stat (false);
1063
-
1064
- store_flush_position (commit_data .end_lsn );
1065
-
1066
1045
elog (DEBUG1 , "replayed %d (all) changes from file \"%s\"" ,
1067
1046
nchanges , path );
1068
1047
1069
- in_remote_transaction = false;
1070
-
1071
- /* Process any tables that are being synchronized in parallel. */
1072
- process_syncing_tables (commit_data .end_lsn );
1048
+ apply_handle_commit_internal (s , & commit_data );
1073
1049
1074
1050
/* unlink the files with serialized changes and subxact info */
1075
1051
stream_cleanup_files (MyLogicalRepWorker -> subid , xid );
1076
1052
1053
+ /* Process any tables that are being synchronized in parallel. */
1054
+ process_syncing_tables (commit_data .end_lsn );
1055
+
1077
1056
pgstat_report_activity (STATE_IDLE , NULL );
1078
1057
}
1079
1058
1059
+ /*
1060
+ * Helper function for apply_handle_commit and apply_handle_stream_commit.
1061
+ */
1062
+ static void
1063
+ apply_handle_commit_internal (StringInfo s , LogicalRepCommitData * commit_data )
1064
+ {
1065
+ /* The synchronization worker runs in single transaction. */
1066
+ if (IsTransactionState () && !am_tablesync_worker ())
1067
+ {
1068
+ /*
1069
+ * Update origin state so we can restart streaming from correct
1070
+ * position in case of crash.
1071
+ */
1072
+ replorigin_session_origin_lsn = commit_data -> end_lsn ;
1073
+ replorigin_session_origin_timestamp = commit_data -> committime ;
1074
+
1075
+ CommitTransactionCommand ();
1076
+ pgstat_report_stat (false);
1077
+
1078
+ store_flush_position (commit_data -> end_lsn );
1079
+ }
1080
+ else
1081
+ {
1082
+ /* Process any invalidation messages that might have accumulated. */
1083
+ AcceptInvalidationMessages ();
1084
+ maybe_reread_subscription ();
1085
+ }
1086
+
1087
+ in_remote_transaction = false;
1088
+ }
1089
+
1080
1090
/*
1081
1091
* Handle RELATION message.
1082
1092
*
0 commit comments