@@ -464,13 +464,15 @@ static void MtmInitialize()
464
464
dtm -> transListTail = & dtm -> transListHead ;
465
465
dtm -> nReceivers = 0 ;
466
466
dtm -> timeShift = 0 ;
467
+ dtm -> transCount = 0 ;
468
+ memset (dtm -> nodeTransDelay , 0 , sizeof (dtm -> nodeTransDelay ));
467
469
PGSemaphoreCreate (& dtm -> votingSemaphore );
468
470
PGSemaphoreReset (& dtm -> votingSemaphore );
469
471
SpinLockInit (& dtm -> spinlock );
470
472
BgwPoolInit (& dtm -> pool , MtmExecutor , MtmDatabaseName , MtmQueueSize );
471
473
RegisterXactCallback (MtmXactCallback , NULL );
472
474
dtmTx .snapshot = INVALID_CSN ;
473
- dtmTx .xid = InvalidTransactionId ;
475
+ dtmTx .xid = InvalidTransactionId ;
474
476
}
475
477
xid2state = MtmCreateHash ();
476
478
MtmDoReplication = true;
@@ -628,7 +630,8 @@ static void MtmPrecommitTransaction(MtmCurrentTrans* x)
628
630
ts -> procno = MyProc -> pgprocno ;
629
631
ts -> nVotes = 0 ;
630
632
ts -> done = false;
631
-
633
+ dtm -> transCount += 1 ;
634
+
632
635
if (TransactionIdIsValid (x -> gtid .xid )) {
633
636
ts -> gtid = x -> gtid ;
634
637
} else {
@@ -1282,8 +1285,8 @@ typedef struct
1282
1285
int nodeId ;
1283
1286
char * connStrPtr ;
1284
1287
TupleDesc desc ;
1285
- Datum values [6 ];
1286
- bool nulls [6 ];
1288
+ Datum values [7 ];
1289
+ bool nulls [7 ];
1287
1290
} MtmGetNodeStateCtx ;
1288
1291
1289
1292
Datum
@@ -1319,11 +1322,12 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
1319
1322
lag = MtmGetSlotLag (usrfctx -> nodeId );
1320
1323
usrfctx -> values [4 ] = Int64GetDatum (lag );
1321
1324
usrfctx -> nulls [4 ] = lag < 0 ;
1325
+ usrfctx -> values [5 ] = Int64GetDatum (dtm -> transCount ? dtm -> nodeTransDelay [usrfctx -> nodeId - 1 ]/dtm -> transCount : 0 );
1322
1326
p = strchr (usrfctx -> connStrPtr , ',' );
1323
1327
if (p != NULL ) {
1324
1328
* p ++ = '\0' ;
1325
1329
}
1326
- usrfctx -> values [5 ] = CStringGetTextDatum (usrfctx -> connStrPtr );
1330
+ usrfctx -> values [6 ] = CStringGetTextDatum (usrfctx -> connStrPtr );
1327
1331
usrfctx -> connStrPtr = p ;
1328
1332
usrfctx -> nodeId += 1 ;
1329
1333
@@ -1334,8 +1338,8 @@ Datum
1334
1338
mtm_get_cluster_state (PG_FUNCTION_ARGS )
1335
1339
{
1336
1340
TupleDesc desc ;
1337
- Datum values [7 ];
1338
- bool nulls [7 ] = {false};
1341
+ Datum values [10 ];
1342
+ bool nulls [10 ] = {false};
1339
1343
get_call_result_type (fcinfo , NULL , & desc );
1340
1344
1341
1345
values [0 ] = CStringGetTextDatum (MtmNodeStatusMnem [dtm -> status ]);
@@ -1345,6 +1349,10 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
1345
1349
values [4 ] = Int32GetDatum (dtm -> nNodes );
1346
1350
values [5 ] = Int32GetDatum ((int )dtm -> pool .active );
1347
1351
values [6 ] = Int64GetDatum (BgwPoolGetQueueSize (& dtm -> pool ));
1352
+ values [7 ] = Int64GetDatum (dtm -> transCount );
1353
+ values [8 ] = Int64GetDatum (dtm -> timeShift );
1354
+ values [9 ] = Int32GetDatum (dtm -> recoverySlot );
1355
+ nulls [9 ] = dtm -> recoverySlot == 0 ;
1348
1356
1349
1357
PG_RETURN_DATUM (HeapTupleGetDatum (heap_form_tuple (desc , values , nulls )));
1350
1358
}
0 commit comments