Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7fea40d

Browse files
committed
Rewrite storing origin in prepared record
1 parent 7c8d160 commit 7fea40d

File tree

3 files changed

+24
-59
lines changed

3 files changed

+24
-59
lines changed

contrib/mmts/multimaster.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -1165,7 +1165,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11651165
: ts->status == TRANSACTION_STATUS_COMMITTED ? "committed" : "not prepared",
11661166
ts->xid, ts->gid);
11671167
}
1168-
//Assert(ts->xid != 10000);
1168+
Assert(ts->xid != 10000);
11691169
if (x->csn > ts->csn || Mtm->status == MTM_RECOVERY) {
11701170
Assert(x->csn != INVALID_CSN);
11711171
ts->csn = x->csn;

src/backend/access/transam/twophase.c

+15-50
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,7 @@ typedef struct TwoPhaseFileHeader
871871
int32 ninvalmsgs; /* number of cache invalidation messages */
872872
bool initfileinval; /* does relcache init file need invalidation? */
873873
uint16 gidlen; /* length of the GID - GID follows the header */
874+
xl_xact_origin xl_origin; /* replication origin information */
874875
} TwoPhaseFileHeader;
875876

876877
/*
@@ -1022,12 +1023,10 @@ EndPrepare(GlobalTransaction gxact)
10221023
{
10231024
TwoPhaseFileHeader *hdr;
10241025
StateFileChunk *record;
1025-
xl_xact_origin xl_origin;
1026-
xl_xact_xinfo xl_xinfo;
10271026
uint8 info = XLOG_XACT_PREPARE;
1028-
bool replorigin;
1027+
bool replorigin;
10291028

1030-
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
1029+
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
10311030
replorigin_session_origin != DoNotReplicateId);
10321031

10331032
/* Add the end sentinel to the list of 2PC records */
@@ -1039,6 +1038,15 @@ EndPrepare(GlobalTransaction gxact)
10391038
Assert(hdr->magic == TWOPHASE_MAGIC);
10401039
hdr->total_len = records.total_len + sizeof(pg_crc32c);
10411040

1041+
if (replorigin) {
1042+
Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
1043+
hdr->xl_origin.origin_lsn = replorigin_session_origin_lsn;
1044+
hdr->xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
1045+
} else {
1046+
hdr->xl_origin.origin_lsn = InvalidXLogRecPtr;
1047+
hdr->xl_origin.origin_timestamp = 0;
1048+
}
1049+
10421050
/*
10431051
* If the data size exceeds MaxAllocSize, we won't be able to read it in
10441052
* ReadTwoPhaseFile. Check for that now, rather than fail in the case
@@ -1065,31 +1073,12 @@ EndPrepare(GlobalTransaction gxact)
10651073
XLogEnsureRecordSpace(0, records.num_chunks);
10661074

10671075
START_CRIT_SECTION();
1068-
xl_xinfo.xinfo = 0;
1069-
/* dump transaction origin information */
1070-
if (replorigin)
1071-
{
1072-
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
1073-
Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
1074-
xl_origin.origin_lsn = replorigin_session_origin_lsn;
1075-
xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
1076-
}
1077-
1078-
if (xl_xinfo.xinfo != 0)
1079-
info |= XLOG_XACT_HAS_INFO;
1080-
10811076
MyPgXact->delayChkpt = true;
10821077

10831078
XLogBeginInsert();
10841079
for (record = records.head; record != NULL; record = record->next)
10851080
XLogRegisterData(record->data, record->len);
10861081

1087-
if (xl_xinfo.xinfo != 0)
1088-
XLogRegisterData((char *) (&xl_xinfo.xinfo), sizeof(xl_xinfo.xinfo));
1089-
1090-
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
1091-
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
1092-
10931082
XLogIncludeOrigin();
10941083

10951084
gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, info);
@@ -1285,6 +1274,9 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
12851274
hdr = (TwoPhaseFileHeader *) xlrec;
12861275
bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
12871276

1277+
parsed->origin_lsn = hdr->xl_origin.origin_lsn;
1278+
parsed->origin_timestamp = hdr->xl_origin.origin_timestamp;
1279+
12881280
strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
12891281
bufptr += MAXALIGN(hdr->gidlen);
12901282

@@ -1305,33 +1297,6 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
13051297

13061298
parsed->msgs = (SharedInvalidationMessage *) bufptr;
13071299
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1308-
1309-
/*
1310-
* Parse xinfo now.
1311-
*/
1312-
1313-
parsed->xinfo = 0;
1314-
bufptr = xlrec + hdr->total_len - sizeof(pg_crc32c);
1315-
1316-
if (info & XLOG_XACT_HAS_INFO)
1317-
{
1318-
xl_xact_xinfo *xl_xinfo = (xl_xact_xinfo *) bufptr;
1319-
parsed->xinfo = xl_xinfo->xinfo;
1320-
bufptr += sizeof(xl_xact_xinfo);
1321-
}
1322-
1323-
1324-
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
1325-
{
1326-
xl_xact_origin xl_origin;
1327-
/* we're only guaranteed 4 byte alignment, so copy onto stack */
1328-
memcpy(&xl_origin, bufptr, sizeof(xl_origin));
1329-
parsed->origin_lsn = xl_origin.origin_lsn;
1330-
Assert(parsed->origin_lsn != 0);
1331-
parsed->origin_timestamp = xl_origin.origin_timestamp;
1332-
bufptr += sizeof(xl_xact_origin);
1333-
}
1334-
13351300
}
13361301

13371302

src/backend/access/transam/xact.c

+8-8
Original file line numberDiff line numberDiff line change
@@ -5657,20 +5657,20 @@ xact_redo(XLogReaderState *record)
56575657
}
56585658
else if (info == XLOG_XACT_PREPARE)
56595659
{
5660-
xl_xact_parsed_prepare parsed;
5661-
5662-
ParsePrepareRecord(XLogRecGetXid(record), XLogRecGetData(record), &parsed);
5660+
RepOriginId originId = XLogRecGetOrigin(record);
56635661

56645662
/* the record contents are exactly the 2PC file */
56655663
RecreateTwoPhaseFile(XLogRecGetXid(record),
5666-
XLogRecGetData(record), XLogRecGetDataLen(record));
5664+
XLogRecGetData(record), XLogRecGetDataLen(record));
56675665

5668-
if (parsed.xinfo & XACT_XINFO_HAS_ORIGIN)
5666+
if (originId != InvalidRepOriginId && originId != DoNotReplicateId)
56695667
{
5670-
Assert(XLogRecGetOrigin(record) != InvalidRepOriginId);
5668+
xl_xact_parsed_prepare parsed;
5669+
ParsePrepareRecord(XLogRecGetXid(record), XLogRecGetData(record), &parsed);
5670+
Assert(parsed.origin_lsn != InvalidXLogRecPtr);
56715671
/* recover apply progress */
5672-
replorigin_advance(XLogRecGetOrigin(record), parsed.origin_lsn,
5673-
record->EndRecPtr, false /* backward */ , false /* WAL */ );
5672+
replorigin_advance(originId, parsed.origin_lsn,
5673+
record->EndRecPtr, false /* backward */ , false /* WAL */ );
56745674
}
56755675
}
56765676
else if (info == XLOG_XACT_ASSIGNMENT)

0 commit comments

Comments
 (0)