Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d5a5f87

Browse files
committed
buggy
1 parent e681d3f commit d5a5f87

File tree

10 files changed

+211
-219
lines changed

10 files changed

+211
-219
lines changed

contrib/mmts/arbiter.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -991,7 +991,7 @@ static void MtmReceiver(Datum arg)
991991
default:
992992
break;
993993
}
994-
if (BIT_CHECK(msg->disabledNodeMask, node-1)) {
994+
if (BIT_CHECK(msg->disabledNodeMask, node-1) || BIT_CHECK(Mtm->disabledNodeMask, node-1)) {
995995
MTM_ELOG(WARNING, "Ignore message from dead node %d\n", node);
996996
continue;
997997
}

contrib/mmts/multimaster.c

Lines changed: 81 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ char const* const MtmNodeStatusMnem[] =
221221
"Recovery",
222222
"Recovered",
223223
"InMinor",
224+
"OutOfClique",
224225
"OutOfService"
225226
};
226227

@@ -366,6 +367,7 @@ void MtmLock(LWLockMode mode)
366367
if (mode == LW_EXCLUSIVE) {
367368
Assert(MtmLockCount == 0);
368369
Mtm->lastLockHolder = MyProcPid;
370+
Assert(MyProcPid);
369371
MtmLockCount = 1;
370372
}
371373
}
@@ -1145,7 +1147,6 @@ bool MtmWatchdog(timestamp_t now)
11451147
MTM_LOG1("[STATE] Node %i: Disconnect due to heartbeat timeout (%d msec)",
11461148
i+1, (int)USEC_TO_MSEC(now - Mtm->nodes[i].lastHeartbeat));
11471149
MtmOnNodeDisconnect(i+1);
1148-
MtmDisableNode(i+1);
11491150
allAlive = false;
11501151
}
11511152
}
@@ -1166,8 +1167,11 @@ void MtmPrecommitTransaction(char const* gid)
11661167
MTM_ELOG(WARNING, "MtmPrecommitTransaction: transaction '%s' is not found", gid);
11671168
} else {
11681169
MtmTransState* ts = tm->state;
1169-
Assert(ts != NULL);
1170-
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
1170+
// Assert(ts != NULL);
1171+
if (ts == NULL) {
1172+
MTM_ELOG(WARNING, "MtmPrecommitTransaction: transaction '%s' is not yet prepared, status %s", gid, MtmTxnStatusMnem[tm->status]);
1173+
MtmUnlock();
1174+
} else if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
11711175
ts->status = TRANSACTION_STATUS_UNKNOWN;
11721176
ts->csn = MtmAssignCSN();
11731177
MtmAdjustSubtransactions(ts);
@@ -1489,6 +1493,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
14891493
if (!(ts->status == TRANSACTION_STATUS_UNKNOWN
14901494
|| (ts->status == TRANSACTION_STATUS_IN_PROGRESS && Mtm->status == MTM_RECOVERY)))
14911495
{
1496+
MtmUnlock();
14921497
MTM_ELOG(ERROR, "Attempt to commit %s transaction %s (%llu)",
14931498
MtmTxnStatusMnem[ts->status], ts->gid, (long64)ts->xid);
14941499
}
@@ -2014,15 +2019,19 @@ static int64 MtmGetSlotLag(int nodeId)
20142019
*/
20152020
bool MtmIsRecoveredNode(int nodeId)
20162021
{
2017-
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
2018-
if (!MtmIsRecoverySession) {
2019-
MTM_ELOG(ERROR, "Node %d is marked as disabled but is not in recovery mode", nodeId);
2020-
}
2021-
return true;
2022-
} else {
2023-
MtmIsRecoverySession = false; /* recovery is completed */
2024-
return false;
2025-
}
2022+
if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
2023+
Assert(!MtmIsRecoverySession);
2024+
2025+
return BIT_CHECK(Mtm->disabledNodeMask, nodeId-1) && MtmIsRecoverySession;
2026+
// if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
2027+
// if (!MtmIsRecoverySession) {
2028+
// MTM_ELOG(WARNING, "Node %d is marked as disabled but is not in recovery mode", nodeId);
2029+
// }
2030+
// return true;
2031+
// } else {
2032+
// MtmIsRecoverySession = false; /* recovery is completed */
2033+
// return false;
2034+
// }
20262035
}
20272036

20282037
/*
@@ -2048,7 +2057,7 @@ void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN)
20482057
*/
20492058
MTM_LOG1("Node %d is almost caught-up: slot position %llx, WAL position %llx, active transactions %d",
20502059
nodeId, slotLSN, walLSN, Mtm->nActiveTransactions);
2051-
BIT_SET(Mtm->originLockNodeMask, nodeId-1);
2060+
BIT_SET(Mtm->originLockNodeMask, nodeId-1); // XXXX: log that
20522061
} else {
20532062
MTM_LOG2("Continue recovery of node %d, slot position %llx, WAL position %llx,"
20542063
" WAL sender position %llx, lockers %llx, active transactions %d", nodeId, slotLSN,
@@ -2070,6 +2079,7 @@ bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
20702079
if (MtmIsRecoveredNode(nodeId) && Mtm->nActiveTransactions == 0) {
20712080
MtmStateProcessNeighborEvent(nodeId, MTM_NEIGHBOR_RECOVERY_CAUGHTUP);
20722081
caughtUp = true;
2082+
MtmIsRecoverySession = false;
20732083
}
20742084
MtmUnlock();
20752085
return caughtUp;
@@ -2087,6 +2097,7 @@ MtmLockCluster(void)
20872097
}
20882098
MtmLock(LW_EXCLUSIVE);
20892099
if (BIT_CHECK(Mtm->originLockNodeMask, MtmNodeId-1)) {
2100+
MtmUnlock();
20902101
elog(ERROR, "There is already pending exclusive lock");
20912102
}
20922103
BIT_SET(Mtm->originLockNodeMask, MtmNodeId-1);
@@ -2339,6 +2350,7 @@ static void MtmInitialize()
23392350
Mtm->nLiveNodes = 0; //MtmNodes;
23402351
Mtm->nAllNodes = MtmNodes;
23412352
Mtm->disabledNodeMask = 7; //XXXX
2353+
Mtm->clique = 7; // XXXX
23422354
Mtm->stalledNodeMask = 0;
23432355
Mtm->stoppedNodeMask = 0;
23442356
Mtm->deadNodeMask = 0;
@@ -2371,7 +2383,7 @@ static void MtmInitialize()
23712383
for (i = 0; i < MtmNodes; i++) {
23722384
Mtm->nodes[i].oldestSnapshot = 0;
23732385
Mtm->nodes[i].disabledNodeMask = 0;
2374-
Mtm->nodes[i].connectivityMask = 7;
2386+
Mtm->nodes[i].connectivityMask = 7; // XXXX
23752387
Mtm->nodes[i].lockGraphUsed = 0;
23762388
Mtm->nodes[i].lockGraphAllocated = 0;
23772389
Mtm->nodes[i].lockGraphData = NULL;
@@ -3214,17 +3226,56 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
32143226
Mtm->preparedTransactionsLoaded = true;
32153227
}
32163228

3217-
while (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId - 1) ||
3218-
BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1))
3229+
// while (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId - 1) ||
3230+
// BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1) ||
3231+
// !BIT_CHECK(Mtm->clique, nodeId - 1) ||
3232+
// !BIT_CHECK(Mtm->clique, MtmNodeId - 1) )
3233+
// {
3234+
// if (*shutdown)
3235+
// {
3236+
// MtmUnlock();
3237+
// return REPLMODE_EXIT;
3238+
// }
3239+
3240+
// if ((Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId) &&
3241+
// (!BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1)))
3242+
// {
3243+
// /* Lock on us */
3244+
// Mtm->recoverySlot = nodeId;
3245+
// MtmPollStatusOfPreparedTransactions();
3246+
// MtmUnlock();
3247+
// return REPLMODE_RECOVERY;
3248+
// }
3249+
3250+
// MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3251+
// nodeId, Mtm->recoverySlot, Mtm->donorNodeId, SELF_CONNECTIVITY_MASK, Mtm->disabledNodeMask);
3252+
3253+
// MtmUnlock();
3254+
// /* delay opening of other slots until recovery is completed */
3255+
// MtmSleep(STATUS_POLL_DELAY);
3256+
// MtmLock(LW_EXCLUSIVE);
3257+
// }
3258+
3259+
// MtmUnlock();
3260+
3261+
// return REPLMODE_RECOVERED;
3262+
3263+
/* Await until node is connected and both receiver and sender are in clique */
3264+
while (BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1) ||
3265+
!BIT_CHECK(Mtm->clique, nodeId - 1) ||
3266+
!BIT_CHECK(Mtm->clique, MtmNodeId - 1) )
32193267
{
3268+
MtmUnlock();
32203269
if (*shutdown)
3221-
{
3222-
MtmUnlock();
32233270
return REPLMODE_EXIT;
3224-
}
3271+
MtmSleep(STATUS_POLL_DELAY);
3272+
MtmLock(LW_EXCLUSIVE);
3273+
}
32253274

3226-
if ((Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId) &&
3227-
(!BIT_CHECK(SELF_CONNECTIVITY_MASK, nodeId - 1)))
3275+
if (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId - 1))
3276+
{
3277+
/* Ok, then start recovery by luckiest walreceiver */
3278+
if (Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId)
32283279
{
32293280
/* Lock on us */
32303281
Mtm->recoverySlot = nodeId;
@@ -3233,87 +3284,19 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
32333284
return REPLMODE_RECOVERY;
32343285
}
32353286

3236-
MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3237-
nodeId, Mtm->recoverySlot, Mtm->donorNodeId, SELF_CONNECTIVITY_MASK, Mtm->disabledNodeMask);
3238-
3239-
MtmUnlock();
3240-
/* delay opening of other slots until recovery is completed */
3241-
MtmSleep(STATUS_POLL_DELAY);
3242-
MtmLock(LW_EXCLUSIVE);
3287+
/* And force less lucky walreceivers wait until recovery is completed */
3288+
while (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId - 1))
3289+
{
3290+
MtmUnlock();
3291+
if (*shutdown)
3292+
return REPLMODE_EXIT;
3293+
MtmSleep(STATUS_POLL_DELAY);
3294+
MtmLock(LW_EXCLUSIVE);
3295+
}
32433296
}
32443297

32453298
MtmUnlock();
3246-
32473299
return REPLMODE_RECOVERED;
3248-
3249-
3250-
3251-
3252-
3253-
// while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_RECOVERED && Mtm->status != MTM_ONLINE)
3254-
// || BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
3255-
// // while (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
3256-
// {
3257-
// if (*shutdown)
3258-
// {
3259-
// MtmUnlock();
3260-
// return REPLMODE_EXIT;
3261-
// }
3262-
// // /* We are not interested in receiving any deteriorated logical messages from recovered node, so recreate slot */
3263-
// // if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
3264-
// // mode = REPLMODE_CREATE_NEW;
3265-
// // }
3266-
// // MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
3267-
3268-
// if (Mtm->status == MTM_RECOVERY) {
3269-
// mode = REPLMODE_RECOVERED;
3270-
// /* Choose node for recovery if
3271-
// * 1. It is not chosen yet or the same node was chosen before
3272-
// * 2. It is donor node or there is no donor node
3273-
// * 3. Connections with all other live nodes were established
3274-
// */
3275-
// if ((Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId)
3276-
// && (Mtm->donorNodeId == MtmNodeId || Mtm->donorNodeId == nodeId)
3277-
// && (SELF_CONNECTIVITY_MASK & ~Mtm->disabledNodeMask) == 0)
3278-
// {
3279-
// /* Choose for recovery first available slot or slot of donor node (if any) */
3280-
// if (Mtm->nAllNodes >= 3) {
3281-
// MTM_ELOG(WARNING, "Process %d starts recovery from node %d restartLSNs={%llx, %llx, %llx}",
3282-
// MyProcPid, nodeId, Mtm->nodes[0].restartLSN, Mtm->nodes[1].restartLSN, Mtm->nodes[2].restartLSN);
3283-
// } else {
3284-
// MTM_ELOG(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
3285-
// }
3286-
// Mtm->recoverySlot = nodeId;
3287-
// // Mtm->nReceivers = 0;
3288-
// // Mtm->nSenders = 0;
3289-
// // Mtm->recoveryCount += 1;
3290-
// // Mtm->pglogicalReceiverMask = 0;
3291-
// // Mtm->pglogicalSenderMask = 0;
3292-
// MtmPollStatusOfPreparedTransactions();
3293-
// MtmUnlock();
3294-
// return REPLMODE_RECOVERY;
3295-
// }
3296-
// }
3297-
// MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3298-
// nodeId, Mtm->recoverySlot, Mtm->donorNodeId, SELF_CONNECTIVITY_MASK, Mtm->disabledNodeMask);
3299-
// MtmUnlock();
3300-
// /* delay opening of other slots until recovery is completed */
3301-
// MtmSleep(STATUS_POLL_DELAY);
3302-
// MtmLock(LW_EXCLUSIVE);
3303-
// }
3304-
// if (Mtm->status == MTM_RECOVERED) {
3305-
// mode = REPLMODE_RECOVERED;
3306-
// }
3307-
// // if (mode == REPLMODE_RECOVERED) {
3308-
// // MTM_LOG1("%d: Restart replication from node %d after end of recovery", MyProcPid, nodeId);
3309-
// // } else if (mode == REPLMODE_CREATE_NEW) {
3310-
// // MTM_LOG1("%d: Start replication from recovered node %d", MyProcPid, nodeId);
3311-
// // } else {
3312-
// // MTM_LOG1("%d: Continue replication from node %d", MyProcPid, nodeId);
3313-
// // }
3314-
// BIT_SET(Mtm->reconnectMask, nodeId-1); /* arbiter should try to reestablish connection with this node */
3315-
// MtmUnlock();
3316-
// return mode;
33173300
}
33183301

33193302
static bool MtmIsBroadcast()

contrib/mmts/multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ typedef enum
108108
/* Identifier of global transaction */
109109
typedef struct
110110
{
111-
int node; /* Zero based index of node initiating transaction */
111+
int node; /* One based id of node initiating transaction */
112112
TransactionId xid; /* Transaction ID at this node */
113113
} GlobalTransactionId;
114114

@@ -137,6 +137,7 @@ typedef enum
137137
MTM_RECOVERY, /* Node is in recovery process */
138138
MTM_RECOVERED, /* Node is recovered by is not yet switched to ONLINE because not all sender/receivers are restarted */
139139
MTM_IN_MINORITY, /* Node is out of quorum */
140+
MTM_OUT_OF_CLIQUE, /* Node is out of cluster by clique detector */
140141
MTM_OUT_OF_SERVICE /* Node is not available to to critical, non-recoverable error */
141142
} MtmNodeStatus;
142143

@@ -288,6 +289,7 @@ typedef struct
288289
LWLockPadded *locks; /* multimaster lock tranche */
289290
TransactionId oldestXid; /* XID of oldest transaction visible by any active transaction (local or global) */
290291
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes */
292+
nodemask_t clique; /* Bitmask of nodes that are connected and we allowed to connect/send wal/receive wal with them */
291293
nodemask_t deadNodeMask; /* Bitmask of nodes considered as dead by referee */
292294
nodemask_t recoveredNodeMask; /* Bitmask of nodes recoverd after been reported as dead by referee */
293295
nodemask_t stalledNodeMask; /* Bitmask of stalled nodes (node with dropped replication slot which makes it not possible automatic recovery of such node) */
@@ -415,7 +417,6 @@ extern void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* co
415417
extern void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks);
416418
extern bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr);
417419
extern void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN);
418-
extern void MtmRecoveryCompleted(void);
419420
extern void MtmMakeTableLocal(char const* schema, char const* name);
420421
extern void MtmHandleApplyError(void);
421422
extern void MtmUpdateLsnMapping(int nodeId, lsn_t endLsn);

contrib/mmts/pglogical_apply.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -658,6 +658,7 @@ process_remote_commit(StringInfo in)
658658
csn_t csn;
659659
lsn_t end_lsn;
660660
lsn_t origin_lsn;
661+
lsn_t commit_lsn;
661662
int origin_node;
662663
char gid[MULTIMASTER_MAX_GID_SIZE];
663664

@@ -668,7 +669,7 @@ process_remote_commit(StringInfo in)
668669
MtmReplicationNodeId = pq_getmsgbyte(in);
669670

670671
/* read fields */
671-
pq_getmsgint64(in); /* commit_lsn */
672+
commit_lsn = pq_getmsgint64(in); /* commit_lsn */
672673
end_lsn = pq_getmsgint64(in); /* end_lsn */
673674
replorigin_session_origin_timestamp = pq_getmsgint64(in); /* commit_time */
674675

@@ -692,7 +693,7 @@ process_remote_commit(StringInfo in)
692693
}
693694
case PGLOGICAL_COMMIT:
694695
{
695-
MTM_LOG2("%d: PGLOGICAL_COMMIT %s, (%llx,%llx,%llx)", MyProcPid, gid, commit_lsn, end_lsn, origin_lsn);
696+
MTM_LOG1("%d: PGLOGICAL_COMMIT %s, (%llx,%llx,%llx)", MyProcPid, gid, commit_lsn, end_lsn, origin_lsn);
696697
if (IsTransactionState()) {
697698
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
698699
MtmBeginSession(origin_node);
@@ -969,10 +970,10 @@ process_remote_update(StringInfo s, Relation rel)
969970
{
970971
StringInfoData o;
971972
initStringInfo(&o);
972-
tuple_to_stringinfo(&o, RelationGetDescr(rel), oldslot->tts_tuple);
973+
tuple_to_stringinfo(&o, RelationGetDescr(rel), oldslot->tts_tuple, false);
973974
appendStringInfo(&o, " to");
974-
tuple_to_stringinfo(&o, RelationGetDescr(rel), remote_tuple);
975-
MTM_LOG1(DEBUG1, "UPDATE:%s", o.data);
975+
tuple_to_stringinfo(&o, RelationGetDescr(rel), remote_tuple, false);
976+
MTM_LOG1("%lu: UPDATE: %s", GetCurrentTransactionId(), o.data);
976977
resetStringInfo(&o);
977978
}
978979
#endif
@@ -1191,7 +1192,6 @@ void MtmExecutor(void* work, size_t size)
11911192
}
11921193
case 'Z':
11931194
{
1194-
// MtmRecoveryCompleted();
11951195
MtmStateProcessEvent(MTM_RECOVERY_FINISH2);
11961196
inside_transaction = false;
11971197
break;

0 commit comments

Comments
 (0)