Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 2e16689

Browse files
knizhnikkelvich
authored andcommitted
Propages seqno in both direction
1 parent 18f3d86 commit 2e16689

File tree

1 file changed

+11
-5
lines changed

1 file changed

+11
-5
lines changed

arbiter.c

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ void MtmCheckHeartbeat()
368368
}
369369

370370

371-
static int MtmConnectSocket(char const* host, int port, int timeout)
371+
static int MtmConnectSocket(int node, int port, int timeout)
372372
{
373373
struct sockaddr_in sock_inet;
374374
unsigned addrs[MAX_ROUTES];
@@ -377,7 +377,7 @@ static int MtmConnectSocket(char const* host, int port, int timeout)
377377
MtmArbiterMessage resp;
378378
int sd;
379379
timestamp_t start = MtmGetSystemTime();
380-
380+
char const* host = Mtm->nodes[node].con.hostName;
381381

382382
sock_inet.sin_family = AF_INET;
383383
sock_inet.sin_port = htons(port);
@@ -446,6 +446,7 @@ static int MtmConnectSocket(char const* host, int port, int timeout)
446446
req.hdr.sxid = ShmemVariableCache->nextXid;
447447
req.hdr.csn = MtmGetCurrentTime();
448448
req.hdr.disabledNodeMask = Mtm->disabledNodeMask;
449+
req.hdr.seqno = Mtm->nodes[node].recvSeqNo;
449450
strcpy(req.connStr, Mtm->nodes[MtmNodeId-1].con.connStr);
450451
if (!MtmWriteSocket(sd, &req, sizeof req)) {
451452
elog(WARNING, "Arbiter failed to send handshake message to %s:%d: %d", host, port, errno);
@@ -464,7 +465,9 @@ static int MtmConnectSocket(char const* host, int port, int timeout)
464465
}
465466

466467
MtmLock(LW_EXCLUSIVE);
467-
Mtm->nodes[resp.node-1].sendSeqNo = resp.seqno;
468+
if (Mtm->nodes[resp.node-1].sendSeqNo < resp.seqno) {
469+
Mtm->nodes[resp.node-1].sendSeqNo = resp.seqno;
470+
}
468471

469472
/* Some node considered that I am dead, so switch to recovery mode */
470473
if (BIT_CHECK(resp.disabledNodeMask, MtmNodeId-1)) {
@@ -499,7 +502,7 @@ static void MtmOpenConnections()
499502
} else {
500503
arbiterPort = MtmArbiterPort + i + 1;
501504
}
502-
sockets[i] = MtmConnectSocket(Mtm->nodes[i].con.hostName, arbiterPort, MtmConnectTimeout);
505+
sockets[i] = MtmConnectSocket(i, arbiterPort, MtmConnectTimeout);
503506
if (sockets[i] < 0) {
504507
MtmOnNodeDisconnect(i+1);
505508
}
@@ -531,7 +534,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
531534
close(sockets[node]);
532535
sockets[node] = -1;
533536
}
534-
sockets[node] = MtmConnectSocket(Mtm->nodes[node].con.hostName, MtmArbiterPort + node + 1, MtmReconnectTimeout);
537+
sockets[node] = MtmConnectSocket(node, MtmArbiterPort + node + 1, MtmReconnectTimeout);
535538
if (sockets[node] < 0) {
536539
MtmOnNodeDisconnect(node+1);
537540
return false;
@@ -579,6 +582,9 @@ static void MtmAcceptOneConnection()
579582
resp.csn = MtmGetCurrentTime();
580583
resp.node = MtmNodeId;
581584
resp.seqno = Mtm->nodes[req.hdr.node-1].recvSeqNo;
585+
if (Mtm->nodes[req.hdr.node-1].sendSeqNo < req.hdr.seqno) {
586+
Mtm->nodes[req.hdr.node-1].sendSeqNo = req.hdr.seqno;
587+
}
582588
MtmUpdateNodeConnectionInfo(&Mtm->nodes[req.hdr.node-1].con, req.connStr);
583589
if (!MtmWriteSocket(fd, &resp, sizeof resp)) {
584590
elog(WARNING, "Arbiter failed to write response for handshake message to node %d", resp.node);

0 commit comments

Comments
 (0)