@@ -255,6 +255,7 @@ static int MtmLockCount;
255
255
static bool MtmMajorNode ;
256
256
static bool MtmBreakConnection ;
257
257
static bool MtmSuspended ;
258
+ static bool MtmInsideTransaction ;
258
259
259
260
static ExecutorStart_hook_type PreviousExecutorStartHook ;
260
261
static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
@@ -278,6 +279,15 @@ static bool MtmAtExitHookRegistered = false;
278
279
*/
279
280
void MtmReleaseLocks (void )
280
281
{
282
+ MtmResetTransaction ();
283
+ if (MtmInsideTransaction )
284
+ {
285
+ MtmLock (LW_EXCLUSIVE );
286
+ Assert (Mtm -> nRunningTransactions > 0 );
287
+ Mtm -> nRunningTransactions -= 1 ;
288
+ MtmInsideTransaction = false;
289
+ MtmUnlock ();
290
+ }
281
291
if (MtmSuspended ) {
282
292
MtmResumeNode ();
283
293
}
@@ -287,6 +297,7 @@ void MtmReleaseLocks(void)
287
297
Mtm -> lastLockHolder = 0 ;
288
298
LWLockRelease ((LWLockId )& Mtm -> locks [MTM_STATE_LOCK_ID ]);
289
299
}
300
+
290
301
}
291
302
292
303
/*
@@ -870,14 +881,20 @@ MtmBeginTransaction(MtmCurrentTrans* x)
870
881
&& strcmp (application_name , MULTIMASTER_ADMIN ) != 0 )
871
882
{
872
883
MtmCheckClusterLock ();
873
- }
884
+ }
885
+ MtmInsideTransaction = true;
886
+ Mtm -> nRunningTransactions += 1 ;
887
+
874
888
x -> snapshot = MtmAssignCSN ();
889
+ MTM_LOG1 ("Start transaction %lld with snapshot %lld" , (long64 )x -> xid , x -> snapshot );
875
890
876
891
MtmUnlock ();
877
892
878
893
MTM_LOG3 ("%d: MtmLocalTransaction: %s transaction %u uses local snapshot %llu" ,
879
894
MyProcPid , x -> isDistributed ? "distributed" : "local" , x -> xid , x -> snapshot );
880
- }
895
+ } else {
896
+ Assert (MtmInsideTransaction );
897
+ }
881
898
}
882
899
883
900
@@ -1328,15 +1345,20 @@ MtmLogAbortLogicalMessage(int nodeId, char const* gid)
1328
1345
static void
1329
1346
MtmEndTransaction (MtmCurrentTrans * x , bool commit )
1330
1347
{
1331
- MTM_LOG2 ("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s" ,
1332
- MyProcPid , x -> xid , x -> isPrepared , x -> isReplicated , x -> isDistributed , x -> isTwoPhase , x -> gid , commit ? "commit" : "abort" );
1333
- if (MtmSuspended ) {
1334
- MtmResumeNode ();
1335
- }
1348
+ MTM_LOG3 ("%d: End transaction %lld, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s, LSN %lld" ,
1349
+ MyProcPid , (long64 )x -> xid , x -> isPrepared , x -> isReplicated , x -> isDistributed , x -> isTwoPhase , x -> gid , commit ? "commit" : "abort" , (long64 )GetXLogInsertRecPtr ());
1336
1350
commit &= (x -> status != TRANSACTION_STATUS_ABORTED );
1351
+
1352
+ MtmLock (LW_EXCLUSIVE );
1353
+
1354
+ if (MtmInsideTransaction ) {
1355
+ Assert (Mtm -> nRunningTransactions > 0 );
1356
+ Mtm -> nRunningTransactions -= 1 ;
1357
+ MtmInsideTransaction = false;
1358
+ }
1359
+
1337
1360
if (x -> isDistributed && (x -> isPrepared || x -> isReplicated ) && !x -> isTwoPhase ) {
1338
1361
MtmTransState * ts = NULL ;
1339
- MtmLock (LW_EXCLUSIVE );
1340
1362
if (x -> isPrepared ) {
1341
1363
ts = (MtmTransState * )hash_search (MtmXid2State , & x -> xid , HASH_FIND , NULL );
1342
1364
Assert (ts != NULL );
@@ -1419,12 +1441,16 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
1419
1441
#endif
1420
1442
}
1421
1443
Assert (!x -> isActive );
1422
- MtmUnlock ();
1423
1444
}
1445
+ MtmUnlock ();
1446
+
1424
1447
MtmResetTransaction ();
1425
1448
if (!MyReplicationSlot ) {
1426
1449
MtmCheckSlots ();
1427
1450
}
1451
+ if (MtmSuspended ) {
1452
+ MtmResumeNode ();
1453
+ }
1428
1454
}
1429
1455
1430
1456
/*
@@ -2059,22 +2085,24 @@ static void
2059
2085
MtmSuspendNode (void )
2060
2086
{
2061
2087
timestamp_t delay = MIN_WAIT_TIMEOUT ;
2062
- bool insideTransaction = MtmTx .isActive ;
2063
2088
Assert (!MtmSuspended );
2064
2089
MtmLock (LW_EXCLUSIVE );
2065
2090
if (Mtm -> exclusiveLock ) {
2066
2091
elog (ERROR , "There is already pending exclusive lock" );
2067
2092
}
2068
2093
Mtm -> exclusiveLock = true;
2069
2094
MtmSuspended = true;
2070
- while (Mtm -> nActiveTransactions != insideTransaction ) {
2095
+ MTM_LOG2 ("Transaction %lld tries to suspend node at %lld insideTransaction=%d, active transactions=%lld" ,
2096
+ (long64 )MtmTx .xid , MtmGetCurrentTime (), insideTransaction , (long64 )Mtm -> nRunningTransactions );
2097
+ while (Mtm -> nRunningTransactions != 1 ) { /* I am one */
2071
2098
MtmUnlock ();
2072
2099
MtmSleep (delay );
2073
2100
if (delay * 2 <= MAX_WAIT_TIMEOUT ) {
2074
2101
delay *= 2 ;
2075
2102
}
2076
2103
MtmLock (LW_EXCLUSIVE );
2077
2104
}
2105
+ MTM_LOG2 ("Transaction %lld suspended node at %lld, LSN %lld, active transactions=%lld" , (long64 )MtmTx .xid , MtmGetCurrentTime (), (long64 )GetXLogInsertRecPtr (), (long64 )Mtm -> nRunningTransactions );
2078
2106
MtmUnlock ();
2079
2107
}
2080
2108
@@ -2085,6 +2113,7 @@ static void
2085
2113
MtmResumeNode (void )
2086
2114
{
2087
2115
MtmLock (LW_EXCLUSIVE );
2116
+ MTM_LOG2 ("Transaction %lld resume node at %lld status %s LSN %lld" , (long64 )MtmTx .xid , MtmGetCurrentTime (), MtmTxnStatusMnem [MtmTx .status ], (long64 )GetXLogInsertRecPtr ());
2088
2117
Mtm -> exclusiveLock = false;
2089
2118
MtmSuspended = false;
2090
2119
MtmUnlock ();
@@ -2527,6 +2556,7 @@ static void MtmInitialize()
2527
2556
Mtm -> nLockers = 0 ;
2528
2557
Mtm -> exclusiveLock = false;
2529
2558
Mtm -> nActiveTransactions = 0 ;
2559
+ Mtm -> nRunningTransactions = 0 ;
2530
2560
Mtm -> votingTransactions = NULL ;
2531
2561
Mtm -> transListHead = NULL ;
2532
2562
Mtm -> transListTail = & Mtm -> transListHead ;
@@ -3369,7 +3399,7 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
3369
3399
MtmTx .isActive = true;
3370
3400
FinishPreparedTransaction (ts -> gid , commit );
3371
3401
if (commit ) {
3372
- MTM_LOG2 ("Distributed transaction %s is committed" , ts -> gid );
3402
+ MTM_LOG2 ("Distributed transaction %s (%lld) is committed at %lld with LSN=%lld " , ts -> gid , ( long64 ) ts -> xid , MtmGetCurrentTime (), ( long64 ) GetXLogInsertRecPtr () );
3373
3403
}
3374
3404
if (!insideTransaction ) {
3375
3405
CommitTransactionCommand ();
@@ -4556,7 +4586,7 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
4556
4586
MTM_ELOG (ERROR , "Transaction %s (%llu) is aborted by DTM" , x -> gid , (long64 )x -> xid );
4557
4587
} else {
4558
4588
FinishPreparedTransaction (x -> gid , true);
4559
- MTM_LOG2 ("Distributed transaction %s is committed" , x -> gid );
4589
+ MTM_LOG2 ("Distributed transaction %s (%lld) is committed at %lld with LSN=%lld " , x -> gid , ( long64 ) x -> xid , MtmGetCurrentTime (), ( long64 ) GetXLogInsertRecPtr () );
4560
4590
}
4561
4591
}
4562
4592
}
0 commit comments