76
76
#define MAX_ROUTES 16
77
77
#define BUFFER_SIZE 1024
78
78
79
/*
 * Code stored in the `code` field of a DtmCommitMessage.
 * The enumerators are numbered explicitly; the values are the same ones the
 * compiler would assign implicitly (0, 1, 2).
 */
typedef enum
{
    MSG_PREPARE = 0,
    MSG_COMMIT  = 1,
    MSG_ABORT   = 2
} MessageCode;
85
+
86
+
79
87
typedef struct
80
88
{
89
+ MessageCode code ; /* Message code: MSG_PREPARE, MSG_COMMIT, MSG_ABORT
90
+ int node; /* Sender node ID */
81
91
TransactionId dxid ; /* Transaction ID at destination node */
82
92
TransactionId sxid ; /* Transaction IO at sender node */
83
- int node ; /* Sender node ID */
84
93
csn_t csn ; /* local CSN in case of sending data from replica to master, global CSN master->replica */
85
94
} DtmCommitMessage ;
86
95
@@ -100,15 +109,15 @@ static BackgroundWorker DtmSender = {
100
109
"mm-sender" ,
101
110
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION , /* do not need connection to the database */
102
111
BgWorkerStart_ConsistentState ,
103
- 1 , /* restrart in one second (is it possible to restort immediately?) */
112
+ 1 , /* restart in one second (is it possible to restart immediately?) */
104
113
DtmTransSender
105
114
};
106
115
107
116
static BackgroundWorker DtmRecevier = {
108
117
"mm-receiver" ,
109
118
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION , /* do not need connection to the database */
110
119
BgWorkerStart_ConsistentState ,
111
- 1 , /* restrart in one second (is it possible to restort immediately?) */
120
+ 1 , /* restart in one second (is it possible to restart immediately?) */
112
121
DtmTransReceiver
113
122
};
114
123
@@ -300,6 +309,25 @@ static int readSocket(int sd, void* buf, int buf_size)
300
309
return rc ;
301
310
}
302
311
312
+ static bool IsCoordinator (DtmTransState * ts )
313
+ {
314
+ return ts -> dsid .node == MMNodeId ;
315
+ }
316
+
317
+ static void DtmAppendBuffer (MessageCode code , DtmBuffer * txBuffer , TransactionId xid , int node , DtmTransState * ts )
318
+ {
319
+ DtmBuffer * buf = & txBuffer [node ];
320
+ if (buf -> used == BUFFER_SIZE ) {
321
+ writeSocket (sockets [node ], buf -> data , buf -> used * sizeof (DtmCommitMessage ));
322
+ buf -> used = 0 ;
323
+ }
324
+ buf -> data [buf -> used ].code = code ;
325
+ buf -> data [buf -> used ].dxid = xid ;
326
+ buf -> data [buf -> used ].sxid = ts -> xid ;
327
+ buf -> data [buf -> used ].csn = ts -> status == TRANSACTION_STATUS_ABORTED ? INVALID_CSN : ts -> csn ;
328
+ buf -> data [buf -> used ].node = MMNodeId ;
329
+ buf -> used += 1 ;
330
+ }
303
331
304
332
static void DtmTransSender (Datum arg )
305
333
{
@@ -327,38 +355,18 @@ static void DtmTransSender(Datum arg)
327
355
SpinLockRelease (& ds -> votingSpinlock );
328
356
329
357
for (; ts != NULL ; ts = ts -> nextVoting ) {
330
- if (ts -> gtid . node == MMNodeId ) {
331
- /* Coordinator is broadcasting confirmations to replicas */
358
+ if (IsCoordinator ( ts )) {
359
+ /* Coordinator is broadcasting PREPARE message to replicas */
332
360
for (i = 0 ; i < nNodes ; i ++ ) {
333
361
if (TransactionIdIsValid (ts -> xids [i ])) {
334
- if (txBuffer [i ].used == BUFFER_SIZE ) {
335
- writeSocket (sockets [i ], txBuffer [i ].data , txBuffer [i ].used * sizeof (DtmCommitMessage ));
336
- txBuffer [i ].used = 0 ;
337
- }
338
- DTM_TRACE ("Send notification %ld to replica %d from coordinator %d for transaction %d (local transaction %d)\n" ,
339
- ts -> csn , i + 1 , MMNodeId , ts -> xid , ts -> xids [i ]);
340
-
341
- txBuffer [i ].data [txBuffer [i ].used ].dxid = ts -> xids [i ];
342
- txBuffer [i ].data [txBuffer [i ].used ].sxid = ts -> xid ;
343
- txBuffer [i ].data [txBuffer [i ].used ].csn = ts -> csn ;
344
- txBuffer [i ].data [txBuffer [i ].used ].node = MMNodeId ;
345
- txBuffer [i ].used += 1 ;
362
+ DtmAppendBuffer (CMD_PREPARE , txBuffer , ts -> xids [i ], i , ts );
346
363
}
347
364
}
348
365
} else {
349
- /* Replica is notifying master */
350
- i = ts -> gtid .node - 1 ;
351
- if (txBuffer [i ].used == BUFFER_SIZE ) {
352
- writeSocket (sockets [i ], txBuffer [i ].data , txBuffer [i ].used * sizeof (DtmCommitMessage ));
353
- txBuffer [i ].used = 0 ;
354
- }
366
+ /* Replica is notifying master that it is ready to PREPARE */
355
367
DTM_TRACE ("Send notification %ld to coordinator %d from node %d for transaction %d (local transaction %d)\n" ,
356
368
ts -> csn , ts -> gtid .node , MMNodeId , ts -> gtid .xid , ts -> xid );
357
- txBuffer [i ].data [txBuffer [i ].used ].dxid = ts -> gtid .xid ;
358
- txBuffer [i ].data [txBuffer [i ].used ].sxid = ts -> xid ;
359
- txBuffer [i ].data [txBuffer [i ].used ].node = MMNodeId ;
360
- txBuffer [i ].data [txBuffer [i ].used ].csn = ts -> csn ;
361
- txBuffer [i ].used += 1 ;
369
+ DtmAppendBuffer (CMD_PREPARE , txBuffer , ts -> gtid .xid , ts -> gtid .node - 1 , ts );
362
370
}
363
371
}
364
372
for (i = 0 ; i < nNodes ; i ++ ) {
@@ -431,9 +439,33 @@ static void DtmTransReceiver(Datum arg)
431
439
DtmCommitMessage * msg = & rxBuffer [i ].data [j ];
432
440
DtmTransState * ts = (DtmTransState * )hash_search (xid2state , & msg -> dxid , HASH_FIND , NULL );
433
441
Assert (ts != NULL );
434
- if (msg -> csn > ts -> csn ) {
435
- ts -> csn = msg -> csn ;
436
- }
442
+ switch (msg -> code ) {
443
+ case CMD_PREPARE :
444
+ if (IsCoordinator (ts )) {
445
+ switch (msg -> command ) {
446
+ case CMD_PREPARE :
447
+
448
+ if (ts -> state == TRANSACTION_STATUS_IN_PROGRESS :
449
+ /* transaction is in-prepared stage (in-doubt): calculate max CSN */
450
+ if (msg -> csn > ts -> csn ) {
451
+ ts -> csn = msg -> csn ;
452
+ }
453
+ Assert (ts -> nVotes < dtm -> nNodes );
454
+ if (++ ts -> nVotes == dtm -> nNodes ) { /* receive responses from all nodes */
455
+ ts -> status = TRANSACTION_STATUS_COMMIT ;
456
+
457
+ if (ts -> state == TRANSACTION_STATUS_UNKNOWN ) {
458
+ /* All nodes are ready to prepare: switch transaction to in-doubt state */
459
+ ts -> csn = dtm_get_csn ();
460
+ ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
461
+ /* and broadcast PREPARE message */
462
+ MMSendNotificationMessage (ts );
463
+ } else if (ts -> state == CMD_ABORT ) {
464
+ ts -> status = TRANSACTION_STATUS_ABORTED ;
465
+
466
+ } else {
467
+ Assert (ts -> state == TRANSACTION_STATUS_IN_PROGRESS );
468
+
437
469
Assert ((unsigned )(msg -> node - 1 ) <= (unsigned )nNodes );
438
470
ts -> xids [msg -> node - 1 ] = msg -> sxid ;
439
471
DTM_TRACE ("Receive response %ld for transaction %d votes %d from node %d (transaction %d)\n" ,
0 commit comments