Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 57da7fc

Browse files
committed
Fix node locks
1 parent 8b6595d commit 57da7fc

File tree

5 files changed

+46
-22
lines changed

5 files changed

+46
-22
lines changed

contrib/mmts/multimaster.c

+31-18
Original file line numberDiff line numberDiff line change
@@ -946,14 +946,20 @@ void MtmPrecommitTransaction(char const* gid)
946946
} else {
947947
MtmTransState* ts = tm->state;
948948
Assert(ts != NULL);
949-
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
950-
ts->status = TRANSACTION_STATUS_UNKNOWN;
951-
ts->csn = MtmAssignCSN();
952-
MtmAdjustSubtransactions(ts);
953-
MtmSend2PCMessage(ts, MSG_PRECOMMITTED);
954-
Assert(replorigin_session_origin != InvalidRepOriginId);
955-
MtmUnlock();
956-
SetPreparedTransactionState(ts->gid, MULTIMASTER_PRECOMMITTED);
949+
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
950+
ts->status = TRANSACTION_STATUS_UNKNOWN;
951+
ts->csn = MtmAssignCSN();
952+
MtmAdjustSubtransactions(ts);
953+
if (Mtm->status != MTM_RECOVERY) {
954+
MtmSend2PCMessage(ts, MSG_PRECOMMITTED);
955+
}
956+
MtmUnlock();
957+
Assert(replorigin_session_origin != InvalidRepOriginId);
958+
SetPreparedTransactionState(ts->gid, MULTIMASTER_PRECOMMITTED);
959+
} else {
960+
elog(WARNING, "MtmPrecommitTransaction: transaction '%s' is already in %s state", gid, MtmTxnStatusMnem[ts->status]);
961+
MtmUnlock();
962+
}
957963
}
958964
}
959965
}
@@ -1038,7 +1044,8 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
10381044
// MtmSend2PCMessage(ts, MSG_PRECOMMIT);
10391045
elog(LOG, "Distributed transaction is not committed in %ld msec", USEC_TO_MSEC(now - start));
10401046
} else {
1041-
elog(WARNING, "Commit of distributed transaction is canceled because of %ld msec timeout expiration", USEC_TO_MSEC(timeout));
1047+
elog(WARNING, "Commit of distributed transaction %s (%d) is canceled because of %ld msec timeout expiration",
1048+
ts->gid, ts->xid, USEC_TO_MSEC(timeout));
10421049
MtmAbortTransaction(ts);
10431050
break;
10441051
}
@@ -1383,15 +1390,15 @@ static void MtmLoadPreparedTransactions(void)
13831390
if (!found) {
13841391
TransactionId xid = GetNewTransactionId(false);
13851392
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_ENTER, &found);
1386-
MTM_LOG1("Recover prepared transaction %s xid %d", gid, xid);
1393+
MTM_LOG1("Recover prepared transaction %s xid=%d state=%s", gid, xid, pxacts[i].state_3pc);
13871394
MyPgXact->xid = InvalidTransactionId; /* dirty hack:((( */
13881395
Assert(!found);
13891396
Mtm->nActiveTransactions += 1;
13901397
ts->isEnqueued = false;
13911398
ts->isActive = true;
13921399
ts->status = strcmp(pxacts[i].state_3pc, MULTIMASTER_PRECOMMITTED) == 0 ? TRANSACTION_STATUS_UNKNOWN : TRANSACTION_STATUS_IN_PROGRESS;
13931400
ts->isLocal = true;
1394-
ts->isPrepared = false;
1401+
ts->isPrepared = true;
13951402
ts->isPinned = false;
13961403
ts->snapshot = INVALID_CSN;
13971404
ts->isTwoPhase = false;
@@ -1491,6 +1498,10 @@ XidStatus MtmExchangeGlobalTransactionStatus(char const* gid, XidStatus new_stat
14911498
if (old_status != TRANSACTION_STATUS_ABORTED) {
14921499
tm->status = new_status;
14931500
}
1501+
if (tm->state != NULL && old_status == TRANSACTION_STATUS_IN_PROGRESS) {
1502+
/* Return UNKNOWN to mark that transaction was prepared */
1503+
old_status = TRANSACTION_STATUS_UNKNOWN;
1504+
}
14941505
} else {
14951506
tm->state = NULL;
14961507
tm->status = new_status;
@@ -1605,7 +1616,7 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
16051616
MtmBroadcastPollMessage(ts);
16061617
}
16071618
} else {
1608-
MTM_LOG1("Skip transaction %d (%s) with status %s gtid.node=%d gtid.xid=%d votedMask=%lx",
1619+
MTM_LOG2("Skip transaction %d (%s) with status %s gtid.node=%d gtid.xid=%d votedMask=%lx",
16091620
ts->xid, ts->gid, MtmTxnStatusMnem[ts->status], ts->gtid.node, ts->gtid.xid, ts->votedMask);
16101621
}
16111622
}
@@ -1656,7 +1667,8 @@ void MtmRecoveryCompleted(void)
16561667
Mtm->nodes[i].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
16571668
}
16581669
/* Mode will be changed to online once all logical receivers are connected */
1659-
MtmSwitchClusterMode(MTM_CONNECTED);
1670+
elog(LOG, "Recovery completed with %d active receivers from %d", Mtm->nReceivers, Mtm->nLiveNodes-1);
1671+
MtmSwitchClusterMode(Mtm->nReceivers == Mtm->nLiveNodes-1 ? MTM_ONLINE : MTM_CONNECTED);
16601672
MtmUnlock();
16611673
}
16621674

@@ -2549,7 +2561,7 @@ _PG_init(void)
25492561
0,
25502562
INT_MAX,
25512563
PGC_BACKEND,
2552-
0,
2564+
0,\
25532565
NULL,
25542566
NULL,
25552567
NULL
@@ -2894,6 +2906,7 @@ void MtmReceiverStarted(int nodeId)
28942906
MtmEnableNode(nodeId);
28952907
MtmCheckQuorum();
28962908
}
2909+
elog(LOG, "Start %d receivers from %d cluster status %s", Mtm->nReceivers+1, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
28972910
if (++Mtm->nReceivers == Mtm->nLiveNodes-1) {
28982911
if (Mtm->status == MTM_CONNECTED) {
28992912
MtmSwitchClusterMode(MTM_ONLINE);
@@ -3672,11 +3685,11 @@ Datum mtm_dump_lock_graph(PG_FUNCTION_ARGS)
36723685
{
36733686
size_t lockGraphSize;
36743687
char *lockGraphData;
3675-
MtmLockNode(i + MtmMaxNodes, LW_SHARED);
3688+
MtmLockNode(i + 1 + MtmMaxNodes, LW_SHARED);
36763689
lockGraphSize = Mtm->nodes[i].lockGraphUsed;
36773690
lockGraphData = palloc(lockGraphSize);
36783691
memcpy(lockGraphData, Mtm->nodes[i].lockGraphData, lockGraphSize);
3679-
MtmUnlockNode(i + MtmMaxNodes);
3692+
MtmUnlockNode(i + 1 + MtmMaxNodes);
36803693

36813694
if (lockGraphData) {
36823695
GlobalTransactionId *gtid = (GlobalTransactionId *) lockGraphData;
@@ -4602,11 +4615,11 @@ MtmDetectGlobalDeadLockForXid(TransactionId xid)
46024615
if (i+1 != MtmNodeId && !BIT_CHECK(Mtm->disabledNodeMask, i)) {
46034616
size_t lockGraphSize;
46044617
void* lockGraphData;
4605-
MtmLockNode(i + MtmMaxNodes, LW_SHARED);
4618+
MtmLockNode(i + 1 + MtmMaxNodes, LW_SHARED);
46064619
lockGraphSize = Mtm->nodes[i].lockGraphUsed;
46074620
lockGraphData = palloc(lockGraphSize);
46084621
memcpy(lockGraphData, Mtm->nodes[i].lockGraphData, lockGraphSize);
4609-
MtmUnlockNode(i + MtmMaxNodes);
4622+
MtmUnlockNode(i + 1 + MtmMaxNodes);
46104623

46114624
if (lockGraphData == NULL) {
46124625
return true;

contrib/mmts/multimaster.h

+2
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,8 @@ extern VacuumStmt* MtmVacuumStmt;
335335
extern IndexStmt* MtmIndexStmt;
336336
extern DropStmt* MtmDropStmt;
337337
extern MemoryContext MtmApplyContext;
338+
extern XLogRecPtr MtmSenderWalEnd;
339+
338340

339341
extern void MtmArbiterInitialize(void);
340342
extern void MtmStartReceivers(void);

contrib/mmts/pglogical_apply.c

+8-2
Original file line numberDiff line numberDiff line change
@@ -433,14 +433,20 @@ process_remote_message(StringInfo s)
433433
/* This function is called directly by receiver, so there is no race condition and we can update
434434
* restartLSN without locks
435435
*/
436+
if (origin_node == MtmReplicationNodeId) {
437+
Assert(msg->origin_lsn == InvalidXLogRecPtr);
438+
msg->origin_lsn = MtmSenderWalEnd;
439+
}
436440
if (Mtm->nodes[origin_node-1].restartLSN < msg->origin_lsn) {
437441
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)", origin_node, Mtm->nodes[origin_node-1].restartLSN, msg->origin_lsn);
438442
Mtm->nodes[origin_node-1].restartLSN = msg->origin_lsn;
439443
replorigin_session_origin_lsn = msg->origin_lsn;
440444
MtmRollbackPreparedTransaction(origin_node, msg->gid);
441445
} else {
442-
MTM_LOG1("Ignore rollback of transaction %s from node %d because it's LSN %lx <= %lx",
443-
msg->gid, origin_node, msg->origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
446+
if (msg->origin_lsn != InvalidXLogRecPtr) {
447+
MTM_LOG1("Ignore rollback of transaction %s from node %d because it's LSN %lx <= %lx",
448+
msg->gid, origin_node, msg->origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
449+
}
444450
}
445451
standalone = true;
446452
break;

contrib/mmts/pglogical_receiver.c

+4
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ static char worker_proc[BGW_MAXLEN];
5757

5858
/* Lastly written positions */
5959
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
60+
XLogRecPtr MtmSenderWalEnd;
6061

6162
/* Stream functions */
6263
static void fe_sendint64(int64 i, char *buf);
@@ -510,6 +511,9 @@ pglogical_receiver_main(Datum main_arg)
510511
hdr_len += 8; /* WALEnd */
511512
hdr_len += 8; /* sendTime */
512513

514+
/* WAL position of the end of this message at WAL sender */
515+
MtmSenderWalEnd = walEnd;
516+
513517
/*ereport(LOG, (errmsg("%s: receive message %c length %d", worker_proc, copybuf[hdr_len], rc - hdr_len)));*/
514518

515519
Assert(rc >= hdr_len);

contrib/mmts/tests2/docker-entrypoint.sh

+1-2
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,7 @@ if [ "$1" = 'postgres' ]; then
6767
multimaster.conn_strings = '$CONNSTRS'
6868
multimaster.heartbeat_recv_timeout = 1100
6969
multimaster.heartbeat_send_timeout = 250
70-
multimaster.twopc_min_timeout = 50000
71-
multimaster.min_2pc_timeout = 50000
70+
multimaster.min_2pc_timeout = 2000
7271
EOF
7372

7473
cat $PGDATA/postgresql.conf

0 commit comments

Comments
 (0)