Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit eb90a7d

Browse files
committed
Merge branch 'PGPROEE9_6_MULTIMASTER' of https://gitlab.postgrespro.ru/pgpro-dev/postgrespro into PGPROEE9_6_MULTIMASTER
2 parents 2baf093 + 9caeb00 commit eb90a7d

File tree

3 files changed

+64
-108
lines changed

3 files changed

+64
-108
lines changed

contrib/mmts/arbiter.c

Lines changed: 62 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ static void MtmSender(Datum arg);
9191
static void MtmReceiver(Datum arg);
9292
static void MtmMonitor(Datum arg);
9393
static void MtmSendHeartbeat(void);
94-
static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectTimeout);
94+
static bool MtmSendToNode(int node, void const* buf, int size);
9595

9696
char const* const MtmMessageKindMnem[] =
9797
{
@@ -160,7 +160,7 @@ static void MtmRegisterSocket(int fd, int node)
160160
ev.events = EPOLLIN;
161161
ev.data.u32 = node;
162162
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
163-
MTM_ELOG(LOG, "Arbiter failed to add socket to epoll set: %d", errno);
163+
MTM_ELOG(LOG, "Arbiter failed to add socket to epoll set: %s", strerror(errno));
164164
}
165165
#else
166166
FD_SET(fd, &inset);
@@ -174,7 +174,7 @@ static void MtmUnregisterSocket(int fd)
174174
{
175175
#if USE_EPOLL
176176
if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
177-
MTM_ELOG(LOG, "Arbiter failed to unregister socket from epoll set: %d", errno);
177+
MTM_ELOG(LOG, "Arbiter failed to unregister socket from epoll set: %s", strerror(errno));
178178
}
179179
#else
180180
FD_CLR(fd, &inset);
@@ -365,7 +365,7 @@ static void MtmSendHeartbeat()
365365
|| !BIT_CHECK(Mtm->disabledNodeMask, i)
366366
|| BIT_CHECK(Mtm->reconnectMask, i)))
367367
{
368-
if (!MtmSendToNode(i, &msg, sizeof(msg), MtmHeartbeatRecvTimeout)) {
368+
if (!MtmSendToNode(i, &msg, sizeof(msg))) {
369369
MTM_ELOG(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
370370
} else {
371371
if (last_heartbeat_to_node[i] + MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2 < now) {
@@ -402,7 +402,7 @@ void MtmCheckHeartbeat()
402402
}
403403

404404

405-
static int MtmConnectSocket(int node, int port, time_t timeout)
405+
static int MtmConnectSocket(int node, int port)
406406
{
407407
struct addrinfo *addrs = NULL;
408408
struct addrinfo *addr;
@@ -411,12 +411,9 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
411411
MtmHandshakeMessage req;
412412
MtmArbiterMessage resp;
413413
int sd = -1;
414-
int ret;
415-
timestamp_t start = MtmGetSystemTime();
414+
int rc;
416415
char const* host = Mtm->nodes[node].con.hostName;
417416
nodemask_t save_mask = busy_mask;
418-
timestamp_t afterWait;
419-
timestamp_t beforeWait;
420417

421418
/* Initialize hint structure */
422419
MemSet(&hint, 0, sizeof(hint));
@@ -425,67 +422,60 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
425422

426423
snprintf(portstr, sizeof(portstr), "%d", port);
427424

428-
ret = pg_getaddrinfo_all(host, portstr, &hint, &addrs);
429-
if (ret != 0)
425+
rc = pg_getaddrinfo_all(host, portstr, &hint, &addrs);
426+
if (rc != 0)
430427
{
431-
MTM_ELOG(LOG, "Arbiter failed to resolve host '%s' by name: (%d) %s", host, ret, gai_strerror(ret));
428+
MTM_ELOG(LOG, "Arbiter failed to resolve host '%s' by name: %s", host, gai_strerror(rc));
432429
return -1;
433430
}
434431
BIT_SET(busy_mask, node);
435432

436-
Retry:
437-
while (1) {
438-
int rc = -1;
439-
sd = socket(AF_INET, SOCK_STREAM, 0);
440-
if (sd < 0) {
441-
MTM_ELOG(LOG, "Arbiter failed to create socket: %d", errno);
442-
goto Error;
443-
}
444-
rc = fcntl(sd, F_SETFL, O_NONBLOCK);
445-
if (rc < 0) {
446-
MTM_ELOG(LOG, "Arbiter failed to switch socket to non-blocking mode: %d", errno);
447-
goto Error;
448-
}
449-
for (addr = addrs; addr != NULL; addr = addr->ai_next)
450-
{
451-
do {
452-
rc = connect(sd, addr->ai_addr, addr->ai_addrlen);
453-
} while (rc < 0 && errno == EINTR);
433+
Retry:
454434

455-
if (rc >= 0 || errno == EINPROGRESS) {
456-
break;
457-
}
458-
}
459-
if (rc == 0) {
435+
sd = socket(AF_INET, SOCK_STREAM, 0);
436+
if (sd < 0) {
437+
MTM_ELOG(LOG, "Arbiter failed to create socket: %s", strerror(errno));
438+
goto Error;
439+
}
440+
rc = fcntl(sd, F_SETFL, O_NONBLOCK);
441+
if (rc < 0) {
442+
MTM_ELOG(LOG, "Arbiter failed to switch socket to non-blocking mode: %s", strerror(errno));
443+
goto Error;
444+
}
445+
for (addr = addrs; addr != NULL; addr = addr->ai_next)
446+
{
447+
do {
448+
rc = connect(sd, addr->ai_addr, addr->ai_addrlen);
449+
} while (rc < 0 && errno == EINTR);
450+
451+
if (rc >= 0 || errno == EINPROGRESS) {
460452
break;
461453
}
462-
beforeWait = MtmGetSystemTime();
463-
if (errno != EINPROGRESS || start + MSEC_TO_USEC(timeout) < beforeWait ) {
464-
MTM_ELOG(WARNING, "Arbiter failed to connect to %s:%d: error=%d", host, port, errno);
465-
goto Error;
466-
} else {
467-
rc = MtmWaitSocket(sd, true, MtmHeartbeatSendTimeout);
468-
if (rc == 1) {
469-
socklen_t optlen = sizeof(int);
470-
if (getsockopt(sd, SOL_SOCKET, SO_ERROR, (void*)&rc, &optlen) < 0) {
471-
MTM_ELOG(WARNING, "Arbiter failed to getsockopt for %s:%d: error=%d", host, port, errno);
472-
goto Error;
473-
}
474-
if (rc == 0) {
475-
break;
476-
} else {
477-
MTM_ELOG(WARNING, "Arbiter trying to connect to %s:%d: rc=%d, error=%d", host, port, rc, errno);
478-
}
479-
} else {
480-
MTM_ELOG(WARNING, "Arbiter waiting socket to %s:%d: rc=%d, error=%d", host, port, rc, errno);
454+
}
455+
456+
if (rc != 0 && errno == EINPROGRESS) {
457+
rc = MtmWaitSocket(sd, true, MtmHeartbeatSendTimeout);
458+
if (rc == 1) {
459+
socklen_t optlen = sizeof(int);
460+
int errcode;
461+
462+
if (getsockopt(sd, SOL_SOCKET, SO_ERROR, (void*)&errcode, &optlen) < 0) {
463+
MTM_ELOG(WARNING, "Arbiter failed to getsockopt for %s:%d: %s", host, port, strerror(errcode));
464+
goto Error;
481465
}
482-
close(sd);
483-
afterWait = MtmGetSystemTime();
484-
if (afterWait < beforeWait + MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
485-
MtmSleep(beforeWait + MSEC_TO_USEC(MtmHeartbeatSendTimeout) - afterWait);
466+
if (errcode != 0) {
467+
MTM_ELOG(WARNING, "Arbiter trying to connect to %s:%d: %s", host, port, strerror(errcode));
468+
goto Error;
486469
}
470+
} else {
471+
MTM_ELOG(WARNING, "Arbiter waiting socket to %s:%d: %s", host, port, strerror(errno));
487472
}
488473
}
474+
else if (rc != 0) {
475+
MTM_ELOG(WARNING, "Arbiter failed to connect to %s:%d: (%d) %s", host, port, rc, strerror(errno));
476+
goto Error;
477+
}
478+
489479
MtmSetSocketOptions(sd);
490480
MtmInitMessage(&req.hdr, MSG_HANDSHAKE);
491481
req.hdr.node = MtmNodeId;
@@ -494,12 +484,12 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
494484
req.hdr.csn = MtmGetCurrentTime();
495485
strcpy(req.connStr, Mtm->nodes[MtmNodeId-1].con.connStr);
496486
if (!MtmWriteSocket(sd, &req, sizeof req)) {
497-
MTM_ELOG(WARNING, "Arbiter failed to send handshake message to %s:%d: %d", host, port, errno);
487+
MTM_ELOG(WARNING, "Arbiter failed to send handshake message to %s:%d: %s", host, port, strerror(errno));
498488
close(sd);
499489
goto Retry;
500490
}
501491
if (MtmReadSocket(sd, &resp, sizeof resp) != sizeof(resp)) {
502-
MTM_ELOG(WARNING, "Arbiter failed to receive response for handshake message from %s:%d: errno=%d", host, port, errno);
492+
MTM_ELOG(WARNING, "Arbiter failed to receive response for handshake message from %s:%d: %s", host, port, strerror(errno));
503493
close(sd);
504494
goto Retry;
505495
}
@@ -521,7 +511,7 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
521511

522512
return sd;
523513

524-
Error:
514+
Error:
525515
busy_mask = save_mask;
526516
if (sd >= 0) {
527517
close(sd);
@@ -545,7 +535,7 @@ static void MtmOpenConnections()
545535
}
546536
for (i = 0; i < nNodes; i++) {
547537
if (i+1 != MtmNodeId && i < Mtm->nAllNodes) {
548-
sockets[i] = MtmConnectSocket(i, Mtm->nodes[i].con.arbiterPort, MtmConnectTimeout);
538+
sockets[i] = MtmConnectSocket(i, Mtm->nodes[i].con.arbiterPort);
549539
if (sockets[i] < 0) {
550540
MtmOnNodeDisconnect(i+1);
551541
}
@@ -560,7 +550,7 @@ static void MtmOpenConnections()
560550
}
561551

562552

563-
static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectTimeout)
553+
static bool MtmSendToNode(int node, void const* buf, int size)
564554
{
565555
bool result = true;
566556
nodemask_t save_mask = busy_mask;
@@ -583,11 +573,11 @@ static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectT
583573
}
584574
if (sockets[node] < 0 || !MtmWriteSocket(sockets[node], buf, size)) {
585575
if (sockets[node] >= 0) {
586-
MTM_ELOG(WARNING, "Arbiter fail to write to node %d: %d", node+1, errno);
576+
MTM_ELOG(WARNING, "Arbiter fail to write to node %d: %s", node+1, strerror(errno));
587577
close(sockets[node]);
588578
sockets[node] = -1;
589579
}
590-
sockets[node] = MtmConnectSocket(node, Mtm->nodes[node].con.arbiterPort, reconnectTimeout);
580+
sockets[node] = MtmConnectSocket(node, Mtm->nodes[node].con.arbiterPort);
591581
if (sockets[node] < 0) {
592582
MtmOnNodeDisconnect(node+1);
593583
result = false;
@@ -607,7 +597,7 @@ static int MtmReadFromNode(int node, void* buf, int buf_size)
607597
{
608598
int rc = MtmReadSocket(sockets[node], buf, buf_size);
609599
if (rc <= 0) {
610-
MTM_ELOG(WARNING, "Arbiter failed to read from node=%d, rc=%d, errno=%d", node+1, rc, errno);
600+
MTM_ELOG(WARNING, "Arbiter failed to read from node=%d: %s", node+1, strerror(errno));
611601
MtmDisconnect(node);
612602
}
613603
return rc;
@@ -617,17 +607,17 @@ static void MtmAcceptOneConnection()
617607
{
618608
int fd = accept(gateway, NULL, NULL);
619609
if (fd < 0) {
620-
MTM_ELOG(WARNING, "Arbiter failed to accept socket: %d", errno);
610+
MTM_ELOG(WARNING, "Arbiter failed to accept socket: %s", strerror(errno));
621611
} else {
622612
MtmHandshakeMessage req;
623613
MtmArbiterMessage resp;
624614
int rc = fcntl(fd, F_SETFL, O_NONBLOCK);
625615
if (rc < 0) {
626-
MTM_ELOG(ERROR, "Arbiter failed to switch socket to non-blocking mode: %d", errno);
616+
MTM_ELOG(ERROR, "Arbiter failed to switch socket to non-blocking mode: %s", strerror(errno));
627617
}
628618
rc = MtmReadSocket(fd, &req, sizeof req);
629619
if (rc < sizeof(req)) {
630-
MTM_ELOG(WARNING, "Arbiter failed to handshake socket: %d, errno=%d", rc, errno);
620+
MTM_ELOG(WARNING, "Arbiter failed to handshake socket: %s", strerror(errno));
631621
close(fd);
632622
} else if (req.hdr.code != MSG_HANDSHAKE && req.hdr.dxid != HANDSHAKE_MAGIC) {
633623
MTM_ELOG(WARNING, "Arbiter get unexpected handshake message %d", req.hdr.code);
@@ -764,7 +754,7 @@ static void MtmSender(Datum arg)
764754

765755
for (i = 0; i < Mtm->nAllNodes; i++) {
766756
if (txBuffer[i].used != 0) {
767-
MtmSendToNode(i, txBuffer[i].data, txBuffer[i].used*sizeof(MtmArbiterMessage), MtmReconnectTimeout);
757+
MtmSendToNode(i, txBuffer[i].data, txBuffer[i].used*sizeof(MtmArbiterMessage));
768758
txBuffer[i].used = 0;
769759
}
770760
}
@@ -864,7 +854,7 @@ static void MtmReceiver(Datum arg)
864854
if (errno == EINTR) {
865855
continue;
866856
}
867-
MTM_ELOG(ERROR, "Arbiter failed to poll sockets: %d", errno);
857+
MTM_ELOG(ERROR, "Arbiter failed to poll sockets: %s", strerror(errno));
868858
}
869859
for (j = 0; j < n; j++) {
870860
i = events[j].data.u32;
@@ -888,7 +878,7 @@ static void MtmReceiver(Datum arg)
888878
} while (n < 0 && MtmRecovery());
889879

890880
if (n < 0) {
891-
MTM_ELOG(ERROR, "Arbiter failed to select sockets: %d", errno);
881+
MTM_ELOG(ERROR, "Arbiter failed to select sockets: %s", strerror(errno));
892882
}
893883
for (i = 0; i < nNodes; i++) {
894884
if (sockets[i] >= 0 && FD_ISSET(sockets[i], &events))

contrib/mmts/multimaster.c

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ char const* const MtmNodeStatusMnem[] =
204204
{
205205
"Initialization",
206206
"Offline",
207-
"Connected",
207+
"Connecting",
208208
"Online",
209209
"Recovery",
210210
"Recovered",
@@ -229,8 +229,6 @@ int MtmNodes;
229229
int MtmNodeId;
230230
int MtmReplicationNodeId;
231231
int MtmArbiterPort;
232-
int MtmConnectTimeout;
233-
int MtmReconnectTimeout;
234232
int MtmNodeDisableDelay;
235233
int MtmTransSpillThreshold;
236234
int MtmMaxNodes;
@@ -3278,36 +3276,6 @@ _PG_init(void)
32783276
NULL
32793277
);
32803278

3281-
DefineCustomIntVariable(
3282-
"multimaster.connect_timeout",
3283-
"Multimaster nodes connect timeout",
3284-
"Interval in milliseconds for establishing connection with cluster node",
3285-
&MtmConnectTimeout,
3286-
10000, /* 10 seconds */
3287-
1,
3288-
INT_MAX,
3289-
PGC_BACKEND,
3290-
0,
3291-
NULL,
3292-
NULL,
3293-
NULL
3294-
);
3295-
3296-
DefineCustomIntVariable(
3297-
"multimaster.reconnect_timeout",
3298-
"Multimaster nodes reconnect timeout",
3299-
"Interval in milliseconds for reestablishing connection with cluster node",
3300-
&MtmReconnectTimeout,
3301-
5000, /* 5 seconds */
3302-
1,
3303-
INT_MAX,
3304-
PGC_BACKEND,
3305-
0,
3306-
NULL,
3307-
NULL,
3308-
NULL
3309-
);
3310-
33113279
if (!ConfigIsSane()) {
33123280
MTM_ELOG(ERROR, "Multimaster config is insane, refusing to work");
33133281
}
@@ -5182,7 +5150,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
51825150
if (relid != InvalidOid) {
51835151
Oid constraint_oid;
51845152
Bitmapset* pk = get_primary_key_attnos(relid, true, &constraint_oid);
5185-
if (pk == NULL) {
5153+
if (pk == NULL && !MtmVolksWagenMode) {
51865154
elog(WARNING,
51875155
MtmIgnoreTablesWithoutPk
51885156
? "Table %s.%s without primary will not be replicated"

contrib/mmts/multimaster.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,6 @@ extern int MtmNodes;
343343
extern int MtmArbiterPort;
344344
extern char* MtmDatabaseName;
345345
extern char* MtmDatabaseUser;
346-
extern int MtmConnectTimeout;
347-
extern int MtmReconnectTimeout;
348346
extern int MtmNodeDisableDelay;
349347
extern int MtmTransSpillThreshold;
350348
extern int MtmHeartbeatSendTimeout;

0 commit comments

Comments
 (0)