Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 6275e9c

Browse files
committed
Fix handling of connectivity mask
1 parent a4a6557 commit 6275e9c

File tree

6 files changed

+82
-46
lines changed

6 files changed

+82
-46
lines changed

contrib/mmts/arbiter.c

+13-3
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,11 @@ static void MtmSendHeartbeat()
369369
if (!MtmSendToNode(i, &msg, sizeof(msg))) {
370370
elog(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
371371
} else {
372+
/* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
373+
if (BIT_CHECK(Mtm->connectivityMask, i)) {
374+
MtmDisconnect(i);
375+
//MtmOnNodeConnect(i+1);
376+
}
372377
MTM_LOG4("Send heartbeat to node %d with timestamp %ld", i+1, now);
373378
}
374379
} else {
@@ -560,7 +565,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
560565
result = false;
561566
break;
562567
}
563-
MTM_LOG3("Arbiter reestablish connection with node %d", node+1);
568+
MTM_LOG1("Arbiter reestablish connection with node %d", node+1);
564569
} else {
565570
result = true;
566571
break;
@@ -718,7 +723,7 @@ static void MtmSender(Datum arg)
718723
* Use shared lock to improve locality,
719724
* because all other process modifying this list are using exclusive lock
720725
*/
721-
MtmLock(LW_SHARED);
726+
SpinLockAcquire(&Mtm->queueSpinlock);
722727

723728
for (curr = Mtm->sendQueue; curr != NULL; curr = next) {
724729
next = curr->next;
@@ -728,7 +733,7 @@ static void MtmSender(Datum arg)
728733
}
729734
Mtm->sendQueue = NULL;
730735

731-
MtmUnlock();
736+
SpinLockRelease(&Mtm->queueSpinlock);
732737

733738
for (i = 0; i < Mtm->nAllNodes; i++) {
734739
if (txBuffer[i].used != 0) {
@@ -892,6 +897,11 @@ static void MtmReceiver(Datum arg)
892897
int node = msg->node;
893898

894899
Assert(node > 0 && node <= nNodes && node != MtmNodeId);
900+
901+
if (Mtm->nodes[node-1].connectivityMask != msg->connectivityMask) {
902+
elog(LOG, "Node %d changes it connectivity mask from %llx to %llx", node, (long long)Mtm->nodes[node-1].connectivityMask, (long long)msg->connectivityMask);
903+
}
904+
895905
Mtm->nodes[node-1].oldestSnapshot = msg->oldestSnapshot;
896906
Mtm->nodes[node-1].disabledNodeMask = msg->disabledNodeMask;
897907
Mtm->nodes[node-1].connectivityMask = msg->connectivityMask;

contrib/mmts/multimaster.c

+52-41
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,6 @@ typedef struct {
8989
pgid_t gid; /* global transaction identifier (used by 2pc) */
9090
} MtmCurrentTrans;
9191

92-
/* #define USE_SPINLOCK 1 */
93-
9492
typedef enum
9593
{
9694
MTM_STATE_LOCK_ID
@@ -245,6 +243,7 @@ static int MtmMaxRecoveryLag;
245243
static int MtmGcPeriod;
246244
static bool MtmIgnoreTablesWithoutPk;
247245
static int MtmLockCount;
246+
static int MtmSenderStarted;
248247

249248
static ExecutorStart_hook_type PreviousExecutorStartHook;
250249
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -273,16 +272,12 @@ void MtmLock(LWLockMode mode)
273272
return;
274273
}
275274
}
276-
#ifdef USE_SPINLOCK
277-
SpinLockAcquire(&Mtm->spinlock);
278-
#else
279275
start = MtmGetSystemTime();
280276
LWLockAcquire((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID], mode);
281277
stop = MtmGetSystemTime();
282278
if (stop > start + MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
283279
MTM_LOG1("%d: obtaining %s lock takes %ld microseconds", MyProcPid, (mode == LW_EXCLUSIVE ? "exclusive" : "shared"), stop - start);
284280
}
285-
#endif
286281
Mtm->lastLockHolder = MyProcPid;
287282
}
288283

@@ -291,11 +286,7 @@ void MtmUnlock(void)
291286
if (MtmLockCount != 0 && --MtmLockCount != 0) {
292287
return;
293288
}
294-
#ifdef USE_SPINLOCK
295-
SpinLockRelease(&Mtm->spinlock);
296-
#else
297289
LWLockRelease((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID]);
298-
#endif
299290
Mtm->lastLockHolder = 0;
300291
}
301292

@@ -1231,7 +1222,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12311222
if (commit) {
12321223
if (!(ts->status == TRANSACTION_STATUS_UNKNOWN
12331224
|| (ts->status == TRANSACTION_STATUS_IN_PROGRESS && Mtm->status == MTM_RECOVERY)))
1234-
{ Assert(false);
1225+
{
12351226
elog(ERROR, "Attempt to commit %s transaction %d (%s)",
12361227
MtmTxnStatusMnem[ts->status], ts->xid, ts->gid);
12371228
}
@@ -1304,20 +1295,24 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
13041295

13051296
void MtmSendMessage(MtmArbiterMessage* msg)
13061297
{
1307-
MtmMessageQueue* mq = Mtm->freeQueue;
1308-
MtmMessageQueue* sendQueue = Mtm->sendQueue;
1309-
if (mq == NULL) {
1310-
mq = (MtmMessageQueue*)ShmemAlloc(sizeof(MtmMessageQueue));
1311-
} else {
1312-
Mtm->freeQueue = mq->next;
1313-
}
1314-
mq->msg = *msg;
1315-
mq->next = sendQueue;
1316-
Mtm->sendQueue = mq;
1317-
if (sendQueue == NULL) {
1318-
/* singal semaphore only once for the whole list */
1319-
PGSemaphoreUnlock(&Mtm->sendSemaphore);
1298+
SpinLockAcquire(&Mtm->queueSpinlock);
1299+
{
1300+
MtmMessageQueue* mq = Mtm->freeQueue;
1301+
MtmMessageQueue* sendQueue = Mtm->sendQueue;
1302+
if (mq == NULL) {
1303+
mq = (MtmMessageQueue*)ShmemAlloc(sizeof(MtmMessageQueue));
1304+
} else {
1305+
Mtm->freeQueue = mq->next;
1306+
}
1307+
mq->msg = *msg;
1308+
mq->next = sendQueue;
1309+
Mtm->sendQueue = mq;
1310+
if (sendQueue == NULL) {
1311+
/* singal semaphore only once for the whole list */
1312+
PGSemaphoreUnlock(&Mtm->sendSemaphore);
1313+
}
13201314
}
1315+
SpinLockRelease(&Mtm->queueSpinlock);
13211316
}
13221317

13231318
void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
@@ -1667,8 +1662,8 @@ void MtmRecoveryCompleted(void)
16671662
Mtm->nodes[i].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
16681663
}
16691664
/* Mode will be changed to online once all logical receiver are connected */
1670-
elog(LOG, "Recovery completed with %d active receivers from %d", Mtm->nReceivers, Mtm->nLiveNodes-1);
1671-
MtmSwitchClusterMode(Mtm->nReceivers == Mtm->nLiveNodes-1 ? MTM_ONLINE : MTM_CONNECTED);
1665+
elog(LOG, "Recovery completed with %d active receivers and %d started senders from %d", Mtm->nReceivers, Mtm->nSenders, Mtm->nLiveNodes-1);
1666+
MtmSwitchClusterMode(Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1 ? MTM_ONLINE : MTM_CONNECTED);
16721667
MtmUnlock();
16731668
}
16741669

@@ -2010,7 +2005,7 @@ void MtmOnNodeDisconnect(int nodeId)
20102005
MtmLock(LW_EXCLUSIVE);
20112006
BIT_SET(Mtm->connectivityMask, nodeId-1);
20122007
BIT_SET(Mtm->reconnectMask, nodeId-1);
2013-
MTM_LOG1("Disconnect node %d connectivity mask %llx", nodeId, (long long) Mtm->connectivityMask);
2008+
elog(LOG, "Disconnect node %d connectivity mask %llx", nodeId, (long long) Mtm->connectivityMask);
20142009
MtmUnlock();
20152010

20162011
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
@@ -2020,6 +2015,7 @@ void MtmOnNodeDisconnect(int nodeId)
20202015
void MtmOnNodeConnect(int nodeId)
20212016
{
20222017
MtmLock(LW_EXCLUSIVE);
2018+
elog(LOG, "Connect node %d connectivity mask %llx", nodeId, (long long) Mtm->connectivityMask);
20232019
BIT_CLEAR(Mtm->connectivityMask, nodeId-1);
20242020
BIT_CLEAR(Mtm->reconnectMask, nodeId-1);
20252021
MtmUnlock();
@@ -2198,6 +2194,7 @@ static void MtmInitialize()
21982194
Mtm->transListHead = NULL;
21992195
Mtm->transListTail = &Mtm->transListHead;
22002196
Mtm->nReceivers = 0;
2197+
Mtm->nSenders = 0;
22012198
Mtm->timeShift = 0;
22022199
Mtm->transCount = 0;
22032200
Mtm->gcCount = 0;
@@ -2229,7 +2226,7 @@ static void MtmInitialize()
22292226
Mtm->nodes[MtmNodeId-1].restartLSN = (XLogRecPtr)PG_UINT64_MAX;
22302227
PGSemaphoreCreate(&Mtm->sendSemaphore);
22312228
PGSemaphoreReset(&Mtm->sendSemaphore);
2232-
SpinLockInit(&Mtm->spinlock);
2229+
SpinLockInit(&Mtm->queueSpinlock);
22332230
BgwPoolInit(&Mtm->pool, MtmExecutor, MtmDatabaseName, MtmDatabaseUser, MtmQueueSize, MtmWorkers);
22342231
RegisterXactCallback(MtmXactCallback, NULL);
22352232
MtmTx.snapshot = INVALID_CSN;
@@ -2906,11 +2903,9 @@ void MtmReceiverStarted(int nodeId)
29062903
MtmEnableNode(nodeId);
29072904
MtmCheckQuorum();
29082905
}
2909-
elog(LOG, "Start %d receivers from %d cluster status %s", Mtm->nReceivers+1, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
2910-
if (++Mtm->nReceivers == Mtm->nLiveNodes-1) {
2911-
if (Mtm->status == MTM_CONNECTED) {
2912-
MtmSwitchClusterMode(MTM_ONLINE);
2913-
}
2906+
elog(LOG, "Start %d receivers and %d senders from %d cluster status %s", Mtm->nReceivers+1, Mtm->nSenders, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
2907+
if (++Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
2908+
MtmSwitchClusterMode(MTM_ONLINE);
29142909
}
29152910
}
29162911
MtmUnlock();
@@ -2946,18 +2941,23 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
29462941

29472942
void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
29482943
{
2944+
bool insideTransaction = IsTransactionState();
29492945
Assert(ts->votingCompleted);
2950-
Assert(!IsTransactionState());
29512946
MtmResetTransaction();
2952-
StartTransactionCommand();
2953-
2954-
MtmBeginSession(MtmNodeId);
2947+
2948+
if (!insideTransaction) {
2949+
StartTransactionCommand();
2950+
}
2951+
//MtmBeginSession(MtmNodeId);
29552952
MtmSetCurrentTransactionCSN(ts->csn);
29562953
MtmSetCurrentTransactionGID(ts->gid);
29572954
FinishPreparedTransaction(ts->gid, commit);
2958-
CommitTransactionCommand();
2959-
MtmEndSession(MtmNodeId, true);
2960-
Assert(ts->status == commit ? TRANSACTION_STATUS_COMMITTED : TRANSACTION_STATUS_ABORTED);
2955+
2956+
if (!insideTransaction) {
2957+
CommitTransactionCommand();
2958+
//MtmEndSession(MtmNodeId, true);
2959+
Assert(ts->status == commit ? TRANSACTION_STATUS_COMMITTED : TRANSACTION_STATUS_ABORTED);
2960+
}
29612961
}
29622962

29632963
/*
@@ -2997,6 +2997,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29972997
elog(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
29982998
Mtm->recoverySlot = nodeId;
29992999
Mtm->nReceivers = 0;
3000+
Mtm->nSenders = 0;
30003001
Mtm->recoveryCount += 1;
30013002
Mtm->pglogicalNodeMask = 0;
30023003
MtmUnlock();
@@ -3015,6 +3016,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
30153016
} else {
30163017
MTM_LOG1("%d: Continue replication from node %d", MyProcPid, nodeId);
30173018
}
3019+
BIT_SET(Mtm->reconnectMask, nodeId-1); /* arbiter should try to reestblish connection with this node */
30183020
MtmUnlock();
30193021
return mode;
30203022
}
@@ -3144,6 +3146,12 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
31443146
} else {
31453147
MTM_LOG1("Node %d start logical replication to node %d in normal mode", MtmNodeId, MtmReplicationNodeId);
31463148
}
3149+
elog(LOG, "Start %d senders and %d receivers from %d cluster status %s", Mtm->nSenders+1, Mtm->nReceivers, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
3150+
MtmSenderStarted = 1;
3151+
if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
3152+
MtmSwitchClusterMode(MTM_ONLINE);
3153+
}
3154+
BIT_SET(Mtm->reconnectMask, MtmReplicationNodeId-1); /* arbiter should try to reestblish connection with this node */
31473155
MtmUnlock();
31483156
on_shmem_exit(MtmOnProcExit, 0);
31493157
}
@@ -3192,6 +3200,9 @@ static void
31923200
MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
31933201
{
31943202
if (MtmReplicationNodeId >= 0) {
3203+
MtmLock(LW_EXCLUSIVE);
3204+
Mtm->nSenders -= MtmSenderStarted;
3205+
MtmUnlock();
31953206
MTM_LOG1("Logical replication to node %d is stopped", MtmReplicationNodeId);
31963207
/* MtmOnNodeDisconnect(MtmReplicationNodeId); */
31973208
MtmReplicationNodeId = -1; /* defuse on_proc_exit hook */
@@ -3290,7 +3301,7 @@ bool MtmFilterTransaction(char* record, int size)
32903301
}
32913302

32923303
if (duplicate) {
3293-
MTM_LOG1("Ignore transaction %s from node %d flags=%x, our restartLSN for node: %lx,restart_lsn = (origin node %d == MtmReplicationNodeId %d) ? end_lsn=%lx, origin_lsn=%lx",
3304+
MTM_LOG2("Ignore transaction %s from node %d flags=%x, our restartLSN for node: %lx,restart_lsn = (origin node %d == MtmReplicationNodeId %d) ? end_lsn=%lx, origin_lsn=%lx",
32943305
gid, replication_node, flags, Mtm->nodes[origin_node-1].restartLSN, origin_node, MtmReplicationNodeId, end_lsn, origin_lsn);
32953306
} else {
32963307
MTM_LOG2("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",

contrib/mmts/multimaster.h

+3-2
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ typedef struct
257257
{
258258
MtmNodeStatus status; /* Status of this node */
259259
int recoverySlot; /* NodeId of recovery slot or 0 if none */
260-
volatile slock_t spinlock; /* spinlock used to protect access to hash table */
260+
volatile slock_t queueSpinlock; /* spinlock used to protect sender queue */
261261
PGSemaphoreData sendSemaphore; /* semaphore used to notify mtm-sender about new responses to coordinator */
262262
LWLockPadded *locks; /* multimaster lock tranche */
263263
TransactionId oldestXid; /* XID of oldest transaction visible by any active transaction (local or global) */
@@ -273,7 +273,8 @@ typedef struct
273273
int inject2PCError; /* Simulate error during 2PC commit at this node */
274274
int nLiveNodes; /* Number of active nodes */
275275
int nAllNodes; /* Total numbber of nodes */
276-
int nReceivers; /* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
276+
int nReceivers; /* Number of initialized logical receivers (used to determine moment when intialization/recovery is completed) */
277+
int nSenders; /* Number of started WAL senders (used to determine moment when recovery) */
277278
int nLockers; /* Number of lockers */
278279
int nActiveTransactions; /* Nunmber of active 2PC transactions */
279280
int nConfigChanges; /* Number of cluster configuration changes */

contrib/mmts/pglogical_apply.c

+3
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,9 @@ process_remote_commit(StringInfo in)
708708
default:
709709
Assert(false);
710710
}
711+
if (Mtm->status == MTM_RECOVERY) {
712+
MTM_LOG1("Recover transaction %s flags=%d", gid, flags);
713+
}
711714
MtmUpdateLsnMapping(MtmReplicationNodeId, end_lsn);
712715
if (flags & PGLOGICAL_CAUGHT_UP) {
713716
MtmRecoveryCompleted();

contrib/mmts/pglogical_proto.c

+4
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
210210
Assert(MtmTransactionRecords == 0);
211211
return;
212212
}
213+
if (isRecovery) {
214+
MTM_LOG1("PGLOGICAL_SEND recover transaction: event=%d, gid=%s, xid=%d, commit_lsn=%lx, txn->end_lsn=%lx, xlog=%lx",
215+
flags, txn->gid, txn->xid, commit_lsn, txn->end_lsn, GetXLogInsertRecPtr());
216+
}
213217
if (flags == PGLOGICAL_ABORT_PREPARED) {
214218
MTM_LOG1("Send ABORT_PREPARED for transaction %d (%s) end_lsn=%lx to node %d, isRecovery=%d, txn->origin_id=%d, csn=%ld",
215219
txn->xid, txn->gid, txn->end_lsn, MtmReplicationNodeId, isRecovery, txn->origin_id, csn);

contrib/mmts/tests2/lib/bank_client.py

+7
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,13 @@ def exec_tx(self, tx_block, aggname_prefix, conn_i):
166166
# In case of continuous excetions we can loop here without returning
167167
# back to event loop and block it
168168
yield from asyncio.sleep(0.01)
169+
except BaseException as e:
170+
agg.finish_tx(str(e).strip())
171+
print('Catch exception ', e)
172+
# Give evloop some free time.
173+
# In case of continuous excetions we can loop here without returning
174+
# back to event loop and block it
175+
yield from asyncio.sleep(0.01)
169176

170177
print("We've count to infinity!")
171178

0 commit comments

Comments
 (0)