Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5bf4b47

Browse files
committed
return back filtering and status polling
1 parent 941421a commit 5bf4b47

8 files changed

+39
-46
lines changed

arbiter.c

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,10 @@ static void MtmSendHeartbeat()
366366
* true when disabled node connects to a major node which
367367
* is online. So just send it always. --sk
368368
*/
369+
/*
370+
* Also a recovered node needs the full visibility matrix to build a proper
371+
* clique. So heartbeats should be sent regardless of disabledNodeMask.
372+
*/
369373
// && (Mtm->status != MTM_ONLINE
370374
// || sockets[i] >= 0
371375
// || !BIT_CHECK(Mtm->disabledNodeMask, i)
@@ -1000,16 +1004,10 @@ static void MtmReceiver(Datum arg)
10001004
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
10011005
MTM_ELOG(LOG, "Commit transaction %s because it is prepared at all live nodes", msg->gid);
10021006

1003-
ts->status = msg->status;
1004-
MtmWakeUpBackend(ts);
1005-
// replorigin_session_origin = DoNotReplicateId;
1007+
replorigin_session_origin = DoNotReplicateId;
10061008
TXFINISH("%s COMMIT, MSG_POLL_STATUS", msg->gid);
1007-
// MtmUnlock();
1008-
// LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1009-
// MtmFinishPreparedTransaction(ts, true);
1010-
// LWLockRelease(TwoPhaseStateLock);
1011-
// MtmLock(LW_EXCLUSIVE);
1012-
// replorigin_session_origin = InvalidRepOriginId;
1009+
MtmFinishPreparedTransaction(ts, true);
1010+
replorigin_session_origin = InvalidRepOriginId;
10131011
} else {
10141012
MTM_LOG1("Receive response for transaction %s -> %s, participants=%llx, voted=%llx",
10151013
msg->gid, MtmTxnStatusMnem[msg->status], ts->participantsMask, ts->votedMask);

multimaster.c

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1750,7 +1750,7 @@ static void MtmLoadPreparedTransactions(void)
17501750
MtmTransactionListAppend(ts);
17511751
tm->status = ts->status;
17521752
tm->state = ts;
1753-
// MtmBroadcastPollMessage(ts);
1753+
MtmBroadcastPollMessage(ts);
17541754
}
17551755
}
17561756
MTM_LOG1("Recover %d prepared transactions", n);
@@ -3404,7 +3404,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
34043404
{
34053405
/* Lock on us */
34063406
Mtm->recoverySlot = nodeId;
3407-
// MtmPollStatusOfPreparedTransactions();
3407+
MtmPollStatusOfPreparedTransactions();
34083408
MtmUnlock();
34093409
return REPLMODE_RECOVERY;
34103410
}
@@ -3563,13 +3563,13 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
35633563
ulong64 recoveredLSN;
35643564
sscanf(strVal(elem->arg), "%llx", &recoveredLSN);
35653565
MTM_LOG1("Recovered position of node %d is %llx", MtmReplicationNodeId, recoveredLSN);
3566-
// if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN < recoveredLSN) {
3567-
// MTM_LOG1("Advance restartLSN for node %d from %llx to %llx (MtmReplicationStartupHook)",
3568-
// MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, recoveredLSN);
3569-
// // Assert(Mtm->nodes[MtmReplicationNodeId-1].restartLSN == INVALID_LSN
3570-
// // || recoveredLSN < Mtm->nodes[MtmReplicationNodeId-1].restartLSN + MtmMaxRecoveryLag);
3571-
// Mtm->nodes[MtmReplicationNodeId-1].restartLSN = recoveredLSN;
3572-
// }
3566+
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN < recoveredLSN) {
3567+
MTM_LOG1("Advance restartLSN for node %d from %llx to %llx (MtmReplicationStartupHook)",
3568+
MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, recoveredLSN);
3569+
// Assert(Mtm->nodes[MtmReplicationNodeId-1].restartLSN == INVALID_LSN
3570+
// || recoveredLSN < Mtm->nodes[MtmReplicationNodeId-1].restartLSN + MtmMaxRecoveryLag);
3571+
Mtm->nodes[MtmReplicationNodeId-1].restartLSN = recoveredLSN;
3572+
}
35733573
} else {
35743574
MTM_ELOG(ERROR, "Recovered position is not specified");
35753575
}
@@ -3786,11 +3786,7 @@ bool MtmFilterTransaction(char* record, int size)
37863786
default:
37873787
break;
37883788
}
3789-
3790-
return false;
3791-
37923789
restart_lsn = origin_node == MtmReplicationNodeId ? end_lsn : origin_lsn;
3793-
37943790
if (Mtm->nodes[origin_node-1].restartLSN < restart_lsn) {
37953791
MTM_LOG2("[restartlsn] node %d: %llx -> %llx (MtmFilterTransaction)", MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, restart_lsn);
37963792
if (event != PGLOGICAL_PREPARE) {

multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
#define MTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
4242
#endif
4343

44-
// #define MTM_TXFINISH 1
44+
#define MTM_TXFINISH 1
4545

4646
#ifndef MTM_TXFINISH
4747
#define TXFINISH(fmt, ...)

pglogical_apply.c

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -463,21 +463,20 @@ process_remote_message(StringInfo s)
463463
* restartLSN without locks
464464
*/
465465
if (origin_node == MtmReplicationNodeId) {
466-
// Assert(msg->origin_lsn == INVALID_LSN);
467-
// msg->origin_lsn = MtmSenderWalEnd;
466+
Assert(msg->origin_lsn == INVALID_LSN);
467+
msg->origin_lsn = MtmSenderWalEnd;
468+
}
469+
if (Mtm->nodes[origin_node-1].restartLSN < msg->origin_lsn) {
470+
MTM_LOG1("Receive logical abort message for transaction %s from node %d: %llx < %llx", msg->gid, origin_node, Mtm->nodes[origin_node-1].restartLSN, msg->origin_lsn);
471+
Mtm->nodes[origin_node-1].restartLSN = msg->origin_lsn;
472+
replorigin_session_origin_lsn = msg->origin_lsn;
473+
MtmRollbackPreparedTransaction(origin_node, msg->gid);
474+
} else {
475+
if (msg->origin_lsn != INVALID_LSN) {
476+
MTM_LOG1("Ignore rollback of transaction %s from node %d because it's LSN %llx <= %llx",
477+
msg->gid, origin_node, msg->origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
478+
}
468479
}
469-
MtmRollbackPreparedTransaction(origin_node, msg->gid);
470-
// if (Mtm->nodes[origin_node-1].restartLSN < msg->origin_lsn) {
471-
// MTM_LOG1("Receive logical abort message for transaction %s from node %d: %llx < %llx", msg->gid, origin_node, Mtm->nodes[origin_node-1].restartLSN, msg->origin_lsn);
472-
// Mtm->nodes[origin_node-1].restartLSN = msg->origin_lsn;
473-
// replorigin_session_origin_lsn = msg->origin_lsn;
474-
// MtmRollbackPreparedTransaction(origin_node, msg->gid);
475-
// } else {
476-
// if (msg->origin_lsn != INVALID_LSN) {
477-
// MTM_LOG1("Ignore rollback of transaction %s from node %d because it's LSN %llx <= %llx",
478-
// msg->gid, origin_node, msg->origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
479-
// }
480-
// }
481480
standalone = true;
482481
break;
483482
}

pglogical_receiver.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -349,10 +349,10 @@ pglogical_receiver_main(Datum main_arg)
349349
resetPQExpBuffer(query);
350350
Mtm->nodes[nodeId-1].manualRecovery = false;
351351
} else {
352-
// if (Mtm->nodes[nodeId-1].restartLSN < originStartPos) {
353-
// MTM_LOG1("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)", nodeId, Mtm->nodes[nodeId-1].restartLSN, originStartPos);
354-
// Mtm->nodes[nodeId-1].restartLSN = originStartPos;
355-
// }
352+
if (Mtm->nodes[nodeId-1].restartLSN < originStartPos) {
353+
MTM_LOG1("Advance restartLSN for node %d: from %llx to %llx (pglogical_receiver_main)", nodeId, Mtm->nodes[nodeId-1].restartLSN, originStartPos);
354+
Mtm->nodes[nodeId-1].restartLSN = originStartPos;
355+
}
356356
MTM_LOG1("Restart logical receiver at position %llx from node %d", originStartPos, nodeId);
357357
}
358358

state.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -287,9 +287,9 @@ void MtmDisableNode(int nodeId)
287287

288288
if (Mtm->status == MTM_ONLINE) {
289289
/* Make decision about prepared transaction status only in quorum */
290-
//MtmLock(LW_EXCLUSIVE);
291-
//MtmPollStatusOfPreparedTransactionsForDisabledNode(nodeId, false);
292-
//MtmUnlock();
290+
MtmLock(LW_EXCLUSIVE);
291+
MtmPollStatusOfPreparedTransactionsForDisabledNode(nodeId, false);
292+
MtmUnlock();
293293
}
294294
}
295295

tests2/docker-entrypoint.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ if [ "$1" = 'postgres' ]; then
6767
multimaster.heartbeat_send_timeout = 250
6868
multimaster.max_recovery_lag = 1GB
6969
multimaster.min_recovery_lag = 10kB
70-
multimaster.preserve_commit_order = off
70+
multimaster.preserve_commit_order = on
7171
EOF
7272

7373
if [ -n "$NODE_ID" ]; then

tests2/lib/bank_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ def run(self):
289289
self.loop = asyncio.get_event_loop()
290290

291291
for i, _ in enumerate(self.dsns):
292-
for j in range(15):
292+
for j in range(3):
293293
asyncio.ensure_future(self.exec_tx(self.transfer_tx, i, 'transfer', j))
294294
asyncio.ensure_future(self.exec_tx(self.total_tx, i, 'sumtotal', 0))
295295

0 commit comments

Comments
 (0)