Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit e87072b

Browse files
kelvicharssher
authored andcommitted
global_snapshot_xmin support in procarray
(cherry picked from commit 18190aab39dc68c87dfc27e6a9a5442e2f280298)
1 parent d84c349 commit e87072b

File tree

5 files changed

+49
-55
lines changed

5 files changed

+49
-55
lines changed

contrib/postgres_fdw/t/001_bank_check.pl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@
6868
my $isolation_error = 0;
6969

7070
my $pgb_path = catfile(dirname(__FILE__), "bank.pgb");
71-
$master->pgbench(-n, -c => 5, -t => 10, -f => "$pgb_path", 'postgres' );
72-
71+
$master->pgbench(-n, -c => 20, -t => 30, -f => "$pgb_path", 'postgres' );
7372
my $pgb_handle = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => "$pgb_path", 'postgres' );
7473

7574
my $started = time();

contrib/postgres_fdw/t/bank.pgb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22

33
BEGIN;
44
UPDATE accounts SET amount = amount - 1 WHERE id = :id;
5+
--select pg_sleep(0.5*random());
56
UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1);
67
COMMIT;

src/backend/access/transam/global_snapshot.c

Lines changed: 20 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "storage/s_lock.h"
1717
#include "storage/spin.h"
1818
#include "storage/lmgr.h"
19+
#include "storage/procarray.h"
1920
#include "storage/shmem.h"
2021
#include "storage/ipc.h"
2122
#include "access/xlogdefs.h"
@@ -60,8 +61,6 @@ typedef struct
6061
{
6162
cid_t cid; /* last assigned CSN; used to provide unique
6263
* ascending CSNs */
63-
TransactionId oldest_xid; /* XID of oldest transaction visible by any
64-
* active transaction (local or global) */
6564
long time_shift; /* correction to system time */
6665
volatile slock_t lock; /* spinlock to protect access to hash table */
6766
DtmTransStatus *trans_list_head; /* L1 list of finished transactions
@@ -94,8 +93,6 @@ static bool DtmRecordCommits = 0;
9493

9594
DtmCurrentTrans dtm_tx; // XXXX: make static
9695

97-
static Snapshot DtmGetSnapshot(Snapshot snapshot);
98-
static TransactionId DtmGetOldestXmin(Relation rel, int flags);
9996
static bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
10097
static void DtmAdjustOldestXid(void);
10198
static bool DtmDetectGlobalDeadLock(PGPROC *proc);
@@ -109,9 +106,9 @@ static void DtmDeserializeTransactionState(void* ctx);
109106
static TransactionManager DtmTM = {
110107
PgTransactionIdGetStatus,
111108
PgTransactionIdSetTreeStatus,
112-
DtmGetSnapshot,
109+
PgGetSnapshotData,
113110
PgGetNewTransactionId,
114-
DtmGetOldestXmin,
111+
PgGetOldestXmin,
115112
PgTransactionIdIsInProgress,
116113
PgGetGlobalTransactionId,
117114
DtmXidInMVCCSnapshot,
@@ -331,16 +328,16 @@ static void
331328
DtmAdjustOldestXid()
332329
{
333330
DtmTransStatus *ts,
334-
*prev = NULL;
331+
*prev = NULL;
332+
timestamp_t cutoff_time;
333+
TransactionId oldest_xid = InvalidTransactionId;
334+
int total = 0,
335+
deleted = 0;
335336

336-
timestamp_t cutoff_time = dtm_get_current_time() - DtmVacuumDelay * USEC;
337-
int total = 0, deleted = 0;
337+
cutoff_time = dtm_get_current_time() - DtmVacuumDelay * USEC;
338338

339339
SpinLockAcquire(&local->lock);
340340

341-
for (ts = local->trans_list_head; ts != NULL; ts = ts->next)
342-
total++;
343-
344341
for (ts = local->trans_list_head; ts != NULL && ts->cid < cutoff_time; prev = ts, ts = ts->next)
345342
{
346343
if (prev != NULL)
@@ -353,53 +350,24 @@ DtmAdjustOldestXid()
353350
if (prev != NULL)
354351
local->trans_list_head = prev;
355352

356-
if (ts != NULL)
357-
local->oldest_xid = ts->xid;
358-
else
359-
local->oldest_xid = InvalidTransactionId;
353+
if (local->trans_list_head)
354+
oldest_xid = local->trans_list_head->xid;
360355

361-
SpinLockRelease(&local->lock);
362-
363-
// elog(LOG, "DtmAdjustOldestXid total=%d, deleted=%d, xid=%d, prev=%p, ts=%p", total, deleted, local->oldest_xid, prev, ts);
364-
}
365-
366-
Snapshot
367-
DtmGetSnapshot(Snapshot snapshot)
368-
{
369-
snapshot = PgGetSnapshotData(snapshot);
370-
// RecentGlobalDataXmin = RecentGlobalXmin = DtmAdjustOldestXid(RecentGlobalDataXmin);
371-
SpinLockAcquire(&local->lock);
372-
373-
if (TransactionIdIsValid(local->oldest_xid) &&
374-
TransactionIdPrecedes(local->oldest_xid, RecentGlobalXmin))
375-
RecentGlobalXmin = local->oldest_xid;
376-
377-
if (TransactionIdIsValid(local->oldest_xid) &&
378-
TransactionIdPrecedes(local->oldest_xid, RecentGlobalDataXmin))
379-
RecentGlobalDataXmin = local->oldest_xid;
356+
for (ts = local->trans_list_head; ts != NULL; ts = ts->next)
357+
{
358+
if (TransactionIdPrecedes(ts->xid, oldest_xid))
359+
oldest_xid = ts->xid;
360+
total++;
361+
}
380362

381363
SpinLockRelease(&local->lock);
382-
return snapshot;
383-
}
384-
385-
TransactionId
386-
DtmGetOldestXmin(Relation rel, int flags)
387-
{
388-
TransactionId xmin = PgGetOldestXmin(rel, flags);
389364

390-
// xmin = DtmAdjustOldestXid(xmin);
365+
ProcArraySetGlobalSnapshotXmin(oldest_xid);
391366

392-
SpinLockAcquire(&local->lock);
393-
394-
if (TransactionIdIsValid(local->oldest_xid) &&
395-
TransactionIdPrecedes(local->oldest_xid, xmin))
396-
xmin = local->oldest_xid;
397-
398-
SpinLockRelease(&local->lock);
399-
400-
return xmin;
367+
// elog(LOG, "DtmAdjustOldestXid total=%d, deleted=%d, xid=%d, prev=%p, ts=%p", total, deleted, oldest_xid, prev, ts);
401368
}
402369

370+
403371
/*
404372
* Check tuple bisibility based on CSN of current transaction.
405373
* If there is no niformation about transaction with this XID, then use standard PostgreSQL visibility rules.
@@ -489,7 +457,6 @@ DtmInitialize()
489457
if (!found)
490458
{
491459
local->time_shift = 0;
492-
local->oldest_xid = FirstNormalTransactionId;
493460
local->cid = dtm_get_current_time();
494461
local->trans_list_head = NULL;
495462
local->trans_list_tail = &local->trans_list_head;

src/backend/storage/ipc/procarray.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ typedef struct ProcArrayStruct
9393
/* oldest catalog xmin of any replication slot */
9494
TransactionId replication_slot_catalog_xmin;
9595

96+
/* xmin of oldest active global snapshot */
97+
TransactionId global_snapshot_xmin;
98+
9699
/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
97100
int pgprocnos[FLEXIBLE_ARRAY_MEMBER];
98101
} ProcArrayStruct;
@@ -247,6 +250,7 @@ CreateSharedProcArray(void)
247250
procArray->lastOverflowedXid = InvalidTransactionId;
248251
procArray->replication_slot_xmin = InvalidTransactionId;
249252
procArray->replication_slot_catalog_xmin = InvalidTransactionId;
253+
procArray->global_snapshot_xmin = InvalidTransactionId;
250254
}
251255

252256
allProcs = ProcGlobal->allProcs;
@@ -1407,6 +1411,7 @@ PgGetOldestXmin(Relation rel, int flags)
14071411

14081412
volatile TransactionId replication_slot_xmin = InvalidTransactionId;
14091413
volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1414+
volatile TransactionId global_snapshot_xmin = InvalidTransactionId;
14101415

14111416
/*
14121417
* If we're not computing a relation specific limit, or if a shared
@@ -1468,6 +1473,7 @@ PgGetOldestXmin(Relation rel, int flags)
14681473
/* fetch into volatile var while ProcArrayLock is held */
14691474
replication_slot_xmin = procArray->replication_slot_xmin;
14701475
replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
1476+
global_snapshot_xmin = procArray->global_snapshot_xmin;
14711477

14721478
if (RecoveryInProgress())
14731479
{
@@ -1509,6 +1515,10 @@ PgGetOldestXmin(Relation rel, int flags)
15091515
result = FirstNormalTransactionId;
15101516
}
15111517

1518+
if (TransactionIdIsValid(global_snapshot_xmin) &&
1519+
NormalTransactionIdPrecedes(global_snapshot_xmin, result))
1520+
result = global_snapshot_xmin;
1521+
15121522
/*
15131523
* Check whether there are replication slots requiring an older xmin.
15141524
*/
@@ -1610,6 +1620,7 @@ PgGetSnapshotData(Snapshot snapshot)
16101620
bool suboverflowed = false;
16111621
volatile TransactionId replication_slot_xmin = InvalidTransactionId;
16121622
volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1623+
volatile TransactionId global_snapshot_xmin = InvalidTransactionId;
16131624

16141625
Assert(snapshot != NULL);
16151626

@@ -1801,6 +1812,7 @@ PgGetSnapshotData(Snapshot snapshot)
18011812
/* fetch into volatile var while ProcArrayLock is held */
18021813
replication_slot_xmin = procArray->replication_slot_xmin;
18031814
replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
1815+
global_snapshot_xmin = procArray->global_snapshot_xmin;
18041816

18051817
if (!TransactionIdIsValid(MyPgXact->xmin))
18061818
MyPgXact->xmin = TransactionXmin = xmin;
@@ -1821,6 +1833,10 @@ PgGetSnapshotData(Snapshot snapshot)
18211833
else
18221834
RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
18231835

1836+
if (TransactionIdIsValid(global_snapshot_xmin) &&
1837+
TransactionIdPrecedes(global_snapshot_xmin, RecentGlobalXmin))
1838+
RecentGlobalXmin = global_snapshot_xmin;
1839+
18241840
/* Check whether there's a replication slot requiring an older xmin. */
18251841
if (TransactionIdIsValid(replication_slot_xmin) &&
18261842
NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
@@ -3099,6 +3115,16 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
30993115
LWLockRelease(ProcArrayLock);
31003116
}
31013117

3118+
/*
3119+
* ProcArraySetGlobalSnapshotXmin
3120+
*/
3121+
void
3122+
ProcArraySetGlobalSnapshotXmin(TransactionId xmin)
3123+
{
3124+
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3125+
procArray->global_snapshot_xmin = xmin;
3126+
LWLockRelease(ProcArrayLock);
3127+
}
31023128

31033129
#define XidCacheRemove(i) \
31043130
do { \

src/include/storage/procarray.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,5 +127,6 @@ extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
127127

128128
extern void ResumePgXact(PGXACT *pgxact);
129129
extern void SuspendPgXact(PGXACT *pgxact);
130+
extern void ProcArraySetGlobalSnapshotXmin(TransactionId xmin);
130131

131132
#endif /* PROCARRAY_H */

0 commit comments

Comments
 (0)