Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 2afd9cc

Browse files
knizhnikkelvich
authored andcommitted
Recovery fixes
1 parent 5bd20f8 commit 2afd9cc

6 files changed

+87
-49
lines changed

arbiter.c

Lines changed: 37 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -247,14 +247,13 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
247247
while (size != 0) {
248248
int rc = MtmWaitSocket(sd, true, MtmHeartbeatSendTimeout);
249249
if (rc == 1) {
250-
int n = send(sd, src, size, 0);
251-
if (n < 0) {
252-
Assert(errno != EINTR); /* should not happen in non-blocking call */
250+
while ((rc = send(sd, src, size, 0)) < 0 && errno == EINTR);
251+
if (rc < 0) {
253252
busy_socket = -1;
254253
return false;
255254
}
256-
size -= n;
257-
src += n;
255+
size -= rc;
256+
src += rc;
258257
} else if (rc < 0) {
259258
busy_socket = -1;
260259
return false;
@@ -266,15 +265,12 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
266265

267266
static int MtmReadSocket(int sd, void* buf, int buf_size)
268267
{
269-
int rc = recv(sd, buf, buf_size, 0);
268+
int rc;
269+
while ((rc = recv(sd, buf, buf_size, 0)) < 0 && errno == EINTR);
270270
if (rc < 0 && errno == EAGAIN) {
271271
rc = MtmWaitSocket(sd, false, MtmHeartbeatSendTimeout);
272272
if (rc == 1) {
273-
rc = recv(sd, buf, buf_size, 0);
274-
if (rc < 0) {
275-
Assert(errno != EINTR); /* should not happen in non-blocking call */
276-
return -1;
277-
}
273+
while ((rc = recv(sd, buf, buf_size, 0)) < 0 && errno == EINTR);
278274
} else {
279275
return 0;
280276
}
@@ -370,12 +366,13 @@ static void MtmSendHeartbeat()
370366
for (i = 0; i < Mtm->nAllNodes; i++)
371367
{
372368
if (i+1 != MtmNodeId && sockets[i] != busy_socket
373-
&& ((sockets[i] >= 0 && !BIT_CHECK(Mtm->disabledNodeMask, i)) || BIT_CHECK(Mtm->reconnectMask, i)))
369+
&& (Mtm->status != MTM_ONLINE
370+
|| (sockets[i] >= 0 && !BIT_CHECK(Mtm->disabledNodeMask, i) && !BIT_CHECK(Mtm->reconnectMask, i))))
374371
{
375372
if (!MtmSendToNode(i, &msg, sizeof(msg))) {
376373
elog(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
377374
} else {
378-
MTM_LOG1("Send heartbeat to node %d with timestamp %ld", i+1, now);
375+
MTM_LOG2("Send heartbeat to node %d with timestamp %ld", i+1, now);
379376
}
380377
}
381378
}
@@ -593,8 +590,9 @@ static void MtmAcceptOneConnection()
593590
} else if (req.hdr.code != MSG_HANDSHAKE && req.hdr.dxid != HANDSHAKE_MAGIC) {
594591
elog(WARNING, "Arbiter get unexpected handshake message %d", req.hdr.code);
595592
close(fd);
596-
} else{
597-
Assert(req.hdr.node > 0 && req.hdr.node <= Mtm->nAllNodes && req.hdr.node != MtmNodeId);
593+
} else {
594+
int node = req.hdr.node-1;
595+
Assert(node >= 0 && node < Mtm->nAllNodes && node+1 != MtmNodeId);
598596

599597
MtmLock(LW_EXCLUSIVE);
600598
MtmCheckResponse(&req.hdr);
@@ -606,15 +604,18 @@ static void MtmAcceptOneConnection()
606604
resp.sxid = ShmemVariableCache->nextXid;
607605
resp.csn = MtmGetCurrentTime();
608606
resp.node = MtmNodeId;
609-
MtmUpdateNodeConnectionInfo(&Mtm->nodes[req.hdr.node-1].con, req.connStr);
607+
MtmUpdateNodeConnectionInfo(&Mtm->nodes[node].con, req.connStr);
610608
if (!MtmWriteSocket(fd, &resp, sizeof resp)) {
611-
elog(WARNING, "Arbiter failed to write response for handshake message to node %d", resp.node);
609+
elog(WARNING, "Arbiter failed to write response for handshake message to node %d", node+1);
612610
close(fd);
613611
} else {
614-
MTM_LOG1("Arbiter established connection with node %d", req.hdr.node);
615-
MtmRegisterSocket(fd, req.hdr.node-1);
616-
sockets[req.hdr.node-1] = fd;
617-
MtmOnNodeConnect(req.hdr.node);
612+
MTM_LOG1("Arbiter established connection with node %d", node+1);
613+
if (sockets[node] >= 0) {
614+
MtmUnregisterSocket(sockets[node]);
615+
}
616+
sockets[node] = fd;
617+
MtmRegisterSocket(fd, node);
618+
MtmOnNodeConnect(node+1);
618619
}
619620
}
620621
}
@@ -889,7 +890,7 @@ static void MtmTransReceiver(Datum arg)
889890
Mtm->nodes[msg->node-1].lastHeartbeat = MtmGetSystemTime();
890891

891892
if (msg->code == MSG_HEARTBEAT) {
892-
MTM_LOG1("Receive HEARTBEAT from node %d with timestamp %ld delay %ld",
893+
MTM_LOG2("Receive HEARTBEAT from node %d with timestamp %ld delay %ld",
893894
msg->node, msg->csn, USEC_TO_MSEC(MtmGetSystemTime() - msg->csn));
894895
continue;
895896
}
@@ -1002,21 +1003,23 @@ static void MtmTransReceiver(Datum arg)
10021003
}
10031004
}
10041005
}
1005-
now = MtmGetSystemTime();
1006-
if (now > lastHeartbeatCheck + MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
1007-
if (!MtmWatchdog(stopPolling)) {
1008-
for (i = 0; i < nNodes; i++) {
1009-
if (Mtm->nodes[i].lastHeartbeat != 0 && sockets[i] >= 0) {
1010-
MTM_LOG1("Last hearbeat from node %d received %ld microseconds ago", i+1, now - Mtm->nodes[i].lastHeartbeat);
1006+
if (Mtm->status != MTM_RECOVERY) {
1007+
now = MtmGetSystemTime();
1008+
if (now > lastHeartbeatCheck + MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
1009+
if (!MtmWatchdog(stopPolling)) {
1010+
for (i = 0; i < nNodes; i++) {
1011+
if (Mtm->nodes[i].lastHeartbeat != 0 && sockets[i] >= 0) {
1012+
MTM_LOG1("Last hearbeat from node %d received %ld microseconds ago", i+1, now - Mtm->nodes[i].lastHeartbeat);
1013+
}
10111014
}
1015+
MTM_LOG1("epoll started %ld and finished %ld microseconds ago", now - startPolling, now - stopPolling);
10121016
}
1013-
MTM_LOG1("epoll started %ld and finished %ld microseconds ago", now - startPolling, now - stopPolling);
1017+
lastHeartbeatCheck = now;
1018+
}
1019+
if (n == 0 && Mtm->disabledNodeMask != 0) {
1020+
/* If timeout is expired and there are disabled nodes, then recheck cluster's state */
1021+
MtmRefreshClusterStatus(false);
10141022
}
1015-
lastHeartbeatCheck = now;
1016-
}
1017-
if (n == 0 && Mtm->disabledNodeMask != 0) {
1018-
/* If timeout is expired and there are disabled nodes, then recheck cluster's state */
1019-
MtmRefreshClusterStatus(false);
10201023
}
10211024
}
10221025
proc_exit(1); /* force restart of this bgwroker */

multimaster.c

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,6 @@ MtmCreateTransState(MtmCurrentTrans* x)
742742
/* I am coordinator of transaction */
743743
ts->gtid.xid = x->xid;
744744
ts->gtid.node = MtmNodeId;
745-
//ts->gid[0] = '\0';
746745
strcpy(ts->gid, x->gid);
747746
}
748747
return ts;
@@ -1186,26 +1185,33 @@ static void MtmDisableNode(int nodeId)
11861185
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
11871186
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
11881187
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
1189-
Mtm->nLiveNodes -= 1;
1190-
}
1188+
if (nodeId != MtmNodeId) {
1189+
Mtm->nLiveNodes -= 1;
1190+
}
1191+
elog(WARNING, "Disable node %d at xlog position %lx", nodeId, GetXLogInsertRecPtr());
1192+
}
11911193

11921194
static void MtmEnableNode(int nodeId)
11931195
{
11941196
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
11951197
BIT_CLEAR(Mtm->reconnectMask, nodeId-1);
11961198
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
11971199
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
1198-
Mtm->nLiveNodes += 1;
1200+
if (nodeId != MtmNodeId) {
1201+
Mtm->nLiveNodes += 1;
1202+
}
1203+
elog(WARNING, "Enable node %d at xlog position %lx", nodeId, GetXLogInsertRecPtr());
11991204
}
12001205

12011206
void MtmRecoveryCompleted(void)
12021207
{
1203-
MTM_LOG1("Recovery of node %d is completed", MtmNodeId);
1208+
MTM_LOG1("Recovery of node %d is completed, disabled mask=%lx, reconnect mask=%ld, live nodes=%d",
1209+
MtmNodeId, Mtm->disabledNodeMask, Mtm->reconnectMask, Mtm->nLiveNodes);
12041210
MtmLock(LW_EXCLUSIVE);
12051211
Mtm->recoverySlot = 0;
1206-
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
12071212
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
1208-
/* Mode will be changed to online once all locagical reciever are connected */
1213+
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
1214+
/* Mode will be changed to online once all logical reciever are connected */
12091215
MtmSwitchClusterMode(MTM_CONNECTED);
12101216
MtmUnlock();
12111217
}
@@ -1464,16 +1470,16 @@ bool MtmRefreshClusterStatus(bool nowait)
14641470
MtmEnableNode(i+1);
14651471
}
14661472
}
1467-
#endif
14681473
Mtm->reconnectMask |= clique & Mtm->disabledNodeMask; /* new enabled nodes mask */
1474+
#endif
14691475

14701476
if (disabled) {
14711477
MtmCheckQuorum();
14721478
}
14731479
/* Interrupt voting for active transaction and abort them */
14741480
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
14751481
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
1476-
ts->gid, ts->gtid.node, ts->xid, ts->status, ts->gtid.xid);
1482+
ts->gid, ts->gtid.nхode, ts->xid, ts->status, ts->gtid.xid);
14771483
if (MtmIsCoordinator(ts)) {
14781484
if (!ts->votingCompleted && disabled != 0 && ts->status != TRANSACTION_STATUS_ABORTED) {
14791485
MtmAbortTransaction(ts);
@@ -1728,6 +1734,7 @@ static void MtmInitialize()
17281734
Mtm->transCount = 0;
17291735
Mtm->gcCount = 0;
17301736
Mtm->nConfigChanges = 0;
1737+
Mtm->recoveryCount = 0;
17311738
Mtm->localTablesHashLoaded = false;
17321739
Mtm->inject2PCError = 0;
17331740
for (i = 0; i < MtmNodes; i++) {
@@ -2271,6 +2278,9 @@ void MtmReceiverStarted(int nodeId)
22712278
MtmLock(LW_EXCLUSIVE);
22722279
if (!BIT_CHECK(Mtm->pglogicalNodeMask, nodeId-1)) {
22732280
BIT_SET(Mtm->pglogicalNodeMask, nodeId-1);
2281+
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
2282+
MtmEnableNode(nodeId);
2283+
}
22742284
if (++Mtm->nReceivers == Mtm->nLiveNodes-1) {
22752285
if (Mtm->status == MTM_CONNECTED) {
22762286
MtmSwitchClusterMode(MTM_ONLINE);
@@ -2296,6 +2306,9 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
22962306
/* Choose for recovery first available slot */
22972307
MTM_LOG1("Start recovery from node %d", nodeId);
22982308
Mtm->recoverySlot = nodeId;
2309+
Mtm->nReceivers = 0;
2310+
Mtm->recoveryCount += 1;
2311+
Mtm->pglogicalNodeMask = 0;
22992312
FinishAllPreparedTransactions(false);
23002313
return SLOT_OPEN_EXISTED;
23012314
}

multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ typedef struct
174174
LWLockPadded *locks; /* multimaster lock tranche */
175175
TransactionId oldestXid; /* XID of oldest transaction visible by any active transaction (local or global) */
176176
nodemask_t disabledNodeMask; /* bitmask of disabled nodes */
177-
nodemask_t connectivityMask; /* bitmask of dicconnected nodes */
177+
nodemask_t connectivityMask; /* bitmask of disconnected nodes */
178178
nodemask_t pglogicalNodeMask; /* bitmask of started pglogic receivers */
179179
nodemask_t walSenderLockerMask; /* Mask of WAL-senders IDs locking the cluster */
180180
nodemask_t nodeLockerMask; /* Mask of node IDs which WAL-senders are locking the cluster */
@@ -188,6 +188,7 @@ typedef struct
188188
int nLockers; /* Number of lockers */
189189
int nActiveTransactions; /* Nunmber of active 2PC transactions */
190190
int nConfigChanges; /* Number of cluster configuration changes */
191+
int recoveryCount; /* Number of completed recoveries */
191192
int64 timeShift; /* Local time correction */
192193
csn_t csn; /* Last obtained timestamp: used to provide unique acending CSNs based on system time */
193194
csn_t lastCsn; /* CSN of last committed transaction */

pglogical_apply.c

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ static void
492492
MtmEndSession(void)
493493
{
494494
if (replorigin_session_origin != InvalidRepOriginId) {
495-
MTM_LOG3("%d: Begin reset replorigin session: %d", MyProcPid, replorigin_session_origin);
495+
MTM_LOG2("%d: Begin reset replorigin session for node %d: %d, progress %lx", MyProcPid, MtmReplicationNodeId, replorigin_session_origin, replorigin_session_get_progress(false));
496496
replorigin_session_origin = InvalidRepOriginId;
497497
replorigin_session_reset();
498498
if (unlock) {
@@ -568,7 +568,7 @@ process_remote_commit(StringInfo in)
568568
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
569569
csn = pq_getmsgint64(in);
570570
gid = pq_getmsgstring(in);
571-
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s", csn, gid);
571+
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%ld", csn, gid, end_lsn);
572572
StartTransactionCommand();
573573
MtmBeginSession();
574574
MtmSetCurrentTransactionCSN(csn);
@@ -585,6 +585,7 @@ process_remote_commit(StringInfo in)
585585
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED) == TRANSACTION_STATUS_UNKNOWN) {
586586
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
587587
StartTransactionCommand();
588+
MtmBeginSession();
588589
MtmSetCurrentTransactionGID(gid);
589590
FinishPreparedTransaction(gid, false);
590591
CommitTransactionCommand();
@@ -594,6 +595,12 @@ process_remote_commit(StringInfo in)
594595
default:
595596
Assert(false);
596597
}
598+
#if 0 /* Do ont need to advance slot position here: it will be done by transaction commit */
599+
if (replorigin_session_origin != InvalidRepOriginId) {
600+
replorigin_advance(replorigin_session_origin, end_lsn,
601+
XactLastCommitEnd, false, false);
602+
}
603+
#endif
597604
MtmEndSession(true);
598605
MtmUpdateLsnMapping(MtmReplicationNodeId, end_lsn);
599606
if (flags & PGLOGICAL_CAUGHT_UP) {
@@ -936,6 +943,11 @@ void MtmExecutor(int id, void* work, size_t size)
936943
while (true) {
937944
char action = pq_getmsgbyte(&s);
938945
MTM_LOG3("%d: REMOTE process action %c", MyProcPid, action);
946+
#if 0
947+
if (Mtm->status == MTM_RECOVERY) {
948+
MTM_LOG1("Replay action %c[%x]", action, s.data[s.cursor]);
949+
}
950+
#endif
939951
switch (action) {
940952
/* BEGIN */
941953
case 'B':

pglogical_proto.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,8 @@ decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
424424
PGLogicalProtoAPI *
425425
pglogical_init_api(PGLogicalProtoType typ)
426426
{
427-
PGLogicalProtoAPI* res = palloc0(sizeof(PGLogicalProtoAPI));
427+
PGLogicalProtoAPI* res = malloc(sizeof(PGLogicalProtoAPI));
428+
MemSet(res, 0, sizeof(PGLogicalProtoAPI));
428429
sscanf(MyReplicationSlot->data.name.data, MULTIMASTER_SLOT_PATTERN, &MtmReplicationNodeId);
429430
MTM_LOG1("%d: PRGLOGICAL init API for slot %s node %d", MyProcPid, MyReplicationSlot->data.name.data, MtmReplicationNodeId);
430431
res->write_rel = pglogical_write_rel;

pglogical_receiver.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,12 +244,15 @@ pglogical_receiver_main(Datum main_arg)
244244
*/
245245
while (!got_sigterm)
246246
{
247+
int count;
248+
247249
/*
248250
* Determine when and how we should open replication slot.
249251
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
250252
* Slots at other nodes should be removed
251253
*/
252254
mode = MtmReceiverSlotMode(nodeId);
255+
count = Mtm->recoveryCount;
253256

254257
/* Establish connection to remote server */
255258
conn = PQconnectdb(connString);
@@ -303,7 +306,7 @@ pglogical_receiver_main(Datum main_arg)
303306
MTM_LOG1("Start logical receiver at position %lx from node %d", originStartPos, nodeId);
304307
} else {
305308
originStartPos = replorigin_get_progress(originId, false);
306-
MTM_LOG1("Restart logical receiver at position %lx from node %d", originStartPos, nodeId);
309+
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);
307310
}
308311
CommitTransactionCommand();
309312

@@ -359,7 +362,12 @@ pglogical_receiver_main(Datum main_arg)
359362

360363
if (Mtm->status == MTM_OFFLINE || (Mtm->status == MTM_RECOVERY && Mtm->recoverySlot != nodeId))
361364
{
362-
ereport(LOG, (errmsg("%s: suspending WAL receiver because node was switched to %s mode", worker_proc, MtmNodeStatusMnem[Mtm->status])));
365+
ereport(LOG, (errmsg("%s: restart WAL receiver because node was switched to %s mode", worker_proc, MtmNodeStatusMnem[Mtm->status])));
366+
break;
367+
}
368+
if (count != Mtm->recoveryCount) {
369+
370+
ereport(LOG, (errmsg("%s: restart WAL receiver because node was recovered", worker_proc)));
363371
break;
364372
}
365373

0 commit comments

Comments
 (0)