Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit bc32882

Browse files
knizhnikkelvich
authored and committed
Fix unreleased lock in GetPreparedTransactionState
1 parent 025f3fd commit bc32882

File tree

4 files changed

+77
-15
lines changed

4 files changed

+77
-15
lines changed

contrib/mmts/multimaster.c

+20-7
Original file line number | Diff line number | Diff line change
@@ -1778,6 +1778,10 @@ void MtmRecoveryCompleted(void)
17781778
MTM_LOG1("Recovery of node %d is completed, disabled mask=%llx, connectivity mask=%llx, endLSN=%lx, live nodes=%d",
17791779
MtmNodeId, (long long) Mtm->disabledNodeMask,
17801780
(long long)SELF_CONNECTIVITY_MASK, GetXLogInsertRecPtr(), Mtm->nLiveNodes);
1781+
if (Mtm->nAllNodes >= 3) {
1782+
elog(WARNING, "restartLSNs at the end of recovery: {%lx, %lx, %lx}",
1783+
Mtm->nodes[0].restartLSN, Mtm->nodes[1].restartLSN, Mtm->nodes[2].restartLSN);
1784+
}
17811785
MtmLock(LW_EXCLUSIVE);
17821786
Mtm->recoverySlot = 0;
17831787
Mtm->recoveredLSN = GetXLogInsertRecPtr();
@@ -3249,7 +3253,12 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
32493253
|| Mtm->recoverySlot == nodeId)
32503254
{
32513255
/* Choose for recovery first available slot or slot of donor node (if any) */
3252-
elog(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
3256+
if (Mtm->nAllNodes >= 3) {
3257+
elog(WARNING, "Process %d starts recovery from node %d restartLSNs={%lx, %lx, %lx}",
3258+
MyProcPid, nodeId, Mtm->nodes[0].restartLSN, Mtm->nodes[1].restartLSN, Mtm->nodes[2].restartLSN);
3259+
} else {
3260+
elog(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
3261+
}
32533262
Mtm->recoverySlot = nodeId;
32543263
Mtm->nReceivers = 0;
32553264
Mtm->nSenders = 0;
@@ -3388,7 +3397,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
33883397
sscanf(strVal(elem->arg), "%lx", &recoveredLSN);
33893398
MTM_LOG1("Recovered position of node %d is %lx", MtmReplicationNodeId, recoveredLSN);
33903399
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN < recoveredLSN) {
3391-
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmReplicationStartupHook)", MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, recoveredLSN);
3400+
MTM_LOG1("Advance restartLSN for node %d from %lx to %lx (MtmReplicationStartupHook)", MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, recoveredLSN);
33923401
Assert(Mtm->nodes[MtmReplicationNodeId-1].restartLSN == InvalidXLogRecPtr
33933402
|| recoveredLSN < Mtm->nodes[MtmReplicationNodeId-1].restartLSN + MtmMaxRecoveryLag);
33943403
Mtm->nodes[MtmReplicationNodeId-1].restartLSN = recoveredLSN;
@@ -3592,9 +3601,13 @@ bool MtmFilterTransaction(char* record, int size)
35923601
origin_node = pq_getmsgbyte(&s);
35933602
origin_lsn = pq_getmsgint64(&s);
35943603

3595-
Assert(replication_node == MtmReplicationNodeId &&
3596-
origin_node != 0 &&
3597-
(Mtm->status == MTM_RECOVERY || origin_node == replication_node));
3604+
Assert(replication_node == MtmReplicationNodeId);
3605+
if (!(origin_node != 0 &&
3606+
(Mtm->status == MTM_RECOVERY || origin_node == replication_node)))
3607+
{
3608+
elog(WARNING, "Receive redirected commit event %d from node %d origin node %d origin LSN %lx in %s mode",
3609+
event, replication_node, origin_node, origin_lsn, MtmNodeStatusMnem[Mtm->status]);
3610+
}
35983611

35993612
switch (event)
36003613
{
@@ -3621,8 +3634,8 @@ bool MtmFilterTransaction(char* record, int size)
36213634
}
36223635

36233636
if (duplicate) {
3624-
MTM_LOG1("Ignore transaction %s from node %d event=%x because our LSN position %lx for origin node %d is greater or equal than LSN %lx of this transaction (end_lsn=%lx, origin_lsn=%lx)",
3625-
gid, replication_node, event, Mtm->nodes[origin_node-1].restartLSN, origin_node, restart_lsn, end_lsn, origin_lsn);
3637+
MTM_LOG1("Ignore transaction %s from node %d event=%x because our LSN position %lx for origin node %d is greater or equal than LSN %lx of this transaction (end_lsn=%lx, origin_lsn=%lx) mode %s",
3638+
gid, replication_node, event, Mtm->nodes[origin_node-1].restartLSN, origin_node, restart_lsn, end_lsn, origin_lsn, MtmNodeStatusMnem[Mtm->status]);
36263639
} else {
36273640
MTM_LOG2("Apply transaction %s from node %d lsn %lx, event=%x, origin node %d, original lsn=%lx, current lsn=%lx",
36283641
gid, replication_node, end_lsn, event, origin_node, origin_lsn, restart_lsn);

contrib/mmts/pglogical_receiver.c

+7-5
Original file line number | Diff line number | Diff line change
@@ -342,11 +342,11 @@ pglogical_receiver_main(Datum main_arg)
342342
* Them are either empty, either new node is synchronized using base_backup.
343343
* So we assume that LSNs are the same for local and remote node
344344
*/
345-
originStartPos = Mtm->status == MTM_RECOVERY && Mtm->donorNodeId == nodeId ? GetXLogInsertRecPtr() : InvalidXLogRecPtr;
345+
originStartPos = (Mtm->status == MTM_RECOVERY && Mtm->donorNodeId == nodeId) ? GetXLogInsertRecPtr() : InvalidXLogRecPtr;
346346
MTM_LOG1("Start logical receiver at position %lx from node %d", originStartPos, nodeId);
347347
} else {
348348
if (Mtm->nodes[nodeId-1].restartLSN < originStartPos) {
349-
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (pglogical_receiver_mains)", nodeId, Mtm->nodes[nodeId-1].restartLSN, originStartPos);
349+
MTM_LOG1("Advance restartLSN for node %d: from %lx to %lx (pglogical_receiver_main)", nodeId, Mtm->nodes[nodeId-1].restartLSN, originStartPos);
350350
Mtm->nodes[nodeId-1].restartLSN = originStartPos;
351351
}
352352
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);
@@ -545,16 +545,17 @@ pglogical_receiver_main(Datum main_arg)
545545
}
546546
if (stmt[0] == 'Z' || (stmt[0] == 'M' && (stmt[1] == 'L' || stmt[1] == 'A' || stmt[1] == 'C'))) {
547547
MTM_LOG3("Process '%c' message from %d", stmt[1], nodeId);
548-
if ( stmt[1] == 'C') { /* concurrent DDL */
548+
if (stmt[0] == 'M' && stmt[1] == 'C') { /* concurrent DDL should be executed by parallel workers */
549549
MtmExecute(stmt, rc - hdr_len);
550550
} else {
551-
MtmExecutor(stmt, rc - hdr_len);
551+
MtmExecutor(stmt, rc - hdr_len); /* all other messages can be processed by receiver itself */
552552
}
553553
} else {
554554
ByteBufferAppend(&buf, stmt, rc - hdr_len);
555555
if (stmt[0] == 'C') /* commit */
556556
{
557-
if (!MtmFilterTransaction(stmt, rc - hdr_len)) {
557+
if (!MtmFilterTransaction(stmt, rc - hdr_len))
558+
{
558559
if (spill_file >= 0) {
559560
ByteBufferAppend(&buf, ")", 1);
560561
pq_sendbyte(&spill_info, '(');
@@ -574,6 +575,7 @@ pglogical_receiver_main(Datum main_arg)
574575
elog(WARNING, "Commit of prepared transaction takes %ld usec, flags=%x", stop - start, stmt[1]);
575576
}
576577
} else {
578+
Assert(stmt[1] == PGLOGICAL_PREPARE || stmt[1] == PGLOGICAL_COMMIT); /* all other commits should be applied in place */
577579
MtmExecute(buf.data, buf.used);
578580
}
579581
}

src/backend/access/transam/twophase.c

+11-2
Original file line number | Diff line number | Diff line change
@@ -475,6 +475,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
475475
{
476476
if (strcmp(gxact->gid, gid) == 0)
477477
{
478+
LWLockRelease(TwoPhaseStateLock);
478479
ereport(ERROR,
479480
(errcode(ERRCODE_DUPLICATE_OBJECT),
480481
errmsg("transaction identifier \"%s\" is already in use",
@@ -484,11 +485,14 @@ MarkAsPreparing(TransactionId xid, const char *gid,
484485

485486
/* Get a free gxact from the freelist */
486487
if (TwoPhaseState->freeGXacts == NULL)
488+
{
489+
LWLockRelease(TwoPhaseStateLock);
487490
ereport(ERROR,
488491
(errcode(ERRCODE_OUT_OF_MEMORY),
489492
errmsg("maximum number of prepared transactions reached"),
490493
errhint("Increase max_prepared_transactions (currently %d).",
491494
max_prepared_xacts)));
495+
}
492496
gxact = TwoPhaseState->freeGXacts;
493497
TwoPhaseState->freeGXacts = gxact->next;
494498

@@ -793,6 +797,7 @@ bool GetPreparedTransactionState(char const* gid, char* state)
793797
{
794798
int i;
795799
GlobalTransaction gxact;
800+
bool result = false;
796801

797802
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
798803
i = string_hash(gid, 0) % max_prepared_xacts;
@@ -801,11 +806,12 @@ bool GetPreparedTransactionState(char const* gid, char* state)
801806
if (strcmp(gxact->gid, gid) == 0)
802807
{
803808
strcpy(state, gxact->state_3pc);
804-
return true;
809+
result = true;
810+
break;
805811
}
806812
}
807813
LWLockRelease(TwoPhaseStateLock);
808-
return false;
814+
return result;
809815
}
810816

811817

@@ -845,6 +851,9 @@ void SetPreparedTransactionState(char const* gid, char const* state)
845851
START_CRIT_SECTION();
846852
MyPgXact->delayChkpt = true;
847853

854+
hdr->xl_origin.origin_lsn = replorigin_session_origin_lsn;
855+
hdr->xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
856+
848857
XLogBeginInsert();
849858
XLogRegisterData(buf, hdr->total_len - sizeof(pg_crc32c));
850859
XLogIncludeOrigin();

src/bin/pg_xlogdump/pg_xlogdump.c

+39-1
Original file line number | Diff line number | Diff line change
@@ -18,10 +18,12 @@
1818
#include "access/xlogreader.h"
1919
#include "access/xlogrecord.h"
2020
#include "access/xlog_internal.h"
21+
#include "access/xact.h"
2122
#include "access/transam.h"
2223
#include "common/fe_memutils.h"
2324
#include "getopt_long.h"
2425
#include "rmgrdesc.h"
26+
#include "replication/origin.h"
2527

2628

2729
static const char *progname;
@@ -42,6 +44,7 @@ typedef struct XLogDumpConfig
4244
int stop_after_records;
4345
int already_displayed_records;
4446
bool follow;
47+
bool dump_origin;
4548
bool stats;
4649
bool stats_per_record;
4750

@@ -439,6 +442,35 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
439442
XLogRecGetXid(record),
440443
(uint32) (record->ReadRecPtr >> 32), (uint32) record->ReadRecPtr,
441444
(uint32) (xl_prev >> 32), (uint32) xl_prev);
445+
446+
if (config->dump_origin && XLogRecGetOrigin(record) != InvalidRepOriginId) {
447+
switch (info & XLOG_XACT_OPMASK) {
448+
case XLOG_XACT_COMMIT:
449+
case XLOG_XACT_COMMIT_PREPARED:
450+
{
451+
xl_xact_commit *xlrec;
452+
xl_xact_parsed_commit parsed;
453+
454+
xlrec = (xl_xact_commit *) XLogRecGetData(record);
455+
ParseCommitRecord(info, xlrec, &parsed);
456+
printf("origin_id=%d, origin_lsn=%llx, ", XLogRecGetOrigin(record), (long long)parsed.origin_lsn);
457+
break;
458+
}
459+
case XLOG_XACT_ABORT:
460+
case XLOG_XACT_ABORT_PREPARED:
461+
{
462+
xl_xact_abort *xlrec;
463+
xl_xact_parsed_abort parsed;
464+
465+
xlrec = (xl_xact_abort *) XLogRecGetData(record);
466+
ParseAbortRecord(info, xlrec, &parsed);
467+
468+
printf("origin_id=%d, origin_lsn=%llx, ", XLogRecGetOrigin(record), (long long)parsed.origin_lsn);
469+
break;
470+
}
471+
}
472+
}
473+
442474
printf("desc: %s ", id);
443475

444476
/* the desc routine will printf the description directly to stdout */
@@ -678,6 +710,7 @@ usage(void)
678710
printf(" -b, --bkp-details output detailed information about backup blocks\n");
679711
printf(" -e, --end=RECPTR stop reading at log position RECPTR\n");
680712
printf(" -f, --follow keep retrying after reaching end of WAL\n");
713+
printf(" -o, --origin dump origins\n");
681714
printf(" -n, --limit=N number of records to display\n");
682715
printf(" -p, --path=PATH directory in which to find log segment files\n");
683716
printf(" (default: ./pg_xlog)\n");
@@ -710,6 +743,7 @@ main(int argc, char **argv)
710743
{"bkp-details", no_argument, NULL, 'b'},
711744
{"end", required_argument, NULL, 'e'},
712745
{"follow", no_argument, NULL, 'f'},
746+
{"origin", no_argument, NULL, 'o'},
713747
{"help", no_argument, NULL, '?'},
714748
{"limit", required_argument, NULL, 'n'},
715749
{"path", required_argument, NULL, 'p'},
@@ -740,6 +774,7 @@ main(int argc, char **argv)
740774
config.stop_after_records = -1;
741775
config.already_displayed_records = 0;
742776
config.follow = false;
777+
config.dump_origin = false;
743778
config.filter_by_rmgr = -1;
744779
config.filter_by_xid = InvalidTransactionId;
745780
config.filter_by_xid_enabled = false;
@@ -752,7 +787,7 @@ main(int argc, char **argv)
752787
goto bad_argument;
753788
}
754789

755-
while ((option = getopt_long(argc, argv, "be:?fn:p:r:s:t:Vx:z",
790+
while ((option = getopt_long(argc, argv, "be:?f?on:p:r:s:t:Vx:z",
756791
long_options, &optindex)) != -1)
757792
{
758793
switch (option)
@@ -772,6 +807,9 @@ main(int argc, char **argv)
772807
case 'f':
773808
config.follow = true;
774809
break;
810+
case 'o':
811+
config.dump_origin = true;
812+
break;
775813
case '?':
776814
usage();
777815
exit(EXIT_SUCCESS);

0 commit comments

Comments
 (0)