@@ -864,10 +864,13 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
864
864
865
865
void MtmJoinTransaction (GlobalTransactionId * gtid , csn_t globalSnapshot )
866
866
{
867
- MtmLock (LW_EXCLUSIVE );
868
- MtmSyncClock (globalSnapshot );
869
- MtmUnlock ();
870
-
867
+ if (globalSnapshot != INVALID_CSN ) {
868
+ MtmLock (LW_EXCLUSIVE );
869
+ MtmSyncClock (globalSnapshot );
870
+ MtmUnlock ();
871
+ } else {
872
+ globalSnapshot = MtmTx .snapshot ;
873
+ }
871
874
if (!TransactionIdIsValid (gtid -> xid )) {
872
875
/* In case of recovery InvalidTransactionId is passed */
873
876
Assert (Mtm -> status == MTM_RECOVERY );
@@ -1876,6 +1879,14 @@ void MtmDropNode(int nodeId, bool dropSlot)
1876
1879
}
1877
1880
}
1878
1881
}
1882
+ static void
1883
+ MtmOnProcExit (int code , Datum arg )
1884
+ {
1885
+ if (MtmReplicationNodeId >= 0 ) {
1886
+ elog (WARNING , "WAL-sender to %d is terminated" , MtmReplicationNodeId );
1887
+ MtmOnNodeDisconnect (MtmReplicationNodeId );
1888
+ }
1889
+ }
1879
1890
1880
1891
static void
1881
1892
MtmReplicationStartupHook (struct PGLogicalStartupHookArgs * args )
@@ -1922,13 +1933,17 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
1922
1933
elog (NOTICE , "Node %d start logical replication to node %d in normal mode" , MtmNodeId , MtmReplicationNodeId );
1923
1934
}
1924
1935
MtmUnlock ();
1936
+ on_proc_exit (MtmOnProcExit , 0 );
1925
1937
}
1926
1938
1927
1939
static void
1928
1940
MtmReplicationShutdownHook (struct PGLogicalShutdownHookArgs * args )
1929
1941
{
1930
- elog (WARNING , "Logical replication to node %d is stopped" , MtmReplicationNodeId );
1931
- MtmOnNodeDisconnect (MtmReplicationNodeId );
1942
+ if (MtmReplicationNodeId >= 0 ) {
1943
+ elog (WARNING , "Logical replication to node %d is stopped" , MtmReplicationNodeId );
1944
+ MtmOnNodeDisconnect (MtmReplicationNodeId );
1945
+ MtmReplicationNodeId = -1 ; /* defuse on_proc_exit hook */
1946
+ }
1932
1947
}
1933
1948
1934
1949
static bool
@@ -2166,7 +2181,8 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
2166
2181
return ret ;
2167
2182
}
2168
2183
2169
- void MtmNoticeReceiver (void * i , const PGresult * res )
2184
+ static void
2185
+ MtmNoticeReceiver (void * i , const PGresult * res )
2170
2186
{
2171
2187
char * notice = PQresultErrorMessage (res );
2172
2188
char * stripped_notice ;
0 commit comments