Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 9d60e38

Browse files
committed
Check transaction body (all trasnasction elements are decoded)
1 parent 484e3d8 commit 9d60e38

File tree

8 files changed

+114
-71
lines changed

8 files changed

+114
-71
lines changed

src/backend/access/rmgrdesc/xactdesc.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars
4242
* present */
4343

4444
parsed->xact_time = xlrec->xact_time;
45+
parsed->n_changes = xlrec->n_changes;
4546

4647
if (info & XLOG_XACT_HAS_INFO)
4748
{

src/backend/access/transam/twophase.c

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ typedef struct TwoPhaseFileHeader
210210
int32 ncommitrels; /* number of delete-on-commit rels */
211211
int32 nabortrels; /* number of delete-on-abort rels */
212212
int32 ninvalmsgs; /* number of cache invalidation messages */
213+
int64 n_changes; /* number of changes done by transaction */
213214
bool initfileinval; /* does relcache init file need invalidation? */
214215
uint16 gidlen; /* length of the GID - GID follows the header */
215216
xl_xact_origin xl_origin; /* replication origin information */
@@ -330,7 +331,7 @@ TwoPhaseShmemInit(void)
330331
((char *) TwoPhaseState +
331332
MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
332333
sizeof(GlobalTransaction) * 2 * max_prepared_xacts));
333-
334+
334335
TwoPhaseState->hashTable = &TwoPhaseState->prepXacts[max_prepared_xacts];
335336

336337
for (i = 0; i < max_prepared_xacts; i++)
@@ -530,10 +531,10 @@ MarkAsPreparing(TransactionId xid, const char *gid,
530531
proc->lwWaitMode = 0;
531532
proc->waitLock = NULL;
532533
proc->waitProcLock = NULL;
533-
534+
534535
cached_xid = xid;
535536
cached_gxact = gxact;
536-
537+
537538
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
538539
SHMQueueInit(&(proc->myProcLocks[i]));
539540
/* subxid data must be filled later by GXactLoadSubxactData */
@@ -547,15 +548,15 @@ MarkAsPreparing(TransactionId xid, const char *gid,
547548
gxact->owner = owner;
548549
gxact->locking_pid = MyProcPid;
549550
gxact->valid = false;
550-
gxact->ondisk = false;
551+
gxact->ondisk = false;
551552
gxact->prep_index = TwoPhaseState->numPrepXacts;
552553
strcpy(gxact->gid, gid);
553554
*gxact->state_3pc = '\0';
554555

555556
/* And insert it into the active array */
556557
Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
557558
TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
558-
559+
559560
/*
560561
* Remember that we have this GlobalTransaction entry locked for us. If we
561562
* abort after this, we must release it.
@@ -655,7 +656,7 @@ LockGXact(const char *gid, Oid user)
655656
LWLockRelease(TwoPhaseStateLock);
656657

657658
if (MyLockedGxact != gxact) {
658-
if (MyLockedGxact != NULL) {
659+
if (MyLockedGxact != NULL) {
659660
SpinLockRelease(&MyLockedGxact->spinlock);
660661
}
661662
MyLockedGxact = gxact;
@@ -670,13 +671,13 @@ LockGXact(const char *gid, Oid user)
670671
errmsg("prepared transaction with identifier \"%s\" is not valid",
671672
gid)));
672673
}
673-
674+
674675
if (user != gxact->owner && !superuser_arg(user)) {
675676
ereport(ERROR,
676677
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
677678
errmsg("permission denied to finish prepared transaction"),
678679
errhint("Must be superuser or the user that prepared the transaction.")));
679-
}
680+
}
680681

681682
/*
682683
* Note: it probably would be possible to allow committing from
@@ -697,11 +698,11 @@ LockGXact(const char *gid, Oid user)
697698
gxact->locking_pid = MyProcPid;
698699

699700
return gxact;
700-
}
701+
}
701702
}
702703

703704
LWLockRelease(TwoPhaseStateLock);
704-
if (MyLockedGxact != NULL) {
705+
if (MyLockedGxact != NULL) {
705706
SpinLockRelease(&MyLockedGxact->spinlock);
706707
MyLockedGxact = NULL;
707708
}
@@ -829,13 +830,13 @@ bool GetPreparedTransactionState(char const* gid, char* state)
829830
* Alter 3PC state of prepared transaction
830831
*/
831832
void SetPreparedTransactionState(char const* gid, char const* state)
832-
{
833+
{
833834
GlobalTransaction gxact;
834835
PGXACT *pgxact;
835836
TwoPhaseFileHeader *hdr;
836837
char* buf;
837838
bool replorigin;
838-
839+
839840

840841
if (strlen(state) >= MAX_3PC_STATE_SIZE)
841842
ereport(ERROR,
@@ -846,7 +847,7 @@ void SetPreparedTransactionState(char const* gid, char const* state)
846847
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
847848
replorigin_session_origin != DoNotReplicateId);
848849

849-
gxact = LockGXact(gid, GetUserId());
850+
gxact = LockGXact(gid, GetUserId());
850851
pgxact = &ProcGlobal->allPgXact[gxact->pgprocno];
851852
strcpy(gxact->state_3pc, state);
852853

@@ -904,7 +905,7 @@ pg_precommit_prepared(PG_FUNCTION_ARGS)
904905
char const* state = PG_GETARG_CSTRING(1);
905906
SetPreparedTransactionState(gid, state);
906907
PG_RETURN_VOID();
907-
}
908+
}
908909

909910
/*
910911
* pg_prepared_xact
@@ -1152,6 +1153,7 @@ StartPrepare(GlobalTransaction gxact)
11521153
hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
11531154
hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
11541155
&hdr.initfileinval);
1156+
hdr.n_changes = MyXactLogicalChanges;
11551157
hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
11561158
*hdr.state_3pc = '\0';
11571159

@@ -1209,7 +1211,7 @@ EndPrepare(GlobalTransaction gxact)
12091211
Assert(hdr->magic == TWOPHASE_MAGIC);
12101212
hdr->total_len = records.total_len + sizeof(pg_crc32c);
12111213

1212-
if (replorigin) {
1214+
if (replorigin) {
12131215
Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
12141216
hdr->xl_origin.origin_lsn = replorigin_session_origin_lsn;
12151217
hdr->xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
@@ -1445,7 +1447,7 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
14451447

14461448
hdr = (TwoPhaseFileHeader *) xlrec;
14471449
bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
1448-
1450+
14491451
parsed->origin_lsn = hdr->xl_origin.origin_lsn;
14501452
parsed->origin_timestamp = hdr->xl_origin.origin_timestamp;
14511453
parsed->xinfo = hdr->xl_origin.origin_lsn != InvalidXLogRecPtr ? XACT_XINFO_HAS_ORIGIN : 0;
@@ -1459,7 +1461,8 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
14591461
parsed->nsubxacts = hdr->nsubxacts;
14601462
parsed->nrels = hdr->ncommitrels;
14611463
parsed->nmsgs = hdr->ninvalmsgs;
1462-
1464+
parsed->n_changes = hdr->n_changes;
1465+
14631466
parsed->subxacts = (TransactionId *) bufptr;
14641467
bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
14651468

@@ -1575,10 +1578,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
15751578
int i;
15761579

15771580

1578-
if (isCommit)
1579-
{
1581+
if (isCommit)
1582+
{
15801583
CallXactCallbacks(XACT_EVENT_PRE_COMMIT_PREPARED);
1581-
}
1584+
}
15821585

15831586
/*
15841587
* Validate the GID, and lock the GXACT to ensure that two backends do not
@@ -1644,7 +1647,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
16441647
gid);
16451648
CallXactCallbacks(XACT_EVENT_ABORT_PREPARED);
16461649
}
1647-
1650+
16481651

16491652
ProcArrayRemove(proc, latestXid);
16501653

src/backend/access/transam/xact.c

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ int DefaultXactIsoLevel = XACT_READ_COMMITTED;
7474
int XactIsoLevel;
7575

7676
bool DefaultXactReadOnly = false;
77-
/*
77+
/*
7878
* We need initialization because only initialized vars appear in
7979
* postges.def and accssible from loadable extension
8080
*/
@@ -119,6 +119,10 @@ TransactionId *ParallelCurrentXids;
119119
*/
120120
bool MyXactAccessedTempRel = false;
121121

122+
/*
123+
* Number of logical changes (insert,delte,update,lgical message) performed by transaction
124+
*/
125+
int64 MyXactLogicalChanges = 0;
122126

123127
/*
124128
* transaction states - transaction state from server perspective
@@ -1916,6 +1920,8 @@ StartTransaction(void)
19161920
XactIsoLevel = DefaultXactIsoLevel;
19171921
forceSyncCommit = false;
19181922
MyXactAccessedTempRel = false;
1923+
MyXactLogicalChanges = 0;
1924+
19191925

19201926
/*
19211927
* reinitialize within-transaction counters
@@ -2181,7 +2187,7 @@ CommitTransaction(void)
21812187
* waiting for lock on a relation we've modified, we want them to know
21822188
* about the catalog change before they start using the relation).
21832189
*/
2184-
xactHasCatcacheInvalidationMessages = HasCatcacheInvalidationMessages();
2190+
xactHasCatcacheInvalidationMessages = HasCatcacheInvalidationMessages();
21852191
xactHasRelcacheInvalidationMessages = HasRelcacheInvalidationMessages();
21862192
AtEOXact_Inval(true);
21872193

@@ -2196,7 +2202,7 @@ CommitTransaction(void)
21962202
RESOURCE_RELEASE_AFTER_LOCKS,
21972203
true, true);
21982204

2199-
if (!is_autonomous_transaction)
2205+
if (!is_autonomous_transaction)
22002206
{
22012207
/*
22022208
* Likewise, dropping of files deleted during the transaction is best done
@@ -2215,13 +2221,13 @@ CommitTransaction(void)
22152221

22162222
AtCommit_Notify();
22172223
AtEOXact_GUC(true, s->gucNestLevel);
2218-
if (!is_autonomous_transaction)
2224+
if (!is_autonomous_transaction)
22192225
{
22202226
AtEOXact_SPI(true);
22212227
}
22222228
AtEOXact_on_commit_actions(true);
22232229
AtEOXact_Namespace(true, is_parallel_worker);
2224-
if (!is_autonomous_transaction)
2230+
if (!is_autonomous_transaction)
22252231
{
22262232
AtEOXact_SMgr();
22272233
AtEOXact_Files();
@@ -2689,7 +2695,7 @@ AbortTransaction(void)
26892695
AtEOXact_Buffers(false);
26902696
}
26912697
AtEOXact_RelationCache(false);
2692-
xactHasCatcacheInvalidationMessages = HasCatcacheInvalidationMessages();
2698+
xactHasCatcacheInvalidationMessages = HasCatcacheInvalidationMessages();
26932699
xactHasRelcacheInvalidationMessages = HasRelcacheInvalidationMessages();
26942700
AtEOXact_Inval(false);
26952701
AtEOXact_MultiXact();
@@ -3537,14 +3543,14 @@ void SuspendTransaction(void)
35373543
sus->TopTransactionStateData = TopTransactionStateData;
35383544

35393545
sus->SnapshotState = SuspendSnapshot(); /* only before the resource-owner stuff */
3540-
3541-
if (HasCatcacheInvalidationMessages())
3546+
3547+
if (HasCatcacheInvalidationMessages())
35423548
{
35433549
ResetCatalogCaches();
35443550
}
3545-
if (HasRelcacheInvalidationMessages())
3551+
if (HasRelcacheInvalidationMessages())
35463552
{
3547-
RelationCacheInvalidate();
3553+
RelationCacheInvalidate();
35483554
}
35493555
sus->InvalidationInfo = SuspendInvalidationInfo();
35503556
xactHasCatcacheInvalidationMessages = false;
@@ -3601,7 +3607,7 @@ void SuspendTransaction(void)
36013607

36023608
sus->PgStatState = PgStatSuspend();
36033609
sus->TriggerState = TriggerSuspend();
3604-
sus->SPIState = SuspendSPI();
3610+
sus->SPIState = SuspendSPI();
36053611
}
36063612

36073613
AtStart_Memory();
@@ -3660,13 +3666,13 @@ bool ResumeTransaction(void)
36603666

36613667
ResumeSnapshot(sus->SnapshotState); /* only after the resource-owner stuff */
36623668
ResumeInvalidationInfo(sus->InvalidationInfo);
3663-
if (xactHasCatcacheInvalidationMessages)
3669+
if (xactHasCatcacheInvalidationMessages)
36643670
{
36653671
ResetCatalogCaches();
36663672
}
3667-
if (xactHasRelcacheInvalidationMessages)
3673+
if (xactHasRelcacheInvalidationMessages)
36683674
{
3669-
RelationCacheInvalidate();
3675+
RelationCacheInvalidate();
36703676
}
36713677

36723678
MyProc->backendId = sus->vxid.backendId;
@@ -3864,7 +3870,7 @@ EndTransactionBlock(bool autonomous)
38643870
* to COMMIT.
38653871
*/
38663872
case TBLOCK_INPROGRESS:
3867-
if (autonomous && getNestLevelATX() == 0) {
3873+
if (autonomous && getNestLevelATX() == 0) {
38683874
ereport(WARNING,
38693875
(errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION),
38703876
errmsg("there is no autonomous transaction in progress")));
@@ -3993,7 +3999,7 @@ UserAbortTransactionBlock(bool autonomous)
39933999
* exit the transaction block.
39944000
*/
39954001
case TBLOCK_INPROGRESS:
3996-
if (autonomous && getNestLevelATX() == 0) {
4002+
if (autonomous && getNestLevelATX() == 0) {
39974003
ereport(WARNING,
39984004
(errcode(ERRCODE_NO_ACTIVE_SQL_TRANSACTION),
39994005
errmsg("there is no autonomous transaction in progress")));
@@ -5173,7 +5179,7 @@ EstimateTransactionStateSpace(void)
51735179
nxids = add_size(nxids, s->nChildXids);
51745180
}
51755181

5176-
nxids = add_size(nxids, nParallelCurrentXids);
5182+
nxids = add_size(nxids, nParallelCurrentXids);
51775183
nxids = mul_size(nxids, sizeof(TransactionId));
51785184
return add_size(nxids, TM->GetTransactionStateSize());
51795185
}
@@ -5489,6 +5495,7 @@ XactLogCommitRecord(TimestampTz commit_time,
54895495
/* First figure out and collect all the information needed */
54905496

54915497
xlrec.xact_time = commit_time;
5498+
xlrec.n_changes = MyXactLogicalChanges;
54925499

54935500
if (relcacheInval)
54945501
xl_xinfo.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
@@ -5890,7 +5897,7 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
58905897
* because subtransaction commit is never WAL logged.
58915898
*/
58925899
static void
5893-
xact_redo_abort(xl_xact_parsed_abort *parsed,
5900+
xact_redo_abort(xl_xact_parsed_abort *parsed,
58945901
TransactionId xid,
58955902
XLogRecPtr lsn,
58965903
RepOriginId origin_id)
@@ -6036,9 +6043,9 @@ xact_redo(XLogReaderState *record)
60366043
RecreateTwoPhaseFile(XLogRecGetXid(record),
60376044
XLogRecGetData(record), XLogRecGetDataLen(record));
60386045

6039-
if (originId != InvalidRepOriginId && originId != DoNotReplicateId)
6046+
if (originId != InvalidRepOriginId && originId != DoNotReplicateId)
60406047
{
6041-
xl_xact_parsed_prepare parsed;
6048+
xl_xact_parsed_prepare parsed;
60426049
ParsePrepareRecord(XLogRecGetXid(record), XLogRecGetData(record), &parsed);
60436050
Assert(parsed.origin_lsn != InvalidXLogRecPtr);
60446051
/* recover apply progress */
@@ -6059,17 +6066,17 @@ xact_redo(XLogReaderState *record)
60596066
}
60606067

60616068
Datum pg_current_tx_nest_level(PG_FUNCTION_ARGS)
6062-
{
6069+
{
60636070
PG_RETURN_INT64(GetCurrentTransactionNestLevel());
60646071
}
60656072

60666073
Datum pg_current_atx_nest_level(PG_FUNCTION_ARGS)
6067-
{
6074+
{
60686075
PG_RETURN_INT64(getNestLevelATX());
60696076
}
60706077

60716078
Datum pg_current_atx_has_ancestor(PG_FUNCTION_ARGS)
6072-
{
6079+
{
60736080
int64 xid = PG_GETARG_INT64(0);
60746081
PG_RETURN_BOOL(TransactionIdIsAncestorOfCurrentATX(xid));
60756082
}

src/backend/access/transam/xloginsert.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,12 @@ XLogInsert(RmgrId rmid, uint8 info)
423423

424424
TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
425425

426+
if (rmid == RM_LOGICALMSG_ID || rmid == RM_HEAP_ID)
427+
{
428+
MyXactLogicalChanges += 1;
429+
}
430+
431+
426432
/*
427433
* In bootstrap mode, we don't actually log anything but XLOG resources;
428434
* return a phony record pointer.

0 commit comments

Comments
 (0)