@@ -1601,11 +1601,7 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
             Assert(ts->gid[0]);
             if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
                 elog(LOG, "Abort transaction %s because its coordinator is disabled and it is not prepared at node %d", ts->gid, MtmNodeId);
-                ts->isPinned = true;
-                MtmUnlock();
                 MtmFinishPreparedTransaction(ts, false);
-                MtmLock(LW_EXCLUSIVE);
-                ts->isPinned = false;
             } else {
                 MTM_LOG1("Poll state of transaction %d (%s)", ts->xid, ts->gid);
                 MtmBroadcastPollMessage(ts);
@@ -2017,7 +2013,7 @@ void MtmOnNodeConnect(int nodeId)
     MtmLock(LW_EXCLUSIVE);
     elog(LOG, "Connect node %d connectivity mask %llx", nodeId, (long long) Mtm->connectivityMask);
     BIT_CLEAR(Mtm->connectivityMask, nodeId-1);
-    BIT_CLEAR(Mtm->reconnectMask, nodeId-1);
+    BIT_SET(Mtm->reconnectMask, nodeId-1); /* force sender to reestablish connection and send heartbeat */
     MtmUnlock();
 }
 
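The reconnect handling above relies on per-node bit operations over a shared connectivity mask. A minimal sketch of the semantics assumed for these macros (the real definitions live in the multimaster headers; the 64-bit nodemask_t is an assumption):

    #include <stdint.h>

    typedef uint64_t nodemask_t;            /* assumed width; one bit per node */

    #define BIT_SET(mask, bit)   ((mask) |=  ((nodemask_t)1 << (bit)))
    #define BIT_CLEAR(mask, bit) ((mask) &= ~((nodemask_t)1 << (bit)))
    #define BIT_CHECK(mask, bit) (((mask) & ((nodemask_t)1 << (bit))) != 0)

Setting the node's bit in reconnectMask (rather than clearing it, as before) is what makes the sender tear down and reestablish the connection so a heartbeat reaches the node.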
@@ -2945,26 +2941,36 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
     }
 }
 
-
+/*
+ * This function is called with the MTM mutex locked.
+ * It should unlock the mutex before calling FinishPreparedTransaction to avoid deadlocks.
+ * The ts object is pinned to prevent its deallocation while the lock is released.
+ */
 void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
 {
     bool insideTransaction = IsTransactionState();
+
     Assert(ts->votingCompleted);
+
+    ts->isPinned = true;
+    MtmUnlock();
+
     MtmResetTransaction();
 
     if (!insideTransaction) {
         StartTransactionCommand();
     }
-    //MtmBeginSession(MtmNodeId);
     MtmSetCurrentTransactionCSN(ts->csn);
     MtmSetCurrentTransactionGID(ts->gid);
     FinishPreparedTransaction(ts->gid, commit);
 
     if (!insideTransaction) {
         CommitTransactionCommand();
-        //MtmEndSession(MtmNodeId, true);
         Assert(ts->status == commit ? TRANSACTION_STATUS_COMMITTED : TRANSACTION_STATUS_ABORTED);
     }
+
+    MtmLock(LW_EXCLUSIVE);
+    ts->isPinned = false;
 }
 
 /*
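The pin/unlock dance this commit moves out of the caller and into MtmFinishPreparedTransaction follows a common pattern: pin the shared object, drop the mutex so the callee can take it, then relock and unpin. A minimal standalone sketch of the pattern, using a pthread mutex in place of the MTM lightweight lock (all names here are illustrative, not the multimaster API):

    #include <pthread.h>
    #include <stdbool.h>

    typedef struct { bool isPinned; } Obj;   /* stands in for MtmTransState */

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

    static void callout(Obj *o)              /* like FinishPreparedTransaction: may take 'lock' itself */
    {
        pthread_mutex_lock(&lock);
        (void) o;
        pthread_mutex_unlock(&lock);
    }

    /* Caller holds 'lock' on entry and on exit, mirroring MtmFinishPreparedTransaction. */
    static void finish_with_pin(Obj *o)
    {
        o->isPinned = true;                  /* prevent deallocation while unlocked */
        pthread_mutex_unlock(&lock);         /* avoid self-deadlock in the callout */
        callout(o);
        pthread_mutex_lock(&lock);           /* restore the caller's locking contract */
        o->isPinned = false;
    }

Centralizing the pattern in the callee keeps every call site (such as MtmPollStatusOfPreparedTransactions above) from having to repeat it.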
@@ -2980,6 +2986,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
 
     if (!Mtm->preparedTransactionsLoaded)
     {
+        /* We must restore the state of prepared (but not committed or aborted) transactions before starting recovery. */
         MtmLoadPreparedTransactions();
         Mtm->preparedTransactionsLoaded = true;
     }
@@ -2991,6 +2998,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
             MtmUnlock();
             return REPLMODE_EXIT;
         }
+        /* We are not interested in receiving any deteriorated logical messages from the recovered node, so recreate the slot */
         if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
             mode = REPLMODE_CREATE_NEW;
         }
@@ -3147,6 +3155,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
             MtmEnableNode(MtmReplicationNodeId);
             MtmCheckQuorum();
         } else {
+            /* Force arbiter to reestablish connection with this node and send a heartbeat informing it that it was disabled and should perform recovery */
+            BIT_SET(Mtm->reconnectMask, MtmReplicationNodeId-1);
             MtmUnlock();
             elog(ERROR, "Disabled node %d tries to reconnect without recovery", MtmReplicationNodeId);
         }
@@ -3156,9 +3166,10 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
     elog(LOG, "Start %d senders and %d receivers from %d cluster status %s", Mtm->nSenders+1, Mtm->nReceivers, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
     MtmSenderStarted = 1;
     if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
+        /* All logical replication connections from and to this node are established, so we can switch the cluster to online mode */
         MtmSwitchClusterMode(MTM_ONLINE);
     }
-    BIT_SET(Mtm->reconnectMask, MtmReplicationNodeId-1); /* arbiter should try to reestblish connection with this node */
+    BIT_SET(Mtm->reconnectMask, MtmReplicationNodeId-1); /* arbiter should try to reestablish connection with this node */
     MtmUnlock();
     on_shmem_exit(MtmOnProcExit, 0);
 }
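The condition guarding MtmSwitchClusterMode encodes when the mesh is complete: one sender and one receiver per other live node. Restated as a standalone predicate (a sketch; the helper name is illustrative):

    #include <stdbool.h>

    static bool cluster_fully_meshed(int nSenders, int nReceivers, int nLiveNodes)
    {
        /* each of the other nLiveNodes-1 nodes needs one outgoing and one incoming channel */
        return nSenders == nLiveNodes - 1 && nReceivers == nLiveNodes - 1;
    }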
@@ -3168,6 +3179,16 @@ XLogRecPtr MtmGetFlushPosition(int nodeId)
     return Mtm->nodes[nodeId-1].flushPos;
 }
 
+/**
+ * Keep track of the progress of the WAL writer.
+ * We need to notify WAL senders at other nodes which logical records
+ * are flushed to disk and so can survive failure. In asynchronous commit mode
+ * WAL is flushed by the WAL writer; the current flush position can be obtained by GetFlushRecPtr().
+ * So on applying a new logical record we insert it in the MtmLsnMapping and compare
+ * its position in the local WAL with the current flush position.
+ * The records which are flushed to disk by the WAL writer are removed from the list
+ * and the mapping in Mtm->nodes[].flushPos is updated for this node.
+ */
 void MtmUpdateLsnMapping(int node_id, XLogRecPtr end_lsn)
 {
     dlist_mutable_iter iter;
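The comment above describes a mapping list keyed by local WAL position. A simplified sketch of the idea (hypothetical types; the real code uses PostgreSQL's dlist and obtains the flushed position from GetFlushRecPtr()):

    #include <stdint.h>
    #include <stdlib.h>

    typedef uint64_t XLogRecPtr;

    typedef struct LsnMapping {
        XLogRecPtr local_lsn;    /* where the applied record landed in local WAL */
        XLogRecPtr origin_lsn;   /* matching position at the origin node */
        struct LsnMapping *next;
    } LsnMapping;

    /* Pop every entry whose local WAL is already flushed and advance flushPos. */
    static void update_flush_position(LsnMapping **head, XLogRecPtr flushed,
                                      XLogRecPtr *flushPos)
    {
        while (*head && (*head)->local_lsn <= flushed)
        {
            LsnMapping *e = *head;
            *flushPos = e->origin_lsn;   /* durable up to here for this node */
            *head = e->next;
            free(e);
        }
    }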
@@ -3216,9 +3237,21 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
     }
 }
 
+/*
+ * Filter transactions which should be replicated to other nodes.
+ * This filter is applied at the sender side (WAL sender).
+ * Final filtering is also done at the destination side by the MtmFilterTransaction function.
+ */
 static bool
 MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
 {
+    /* Do not replicate any transactions in recovery mode (because we should apply
+     * changes sent to us rather than send our own pending changes)
+     * and transactions received from other nodes
+     * (origin_id should be non-zero in this case),
+     * unless we are performing recovery of a disabled node
+     * (in that case all transactions should be sent)
+     */
     bool res = Mtm->status != MTM_RECOVERY
         && (args->origin_id == InvalidRepOriginId
             || MtmIsRecoveredNode(MtmReplicationNodeId));
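Restating the sender-side predicate in isolation may help (a sketch; the enum and helper are illustrative, with InvalidRepOriginId represented as 0):

    #include <stdbool.h>

    typedef enum { MTM_RECOVERY, MTM_CONNECTED, MTM_ONLINE } MtmNodeStatus;

    static bool should_send(MtmNodeStatus status, int origin_id, bool peer_in_recovery)
    {
        /* never send while recovering ourselves; otherwise send locally
         * originated transactions, or everything if the peer is being recovered */
        return status != MTM_RECOVERY
            && (origin_id == 0 /* InvalidRepOriginId */ || peer_in_recovery);
    }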
@@ -3228,6 +3261,9 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
     return res;
 }
 
+/**
+ * Filter out records corresponding to local (non-distributed) tables
+ */
 static bool
 MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
 {
@@ -3248,7 +3284,11 @@ MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
 
 /*
  * Filter received transactions at destination side.
- * This function is executed by receiver, so there are no race conditions and it is possible to update nodes[i].restartLSN without lock
+ * This function is executed by the receiver,
+ * so there are no race conditions and it is possible to update nodes[i].restartLSN without a lock.
+ * It is more efficient to filter records at the sender's side (done by MtmReplicationTxnFilterHook) to avoid sending useless data through the network. But the asynchronous nature of
+ * logical replication makes it impossible to guarantee (at least I failed to do it)
+ * that the replica does not receive deteriorated data.
 */
 bool MtmFilterTransaction(char* record, int size)
 {
@@ -3308,8 +3348,8 @@ bool MtmFilterTransaction(char* record, int size)
     }
 
     if (duplicate) {
-        MTM_LOG1("Ignore transaction %s from node %d flags=%x, our restartLSN for node: %lx,restart_lsn = ( origin node %d == MtmReplicationNodeId %d) ? end_lsn=%lx, origin_lsn=%lx",
-                 gid, replication_node, flags, Mtm->nodes[origin_node-1].restartLSN, origin_node, MtmReplicationNodeId, end_lsn, origin_lsn);
+        MTM_LOG1("Ignore transaction %s from node %d flags=%x because our LSN position %lx for origin node %d is greater than or equal to LSN %lx of this transaction (end_lsn=%lx, origin_lsn=%lx)",
+                 gid, replication_node, flags, Mtm->nodes[origin_node-1].restartLSN, origin_node, restart_lsn, end_lsn, origin_lsn);
     } else {
         MTM_LOG2("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
                  gid, replication_node, end_lsn, flags, origin_node, origin_lsn, restart_lsn);
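The rewritten log message also states the duplicate criterion plainly: a transaction is skipped when the stored restartLSN for its origin node has already reached or passed the transaction's LSN. As a sketch:

    #include <stdbool.h>
    #include <stdint.h>

    typedef uint64_t XLogRecPtr;

    static bool is_duplicate(XLogRecPtr node_restart_lsn, XLogRecPtr tx_lsn)
    {
        /* everything at or below restartLSN has already been applied here */
        return tx_lsn <= node_restart_lsn;
    }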
@@ -3326,7 +3366,11 @@ void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)
     hooks->row_filter_hook = MtmReplicationRowFilterHook;
 }
 
-
+/*
+ * Set up the replication session origin to include the origin location in WAL and
+ * update the slot position.
+ * Sessions are not reentrant, so we have to use an exclusive lock here.
+ */
 void MtmBeginSession(int nodeId)
 {
     MtmLockNode(nodeId, LW_EXCLUSIVE);
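MtmBeginSession/MtmEndSession wrap PostgreSQL's replication-origin session API, which is why the exclusive node lock matters: a session can be bound to only one backend at a time. A sketch of the underlying calls (assuming the pre-PG16 one-argument replorigin_session_setup; the Mtm lock wrappers appear only as comments):

    #include "postgres.h"
    #include "replication/origin.h"

    static void
    begin_session_sketch(RepOriginId origin_id)
    {
        /* MtmLockNode(nodeId, LW_EXCLUSIVE);  -- sessions are not reentrant */
        replorigin_session_setup(origin_id);    /* bind the origin to this backend */
        replorigin_session_origin = origin_id;  /* stamp emitted WAL with the origin */
    }

    static void
    end_session_sketch(void)
    {
        if (replorigin_session_origin != InvalidRepOriginId)
        {
            replorigin_session_origin = InvalidRepOriginId;
            replorigin_session_reset();         /* release the binding */
            /* MtmUnlockNode(nodeId);  -- when the caller asked to unlock */
        }
    }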
@@ -3338,6 +3382,9 @@ void MtmBeginSession(int nodeId)
     MTM_LOG3("%d: End setup replorigin session: %d", MyProcPid, replorigin_session_origin);
 }
 
+/*
+ * Release replication session
+ */
 void MtmEndSession(int nodeId, bool unlock)
 {
     if (replorigin_session_origin != InvalidRepOriginId) {