Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 18190aa

Browse files
kelvicharssher
authored andcommitted
global_snapshot_xmin support in procarray
1 parent 8652379 commit 18190aa

File tree

5 files changed

+50
-55
lines changed

5 files changed

+50
-55
lines changed

contrib/postgres_fdw/t/001_bank_check.pl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@
6868
my $isolation_error = 0;
6969

7070
my $pgb_path = catfile(dirname(__FILE__), "bank.pgb");
71-
$master->pgbench(-n, -c => 5, -t => 10, -f => "$pgb_path", 'postgres' );
72-
71+
$master->pgbench(-n, -c => 20, -t => 30, -f => "$pgb_path", 'postgres' );
7372
my $pgb_handle = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => "$pgb_path", 'postgres' );
7473

7574
my $started = time();

contrib/postgres_fdw/t/bank.pgb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22

33
BEGIN;
44
UPDATE accounts SET amount = amount - 1 WHERE id = :id;
5+
--select pg_sleep(0.5*random());
56
UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1);
67
COMMIT;

src/backend/access/transam/global_snapshot.c

Lines changed: 20 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "storage/s_lock.h"
1717
#include "storage/spin.h"
1818
#include "storage/lmgr.h"
19+
#include "storage/procarray.h"
1920
#include "storage/shmem.h"
2021
#include "storage/ipc.h"
2122
#include "access/xlogdefs.h"
@@ -60,8 +61,6 @@ typedef struct
6061
{
6162
cid_t cid; /* last assigned CSN; used to provide unique
6263
* ascending CSNs */
63-
TransactionId oldest_xid; /* XID of oldest transaction visible by any
64-
* active transaction (local or global) */
6564
long time_shift; /* correction to system time */
6665
volatile slock_t lock; /* spinlock to protect access to hash table */
6766
DtmTransStatus *trans_list_head; /* L1 list of finished transactions
@@ -94,8 +93,6 @@ static bool DtmRecordCommits = 0;
9493

9594
DtmCurrentTrans dtm_tx; // XXXX: make static
9695

97-
static Snapshot DtmGetSnapshot(Snapshot snapshot);
98-
static TransactionId DtmGetOldestXmin(Relation rel, int flags);
9996
static bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
10097
static void DtmAdjustOldestXid(void);
10198
static bool DtmDetectGlobalDeadLock(PGPROC *proc);
@@ -109,9 +106,9 @@ static void DtmDeserializeTransactionState(void* ctx);
109106
static TransactionManager DtmTM = {
110107
PgTransactionIdGetStatus,
111108
PgTransactionIdSetTreeStatus,
112-
DtmGetSnapshot,
109+
PgGetSnapshotData,
113110
PgGetNewTransactionId,
114-
DtmGetOldestXmin,
111+
PgGetOldestXmin,
115112
PgTransactionIdIsInProgress,
116113
PgGetGlobalTransactionId,
117114
DtmXidInMVCCSnapshot,
@@ -331,16 +328,16 @@ static void
331328
DtmAdjustOldestXid()
332329
{
333330
DtmTransStatus *ts,
334-
*prev = NULL;
331+
*prev = NULL;
332+
timestamp_t cutoff_time;
333+
TransactionId oldest_xid = InvalidTransactionId;
334+
int total = 0,
335+
deleted = 0;
335336

336-
timestamp_t cutoff_time = dtm_get_current_time() - DtmVacuumDelay * USEC;
337-
int total = 0, deleted = 0;
337+
cutoff_time = dtm_get_current_time() - DtmVacuumDelay * USEC;
338338

339339
SpinLockAcquire(&local->lock);
340340

341-
for (ts = local->trans_list_head; ts != NULL; ts = ts->next)
342-
total++;
343-
344341
for (ts = local->trans_list_head; ts != NULL && ts->cid < cutoff_time; prev = ts, ts = ts->next)
345342
{
346343
if (prev != NULL)
@@ -353,53 +350,24 @@ DtmAdjustOldestXid()
353350
if (prev != NULL)
354351
local->trans_list_head = prev;
355352

356-
if (ts != NULL)
357-
local->oldest_xid = ts->xid;
358-
else
359-
local->oldest_xid = InvalidTransactionId;
353+
if (local->trans_list_head)
354+
oldest_xid = local->trans_list_head->xid;
360355

361-
SpinLockRelease(&local->lock);
362-
363-
// elog(LOG, "DtmAdjustOldestXid total=%d, deleted=%d, xid=%d, prev=%p, ts=%p", total, deleted, local->oldest_xid, prev, ts);
364-
}
365-
366-
Snapshot
367-
DtmGetSnapshot(Snapshot snapshot)
368-
{
369-
snapshot = PgGetSnapshotData(snapshot);
370-
// RecentGlobalDataXmin = RecentGlobalXmin = DtmAdjustOldestXid(RecentGlobalDataXmin);
371-
SpinLockAcquire(&local->lock);
372-
373-
if (TransactionIdIsValid(local->oldest_xid) &&
374-
TransactionIdPrecedes(local->oldest_xid, RecentGlobalXmin))
375-
RecentGlobalXmin = local->oldest_xid;
376-
377-
if (TransactionIdIsValid(local->oldest_xid) &&
378-
TransactionIdPrecedes(local->oldest_xid, RecentGlobalDataXmin))
379-
RecentGlobalDataXmin = local->oldest_xid;
356+
for (ts = local->trans_list_head; ts != NULL; ts = ts->next)
357+
{
358+
if (TransactionIdPrecedes(ts->xid, oldest_xid))
359+
oldest_xid = ts->xid;
360+
total++;
361+
}
380362

381363
SpinLockRelease(&local->lock);
382-
return snapshot;
383-
}
384-
385-
TransactionId
386-
DtmGetOldestXmin(Relation rel, int flags)
387-
{
388-
TransactionId xmin = PgGetOldestXmin(rel, flags);
389364

390-
// xmin = DtmAdjustOldestXid(xmin);
365+
ProcArraySetGlobalSnapshotXmin(oldest_xid);
391366

392-
SpinLockAcquire(&local->lock);
393-
394-
if (TransactionIdIsValid(local->oldest_xid) &&
395-
TransactionIdPrecedes(local->oldest_xid, xmin))
396-
xmin = local->oldest_xid;
397-
398-
SpinLockRelease(&local->lock);
399-
400-
return xmin;
367+
// elog(LOG, "DtmAdjustOldestXid total=%d, deleted=%d, xid=%d, prev=%p, ts=%p", total, deleted, oldest_xid, prev, ts);
401368
}
402369

370+
403371
/*
404372
* Check tuple bisibility based on CSN of current transaction.
405373
* If there is no niformation about transaction with this XID, then use standard PostgreSQL visibility rules.
@@ -489,7 +457,6 @@ DtmInitialize()
489457
if (!found)
490458
{
491459
local->time_shift = 0;
492-
local->oldest_xid = FirstNormalTransactionId;
493460
local->cid = dtm_get_current_time();
494461
local->trans_list_head = NULL;
495462
local->trans_list_tail = &local->trans_list_head;

src/backend/storage/ipc/procarray.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ typedef struct ProcArrayStruct
9292
/* oldest catalog xmin of any replication slot */
9393
TransactionId replication_slot_catalog_xmin;
9494

95+
/* xmin of oldest active global snapshot */
96+
TransactionId global_snapshot_xmin;
97+
9598
/* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */
9699
int pgprocnos[FLEXIBLE_ARRAY_MEMBER];
97100
} ProcArrayStruct;
@@ -246,6 +249,7 @@ CreateSharedProcArray(void)
246249
procArray->lastOverflowedXid = InvalidTransactionId;
247250
procArray->replication_slot_xmin = InvalidTransactionId;
248251
procArray->replication_slot_catalog_xmin = InvalidTransactionId;
252+
procArray->global_snapshot_xmin = InvalidTransactionId;
249253
}
250254

251255
allProcs = ProcGlobal->allProcs;
@@ -1333,6 +1337,7 @@ PgGetOldestXmin(Relation rel, int flags)
13331337

13341338
volatile TransactionId replication_slot_xmin = InvalidTransactionId;
13351339
volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1340+
volatile TransactionId global_snapshot_xmin = InvalidTransactionId;
13361341

13371342
/*
13381343
* If we're not computing a relation specific limit, or if a shared
@@ -1394,6 +1399,7 @@ PgGetOldestXmin(Relation rel, int flags)
13941399
/* fetch into volatile var while ProcArrayLock is held */
13951400
replication_slot_xmin = procArray->replication_slot_xmin;
13961401
replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
1402+
global_snapshot_xmin = procArray->global_snapshot_xmin;
13971403

13981404
if (RecoveryInProgress())
13991405
{
@@ -1435,6 +1441,10 @@ PgGetOldestXmin(Relation rel, int flags)
14351441
result = FirstNormalTransactionId;
14361442
}
14371443

1444+
if (TransactionIdIsValid(global_snapshot_xmin) &&
1445+
NormalTransactionIdPrecedes(global_snapshot_xmin, result))
1446+
result = global_snapshot_xmin;
1447+
14381448
/*
14391449
* Check whether there are replication slots requiring an older xmin.
14401450
*/
@@ -1536,6 +1546,7 @@ PgGetSnapshotData(Snapshot snapshot)
15361546
bool suboverflowed = false;
15371547
volatile TransactionId replication_slot_xmin = InvalidTransactionId;
15381548
volatile TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
1549+
volatile TransactionId global_snapshot_xmin = InvalidTransactionId;
15391550

15401551
Assert(snapshot != NULL);
15411552

@@ -1724,6 +1735,7 @@ PgGetSnapshotData(Snapshot snapshot)
17241735
/* fetch into volatile var while ProcArrayLock is held */
17251736
replication_slot_xmin = procArray->replication_slot_xmin;
17261737
replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
1738+
global_snapshot_xmin = procArray->global_snapshot_xmin;
17271739

17281740
if (!TransactionIdIsValid(MyPgXact->xmin))
17291741
MyPgXact->xmin = TransactionXmin = xmin;
@@ -1743,6 +1755,10 @@ PgGetSnapshotData(Snapshot snapshot)
17431755
if (!TransactionIdIsNormal(RecentGlobalXmin))
17441756
RecentGlobalXmin = FirstNormalTransactionId;
17451757

1758+
if (TransactionIdIsValid(global_snapshot_xmin) &&
1759+
TransactionIdPrecedes(global_snapshot_xmin, RecentGlobalXmin))
1760+
RecentGlobalXmin = global_snapshot_xmin;
1761+
17461762
/* Check whether there's a replication slot requiring an older xmin. */
17471763
if (TransactionIdIsValid(replication_slot_xmin) &&
17481764
NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
@@ -3015,6 +3031,16 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
30153031
LWLockRelease(ProcArrayLock);
30163032
}
30173033

3034+
/*
3035+
* ProcArraySetGlobalSnapshotXmin
3036+
*/
3037+
void
3038+
ProcArraySetGlobalSnapshotXmin(TransactionId xmin)
3039+
{
3040+
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
3041+
procArray->global_snapshot_xmin = xmin;
3042+
LWLockRelease(ProcArrayLock);
3043+
}
30183044

30193045
#define XidCacheRemove(i) \
30203046
do { \

src/include/storage/procarray.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,4 +124,6 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
124124
extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
125125
TransactionId *catalog_xmin);
126126

127+
extern void ProcArraySetGlobalSnapshotXmin(TransactionId xmin);
128+
127129
#endif /* PROCARRAY_H */

0 commit comments

Comments
 (0)