@@ -176,7 +176,6 @@ static void MtmAddSubtransactions(MtmTransState* ts, TransactionId *subxids, int
176
176
177
177
static void MtmShmemStartup (void );
178
178
179
- static BgwPool * MtmPoolConstructor (void );
180
179
static bool MtmRunUtilityStmt (PGconn * conn , char const * sql , char * * errmsg );
181
180
static void MtmBroadcastUtilityStmt (char const * sql , bool ignoreError , int forceOnNode );
182
181
static void MtmProcessDDLCommand (char const * queryString , bool transactional );
@@ -266,12 +265,12 @@ bool MtmVolksWagenMode; /* Pretend to be normal postgres. This means skip some
266
265
bool MtmMajorNode ;
267
266
char * MtmRefereeConnStr ;
268
267
bool MtmEnforceLocalTx ;
268
+ int MtmWorkers ;
269
269
270
270
static char * MtmConnStrs ;
271
271
static char * MtmRemoteFunctionsList ;
272
272
static char * MtmClusterName ;
273
273
static int MtmQueueSize ;
274
- static int MtmWorkers ;
275
274
static int MtmVacuumDelay ;
276
275
static int MtmMinRecoveryLag ;
277
276
static int MtmMaxRecoveryLag ;
@@ -2558,7 +2557,8 @@ static void MtmInitialize()
2558
2557
Mtm -> inject2PCError = 0 ;
2559
2558
Mtm -> sendQueue = NULL ;
2560
2559
Mtm -> freeQueue = NULL ;
2561
- for (i = 0 ; i < MtmNodes ; i ++ ) {
2560
+ for (i = 0 ; i < MtmMaxNodes ; i ++ )
2561
+ {
2562
2562
Mtm -> nodes [i ].oldestSnapshot = 0 ;
2563
2563
Mtm -> nodes [i ].disabledNodeMask = 0 ;
2564
2564
Mtm -> nodes [i ].connectivityMask = (((nodemask_t )1 << MtmNodes ) - 1 );
@@ -2576,14 +2576,14 @@ static void MtmInitialize()
2576
2576
Mtm -> nodes [i ].nHeartbeats = 0 ;
2577
2577
Mtm -> nodes [i ].manualRecovery = false;
2578
2578
Mtm -> nodes [i ].slotDeleted = false;
2579
+ BgwPoolInit (& Mtm -> nodes [i ].pool , MtmExecutor , MtmDatabaseName , MtmDatabaseUser , MtmQueueSize , 0 );
2579
2580
}
2580
2581
Mtm -> nodes [MtmNodeId - 1 ].originId = DoNotReplicateId ;
2581
2582
/* All transaction originated from the current node should be ignored during recovery */
2582
2583
Mtm -> nodes [MtmNodeId - 1 ].restartLSN = (lsn_t )PG_UINT64_MAX ;
2583
2584
Mtm -> sendSemaphore = PGSemaphoreCreate ();
2584
2585
PGSemaphoreReset (Mtm -> sendSemaphore );
2585
2586
SpinLockInit (& Mtm -> queueSpinlock );
2586
- BgwPoolInit (& Mtm -> pool , MtmExecutor , MtmDatabaseName , MtmDatabaseUser , MtmQueueSize , MtmWorkers );
2587
2587
RegisterXactCallback (MtmXactCallback , NULL );
2588
2588
MtmTx .snapshot = INVALID_CSN ;
2589
2589
MtmTx .xid = InvalidTransactionId ;
@@ -2754,7 +2754,7 @@ static void MtmSplitConnStrs(void)
2754
2754
MTM_ELOG (ERROR , "More than %d nodes are specified" , MtmMaxNodes );
2755
2755
}
2756
2756
MtmNodes = i ;
2757
- MtmConnections = (MtmConnectionInfo * )palloc (MtmMaxNodes * sizeof (MtmConnectionInfo ));
2757
+ MtmConnections = (MtmConnectionInfo * )palloc0 (MtmMaxNodes * sizeof (MtmConnectionInfo ));
2758
2758
2759
2759
if (f != NULL ) {
2760
2760
fseek (f , SEEK_SET , 0 );
@@ -3369,11 +3369,9 @@ _PG_init(void)
3369
3369
* the postmaster process.) We'll allocate or attach to the shared
3370
3370
* resources in mtm_shmem_startup().
3371
3371
*/
3372
- RequestAddinShmemSpace (MTM_SHMEM_SIZE + MtmQueueSize );
3372
+ RequestAddinShmemSpace (MTM_SHMEM_SIZE + MtmMaxNodes * MtmQueueSize );
3373
3373
RequestNamedLWLockTranche (MULTIMASTER_NAME , 1 + MtmMaxNodes * 2 );
3374
3374
3375
- BgwPoolStart (MtmWorkers , MtmPoolConstructor );
3376
-
3377
3375
MtmArbiterInitialize ();
3378
3376
3379
3377
/*
@@ -4313,18 +4311,30 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
4313
4311
TupleDesc desc ;
4314
4312
Datum values [Natts_mtm_cluster_state ];
4315
4313
bool nulls [Natts_mtm_cluster_state ] = {false};
4314
+ int i ,
4315
+ pool_active = 0 ,
4316
+ pool_pending = 0 ,
4317
+ pool_queue_size = 0 ;
4318
+
4316
4319
get_call_result_type (fcinfo , NULL , & desc );
4317
4320
4321
+ for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
4322
+ {
4323
+ pool_active += (int ) Mtm -> nodes [i ].pool .active ;
4324
+ pool_pending += (int ) Mtm -> nodes [i ].pool .pending ;
4325
+ pool_queue_size += (int ) BgwPoolGetQueueSize (& Mtm -> nodes [i ].pool );
4326
+ }
4327
+
4318
4328
values [0 ] = Int32GetDatum (MtmNodeId );
4319
4329
values [1 ] = CStringGetTextDatum (MtmNodeStatusMnem [Mtm -> status ]);
4320
4330
values [2 ] = Int64GetDatum (Mtm -> disabledNodeMask );
4321
4331
values [3 ] = Int64GetDatum (SELF_CONNECTIVITY_MASK );
4322
4332
values [4 ] = Int64GetDatum (Mtm -> originLockNodeMask );
4323
4333
values [5 ] = Int32GetDatum (Mtm -> nLiveNodes );
4324
4334
values [6 ] = Int32GetDatum (Mtm -> nAllNodes );
4325
- values [7 ] = Int32GetDatum (( int ) Mtm -> pool . active );
4326
- values [8 ] = Int32GetDatum (( int ) Mtm -> pool . pending );
4327
- values [9 ] = Int64GetDatum (BgwPoolGetQueueSize ( & Mtm -> pool ) );
4335
+ values [7 ] = Int32GetDatum (pool_active );
4336
+ values [8 ] = Int32GetDatum (pool_pending );
4337
+ values [9 ] = Int64GetDatum (pool_queue_size );
4328
4338
values [10 ] = Int64GetDatum (Mtm -> transCount );
4329
4339
values [11 ] = Int64GetDatum (Mtm -> timeShift );
4330
4340
values [12 ] = Int32GetDatum (Mtm -> recoverySlot );
@@ -5608,28 +5618,6 @@ static void MtmSeqNextvalHook(Oid seqid, int64 next)
5608
5618
}
5609
5619
}
5610
5620
5611
- /*
5612
- * -------------------------------------------
5613
- * Executor pool interface
5614
- * -------------------------------------------
5615
- */
5616
-
5617
- void MtmExecute (void * work , int size )
5618
- {
5619
- if (Mtm -> status == MTM_RECOVERY ) {
5620
- /* During recovery apply changes sequentially to preserve commit order */
5621
- MtmExecutor (work , size );
5622
- } else {
5623
- BgwPoolExecute (& Mtm -> pool , work , size );
5624
- }
5625
- }
5626
-
5627
- static BgwPool *
5628
- MtmPoolConstructor (void )
5629
- {
5630
- return & Mtm -> pool ;
5631
- }
5632
-
5633
5621
/*
5634
5622
* -------------------------------------------
5635
5623
* Deadlock detection
@@ -5743,21 +5731,21 @@ MtmDetectGlobalDeadLockForXid(TransactionId xid)
5743
5731
MtmGetGtid (xid , & gtid );
5744
5732
hasDeadlock = MtmGraphFindLoop (& graph , & gtid );
5745
5733
MTM_ELOG (LOG , "Distributed deadlock check by backend %d for %u:%llu = %d" , MyProcPid , gtid .node , (long64 )gtid .xid , hasDeadlock );
5746
- if (!hasDeadlock ) {
5747
- /* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
5748
- * can not be appied just because there are no vacant workers and it cause additional dependency between transactions which is not
5749
- * refelected in lock graph
5750
- */
5751
- timestamp_t lastPeekTime = BgwGetLastPeekTime (& Mtm -> pool );
5752
- if (lastPeekTime != 0 && MtmGetSystemTime () - lastPeekTime >= MSEC_TO_USEC (DeadlockTimeout )) {
5753
- hasDeadlock = true;
5754
- MTM_ELOG (WARNING , "Apply workers were blocked more than %d msec" ,
5755
- (int )USEC_TO_MSEC (MtmGetSystemTime () - lastPeekTime ));
5756
- } else {
5757
- MTM_LOG1 ("Enable deadlock timeout in backend %d for transaction %llu" , MyProcPid , (long64 )xid );
5758
- enable_timeout_after (DEADLOCK_TIMEOUT , DeadlockTimeout );
5759
- }
5760
- }
5734
+ // if (!hasDeadlock) {
5735
+ // /* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
5736
+ // * can not be appied just because there are no vacant workers and it cause additional dependency between transactions which is not
5737
+ // * refelected in lock graph
5738
+ // */
5739
+ // timestamp_t lastPeekTime = minBgwGetLastPeekTime (&Mtm->pool);
5740
+ // if (lastPeekTime != 0 && MtmGetSystemTime() - lastPeekTime >= MSEC_TO_USEC(DeadlockTimeout)) {
5741
+ // hasDeadlock = true;
5742
+ // MTM_ELOG(WARNING, "Apply workers were blocked more than %d msec",
5743
+ // (int)USEC_TO_MSEC(MtmGetSystemTime() - lastPeekTime));
5744
+ // } else {
5745
+ // MTM_LOG1("Enable deadlock timeout in backend %d for transaction %llu", MyProcPid, (long64)xid);
5746
+ // enable_timeout_after(DEADLOCK_TIMEOUT, DeadlockTimeout);
5747
+ // }
5748
+ // }
5761
5749
}
5762
5750
return hasDeadlock ;
5763
5751
}
0 commit comments