Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 1d135e6

Browse files
knizhnikkelvich
authored andcommitted
Node recovery
1 parent 78af328 commit 1d135e6

File tree

4 files changed

+32
-12
lines changed

4 files changed

+32
-12
lines changed

arbiter.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,10 +348,9 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
348348
/* Some node considered that I am dead, so switch to recovery mode */
349349
if (BIT_CHECK(resp.disabledNodeMask, MtmNodeId-1)) {
350350
elog(WARNING, "Node %d think that I am dead", resp.node);
351+
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
351352
MtmSwitchClusterMode(MTM_RECOVERY);
352353
}
353-
/* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */
354-
Mtm->disabledNodeMask |= resp.disabledNodeMask;
355354
return sd;
356355
}
357356
}
@@ -377,7 +376,7 @@ static void MtmOpenConnections()
377376
}
378377
if (Mtm->nNodes < MtmNodes/2+1) { /* no quorum */
379378
elog(WARNING, "Node is out of quorum: only %d nodes from %d are accssible", Mtm->nNodes, MtmNodes);
380-
Mtm->status = MTM_OFFLINE;
379+
Mtm->status = MTM_IN_MINORITY;
381380
} else if (Mtm->status == MTM_INITIALIZATION) {
382381
MtmSwitchClusterMode(MTM_CONNECTED);
383382
}
@@ -431,6 +430,7 @@ static void MtmAcceptOneConnection()
431430
resp.dxid = HANDSHAKE_MAGIC;
432431
resp.sxid = ShmemVariableCache->nextXid;
433432
resp.csn = MtmGetCurrentTime();
433+
resp.node = MtmNodeId;
434434
MtmUpdateNodeConnectionInfo(&Mtm->nodes[req.hdr.node-1].con, req.connStr);
435435
if (!MtmWriteSocket(fd, &resp, sizeof resp)) {
436436
elog(WARNING, "Arbiter failed to write response for handshake message to node %d", resp.node);

multimaster.c

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ char const* const MtmNodeStatusMnem[] =
155155
"Offline",
156156
"Connected",
157157
"Online",
158-
"Recovery"
158+
"Recovery",
159+
"InMinor"
159160
};
160161

161162
bool MtmDoReplication;
@@ -631,10 +632,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
631632
x->isReplicated = false;
632633
x->isDistributed = MtmIsUserTransaction();
633634
x->isPrepared = false;
634-
if (x->isDistributed && Mtm->status != MTM_ONLINE) {
635+
x->isTransactionBlock = IsTransactionBlock();
636+
/* Application name can be cahnged usnig PGAPPNAME environment variable */
637+
if (x->isDistributed && Mtm->status != MTM_ONLINE && strcmp(application_name, MULTIMASTER_ADMIN) != 0) {
635638
/* reject all user's transactions at offline cluster */
636639
MtmUnlock();
637-
Assert(Mtm->status == MTM_ONLINE);
638640
elog(ERROR, "Multimaster node is not online: current status %s", MtmNodeStatusMnem[Mtm->status]);
639641
}
640642
x->containsDML = false;
@@ -981,11 +983,14 @@ bool MtmIsRecoveredNode(int nodeId)
981983
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
982984
* Is there some better way to establish mapping between nodes ad WAL-seconder?
983985
*/
986+
elog(WARNING,"Node %d is catching up", nodeId);
984987
MtmLock(LW_EXCLUSIVE);
985988
BIT_SET(Mtm->nodeLockerMask, nodeId-1);
986989
BIT_SET(Mtm->walSenderLockerMask, MyWalSnd - WalSndCtl->walsnds);
987990
Mtm->nLockers += 1;
988991
MtmUnlock();
992+
} else {
993+
MTM_INFO("Continue recovery of node %d, slot position %lx, WAL position %lx, lockers %d\n", nodeId, MyWalSnd->sentPtr, GetXLogInsertRecPtr(), Mtm->nLockers);
989994
}
990995
return true;
991996
}
@@ -1022,7 +1027,7 @@ MtmCheckClusterLock()
10221027
break;
10231028
} else {
10241029
/* recovered replica catched up with master */
1025-
elog(WARNING, "WAL-sender %d complete receovery", i);
1030+
elog(WARNING, "WAL-sender %d complete recovery", i);
10261031
BIT_CLEAR(Mtm->walSenderLockerMask, i);
10271032
}
10281033
}
@@ -1608,8 +1613,9 @@ void MtmReceiverStarted(int nodeId)
16081613
if (!BIT_CHECK(Mtm->pglogicalNodeMask, nodeId-1)) {
16091614
BIT_SET(Mtm->pglogicalNodeMask, nodeId-1);
16101615
if (++Mtm->nReceivers == Mtm->nNodes-1) {
1611-
Assert(Mtm->status == MTM_CONNECTED);
1612-
MtmSwitchClusterMode(MTM_ONLINE);
1616+
if (Mtm->status == MTM_CONNECTED) {
1617+
MtmSwitchClusterMode(MTM_ONLINE);
1618+
}
16131619
}
16141620
}
16151621
SpinLockRelease(&Mtm->spinlock);
@@ -1622,19 +1628,28 @@ void MtmReceiverStarted(int nodeId)
16221628
*/
16231629
MtmSlotMode MtmReceiverSlotMode(int nodeId)
16241630
{
1631+
bool recovery = false;
16251632
while (Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE) {
1633+
MTM_INFO("%d: receiver slot mode %s\n", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
16261634
if (Mtm->status == MTM_RECOVERY) {
1635+
recovery = true;
16271636
if (Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId) {
16281637
/* Choose for recovery first available slot */
1638+
elog(WARNING, "Start recovery from node %d", nodeId);
16291639
Mtm->recoverySlot = nodeId;
16301640
return SLOT_OPEN_EXISTED;
16311641
}
16321642
}
16331643
/* delay opening of other slots until recovery is completed */
16341644
MtmSleep(STATUS_POLL_DELAY);
16351645
}
1646+
if (recovery) {
1647+
elog(WARNING, "Recreate replication slot for node %d after end of recovery", nodeId);
1648+
} else {
1649+
MTM_INFO("%d: Reuse replication slot for node %d\n", MyProcPid, nodeId);
1650+
}
16361651
/* After recovery completion we need to drop all other slots to avoid receive of redundant data */
1637-
return Mtm->recoverySlot ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS;
1652+
return recovery ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS;
16381653
}
16391654

16401655
static bool MtmIsBroadcast()
@@ -1690,7 +1705,11 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
16901705
static bool
16911706
MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
16921707
{
1693-
return args->origin_id == InvalidRepOriginId || MtmIsRecoveredNode(MtmReplicationNodeId);
1708+
bool res = Mtm->status != MTM_RECOVERY
1709+
&& (args->origin_id == InvalidRepOriginId
1710+
|| MtmIsRecoveredNode(MtmReplicationNodeId));
1711+
MTM_TRACE("%d: MtmReplicationTxnFilterHook->%d\n", MyProcPid, res);
1712+
return res;
16941713
}
16951714

16961715
void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)

multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#define MULTIMASTER_MAX_CONN_STR_SIZE 128
2828
#define MULTIMASTER_MAX_HOST_NAME_SIZE 64
2929
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
30+
#define MULTIMASTER_ADMIN "mtm_admin"
3031

3132
#define USEC 1000000
3233

pglogical_proto.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
103103
{
104104
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
105105
csn_t csn = MtmTransactionSnapshot(txn->xid);
106-
MTM_TRACE("pglogical_write_begin %d CSN=%ld\n", txn->xid, csn);
106+
MTM_INFO("%d: pglogical_write_begin %d CSN=%ld\n", MyProcPid, txn->xid, csn);
107107

108108
if (csn == INVALID_CSN && !isRecovery) {
109109
MtmIsFilteredTxn = true;

0 commit comments

Comments
 (0)