Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5bd20f8

Browse files
knizhnikkelvich
authored andcommitted
Do not use epoll
1 parent 8c5bcc9 commit 5bd20f8

7 files changed

+79
-32
lines changed

arbiter.c

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959

6060
#ifndef USE_EPOLL
6161
#ifdef __linux__
62-
#define USE_EPOLL 1
62+
#define USE_EPOLL 0
6363
#else
6464
#define USE_EPOLL 0
6565
#endif
@@ -105,7 +105,7 @@ typedef struct
105105
static int* sockets;
106106
static int gateway;
107107
static bool send_heartbeat;
108-
static timestamp_t last_sent_hearbeat;
108+
static timestamp_t last_sent_heartbeat;
109109
static TimeoutId heartbeat_timer;
110110
static int busy_socket;
111111

@@ -266,17 +266,20 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
266266

267267
static int MtmReadSocket(int sd, void* buf, int buf_size)
268268
{
269-
int rc = MtmWaitSocket(sd, false, MtmHeartbeatSendTimeout);
270-
if (rc == 1) {
271-
int rc = recv(sd, buf, buf_size, 0);
272-
if (rc <= 0) {
273-
Assert(errno != EINTR); /* should not happen in non-blocking call */
274-
return -1;
269+
int rc = recv(sd, buf, buf_size, 0);
270+
if (rc < 0 && errno == EAGAIN) {
271+
rc = MtmWaitSocket(sd, false, MtmHeartbeatSendTimeout);
272+
if (rc == 1) {
273+
rc = recv(sd, buf, buf_size, 0);
274+
if (rc < 0) {
275+
Assert(errno != EINTR); /* should not happen in non-blocking call */
276+
return -1;
277+
}
278+
} else {
279+
return 0;
275280
}
276-
return rc;
277-
} else {
278-
return 0;
279281
}
282+
return rc;
280283
}
281284

282285

@@ -343,7 +346,7 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
343346

344347
static void MtmScheduleHeartbeat()
345348
{
346-
// Assert(!last_sent_hearbeat || last_sent_hearbeat + MSEC_TO_USEC(MtmHeartbeatRecvTimeout) >= MtmGetSystemTime());
349+
// Assert(!last_sent_heartbeat || last_sent_heartbeat + MSEC_TO_USEC(MtmHeartbeatRecvTimeout) >= MtmGetSystemTime());
347350
enable_timeout_after(heartbeat_timer, MtmHeartbeatSendTimeout);
348351
send_heartbeat = true;
349352
PGSemaphoreUnlock(&Mtm->votingSemaphore);
@@ -353,11 +356,16 @@ static void MtmSendHeartbeat()
353356
{
354357
int i;
355358
MtmArbiterMessage msg;
359+
timestamp_t now = MtmGetSystemTime();
356360
msg.code = MSG_HEARTBEAT;
357361
msg.disabledNodeMask = Mtm->disabledNodeMask;
358362
msg.oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
359363
msg.node = MtmNodeId;
360-
last_sent_hearbeat = MtmGetSystemTime();
364+
msg.csn = now;
365+
if (last_sent_heartbeat + MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2 < now) {
366+
MTM_LOG1("More than %ld microseconds since last heartbeat", now - last_sent_heartbeat);
367+
}
368+
last_sent_heartbeat = now;
361369

362370
for (i = 0; i < Mtm->nAllNodes; i++)
363371
{
@@ -366,6 +374,8 @@ static void MtmSendHeartbeat()
366374
{
367375
if (!MtmSendToNode(i, &msg, sizeof(msg))) {
368376
elog(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
377+
} else {
378+
MTM_LOG1("Send heartbeat to node %d with timestamp %ld", i+1, now);
369379
}
370380
}
371381
}
@@ -558,7 +568,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
558568
static int MtmReadFromNode(int node, void* buf, int buf_size)
559569
{
560570
int rc = MtmReadSocket(sockets[node], buf, buf_size);
561-
if (rc <= 0) {
571+
if (rc < 0) {
562572
elog(WARNING, "Arbiter failed to read from node=%d, rc=%d, errno=%d", node+1, rc, errno);
563573
MtmDisconnect(node);
564574
}
@@ -812,6 +822,8 @@ static void MtmTransReceiver(Datum arg)
812822
}
813823

814824
while (!stop) {
825+
timestamp_t startPolling = MtmGetSystemTime();
826+
timestamp_t stopPolling;
815827
#if USE_EPOLL
816828
n = epoll_wait(epollfd, events, nNodes, MtmHeartbeatRecvTimeout);
817829
if (n < 0) {
@@ -820,13 +832,17 @@ static void MtmTransReceiver(Datum arg)
820832
}
821833
elog(ERROR, "Arbiter failed to poll sockets: %d", errno);
822834
}
835+
stopPolling = MtmGetSystemTime();
836+
823837
for (j = 0; j < n; j++) {
824838
i = events[j].data.u32;
825839
if (events[j].events & EPOLLERR) {
826840
elog(WARNING, "Arbiter lost connection with node %d", i+1);
827841
MtmDisconnect(i);
828842
}
829-
else if (events[j].events & EPOLLIN)
843+
}
844+
for (j = 0; j < n; j++) {
845+
if (events[j].events & EPOLLIN)
830846
#else
831847
fd_set events;
832848
do {
@@ -842,6 +858,8 @@ static void MtmTransReceiver(Datum arg)
842858
if (n < 0) {
843859
elog(ERROR, "Arbiter failed to select sockets: %d", errno);
844860
}
861+
stopPolling = MtmGetSystemTime();
862+
845863
for (i = 0; i < nNodes; i++) {
846864
if (sockets[i] >= 0 && FD_ISSET(sockets[i], &events))
847865
#endif
@@ -871,7 +889,8 @@ static void MtmTransReceiver(Datum arg)
871889
Mtm->nodes[msg->node-1].lastHeartbeat = MtmGetSystemTime();
872890

873891
if (msg->code == MSG_HEARTBEAT) {
874-
MTM_LOG3("Receive HEARTBEAT from node %d at %ld", msg->node, USEC_TO_MSEC(MtmGetSystemTime()));
892+
MTM_LOG1("Receive HEARTBEAT from node %d with timestamp %ld delay %ld",
893+
msg->node, msg->csn, USEC_TO_MSEC(MtmGetSystemTime() - msg->csn));
875894
continue;
876895
}
877896
if (BIT_CHECK(msg->disabledNodeMask, msg->node-1)) {
@@ -985,7 +1004,14 @@ static void MtmTransReceiver(Datum arg)
9851004
}
9861005
now = MtmGetSystemTime();
9871006
if (now > lastHeartbeatCheck + MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
988-
MtmWatchdog();
1007+
if (!MtmWatchdog(stopPolling)) {
1008+
for (i = 0; i < nNodes; i++) {
1009+
if (Mtm->nodes[i].lastHeartbeat != 0 && sockets[i] >= 0) {
1010+
MTM_LOG1("Last hearbeat from node %d received %ld microseconds ago", i+1, now - Mtm->nodes[i].lastHeartbeat);
1011+
}
1012+
}
1013+
MTM_LOG1("epoll started %ld and finished %ld microseconds ago", now - startPolling, now - stopPolling);
1014+
}
9891015
lastHeartbeatCheck = now;
9901016
}
9911017
if (n == 0 && Mtm->disabledNodeMask != 0) {

multimaster.c

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,13 @@ void MtmLock(LWLockMode mode)
248248
#ifdef USE_SPINLOCK
249249
SpinLockAcquire(&Mtm->spinlock);
250250
#else
251+
timestamp_t start, stop;
252+
start = MtmGetSystemTime();
251253
LWLockAcquire((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID], mode);
254+
stop = MtmGetSystemTime();
255+
if (stop > start + MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
256+
MTM_LOG1("%d: obtaining %s lock takes %ld microseconds", MyProcPid, (mode == LW_EXCLUSIVE ? "exclusive" : "shared"), stop - start);
257+
}
252258
#endif
253259
Mtm->lastLockHolder = MyProcPid;
254260
}
@@ -819,10 +825,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
819825
/*
820826
* Check heartbeats
821827
*/
822-
void MtmWatchdog(void)
828+
bool MtmWatchdog(timestamp_t now)
823829
{
824830
int i, n = Mtm->nAllNodes;
825-
timestamp_t now = MtmGetSystemTime();
831+
bool allAlive = true;
826832
for (i = 0; i < n; i++) {
827833
if (i+1 != MtmNodeId && !BIT_CHECK(Mtm->disabledNodeMask, i)) {
828834
if (Mtm->nodes[i].lastHeartbeat != 0
@@ -831,9 +837,11 @@ void MtmWatchdog(void)
831837
elog(WARNING, "Heartbeat is not received from node %d during %d msec",
832838
i+1, (int)USEC_TO_MSEC(now - Mtm->nodes[i].lastHeartbeat));
833839
MtmOnNodeDisconnect(i+1);
840+
allAlive = false;
834841
}
835842
}
836843
}
844+
return allAlive;
837845
}
838846

839847

@@ -926,7 +934,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
926934
MtmLock(LW_EXCLUSIVE);
927935
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_REMOVE, NULL);
928936
Assert(tm != NULL && tm->state != NULL);
929-
MTM_LOG1("%ld: Abort prepared transaction %d with gid='%s'", MtmGetSystemTime(), x->xid, x->gid);
937+
MTM_LOG1("Abort prepared transaction %d with gid='%s'", x->xid, x->gid);
930938
MtmAbortTransaction(tm->state);
931939
MtmUnlock();
932940
x->status = TRANSACTION_STATUS_ABORTED;
@@ -1184,6 +1192,7 @@ static void MtmDisableNode(int nodeId)
11841192
static void MtmEnableNode(int nodeId)
11851193
{
11861194
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
1195+
BIT_CLEAR(Mtm->reconnectMask, nodeId-1);
11871196
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
11881197
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
11891198
Mtm->nLiveNodes += 1;
@@ -1569,6 +1578,7 @@ void MtmOnNodeConnect(int nodeId)
15691578
{
15701579
MtmLock(LW_EXCLUSIVE);
15711580
BIT_CLEAR(Mtm->connectivityMask, nodeId-1);
1581+
BIT_CLEAR(Mtm->reconnectMask, nodeId-1);
15721582
MtmUnlock();
15731583

15741584
MTM_LOG1("Reconnect node %d", nodeId);
@@ -1881,7 +1891,7 @@ _PG_init(void)
18811891
DefineCustomIntVariable(
18821892
"multimaster.heartbeat_send_timeout",
18831893
"Timeout in milliseconds of sending heartbeat messages",
1884-
"Period of broadcasting heartbeat messages by abiter to all nodes",
1894+
"Period of broadcasting heartbeat messages by arbiter to all nodes",
18851895
&MtmHeartbeatSendTimeout,
18861896
1000,
18871897
1,
@@ -2286,6 +2296,7 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
22862296
/* Choose for recovery first available slot */
22872297
MTM_LOG1("Start recovery from node %d", nodeId);
22882298
Mtm->recoverySlot = nodeId;
2299+
FinishAllPreparedTransactions(false);
22892300
return SLOT_OPEN_EXISTED;
22902301
}
22912302
}

multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ extern void MtmMakeTableLocal(char* schema, char* name);
277277
extern void MtmHandleApplyError(void);
278278
extern void MtmUpdateLsnMapping(int nodeId, XLogRecPtr endLsn);
279279
extern XLogRecPtr MtmGetFlushPosition(int nodeId);
280-
extern void MtmWatchdog(void);
280+
extern bool MtmWatchdog(timestamp_t now);
281281
extern void MtmCheckHeartbeat(void);
282282

283283

pglogical_apply.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -538,11 +538,11 @@ process_remote_commit(StringInfo in)
538538
Assert(IsTransactionState() && TransactionIdIsValid(MtmGetCurrentTransactionId()));
539539
gid = pq_getmsgstring(in);
540540
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_IN_PROGRESS) == TRANSACTION_STATUS_ABORTED) {
541-
MTM_LOG1("%ld: avoid prepare of previously aborted global transaction %s", MtmGetSystemTime(), gid);
541+
MTM_LOG1("Avoid prepare of previously aborted global transaction %s", gid);
542542
AbortCurrentTransaction();
543543
} else {
544544
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
545-
MTM_LOG2("%ld: PGLOGICAL_PREPARE commit: gid=%s", MtmGetSystemTime(), gid);
545+
MTM_LOG2("PGLOGICAL_PREPARE commit: gid=%s", gid);
546546
BeginTransactionBlock();
547547
CommitTransactionCommand();
548548
StartTransactionCommand();
@@ -554,7 +554,7 @@ process_remote_commit(StringInfo in)
554554
CommitTransactionCommand();
555555

556556
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_UNKNOWN) == TRANSACTION_STATUS_ABORTED) {
557-
MTM_LOG1("%ld: perform delayed rollback of prepared global transaction %s", MtmGetSystemTime(), gid);
557+
MTM_LOG1("Perform delayed rollback of prepared global transaction %s", gid);
558558
StartTransactionCommand();
559559
MtmSetCurrentTransactionGID(gid);
560560
FinishPreparedTransaction(gid, false);
@@ -568,7 +568,7 @@ process_remote_commit(StringInfo in)
568568
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
569569
csn = pq_getmsgint64(in);
570570
gid = pq_getmsgstring(in);
571-
MTM_LOG2("%ld: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s", MtmGetSystemTime(), csn, gid);
571+
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s", csn, gid);
572572
StartTransactionCommand();
573573
MtmBeginSession();
574574
MtmSetCurrentTransactionCSN(csn);
@@ -581,9 +581,9 @@ process_remote_commit(StringInfo in)
581581
{
582582
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
583583
gid = pq_getmsgstring(in);
584-
MTM_LOG2("%ld: PGLOGICAL_ABORT_PREPARED commit: gid=%s", MtmGetSystemTime(), gid);
584+
MTM_LOG2("PGLOGICAL_ABORT_PREPARED commit: gid=%s", gid);
585585
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED) == TRANSACTION_STATUS_UNKNOWN) {
586-
MTM_LOG1("%ld: PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", MtmGetSystemTime(), gid);
586+
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
587587
StartTransactionCommand();
588588
MtmSetCurrentTransactionGID(gid);
589589
FinishPreparedTransaction(gid, false);

pglogical_proto.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
161161
}
162162
pq_sendbyte(out, 'C'); /* sending COMMIT */
163163

164-
MTM_LOG2("%ld: PGLOGICAL_SEND commit: event=%d, gid=%s, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx", MtmGetSystemTime(), flags, txn->gid, commit_lsn, txn->end_lsn, GetXLogInsertRecPtr());
164+
MTM_LOG2("PGLOGICAL_SEND commit: event=%d, gid=%s, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx", flags, txn->gid, commit_lsn, txn->end_lsn, GetXLogInsertRecPtr());
165165

166166
/* send the flags field */
167167
pq_sendbyte(out, flags);

pglogical_receiver.c

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
#include "spill.h"
4141

4242
#define ERRCODE_DUPLICATE_OBJECT_STR "42710"
43-
#define RECEIVER_SUSPEND_TIMEOUT (10*USECS_PER_SEC)
43+
#define RECEIVER_SUSPEND_TIMEOUT (1*USECS_PER_SEC)
4444

4545
/* Signal handling */
4646
static volatile sig_atomic_t got_sigterm = false;
@@ -357,9 +357,10 @@ pglogical_receiver_main(Datum main_arg)
357357
if (rc & WL_POSTMASTER_DEATH)
358358
proc_exit(1);
359359

360-
if (Mtm->status == MTM_OFFLINE || (Mtm->status == MTM_RECOVERY && Mtm->recoverySlot != nodeId)) {
360+
if (Mtm->status == MTM_OFFLINE || (Mtm->status == MTM_RECOVERY && Mtm->recoverySlot != nodeId))
361+
{
361362
ereport(LOG, (errmsg("%s: suspending WAL receiver because node was switched to %s mode", worker_proc, MtmNodeStatusMnem[Mtm->status])));
362-
goto OnError;
363+
break;
363364
}
364365

365366

@@ -578,6 +579,9 @@ pglogical_receiver_main(Datum main_arg)
578579
goto OnError;
579580
}
580581
}
582+
PQfinish(conn);
583+
continue;
584+
581585
OnError:
582586
PQfinish(conn);
583587
MtmSleep(RECEIVER_SUSPEND_TIMEOUT);

raftable.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,19 @@ void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, bool now
1919
void RaftableSet(char const* key, void const* value, size_t size, bool nowait)
2020
{
2121
if (MtmUseRaftable) {
22+
timestamp_t start, stop;
23+
start = MtmGetSystemTime();
2224
if (nowait) {
2325
raftable_set(key, value, size, 0);
2426
} else {
2527
while (!raftable_set(key, value, size, MtmHeartbeatSendTimeout)) {
2628
MtmCheckHeartbeat();
2729
}
2830
}
31+
stop = MtmGetSystemTime();
32+
if (stop > start + MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
33+
MTM_LOG1("Raftable set nowait=%d takes %ld microseconds", nowait, stop - start);
34+
}
2935
}
3036
}
3137

0 commit comments

Comments
 (0)