@@ -82,20 +82,25 @@ typedef struct
82
82
int node ; /* Sender node ID */
83
83
TransactionId dxid ; /* Transaction ID at destination node */
84
84
TransactionId sxid ; /* Transaction ID at sender node */
85
- csn_t csn ; /* local CSN in case of sending data from replica to master, global CSN master->replica */
86
- nodemask_t disabledNodeMask ; /* bitmask of disabled nodes at the sender of message */
85
+ csn_t csn ; /* Local CSN in case of sending data from replica to master, global CSN master->replica */
86
+ nodemask_t disabledNodeMask ; /* Bitmask of disabled nodes at the sender of message */
87
+ csn_t oldestSnapshot ; /* Oldest snapshot used by active transactions at this node */
87
88
} MtmArbiterMessage ;
88
89
90
+ typedef struct
91
+ {
92
+ MtmArbiterMessage hdr ;
93
+ char connStr [MULTIMASTER_MAX_CONN_STR_SIZE ];
94
+ } MtmHandshakeMessage ;
95
+
89
96
typedef struct
90
97
{
91
98
int used ;
92
99
MtmArbiterMessage data [BUFFER_SIZE ];
93
100
} MtmBuffer ;
94
101
95
102
static int * sockets ;
96
- static char * * hosts ;
97
103
static int gateway ;
98
- static MtmState * ds ;
99
104
100
105
static void MtmTransSender (Datum arg );
101
106
static void MtmTransReceiver (Datum arg );
@@ -266,39 +271,41 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
266
271
continue ;
267
272
} else {
268
273
int optval = 1 ;
269
- MtmArbiterMessage msg ;
274
+ MtmHandshakeMessage req ;
275
+ MtmArbiterMessage resp ;
270
276
setsockopt (sd , IPPROTO_TCP , TCP_NODELAY , (char const * )& optval , sizeof (optval ));
271
277
setsockopt (sd , SOL_SOCKET , SO_KEEPALIVE , (char const * )& optval , sizeof (optval ));
272
278
273
- msg .code = MSG_HANDSHAKE ;
274
- msg .node = MtmNodeId ;
275
- msg .dxid = HANDSHAKE_MAGIC ;
276
- msg .sxid = ShmemVariableCache -> nextXid ;
277
- msg .csn = MtmGetCurrentTime ();
278
- msg .disabledNodeMask = ds -> disabledNodeMask ;
279
- if (!MtmWriteSocket (sd , & msg , sizeof msg )) {
279
+ req .hdr .code = MSG_HANDSHAKE ;
280
+ req .hdr .node = MtmNodeId ;
281
+ req .hdr .dxid = HANDSHAKE_MAGIC ;
282
+ req .hdr .sxid = ShmemVariableCache -> nextXid ;
283
+ req .hdr .csn = MtmGetCurrentTime ();
284
+ req .hdr .disabledNodeMask = Mtm -> disabledNodeMask ;
285
+ strcpy (req .connStr , Mtm -> nodes [MtmNodeId - 1 ].connStr );
286
+ if (!MtmWriteSocket (sd , & req , sizeof req )) {
280
287
elog (WARNING , "Arbiter failed to send handshake message to %s:%d: %d" , host , port , errno );
281
288
close (sd );
282
289
goto Retry ;
283
290
}
284
- if (MtmReadSocket (sd , & msg , sizeof msg ) != sizeof (msg )) {
291
+ if (MtmReadSocket (sd , & resp , sizeof resp ) != sizeof (resp )) {
285
292
elog (WARNING , "Arbiter failed to receive response for handshake message from %s:%d: errno=%d" , host , port , errno );
286
293
close (sd );
287
294
goto Retry ;
288
295
}
289
- if (msg .code != MSG_STATUS || msg .dxid != HANDSHAKE_MAGIC ) {
290
- elog (WARNING , "Arbiter get unexpected response %d for handshake message from %s:%d" , msg .code , host , port );
296
+ if (resp .code != MSG_STATUS || resp .dxid != HANDSHAKE_MAGIC ) {
297
+ elog (WARNING , "Arbiter get unexpected response %d for handshake message from %s:%d" , resp .code , host , port );
291
298
close (sd );
292
299
goto Retry ;
293
300
}
294
301
295
302
/* Some node considered that I am dead, so switch to recovery mode */
296
- if (BIT_CHECK (msg .disabledNodeMask , MtmNodeId - 1 )) {
297
- elog (WARNING , "Node %d think that I am dead" , msg .node );
303
+ if (BIT_CHECK (resp .disabledNodeMask , MtmNodeId - 1 )) {
304
+ elog (WARNING , "Node %d think that I am dead" , resp .node );
298
305
MtmSwitchClusterMode (MTM_RECOVERY );
299
306
}
300
307
/* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */
301
- ds -> disabledNodeMask |= msg .disabledNodeMask ;
308
+ Mtm -> disabledNodeMask |= resp .disabledNodeMask ;
302
309
return sd ;
303
310
}
304
311
}
@@ -309,39 +316,23 @@ static void MtmOpenConnections()
309
316
{
310
317
int nNodes = MtmNodes ;
311
318
int i ;
312
- char * connStr = pstrdup (MtmConnStrs );
313
319
314
320
sockets = (int * )palloc (sizeof (int )* nNodes );
315
- hosts = (char * * )palloc (sizeof (char * )* nNodes );
316
321
317
322
for (i = 0 ; i < nNodes ; i ++ ) {
318
- char * host = strstr (connStr , "host=" );
319
- char * end ;
320
- if (host == NULL ) {
321
- elog (ERROR , "Invalid connection string: '%s'" , MtmConnStrs );
322
- }
323
- host += 5 ;
324
- for (end = host ; * end != ' ' && * end != ',' && * end != '\0' ; end ++ );
325
- if (* end != '\0' ) {
326
- * end = '\0' ;
327
- connStr = end + 1 ;
328
- } else {
329
- connStr = end ;
330
- }
331
- hosts [i ] = host ;
332
323
if (i + 1 != MtmNodeId ) {
333
- sockets [i ] = MtmConnectSocket (host , MtmArbiterPort + i + 1 , MtmConnectAttempts );
324
+ sockets [i ] = MtmConnectSocket (Mtm -> nodes [ i ]. hostName , MtmArbiterPort + i + 1 , MtmConnectAttempts );
334
325
if (sockets [i ] < 0 ) {
335
326
MtmOnNodeDisconnect (i + 1 );
336
327
}
337
328
} else {
338
329
sockets [i ] = -1 ;
339
330
}
340
331
}
341
- if (ds -> nNodes < MtmNodes /2 + 1 ) { /* no quorum */
342
- elog (WARNING , "Node is out of quorum: only %d nodes from %d are accssible" , ds -> nNodes , MtmNodes );
343
- ds -> status = MTM_OFFLINE ;
344
- } else if (ds -> status == MTM_INITIALIZATION ) {
332
+ if (Mtm -> nNodes < MtmNodes /2 + 1 ) { /* no quorum */
333
+ elog (WARNING , "Node is out of quorum: only %d nodes from %d are accssible" , Mtm -> nNodes , MtmNodes );
334
+ Mtm -> status = MTM_OFFLINE ;
335
+ } else if (Mtm -> status == MTM_INITIALIZATION ) {
345
336
MtmSwitchClusterMode (MTM_CONNECTED );
346
337
}
347
338
}
@@ -354,7 +345,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
354
345
if (sockets [node ] >= 0 ) {
355
346
close (sockets [node ]);
356
347
}
357
- sockets [node ] = MtmConnectSocket (hosts [node ], MtmArbiterPort + node + 1 , MtmReconnectAttempts );
348
+ sockets [node ] = MtmConnectSocket (Mtm -> nodes [node ]. hostName , MtmArbiterPort + node + 1 , MtmReconnectAttempts );
358
349
if (sockets [node ] < 0 ) {
359
350
MtmOnNodeDisconnect (node + 1 );
360
351
return false;
@@ -379,29 +370,31 @@ static void MtmAcceptOneConnection()
379
370
if (fd < 0 ) {
380
371
elog (WARNING , "Arbiter failed to accept socket: %d" , errno );
381
372
} else {
382
- MtmArbiterMessage msg ;
383
- int rc = MtmReadSocket (fd , & msg , sizeof msg );
384
- if (rc < sizeof (msg )) {
373
+ MtmHandshakeMessage req ;
374
+ MtmArbiterMessage resp ;
375
+ int rc = MtmReadSocket (fd , & req , sizeof req );
376
+ if (rc < sizeof (req )) {
385
377
elog (WARNING , "Arbiter failed to handshake socket: %d, errno=%d" , rc , errno );
386
- } else if (msg . code != MSG_HANDSHAKE && msg .dxid != HANDSHAKE_MAGIC ) {
387
- elog (WARNING , "Arbiter get unexpected handshake message %d" , msg .code );
378
+ } else if (req . hdr . code != MSG_HANDSHAKE && req . hdr .dxid != HANDSHAKE_MAGIC ) {
379
+ elog (WARNING , "Arbiter get unexpected handshake message %d" , req . hdr .code );
388
380
close (fd );
389
381
} else {
390
- Assert (msg .node > 0 && msg .node <= MtmNodes && msg .node != MtmNodeId );
391
- msg .code = MSG_STATUS ;
392
- msg .disabledNodeMask = ds -> disabledNodeMask ;
393
- msg .dxid = HANDSHAKE_MAGIC ;
394
- msg .sxid = ShmemVariableCache -> nextXid ;
395
- msg .csn = MtmGetCurrentTime ();
396
- if (!MtmWriteSocket (fd , & msg , sizeof msg )) {
397
- elog (WARNING , "Arbiter failed to write response for handshake message to node %d" , msg .node );
382
+ Assert (req .hdr .node > 0 && req .hdr .node <= MtmNodes && req .hdr .node != MtmNodeId );
383
+ resp .code = MSG_STATUS ;
384
+ resp .disabledNodeMask = Mtm -> disabledNodeMask ;
385
+ resp .dxid = HANDSHAKE_MAGIC ;
386
+ resp .sxid = ShmemVariableCache -> nextXid ;
387
+ resp .csn = MtmGetCurrentTime ();
388
+ MtmUpdateNodeConnStr (req .hdr .node , req .connStr );
389
+ if (!MtmWriteSocket (fd , & resp , sizeof resp )) {
390
+ elog (WARNING , "Arbiter failed to write response for handshake message to node %d" , resp .node );
398
391
close (fd );
399
392
} else {
400
- elog (NOTICE , "Arbiter established connection with node %d" , msg .node );
401
- BIT_CLEAR (ds -> connectivityMask , msg .node - 1 );
402
- MtmRegisterSocket (fd , msg .node - 1 );
403
- sockets [msg .node - 1 ] = fd ;
404
- MtmOnNodeConnect (msg .node );
393
+ elog (NOTICE , "Arbiter established connection with node %d" , req . hdr .node );
394
+ BIT_CLEAR (Mtm -> connectivityMask , req . hdr .node - 1 );
395
+ MtmRegisterSocket (fd , req . hdr .node - 1 );
396
+ sockets [req . hdr .node - 1 ] = fd ;
397
+ MtmOnNodeConnect (req . hdr .node );
405
398
}
406
399
}
407
400
}
@@ -415,7 +408,9 @@ static void MtmAcceptIncomingConnections()
415
408
int i ;
416
409
417
410
sockets = (int * )palloc (sizeof (int )* MtmNodes );
418
-
411
+ for (i = 0 ; i < MtmNodes ; i ++ ) {
412
+ sockets [i ] = -1 ;
413
+ }
419
414
sock_inet .sin_family = AF_INET ;
420
415
sock_inet .sin_addr .s_addr = htonl (INADDR_ANY );
421
416
sock_inet .sin_port = htons (MtmArbiterPort + MtmNodeId );
@@ -461,7 +456,8 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
461
456
buf -> data [buf -> used ].sxid = ts -> xid ;
462
457
buf -> data [buf -> used ].csn = ts -> csn ;
463
458
buf -> data [buf -> used ].node = MtmNodeId ;
464
- buf -> data [buf -> used ].disabledNodeMask = ds -> disabledNodeMask ;
459
+ buf -> data [buf -> used ].disabledNodeMask = Mtm -> disabledNodeMask ;
460
+ buf -> data [buf -> used ].oldestSnapshot = Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot ;
465
461
buf -> used += 1 ;
466
462
}
467
463
@@ -477,7 +473,7 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
477
473
n += 1 ;
478
474
}
479
475
}
480
- Assert (n == ds -> nNodes );
476
+ Assert (n == Mtm -> nNodes );
481
477
}
482
478
483
479
@@ -487,8 +483,6 @@ static void MtmTransSender(Datum arg)
487
483
int i ;
488
484
MtmBuffer * txBuffer = (MtmBuffer * )palloc (sizeof (MtmBuffer )* nNodes );
489
485
490
- ds = MtmGetState ();
491
-
492
486
MtmOpenConnections ();
493
487
494
488
for (i = 0 ; i < nNodes ; i ++ ) {
@@ -497,7 +491,7 @@ static void MtmTransSender(Datum arg)
497
491
498
492
while (true) {
499
493
MtmTransState * ts ;
500
- PGSemaphoreLock (& ds -> votingSemaphore );
494
+ PGSemaphoreLock (& Mtm -> votingSemaphore );
501
495
CHECK_FOR_INTERRUPTS ();
502
496
503
497
/*
@@ -506,14 +500,14 @@ static void MtmTransSender(Datum arg)
506
500
*/
507
501
MtmLock (LW_SHARED );
508
502
509
- for (ts = ds -> votingTransactions ; ts != NULL ; ts = ts -> nextVoting ) {
503
+ for (ts = Mtm -> votingTransactions ; ts != NULL ; ts = ts -> nextVoting ) {
510
504
if (MtmIsCoordinator (ts )) {
511
505
MtmBroadcastMessage (txBuffer , ts );
512
506
} else {
513
507
MtmAppendBuffer (txBuffer , ts -> gtid .xid , ts -> gtid .node - 1 , ts );
514
508
}
515
509
}
516
- ds -> votingTransactions = NULL ;
510
+ Mtm -> votingTransactions = NULL ;
517
511
518
512
MtmUnlock ();
519
513
@@ -573,8 +567,6 @@ static void MtmTransReceiver(Datum arg)
573
567
max_fd = 0 ;
574
568
#endif
575
569
576
- ds = MtmGetState ();
577
-
578
570
MtmAcceptIncomingConnections ();
579
571
580
572
for (i = 0 ; i < nNodes ; i ++ ) {
@@ -613,7 +605,7 @@ static void MtmTransReceiver(Datum arg)
613
605
elog (ERROR , "Arbiter failed to select sockets: %d" , errno );
614
606
}
615
607
for (i = 0 ; i < nNodes ; i ++ ) {
616
- if (FD_ISSET (sockets [i ], & events ))
608
+ if (sockets [ i ] >= 0 && FD_ISSET (sockets [i ], & events ))
617
609
#endif
618
610
{
619
611
if (i + 1 == MtmNodeId ) {
@@ -637,13 +629,24 @@ static void MtmTransReceiver(Datum arg)
637
629
MtmTransState * ts = (MtmTransState * )hash_search (MtmXid2State , & msg -> dxid , HASH_FIND , NULL );
638
630
Assert (ts != NULL );
639
631
Assert (msg -> node > 0 && msg -> node <= nNodes && msg -> node != MtmNodeId );
632
+
633
+ Mtm -> nodes [msg -> node - 1 ].oldestSnapshot = msg -> oldestSnapshot ;
634
+
640
635
if (MtmIsCoordinator (ts )) {
641
636
switch (msg -> code ) {
642
637
case MSG_READY :
643
- Assert (ts -> nVotes < ds -> nNodes );
644
- ds -> nodeTransDelay [msg -> node - 1 ] += MtmGetCurrentTime () - ts -> csn ;
638
+ Assert (ts -> nVotes < Mtm -> nNodes );
639
+ Mtm -> nodes [msg -> node - 1 ]. transDelay += MtmGetCurrentTime () - ts -> csn ;
645
640
ts -> xids [msg -> node - 1 ] = msg -> sxid ;
646
- if (++ ts -> nVotes == ds -> nNodes ) {
641
+
642
+ if ((~msg -> disabledNodeMask & Mtm -> disabledNodeMask ) != 0 ) {
643
+ /* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
644
+ commit on smaller subset of nodes */
645
+ ts -> status = TRANSACTION_STATUS_ABORTED ;
646
+ MtmAdjustSubtransactions (ts );
647
+ }
648
+
649
+ if (++ ts -> nVotes == Mtm -> nNodes ) {
647
650
/* All nodes are finished their transactions */
648
651
if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
649
652
ts -> nVotes = 1 ; /* I voted myself */
@@ -655,24 +658,24 @@ static void MtmTransReceiver(Datum arg)
655
658
}
656
659
break ;
657
660
case MSG_ABORTED :
658
- Assert (ts -> nVotes < ds -> nNodes );
661
+ Assert (ts -> nVotes < Mtm -> nNodes );
659
662
if (ts -> status != TRANSACTION_STATUS_ABORTED ) {
660
663
Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
661
664
ts -> status = TRANSACTION_STATUS_ABORTED ;
662
665
MtmAdjustSubtransactions (ts );
663
666
}
664
- if (++ ts -> nVotes == ds -> nNodes ) {
667
+ if (++ ts -> nVotes == Mtm -> nNodes ) {
665
668
MtmWakeUpBackend (ts );
666
669
}
667
670
break ;
668
671
case MSG_PREPARED :
669
672
Assert (ts -> status == TRANSACTION_STATUS_IN_PROGRESS );
670
- Assert (ts -> nVotes < ds -> nNodes );
673
+ Assert (ts -> nVotes < Mtm -> nNodes );
671
674
if (msg -> csn > ts -> csn ) {
672
675
ts -> csn = msg -> csn ;
673
676
MtmSyncClock (ts -> csn );
674
677
}
675
- if (++ ts -> nVotes == ds -> nNodes ) {
678
+ if (++ ts -> nVotes == Mtm -> nNodes ) {
676
679
ts -> csn = MtmAssignCSN ();
677
680
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
678
681
MtmWakeUpBackend (ts );
@@ -703,7 +706,7 @@ static void MtmTransReceiver(Datum arg)
703
706
}
704
707
}
705
708
}
706
- if (n == 0 && ds -> disabledNodeMask != 0 ) {
709
+ if (n == 0 && Mtm -> disabledNodeMask != 0 ) {
707
710
/* If timeout is expired and there are didabled nodes, then recheck cluster's state */
708
711
MtmRefreshClusterStatus (false);
709
712
}
0 commit comments