Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 21fee91

Browse files
knizhnikkelvich
authored andcommitted
Drop replication slots on recovered nodes
1 parent 4993bce commit 21fee91

6 files changed

+33
-21
lines changed

arbiter.c

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ static void MtmSendHeartbeat()
362362
MTM_LOG2("Send heartbeat to node %d with timestamp %ld", i+1, now);
363363
}
364364
} else {
365-
MTM_LOG1("Do not send hearbeat to node %d, busy mask %ld, status %d", i+1, busy_mask, Mtm->status);
365+
MTM_LOG1("Do not send heartbeat to node %d, busy mask %ld, status %d", i+1, busy_mask, Mtm->status);
366366
}
367367
}
368368
}
@@ -484,6 +484,8 @@ static int MtmConnectSocket(int node, int port, int timeout)
484484
MtmCheckResponse(&resp);
485485
MtmUnlock();
486486

487+
MtmOnNodeConnect(node+1);
488+
487489
busy_mask = save_mask;
488490

489491
return sd;
@@ -539,7 +541,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
539541
}
540542
if (sockets[node] < 0 || !MtmWriteSocket(sockets[node], buf, size)) {
541543
if (sockets[node] >= 0) {
542-
elog(WARNING, "Arbiter failed to write to node %d: %d", node+1, errno);
544+
elog(WARNING, "Arbiter fail to write to node %d: %d", node+1, errno);
543545
close(sockets[node]);
544546
sockets[node] = -1;
545547
}
@@ -549,7 +551,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
549551
result = false;
550552
break;
551553
}
552-
MTM_LOG3("Arbiter restablished connection with node %d", node+1);
554+
MTM_LOG3("Arbiter reestablish connection with node %d", node+1);
553555
} else {
554556
result = true;
555557
break;
@@ -1075,7 +1077,7 @@ static void MtmReceiver(Datum arg)
10751077
if (!MtmWatchdog(now)) {
10761078
for (i = 0; i < nNodes; i++) {
10771079
if (Mtm->nodes[i].lastHeartbeat != 0 && sockets[i] >= 0) {
1078-
MTM_LOG1("Last hearbeat from node %d received %ld microseconds ago", i+1, now - Mtm->nodes[i].lastHeartbeat);
1080+
MTM_LOG1("Last heartbeat from node %d received %ld microseconds ago", i+1, now - Mtm->nodes[i].lastHeartbeat);
10791081
}
10801082
}
10811083
}

multimaster.c

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
450450
for (i = 0; i < MAX_WAIT_LOOPS; i++)
451451
{
452452
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
453-
if (ts != NULL/* && ts->status != TRANSACTION_STATUS_IN_PROGRESS*/)
453+
if (ts != NULL /*&& ts->status != TRANSACTION_STATUS_IN_PROGRESS*/)
454454
{
455455
if (ts->csn > MtmTx.snapshot) {
456456
MTM_LOG4("%d: tuple with xid=%d(csn=%ld) is invisibile in snapshot %ld",
@@ -1242,13 +1242,17 @@ void MtmAbortTransaction(MtmTransState* ts)
12421242
{
12431243
Assert(MtmLockCount != 0); /* should be invoked with exclsuive lock */
12441244
if (ts->status != TRANSACTION_STATUS_ABORTED) {
1245-
MTM_LOG1("Rollback active transaction %d:%d (local xid %d)", ts->gtid.node, ts->gtid.xid, ts->xid);
1246-
ts->status = TRANSACTION_STATUS_ABORTED;
1247-
MtmAdjustSubtransactions(ts);
1248-
if (ts->isActive) {
1249-
ts->isActive = false;
1250-
Assert(Mtm->nActiveTransactions != 0);
1251-
Mtm->nActiveTransactions -= 1;
1245+
if (ts->status == TRANSACTION_STATUS_COMMITTED) {
1246+
elog(WARNING, "Attempt to rollback already committed transaction %d (%s)", ts->xid, ts->gid);
1247+
} else {
1248+
MTM_LOG1("Rollback active transaction %d:%d (local xid %d) status %d", ts->gtid.node, ts->gtid.xid, ts->xid, ts->status);
1249+
ts->status = TRANSACTION_STATUS_ABORTED;
1250+
MtmAdjustSubtransactions(ts);
1251+
if (ts->isActive) {
1252+
ts->isActive = false;
1253+
Assert(Mtm->nActiveTransactions != 0);
1254+
Mtm->nActiveTransactions -= 1;
1255+
}
12521256
}
12531257
}
12541258
}
@@ -1320,6 +1324,7 @@ static void MtmDisableNode(int nodeId)
13201324
elog(WARNING, "Disable node %d at xlog position %lx, last status change time %d msec ago", nodeId, GetXLogInsertRecPtr(),
13211325
(int)USEC_TO_MSEC(now - Mtm->nodes[nodeId-1].lastStatusChangeTime));
13221326
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
1327+
Mtm->nodes[nodeId-1].timeline += 1;
13231328
Mtm->nodes[nodeId-1].lastStatusChangeTime = now;
13241329
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
13251330
if (nodeId != MtmNodeId) {
@@ -1343,8 +1348,8 @@ static void MtmEnableNode(int nodeId)
13431348
void MtmRecoveryCompleted(void)
13441349
{
13451350
int i;
1346-
MTM_LOG1("Recovery of node %d is completed, disabled mask=%lx, reconnect mask=%lx, live nodes=%d",
1347-
MtmNodeId, Mtm->disabledNodeMask, Mtm->reconnectMask, Mtm->nLiveNodes);
1351+
MTM_LOG1("Recovery of node %d is completed, disabled mask=%lx, connectivity mask=%lx, live nodes=%d",
1352+
MtmNodeId, Mtm->disabledNodeMask, Mtm->connectivityMask, Mtm->nLiveNodes);
13481353
MtmLock(LW_EXCLUSIVE);
13491354
Mtm->recoverySlot = 0;
13501355
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
@@ -1902,6 +1907,7 @@ static void MtmInitialize()
19021907
Mtm->nodes[i].lastHeartbeat = 0;
19031908
Mtm->nodes[i].restartLsn = 0;
19041909
Mtm->nodes[i].originId = InvalidRepOriginId;
1910+
Mtm->nodes[i].timeline = 0;
19051911
}
19061912
PGSemaphoreCreate(&Mtm->sendSemaphore);
19071913
PGSemaphoreReset(&Mtm->sendSemaphore);

multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ typedef struct
184184
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
185185
XLogRecPtr restartLsn;
186186
RepOriginId originId;
187+
int timeline;
187188
} MtmNodeInfo;
188189

189190
typedef struct MtmTransState

pglogical_apply.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,7 @@ process_remote_commit(StringInfo in)
620620
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
621621
csn = pq_getmsgint64(in);
622622
gid = pq_getmsgstring(in);
623-
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%ld", csn, gid, end_lsn);
623+
MTM_LOG1("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%ld", csn, gid, end_lsn);
624624
StartTransactionCommand();
625625
MtmBeginSession();
626626
MtmSetCurrentTransactionCSN(csn);
@@ -1057,7 +1057,7 @@ void MtmExecutor(int id, void* work, size_t size)
10571057
MemoryContextSwitchTo(oldcontext);
10581058
EmitErrorReport();
10591059
FlushErrorState();
1060-
MTM_LOG2("%d: REMOTE begin abort transaction %d", MyProcPid, MtmGetCurrentTransactionId());
1060+
MTM_LOG1("%d: REMOTE begin abort transaction %d", MyProcPid, MtmGetCurrentTransactionId());
10611061
MtmEndSession(false);
10621062
AbortCurrentTransaction();
10631063
MTM_LOG2("%d: REMOTE end abort transaction %d", MyProcPid, MtmGetCurrentTransactionId());

pglogical_proto.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
155155
else
156156
Assert(false);
157157

158+
Assert(flags != PGLOGICAL_COMMIT_PREPARED || txn->xid < 1000 || MtmTransactionRecords != 1);
159+
158160
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE) {
159161
if (MtmIsFilteredTxn) {
160162
Assert(MtmTransactionRecords == 0);
@@ -164,8 +166,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
164166
csn_t csn = MtmTransactionSnapshot(txn->xid);
165167
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
166168

167-
168-
if (!isRecovery && txn->origin_id != InvalidRepOriginId)
169+
if (!isRecovery && csn == INVALID_CSN && (flags != PGLOGICAL_ABORT_PREPARED || txn->origin_id != InvalidRepOriginId))
169170
{
170171
if (flags == PGLOGICAL_ABORT_PREPARED) {
171172
MTM_LOG1("Skip ABORT_PREPARED for transaction %s to node %d", txn->gid, MtmReplicationNodeId);
@@ -182,6 +183,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
182183
flags |= PGLOGICAL_CAUGHT_UP;
183184
}
184185
}
186+
185187
pq_sendbyte(out, 'C'); /* sending COMMIT */
186188

187189
MTM_LOG2("PGLOGICAL_SEND commit: event=%d, gid=%s, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx", flags, txn->gid, commit_lsn, txn->end_lsn, GetXLogInsertRecPtr());

pglogical_receiver.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ pglogical_receiver_main(Datum main_arg)
250250
int count;
251251
ConnStatusType status;
252252
XLogRecPtr originStartPos = InvalidXLogRecPtr;
253+
int timeline;
253254

254255
/*
255256
* Determine when and how we should open replication slot.
@@ -261,6 +262,7 @@ pglogical_receiver_main(Datum main_arg)
261262
{
262263
break;
263264
}
265+
timeline = Mtm->nodes[nodeId-1].timeline;
264266
count = Mtm->recoveryCount;
265267

266268
/* Establish connection to remote server */
@@ -274,14 +276,13 @@ pglogical_receiver_main(Datum main_arg)
274276
}
275277

276278
query = createPQExpBuffer();
277-
#if 0 /* Do we need to recreate slot ? */
278-
if (mode == REPLMODE_RECOVERED) { /* recreate slot */
279+
if (mode == REPLMODE_NORMAL && timeline != Mtm->nodes[nodeId-1].timeline) { /* recreate slot */
279280
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slotName);
280281
res = PQexec(conn, query->data);
281282
PQclear(res);
282283
resetPQExpBuffer(query);
284+
timeline = Mtm->nodes[nodeId-1].timeline;
283285
}
284-
#endif
285286
/* My original assumption was that we can perfrom recovery only fromm existed slot,
286287
* but unfortunately looks like slots can "disapear" together with WAL-sender.
287288
* So let's try to recreate slot always. */

0 commit comments

Comments
 (0)