@@ -171,6 +171,7 @@ MtmConnectionInfo* MtmConnections;
171
171
static char * MtmConnStrs ;
172
172
static int MtmQueueSize ;
173
173
static int MtmWorkers ;
174
+ static int MtmVacuumDelay ;
174
175
static int MtmMinRecoveryLag ;
175
176
static int MtmMaxRecoveryLag ;
176
177
@@ -408,36 +409,48 @@ MtmAdjustOldestXid(TransactionId xid)
408
409
{
409
410
if (TransactionIdIsValid (xid )) {
410
411
MtmTransState * ts , * prev = NULL ;
411
-
412
+ csn_t oldestSnapshot = 0 ;
413
+ int i ;
414
+
412
415
MtmLock (LW_EXCLUSIVE );
413
- ts = (MtmTransState * )hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
414
- if (ts != NULL && ts -> status == TRANSACTION_STATUS_COMMITTED ) { /* committed transactions have same CSNs at all nodes */
415
- csn_t oldestSnapshot ;
416
- int i ;
417
-
418
- Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot = ts -> csn ;
419
- for (i = 0 ; i < MtmNodes ; i ++ ) {
420
- if (Mtm -> nodes [i ].oldestSnapshot < oldestSnapshot ) {
421
- oldestSnapshot = Mtm -> nodes [i ].oldestSnapshot ;
422
- }
416
+ for (ts = Mtm -> transListHead ; ts != NULL ; ts = ts -> next ) {
417
+ if (TransactionIdPrecedes (ts -> xid , xid )
418
+ && ts -> status == TRANSACTION_STATUS_COMMITTED
419
+ && ts -> csn > oldestSnapshot )
420
+ {
421
+ oldestSnapshot = ts -> csn ;
423
422
}
424
- for (ts = Mtm -> transListHead ;
425
- ts != NULL
426
- && ts -> csn < oldestSnapshot
427
- && (ts -> status == TRANSACTION_STATUS_COMMITTED || ts -> status == TRANSACTION_STATUS_ABORTED )
428
- && TransactionIdPrecedes (ts -> xid , xid );
429
- prev = ts , ts = ts -> next )
423
+ }
424
+ Mtm -> nodes [MtmNodeId - 1 ].oldestSnapshot = oldestSnapshot ;
425
+ for (i = 0 ; i < MtmNodes ; i ++ ) {
426
+ if (!BIT_CHECK (Mtm -> disabledNodeMask , i )
427
+ && Mtm -> nodes [i ].oldestSnapshot < oldestSnapshot )
430
428
{
431
- if (prev != NULL ) {
432
- /* Remove information about too old transactions */
433
- hash_search (MtmXid2State , & prev -> xid , HASH_REMOVE , NULL );
434
- }
429
+ oldestSnapshot = Mtm -> nodes [i ].oldestSnapshot ;
435
430
}
436
- }
437
- if (prev != NULL ) {
438
- Mtm -> transListHead = prev ;
439
- Mtm -> oldestXid = xid = prev -> xid ;
440
- } else {
431
+ }
432
+ oldestSnapshot -= MtmVacuumDelay * USEC ;
433
+ for (ts = Mtm -> transListHead ;
434
+ ts != NULL
435
+ && ts -> csn < oldestSnapshot
436
+ && TransactionIdPrecedes (ts -> xid , xid )
437
+ && (ts -> status == TRANSACTION_STATUS_COMMITTED ||
438
+ ts -> status == TRANSACTION_STATUS_ABORTED );
439
+ ts = ts -> next )
440
+ {
441
+ if (ts -> status == TRANSACTION_STATUS_COMMITTED ) {
442
+ prev = ts ;
443
+ }
444
+ }
445
+ if (prev != NULL ) {
446
+ for (ts = Mtm -> transListHead ; ts != prev ; ts = ts -> next ) {
447
+ /* Remove information about too old transactions */
448
+ Assert (ts -> status != TRANSACTION_STATUS_UNKNOWN );
449
+ hash_search (MtmXid2State , & ts -> xid , HASH_REMOVE , NULL );
450
+ }
451
+ Mtm -> transListHead = prev ;
452
+ Mtm -> oldestXid = xid = prev -> xid ;
453
+ } else if (TransactionIdPrecedes (Mtm -> oldestXid , xid )) {
441
454
xid = Mtm -> oldestXid ;
442
455
}
443
456
MtmUnlock ();
@@ -1325,6 +1338,21 @@ _PG_init(void)
1325
1338
NULL
1326
1339
);
1327
1340
1341
+ DefineCustomIntVariable (
1342
+ "multimaster.vacuum_delay" ,
1343
+ "Minimal age of records which can be vacuumed (seconds)" ,
1344
+ NULL ,
1345
+ & MtmVacuumDelay ,
1346
+ 1 ,
1347
+ 1 ,
1348
+ INT_MAX ,
1349
+ PGC_BACKEND ,
1350
+ 0 ,
1351
+ NULL ,
1352
+ NULL ,
1353
+ NULL
1354
+ );
1355
+
1328
1356
DefineCustomIntVariable (
1329
1357
"multimaster.queue_size" ,
1330
1358
"Multimaster queue size" ,
0 commit comments