Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 81de182

Browse files
committed
newer version of pg_prepared_xact_status; peek pg_precommit_prepared
1 parent d5dd44b commit 81de182

File tree

3 files changed

+153
-71
lines changed

3 files changed

+153
-71
lines changed

src/backend/access/transam/twophase.c

Lines changed: 148 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
#include <unistd.h>
7878

7979
#include "access/commit_ts.h"
80+
#include "access/clog.h"
8081
#include "access/htup_details.h"
8182
#include "access/subtrans.h"
8283
#include "access/transam.h"
@@ -2577,106 +2578,182 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
25772578
return;
25782579
}
25792580

2581+
/*
2582+
* pg_precommit_prepared
2583+
*
2584+
* Alter state of prepared transaction. This function can be used to implement
2585+
* 3PC at user level.
2586+
*/
25802587
Datum
2581-
pg_prepared_xact_status(PG_FUNCTION_ARGS)
2588+
pg_precommit_prepared(PG_FUNCTION_ARGS)
2589+
{
2590+
char const* gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
2591+
char const* state = text_to_cstring(PG_GETARG_TEXT_PP(1));
2592+
SetPreparedTransactionState(gid, state);
2593+
PG_RETURN_VOID();
2594+
}
2595+
2596+
XidStatus
2597+
GetLoggedPreparedXactState(char const *gid)
25822598
{
2583-
char const* gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
25842599
XLogRecord *record;
25852600
XLogReaderState *xlogreader;
2586-
char *errormsg;
2601+
char *errormsg;
25872602
XLogRecPtr start_lsn;
25882603
XLogRecPtr lsn;
2589-
char const* xact_status = "unknown";
2590-
bool done = false;
2604+
XidStatus xact_status = TRANSACTION_STATUS_UNKNOWN;
25912605
TimeLineID timeline;
25922606
TransactionId xid = InvalidTransactionId;
2593-
XLogRecPtr end_lsn = GetFlushRecPtr();
2607+
XLogRecPtr end_wal_lsn = GetFlushRecPtr();
2608+
XLogRecPtr end_lsn = end_wal_lsn;
25942609

25952610
GetOldestRestartPoint(&start_lsn, &timeline);
2596-
2597-
xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL);
2598-
if (!xlogreader)
2599-
ereport(ERROR,
2600-
(errcode(ERRCODE_OUT_OF_MEMORY),
2601-
errmsg("out of memory"),
2602-
errdetail("Failed while allocating a WAL reading processor.")));
2603-
while (true)
2611+
if (start_lsn != InvalidXLogRecPtr)
26042612
{
2605-
lsn = start_lsn;
2606-
do
2613+
xlogreader = XLogReaderAllocate(&read_local_xlog_page, NULL);
2614+
if (!xlogreader)
2615+
ereport(ERROR,
2616+
(errcode(ERRCODE_OUT_OF_MEMORY),
2617+
errmsg("out of memory"),
2618+
errdetail("Failed while allocating a WAL reading processor.")));
2619+
2620+
PG_TRY();
26072621
{
2608-
record = XLogReadRecord(xlogreader, lsn, &errormsg);
2609-
if (record == NULL)
2610-
break;
2611-
lsn = InvalidXLogRecPtr; /* continue after the record */
2612-
if (XLogRecGetRmid(xlogreader) == RM_XACT_ID)
2622+
/*
2623+
* If checkpoint interval is large enough it may be more efficient
2624+
* to start scanning from last WAL segment
2625+
*/
2626+
XLogSegNoOffsetToRecPtr(end_lsn / XLogSegSize, 0, lsn);
2627+
lsn = XLogFindNextRecord(xlogreader, lsn);
2628+
if (lsn != InvalidXLogRecPtr && lsn > start_lsn)
2629+
start_lsn = lsn;
2630+
2631+
while (start_lsn != InvalidXLogRecPtr)
26132632
{
2614-
uint32 info = XLogRecGetInfo(xlogreader);
2615-
switch (info & XLOG_XACT_OPMASK)
2633+
lsn = start_lsn;
2634+
do
26162635
{
2617-
case XLOG_XACT_PREPARE:
2636+
record = XLogReadRecord(xlogreader, lsn, &errormsg);
2637+
if (record == NULL)
2638+
break;
2639+
lsn = InvalidXLogRecPtr; /* continue after the record */
2640+
if (XLogRecGetRmid(xlogreader) == RM_XACT_ID)
26182641
{
2619-
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *)XLogRecGetData(xlogreader);
2620-
char* xact_gid = (char*)hdr + MAXALIGN(sizeof(TwoPhaseFileHeader));
2621-
if (strcmp(xact_gid, gid) == 0)
2642+
uint32 info = XLogRecGetInfo(xlogreader);
2643+
switch (info & XLOG_XACT_OPMASK)
2644+
{
2645+
case XLOG_XACT_PREPARE:
26222646
{
2623-
xid = hdr->xid;
2624-
xact_status = "prepared";
2647+
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *)XLogRecGetData(xlogreader);
2648+
char *xact_gid = (char *)hdr + MAXALIGN(sizeof(TwoPhaseFileHeader));
2649+
if (strcmp(xact_gid, gid) == 0)
2650+
{
2651+
xid = hdr->xid;
2652+
/*
2653+
* continue traversal until we find commit/rollback
2654+
* prepared or end of WAL
2655+
*/
2656+
end_lsn = end_wal_lsn;
2657+
xact_status = TRANSACTION_STATUS_IN_PROGRESS;
2658+
}
2659+
break;
26252660
}
2626-
break;
2627-
}
2628-
case XLOG_XACT_COMMIT_PREPARED:
2629-
{
2630-
xl_xact_commit *xlrec;
2631-
xl_xact_parsed_commit parsed;
2632-
2633-
xlrec = (xl_xact_commit *) XLogRecGetData(xlogreader);
2634-
ParseCommitRecord(info, xlrec, &parsed);
2635-
if (xid == parsed.twophase_xid)
2661+
case XLOG_XACT_COMMIT_PREPARED:
26362662
{
2637-
Assert(TransactionIdIsValid(xid));
2638-
xact_status = "committed";
2639-
done = true;
2663+
xl_xact_commit *xlrec;
2664+
xl_xact_parsed_commit parsed;
2665+
2666+
xlrec = (xl_xact_commit *)XLogRecGetData(xlogreader);
2667+
ParseCommitRecord(info, xlrec, &parsed);
2668+
if (xid == parsed.twophase_xid)
2669+
{
2670+
Assert(TransactionIdIsValid(xid));
2671+
xact_status = TRANSACTION_STATUS_COMMITTED;
2672+
}
2673+
break;
26402674
}
2641-
break;
2642-
}
2643-
case XLOG_XACT_ABORT_PREPARED:
2644-
{
2645-
xl_xact_abort *xlrec;
2646-
xl_xact_parsed_abort parsed;
26472675

2648-
xlrec = (xl_xact_abort *) XLogRecGetData(xlogreader);
2649-
ParseAbortRecord(info, xlrec, &parsed);
2650-
if (xid == parsed.twophase_xid)
2676+
case XLOG_XACT_ABORT_PREPARED:
26512677
{
2652-
Assert(TransactionIdIsValid(xid));
2653-
xact_status = "aborted";
2654-
done = true;
2678+
xl_xact_abort *xlrec;
2679+
xl_xact_parsed_abort parsed;
2680+
2681+
xlrec = (xl_xact_abort *)XLogRecGetData(xlogreader);
2682+
ParseAbortRecord(info, xlrec, &parsed);
2683+
if (xid == parsed.twophase_xid)
2684+
{
2685+
Assert(TransactionIdIsValid(xid));
2686+
xact_status = TRANSACTION_STATUS_ABORTED;
2687+
}
2688+
break;
2689+
}
2690+
default:
2691+
break;
26552692
}
2656-
break;
26572693
}
2658-
default:
2659-
break;
2694+
} while ((xact_status == TRANSACTION_STATUS_UNKNOWN
2695+
|| xact_status == TRANSACTION_STATUS_IN_PROGRESS)
2696+
&& xlogreader->EndRecPtr < end_lsn);
2697+
2698+
if (xact_status != TRANSACTION_STATUS_UNKNOWN)
2699+
break;
2700+
2701+
end_lsn = start_lsn;
2702+
/* Get LSN of first record in the current segment */
2703+
XLogSegNoOffsetToRecPtr(end_lsn / XLogSegSize, 0, start_lsn);
2704+
start_lsn = XLogFindNextRecord(xlogreader, start_lsn);
2705+
/*
2706+
* If we didn't start from the beginning of segment, then restart
2707+
* scan from the beginning of segment
2708+
*/
2709+
if (start_lsn == end_lsn)
2710+
{
2711+
/* ... otherwise check if it is not the first segment */
2712+
if (end_lsn <= XLogSegSize * 2)
2713+
break;
2714+
/* ... and if not: shift to previous segment */
2715+
XLogSegNoOffsetToRecPtr(end_lsn / XLogSegSize - 1, 0, start_lsn);
2716+
/* ... and check that pending segment is actually exists */
2717+
if (start_lsn / XLogSegSize <= XLogGetLastRemovedSegno())
2718+
break;
2719+
start_lsn = XLogFindNextRecord(xlogreader, start_lsn);
26602720
}
26612721
}
2662-
} while (!done && xlogreader->EndRecPtr < end_lsn);
2663-
2664-
if (done)
2665-
break;
2666-
2667-
end_lsn = start_lsn;
2668-
XLogSegNoOffsetToRecPtr(end_lsn/XLogSegSize, 0, start_lsn);
2669-
start_lsn = XLogFindNextRecord(xlogreader, start_lsn);
2670-
if (start_lsn == end_lsn)
2722+
}
2723+
PG_CATCH();
26712724
{
2672-
if (end_lsn <= XLogSegSize)
2673-
break;
2674-
XLogSegNoOffsetToRecPtr(lsn/XLogSegSize-1, 0, start_lsn);
2675-
start_lsn = XLogFindNextRecord(xlogreader, start_lsn);
2725+
/* Catch access to unexisted WAL segment */
2726+
FlushErrorState();
26762727
}
2728+
PG_END_TRY();
2729+
XLogReaderFree(xlogreader);
2730+
}
2731+
2732+
return xact_status;
2733+
}
2734+
2735+
Datum
2736+
pg_prepared_xact_status(PG_FUNCTION_ARGS)
2737+
{
2738+
char const* gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
2739+
char const* xact_status;
2740+
2741+
switch (GetLoggedPreparedXactState(gid))
2742+
{
2743+
case TRANSACTION_STATUS_IN_PROGRESS:
2744+
xact_status = "prepared";
2745+
break;
2746+
case TRANSACTION_STATUS_COMMITTED:
2747+
xact_status = "committed";
2748+
break;
2749+
case TRANSACTION_STATUS_ABORTED:
2750+
xact_status = "aborted";
2751+
break;
2752+
case TRANSACTION_STATUS_UNKNOWN:
2753+
xact_status = "unknown";
2754+
break;
26772755
}
26782756

2679-
XLogReaderFree(xlogreader);
26802757
PG_RETURN_TEXT_P(cstring_to_text(xact_status));
26812758
}
26822759

src/include/access/twophase.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#ifndef TWOPHASE_H
1515
#define TWOPHASE_H
1616

17+
#include "access/clog.h"
1718
#include "access/xlogdefs.h"
1819
#include "access/xact.h"
1920
#include "datatype/timestamp.h"
@@ -71,6 +72,8 @@ extern void SetPreparedTransactionState(char const* gid, char const* state);
7172

7273
extern bool GetPreparedTransactionState(char const* gid, char* state);
7374

75+
extern XidStatus GetLoggedPreparedXactState(char const *gid);
76+
7477
extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
7578
XLogRecPtr end_lsn, RepOriginId origin_id);
7679
extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);

src/include/catalog/pg_proc.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3160,6 +3160,8 @@ DESCR("view two-phase transactions");
31603160

31613161
DATA(insert OID = 6123 ( pg_prepared_xact_status PGNSP PGUID 12 1 0 0 0 f f f f t f i s 1 0 25 "25" _null_ _null_ _null_ _null_ _null_ pg_prepared_xact_status _null_ _null_ _null_ ));
31623162
DESCR("I/O");
3163+
DATA(insert OID = 4001 ( pg_precommit_prepared PGNSP PGUID 12 1 0 0 0 f f f f t f v s 2 0 2278 "25 25" _null_ _null_ _null_ _null_ _null_ pg_precommit_prepared _null_ _null_ _null_ ));
3164+
DESCR("alter state of prepared transaction (used for 3pc)");
31633165

31643166
DATA(insert OID = 3819 ( pg_get_multixact_members PGNSP PGUID 12 1 1000 0 0 f f f f t t v s 1 0 2249 "28" "{28,28,25}" "{i,o,o}" "{multixid,xid,mode}" _null_ _null_ pg_get_multixact_members _null_ _null_ _null_ ));
31653167
DESCR("view members of a multixactid");

0 commit comments

Comments
 (0)