}
}
-static void
-xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
+/*
+ * ParsePrepareRecord
+ *
+ * Deconstruct the PREPARE record at xlrec into *parsed.
+ *
+ * *parsed is zeroed first, so any field not assigned below reads as zero.
+ * The GID is copied into parsed->twophase_gid; the subxacts, xnodes,
+ * abortnodes and msgs pointers are NOT copies, they point into the record
+ * itself and are only valid for as long as xlrec is.
+ *
+ * info is not used here; it is accepted so that the signature matches
+ * ParseCommitRecord() and ParseAbortRecord().
+ */
+void
+ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parsed_prepare *parsed)
{
- xl_xact_parsed_commit parsed;
- int i;
+ char *bufptr;
+ size_t gidlen;
- ParseCommitRecord(info, xlrec, &parsed);
+ bufptr = ((char *) xlrec) + MAXALIGN(sizeof(xl_xact_prepare));
- /* If this is a prepared xact, show the xid of the original xact */
- if (TransactionIdIsValid(parsed.twophase_xid))
- appendStringInfo(buf, "%u: ", parsed.twophase_xid);
+ memset(parsed, 0, sizeof(*parsed));
- appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
+ parsed->xact_time = xlrec->prepared_at;
+ parsed->origin_lsn = xlrec->origin_lsn;
+ parsed->origin_timestamp = xlrec->origin_timestamp;
+ parsed->twophase_xid = xlrec->xid;
+ parsed->dbId = xlrec->database;
+ parsed->nsubxacts = xlrec->nsubxacts;
+ parsed->nrels = xlrec->ncommitrels;
+ parsed->nabortrels = xlrec->nabortrels;
+ parsed->nmsgs = xlrec->ninvalmsgs;
+
+ /*
+ * The GID follows the header. Never copy more than the destination can
+ * hold, whatever gidlen claims; the last byte is left untouched so that
+ * the memset above always provides the terminating NUL.
+ */
+ gidlen = xlrec->gidlen;
+ if (gidlen >= sizeof(parsed->twophase_gid))
+ gidlen = sizeof(parsed->twophase_gid) - 1;
+ strncpy(parsed->twophase_gid, bufptr, gidlen);
+ bufptr += MAXALIGN(xlrec->gidlen);
+
+ /* Each trailing array starts on a MAXALIGN boundary */
+ parsed->subxacts = (TransactionId *) bufptr;
+ bufptr += MAXALIGN(xlrec->nsubxacts * sizeof(TransactionId));
+
+ parsed->xnodes = (RelFileNode *) bufptr;
+ bufptr += MAXALIGN(xlrec->ncommitrels * sizeof(RelFileNode));
+
+ parsed->abortnodes = (RelFileNode *) bufptr;
+ bufptr += MAXALIGN(xlrec->nabortrels * sizeof(RelFileNode));
- if (parsed.nrels > 0)
+ parsed->msgs = (SharedInvalidationMessage *) bufptr;
+ bufptr += MAXALIGN(xlrec->ninvalmsgs * sizeof(SharedInvalidationMessage));
+}
+
+/*
+ * Append "; <label>:" to buf, followed by one space-separated path per
+ * entry of xnodes (as returned by relpathperm() for MAIN_FORKNUM).
+ * Appends nothing when nrels is not positive.
+ */
+static void
+xact_desc_relations(StringInfo buf, char *label, int nrels,
+ RelFileNode *xnodes)
+{
+ int i;
+
+ if (nrels > 0)
{
- appendStringInfoString(buf, "; rels:");
- for (i = 0; i < parsed.nrels; i++)
+ appendStringInfo(buf, "; %s:", label);
+ for (i = 0; i < nrels; i++)
{
- char *path = relpathperm(parsed.xnodes[i], MAIN_FORKNUM);
+ char *path = relpathperm(xnodes[i], MAIN_FORKNUM);
appendStringInfo(buf, " %s", path);
pfree(path);
}
}
- if (parsed.nsubxacts > 0)
+}
+
+/*
+ * Append "; subxacts:" to buf, followed by each of the nsubxacts XIDs in
+ * subxacts printed with %u. Appends nothing when nsubxacts is not positive.
+ */
+static void
+xact_desc_subxacts(StringInfo buf, int nsubxacts, TransactionId *subxacts)
+{
+ int i;
+
+ if (nsubxacts > 0)
{
appendStringInfoString(buf, "; subxacts:");
- for (i = 0; i < parsed.nsubxacts; i++)
- appendStringInfo(buf, " %u", parsed.subxacts[i]);
- }
- if (parsed.nmsgs > 0)
- {
- standby_desc_invalidations(
- buf, parsed.nmsgs, parsed.msgs, parsed.dbId, parsed.tsId,
- XactCompletionRelcacheInitFileInval(parsed.xinfo));
+ for (i = 0; i < nsubxacts; i++)
+ appendStringInfo(buf, " %u", subxacts[i]);
}
+}
+
+static void
+xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
+{
+ xl_xact_parsed_commit parsed;
+
+ ParseCommitRecord(info, xlrec, &parsed);
+
+ /* If this is a prepared xact, show the xid of the original xact */
+ if (TransactionIdIsValid(parsed.twophase_xid))
+ appendStringInfo(buf, "%u: ", parsed.twophase_xid);
+
+ appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
+
+ xact_desc_relations(buf, "rels", parsed.nrels, parsed.xnodes);
+ xact_desc_subxacts(buf, parsed.nsubxacts, parsed.subxacts);
+
+ standby_desc_invalidations(
+ buf, parsed.nmsgs, parsed.msgs, parsed.dbId, parsed.tsId,
+ XactCompletionRelcacheInitFileInval(parsed.xinfo));
if (XactCompletionForceSyncCommit(parsed.xinfo))
appendStringInfoString(buf, "; sync");
xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec)
{
xl_xact_parsed_abort parsed;
- int i;
ParseAbortRecord(info, xlrec, &parsed);
appendStringInfo(buf, "%u: ", parsed.twophase_xid);
appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
- if (parsed.nrels > 0)
- {
- appendStringInfoString(buf, "; rels:");
- for (i = 0; i < parsed.nrels; i++)
- {
- char *path = relpathperm(parsed.xnodes[i], MAIN_FORKNUM);
- appendStringInfo(buf, " %s", path);
- pfree(path);
- }
- }
+ xact_desc_relations(buf, "rels", parsed.nrels, parsed.xnodes);
+ xact_desc_subxacts(buf, parsed.nsubxacts, parsed.subxacts);
+}
- if (parsed.nsubxacts > 0)
- {
- appendStringInfoString(buf, "; subxacts:");
- for (i = 0; i < parsed.nsubxacts; i++)
- appendStringInfo(buf, " %u", parsed.subxacts[i]);
- }
+/*
+ * Describe a PREPARE record: the GID and prepare timestamp, then the
+ * commit-time and abort-time relation lists, the subtransaction XIDs and
+ * the invalidation messages, in that order.
+ */
+static void
+xact_desc_prepare(StringInfo buf, uint8 info, xl_xact_prepare *xlrec)
+{
+ xl_xact_parsed_prepare parsed;
+
+ ParsePrepareRecord(info, xlrec, &parsed);
+
+ appendStringInfo(buf, "gid %s: ", parsed.twophase_gid);
+ appendStringInfoString(buf, timestamptz_to_str(parsed.xact_time));
+
+ /* The same helper prints both relation lists; only the label differs */
+ xact_desc_relations(buf, "rels(commit)", parsed.nrels, parsed.xnodes);
+ xact_desc_relations(buf, "rels(abort)", parsed.nabortrels,
+ parsed.abortnodes);
+ xact_desc_subxacts(buf, parsed.nsubxacts, parsed.subxacts);
+
+ /*
+ * The relcache-init-file flag is taken straight from the record header
+ * rather than from the parsed struct.
+ */
+ standby_desc_invalidations(
+ buf, parsed.nmsgs, parsed.msgs, parsed.dbId, parsed.tsId,
+ xlrec->initfileinval);
}
static void
xact_desc_abort(buf, XLogRecGetInfo(record), xlrec);
}
+ else if (info == XLOG_XACT_PREPARE)
+ {
+ xl_xact_prepare *xlrec = (xl_xact_prepare *) rec;
+
+ xact_desc_prepare(buf, XLogRecGetInfo(record), xlrec);
+ }
else if (info == XLOG_XACT_ASSIGNMENT)
{
xl_xact_assignment *xlrec = (xl_xact_assignment *) rec;
*/
#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
-typedef struct TwoPhaseFileHeader
-{
- uint32 magic; /* format identifier */
- uint32 total_len; /* actual file length */
- TransactionId xid; /* original transaction XID */
- Oid database; /* OID of database it was in */
- TimestampTz prepared_at; /* time of preparation */
- Oid owner; /* user running the transaction */
- int32 nsubxacts; /* number of following subxact XIDs */
- int32 ncommitrels; /* number of delete-on-commit rels */
- int32 nabortrels; /* number of delete-on-abort rels */
- int32 ninvalmsgs; /* number of cache invalidation messages */
- bool initfileinval; /* does relcache init file need invalidation? */
- uint16 gidlen; /* length of the GID - GID follows the header */
- XLogRecPtr origin_lsn; /* lsn of this record at origin node */
- TimestampTz origin_timestamp; /* time of prepare at origin node */
-} TwoPhaseFileHeader;
+/* Alias for xl_xact_prepare; see that struct for the field layout. */
+typedef xl_xact_prepare TwoPhaseFileHeader;
/*
* Header for each record in a state file
return buf;
}
-/*
- * ParsePrepareRecord
- */
-void
-ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
-{
- TwoPhaseFileHeader *hdr;
- char *bufptr;
-
- hdr = (TwoPhaseFileHeader *) xlrec;
- bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
-
- parsed->origin_lsn = hdr->origin_lsn;
- parsed->origin_timestamp = hdr->origin_timestamp;
- parsed->twophase_xid = hdr->xid;
- parsed->dbId = hdr->database;
- parsed->nsubxacts = hdr->nsubxacts;
- parsed->nrels = hdr->ncommitrels;
- parsed->nabortrels = hdr->nabortrels;
- parsed->nmsgs = hdr->ninvalmsgs;
-
- strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
- bufptr += MAXALIGN(hdr->gidlen);
-
- parsed->subxacts = (TransactionId *) bufptr;
- bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
-
- parsed->xnodes = (RelFileNode *) bufptr;
- bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
-
- parsed->abortnodes = (RelFileNode *) bufptr;
- bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
-
- parsed->msgs = (SharedInvalidationMessage *) bufptr;
- bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
-}
-
-
/*
* Reads 2PC data from xlog. During checkpoint this data will be moved to
} xl_xact_abort;
#define MinSizeOfXactAbort sizeof(xl_xact_abort)
+/*
+ * Fixed-size header of a PREPARE record.
+ *
+ * As read by ParsePrepareRecord(), the header is followed by the GID
+ * (gidlen bytes), the subtransaction XIDs, the delete-on-commit rels, the
+ * delete-on-abort rels and the invalidation messages, in that order; the
+ * header and each of those pieces is padded to a MAXALIGN boundary.
+ */
+typedef struct xl_xact_prepare
+{
+ uint32 magic; /* format identifier */
+ uint32 total_len; /* actual file length */
+ TransactionId xid; /* original transaction XID */
+ Oid database; /* OID of database it was in */
+ TimestampTz prepared_at; /* time of preparation */
+ Oid owner; /* user running the transaction */
+ int32 nsubxacts; /* number of following subxact XIDs */
+ int32 ncommitrels; /* number of delete-on-commit rels */
+ int32 nabortrels; /* number of delete-on-abort rels */
+ int32 ninvalmsgs; /* number of cache invalidation messages */
+ bool initfileinval; /* does relcache init file need invalidation? */
+ uint16 gidlen; /* length of the GID - GID follows the header */
+ XLogRecPtr origin_lsn; /* lsn of this record at origin node */
+ TimestampTz origin_timestamp; /* time of prepare at origin node */
+} xl_xact_prepare;
+
/*
* Commit/Abort records in the above form are a bit verbose to parse, so
* there's a deconstructed versions generated by ParseCommit/AbortRecord() for
/* also in xactdesc.c, so they can be shared between front/backend code */
extern void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed);
extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed);
+extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parsed_prepare *parsed);
extern void EnterParallelMode(void);
extern void ExitParallelMode(void);