Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 2811c94

Browse files
knizhnikkelvich
authored and committed
Rewrite locking mechanism
1 parent a0af6e5 commit 2811c94

File tree

3 files changed

+92
-105
lines changed

3 files changed

+92
-105
lines changed

arbiter.c

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -299,12 +299,20 @@ static void MtmSetSocketOptions(int sd)
299299
#endif
300300
}
301301

302+
/*
303+
* Check response message and update node state
304+
*/
302305
static void MtmCheckResponse(MtmArbiterMessage* resp)
303306
{
304307
if (resp->lockReq) {
305-
BIT_SET(Mtm->globalLockerMask, resp->node-1);
308+
BIT_SET(Mtm->inducedLockNodeMask, resp->node-1);
309+
} else {
310+
BIT_CLEAR(Mtm->inducedLockNodeMask, resp->node-1);
311+
}
312+
if (resp->locked) {
313+
BIT_SET(Mtm->currentLockNodeMask, resp->node-1);
306314
} else {
307-
BIT_CLEAR(Mtm->globalLockerMask, resp->node-1);
315+
BIT_CLEAR(Mtm->currentLockNodeMask, resp->node-1);
308316
}
309317
if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1)
310318
&& !BIT_CHECK(Mtm->disabledNodeMask, resp->node-1)
@@ -340,11 +348,7 @@ static void MtmSendHeartbeat()
340348
int i;
341349
MtmArbiterMessage msg;
342350
timestamp_t now = MtmGetSystemTime();
343-
msg.code = MSG_HEARTBEAT;
344-
msg.disabledNodeMask = Mtm->disabledNodeMask;
345-
msg.connectivityMask = SELF_CONNECTIVITY_MASK;
346-
msg.oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
347-
msg.lockReq = Mtm->nodeLockerMask != 0;
351+
MtmInitMessage(&msg, MSG_HEARTBEAT);
348352
msg.node = MtmNodeId;
349353
msg.csn = now;
350354
if (last_sent_heartbeat != 0 && last_sent_heartbeat + MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2 < now) {
@@ -483,13 +487,11 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
483487
}
484488
}
485489
MtmSetSocketOptions(sd);
486-
req.hdr.code = MSG_HANDSHAKE;
490+
MtmInitMessage(&req.hdr, MSG_HANDSHAKE);
487491
req.hdr.node = MtmNodeId;
488492
req.hdr.dxid = HANDSHAKE_MAGIC;
489493
req.hdr.sxid = ShmemVariableCache->nextXid;
490494
req.hdr.csn = MtmGetCurrentTime();
491-
req.hdr.disabledNodeMask = Mtm->disabledNodeMask;
492-
req.hdr.connectivityMask = SELF_CONNECTIVITY_MASK;
493495
strcpy(req.connStr, Mtm->nodes[MtmNodeId-1].con.connStr);
494496
if (!MtmWriteSocket(sd, &req, sizeof req)) {
495497
MTM_ELOG(WARNING, "Arbiter failed to send handshake message to %s:%d: %d", host, port, errno);
@@ -638,9 +640,7 @@ static void MtmAcceptOneConnection()
638640
MtmCheckResponse(&req.hdr);
639641
MtmUnlock();
640642

641-
resp.code = MSG_STATUS;
642-
resp.disabledNodeMask = Mtm->disabledNodeMask;
643-
resp.connectivityMask = SELF_CONNECTIVITY_MASK;
643+
MtmInitMessage(&resp, MSG_STATUS);
644644
resp.dxid = HANDSHAKE_MAGIC;
645645
resp.sxid = ShmemVariableCache->nextXid;
646646
resp.csn = MtmGetCurrentTime();
@@ -949,10 +949,7 @@ static void MtmReceiver(Datum arg)
949949
msg->csn = tm->state->csn;
950950
MTM_LOG1("Send response %s for transaction %s to node %d", MtmTxnStatusMnem[msg->status], msg->gid, node);
951951
}
952-
msg->disabledNodeMask = Mtm->disabledNodeMask;
953-
msg->connectivityMask = SELF_CONNECTIVITY_MASK;
954-
msg->oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
955-
msg->code = MSG_POLL_STATUS;
952+
MtmInitMessage(msg, MSG_POLL_STATUS);
956953
MtmSendMessage(msg);
957954
continue;
958955
case MSG_POLL_STATUS:

multimaster.c

Lines changed: 70 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg);
159159
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError);
160160
static void MtmProcessDDLCommand(char const* queryString, bool transactional);
161161

162-
static void MtmSuspendNode(void);
163-
static void MtmResumeNode(void);
162+
static void MtmLockCluster(void);
163+
static void MtmUnlockCluster(void);
164164

165165
MtmState* Mtm;
166166

@@ -254,7 +254,7 @@ static bool MtmIgnoreTablesWithoutPk;
254254
static int MtmLockCount;
255255
static bool MtmMajorNode;
256256
static bool MtmBreakConnection;
257-
static bool MtmSuspended;
257+
static bool MtmClusterLocked;
258258
static bool MtmInsideTransaction;
259259

260260
static ExecutorStart_hook_type PreviousExecutorStartHook;
@@ -288,8 +288,8 @@ void MtmReleaseLocks(void)
288288
MtmInsideTransaction = false;
289289
MtmUnlock();
290290
}
291-
if (MtmSuspended) {
292-
MtmResumeNode();
291+
if (MtmClusterLocked) {
292+
MtmUnlockCluster();
293293
}
294294
if (MtmLockCount != 0) {
295295
Assert(Mtm->lastLockHolder == MyProcPid);
@@ -876,8 +876,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
876876
* Also allow user to complete explicit 2PC transactions.
877877
*/
878878
if (x->isDistributed
879-
&& (Mtm->exclusiveLock || (!x->isReplicated && !x->isTwoPhase))
880-
&& !MtmSuspended
879+
&& !MtmClusterLocked /* do not lock myself */
881880
&& strcmp(application_name, MULTIMASTER_ADMIN) != 0)
882881
{
883882
MtmCheckClusterLock();
@@ -886,7 +885,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
886885
Mtm->nRunningTransactions += 1;
887886

888887
x->snapshot = MtmAssignCSN();
889-
MTM_LOG1("Start transaction %lld with snapshot %lld", (long64)x->xid, x->snapshot);
888+
MTM_LOG2("Start transaction %lld with snapshot %lld", (long64)x->xid, x->snapshot);
890889

891890
MtmUnlock();
892891

@@ -1448,11 +1447,25 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
14481447
if (!MyReplicationSlot) {
14491448
MtmCheckSlots();
14501449
}
1451-
if (MtmSuspended) {
1452-
MtmResumeNode();
1450+
if (MtmClusterLocked) {
1451+
MtmUnlockCluster();
14531452
}
14541453
}
14551454

1455+
/*
1456+
* Initialize message
1457+
*/
1458+
void MtmInitMessage(MtmArbiterMessage* msg, MtmMessageCode code)
1459+
{
1460+
msg->code = code;
1461+
msg->disabledNodeMask = Mtm->disabledNodeMask;
1462+
msg->connectivityMask = SELF_CONNECTIVITY_MASK;
1463+
msg->oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
1464+
msg->lockReq = Mtm->originLockNodeMask != 0;
1465+
msg->locked = (Mtm->originLockNodeMask|Mtm->inducedLockNodeMask) != 0;
1466+
}
1467+
1468+
14561469
/*
14571470
* Send arbiter's message
14581471
*/
@@ -1489,13 +1502,9 @@ void MtmSendMessage(MtmArbiterMessage* msg)
14891502
void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
14901503
{
14911504
MtmArbiterMessage msg;
1492-
msg.code = cmd;
1505+
MtmInitMessage(&msg, cmd);
14931506
msg.sxid = ts->xid;
14941507
msg.csn = ts->csn;
1495-
msg.disabledNodeMask = Mtm->disabledNodeMask;
1496-
msg.connectivityMask = SELF_CONNECTIVITY_MASK;
1497-
msg.oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
1498-
msg.lockReq = Mtm->nodeLockerMask != 0;
14991508
memcpy(msg.gid, ts->gid, MULTIMASTER_MAX_GID_SIZE);
15001509

15011510
Assert(!MtmIsCoordinator(ts)); /* All broadcasts are now done through logical decoding */
@@ -1516,11 +1525,7 @@ static void MtmBroadcastPollMessage(MtmTransState* ts)
15161525
{
15171526
int i;
15181527
MtmArbiterMessage msg;
1519-
msg.code = MSG_POLL_REQUEST;
1520-
msg.disabledNodeMask = Mtm->disabledNodeMask;
1521-
msg.connectivityMask = SELF_CONNECTIVITY_MASK;
1522-
msg.oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
1523-
msg.lockReq = Mtm->nodeLockerMask != 0;
1528+
MtmInitMessage(&msg, MSG_POLL_REQUEST);
15241529
memcpy(msg.gid, ts->gid, MULTIMASTER_MAX_GID_SIZE);
15251530
ts->votedMask = 0;
15261531

@@ -1928,7 +1933,7 @@ void MtmRecoveryCompleted(void)
19281933
* logical replication connections with this node.
19291934
* Under the intensive workload start of logical replication can be delayed for unpredictable amount of time
19301935
*/
1931-
BIT_SET(Mtm->nodeLockerMask, MtmNodeId-1); /* it is trick: this mask was originally used by WAL senders performing recovery, but here we are in opposite (recovered) side:
1936+
BIT_SET(Mtm->originLockNodeMask, MtmNodeId-1); /* it is trick: this mask was originally used by WAL senders performing recovery, but here we are in opposite (recovered) side:
19321937
* if this mask is not zero lockReq will be broadcast to all other nodes by heartbeat, suspending their activity
19331938
*/
19341939
MtmSwitchClusterMode(MTM_RECOVERED);
@@ -2017,7 +2022,7 @@ void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN)
20172022
MtmLock(LW_EXCLUSIVE);
20182023
if (MtmIsRecoveredNode(nodeId)) {
20192024
lsn_t walLSN = GetXLogInsertRecPtr();
2020-
if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
2025+
if (!BIT_CHECK(Mtm->originLockNodeMask, nodeId-1)
20212026
&& slotLSN + MtmMinRecoveryLag > walLSN)
20222027
{
20232028
/*
@@ -2028,14 +2033,11 @@ void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN)
20282033
*/
20292034
MTM_LOG1("Node %d is almost caught-up: slot position %llx, WAL position %llx, active transactions %d",
20302035
nodeId, slotLSN, walLSN, Mtm->nActiveTransactions);
2031-
Assert(MyWalSnd != NULL); /* This function is called by WAL-sender, so it should not be NULL */
2032-
BIT_SET(Mtm->nodeLockerMask, nodeId-1);
2033-
BIT_SET(Mtm->walSenderLockerMask, MyWalSnd - WalSndCtl->walsnds);
2034-
Mtm->nLockers += 1;
2036+
BIT_SET(Mtm->originLockNodeMask, nodeId-1);
20352037
} else {
20362038
MTM_LOG2("Continue recovery of node %d, slot position %llx, WAL position %llx,"
2037-
" WAL sender position %llx, lockers %d, active transactions %d", nodeId, slotLSN,
2038-
walLSN, MyWalSnd->sentPtr, Mtm->nLockers, Mtm->nActiveTransactions);
2039+
" WAL sender position %llx, lockers %llx, active transactions %d", nodeId, slotLSN,
2040+
walLSN, MyWalSnd->sentPtr, Mtm->originLockNodeMask, Mtm->nActiveTransactions);
20392041
}
20402042
}
20412043
MtmUnlock();
@@ -2051,11 +2053,13 @@ bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
20512053
bool caughtUp = false;
20522054
MtmLock(LW_EXCLUSIVE);
20532055
if (MtmIsRecoveredNode(nodeId) && Mtm->nActiveTransactions == 0) {
2054-
if (BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)) {
2056+
if (BIT_CHECK(Mtm->originLockNodeMask, nodeId-1)) {
20552057
MTM_LOG1("Node %d is caught-up at WAL position %llx", nodeId, walEndPtr);
2056-
BIT_CLEAR(Mtm->walSenderLockerMask, MyWalSnd - WalSndCtl->walsnds);
2057-
BIT_CLEAR(Mtm->nodeLockerMask, nodeId-1);
2058-
Mtm->nLockers -= 1;
2058+
Assert(BIT_CHECK(Mtm->disabledNodeMask, nodeId-1));
2059+
BIT_CLEAR(Mtm->originLockNodeMask, nodeId-1);
2060+
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
2061+
Mtm->nLiveNodes += 1;
2062+
MtmCheckQuorum();
20592063
} else {
20602064
MTM_LOG1("Node %d is caught-up at WAL position %llx without locking cluster", nodeId, walEndPtr);
20612065
/* We are lucky: caught-up without locking cluster! */
@@ -2082,40 +2086,44 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
20822086
* Prevent start of any new transactions at this node
20832087
*/
20842088
static void
2085-
MtmSuspendNode(void)
2089+
MtmLockCluster(void)
20862090
{
20872091
timestamp_t delay = MIN_WAIT_TIMEOUT;
2088-
Assert(!MtmSuspended);
2092+
Assert(!MtmClusterLocked);
20892093
MtmLock(LW_EXCLUSIVE);
2090-
if (Mtm->exclusiveLock) {
2094+
if (BIT_CHECK(Mtm->originLockNodeMask, MtmNodeId-1)) {
20912095
elog(ERROR, "There is already pending exclusive lock");
20922096
}
2093-
Mtm->exclusiveLock = true;
2094-
MtmSuspended = true;
2095-
MTM_LOG2("Transaction %lld tries to suspend node at %lld insideTransaction=%d, active transactions=%lld",
2096-
(long64)MtmTx.xid, MtmGetCurrentTime(), insideTransaction, (long64)Mtm->nRunningTransactions);
2097-
while (Mtm->nRunningTransactions != 1) { /* I am one */
2097+
BIT_SET(Mtm->originLockNodeMask, MtmNodeId-1);
2098+
MtmClusterLocked = true;
2099+
MTM_LOG1("Transaction %lld tries to lock cluster at %lld, running transactions=%lld",
2100+
(long64)MtmTx.xid, MtmGetCurrentTime(), (long64)Mtm->nRunningTransactions);
2101+
/* Wait until everything is locked */
2102+
while (Mtm->nRunningTransactions != 1 /* I am one */
2103+
|| ((((nodemask_t)1 << Mtm->nAllNodes)-1) & ~(Mtm->currentLockNodeMask|Mtm->originLockNodeMask) & ~Mtm->disabledNodeMask) != 0)
2104+
{
20982105
MtmUnlock();
20992106
MtmSleep(delay);
21002107
if (delay*2 <= MAX_WAIT_TIMEOUT) {
21012108
delay *= 2;
21022109
}
21032110
MtmLock(LW_EXCLUSIVE);
21042111
}
2105-
MTM_LOG2("Transaction %lld suspended node at %lld, LSN %lld, active transactions=%lld", (long64)MtmTx.xid, MtmGetCurrentTime(), (long64)GetXLogInsertRecPtr(), (long64)Mtm->nRunningTransactions);
2112+
MTM_LOG1("Transaction %lld locked cluster at %lld, LSN %lld, active transactions=%lld",
2113+
(long64)MtmTx.xid, MtmGetCurrentTime(), (long64)GetXLogInsertRecPtr(), (long64)Mtm->nRunningTransactions);
21062114
MtmUnlock();
21072115
}
21082116

21092117
/*
2110-
* Resume transaction processing at node (blocked by MtmSuspendNode)
2118+
* Remove global cluster lock set by MtmLockCluster
21112119
*/
21122120
static void
2113-
MtmResumeNode(void)
2121+
MtmUnlockCluster(void)
21142122
{
21152123
MtmLock(LW_EXCLUSIVE);
2116-
MTM_LOG2("Transaction %lld resume node at %lld status %s LSN %lld", (long64)MtmTx.xid, MtmGetCurrentTime(), MtmTxnStatusMnem[MtmTx.status], (long64)GetXLogInsertRecPtr());
2117-
Mtm->exclusiveLock = false;
2118-
MtmSuspended = false;
2124+
MTM_LOG1("Transaction %lld unlock cluster at %lld status %s LSN %lld", (long64)MtmTx.xid, MtmGetCurrentTime(), MtmTxnStatusMnem[MtmTx.status], (long64)GetXLogInsertRecPtr());
2125+
BIT_CLEAR(Mtm->originLockNodeMask, MtmNodeId-1);
2126+
MtmClusterLocked = false;
21192127
MtmUnlock();
21202128
}
21212129

@@ -2128,33 +2136,15 @@ static void
21282136
MtmCheckClusterLock()
21292137
{
21302138
timestamp_t delay = MIN_WAIT_TIMEOUT;
2131-
while (true)
2132-
{
2133-
if (Mtm->exclusiveLock || (Mtm->globalLockerMask | Mtm->walSenderLockerMask)) {
2134-
/* some "almost cautch-up" wal-senders are still working. */
2135-
/* Do not start new transactions until them are completed. */
2136-
MtmUnlock();
2137-
MtmSleep(delay);
2138-
if (delay*2 <= MAX_WAIT_TIMEOUT) {
2139-
delay *= 2;
2140-
}
2141-
MtmLock(LW_EXCLUSIVE);
2142-
} else {
2143-
if (Mtm->nodeLockerMask != 0) {
2144-
/* All lockers have synchronized their logs */
2145-
/* Remove lock and mark them as recovered */
2146-
MTM_LOG1("Complete recovery of %d nodes (node mask %llx)", Mtm->nLockers, Mtm->nodeLockerMask);
2147-
Assert(Mtm->walSenderLockerMask == 0);
2148-
Assert((Mtm->nodeLockerMask & Mtm->disabledNodeMask) == Mtm->nodeLockerMask);
2149-
Mtm->disabledNodeMask &= ~Mtm->nodeLockerMask;
2150-
Mtm->nConfigChanges += 1;
2151-
Mtm->nLiveNodes += Mtm->nLockers;
2152-
Mtm->nLockers = 0;
2153-
Mtm->nodeLockerMask = 0;
2154-
MtmCheckQuorum();
2155-
}
2156-
break;
2139+
while (Mtm->originLockNodeMask | Mtm->inducedLockNodeMask) {
2140+
/* some "almost caught-up" wal-senders are still working. */
2141+
/* Do not start new transactions until they are completed. */
2142+
MtmUnlock();
2143+
MtmSleep(delay);
2144+
if (delay*2 <= MAX_WAIT_TIMEOUT) {
2145+
delay *= 2;
21572146
}
2147+
MtmLock(LW_EXCLUSIVE);
21582148
}
21592149
}
21602150

@@ -2548,13 +2538,11 @@ static void MtmInitialize()
25482538
Mtm->stoppedNodeMask = 0;
25492539
Mtm->pglogicalReceiverMask = 0;
25502540
Mtm->pglogicalSenderMask = 0;
2551-
Mtm->walSenderLockerMask = 0;
2552-
Mtm->globalLockerMask = 0;
2553-
Mtm->nodeLockerMask = 0;
2541+
Mtm->inducedLockNodeMask = 0;
2542+
Mtm->currentLockNodeMask = 0;
2543+
Mtm->originLockNodeMask = 0;
25542544
Mtm->reconnectMask = 0;
25552545
Mtm->recoveredLSN = INVALID_LSN;
2556-
Mtm->nLockers = 0;
2557-
Mtm->exclusiveLock = false;
25582546
Mtm->nActiveTransactions = 0;
25592547
Mtm->nRunningTransactions = 0;
25602548
Mtm->votingTransactions = NULL;
@@ -3326,7 +3314,7 @@ void MtmReceiverStarted(int nodeId)
33263314
if (++Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1
33273315
&& (Mtm->status == MTM_RECOVERED || Mtm->status == MTM_CONNECTED))
33283316
{
3329-
BIT_CLEAR(Mtm->nodeLockerMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
3317+
BIT_CLEAR(Mtm->originLockNodeMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
33303318
MtmSwitchClusterMode(MTM_ONLINE);
33313319
}
33323320
}
@@ -3656,7 +3644,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
36563644
&& (Mtm->status == MTM_RECOVERED || Mtm->status == MTM_CONNECTED))
36573645
{
36583646
/* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
3659-
BIT_CLEAR(Mtm->nodeLockerMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
3647+
BIT_CLEAR(Mtm->originLockNodeMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
36603648
MtmSwitchClusterMode(MTM_ONLINE);
36613649
}
36623650
}
@@ -4070,7 +4058,7 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
40704058
usrfctx->values[3] = BoolGetDatum(BIT_CHECK(Mtm->stalledNodeMask, usrfctx->nodeId-1));
40714059
usrfctx->values[4] = BoolGetDatum(BIT_CHECK(Mtm->stoppedNodeMask, usrfctx->nodeId-1));
40724060

4073-
usrfctx->values[5] = BoolGetDatum(BIT_CHECK(Mtm->nodeLockerMask, usrfctx->nodeId-1));
4061+
usrfctx->values[5] = BoolGetDatum(BIT_CHECK(Mtm->originLockNodeMask, usrfctx->nodeId-1));
40744062
lag = MtmGetSlotLag(usrfctx->nodeId);
40754063
usrfctx->values[6] = Int64GetDatum(lag);
40764064
usrfctx->nulls[6] = lag < 0;
@@ -4196,7 +4184,7 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
41964184
values[1] = CStringGetTextDatum(MtmNodeStatusMnem[Mtm->status]);
41974185
values[2] = Int64GetDatum(Mtm->disabledNodeMask);
41984186
values[3] = Int64GetDatum(SELF_CONNECTIVITY_MASK);
4199-
values[4] = Int64GetDatum(Mtm->nodeLockerMask);
4187+
values[4] = Int64GetDatum(Mtm->originLockNodeMask);
42004188
values[5] = Int32GetDatum(Mtm->nLiveNodes);
42014189
values[6] = Int32GetDatum(Mtm->nAllNodes);
42024190
values[7] = Int32GetDatum((int)Mtm->pool.active);
@@ -5032,7 +5020,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
50325020

50335021
case T_TruncateStmt:
50345022
skipCommand = false;
5035-
MtmSuspendNode();
5023+
MtmLockCluster();
50365024
break;
50375025

50385026
case T_DropStmt:

0 commit comments

Comments
 (0)