Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 33f01b4

Browse files
knizhnikkelvich
authored andcommitted
Support RDMA for multimaster
1 parent e7a5da3 commit 33f01b4

File tree

4 files changed

+64
-38
lines changed

4 files changed

+64
-38
lines changed

arbiter.c

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,14 @@
1818
#include <time.h>
1919
#include <fcntl.h>
2020

21+
#ifdef WITH_RSOCKET
22+
#include <rdma/rsocket.h>
23+
#endif
24+
2125
#include "postgres.h"
2226
#include "fmgr.h"
2327
#include "miscadmin.h"
28+
#include "pg_socket.h"
2429
#include "postmaster/postmaster.h"
2530
#include "postmaster/bgworker.h"
2631
#include "storage/s_lock.h"
@@ -58,6 +63,7 @@
5863
#include "tcop/utility.h"
5964
#include "libpq/ip.h"
6065

66+
6167
#ifndef USE_EPOLL
6268
#ifdef __linux__
6369
#define USE_EPOLL 0
@@ -185,7 +191,7 @@ static void MtmUnregisterSocket(int fd)
185191
static void MtmDisconnect(int node)
186192
{
187193
MtmUnregisterSocket(sockets[node]);
188-
close(sockets[node]);
194+
pg_closesocket(sockets[node], MtmUseRDMA);
189195
sockets[node] = -1;
190196
MtmOnNodeDisconnect(node+1);
191197
}
@@ -208,7 +214,7 @@ static int MtmWaitSocket(int sd, bool forWrite, timestamp_t timeoutMsec)
208214
FD_SET(sd, &set);
209215
tv.tv_sec = (deadline - now)/USECS_PER_SEC;
210216
tv.tv_usec = (deadline - now)%USECS_PER_SEC;
211-
} while ((rc = select(sd+1, forWrite ? NULL : &set, forWrite ? &set : NULL, NULL, &tv)) < 0 && errno == EINTR);
217+
} while ((rc = pg_select([sd+1, forWrite ? NULL : &set, forWrite ? &set : NULL, NULL, &tv, MtmUseRDMA)) < 0 && errno == EINTR);
212218

213219
return rc;
214220
}
@@ -219,7 +225,7 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
219225
while (size != 0) {
220226
int rc = MtmWaitSocket(sd, true, MtmHeartbeatSendTimeout);
221227
if (rc == 1) {
222-
while ((rc = send(sd, src, size, 0)) < 0 && errno == EINTR);
228+
while ((rc = pg_send(sd, src, size, 0, MtmUseRDMA)) < 0 && errno == EINTR);
223229
if (rc < 0) {
224230
if (errno == EINPROGRESS) {
225231
continue;
@@ -238,11 +244,11 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
238244
static int MtmReadSocket(int sd, void* buf, int buf_size)
239245
{
240246
int rc;
241-
while ((rc = recv(sd, buf, buf_size, 0)) < 0 && errno == EINTR);
247+
while ((rc = pg_recv(sd, buf, buf_size, 0, MtmUseRDMA)) < 0 && errno == EINTR);
242248
if (rc <= 0 && (errno == EAGAIN || errno == EINPROGRESS)) {
243249
rc = MtmWaitSocket(sd, false, MtmHeartbeatSendTimeout);
244250
if (rc == 1) {
245-
while ((rc = recv(sd, buf, buf_size, 0)) < 0 && errno == EINTR);
251+
while ((rc = pg_recv(sd, buf, buf_size, 0, MtmUseRDMA)) < 0 && errno == EINTR);
246252
}
247253
}
248254
return rc;
@@ -254,25 +260,25 @@ static void MtmSetSocketOptions(int sd)
254260
{
255261
#ifdef TCP_NODELAY
256262
int on = 1;
257-
if (setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char const*)&on, sizeof(on)) < 0) {
263+
if (pg_setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char const*)&on, sizeof(on), MtmUseRDMA) < 0) {
258264
MTM_ELOG(WARNING, "Failed to set TCP_NODELAY: %m");
259265
}
260266
#endif
261-
if (setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char const*)&on, sizeof(on)) < 0) {
267+
if (pg_setsockopt(sd, SOL_SOCKET, SO_KEEPALIVE, (char const*)&on, sizeof(on), MtmUseRDMA) < 0) {
262268
MTM_ELOG(WARNING, "Failed to set SO_KEEPALIVE: %m");
263269
}
264270

265271
if (tcp_keepalives_idle) {
266272
#ifdef TCP_KEEPIDLE
267-
if (setsockopt(sd, IPPROTO_TCP, TCP_KEEPIDLE,
268-
(char *) &tcp_keepalives_idle, sizeof(tcp_keepalives_idle)) < 0)
273+
if (pg_setsockopt(sd, IPPROTO_TCP, TCP_KEEPIDLE,
274+
(char *) &tcp_keepalives_idle, sizeof(tcp_keepalives_idle), MtmUseRDMA) < 0)
269275
{
270276
MTM_ELOG(WARNING, "Failed to set TCP_KEEPIDLE: %m");
271277
}
272278
#else
273279
#ifdef TCP_KEEPALIVE
274-
if (setsockopt(sd, IPPROTO_TCP, TCP_KEEPALIVE,
275-
(char *) &tcp_keepalives_idle, sizeof(tcp_keepalives_idle)) < 0)
280+
if (pg_setsockopt(sd, IPPROTO_TCP, TCP_KEEPALIVE,
281+
(char *) &tcp_keepalives_idle, sizeof(tcp_keepalives_idle), MtmUseRDMA) < 0)
276282
{
277283
MTM_ELOG(WARNING, "Failed to set TCP_KEEPALIVE: %m");
278284
}
@@ -281,17 +287,17 @@ static void MtmSetSocketOptions(int sd)
281287
}
282288
#ifdef TCP_KEEPINTVL
283289
if (tcp_keepalives_interval) {
284-
if (setsockopt(sd, IPPROTO_TCP, TCP_KEEPINTVL,
285-
(char *) &tcp_keepalives_interval, sizeof(tcp_keepalives_interval)) < 0)
290+
if (pg_setsockopt(sd, IPPROTO_TCP, TCP_KEEPINTVL,
291+
(char *) &tcp_keepalives_interval, sizeof(tcp_keepalives_interval), MtmUseRDMA) < 0)
286292
{
287293
MTM_ELOG(WARNING, "Failed to set TCP_KEEPINTVL: %m");
288294
}
289295
}
290296
#endif
291297
#ifdef TCP_KEEPCNT
292298
if (tcp_keepalives_count) {
293-
if (setsockopt(sd, IPPROTO_TCP, TCP_KEEPCNT,
294-
(char *) &tcp_keepalives_count, sizeof(tcp_keepalives_count)) < 0)
299+
if (pg_setsockopt(sd, IPPROTO_TCP, TCP_KEEPCNT,
300+
(char *) &tcp_keepalives_count, sizeof(tcp_keepalives_count), MtmUseRDMA) < 0)
295301
{
296302
MTM_ELOG(WARNING, "Failed to set TCP_KEEPCNT: %m");
297303
}
@@ -375,7 +381,7 @@ static void MtmSendHeartbeat()
375381
/* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
376382
if (BIT_CHECK(SELF_CONNECTIVITY_MASK, i)) {
377383
MTM_LOG1("Force reconnect to node %d", i+1);
378-
close(sockets[i]);
384+
pg_closesocket(sockets[i], MtmUseRDMA);
379385
sockets[i] = -1;
380386
MtmReconnectNode(i+1); /* set reconnect mask to force node reconnent */
381387
}
@@ -436,20 +442,20 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
436442
Retry:
437443
while (1) {
438444
int rc = -1;
439-
sd = socket(AF_INET, SOCK_STREAM, 0);
445+
sd = pg_socket(AF_INET, SOCK_STREAM, 0, MtmUseRDMA);
440446
if (sd < 0) {
441447
MTM_ELOG(LOG, "Arbiter failed to create socket: %d", errno);
442448
goto Error;
443449
}
444-
rc = fcntl(sd, F_SETFL, O_NONBLOCK);
450+
rc = pg_fcntl(sd, F_SETFL, O_NONBLOCK, MtmUseRDMA);
445451
if (rc < 0) {
446452
MTM_ELOG(LOG, "Arbiter failed to switch socket to non-blocking mode: %d", errno);
447453
goto Error;
448454
}
449455
for (addr = addrs; addr != NULL; addr = addr->ai_next)
450456
{
451457
do {
452-
rc = connect(sd, addr->ai_addr, addr->ai_addrlen);
458+
rc = pg_connect(sd, addr->ai_addr, addr->ai_addrlen, MtmUseRDMA);
453459
} while (rc < 0 && errno == EINTR);
454460

455461
if (rc >= 0 || errno == EINPROGRESS) {
@@ -479,7 +485,7 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
479485
} else {
480486
MTM_ELOG(WARNING, "Arbiter waiting socket to %s:%d: rc=%d, error=%d", host, port, rc, errno);
481487
}
482-
close(sd);
488+
pg_closesocket(sd, MtmUseRDMA);
483489
afterWait = MtmGetSystemTime();
484490
if (afterWait < beforeWait + MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
485491
MtmSleep(beforeWait + MSEC_TO_USEC(MtmHeartbeatSendTimeout) - afterWait);
@@ -495,17 +501,17 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
495501
strcpy(req.connStr, Mtm->nodes[MtmNodeId-1].con.connStr);
496502
if (!MtmWriteSocket(sd, &req, sizeof req)) {
497503
MTM_ELOG(WARNING, "Arbiter failed to send handshake message to %s:%d: %d", host, port, errno);
498-
close(sd);
504+
pg_closesocket(sd, MtmUseRDMA);
499505
goto Retry;
500506
}
501507
if (MtmReadSocket(sd, &resp, sizeof resp) != sizeof(resp)) {
502508
MTM_ELOG(WARNING, "Arbiter failed to receive response for handshake message from %s:%d: errno=%d", host, port, errno);
503-
close(sd);
509+
pg_closesocket(sd, MtmUseRDMA);
504510
goto Retry;
505511
}
506512
if (resp.code != MSG_STATUS || resp.dxid != HANDSHAKE_MAGIC) {
507513
MTM_ELOG(WARNING, "Arbiter get unexpected response %d for handshake message from %s:%d", resp.code, host, port);
508-
close(sd);
514+
pg_closesocket(sd, MtmUseRDMA);
509515
goto Retry;
510516
}
511517
if (addrs)
@@ -524,7 +530,7 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
524530
Error:
525531
busy_mask = save_mask;
526532
if (sd >= 0) {
527-
close(sd);
533+
pg_closesocket(sd, MtmUseRDMA);
528534
}
529535
if (addrs) {
530536
pg_freeaddrinfo_all(hint.ai_family, addrs);
@@ -572,7 +578,7 @@ static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectT
572578
*/
573579
if (sockets[node] >= 0 && BIT_CHECK(Mtm->reconnectMask, node)) {
574580
MTM_ELOG(WARNING, "Arbiter is forced to reconnect to node %d", node+1);
575-
close(sockets[node]);
581+
pg_closesocket(sockets[node], MtmUseRDMA);
576582
sockets[node] = -1;
577583
}
578584
#endif
@@ -584,7 +590,7 @@ static bool MtmSendToNode(int node, void const* buf, int size, time_t reconnectT
584590
if (sockets[node] < 0 || !MtmWriteSocket(sockets[node], buf, size)) {
585591
if (sockets[node] >= 0) {
586592
MTM_ELOG(WARNING, "Arbiter fail to write to node %d: %d", node+1, errno);
587-
close(sockets[node]);
593+
pg_closesocket(sockets[node], MtmUseRDMA);
588594
sockets[node] = -1;
589595
}
590596
sockets[node] = MtmConnectSocket(node, Mtm->nodes[node].con.arbiterPort, reconnectTimeout);
@@ -615,23 +621,23 @@ static int MtmReadFromNode(int node, void* buf, int buf_size)
615621

616622
static void MtmAcceptOneConnection()
617623
{
618-
int fd = accept(gateway, NULL, NULL);
624+
int fd = pg_accept(gateway, NULL, NULL, MtmUseRDMA);
619625
if (fd < 0) {
620626
MTM_ELOG(WARNING, "Arbiter failed to accept socket: %d", errno);
621627
} else {
622628
MtmHandshakeMessage req;
623629
MtmArbiterMessage resp;
624-
int rc = fcntl(fd, F_SETFL, O_NONBLOCK);
630+
int rc = pg_fcntl(fd, F_SETFL, O_NONBLOCK, MtmUseRDMA);
625631
if (rc < 0) {
626632
MTM_ELOG(ERROR, "Arbiter failed to switch socket to non-blocking mode: %d", errno);
627633
}
628634
rc = MtmReadSocket(fd, &req, sizeof req);
629635
if (rc < sizeof(req)) {
630636
MTM_ELOG(WARNING, "Arbiter failed to handshake socket: %d, errno=%d", rc, errno);
631-
close(fd);
637+
pg_closesocket(fd, MtmUseRDMA);
632638
} else if (req.hdr.code != MSG_HANDSHAKE && req.hdr.dxid != HANDSHAKE_MAGIC) {
633639
MTM_ELOG(WARNING, "Arbiter get unexpected handshake message %d", req.hdr.code);
634-
close(fd);
640+
pg_closesocket(fd, MtmUseRDMA);
635641
} else {
636642
int node = req.hdr.node-1;
637643
Assert(node >= 0 && node < Mtm->nAllNodes && node+1 != MtmNodeId);
@@ -648,7 +654,7 @@ static void MtmAcceptOneConnection()
648654
MtmUpdateNodeConnectionInfo(&Mtm->nodes[node].con, req.connStr);
649655
if (!MtmWriteSocket(fd, &resp, sizeof resp)) {
650656
MTM_ELOG(WARNING, "Arbiter failed to write response for handshake message to node %d", node+1);
651-
close(fd);
657+
pg_closesocket(fd, MtmUseRDMA);
652658
} else {
653659
MTM_LOG1("Arbiter established connection with node %d", node+1);
654660
if (sockets[node] >= 0) {
@@ -678,18 +684,18 @@ static void MtmAcceptIncomingConnections()
678684
sock_inet.sin_addr.s_addr = htonl(INADDR_ANY);
679685
sock_inet.sin_port = htons(MtmArbiterPort);
680686

681-
gateway = socket(sock_inet.sin_family, SOCK_STREAM, 0);
687+
gateway = pg_socket(sock_inet.sin_family, SOCK_STREAM, 0, MtmUseRDMA);
682688
if (gateway < 0) {
683689
MTM_ELOG(ERROR, "Arbiter failed to create socket: %s", strerror(errno));
684690
}
685-
if (setsockopt(gateway, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof on) < 0) {
691+
if (pg_setsockopt(gateway, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof on) < 0) {
686692
MTM_ELOG(ERROR, "Arbiter failed to set options for socket: %s", strerror(errno));
687693
}
688694

689-
if (bind(gateway, (struct sockaddr*)&sock_inet, sizeof(sock_inet)) < 0) {
695+
if (pg_bind(gateway, (struct sockaddr*)&sock_inet, sizeof(sock_inet), MtmUseRDMA) < 0) {
690696
MTM_ELOG(ERROR, "Arbiter failed to bind socket: %s", strerror(errno));
691697
}
692-
if (listen(gateway, nNodes) < 0) {
698+
if (pg_listen(gateway, nNodes, MtmUseRDMA) < 0) {
693699
MTM_ELOG(ERROR, "Arbiter failed to listen socket: %s", strerror(errno));
694700
}
695701

@@ -790,7 +796,7 @@ static bool MtmRecovery()
790796
fd_set tryset;
791797
FD_ZERO(&tryset);
792798
FD_SET(sd, &tryset);
793-
if (select(sd+1, &tryset, NULL, NULL, &tm) < 0) {
799+
if (pg_select(sd+1, &tryset, NULL, NULL, &tm, MtmUseRDMA) < 0) {
794800
MTM_ELOG(WARNING, "Arbiter lost connection with node %d", i+1);
795801
MtmDisconnect(i);
796802
recovered = true;
@@ -883,7 +889,7 @@ static void MtmReceiver(Datum arg)
883889
tv.tv_sec = selectTimeout/1000;
884890
tv.tv_usec = selectTimeout%1000*1000;
885891
do {
886-
n = select(max_fd+1, &events, NULL, NULL, &tv);
892+
n = pg_select(max_fd+1, &events, NULL, NULL, &tv, MtmUseRDMA);
887893
} while (n < 0 && errno == EINTR);
888894
} while (n < 0 && MtmRecovery());
889895

multimaster.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ int MtmHeartbeatRecvTimeout;
237237
int MtmMin2PCTimeout;
238238
int MtmMax2PCRatio;
239239
bool MtmUseDtm;
240+
bool MtmUseRDMA;
240241
bool MtmPreserveCommitOrder;
241242
bool MtmVolksWagenMode; /* Pretend to be normal postgres. This means skip some NOTICE's and use local sequences */
242243

@@ -3091,6 +3092,19 @@ _PG_init(void)
30913092
NULL
30923093
);
30933094

3095+
DefineCustomBoolVariable(
3096+
"multimaster.use_rdma",
3097+
"Use RDMA sockets",
3098+
NULL,
3099+
&MtmUseRDMA,
3100+
false,
3101+
PGC_POSTMASTER,
3102+
0,
3103+
NULL,
3104+
NULL,
3105+
NULL
3106+
);
3107+
30943108
DefineCustomBoolVariable(
30953109
"multimaster.preserve_commit_order",
30963110
"Transactions from one node will be committed in same order on all nodes",

multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ extern int MtmNodeDisableDelay;
349349
extern int MtmTransSpillThreshold;
350350
extern int MtmHeartbeatSendTimeout;
351351
extern int MtmHeartbeatRecvTimeout;
352+
extern bool MtmUseRDMA;
352353
extern bool MtmUseDtm;
353354
extern bool MtmPreserveCommitOrder;
354355
extern HTAB* MtmXid2State;

pglogical_receiver.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@
1414
*/
1515

1616
/* Some general headers for custom bgworker facility */
17+
#ifdef WITH_RSOCKET
18+
#include <rdma/rsocket.h>
19+
#endif
20+
1721
#include <unistd.h>
1822
#include "postgres.h"
1923
#include "fmgr.h"
2024
#include "miscadmin.h"
25+
#include "pg_socket.h"
2126
#include "libpq-fe.h"
2227
#include "pqexpbuffer.h"
2328
#include "access/xact.h"
@@ -635,7 +640,7 @@ pglogical_receiver_main(Datum main_arg)
635640
timeout.tv_usec = usecs;
636641
timeoutptr = &timeout;
637642

638-
r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
643+
r = pg_select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr, conn->isRsocket);
639644
if (r == 0)
640645
{
641646
int64 now = feGetCurrentTimestamp();

0 commit comments

Comments
 (0)