Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 509e809

Browse files
committed
port 3pc stuff
1 parent 0e19f12 commit 509e809

File tree

9 files changed

+210
-13
lines changed

9 files changed

+210
-13
lines changed

src/backend/access/transam/twophase.c

Lines changed: 157 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ typedef struct GlobalTransactionData
171171
bool ondisk; /* TRUE if prepare state file is on disk */
172172
bool inredo; /* TRUE if entry was added via xlog_redo */
173173
char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
174+
char state_3pc[MAX_3PC_STATE_SIZE]; /* 3PC transaction state */
174175
} GlobalTransactionData;
175176

176177
/*
@@ -422,6 +423,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
422423
TwoPhaseState->freeGXacts = gxact->next;
423424

424425
MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
426+
*gxact->state_3pc = '\0';
425427

426428
gxact->ondisk = false;
427429

@@ -735,7 +737,7 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
735737

736738
/* build tupdesc for result tuples */
737739
/* this had better match pg_prepared_xacts view in system_views.sql */
738-
tupdesc = CreateTemplateTupleDesc(5, false);
740+
tupdesc = CreateTemplateTupleDesc(6, false);
739741
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
740742
XIDOID, -1, 0);
741743
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
@@ -746,6 +748,8 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
746748
OIDOID, -1, 0);
747749
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
748750
OIDOID, -1, 0);
751+
TupleDescInitEntry(tupdesc, (AttrNumber) 6, "state3pc",
752+
TEXTOID, -1, 0);
749753

750754
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
751755

@@ -770,8 +774,8 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
770774
GlobalTransaction gxact = &status->array[status->currIdx++];
771775
PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno];
772776
PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
773-
Datum values[5];
774-
bool nulls[5];
777+
Datum values[6];
778+
bool nulls[6];
775779
HeapTuple tuple;
776780
Datum result;
777781

@@ -789,6 +793,7 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
789793
values[2] = TimestampTzGetDatum(gxact->prepared_at);
790794
values[3] = ObjectIdGetDatum(gxact->owner);
791795
values[4] = ObjectIdGetDatum(proc->databaseId);
796+
values[5] = CStringGetTextDatum(gxact->state_3pc);
792797

793798
tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
794799
result = HeapTupleGetDatum(tuple);
@@ -916,6 +921,7 @@ typedef struct TwoPhaseFileHeader
916921
uint16 gidlen; /* length of the GID - GID follows the header */
917922
XLogRecPtr origin_lsn; /* lsn of this record at origin node */
918923
TimestampTz origin_timestamp; /* time of prepare at origin node */
924+
char state_3pc[MAX_3PC_STATE_SIZE]; /* 3PC state */
919925
} TwoPhaseFileHeader;
920926

921927
/*
@@ -1026,7 +1032,8 @@ StartPrepare(GlobalTransaction gxact)
10261032
hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
10271033
hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
10281034
&hdr.initfileinval);
1029-
hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
1035+
hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
1036+
*hdr.state_3pc = '\0';
10301037

10311038
save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
10321039
save_state_data(gxact->gid, hdr.gidlen);
@@ -1330,6 +1337,7 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
13301337
parsed->ncommitrels = hdr->ncommitrels;
13311338
parsed->nabortrels = hdr->nabortrels;
13321339
parsed->nmsgs = hdr->ninvalmsgs;
1340+
strncpy(parsed->state_3pc, hdr->state_3pc, MAX_3PC_STATE_SIZE);
13331341

13341342
strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
13351343
bufptr += MAXALIGN(hdr->gidlen);
@@ -1657,11 +1665,27 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
16571665
pg_crc32c statefile_crc;
16581666
int fd;
16591667

1668+
/* Workaround so that the CRC and length checks succeed when the 2PC file is read */
1669+
Assert( ((TwoPhaseFileHeader *) content)->total_len - sizeof(pg_crc32c) <= len);
1670+
len = ((TwoPhaseFileHeader *) content)->total_len - sizeof(pg_crc32c);
1671+
16601672
/* Recompute CRC */
16611673
INIT_CRC32C(statefile_crc);
16621674
COMP_CRC32C(statefile_crc, content, len);
16631675
FIN_CRC32C(statefile_crc);
16641676

1677+
/*
1678+
* Hacky 3PC support. The xid of an xlog record is set during xlog insert
1679+
* via a GetCurrentTransactionIdIfAny() call. However, this transaction is no
1680+
* longer active, so allow the xid to be zero in xlog and override it here
1681+
* during recovery so that the file name is built from a valid xid.
1682+
*/
1683+
if (!TransactionIdIsValid(xid))
1684+
{
1685+
Assert( *(((TwoPhaseFileHeader *) content)->state_3pc) != '\0');
1686+
xid = ((TwoPhaseFileHeader *) content)->xid;
1687+
}
1688+
16651689
TwoPhaseFilePath(path, xid);
16661690

16671691
fd = OpenTransientFile(path,
@@ -2388,6 +2412,34 @@ RecordTransactionAbortPrepared(TransactionId xid,
23882412
SyncRepWaitForLSN(recptr, false);
23892413
}
23902414

2415+
2416+
int
2417+
GetPreparedTransactions(PreparedTransaction* pxacts)
2418+
{
2419+
int num;
2420+
int i;
2421+
2422+
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2423+
2424+
num = TwoPhaseState->numPrepXacts;
2425+
if (num == 0)
2426+
{
2427+
LWLockRelease(TwoPhaseStateLock);
2428+
*pxacts = NULL;
2429+
return 0;
2430+
}
2431+
2432+
*pxacts = (PreparedTransaction)palloc(sizeof(PreparedTransactionData) * num);
2433+
for (i = 0; i < num; i++) {
2434+
(*pxacts)[i].owner = TwoPhaseState->prepXacts[i]->owner;
2435+
strcpy((*pxacts)[i].gid, TwoPhaseState->prepXacts[i]->gid);
2436+
strcpy((*pxacts)[i].state_3pc, TwoPhaseState->prepXacts[i]->state_3pc);
2437+
}
2438+
LWLockRelease(TwoPhaseStateLock);
2439+
2440+
return num;
2441+
}
2442+
23912443
/*
23922444
* PrepareRedoAdd
23932445
*
@@ -2404,6 +2456,7 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
24042456
char *bufptr;
24052457
const char *gid;
24062458
GlobalTransaction gxact;
2459+
int i;
24072460

24082461
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
24092462
Assert(RecoveryInProgress());
@@ -2422,6 +2475,24 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
24222475
* that it got added in the redo phase
24232476
*/
24242477

2478+
/*
2479+
* MTM-SPECIFIC.
2480+
* In our case 3PC is done by logging the prepare record twice with a different
2481+
* state_3pc. So try to find the entry with the same xid and update its state. The
2482+
* other option is to allow numPrepXacts to be twice as large, but then we would
2483+
* still need to iterate over the whole array upon deletion to wipe all mentions
* of this tx.
2484+
*/
2485+
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2486+
{
2487+
gxact = TwoPhaseState->prepXacts[i];
2488+
if (gxact->xid == hdr->xid)
2489+
{
2490+
Assert(gxact->inredo);
2491+
strncpy(gxact->state_3pc, hdr->state_3pc, MAX_3PC_STATE_SIZE);
2492+
return;
2493+
}
2494+
}
2495+
24252496
/* Get a free gxact from the freelist */
24262497
if (TwoPhaseState->freeGXacts == NULL)
24272498
ereport(ERROR,
@@ -2442,6 +2513,7 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
24422513
gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
24432514
gxact->inredo = true; /* yes, added in redo */
24442515
strcpy(gxact->gid, gid);
2516+
strncpy(gxact->state_3pc, hdr->state_3pc, MAX_3PC_STATE_SIZE);
24452517

24462518
/* And insert it into the active array */
24472519
Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
@@ -2607,3 +2679,84 @@ pg_prepared_xact_status(PG_FUNCTION_ARGS)
26072679
XLogReaderFree(xlogreader);
26082680
PG_RETURN_TEXT_P(cstring_to_text(xact_status));
26092681
}
2682+
2683+
bool GetPreparedTransactionState(char const* gid, char* state)
2684+
{
2685+
int i;
2686+
GlobalTransaction gxact;
2687+
bool result = false;
2688+
2689+
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2690+
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2691+
{
2692+
gxact = TwoPhaseState->prepXacts[i];
2693+
if (strcmp(gxact->gid, gid) == 0)
2694+
{
2695+
strcpy(state, gxact->state_3pc);
2696+
result = true;
2697+
break;
2698+
}
2699+
}
2700+
LWLockRelease(TwoPhaseStateLock);
2701+
return result;
2702+
}
2703+
2704+
2705+
/*
2706+
* SetPrepareTransactionState
2707+
* Alter 3PC state of prepared transaction
2708+
*/
2709+
void SetPreparedTransactionState(char const* gid, char const* state)
2710+
{
2711+
GlobalTransaction gxact;
2712+
PGXACT *pgxact;
2713+
TwoPhaseFileHeader *hdr;
2714+
char* buf;
2715+
bool replorigin;
2716+
2717+
if (strlen(state) >= MAX_3PC_STATE_SIZE)
2718+
ereport(ERROR,
2719+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2720+
errmsg("transaction state \"%s\" is too long",
2721+
state)));
2722+
2723+
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2724+
replorigin_session_origin != DoNotReplicateId);
2725+
2726+
gxact = LockGXact(gid, GetUserId(), false);
2727+
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
2728+
strcpy(gxact->state_3pc, state);
2729+
2730+
if (gxact->ondisk)
2731+
buf = ReadTwoPhaseFile(pgxact->xid, true);
2732+
else
2733+
XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
2734+
hdr = (TwoPhaseFileHeader *)buf;
2735+
strcpy(hdr->state_3pc, state);
2736+
2737+
START_CRIT_SECTION();
2738+
MyPgXact->delayChkpt = true;
2739+
2740+
hdr->origin_lsn = replorigin_session_origin_lsn;
2741+
hdr->origin_timestamp = replorigin_session_origin_timestamp;
2742+
2743+
XLogBeginInsert();
2744+
XLogRegisterData(buf, hdr->total_len - sizeof(pg_crc32c));
2745+
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
2746+
2747+
gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
2748+
2749+
if (replorigin)
2750+
/* Move LSNs forward for this replication origin */
2751+
replorigin_session_advance(replorigin_session_origin_lsn,
2752+
gxact->prepare_end_lsn);
2753+
2754+
XLogFlush(gxact->prepare_end_lsn);
2755+
2756+
gxact->prepare_start_lsn = ProcLastRecPtr;
2757+
MyPgXact->delayChkpt = false;
2758+
2759+
END_CRIT_SECTION();
2760+
2761+
PostPrepare_Twophase();
2762+
}

src/backend/catalog/system_views.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ CREATE VIEW pg_available_extension_versions AS
283283

284284
CREATE VIEW pg_prepared_xacts AS
285285
SELECT P.transaction, P.gid, P.prepared,
286-
U.rolname AS owner, D.datname AS database
286+
U.rolname AS owner, D.datname AS database, P.state3pc AS state3pc
287287
FROM pg_prepared_xact() AS P
288288
LEFT JOIN pg_authid U ON P.ownerid = U.oid
289289
LEFT JOIN pg_database D ON P.dbid = D.oid;

src/backend/replication/logical/decode.c

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,7 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
657657
{
658658
XLogRecPtr origin_lsn = parsed->origin_lsn;
659659
TimestampTz commit_time = parsed->origin_timestamp;
660-
XLogRecPtr origin_id = XLogRecGetOrigin(buf->record);
660+
RepOriginId origin_id = XLogRecGetOrigin(buf->record);
661661
int i;
662662
TransactionId xid = parsed->twophase_xid;
663663

@@ -666,7 +666,7 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
666666
* transaction's contents, since the various caches need to always be
667667
* consistent.
668668
*/
669-
if (parsed->nmsgs > 0)
669+
if (parsed->nmsgs > 0 && *parsed->state_3pc == '\0')
670670
{
671671
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
672672
parsed->nmsgs, parsed->msgs);
@@ -686,6 +686,23 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
686686
return;
687687
}
688688

689+
/* Quick hack for PRECOMMIT */
690+
if (*parsed->state_3pc != '\0')
691+
{
692+
ReorderBufferTXN txn;
693+
txn.xid = xid;
694+
txn.first_lsn = buf->origptr;
695+
txn.final_lsn = buf->origptr;
696+
txn.end_lsn = buf->endptr;
697+
txn.commit_time = commit_time;
698+
txn.origin_id = origin_id;
699+
txn.origin_lsn = origin_lsn;
700+
strcpy(txn.gid, parsed->twophase_gid);
701+
strcpy(txn.state_3pc, parsed->state_3pc);
702+
ctx->reorder->prepare(ctx->reorder, &txn, buf->origptr);
703+
return;
704+
}
705+
689706
/* tell the reorderbuffer about the surviving subtransactions */
690707
for (i = 0; i < parsed->nsubxacts; i++)
691708
{
@@ -695,7 +712,8 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
695712

696713
/* replay actions of all transaction + subtransactions in order */
697714
ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr,
698-
commit_time, origin_id, origin_lsn, parsed->twophase_gid);
715+
commit_time, origin_id, origin_lsn,
716+
parsed->twophase_gid, parsed->state_3pc);
699717
}
700718

701719
/*

src/backend/replication/logical/reorderbuffer.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1768,7 +1768,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
17681768
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
17691769
TimestampTz commit_time,
17701770
RepOriginId origin_id, XLogRecPtr origin_lsn,
1771-
char *gid)
1771+
char *gid, char *state_3pc)
17721772
{
17731773
ReorderBufferTXN *txn;
17741774

@@ -1781,6 +1781,7 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
17811781

17821782
txn->txn_flags |= TXN_PREPARE;
17831783
strcpy(txn->gid, gid);
1784+
strcpy(txn->state_3pc, state_3pc);
17841785

17851786
ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
17861787
commit_time, origin_id, origin_lsn);

src/include/access/twophase.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@
1919
#include "datatype/timestamp.h"
2020
#include "storage/lock.h"
2121

22+
typedef struct PreparedTransactionData
23+
{
24+
Oid owner;
25+
char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
26+
char state_3pc[MAX_3PC_STATE_SIZE]; /* 3PC transaction state */
27+
} PreparedTransactionData, *PreparedTransaction;
28+
29+
2230
/*
2331
* GlobalTransactionData is defined in twophase.c; other places have no
2432
* business knowing the internal definition.
@@ -57,6 +65,12 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
5765
extern void FinishPreparedTransaction(const char *gid, bool isCommit,
5866
bool missing_ok);
5967

68+
extern int GetPreparedTransactions(PreparedTransaction* pxacts);
69+
70+
extern void SetPreparedTransactionState(char const* gid, char const* state);
71+
72+
extern bool GetPreparedTransactionState(char const* gid, char* state);
73+
6074
extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
6175
XLogRecPtr end_lsn, RepOriginId origin_id);
6276
extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);

src/include/access/xact.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@
2929
*/
3030
#define GIDSIZE 200
3131

32+
/*
33+
* Maximal size of 3PC transaction state
34+
*/
35+
#define MAX_3PC_STATE_SIZE 16
36+
3237
/*
3338
* Xact isolation levels
3439
*/
@@ -342,6 +347,7 @@ typedef struct xl_xact_parsed_prepare
342347

343348
TransactionId twophase_xid;
344349
char twophase_gid[GIDSIZE];
350+
char state_3pc[MAX_3PC_STATE_SIZE];
345351

346352
XLogRecPtr origin_lsn;
347353
TimestampTz origin_timestamp;

0 commit comments

Comments
 (0)