Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 6d61a95

Browse files
knizhnikkelvich
authored and committed
Fix unreleased lock in GetPreparedTransactionState
1 parent a0e253b commit 6d61a95

File tree

2 files changed

+27
-12
lines changed

2 files changed

+27
-12
lines changed

multimaster.c

Lines changed: 20 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1773,6 +1773,10 @@ void MtmRecoveryCompleted(void)
17731773
MTM_LOG1("Recovery of node %d is completed, disabled mask=%llx, connectivity mask=%llx, endLSN=%lx, live nodes=%d",
17741774
MtmNodeId, (long long) Mtm->disabledNodeMask,
17751775
(long long)SELF_CONNECTIVITY_MASK, GetXLogInsertRecPtr(), Mtm->nLiveNodes);
1776+
if (Mtm->nAllNodes >= 3) {
1777+
elog(WARNING, "restartLSNs at the end of recovery: {%lx, %lx, %lx}",
1778+
Mtm->nodes[0].restartLSN, Mtm->nodes[1].restartLSN, Mtm->nodes[2].restartLSN);
1779+
}
17761780
MtmLock(LW_EXCLUSIVE);
17771781
Mtm->recoverySlot = 0;
17781782
Mtm->recoveredLSN = GetXLogInsertRecPtr();
@@ -3244,7 +3248,12 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
32443248
|| Mtm->recoverySlot == nodeId)
32453249
{
32463250
/* Choose for recovery first available slot or slot of donor node (if any) */
3247-
elog(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
3251+
if (Mtm->nAllNodes >= 3) {
3252+
elog(WARNING, "Process %d starts recovery from node %d restartLSNs={%lx, %lx, %lx}",
3253+
MyProcPid, nodeId, Mtm->nodes[0].restartLSN, Mtm->nodes[1].restartLSN, Mtm->nodes[2].restartLSN);
3254+
} else {
3255+
elog(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
3256+
}
32483257
Mtm->recoverySlot = nodeId;
32493258
Mtm->nReceivers = 0;
32503259
Mtm->nSenders = 0;
@@ -3383,7 +3392,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
33833392
sscanf(strVal(elem->arg), "%lx", &recoveredLSN);
33843393
MTM_LOG1("Recovered position of node %d is %lx", MtmReplicationNodeId, recoveredLSN);
33853394
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN < recoveredLSN) {
3386-
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmReplicationStartupHook)", MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, recoveredLSN);
3395+
MTM_LOG1("Advance restartLSN for node %d from %lx to %lx (MtmReplicationStartupHook)", MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, recoveredLSN);
33873396
Assert(Mtm->nodes[MtmReplicationNodeId-1].restartLSN == InvalidXLogRecPtr
33883397
|| recoveredLSN < Mtm->nodes[MtmReplicationNodeId-1].restartLSN + MtmMaxRecoveryLag);
33893398
Mtm->nodes[MtmReplicationNodeId-1].restartLSN = recoveredLSN;
@@ -3587,9 +3596,13 @@ bool MtmFilterTransaction(char* record, int size)
35873596
origin_node = pq_getmsgbyte(&s);
35883597
origin_lsn = pq_getmsgint64(&s);
35893598

3590-
Assert(replication_node == MtmReplicationNodeId &&
3591-
origin_node != 0 &&
3592-
(Mtm->status == MTM_RECOVERY || origin_node == replication_node));
3599+
Assert(replication_node == MtmReplicationNodeId);
3600+
if (!(origin_node != 0 &&
3601+
(Mtm->status == MTM_RECOVERY || origin_node == replication_node)))
3602+
{
3603+
elog(WARNING, "Receive redirected commit event %d from node %d origin node %d origin LSN %lx in %s mode",
3604+
event, replication_node, origin_node, origin_lsn, MtmNodeStatusMnem[Mtm->status]);
3605+
}
35933606

35943607
switch (event)
35953608
{
@@ -3616,8 +3629,8 @@ bool MtmFilterTransaction(char* record, int size)
36163629
}
36173630

36183631
if (duplicate) {
3619-
MTM_LOG1("Ignore transaction %s from node %d event=%x because our LSN position %lx for origin node %d is greater or equal than LSN %lx of this transaction (end_lsn=%lx, origin_lsn=%lx)",
3620-
gid, replication_node, event, Mtm->nodes[origin_node-1].restartLSN, origin_node, restart_lsn, end_lsn, origin_lsn);
3632+
MTM_LOG1("Ignore transaction %s from node %d event=%x because our LSN position %lx for origin node %d is greater or equal than LSN %lx of this transaction (end_lsn=%lx, origin_lsn=%lx) mode %s",
3633+
gid, replication_node, event, Mtm->nodes[origin_node-1].restartLSN, origin_node, restart_lsn, end_lsn, origin_lsn, MtmNodeStatusMnem[Mtm->status]);
36213634
} else {
36223635
MTM_LOG2("Apply transaction %s from node %d lsn %lx, event=%x, origin node %d, original lsn=%lx, current lsn=%lx",
36233636
gid, replication_node, end_lsn, event, origin_node, origin_lsn, restart_lsn);

pglogical_receiver.c

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -342,11 +342,11 @@ pglogical_receiver_main(Datum main_arg)
342342
* Them are either empty, either new node is synchronized using base_backup.
343343
* So we assume that LSNs are the same for local and remote node
344344
*/
345-
originStartPos = Mtm->status == MTM_RECOVERY && Mtm->donorNodeId == nodeId ? GetXLogInsertRecPtr() : InvalidXLogRecPtr;
345+
originStartPos = (Mtm->status == MTM_RECOVERY && Mtm->donorNodeId == nodeId) ? GetXLogInsertRecPtr() : InvalidXLogRecPtr;
346346
MTM_LOG1("Start logical receiver at position %lx from node %d", originStartPos, nodeId);
347347
} else {
348348
if (Mtm->nodes[nodeId-1].restartLSN < originStartPos) {
349-
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (pglogical_receiver_mains)", nodeId, Mtm->nodes[nodeId-1].restartLSN, originStartPos);
349+
MTM_LOG1("Advance restartLSN for node %d: from %lx to %lx (pglogical_receiver_main)", nodeId, Mtm->nodes[nodeId-1].restartLSN, originStartPos);
350350
Mtm->nodes[nodeId-1].restartLSN = originStartPos;
351351
}
352352
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);
@@ -545,16 +545,17 @@ pglogical_receiver_main(Datum main_arg)
545545
}
546546
if (stmt[0] == 'Z' || (stmt[0] == 'M' && (stmt[1] == 'L' || stmt[1] == 'A' || stmt[1] == 'C'))) {
547547
MTM_LOG3("Process '%c' message from %d", stmt[1], nodeId);
548-
if ( stmt[1] == 'C') { /* concurrent DDL */
548+
if (stmt[0] == 'M' && stmt[1] == 'C') { /* concurrent DDL should be executed by parallel workers */
549549
MtmExecute(stmt, rc - hdr_len);
550550
} else {
551-
MtmExecutor(stmt, rc - hdr_len);
551+
MtmExecutor(stmt, rc - hdr_len); /* all other messages can be processed by receiver itself */
552552
}
553553
} else {
554554
ByteBufferAppend(&buf, stmt, rc - hdr_len);
555555
if (stmt[0] == 'C') /* commit */
556556
{
557-
if (!MtmFilterTransaction(stmt, rc - hdr_len)) {
557+
if (!MtmFilterTransaction(stmt, rc - hdr_len))
558+
{
558559
if (spill_file >= 0) {
559560
ByteBufferAppend(&buf, ")", 1);
560561
pq_sendbyte(&spill_info, '(');
@@ -574,6 +575,7 @@ pglogical_receiver_main(Datum main_arg)
574575
elog(WARNING, "Commit of prepared transaction takes %ld usec, flags=%x", stop - start, stmt[1]);
575576
}
576577
} else {
578+
Assert(stmt[1] == PGLOGICAL_PREPARE || stmt[1] == PGLOGICAL_COMMIT); /* all other commits should be applied in place */
577579
MtmExecute(buf.data, buf.used);
578580
}
579581
}

0 commit comments

Comments
 (0)