Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 97c667c

Browse files
knizhnik authored and kelvich committed
Improve deadlock detection algorithm by taking into account hidden dependencies between transactions caused by lack of vacant workers in apply pool
1 parent 46a5c82 commit 97c667c

File tree

4 files changed

+69
-26
lines changed

4 files changed

+69
-26
lines changed

bgwpool.c

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ static void BgwPoolMainLoop(Datum arg)
3535
work = malloc(size);
3636
pool->pending -= 1;
3737
pool->active += 1;
38+
if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0) {
39+
pool->lastPeakTime = MtmGetSystemTime();
40+
}
3841
if (pool->head + size + 4 > pool->size) {
3942
memcpy(work, pool->queue, size);
4043
pool->head = INTALIGN(size);
@@ -48,17 +51,19 @@ static void BgwPoolMainLoop(Datum arg)
4851
if (pool->producerBlocked) {
4952
pool->producerBlocked = false;
5053
PGSemaphoreUnlock(&pool->overflow);
54+
pool->lastPeakTime = 0;
5155
}
5256
SpinLockRelease(&pool->lock);
5357
pool->executor(id, work, size);
5458
free(work);
5559
SpinLockAcquire(&pool->lock);
5660
pool->active -= 1;
61+
pool->lastPeakTime = 0;
5762
SpinLockRelease(&pool->lock);
5863
}
5964
}
6065

61-
void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize)
66+
void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize, size_t nWorkers)
6267
{
6368
pool->queue = (char*)ShmemAlloc(queueSize);
6469
pool->executor = executor;
@@ -73,8 +78,15 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, si
7378
pool->size = queueSize;
7479
pool->active = 0;
7580
pool->pending = 0;
81+
pool->nWorkers = nWorkers;
82+
pool->lastPeakTime = 0;
7683
strcpy(pool->dbname, dbname);
7784
}
85+
86+
timestamp_t BgwGetLastPeekTime(BgwPool* pool)
87+
{
88+
return pool->lastPeakTime;
89+
}
7890

7991
void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
8092
{
@@ -123,12 +135,18 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
123135
if ((pool->head <= pool->tail && pool->size - pool->tail < size + 4 && pool->head < size)
124136
|| (pool->head > pool->tail && pool->head - pool->tail < size + 4))
125137
{
126-
pool->producerBlocked = true;
138+
if (pool->lastPeakTime == 0) {
139+
pool->lastPeakTime = MtmGetSystemTime();
140+
}
141+
pool->producerBlocked = true;
127142
SpinLockRelease(&pool->lock);
128143
PGSemaphoreLock(&pool->overflow);
129144
SpinLockAcquire(&pool->lock);
130145
} else {
131146
pool->pending += 1;
147+
if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0) {
148+
pool->lastPeakTime = MtmGetSystemTime();
149+
}
132150
*(int*)&pool->queue[pool->tail] = size;
133151
if (pool->size - pool->tail >= size + 4) {
134152
memcpy(&pool->queue[pool->tail+4], work, size);

bgwpool.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
typedef void(*BgwPoolExecutor)(int id, void* work, size_t size);
99

10+
typedef uint64 timestamp_t;
11+
1012
#define MAX_DBNAME_LEN 30
1113
#define MULTIMASTER_BGW_RESTART_TIMEOUT 1 /* seconds */
1214

@@ -21,6 +23,8 @@ typedef struct
2123
size_t size;
2224
size_t active;
2325
size_t pending;
26+
size_t nWorkers;
27+
time_t lastPeakTime;
2428
bool producerBlocked;
2529
char dbname[MAX_DBNAME_LEN];
2630
char* queue;
@@ -30,10 +34,12 @@ typedef BgwPool*(*BgwPoolConstructor)(void);
3034

3135
extern void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor);
3236

33-
extern void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize);
37+
extern void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, size_t queueSize, size_t nWorkers);
3438

3539
extern void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
3640

3741
extern size_t BgwPoolGetQueueSize(BgwPool* pool);
3842

43+
extern timestamp_t BgwGetLastPeekTime(BgwPool* pool);
44+
3945
#endif

multimaster.c

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -255,13 +255,18 @@ void MtmUnlockNode(int nodeId)
255255
*/
256256

257257

258-
timestamp_t MtmGetCurrentTime(void)
258+
timestamp_t MtmGetSystemTime(void)
259259
{
260260
struct timeval tv;
261261
gettimeofday(&tv, NULL);
262262
return (timestamp_t)tv.tv_sec*USEC + tv.tv_usec + Mtm->timeShift;
263263
}
264264

265+
timestamp_t MtmGetCurrentTime(void)
266+
{
267+
return MtmGetSystemTime() + Mtm->timeShift;
268+
}
269+
265270
void MtmSleep(timestamp_t interval)
266271
{
267272
struct timespec ts;
@@ -1045,7 +1050,7 @@ void MtmRecoveryCompleted(void)
10451050
MtmLock(LW_EXCLUSIVE);
10461051
Mtm->recoverySlot = 0;
10471052
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
1048-
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = time(NULL);
1053+
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
10491054
/* Mode will be changed to online once all logical receivers are connected */
10501055
MtmSwitchClusterMode(MTM_CONNECTED);
10511056
MtmUnlock();
@@ -1134,7 +1139,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
11341139
/* We are lucky: caught up without locking cluster! */
11351140
}
11361141
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
1137-
Mtm->nodes[nodeId-1].lastStatusChangeTime = time(NULL);
1142+
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
11381143
Mtm->nNodes += 1;
11391144
caughtUp = true;
11401145
} else if (!BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)
@@ -1279,15 +1284,15 @@ bool MtmRefreshClusterStatus(bool nowait)
12791284
if (mask & 1) {
12801285
Mtm->nNodes -= 1;
12811286
BIT_SET(Mtm->disabledNodeMask, i);
1282-
Mtm->nodes[i].lastStatusChangeTime = time(NULL);
1287+
Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
12831288
}
12841289
}
12851290
mask = clique & Mtm->disabledNodeMask; /* new enabled nodes mask */
12861291
for (i = 0; mask != 0; i++, mask >>= 1) {
12871292
if (mask & 1) {
12881293
Mtm->nNodes += 1;
12891294
BIT_CLEAR(Mtm->disabledNodeMask, i);
1290-
Mtm->nodes[i].lastStatusChangeTime = time(NULL);
1295+
Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
12911296
}
12921297
}
12931298
MtmCheckQuorum();
@@ -1327,7 +1332,7 @@ void MtmOnNodeDisconnect(int nodeId)
13271332
{
13281333
MtmTransState *ts;
13291334

1330-
if (Mtm->nodes[nodeId-1].lastStatusChangeTime + MtmNodeDisableDelay > time(NULL)) {
1335+
if (Mtm->nodes[nodeId-1].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) > MtmGetSystemTime()) {
13311336
/* Avoid false detection of node failure and prevent node status blinking */
13321337
return;
13331338
}
@@ -1342,7 +1347,7 @@ void MtmOnNodeDisconnect(int nodeId)
13421347
if (!MtmRefreshClusterStatus(false)) {
13431348
MtmLock(LW_EXCLUSIVE);
13441349
if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
1345-
Mtm->nodes[nodeId-1].lastStatusChangeTime = time(NULL);
1350+
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
13461351
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
13471352
Mtm->nNodes -= 1;
13481353
MtmCheckQuorum();
@@ -1510,14 +1515,14 @@ static void MtmInitialize()
15101515
for (i = 0; i < MtmNodes; i++) {
15111516
Mtm->nodes[i].oldestSnapshot = 0;
15121517
Mtm->nodes[i].transDelay = 0;
1513-
Mtm->nodes[i].lastStatusChangeTime = time(NULL);
1518+
Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
15141519
Mtm->nodes[i].con = MtmConnections[i];
15151520
Mtm->nodes[i].flushPos = 0;
15161521
}
15171522
PGSemaphoreCreate(&Mtm->votingSemaphore);
15181523
PGSemaphoreReset(&Mtm->votingSemaphore);
15191524
SpinLockInit(&Mtm->spinlock);
1520-
BgwPoolInit(&Mtm->pool, MtmExecutor, MtmDatabaseName, MtmQueueSize);
1525+
BgwPoolInit(&Mtm->pool, MtmExecutor, MtmDatabaseName, MtmQueueSize, MtmWorkers);
15211526
RegisterXactCallback(MtmXactCallback, NULL);
15221527
MtmTx.snapshot = INVALID_CSN;
15231528
MtmTx.xid = InvalidTransactionId;
@@ -1681,10 +1686,10 @@ _PG_init(void)
16811686

16821687
DefineCustomIntVariable(
16831688
"multimaster.node_disable_delay",
1684-
"Minamal amount of time (sec) between node status change",
1689+
"Minamal amount of time (msec) between node status change",
16851690
"This delay is used to avoid false detection of node failure and to prevent blinking of node status node",
16861691
&MtmNodeDisableDelay,
1687-
1,
1692+
1000,
16881693
1,
16891694
INT_MAX,
16901695
PGC_BACKEND,
@@ -2032,7 +2037,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
20322037
{
20332038
elog(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nNodes);
20342039
}
2035-
Mtm->nodes[nodeId-1].lastStatusChangeTime = time(NULL);
2040+
Mtm->nodes[nodeId-1].lastStatusChangeTime = MtmGetSystemTime();
20362041
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
20372042
Mtm->nNodes -= 1;
20382043
MtmCheckQuorum();
@@ -2083,15 +2088,15 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
20832088
if (MtmIsRecoverySession) {
20842089
MTM_LOG1("%d: Node %d start recovery of node %d", MyProcPid, MtmNodeId, MtmReplicationNodeId);
20852090
if (!BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
2086-
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = time(NULL);
2091+
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
20872092
BIT_SET(Mtm->disabledNodeMask, MtmReplicationNodeId-1);
20882093
Mtm->nNodes -= 1;
20892094
MtmCheckQuorum();
20902095
}
20912096
} else if (BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
20922097
if (recoveryCompleted) {
20932098
MTM_LOG1("Node %d consider that recovery of node %d is completed: start normal replication", MtmNodeId, MtmReplicationNodeId);
2094-
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = time(NULL);
2099+
Mtm->nodes[MtmReplicationNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
20952100
BIT_CLEAR(Mtm->disabledNodeMask, MtmReplicationNodeId-1);
20962101
Mtm->nNodes += 1;
20972102
MtmCheckQuorum();
@@ -2238,7 +2243,7 @@ mtm_poll_node(PG_FUNCTION_ARGS)
22382243
}
22392244
if (!nowait) {
22402245
/* Just wait some time until logical repication channels will be reestablished */
2241-
MtmSleep(MtmNodeDisableDelay);
2246+
MtmSleep(MSEC_TO_USEC(MtmNodeDisableDelay));
22422247
}
22432248
PG_RETURN_BOOL(online);
22442249
}
@@ -2297,7 +2302,7 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22972302
usrfctx->values[4] = Int64GetDatum(lag);
22982303
usrfctx->nulls[4] = lag < 0;
22992304
usrfctx->values[5] = Int64GetDatum(Mtm->transCount ? Mtm->nodes[usrfctx->nodeId-1].transDelay/Mtm->transCount : 0);
2300-
usrfctx->values[6] = TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime));
2305+
usrfctx->values[6] = TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime/USEC));
23012306
usrfctx->values[7] = CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
23022307
usrfctx->nodeId += 1;
23032308

@@ -3058,6 +3063,18 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
30583063
MtmGetGtid(pgxact->xid, &gtid);
30593064
hasDeadlock = MtmGraphFindLoop(&graph, &gtid);
30603065
elog(WARNING, "Distributed deadlock check for %u:%u = %d", gtid.node, gtid.xid, hasDeadlock);
3066+
if (!hasDeadlock) {
3067+
/* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
3068+
* cannot be applied just because there are no vacant workers, and this causes an additional dependency between transactions which is not
3069+
* reflected in the lock graph
3070+
*/
3071+
timestamp_t lastPeekTime = BgwGetLastPeekTime(&Mtm->pool);
3072+
if (lastPeekTime != 0 && MtmGetSystemTime() - lastPeekTime >= MSEC_TO_USEC(DeadlockTimeout)) {
3073+
hasDeadlock = true;
3074+
elog(WARNING, "Apply workers were blocked more than %d msec",
3075+
(int)USEC_TO_MSEC(MtmGetSystemTime() - lastPeekTime));
3076+
}
3077+
}
30613078
}
30623079
return hasDeadlock;
30633080
}

multimaster.h

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@
4848

4949
#define USEC 1000000
5050

51+
#define USEC_TO_MSEC(t) ((t)/1000)
52+
#define MSEC_TO_USEC(t) ((t)*1000)
53+
5154
#define Natts_mtm_ddl_log 2
5255
#define Anum_mtm_ddl_log_issued 1
5356
#define Anum_mtm_ddl_log_query 2
@@ -72,8 +75,6 @@ typedef uint64 csn_t; /* commit serial number */
7275
#define PGLOGICAL_CAUGHT_UP 0x04
7376

7477

75-
typedef uint64 timestamp_t;
76-
7778
/* Identifier of global transaction */
7879
typedef struct
7980
{
@@ -122,9 +123,9 @@ typedef struct
122123
typedef struct
123124
{
124125
MtmConnectionInfo con;
125-
time_t transDelay;
126-
time_t lastStatusChangeTime;
127-
XLogRecPtr flushPos;
126+
timestamp_t transDelay;
127+
timestamp_t lastStatusChangeTime;
128+
XLogRecPtr flushPos;
128129
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
129130
} MtmNodeInfo;
130131

@@ -232,8 +233,9 @@ extern void MtmRecoverNode(int nodeId);
232233
extern void MtmOnNodeDisconnect(int nodeId);
233234
extern void MtmOnNodeConnect(int nodeId);
234235
extern void MtmWakeUpBackend(MtmTransState* ts);
235-
extern timestamp_t MtmGetCurrentTime(void);
236-
extern void MtmSleep(timestamp_t interval);
236+
extern timestamp_t MtmGetSystemTime(void); /* non-adjusted current system time */
237+
extern timestamp_t MtmGetCurrentTime(void); /* adjusted current system time */
238+
extern void MtmSleep(timestamp_t interval);
237239
extern void MtmAbortTransaction(MtmTransState* ts);
238240
extern void MtmSetCurrentTransactionGID(char const* gid);
239241
extern csn_t MtmGetTransactionCSN(TransactionId xid);

0 commit comments

Comments
 (0)