Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b9226d0

Browse files
committed
buggy
1 parent bc37ba2 commit b9226d0

10 files changed

+207
-217
lines changed

arbiter.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -999,7 +999,7 @@ static void MtmReceiver(Datum arg)
999999
default:
10001000
break;
10011001
}
1002-
if (BIT_CHECK(msg->disabledNodeMask, node-1)) {
1002+
if (BIT_CHECK(msg->disabledNodeMask, node-1) || BIT_CHECK(Mtm->disabledNodeMask, node-1)) {
10031003
MTM_ELOG(WARNING, "Ignore message from dead node %d\n", node);
10041004
continue;
10051005
}

multimaster.c

Lines changed: 77 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ char const* const MtmNodeStatusMnem[] =
225225
"Recovery",
226226
"Recovered",
227227
"InMinor",
228+
"OutOfClique",
228229
"OutOfService"
229230
};
230231

@@ -373,6 +374,7 @@ void MtmLock(LWLockMode mode)
373374
if (mode == LW_EXCLUSIVE) {
374375
Assert(MtmLockCount == 0);
375376
Mtm->lastLockHolder = MyProcPid;
377+
Assert(MyProcPid);
376378
MtmLockCount = 1;
377379
}
378380
}
@@ -1155,7 +1157,6 @@ bool MtmWatchdog(timestamp_t now)
11551157
MTM_LOG1("[STATE] Node %i: Disconnect due to heartbeat timeout (%d msec)",
11561158
i+1, (int)USEC_TO_MSEC(now - Mtm->nodes[i].lastHeartbeat));
11571159
MtmOnNodeDisconnect(i+1);
1158-
MtmDisableNode(i+1);
11591160
allAlive = false;
11601161
}
11611162
}
@@ -1176,6 +1177,7 @@ void MtmPrecommitTransaction(char const* gid)
11761177
MTM_ELOG(WARNING, "MtmPrecommitTransaction: transaction '%s' is not found", gid);
11771178
} else {
11781179
MtmTransState* ts = tm->state;
1180+
// Assert(ts != NULL);
11791181
if (ts == NULL) {
11801182
MTM_ELOG(WARNING, "MtmPrecommitTransaction: transaction '%s' is not yet prepared, status %s", gid, MtmTxnStatusMnem[tm->status]);
11811183
MtmUnlock();
@@ -1501,6 +1503,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
15011503
if (!(ts->status == TRANSACTION_STATUS_UNKNOWN
15021504
|| (ts->status == TRANSACTION_STATUS_IN_PROGRESS && Mtm->status == MTM_RECOVERY)))
15031505
{
1506+
MtmUnlock();
15041507
MTM_ELOG(ERROR, "Attempt to commit %s transaction %s (%llu)",
15051508
MtmTxnStatusMnem[ts->status], ts->gid, (long64)ts->xid);
15061509
}
@@ -2026,15 +2029,19 @@ static int64 MtmGetSlotLag(int nodeId)
20262029
*/
20272030
bool MtmIsRecoveredNode(int nodeId)
20282031
{
2029-
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
2030-
if (!MtmIsRecoverySession) {
2031-
MTM_ELOG(ERROR, "Node %d is marked as disabled but is not in recovery mode", nodeId);
2032-
}
2033-
return true;
2034-
} else {
2035-
MtmIsRecoverySession = false; /* recovery is completed */
2036-
return false;
2037-
}
2032+
if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
2033+
Assert(!MtmIsRecoverySession);
2034+
2035+
return BIT_CHECK(Mtm->disabledNodeMask, nodeId-1) && MtmIsRecoverySession;
2036+
// if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
2037+
// if (!MtmIsRecoverySession) {
2038+
// MTM_ELOG(WARNING, "Node %d is marked as disabled but is not in recovery mode", nodeId);
2039+
// }
2040+
// return true;
2041+
// } else {
2042+
// MtmIsRecoverySession = false; /* recovery is completed */
2043+
// return false;
2044+
// }
20382045
}
20392046

20402047
/*
@@ -2060,7 +2067,7 @@ void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN)
20602067
*/
20612068
MTM_LOG1("Node %d is almost caught-up: slot position %llx, WAL position %llx, active transactions %d",
20622069
nodeId, slotLSN, walLSN, Mtm->nActiveTransactions);
2063-
BIT_SET(Mtm->originLockNodeMask, nodeId-1);
2070+
BIT_SET(Mtm->originLockNodeMask, nodeId-1); // XXXX: log that
20642071
} else {
20652072
MTM_LOG2("Continue recovery of node %d, slot position %llx, WAL position %llx,"
20662073
" WAL sender position %llx, lockers %llx, active transactions %d", nodeId, slotLSN,
@@ -2082,6 +2089,7 @@ bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
20822089
if (MtmIsRecoveredNode(nodeId) && Mtm->nActiveTransactions == 0) {
20832090
MtmStateProcessNeighborEvent(nodeId, MTM_NEIGHBOR_RECOVERY_CAUGHTUP);
20842091
caughtUp = true;
2092+
MtmIsRecoverySession = false;
20852093
}
20862094
MtmUnlock();
20872095
return caughtUp;
@@ -2099,6 +2107,7 @@ MtmLockCluster(void)
20992107
}
21002108
MtmLock(LW_EXCLUSIVE);
21012109
if (BIT_CHECK(Mtm->originLockNodeMask, MtmNodeId-1)) {
2110+
MtmUnlock();
21022111
elog(ERROR, "There is already pending exclusive lock");
21032112
}
21042113
BIT_SET(Mtm->originLockNodeMask, MtmNodeId-1);
@@ -2351,6 +2360,7 @@ static void MtmInitialize()
23512360
Mtm->nLiveNodes = 0; //MtmNodes;
23522361
Mtm->nAllNodes = MtmNodes;
23532362
Mtm->disabledNodeMask = 7; //XXXX
2363+
Mtm->clique = 7; // XXXX
23542364
Mtm->stalledNodeMask = 0;
23552365
Mtm->stoppedNodeMask = 0;
23562366
Mtm->deadNodeMask = 0;
@@ -2383,7 +2393,7 @@ static void MtmInitialize()
23832393
for (i = 0; i < MtmNodes; i++) {
23842394
Mtm->nodes[i].oldestSnapshot = 0;
23852395
Mtm->nodes[i].disabledNodeMask = 0;
2386-
Mtm->nodes[i].connectivityMask = 7;
2396+
Mtm->nodes[i].connectivityMask = 7; // XXXX
23872397
Mtm->nodes[i].lockGraphUsed = 0;
23882398
Mtm->nodes[i].lockGraphAllocated = 0;
23892399
Mtm->nodes[i].lockGraphData = NULL;
@@ -3308,17 +3318,56 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33083318
Mtm->preparedTransactionsLoaded = true;
33093319
}
33103320

3311-
while (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId - 1) ||
3312-
BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1))
3321+
// while (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId - 1) ||
3322+
// BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1) ||
3323+
// !BIT_CHECK(Mtm->clique, nodeId - 1) ||
3324+
// !BIT_CHECK(Mtm->clique, MtmNodeId - 1) )
3325+
// {
3326+
// if (*shutdown)
3327+
// {
3328+
// MtmUnlock();
3329+
// return REPLMODE_EXIT;
3330+
// }
3331+
3332+
// if ((Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId) &&
3333+
// (!BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1)))
3334+
// {
3335+
// /* Lock on us */
3336+
// Mtm->recoverySlot = nodeId;
3337+
// MtmPollStatusOfPreparedTransactions();
3338+
// MtmUnlock();
3339+
// return REPLMODE_RECOVERY;
3340+
// }
3341+
3342+
// MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3343+
// nodeId, Mtm->recoverySlot, Mtm->donorNodeId, SELF_CONNECTIVITY_MASK, Mtm->disabledNodeMask);
3344+
3345+
// MtmUnlock();
3346+
// /* delay opening of other slots until recovery is completed */
3347+
// MtmSleep(STATUS_POLL_DELAY);
3348+
// MtmLock(LW_EXCLUSIVE);
3349+
// }
3350+
3351+
// MtmUnlock();
3352+
3353+
// return REPLMODE_RECOVERED;
3354+
3355+
/* Await until node is connected and both receiver and sender are in clique */
3356+
while (BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1) ||
3357+
!BIT_CHECK(Mtm->clique, nodeId - 1) ||
3358+
!BIT_CHECK(Mtm->clique, MtmNodeId - 1) )
33133359
{
3360+
MtmUnlock();
33143361
if (*shutdown)
3315-
{
3316-
MtmUnlock();
33173362
return REPLMODE_EXIT;
3318-
}
3363+
MtmSleep(STATUS_POLL_DELAY);
3364+
MtmLock(LW_EXCLUSIVE);
3365+
}
33193366

3320-
if ((Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId) &&
3321-
(!BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1)))
3367+
if (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId - 1))
3368+
{
3369+
/* Ok, then start recovery by luckiest walreceiver */
3370+
if (Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId)
33223371
{
33233372
/* Lock on us */
33243373
Mtm->recoverySlot = nodeId;
@@ -3327,87 +3376,19 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33273376
return REPLMODE_RECOVERY;
33283377
}
33293378

3330-
MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3331-
nodeId, Mtm->recoverySlot, Mtm->donorNodeId, SELF_CONNECTIVITY_MASK, Mtm->disabledNodeMask);
3332-
3333-
MtmUnlock();
3334-
/* delay opening of other slots until recovery is completed */
3335-
MtmSleep(STATUS_POLL_DELAY);
3336-
MtmLock(LW_EXCLUSIVE);
3379+
/* And force less lucky walreceivers wait until recovery is completed */
3380+
while (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId - 1))
3381+
{
3382+
MtmUnlock();
3383+
if (*shutdown)
3384+
return REPLMODE_EXIT;
3385+
MtmSleep(STATUS_POLL_DELAY);
3386+
MtmLock(LW_EXCLUSIVE);
3387+
}
33373388
}
33383389

33393390
MtmUnlock();
3340-
33413391
return REPLMODE_RECOVERED;
3342-
3343-
3344-
3345-
3346-
3347-
// while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_RECOVERED && Mtm->status != MTM_ONLINE)
3348-
// || BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
3349-
// // while (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
3350-
// {
3351-
// if (*shutdown)
3352-
// {
3353-
// MtmUnlock();
3354-
// return REPLMODE_EXIT;
3355-
// }
3356-
// // /* We are not interested in receiving any deteriorated logical messages from recovered node, so recreate slot */
3357-
// // if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
3358-
// // mode = REPLMODE_CREATE_NEW;
3359-
// // }
3360-
// // MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
3361-
3362-
// if (Mtm->status == MTM_RECOVERY) {
3363-
// mode = REPLMODE_RECOVERED;
3364-
// /* Choose node for recovery if
3365-
// * 1. It is not chosen yet or the same node was chosen before
3366-
// * 2. It is donor node or there is no donor node
3367-
// * 3. Connections with all other live nodes were established
3368-
// */
3369-
// if ((Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId)
3370-
// && (Mtm->donorNodeId == MtmNodeId || Mtm->donorNodeId == nodeId)
3371-
// && (SELF_CONNECTIVITY_MASK & ~Mtm->disabledNodeMask) == 0)
3372-
// {
3373-
// /* Choose for recovery first available slot or slot of donor node (if any) */
3374-
// if (Mtm->nAllNodes >= 3) {
3375-
// MTM_ELOG(WARNING, "Process %d starts recovery from node %d restartLSNs={%llx, %llx, %llx}",
3376-
// MyProcPid, nodeId, Mtm->nodes[0].restartLSN, Mtm->nodes[1].restartLSN, Mtm->nodes[2].restartLSN);
3377-
// } else {
3378-
// MTM_ELOG(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
3379-
// }
3380-
// Mtm->recoverySlot = nodeId;
3381-
// // Mtm->nReceivers = 0;
3382-
// // Mtm->nSenders = 0;
3383-
// // Mtm->recoveryCount += 1;
3384-
// // Mtm->pglogicalReceiverMask = 0;
3385-
// // Mtm->pglogicalSenderMask = 0;
3386-
// MtmPollStatusOfPreparedTransactions();
3387-
// MtmUnlock();
3388-
// return REPLMODE_RECOVERY;
3389-
// }
3390-
// }
3391-
// MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3392-
// nodeId, Mtm->recoverySlot, Mtm->donorNodeId, SELF_CONNECTIVITY_MASK, Mtm->disabledNodeMask);
3393-
// MtmUnlock();
3394-
// /* delay opening of other slots until recovery is completed */
3395-
// MtmSleep(STATUS_POLL_DELAY);
3396-
// MtmLock(LW_EXCLUSIVE);
3397-
// }
3398-
// if (Mtm->status == MTM_RECOVERED) {
3399-
// mode = REPLMODE_RECOVERED;
3400-
// }
3401-
// // if (mode == REPLMODE_RECOVERED) {
3402-
// // MTM_LOG1("%d: Restart replication from node %d after end of recovery", MyProcPid, nodeId);
3403-
// // } else if (mode == REPLMODE_CREATE_NEW) {
3404-
// // MTM_LOG1("%d: Start replication from recovered node %d", MyProcPid, nodeId);
3405-
// // } else {
3406-
// // MTM_LOG1("%d: Continue replication from node %d", MyProcPid, nodeId);
3407-
// // }
3408-
// BIT_SET(Mtm->reconnectMask, nodeId-1); /* arbiter should try to reestablish connection with this node */
3409-
// MtmUnlock();
3410-
// return mode;
34113392
}
34123393

34133394
static bool MtmIsBroadcast()

multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ typedef enum
108108
/* Identifier of global transaction */
109109
typedef struct
110110
{
111-
int node; /* Zero based index of node initiating transaction */
111+
int node; /* One based id of node initiating transaction */
112112
TransactionId xid; /* Transaction ID at this node */
113113
} GlobalTransactionId;
114114

@@ -137,6 +137,7 @@ typedef enum
137137
MTM_RECOVERY, /* Node is in recovery process */
138138
MTM_RECOVERED, /* Node is recovered by is not yet switched to ONLINE because not all sender/receivers are restarted */
139139
MTM_IN_MINORITY, /* Node is out of quorum */
140+
MTM_OUT_OF_CLIQUE, /* Node is out of cluster by clique detector */
140141
MTM_OUT_OF_SERVICE /* Node is not available to to critical, non-recoverable error */
141142
} MtmNodeStatus;
142143

@@ -288,6 +289,7 @@ typedef struct
288289
LWLockPadded *locks; /* multimaster lock tranche */
289290
TransactionId oldestXid; /* XID of oldest transaction visible by any active transaction (local or global) */
290291
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes */
292+
nodemask_t clique; /* Bitmask of nodes that are connected and we allowed to connect/send wal/receive wal with them */
291293
nodemask_t deadNodeMask; /* Bitmask of nodes considered as dead by referee */
292294
nodemask_t recoveredNodeMask; /* Bitmask of nodes recoverd after been reported as dead by referee */
293295
nodemask_t stalledNodeMask; /* Bitmask of stalled nodes (node with dropped replication slot which makes it not possible automatic recovery of such node) */
@@ -417,7 +419,6 @@ extern void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* co
417419
extern void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks);
418420
extern bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr);
419421
extern void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN);
420-
extern void MtmRecoveryCompleted(void);
421422
extern void MtmMakeTableLocal(char const* schema, char const* name);
422423
extern void MtmHandleApplyError(void);
423424
extern void MtmUpdateLsnMapping(int nodeId, lsn_t endLsn);

pglogical_apply.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,7 @@ process_remote_commit(StringInfo in)
658658
csn_t csn;
659659
lsn_t end_lsn;
660660
lsn_t origin_lsn;
661+
lsn_t commit_lsn;
661662
int origin_node;
662663
char gid[MULTIMASTER_MAX_GID_SIZE];
663664

@@ -668,7 +669,7 @@ process_remote_commit(StringInfo in)
668669
MtmReplicationNodeId = pq_getmsgbyte(in);
669670

670671
/* read fields */
671-
pq_getmsgint64(in); /* commit_lsn */
672+
commit_lsn = pq_getmsgint64(in); /* commit_lsn */
672673
end_lsn = pq_getmsgint64(in); /* end_lsn */
673674
replorigin_session_origin_timestamp = pq_getmsgint64(in); /* commit_time */
674675

@@ -692,7 +693,7 @@ process_remote_commit(StringInfo in)
692693
}
693694
case PGLOGICAL_COMMIT:
694695
{
695-
MTM_LOG2("%d: PGLOGICAL_COMMIT %s, (%llx,%llx,%llx)", MyProcPid, gid, commit_lsn, end_lsn, origin_lsn);
696+
MTM_LOG1("%d: PGLOGICAL_COMMIT %s, (%llx,%llx,%llx)", MyProcPid, gid, commit_lsn, end_lsn, origin_lsn);
696697
if (IsTransactionState()) {
697698
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
698699
MtmBeginSession(origin_node);
@@ -969,10 +970,10 @@ process_remote_update(StringInfo s, Relation rel)
969970
{
970971
StringInfoData o;
971972
initStringInfo(&o);
972-
tuple_to_stringinfo(&o, RelationGetDescr(rel), oldslot->tts_tuple);
973+
tuple_to_stringinfo(&o, RelationGetDescr(rel), oldslot->tts_tuple, false);
973974
appendStringInfo(&o, " to");
974-
tuple_to_stringinfo(&o, RelationGetDescr(rel), remote_tuple);
975-
MTM_LOG1(DEBUG1, "UPDATE:%s", o.data);
975+
tuple_to_stringinfo(&o, RelationGetDescr(rel), remote_tuple, false);
976+
MTM_LOG1("%lu: UPDATE: %s", GetCurrentTransactionId(), o.data);
976977
resetStringInfo(&o);
977978
}
978979
#endif
@@ -1191,7 +1192,6 @@ void MtmExecutor(void* work, size_t size)
11911192
}
11921193
case 'Z':
11931194
{
1194-
// MtmRecoveryCompleted();
11951195
MtmStateProcessEvent(MTM_RECOVERY_FINISH2);
11961196
inside_transaction = false;
11971197
break;

0 commit comments

Comments
 (0)