Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5f3f8e2

Browse files
knizhnikkelvich
authored andcommitted
MMTS bug fixes
1 parent ca55e19 commit 5f3f8e2

File tree

3 files changed

+43
-28
lines changed

3 files changed

+43
-28
lines changed

arbiter.c

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
2-
* multimaster.c
2+
* arbiter.c
33
*
4-
* Multimaster based on logical replication
4+
* Coordinate global transaction commit
55
*
66
*/
77

@@ -99,15 +99,15 @@ static void DtmTransReceiver(Datum arg);
9999
static BackgroundWorker DtmSender = {
100100
"mm-sender",
101101
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION, /* do not need connection to the database */
102-
BgWorkerStart_PostmasterStart,
102+
BgWorkerStart_ConsistentState,
103103
1, /* restrart in one second (is it possible to restort immediately?) */
104104
DtmTransSender
105105
};
106106

107107
static BackgroundWorker DtmRecevier = {
108108
"mm-receiver",
109109
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION, /* do not need connection to the database */
110-
BgWorkerStart_PostmasterStart,
110+
BgWorkerStart_ConsistentState,
111111
1, /* restrart in one second (is it possible to restort immediately?) */
112112
DtmTransReceiver
113113
};
@@ -216,7 +216,7 @@ static int connectSocket(char const* host, int port)
216216

217217
static void openConnections()
218218
{
219-
int nNodes = ds->nNodes;
219+
int nNodes = MMNodes;
220220
int i;
221221
char* connStr = pstrdup(MMConnStrs);
222222

@@ -228,9 +228,14 @@ static void openConnections()
228228
if (host == NULL) {
229229
elog(ERROR, "Invalid connection string: '%s'", MMConnStrs);
230230
}
231-
for (end = host+5; *end != ' ' && *end != ',' && end != '\0'; end++);
232-
*end = '\0';
233-
connStr = end + 1;
231+
host += 5;
232+
for (end = host; *end != ' ' && *end != ',' && *end != '\0'; end++);
233+
if (*end != '\0') {
234+
*end = '\0';
235+
connStr = end + 1;
236+
} else {
237+
connStr = end;
238+
}
234239
sockets[i] = i+1 != MMNodeId ? connectSocket(host, MMArbiterPort + i) : -1;
235240
}
236241
}
@@ -241,7 +246,7 @@ static void acceptConnections()
241246
int i;
242247
int sd;
243248
int on = 1;
244-
int nNodes = ds->nNodes-1;
249+
int nNodes = MMNodes-1;
245250

246251
sockets = (int*)palloc(sizeof(int)*nNodes);
247252

@@ -359,7 +364,6 @@ static void DtmTransReceiver(Datum arg)
359364
{
360365
int nNodes = MMNodes-1;
361366
int i, j, rc;
362-
int rxBufPos = 0;
363367
DtmBuffer* rxBuffer = (DtmBuffer*)palloc(sizeof(DtmBuffer)*nNodes);
364368
HTAB* xid2state;
365369

@@ -408,7 +412,7 @@ static void DtmTransReceiver(Datum arg)
408412
#endif
409413
{
410414
int nResponses;
411-
rxBuffer[i].used += readSocket(sockets[i], (char*)rxBuffer[i].data + rxBuffer[i].used, BUFFER_SIZE-rxBufPos);
415+
rxBuffer[i].used += readSocket(sockets[i], (char*)rxBuffer[i].data + rxBuffer[i].used, BUFFER_SIZE-rxBuffer[i].used);
412416
nResponses = rxBuffer[i].used/sizeof(DtmCommitMessage);
413417

414418
LWLockAcquire(ds->hashLock, LW_SHARED);
@@ -426,6 +430,8 @@ static void DtmTransReceiver(Datum arg)
426430
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
427431
}
428432
}
433+
LWLockRelease(ds->hashLock);
434+
429435
rxBuffer[i].used -= nResponses*sizeof(DtmCommitMessage);
430436
if (rxBuffer[i].used != 0) {
431437
memmove(rxBuffer[i].data, (char*)rxBuffer[i].data + nResponses*sizeof(DtmCommitMessage), rxBuffer[i].used);

multimaster.c

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ static void DtmPrepareTransaction(DtmCurrentTrans* x)
453453
ts->status = TRANSACTION_STATUS_UNKNOWN;
454454
ts->csn = dtm_get_csn();
455455
ts->procno = MyProc->pgprocno;
456-
ts->nVotes = 1; /* I voted myself */
456+
ts->nVotes = 1; /* My own voice */
457457
for (i = 0; i < MMNodes; i++) {
458458
ts->xids[i] = InvalidTransactionId;
459459
}
@@ -479,7 +479,8 @@ static XidStatus
479479
DtmCommitTransaction(TransactionId xid, int nsubxids, TransactionId *subxids)
480480
{
481481
DtmTransState* ts;
482-
csn_t csn;
482+
csn_t localCSN;
483+
csn_t globalCSN;
483484
int i;
484485
XidStatus status;
485486

@@ -489,44 +490,45 @@ DtmCommitTransaction(TransactionId xid, int nsubxids, TransactionId *subxids)
489490

490491
/* now transaction is in doubt state */
491492
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
492-
csn = dtm_get_csn();
493-
if (csn > ts->csn) {
494-
ts->csn = csn;
495-
}
493+
localCSN = dtm_get_csn();
494+
ts->csn = localCSN;
496495
DtmTransactionListAppend(ts);
497496
DtmAddSubtransactions(ts, subxids, nsubxids);
498497

499498
MMVoteForTransaction(ts); /* wait until transaction at all nodes are prepared */
500-
csn = ts->csn;
501-
if (csn != INVALID_CSN) {
502-
dtm_sync(csn);
499+
globalCSN = ts->csn;
500+
Assert(globalCSN >= localCSN);
501+
502+
if (globalCSN != INVALID_CSN) {
503+
dtm_sync(globalCSN);
503504
status = TRANSACTION_STATUS_COMMITTED;
504505
} else {
506+
ts->csn = globalCSN = localCSN;
505507
status = TRANSACTION_STATUS_ABORTED;
506508
}
507509
ts->status = status;
508510
for (i = 0; i < nsubxids; i++) {
509511
ts = ts->next;
510512
ts->status = status;
511-
ts->csn = csn;
513+
ts->csn = globalCSN;
512514
}
513515
LWLockRelease(dtm->hashLock);
514516
return status;
515517
}
516518

517519
static void
518-
DtmAbortTransaction(TransactionId xid, int nsubxids, TransactionId *subxids)
520+
DtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status)
519521
{
520522
int i;
521523
DtmTransState* ts;
522524

523525
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
524526
ts = hash_search(xid2state, &xid, HASH_FIND, NULL);
525527
Assert(ts != NULL); /* should be created by DtmPrepareTransaction */
526-
ts->status = TRANSACTION_STATUS_ABORTED;
528+
ts->status = status;
527529
for (i = 0; i < nsubxids; i++) {
528530
ts = ts->next;
529-
ts->status = TRANSACTION_STATUS_ABORTED;
531+
ts->status = status;
530532
}
531533
LWLockRelease(dtm->hashLock);
532534
}
@@ -539,9 +541,10 @@ DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids,
539541
DTM_INFO("%d: DtmSetTransactionStatus %u = %u\n", getpid(), xid, status);
540542
if (dtmTx.isDistributed)
541543
{
544+
Assert(xid == dtmTx.xid);
542545
if (status == TRANSACTION_STATUS_ABORTED || !dtmTx.containsDML)
543546
{
544-
DtmAbortTransaction(xid, nsubxids, subxids);
547+
DtmFinishTransaction(xid, nsubxids, subxids, status);
545548
DTM_INFO("Abort transaction %d\n", xid);
546549
}
547550
else
@@ -990,12 +993,18 @@ MMPoolConstructor(void)
990993
static void
991994
SendCommitMessage(DtmTransState* ts)
992995
{
996+
DtmTransState* votingList;
997+
993998
SpinLockAcquire(&dtm->votingSpinlock);
994-
ts->nextVoting = dtm->votingTransactions;
999+
votingList = dtm->votingTransactions;
1000+
ts->nextVoting = votingList;
9951001
dtm->votingTransactions = ts;
9961002
SpinLockRelease(&dtm->votingSpinlock);
9971003

998-
PGSemaphoreUnlock(&dtm->votingSemaphore);
1004+
if (votingList == NULL) {
1005+
/* singal semaphreo only once for the whole list */
1006+
PGSemaphoreUnlock(&dtm->votingSemaphore);
1007+
}
9991008
}
10001009

10011010
static void
@@ -1011,7 +1020,7 @@ MMVoteForTransaction(DtmTransState* ts)
10111020
/* ... and then send notifications to replicas */
10121021
SendCommitMessage(ts);
10131022
} else {
1014-
/* I am replica: first notify master... */
1023+
/* I am replica: first notify coordinator... */
10151024
ts->nVotes = dtm->nNodes-1; /* I just need one confirmation from coordinator */
10161025
SendCommitMessage(ts);
10171026
/* ... and wait response from it */

tests/dtmbench

9.18 KB
Binary file not shown.

0 commit comments

Comments
 (0)