Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f47723b

Browse files
committed
Support rollback at replia
1 parent 837c4e0 commit f47723b

File tree

7 files changed

+69
-24
lines changed

7 files changed

+69
-24
lines changed

contrib/pgstattuple/pgstattuple.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ pgstat_heap(Relation rel, FunctionCallInfo fcinfo)
349349
/* must hold a buffer lock to call HeapTupleSatisfiesVisibility */
350350
LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
351351

352-
if (HeapTupleSatisfiesVisibility(tuple, &SnapshotDirty, hscan->rs_cbuf))
352+
if (HeapTupleSatisfiesVisibility(rel, tuple, &SnapshotDirty, hscan->rs_cbuf))
353353
{
354354
stat.tuple_len += tuple->t_len;
355355
stat.tuple_count++;

src/backend/access/heap/heapam.c

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ heapgetpage(TableScanDesc sscan, BlockNumber page)
444444
if (all_visible)
445445
valid = true;
446446
else
447-
valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
447+
valid = HeapTupleSatisfiesVisibility(scan->rs_base.rs_rd, &loctup, snapshot, buffer);
448448

449449
CheckForSerializableConflictOut(valid, scan->rs_base.rs_rd,
450450
&loctup, buffer, snapshot);
@@ -664,7 +664,8 @@ heapgettup(HeapScanDesc scan,
664664
/*
665665
* if current tuple qualifies, return it.
666666
*/
667-
valid = HeapTupleSatisfiesVisibility(tuple,
667+
valid = HeapTupleSatisfiesVisibility(scan->rs_base.rs_rd,
668+
tuple,
668669
snapshot,
669670
scan->rs_cbuf);
670671

@@ -1474,7 +1475,7 @@ heap_fetch(Relation relation,
14741475
/*
14751476
* check tuple visibility, then release lock
14761477
*/
1477-
valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer);
1478+
valid = HeapTupleSatisfiesVisibility(relation, tuple, snapshot, buffer);
14781479

14791480
if (valid)
14801481
PredicateLockTuple(relation, tuple, snapshot);
@@ -1612,7 +1613,7 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
16121613
ItemPointerSet(&(heapTuple->t_self), BufferGetBlockNumber(buffer), offnum);
16131614

16141615
/* If it's visible per the snapshot, we must return it */
1615-
valid = HeapTupleSatisfiesVisibility(heapTuple, snapshot, buffer);
1616+
valid = HeapTupleSatisfiesVisibility(relation, heapTuple, snapshot, buffer);
16161617
CheckForSerializableConflictOut(valid, relation, heapTuple,
16171618
buffer, snapshot);
16181619
/* reset to original, non-redirected, tid */
@@ -1754,7 +1755,7 @@ heap_get_latest_tid(TableScanDesc sscan,
17541755
* Check tuple visibility; if visible, set it as the new result
17551756
* candidate.
17561757
*/
1757-
valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer);
1758+
valid = HeapTupleSatisfiesVisibility(relation, &tp, snapshot, buffer);
17581759
CheckForSerializableConflictOut(valid, relation, &tp, buffer, snapshot);
17591760
if (valid)
17601761
*tid = ctid;
@@ -1851,6 +1852,14 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
18511852
}
18521853

18531854

1855+
static TransactionId
1856+
GetTransactionId(Relation relation)
1857+
{
1858+
TransactionId xid = relation->rd_rel->relpersistence == RELPERSISTENCE_SESSION
1859+
? GetReplicaTransactionId()
1860+
: GetCurrentTransactionId();
1861+
}
1862+
18541863
/*
18551864
* heap_insert - insert tuple into a heap
18561865
*
@@ -1873,7 +1882,7 @@ void
18731882
heap_insert(Relation relation, HeapTuple tup, CommandId cid,
18741883
int options, BulkInsertState bistate)
18751884
{
1876-
TransactionId xid = GetCurrentTransactionId();
1885+
TransactionId xid = GetTransactionId(relation);
18771886
HeapTuple heaptup;
18781887
Buffer buffer;
18791888
Buffer vmbuffer = InvalidBuffer;
@@ -2110,7 +2119,7 @@ void
21102119
heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
21112120
CommandId cid, int options, BulkInsertState bistate)
21122121
{
2113-
TransactionId xid = GetCurrentTransactionId();
2122+
TransactionId xid = GetTransactionId(relation);
21142123
HeapTuple *heaptuples;
21152124
int i;
21162125
int ndone;
@@ -2449,7 +2458,7 @@ heap_delete(Relation relation, ItemPointer tid,
24492458
TM_FailureData *tmfd, bool changingPart)
24502459
{
24512460
TM_Result result;
2452-
TransactionId xid = GetCurrentTransactionId();
2461+
TransactionId xid = GetTransactionId(relation);
24532462
ItemId lp;
24542463
HeapTupleData tp;
24552464
Page page;
@@ -2514,7 +2523,7 @@ heap_delete(Relation relation, ItemPointer tid,
25142523
tp.t_self = *tid;
25152524

25162525
l1:
2517-
result = HeapTupleSatisfiesUpdate(&tp, cid, buffer);
2526+
result = HeapTupleSatisfiesUpdate(relation, &tp, cid, buffer);
25182527

25192528
if (result == TM_Invisible)
25202529
{
@@ -2633,7 +2642,7 @@ heap_delete(Relation relation, ItemPointer tid,
26332642
if (crosscheck != InvalidSnapshot && result == TM_Ok)
26342643
{
26352644
/* Perform additional check for transaction-snapshot mode RI updates */
2636-
if (!HeapTupleSatisfiesVisibility(&tp, crosscheck, buffer))
2645+
if (!HeapTupleSatisfiesVisibility(relation, &tp, crosscheck, buffer))
26372646
result = TM_Updated;
26382647
}
26392648

@@ -2900,7 +2909,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
29002909
TM_FailureData *tmfd, LockTupleMode *lockmode)
29012910
{
29022911
TM_Result result;
2903-
TransactionId xid = GetCurrentTransactionId();
2912+
TransactionId xid = GetTransactionId(relation);
29042913
Bitmapset *hot_attrs;
29052914
Bitmapset *key_attrs;
29062915
Bitmapset *id_attrs;
@@ -3070,7 +3079,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
30703079
l2:
30713080
checked_lockers = false;
30723081
locker_remains = false;
3073-
result = HeapTupleSatisfiesUpdate(&oldtup, cid, buffer);
3082+
result = HeapTupleSatisfiesUpdate(relation, &oldtup, cid, buffer);
30743083

30753084
/* see below about the "no wait" case */
30763085
Assert(result != TM_BeingModified || wait);
@@ -3262,7 +3271,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
32623271
if (crosscheck != InvalidSnapshot && result == TM_Ok)
32633272
{
32643273
/* Perform additional check for transaction-snapshot mode RI updates */
3265-
if (!HeapTupleSatisfiesVisibility(&oldtup, crosscheck, buffer))
3274+
if (!HeapTupleSatisfiesVisibility(relation, &oldtup, crosscheck, buffer))
32663275
{
32673276
result = TM_Updated;
32683277
Assert(!ItemPointerEquals(&oldtup.t_self, &oldtup.t_data->t_ctid));
@@ -4018,7 +4027,7 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
40184027
tuple->t_tableOid = RelationGetRelid(relation);
40194028

40204029
l3:
4021-
result = HeapTupleSatisfiesUpdate(tuple, cid, *buffer);
4030+
result = HeapTupleSatisfiesUpdate(relation, tuple, cid, *buffer);
40224031

40234032
if (result == TM_Invisible)
40244033
{
@@ -4193,7 +4202,7 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
41934202
TM_Result res;
41944203

41954204
res = heap_lock_updated_tuple(relation, tuple, &t_ctid,
4196-
GetCurrentTransactionId(),
4205+
GetTransactionId(relation),
41974206
mode);
41984207
if (res != TM_Ok)
41994208
{
@@ -4441,7 +4450,7 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
44414450
TM_Result res;
44424451

44434452
res = heap_lock_updated_tuple(relation, tuple, &t_ctid,
4444-
GetCurrentTransactionId(),
4453+
GetTransactionId(relation),
44454454
mode);
44464455
if (res != TM_Ok)
44474456
{
@@ -4550,7 +4559,7 @@ heap_lock_tuple(Relation relation, HeapTuple tuple,
45504559
* state if multixact.c elogs.
45514560
*/
45524561
compute_new_xmax_infomask(xmax, old_infomask, tuple->t_data->t_infomask2,
4553-
GetCurrentTransactionId(), mode, false,
4562+
GetTransactionId(relation), mode, false,
45544563
&xid, &new_infomask, &new_infomask2);
45554564

45564565
START_CRIT_SECTION();
@@ -5570,7 +5579,7 @@ heap_finish_speculative(Relation relation, ItemPointer tid)
55705579
void
55715580
heap_abort_speculative(Relation relation, ItemPointer tid)
55725581
{
5573-
TransactionId xid = GetCurrentTransactionId();
5582+
TransactionId xid = GetTransactionId(relation);
55745583
ItemId lp;
55755584
HeapTupleData tp;
55765585
Page page;

src/backend/access/heap/heapam_handler.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,8 @@ heapam_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
226226
* Caller should be holding pin, but not lock.
227227
*/
228228
LockBuffer(bslot->buffer, BUFFER_LOCK_SHARE);
229-
res = HeapTupleSatisfiesVisibility(bslot->base.tuple, snapshot,
229+
230+
res = HeapTupleSatisfiesVisibility(rel, bslot->base.tuple, snapshot,
230231
bslot->buffer);
231232
LockBuffer(bslot->buffer, BUFFER_LOCK_UNLOCK);
232233

@@ -2161,7 +2162,7 @@ heapam_scan_bitmap_next_block(TableScanDesc scan,
21612162
loctup.t_len = ItemIdGetLength(lp);
21622163
loctup.t_tableOid = scan->rs_rd->rd_id;
21632164
ItemPointerSet(&loctup.t_self, page, offnum);
2164-
valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
2165+
valid = HeapTupleSatisfiesVisibility(scan->rs_rd, &loctup, snapshot, buffer);
21652166
if (valid)
21662167
{
21672168
hscan->rs_vistuples[ntup++] = offnum;
@@ -2481,7 +2482,7 @@ SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer,
24812482
else
24822483
{
24832484
/* Otherwise, we have to check the tuple individually. */
2484-
return HeapTupleSatisfiesVisibility(tuple, scan->rs_snapshot,
2485+
return HeapTupleSatisfiesVisibility(scan->rs_rd, tuple, scan->rs_snapshot,
24852486
buffer);
24862487
}
24872488
}

src/backend/access/heap/heapam_visibility.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1687,8 +1687,11 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
16871687
* if so, the indicated buffer is marked dirty.
16881688
*/
16891689
bool
1690-
HeapTupleSatisfiesVisibility(HeapTuple tup, Snapshot snapshot, Buffer buffer)
1690+
HeapTupleSatisfiesVisibility(Relation relation, HeapTuple tup, Snapshot snapshot, Buffer buffer)
16911691
{
1692+
if (relation->rd_rel->relpersistence == RELPERSISTENCE_SESSION)
1693+
return TempTupleSatisfiesMVCC(tup, snapshot, buffer);
1694+
16921695
switch (snapshot->snapshot_type)
16931696
{
16941697
case SNAPSHOT_MVCC:

src/backend/access/transam/xact.c

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ typedef struct TransactionStateData
192192
int parallelModeLevel; /* Enter/ExitParallelMode counter */
193193
bool chain; /* start a new block after this one */
194194
struct TransactionStateData *parent; /* back link to parent */
195+
TransactionId replicXid; /* pseudo XID for inserting data in global temp tables at replica */
195196
} TransactionStateData;
196197

197198
typedef TransactionStateData *TransactionState;
@@ -286,6 +287,9 @@ typedef struct XactCallbackItem
286287

287288
static XactCallbackItem *Xact_callbacks = NULL;
288289

290+
static TransactionId replicaTransIdCount;
291+
static Bitmapset* replicaAbortedXids;
292+
289293
/*
290294
* List of add-on start- and end-of-subxact callbacks
291295
*/
@@ -429,6 +433,22 @@ GetCurrentTransactionId(void)
429433
return XidFromFullTransactionId(s->fullTransactionId);
430434
}
431435

436+
437+
TransactionId
438+
GetReplicaTransactionId(void)
439+
{
440+
TransactionState s = CurrentTransactionState;
441+
if (!TransactionIdIsValid(s->replicaTransactionId))
442+
s->replicaTransactionId = ++replicaTransIdCount;
443+
return s->replicaTransactionId;
444+
}
445+
446+
bool
447+
IsReplicaTransactionAborted(TransactionId xid)
448+
{
449+
return bms_is_member(replicaAbortedXids, xid);
450+
}
451+
432452
/*
433453
* GetCurrentTransactionIdIfAny
434454
*
@@ -1905,6 +1925,7 @@ StartTransaction(void)
19051925
*/
19061926
s->state = TRANS_START;
19071927
s->fullTransactionId = InvalidFullTransactionId; /* until assigned */
1928+
s->replicaTransactionId = InvalidTransactionId; /* until assigned */
19081929

19091930
/* Determine if statements are logged in this transaction */
19101931
xact_is_sampled = log_xact_sample_rate != 0 &&
@@ -2570,6 +2591,10 @@ AbortTransaction(void)
25702591
/* Prevent cancel/die interrupt while cleaning up */
25712592
HOLD_INTERRUPTS();
25722593

2594+
/* Mark transactions involved global temp table at replica as aborted */
2595+
if (TransactionIdIsValid(s->replicaTransactionId))
2596+
replicaAbortedXids = bms_add_member(replicaAbortedXids, s->replicaTransactionId);
2597+
25732598
/* Make sure we have a valid memory context and resource owner */
25742599
AtAbort_Memory();
25752600
AtAbort_ResourceOwner();
@@ -4859,6 +4884,10 @@ AbortSubTransaction(void)
48594884
/* Prevent cancel/die interrupt while cleaning up */
48604885
HOLD_INTERRUPTS();
48614886

4887+
/* Mark transactions involved global temp table at replica as aborted */
4888+
if (TransactionIdIsValid(s->replicaTransactionId))
4889+
replicaAbortedXids = bms_add_member(replicaAbortedXids, s->replicaTransactionId);
4890+
48624891
/* Make sure we have a valid memory context and resource owner */
48634892
AtSubAbort_Memory();
48644893
AtSubAbort_ResourceOwner();

src/include/access/heapam.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ extern void heap_vacuum_rel(Relation onerel,
195195
struct VacuumParams *params, BufferAccessStrategy bstrategy);
196196

197197
/* in heap/heapam_visibility.c */
198-
extern bool HeapTupleSatisfiesVisibility(HeapTuple stup, Snapshot snapshot,
198+
extern bool HeapTupleSatisfiesVisibility(Relation relation, HeapTuple stup, Snapshot snapshot,
199199
Buffer buffer);
200200
extern TM_Result HeapTupleSatisfiesUpdate(HeapTuple stup, CommandId curcid,
201201
Buffer buffer);

src/include/access/xact.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,4 +440,7 @@ extern void EnterParallelMode(void);
440440
extern void ExitParallelMode(void);
441441
extern bool IsInParallelMode(void);
442442

443+
extern TransactionId GetReplicaTransactionId(void);
444+
extern bool IsReplicaTransactionAborted(TransactionId xid);
445+
443446
#endif /* XACT_H */

0 commit comments

Comments
 (0)