@@ -70,6 +70,7 @@ typedef struct {
70
70
#define USEC 1000000
71
71
#define MIN_WAIT_TIMEOUT 1000
72
72
#define MAX_WAIT_TIMEOUT 100000
73
+ #define STATUS_POLL_DELAY USEC
73
74
74
75
void _PG_init (void );
75
76
void _PG_fini (void );
@@ -147,7 +148,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
147
148
void MtmLock (LWLockMode mode )
148
149
{
149
150
#ifdef USE_SPINLOCK
150
- SpinLockAcquire (& dtm -> hashSpinlock );
151
+ SpinLockAcquire (& dtm -> spinlock );
151
152
#else
152
153
LWLockAcquire (dtm -> hashLock , mode );
153
154
#endif
@@ -156,7 +157,7 @@ void MtmLock(LWLockMode mode)
156
157
void MtmUnlock (void )
157
158
{
158
159
#ifdef USE_SPINLOCK
159
- SpinLockRelease (& dtm -> hashSpinlock );
160
+ SpinLockRelease (& dtm -> spinlock );
160
161
#else
161
162
LWLockRelease (dtm -> hashLock );
162
163
#endif
@@ -409,20 +410,22 @@ static void MtmInitialize()
409
410
dtm = (MtmState * )ShmemInitStruct (MULTIMASTER_NAME , sizeof (MtmState ), & found );
410
411
if (!found )
411
412
{
413
+ dtm -> status = MTM_INITIALIZATION ;
414
+ dtm -> recoverySlot = 0 ;
412
415
dtm -> hashLock = (LWLock * )GetNamedLWLockTranche (MULTIMASTER_NAME );
413
416
dtm -> csn = MtmGetCurrentTime ();
414
417
dtm -> oldestXid = FirstNormalTransactionId ;
415
418
dtm -> nNodes = MtmNodes ;
416
419
dtm -> disabledNodeMask = 0 ;
420
+ dtm -> pglogicalNodeMask = 0 ;
417
421
dtm -> votingTransactions = NULL ;
418
422
dtm -> transListHead = NULL ;
419
- dtm -> transListTail = & dtm -> transListHead ;
420
- pg_atomic_write_u32 ( & dtm -> nReceivers , 0 ) ;
423
+ dtm -> transListTail = & dtm -> transListHead ;
424
+ dtm -> nReceivers = 0 ;
421
425
dtm -> timeShift = 0 ;
422
- dtm -> initialized = false;
423
426
PGSemaphoreCreate (& dtm -> votingSemaphore );
424
427
PGSemaphoreReset (& dtm -> votingSemaphore );
425
- SpinLockInit (& dtm -> hashSpinlock );
428
+ SpinLockInit (& dtm -> spinlock );
426
429
BgwPoolInit (& dtm -> pool , MtmExecutor , MtmDatabaseName , MtmQueueSize );
427
430
RegisterXactCallback (MtmXactCallback , NULL );
428
431
dtmTx .snapshot = INVALID_CSN ;
@@ -463,7 +466,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
463
466
MtmLock (LW_EXCLUSIVE );
464
467
x -> xid = GetCurrentTransactionIdIfAny ();
465
468
x -> isReplicated = false;
466
- x -> isDistributed = IsNormalProcessingMode () && dtm -> initialized && MtmDoReplication && !am_walsender && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess ();
469
+ x -> isDistributed = IsNormalProcessingMode () && dtm -> status == MTM_ONLINE && MtmDoReplication && !am_walsender && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess ();
467
470
x -> containsDML = false;
468
471
x -> snapshot = MtmAssignCSN ();
469
472
x -> gtid .xid = InvalidTransactionId ;
@@ -575,8 +578,6 @@ MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
575
578
XidStatus prevStatus = TRANSACTION_STATUS_UNKNOWN ;
576
579
bool found ;
577
580
578
- Assert (status == TRANSACTION_STATUS_ABORTED );
579
-
580
581
MtmLock (LW_EXCLUSIVE );
581
582
ts = hash_search (xid2state , & xid , HASH_ENTER , & found );
582
583
if (!found ) {
@@ -590,7 +591,7 @@ MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
590
591
ts -> status = status ;
591
592
MtmAdjustSubtransactions (ts );
592
593
593
- if (prevStatus != TRANSACTION_STATUS_ABORTED ) {
594
+ if (dtm -> status != MTM_RECOVERY && prevStatus != TRANSACTION_STATUS_ABORTED ) {
594
595
ts -> cmd = MSG_ABORTED ;
595
596
MtmSendNotificationMessage (ts );
596
597
}
@@ -607,7 +608,7 @@ MtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids,
607
608
MTM_TRACE ("%d: MtmSetTransactionStatus %u(%u) = %u, isDistributed=%d\n" , getpid (), xid , dtmTx .xid , status , dtmTx .isDistributed );
608
609
if (xid == dtmTx .xid && dtmTx .isDistributed )
609
610
{
610
- if (status == TRANSACTION_STATUS_ABORTED || !dtmTx .containsDML )
611
+ if (status == TRANSACTION_STATUS_ABORTED || !dtmTx .containsDML || dtm -> status == MTM_RECOVERY )
611
612
{
612
613
MtmFinishTransaction (xid , nsubxids , subxids , status );
613
614
MTM_TRACE ("Finish transaction %d, status=%d, DML=%d\n" , xid , status , dtmTx .containsDML );
@@ -863,11 +864,17 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
863
864
dtmTx .containsDML = true;
864
865
}
865
866
866
- void MtmReceiverStarted ()
867
+ void MtmReceiverStarted (int nodeId )
867
868
{
868
- if (pg_atomic_fetch_add_u32 (& dtm -> nReceivers , 1 ) == dtm -> nNodes - 2 ) {
869
- dtm -> initialized = true;
869
+ SpinLockAcquire (& dtm -> spinlock );
870
+ if (!BIT_CHECK (dtm -> pglogicalNodeMask , nodeId - 1 )) {
871
+ dtm -> pglogicalNodeMask |= (int64 )1 << (nodeId - 1 );
872
+ if (++ dtm -> nReceivers == dtm -> nNodes - 1 ) {
873
+ Assert (dtm -> status == MTM_CONNECTED );
874
+ dtm -> status = MTM_ONLINE ;
875
+ }
870
876
}
877
+ SpinLockRelease (& dtm -> spinlock );
871
878
}
872
879
873
880
csn_t MtmTransactionSnapshot (TransactionId xid )
@@ -885,10 +892,23 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
885
892
return snapshot ;
886
893
}
887
894
888
-
895
+ MtmSlotMode MtmReceiverSlotMode (int nodeId )
896
+ {
897
+ while (dtm -> status != MTM_CONNECTED && dtm -> status != MTM_ONLINE ) {
898
+ if (dtm -> status == MTM_RECOVERY ) {
899
+ if (dtm -> recoverySlot == 0 || dtm -> recoverySlot == nodeId ) {
900
+ dtm -> recoverySlot = nodeId ;
901
+ return SLOT_OPEN_EXISTED ;
902
+ }
903
+ }
904
+ MtmSleep (STATUS_POLL_DELAY );
905
+ }
906
+ return dtm -> recoverySlot ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS ;
907
+ }
908
+
889
909
void MtmDropNode (int nodeId , bool dropSlot )
890
910
{
891
- if (!BIT_SET (dtm -> disabledNodeMask , nodeId - 1 ))
911
+ if (!BIT_CHECK (dtm -> disabledNodeMask , nodeId - 1 ))
892
912
{
893
913
if (nodeId <= 0 || nodeId > dtm -> nNodes )
894
914
{
@@ -969,7 +989,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
969
989
p = conn_str_end ;
970
990
}
971
991
* p = '\0' ;
972
- if (!BIT_SET (disabledNodeMask , i ))
992
+ if (!BIT_CHECK (disabledNodeMask , i ))
973
993
{
974
994
conns [i ] = PQconnectdb (conn_str );
975
995
if (PQstatus (conns [i ]) != CONNECTION_OK )
0 commit comments