Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit ceaf5e0

Browse files
committed
Add visibility functions for global temp table tuples
1 parent f47723b commit ceaf5e0

File tree

7 files changed

+115
-21
lines changed

7 files changed

+115
-21
lines changed

contrib/pgrowlocks/pgrowlocks.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,8 @@ pgrowlocks(PG_FUNCTION_ARGS)
158158
/* must hold a buffer lock to call HeapTupleSatisfiesUpdate */
159159
LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
160160

161-
htsu = HeapTupleSatisfiesUpdate(tuple,
161+
htsu = HeapTupleSatisfiesUpdate(mydata->rel,
162+
tuple,
162163
GetCurrentCommandId(false),
163164
hscan->rs_cbuf);
164165
xmax = HeapTupleHeaderGetRawXmax(tuple->t_data);

src/backend/access/heap/heapam.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1855,7 +1855,7 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
18551855
static TransactionId
18561856
GetTransactionId(Relation relation)
18571857
{
1858-
TransactionId xid = relation->rd_rel->relpersistence == RELPERSISTENCE_SESSION
1858+
return relation->rd_rel->relpersistence == RELPERSISTENCE_SESSION && RecoveryInProgress()
18591859
? GetReplicaTransactionId()
18601860
: GetCurrentTransactionId();
18611861
}

src/backend/access/heap/heapam_visibility.c

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
#include "utils/combocid.h"
7878
#include "utils/snapmgr.h"
7979

80+
static bool TempTupleSatisfiesVisibility(HeapTuple htup, CommandId curcid, Buffer buffer);
8081

8182
/*
8283
* SetHintBits()
@@ -454,14 +455,17 @@ HeapTupleSatisfiesToast(HeapTuple htup, Snapshot snapshot,
454455
* test for it themselves.)
455456
*/
456457
TM_Result
457-
HeapTupleSatisfiesUpdate(HeapTuple htup, CommandId curcid,
458+
HeapTupleSatisfiesUpdate(Relation relation, HeapTuple htup, CommandId curcid,
458459
Buffer buffer)
459460
{
460461
HeapTupleHeader tuple = htup->t_data;
461462

462463
Assert(ItemPointerIsValid(&htup->t_self));
463464
Assert(htup->t_tableOid != InvalidOid);
464465

466+
if (relation->rd_rel->relpersistence == RELPERSISTENCE_SESSION && RecoveryInProgress())
467+
return TempTupleSatisfiesVisibility(htup, curcid, buffer) ? TM_Ok : TM_Invisible;
468+
465469
if (!HeapTupleHeaderXminCommitted(tuple))
466470
{
467471
if (HeapTupleHeaderXminInvalid(tuple))
@@ -1676,6 +1680,70 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
16761680
return true;
16771681
}
16781682

1683+
/*
1684+
* TempTupleSatisfiesVisibility
1685+
* True iff global temp table tuple is visible for the given snapshot.
1686+
*
1687+
* Temporary tables are visible only for current backend, so there is no need to
1688+
* handle cases with tuples committed by other backends. We only need to exclude
1689+
* modifications done by aborted transactions or after start of table scan.
1690+
*
1691+
*/
1692+
static bool
1693+
TempTupleSatisfiesVisibility(HeapTuple htup, CommandId curcid, Buffer buffer)
1694+
{
1695+
HeapTupleHeader tuple = htup->t_data;
1696+
TransactionId xmin;
1697+
TransactionId xmax;
1698+
1699+
1700+
Assert(ItemPointerIsValid(&htup->t_self));
1701+
Assert(htup->t_tableOid != InvalidOid);
1702+
1703+
if (HeapTupleHeaderXminInvalid(tuple))
1704+
return false;
1705+
1706+
xmin = HeapTupleHeaderGetRawXmin(tuple);
1707+
1708+
if (IsReplicaTransactionAborted(xmin))
1709+
return false;
1710+
1711+
if (IsReplicaCurrentTransactionId(xmin)
1712+
&& HeapTupleHeaderGetCmin(tuple) >= curcid)
1713+
{
1714+
return false; /* inserted after scan started */
1715+
}
1716+
1717+
if (tuple->t_infomask & HEAP_XMAX_INVALID) /* xid invalid */
1718+
return true;
1719+
1720+
if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask)) /* not deleter */
1721+
return true;
1722+
1723+
if (tuple->t_infomask & HEAP_XMAX_IS_MULTI)
1724+
{
1725+
xmax = HeapTupleGetUpdateXid(tuple);
1726+
1727+
/* updating subtransaction must have aborted */
1728+
if (IsReplicaTransactionAborted(xmax))
1729+
return true;
1730+
1731+
return (HeapTupleHeaderGetCmax(tuple) >= curcid);
1732+
}
1733+
xmax = HeapTupleHeaderGetRawXmax(tuple);
1734+
1735+
/* deleting subtransaction must have aborted */
1736+
if (IsReplicaTransactionAborted(xmax))
1737+
return true;
1738+
1739+
if (IsReplicaCurrentTransactionId(xmax))
1740+
return (HeapTupleHeaderGetCmax(tuple) >= curcid); /* deleted after scan started */
1741+
1742+
/* xmax transaction committed */
1743+
return false;
1744+
}
1745+
1746+
16791747
/*
16801748
* HeapTupleSatisfiesVisibility
16811749
* True iff heap tuple satisfies a time qual.
@@ -1689,8 +1757,8 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
16891757
bool
16901758
HeapTupleSatisfiesVisibility(Relation relation, HeapTuple tup, Snapshot snapshot, Buffer buffer)
16911759
{
1692-
if (relation->rd_rel->relpersistence == RELPERSISTENCE_SESSION)
1693-
return TempTupleSatisfiesMVCC(tup, snapshot, buffer);
1760+
if (relation->rd_rel->relpersistence == RELPERSISTENCE_SESSION && RecoveryInProgress())
1761+
return TempTupleSatisfiesVisibility(tup, snapshot->curcid, buffer);
16941762

16951763
switch (snapshot->snapshot_type)
16961764
{

src/backend/access/transam/varsup.c

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,6 @@ GetNewTransactionId(bool isSubXact)
6969
return FullTransactionIdFromEpochAndXid(0, BootstrapTransactionId);
7070
}
7171

72-
/* Make it possible to access global temporary tables at standby */
73-
if (RecoveryInProgress())
74-
return FullTransactionIdFromEpochAndXid(0, FrozenTransactionId);
75-
/* elog(ERROR, "cannot assign TransactionIds during recovery"); */
76-
7772
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
7873

7974
full_xid = ShmemVariableCache->nextFullXid;

src/backend/access/transam/xact.c

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ typedef struct TransactionStateData
192192
int parallelModeLevel; /* Enter/ExitParallelMode counter */
193193
bool chain; /* start a new block after this one */
194194
struct TransactionStateData *parent; /* back link to parent */
195-
TransactionId replicXid; /* pseudo XID for inserting data in global temp tables at replica */
195+
TransactionId replicaTransactionId; /* pseudo XID for inserting data in global temp tables at replica */
196196
} TransactionStateData;
197197

198198
typedef TransactionStateData *TransactionState;
@@ -288,6 +288,7 @@ typedef struct XactCallbackItem
288288
static XactCallbackItem *Xact_callbacks = NULL;
289289

290290
static TransactionId replicaTransIdCount;
291+
static TransactionId replicaTopTransId;
291292
static Bitmapset* replicaAbortedXids;
292293

293294
/*
@@ -433,7 +434,28 @@ GetCurrentTransactionId(void)
433434
return XidFromFullTransactionId(s->fullTransactionId);
434435
}
435436

437+
/*
438+
* GetCurrentTransactionIdIfAny
439+
*
440+
* This will return the XID of the current sub xact, if one is assigned.
441+
* It will return InvalidTransactionId if we are not currently inside a
442+
* transaction, or inside a transaction that hasn't been assigned an XID yet.
443+
*/
444+
TransactionId
445+
GetCurrentTransactionIdIfAny(void)
446+
{
447+
return XidFromFullTransactionId(CurrentTransactionState->fullTransactionId);
448+
}
436449

450+
/*
451+
* Transactions at replica can update only global temporary tables.
452+
* Them are assigned backend-local XIDs which are independent from normal XIDs received from primary node.
453+
* So tuples of temporary tables at replica requires special visibility rules.
454+
*
455+
* XIDs for such transactions at replica are created on demand (when tuple of temp table is updated).
456+
* XID wrap-around and adjusting XID horizon is not supported. So number of such transactions at replica is
457+
* limited by 2^32 and require up to 2^29 in-memory bitmap for marking aborted transactions.
458+
*/
437459
TransactionId
438460
GetReplicaTransactionId(void)
439461
{
@@ -443,23 +465,28 @@ GetReplicaTransactionId(void)
443465
return s->replicaTransactionId;
444466
}
445467

468+
/*
469+
* At replica transaction can update only temporary tables
470+
* and them are assigned special XIDs (not related with normal XIDs received from primary node).
471+
* As far as we see only own transaction it is not necessary to mark committed transactions.
472+
* Only marking aborted ones is enough. All transactions which are not marked as aborted are treated as
473+
* committed or self in-progress transactions.
474+
*/
446475
bool
447476
IsReplicaTransactionAborted(TransactionId xid)
448477
{
449-
return bms_is_member(replicaAbortedXids, xid);
478+
return bms_is_member(xid, replicaAbortedXids);
450479
}
451480

452481
/*
453-
* GetCurrentTransactionIdIfAny
454-
*
455-
* This will return the XID of the current sub xact, if one is assigned.
456-
* It will return InvalidTransactionId if we are not currently inside a
457-
* transaction, or inside a transaction that hasn't been assigned an XID yet.
482+
* As far as XIDs for transactions at replica are generated individually for each backends,
483+
* we can check that XID belongs to the current transaction or any of its subtransactions by
484+
* just comparing it with XID of top transaction.
458485
*/
459-
TransactionId
460-
GetCurrentTransactionIdIfAny(void)
486+
bool
487+
IsReplicaCurrentTransactionId(TransactionId xid)
461488
{
462-
return XidFromFullTransactionId(CurrentTransactionState->fullTransactionId);
489+
return xid > replicaTopTransId;
463490
}
464491

465492
/*
@@ -1912,6 +1939,8 @@ StartTransaction(void)
19121939
s = &TopTransactionStateData;
19131940
CurrentTransactionState = s;
19141941

1942+
replicaTopTransId = replicaTransIdCount;
1943+
19151944
Assert(!FullTransactionIdIsValid(XactTopFullTransactionId));
19161945

19171946
/* check the current transaction state */

src/include/access/heapam.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ extern void heap_vacuum_rel(Relation onerel,
197197
/* in heap/heapam_visibility.c */
198198
extern bool HeapTupleSatisfiesVisibility(Relation relation, HeapTuple stup, Snapshot snapshot,
199199
Buffer buffer);
200-
extern TM_Result HeapTupleSatisfiesUpdate(HeapTuple stup, CommandId curcid,
200+
extern TM_Result HeapTupleSatisfiesUpdate(Relation relation, HeapTuple stup, CommandId curcid,
201201
Buffer buffer);
202202
extern HTSV_Result HeapTupleSatisfiesVacuum(HeapTuple stup, TransactionId OldestXmin,
203203
Buffer buffer);

src/include/access/xact.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,5 +442,6 @@ extern bool IsInParallelMode(void);
442442

443443
extern TransactionId GetReplicaTransactionId(void);
444444
extern bool IsReplicaTransactionAborted(TransactionId xid);
445+
extern bool IsReplicaCurrentTransactionId(TransactionId xid);
445446

446447
#endif /* XACT_H */

0 commit comments

Comments
 (0)