Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit beeff8f

Browse files
committed
Drop replication slots on recovered nodes
1 parent 5740217 commit beeff8f

File tree

6 files changed

+33
-21
lines changed

6 files changed

+33
-21
lines changed

contrib/mmts/arbiter.c

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ static void MtmSendHeartbeat()
362362
MTM_LOG2("Send heartbeat to node %d with timestamp %ld", i+1, now);
363363
}
364364
} else {
365-
MTM_LOG1("Do not send hearbeat to node %d, busy mask %ld, status %d", i+1, busy_mask, Mtm->status);
365+
MTM_LOG1("Do not send heartbeat to node %d, busy mask %ld, status %d", i+1, busy_mask, Mtm->status);
366366
}
367367
}
368368
}
@@ -484,6 +484,8 @@ static int MtmConnectSocket(int node, int port, int timeout)
484484
MtmCheckResponse(&resp);
485485
MtmUnlock();
486486

487+
MtmOnNodeConnect(node+1);
488+
487489
busy_mask = save_mask;
488490

489491
return sd;
@@ -539,7 +541,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
539541
}
540542
if (sockets[node] < 0 || !MtmWriteSocket(sockets[node], buf, size)) {
541543
if (sockets[node] >= 0) {
542-
elog(WARNING, "Arbiter failed to write to node %d: %d", node+1, errno);
544+
elog(WARNING, "Arbiter fail to write to node %d: %d", node+1, errno);
543545
close(sockets[node]);
544546
sockets[node] = -1;
545547
}
@@ -549,7 +551,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
549551
result = false;
550552
break;
551553
}
552-
MTM_LOG3("Arbiter restablished connection with node %d", node+1);
554+
MTM_LOG3("Arbiter reestablish connection with node %d", node+1);
553555
} else {
554556
result = true;
555557
break;
@@ -1075,7 +1077,7 @@ static void MtmReceiver(Datum arg)
10751077
if (!MtmWatchdog(now)) {
10761078
for (i = 0; i < nNodes; i++) {
10771079
if (Mtm->nodes[i].lastHeartbeat != 0 && sockets[i] >= 0) {
1078-
MTM_LOG1("Last hearbeat from node %d received %ld microseconds ago", i+1, now - Mtm->nodes[i].lastHeartbeat);
1080+
MTM_LOG1("Last heartbeat from node %d received %ld microseconds ago", i+1, now - Mtm->nodes[i].lastHeartbeat);
10791081
}
10801082
}
10811083
}

contrib/mmts/multimaster.c

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
451451
for (i = 0; i < MAX_WAIT_LOOPS; i++)
452452
{
453453
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
454-
if (ts != NULL/* && ts->status != TRANSACTION_STATUS_IN_PROGRESS*/)
454+
if (ts != NULL /*&& ts->status != TRANSACTION_STATUS_IN_PROGRESS*/)
455455
{
456456
if (ts->csn > MtmTx.snapshot) {
457457
MTM_LOG4("%d: tuple with xid=%d(csn=%ld) is invisibile in snapshot %ld",
@@ -1243,13 +1243,17 @@ void MtmAbortTransaction(MtmTransState* ts)
12431243
{
12441244
Assert(MtmLockCount != 0); /* should be invoked with exclsuive lock */
12451245
if (ts->status != TRANSACTION_STATUS_ABORTED) {
1246-
MTM_LOG1("Rollback active transaction %d:%d (local xid %d)", ts->gtid.node, ts->gtid.xid, ts->xid);
1247-
ts->status = TRANSACTION_STATUS_ABORTED;
1248-
MtmAdjustSubtransactions(ts);
1249-
if (ts->isActive) {
1250-
ts->isActive = false;
1251-
Assert(Mtm->nActiveTransactions != 0);
1252-
Mtm->nActiveTransactions -= 1;
1246+
if (ts->status == TRANSACTION_STATUS_COMMITTED) {
1247+
elog(WARNING, "Attempt to rollback already committed transaction %d (%s)", ts->xid, ts->gid);
1248+
} else {
1249+
MTM_LOG1("Rollback active transaction %d:%d (local xid %d) status %d", ts->gtid.node, ts->gtid.xid, ts->xid, ts->status);
1250+
ts->status = TRANSACTION_STATUS_ABORTED;
1251+
MtmAdjustSubtransactions(ts);
1252+
if (ts->isActive) {
1253+
ts->isActive = false;
1254+
Assert(Mtm->nActiveTransactions != 0);
1255+
Mtm->nActiveTransactions -= 1;
1256+
}
12531257
}
12541258
}
12551259
}
@@ -1321,6 +1325,7 @@ static void MtmDisableNode(int nodeId)
13211325
elog(WARNING, "Disable node %d at xlog position %lx, last status change time %d msec ago", nodeId, GetXLogInsertRecPtr(),
13221326
(int)USEC_TO_MSEC(now - Mtm->nodes[nodeId-1].lastStatusChangeTime));
13231327
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
1328+
Mtm->nodes[nodeId-1].timeline += 1;
13241329
Mtm->nodes[nodeId-1].lastStatusChangeTime = now;
13251330
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
13261331
if (nodeId != MtmNodeId) {
@@ -1344,8 +1349,8 @@ static void MtmEnableNode(int nodeId)
13441349
void MtmRecoveryCompleted(void)
13451350
{
13461351
int i;
1347-
MTM_LOG1("Recovery of node %d is completed, disabled mask=%lx, reconnect mask=%lx, live nodes=%d",
1348-
MtmNodeId, Mtm->disabledNodeMask, Mtm->reconnectMask, Mtm->nLiveNodes);
1352+
MTM_LOG1("Recovery of node %d is completed, disabled mask=%lx, connectivity mask=%lx, live nodes=%d",
1353+
MtmNodeId, Mtm->disabledNodeMask, Mtm->connectivityMask, Mtm->nLiveNodes);
13491354
MtmLock(LW_EXCLUSIVE);
13501355
Mtm->recoverySlot = 0;
13511356
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
@@ -1903,6 +1908,7 @@ static void MtmInitialize()
19031908
Mtm->nodes[i].lastHeartbeat = 0;
19041909
Mtm->nodes[i].restartLsn = 0;
19051910
Mtm->nodes[i].originId = InvalidRepOriginId;
1911+
Mtm->nodes[i].timeline = 0;
19061912
}
19071913
PGSemaphoreCreate(&Mtm->sendSemaphore);
19081914
PGSemaphoreReset(&Mtm->sendSemaphore);

contrib/mmts/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ typedef struct
184184
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
185185
XLogRecPtr restartLsn;
186186
RepOriginId originId;
187+
int timeline;
187188
} MtmNodeInfo;
188189

189190
typedef struct MtmTransState

contrib/mmts/pglogical_apply.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,7 @@ process_remote_commit(StringInfo in)
620620
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
621621
csn = pq_getmsgint64(in);
622622
gid = pq_getmsgstring(in);
623-
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%ld", csn, gid, end_lsn);
623+
MTM_LOG1("PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s, lsn=%ld", csn, gid, end_lsn);
624624
StartTransactionCommand();
625625
MtmBeginSession();
626626
MtmSetCurrentTransactionCSN(csn);
@@ -1057,7 +1057,7 @@ void MtmExecutor(int id, void* work, size_t size)
10571057
MemoryContextSwitchTo(oldcontext);
10581058
EmitErrorReport();
10591059
FlushErrorState();
1060-
MTM_LOG2("%d: REMOTE begin abort transaction %d", MyProcPid, MtmGetCurrentTransactionId());
1060+
MTM_LOG1("%d: REMOTE begin abort transaction %d", MyProcPid, MtmGetCurrentTransactionId());
10611061
MtmEndSession(false);
10621062
AbortCurrentTransaction();
10631063
MTM_LOG2("%d: REMOTE end abort transaction %d", MyProcPid, MtmGetCurrentTransactionId());

contrib/mmts/pglogical_proto.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
155155
else
156156
Assert(false);
157157

158+
Assert(flags != PGLOGICAL_COMMIT_PREPARED || txn->xid < 1000 || MtmTransactionRecords != 1);
159+
158160
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE) {
159161
if (MtmIsFilteredTxn) {
160162
Assert(MtmTransactionRecords == 0);
@@ -164,8 +166,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
164166
csn_t csn = MtmTransactionSnapshot(txn->xid);
165167
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
166168

167-
168-
if (!isRecovery && txn->origin_id != InvalidRepOriginId)
169+
if (!isRecovery && csn == INVALID_CSN && (flags != PGLOGICAL_ABORT_PREPARED || txn->origin_id != InvalidRepOriginId))
169170
{
170171
if (flags == PGLOGICAL_ABORT_PREPARED) {
171172
MTM_LOG1("Skip ABORT_PREPARED for transaction %s to node %d", txn->gid, MtmReplicationNodeId);
@@ -182,6 +183,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
182183
flags |= PGLOGICAL_CAUGHT_UP;
183184
}
184185
}
186+
185187
pq_sendbyte(out, 'C'); /* sending COMMIT */
186188

187189
MTM_LOG2("PGLOGICAL_SEND commit: event=%d, gid=%s, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx", flags, txn->gid, commit_lsn, txn->end_lsn, GetXLogInsertRecPtr());

contrib/mmts/pglogical_receiver.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ pglogical_receiver_main(Datum main_arg)
250250
int count;
251251
ConnStatusType status;
252252
XLogRecPtr originStartPos = InvalidXLogRecPtr;
253+
int timeline;
253254

254255
/*
255256
* Determine when and how we should open replication slot.
@@ -261,6 +262,7 @@ pglogical_receiver_main(Datum main_arg)
261262
{
262263
break;
263264
}
265+
timeline = Mtm->nodes[nodeId-1].timeline;
264266
count = Mtm->recoveryCount;
265267

266268
/* Establish connection to remote server */
@@ -274,14 +276,13 @@ pglogical_receiver_main(Datum main_arg)
274276
}
275277

276278
query = createPQExpBuffer();
277-
#if 0 /* Do we need to recreate slot ? */
278-
if (mode == REPLMODE_RECOVERED) { /* recreate slot */
279+
if (mode == REPLMODE_NORMAL && timeline != Mtm->nodes[nodeId-1].timeline) { /* recreate slot */
279280
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slotName);
280281
res = PQexec(conn, query->data);
281282
PQclear(res);
282283
resetPQExpBuffer(query);
284+
timeline = Mtm->nodes[nodeId-1].timeline;
283285
}
284-
#endif
285286
/* My original assumption was that we can perfrom recovery only fromm existed slot,
286287
* but unfortunately looks like slots can "disapear" together with WAL-sender.
287288
* So let's try to recreate slot always. */

0 commit comments

Comments
 (0)