Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5e35877

Browse files
knizhnik authored and kelvich committed
Correctly store LSN
1 parent c9f6ec1 commit 5e35877

8 files changed

+214
-117
lines changed

arbiter.c

Lines changed: 79 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,7 @@ static char const* const messageText[] =
106106
"HANDSHAKE",
107107
"READY",
108108
"PREPARE",
109-
"COMMIT",
110-
"ABORT",
111109
"PREPARED",
112-
"COMMITTED",
113110
"ABORTED",
114111
"STATUS"
115112
};
@@ -456,8 +453,10 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
456453
buf->used = 0;
457454
}
458455
MTM_TRACE("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d\n",
459-
ts->status == TRANSACTION_STATUS_ABORTED ? "abort" : "commit", ts->csn, node+1, MtmNodeId, ts->gtid.xid, ts->xid);
460-
buf->data[buf->used].code = ts->status == TRANSACTION_STATUS_ABORTED ? MSG_ABORTED : MSG_PREPARED;
456+
messageText[ts->cmd], ts->csn, node+1, MtmNodeId, ts->gtid.xid, ts->xid);
457+
458+
Assert(ts->cmd != MSG_INVALID);
459+
buf->data[buf->used].code = ts->cmd;
461460
buf->data[buf->used].dxid = xid;
462461
buf->data[buf->used].sxid = ts->xid;
463462
buf->data[buf->used].csn = ts->csn;
@@ -466,6 +465,22 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
466465
buf->used += 1;
467466
}
468467

468+
/*
 * Queue the message for transaction `ts` to every node that has a valid
 * entry in ts->xids[].
 *
 * txBuffer - per-node send buffers, passed through to MtmAppendBuffer()
 * ts       - transaction state; ts->xids[i] is used as the destination xid
 *            for slot i
 *
 * Slot i is compared against MtmNodeId as i+1, so slots appear to be
 * 0-based while node ids are 1-based.
 */
static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
469+
{
470+
int i;
471+
/* Counter starts at 1 -- presumably to account for the local node, which
 * is never a broadcast target (see the Assert in the loop); TODO confirm. */
int n = 1;
472+
for (i = 0; i < MtmNodes; i++)
473+
{
474+
/* Only slots holding a valid transaction id receive the message. */
if (TransactionIdIsValid(ts->xids[i])) {
475+
/* The local node must not appear among the recipients. */
Assert(i+1 != MtmNodeId);
476+
MtmAppendBuffer(txBuffer, ts->xids[i], i, ts);
477+
n += 1;
478+
}
479+
}
480+
/* Sanity check: recipients plus the initial 1 must equal ds->nNodes. */
Assert(n == ds->nNodes);
481+
}
482+
483+
469484
static void MtmTransSender(Datum arg)
470485
{
471486
int nNodes = MtmNodes;
@@ -492,7 +507,11 @@ static void MtmTransSender(Datum arg)
492507
MtmLock(LW_SHARED);
493508

494509
for (ts = ds->votingTransactions; ts != NULL; ts = ts->nextVoting) {
495-
MtmAppendBuffer(txBuffer, ts->gtid.xid, ts->gtid.node-1, ts);
510+
if (MtmIsCoordinator(ts)) {
511+
MtmBroadcastMessage(txBuffer, ts);
512+
} else {
513+
MtmAppendBuffer(txBuffer, ts->gtid.xid, ts->gtid.node-1, ts);
514+
}
496515
}
497516
ds->votingTransactions = NULL;
498517

@@ -510,6 +529,7 @@ static void MtmTransSender(Datum arg)
510529
/*
 * Mark voting for transaction `ts` as completed and set the latch of the
 * process stored at index ts->procno in ProcGlobal->allProcs.
 */
static void MtmWakeUpBackend(MtmTransState* ts)
511530
{
512531
/* NOTE(review): "backed" in the trace text looks like a typo for "backend". */
MTM_TRACE("Wakeup backed procno=%d, pid=%d\n", ts->procno, ProcGlobal->allProcs[ts->procno].pid);
532+
/* The flag is written before the latch is set -- presumably so the woken
 * process observes it when it resumes; TODO confirm against the waiter. */
ts->votingCompleted = true;
513533
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
514534
}
515535

@@ -565,6 +585,9 @@ static void MtmTransReceiver(Datum arg)
565585
#if USE_EPOLL
566586
n = epoll_wait(epollfd, events, nNodes, MtmKeepaliveTimeout/1000);
567587
if (n < 0) {
588+
if (errno == EINTR) {
589+
continue;
590+
}
568591
elog(ERROR, "Arbiter failed to poll sockets: %d", errno);
569592
}
570593
for (j = 0; j < n; j++) {
@@ -581,7 +604,9 @@ static void MtmTransReceiver(Datum arg)
581604
events = inset;
582605
tv.tv_sec = MtmKeepaliveTimeout/USEC;
583606
tv.tv_usec = MtmKeepaliveTimeout%USEC;
584-
n = select(max_fd+1, &events, NULL, NULL, &tv);
607+
do {
608+
n = select(max_fd+1, &events, NULL, NULL, &tv);
609+
} while (n < 0 && errno == EINTR); /* retry when select() is interrupted by a signal; EINTR is the POSIX errno name */
585610
} while (n < 0 && MtmRecovery());
586611

587612
if (rc < 0) {
@@ -612,31 +637,62 @@ static void MtmTransReceiver(Datum arg)
612637
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &msg->dxid, HASH_FIND, NULL);
613638
Assert(ts != NULL);
614639
Assert(msg->node > 0 && msg->node <= nNodes && msg->node != MtmNodeId);
615-
Assert (MtmIsCoordinator(ts));
616-
switch (msg->code) {
617-
case MSG_PREPARED:
618-
if (ts->status != TRANSACTION_STATUS_ABORTED) {
619-
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS || ts->status == TRANSACTION_STATUS_UNKNOWN);
620-
if (msg->csn > ts->csn) {
621-
ts->csn = msg->csn;
622-
MtmSyncClock(ts->csn);
623-
}
624-
if (++ts->nVotes == ds->nNodes) {
625-
MtmWakeUpBackend(ts);
640+
if (MtmIsCoordinator(ts)) {
641+
switch (msg->code) {
642+
case MSG_READY:
643+
Assert(ts->nVotes < ds->nNodes);
644+
ds->nodeTransDelay[msg->node-1] += MtmGetCurrentTime() - ts->csn;
645+
ts->xids[msg->node-1] = msg->sxid;
646+
if (++ts->nVotes == ds->nNodes) {
647+
/* All nodes have finished their transactions */
648+
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
649+
ts->nVotes = 1; /* I voted myself */
650+
MtmSendNotificationMessage(ts, MSG_PREPARE);
651+
} else {
652+
Assert(ts->status == TRANSACTION_STATUS_ABORTED);
653+
MtmWakeUpBackend(ts);
626654
}
627655
}
628-
break;
629-
case MSG_ABORTED:
656+
break;
657+
case MSG_ABORTED:
658+
Assert(ts->nVotes < ds->nNodes);
630659
if (ts->status != TRANSACTION_STATUS_ABORTED) {
631-
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS || ts->status == TRANSACTION_STATUS_UNKNOWN);
660+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
632661
ts->status = TRANSACTION_STATUS_ABORTED;
633662
MtmAdjustSubtransactions(ts);
663+
}
664+
if (++ts->nVotes == ds->nNodes) {
634665
MtmWakeUpBackend(ts);
635666
}
636667
break;
637-
default:
668+
case MSG_PREPARED:
669+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
670+
Assert(ts->nVotes < ds->nNodes);
671+
if (msg->csn > ts->csn) {
672+
ts->csn = msg->csn;
673+
MtmSyncClock(ts->csn);
674+
}
675+
if (++ts->nVotes == ds->nNodes) {
676+
ts->csn = MtmAssignCSN();
677+
ts->status = TRANSACTION_STATUS_UNKNOWN;
678+
MtmWakeUpBackend(ts);
679+
}
680+
default:
681+
Assert(false);
682+
}
683+
} else {
684+
switch (msg->code) {
685+
case MSG_PREPARE:
686+
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
687+
ts->status = TRANSACTION_STATUS_UNKNOWN;
688+
ts->csn = MtmAssignCSN();
689+
MtmAdjustSubtransactions(ts);
690+
MtmSendNotificationMessage(ts, MSG_PREPARED);
691+
break;
692+
default:
638693
Assert(false);
639-
}
694+
}
695+
}
640696
}
641697
MtmUnlock();
642698

bgwpool.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ static void BgwPoolMainLoop(Datum arg)
3737
SpinLockAcquire(&pool->lock);
3838
size = *(int*)&pool->queue[pool->head];
3939
Assert(size < pool->size);
40-
work = palloc(size);
40+
work = malloc(size);
4141
pool->active -= 1;
4242
if (pool->head + size + 4 > pool->size) {
4343
memcpy(work, pool->queue, size);
@@ -55,7 +55,7 @@ static void BgwPoolMainLoop(Datum arg)
5555
}
5656
SpinLockRelease(&pool->lock);
5757
pool->executor(id, work, size);
58-
pfree(work);
58+
free(work);
5959
}
6060
}
6161

0 commit comments

Comments
 (0)