@@ -89,15 +89,15 @@ static HTAB *xid2status;
89
89
static HTAB * gtid2xid ;
90
90
static DtmNodeState * local ;
91
91
static uint64 totalSleepInterrupts ;
92
- static int DtmVacuumDelay = 10 ;
92
+ static int DtmVacuumDelay = 2 ; /* sec */
93
93
static bool DtmRecordCommits = 0 ;
94
94
95
95
DtmCurrentTrans dtm_tx ; // XXXX: make static
96
96
97
97
static Snapshot DtmGetSnapshot (Snapshot snapshot );
98
98
static TransactionId DtmGetOldestXmin (Relation rel , int flags );
99
99
static bool DtmXidInMVCCSnapshot (TransactionId xid , Snapshot snapshot );
100
- static TransactionId DtmAdjustOldestXid (TransactionId xid );
100
+ static void DtmAdjustOldestXid (void );
101
101
static bool DtmDetectGlobalDeadLock (PGPROC * proc );
102
102
static void DtmAddSubtransactions (DtmTransStatus * ts , TransactionId * subxids , int nSubxids );
103
103
static char const * DtmGetName (void );
@@ -325,47 +325,58 @@ DtmTransactionListInsertAfter(DtmTransStatus * after, DtmTransStatus * ts)
325
325
* is older than it more than DtmVacuumDelay.
326
326
* If no such XID can be located, then return previously observed oldest XID
327
327
*/
328
- static TransactionId
329
- DtmAdjustOldestXid (TransactionId xid )
328
+ static void
329
+ DtmAdjustOldestXid ()
330
330
{
331
- if (TransactionIdIsValid (xid ))
332
- {
333
- DtmTransStatus * ts ,
334
- * prev = NULL ;
335
- timestamp_t now = dtm_get_current_time ();
336
- timestamp_t cutoff_time = now - DtmVacuumDelay * USEC ;
331
+ DtmTransStatus * ts ,
332
+ * prev = NULL ;
337
333
338
- SpinLockAcquire (& local -> lock );
339
- ts = (DtmTransStatus * ) hash_search (xid2status , & xid , HASH_FIND , NULL );
340
- if (ts != NULL )
341
- {
342
- cutoff_time = ts -> cid - DtmVacuumDelay * USEC ;
334
+ timestamp_t cutoff_time = dtm_get_current_time () - DtmVacuumDelay * USEC ;
335
+ int total = 0 , deleted = 0 ;
343
336
344
- for (ts = local -> trans_list_head ; ts != NULL && ts -> cid < cutoff_time ; prev = ts , ts = ts -> next )
345
- {
346
- if (prev != NULL )
347
- hash_search (xid2status , & prev -> xid , HASH_REMOVE , NULL );
348
- }
349
- }
337
+ SpinLockAcquire (& local -> lock );
338
+
339
+ for (ts = local -> trans_list_head ; ts != NULL ; ts = ts -> next )
340
+ total ++ ;
341
+
342
+ for (ts = local -> trans_list_head ; ts != NULL && ts -> cid < cutoff_time ; prev = ts , ts = ts -> next )
343
+ {
350
344
if (prev != NULL )
351
345
{
352
- local -> trans_list_head = prev ;
353
- local -> oldest_xid = xid = prev -> xid ;
346
+ hash_search ( xid2status , & prev -> xid , HASH_REMOVE , NULL ) ;
347
+ deleted ++ ;
354
348
}
355
- else
356
- {
357
- xid = local -> oldest_xid ;
358
- }
359
- SpinLockRelease (& local -> lock );
360
349
}
361
- return xid ;
350
+
351
+ if (prev != NULL )
352
+ local -> trans_list_head = prev ;
353
+
354
+ if (ts != NULL )
355
+ local -> oldest_xid = ts -> xid ;
356
+ else
357
+ local -> oldest_xid = InvalidTransactionId ;
358
+
359
+ SpinLockRelease (& local -> lock );
360
+
361
+ // elog(LOG, "DtmAdjustOldestXid total=%d, deleted=%d, xid=%d, prev=%p, ts=%p", total, deleted, local->oldest_xid, prev, ts);
362
362
}
363
363
364
364
Snapshot
365
365
DtmGetSnapshot (Snapshot snapshot )
366
366
{
367
367
snapshot = PgGetSnapshotData (snapshot );
368
- RecentGlobalDataXmin = RecentGlobalXmin = DtmAdjustOldestXid (RecentGlobalDataXmin );
368
+ // RecentGlobalDataXmin = RecentGlobalXmin = DtmAdjustOldestXid(RecentGlobalDataXmin);
369
+ SpinLockAcquire (& local -> lock );
370
+
371
+ if (TransactionIdIsValid (local -> oldest_xid ) &&
372
+ TransactionIdPrecedes (local -> oldest_xid , RecentGlobalXmin ))
373
+ RecentGlobalXmin = local -> oldest_xid ;
374
+
375
+ if (TransactionIdIsValid (local -> oldest_xid ) &&
376
+ TransactionIdPrecedes (local -> oldest_xid , RecentGlobalDataXmin ))
377
+ RecentGlobalDataXmin = local -> oldest_xid ;
378
+
379
+ SpinLockRelease (& local -> lock );
369
380
return snapshot ;
370
381
}
371
382
@@ -374,7 +385,16 @@ DtmGetOldestXmin(Relation rel, int flags)
374
385
{
375
386
TransactionId xmin = PgGetOldestXmin (rel , flags );
376
387
377
- xmin = DtmAdjustOldestXid (xmin );
388
+ // xmin = DtmAdjustOldestXid(xmin);
389
+
390
+ SpinLockAcquire (& local -> lock );
391
+
392
+ if (TransactionIdIsValid (local -> oldest_xid ) &&
393
+ TransactionIdPrecedes (local -> oldest_xid , xmin ))
394
+ xmin = local -> oldest_xid ;
395
+
396
+ SpinLockRelease (& local -> lock );
397
+
378
398
return xmin ;
379
399
}
380
400
@@ -670,6 +690,9 @@ DtmLocalCommitPrepared(DtmCurrentTrans * x)
670
690
DTM_TRACE ((stderr , "Global transaction %u(%s) is precommitted\n" , x -> xid , gtid ));
671
691
}
672
692
SpinLockRelease (& local -> lock );
693
+
694
+ DtmAdjustOldestXid ();
695
+ // elog(LOG, "DtmLocalCommitPrepared %d", x->xid);
673
696
}
674
697
675
698
/*
@@ -717,6 +740,9 @@ DtmLocalCommit(DtmCurrentTrans * x)
717
740
DTM_TRACE ((stderr , "Local transaction %u is committed at %lu\n" , x -> xid , x -> cid ));
718
741
}
719
742
SpinLockRelease (& local -> lock );
743
+
744
+ DtmAdjustOldestXid ();
745
+ // elog(LOG, "DtmLocalCommit %d", x->xid);
720
746
}
721
747
722
748
/*
0 commit comments