Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7fa827b

Browse files
committed
Send deadlock graph using logical messages
1 parent 1b8929d commit 7fa827b

File tree

10 files changed

+187
-60
lines changed

10 files changed

+187
-60
lines changed

contrib/mmts/arbiter.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ static void MtmSendHeartbeat()
340340
timestamp_t now = MtmGetSystemTime();
341341
msg.code = MSG_HEARTBEAT;
342342
msg.disabledNodeMask = Mtm->disabledNodeMask;
343+
msg.connectivityMask = Mtm->connectivityMask;
343344
msg.oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
344345
msg.node = MtmNodeId;
345346
msg.csn = now;
@@ -463,6 +464,7 @@ static int MtmConnectSocket(int node, int port, int timeout)
463464
req.hdr.sxid = ShmemVariableCache->nextXid;
464465
req.hdr.csn = MtmGetCurrentTime();
465466
req.hdr.disabledNodeMask = Mtm->disabledNodeMask;
467+
req.hdr.connectivityMask = Mtm->connectivityMask;
466468
strcpy(req.connStr, Mtm->nodes[MtmNodeId-1].con.connStr);
467469
if (!MtmWriteSocket(sd, &req, sizeof req)) {
468470
elog(WARNING, "Arbiter failed to send handshake message to %s:%d: %d", host, port, errno);
@@ -599,6 +601,7 @@ static void MtmAcceptOneConnection()
599601

600602
resp.code = MSG_STATUS;
601603
resp.disabledNodeMask = Mtm->disabledNodeMask;
604+
resp.connectivityMask = Mtm->connectivityMask;
602605
resp.dxid = HANDSHAKE_MAGIC;
603606
resp.sxid = ShmemVariableCache->nextXid;
604607
resp.csn = MtmGetCurrentTime();
@@ -881,6 +884,8 @@ static void MtmReceiver(Datum arg)
881884

882885
Assert(node > 0 && node <= nNodes && node != MtmNodeId);
883886
Mtm->nodes[node-1].oldestSnapshot = msg->oldestSnapshot;
887+
Mtm->nodes[node-1].disabledNodeMask = msg->disabledNodeMask;
888+
Mtm->nodes[node-1].connectivityMask = msg->connectivityMask;
884889
Mtm->nodes[node-1].lastHeartbeat = MtmGetSystemTime();
885890

886891
switch (msg->code) {

contrib/mmts/bgwpool.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ static void BgwStartExtraWorker(BgwPool* pool)
130130
{
131131
if (pool->nWorkers < MtmMaxWorkers) {
132132
timestamp_t now = MtmGetSystemTime();
133-
if (pool->lastDynamicWorkerStartTime + MULTIMASTER_BGW_RESTART_TIMEOUT*USECS_PER_SEC < now) {
133+
/*if (pool->lastDynamicWorkerStartTime + MULTIMASTER_BGW_RESTART_TIMEOUT*USECS_PER_SEC < now)*/
134+
{
134135
BackgroundWorker worker;
135136
BackgroundWorkerHandle* handle;
136137
MemSet(&worker, 0, sizeof(BackgroundWorker));

contrib/mmts/multimaster--1.0.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ CREATE FUNCTION mtm.inject_2pc_error(stage integer) RETURNS void
7070
AS 'MODULE_PATHNAME','mtm_inject_2pc_error'
7171
LANGUAGE C;
7272

73+
CREATE FUNCTION mtm.check_deadlock(xid integer) RETURNS boolean
74+
AS 'MODULE_PATHNAME','mtm_check_deadlock'
75+
LANGUAGE C;
76+
7377
-- CREATE TABLE IF NOT EXISTS public.ddl_log (issued timestamp with time zone not null, query text);
7478

7579
-- CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema text, rel_name text, primary key(rel_schema, rel_name));

contrib/mmts/multimaster.c

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include "access/twophase.h"
3939
#include "utils/guc.h"
4040
#include "utils/hsearch.h"
41+
#include "utils/timeout.h"
4142
#include "utils/tqual.h"
4243
#include "utils/array.h"
4344
#include "utils/builtins.h"
@@ -94,7 +95,7 @@ typedef enum
9495
#define MTM_MAP_SIZE MTM_HASH_SIZE
9596
#define MIN_WAIT_TIMEOUT 1000
9697
#define MAX_WAIT_TIMEOUT 100000
97-
#define MAX_WAIT_LOOPS 100
98+
#define MAX_WAIT_LOOPS 100 // 1000000
9899
#define STATUS_POLL_DELAY USECS_PER_SEC
99100

100101
void _PG_init(void);
@@ -117,6 +118,7 @@ PG_FUNCTION_INFO_V1(mtm_get_cluster_info);
117118
PG_FUNCTION_INFO_V1(mtm_make_table_local);
118119
PG_FUNCTION_INFO_V1(mtm_dump_lock_graph);
119120
PG_FUNCTION_INFO_V1(mtm_inject_2pc_error);
121+
PG_FUNCTION_INFO_V1(mtm_check_deadlock);
120122

121123
static Snapshot MtmGetSnapshot(Snapshot snapshot);
122124
static void MtmInitialize(void);
@@ -274,15 +276,15 @@ void MtmUnlock(void)
274276
Mtm->lastLockHolder = 0;
275277
}
276278

277-
void MtmLockNode(int nodeId)
279+
void MtmLockNode(int nodeId, LWLockMode mode)
278280
{
279-
Assert(nodeId > 0 && nodeId <= Mtm->nAllNodes);
280-
LWLockAcquire((LWLockId)&Mtm->locks[nodeId], LW_EXCLUSIVE);
281+
Assert(nodeId > 0 && nodeId <= MtmMaxNodes*2);
282+
LWLockAcquire((LWLockId)&Mtm->locks[nodeId], mode);
281283
}
282284

283285
void MtmUnlockNode(int nodeId)
284286
{
285-
Assert(nodeId > 0 && nodeId <= Mtm->nAllNodes);
287+
Assert(nodeId > 0 && nodeId <= MtmMaxNodes*2);
286288
LWLockRelease((LWLockId)&Mtm->locks[nodeId]);
287289
}
288290

@@ -437,6 +439,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
437439
static timestamp_t totalSleepTime;
438440
static timestamp_t maxSleepTime;
439441
#endif
442+
timestamp_t start = MtmGetSystemTime();
440443
timestamp_t delay = MIN_WAIT_TIMEOUT;
441444
int i;
442445
Assert(xid != InvalidTransactionId);
@@ -460,7 +463,10 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
460463
if (ts->csn > MtmTx.snapshot) {
461464
MTM_LOG4("%d: tuple with xid=%d(csn=%ld) is invisibile in snapshot %ld",
462465
MyProcPid, xid, ts->csn, MtmTx.snapshot);
463-
MtmUnlock();
466+
if (MtmGetSystemTime() - start > USECS_PER_SEC) {
467+
elog(WARNING, "Backend %d waits for transaction %x status %ld usecs", MyProcPid, xid, MtmGetSystemTime() - start);
468+
}
469+
MtmUnlock();
464470
return true;
465471
}
466472
if (ts->status == TRANSACTION_STATUS_UNKNOWN)
@@ -499,6 +505,9 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
499505
MTM_LOG4("%d: tuple with xid=%d(csn= %ld) is %s in snapshot %ld",
500506
MyProcPid, xid, ts->csn, invisible ? "rollbacked" : "committed", MtmTx.snapshot);
501507
MtmUnlock();
508+
if (MtmGetSystemTime() - start > USECS_PER_SEC) {
509+
elog(WARNING, "Backend %d waits for %s transaction %x %ld usecs", MyProcPid, invisible ? "rollbacked" : "committed", xid, MtmGetSystemTime() - start);
510+
}
502511
return invisible;
503512
}
504513
}
@@ -510,7 +519,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
510519
}
511520
}
512521
MtmUnlock();
513-
elog(ERROR, "Failed to get status of XID %d", xid);
522+
elog(ERROR, "Failed to get status of XID %d in %ld usec", xid, MtmGetSystemTime() - start);
514523
return true;
515524
}
516525

@@ -1091,6 +1100,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
10911100
msg.sxid = ts->xid;
10921101
msg.csn = ts->csn;
10931102
msg.disabledNodeMask = Mtm->disabledNodeMask;
1103+
msg.connectivityMask = Mtm->connectivityMask;
10941104
msg.oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
10951105
memcpy(msg.gid, ts->gid, MULTIMASTER_MAX_GID_SIZE);
10961106

@@ -1118,6 +1128,7 @@ void MtmBroadcastPollMessage(MtmTransState* ts)
11181128
MtmArbiterMessage msg;
11191129
msg.code = MSG_POLL_REQUEST;
11201130
msg.disabledNodeMask = Mtm->disabledNodeMask;
1131+
msg.connectivityMask = Mtm->connectivityMask;
11211132
msg.oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
11221133
memcpy(msg.gid, ts->gid, MULTIMASTER_MAX_GID_SIZE);
11231134

@@ -1681,8 +1692,6 @@ void MtmCheckQuorum(void)
16811692

16821693
void MtmOnNodeDisconnect(int nodeId)
16831694
{
1684-
MtmTransState *ts;
1685-
16861695
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
16871696
{
16881697
/* Node is already disabled */
@@ -1711,11 +1720,13 @@ void MtmOnNodeDisconnect(int nodeId)
17111720
}
17121721

17131722
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
1714-
1723+
#if 0
17151724
if (!MtmUseRaftable)
17161725
{
17171726
MtmLock(LW_EXCLUSIVE);
17181727
if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
1728+
MtmTransState *ts;
1729+
17191730
MtmDisableNode(nodeId);
17201731
MtmCheckQuorum();
17211732
/* Interrupt voting for active transaction and abort them */
@@ -1729,7 +1740,10 @@ void MtmOnNodeDisconnect(int nodeId)
17291740
}
17301741
}
17311742
MtmUnlock();
1732-
} else {
1743+
}
1744+
else
1745+
#endif
1746+
{
17331747
MtmRefreshClusterStatus(false, 0);
17341748
}
17351749
}
@@ -1942,6 +1956,11 @@ static void MtmInitialize()
19421956
Mtm->freeQueue = NULL;
19431957
for (i = 0; i < MtmNodes; i++) {
19441958
Mtm->nodes[i].oldestSnapshot = 0;
1959+
Mtm->nodes[i].disabledNodeMask = 0;
1960+
Mtm->nodes[i].connectivityMask = 0;
1961+
Mtm->nodes[i].lockGraphUsed = 0;
1962+
Mtm->nodes[i].lockGraphAllocated = 0;
1963+
Mtm->nodes[i].lockGraphData = NULL;
19451964
Mtm->nodes[i].transDelay = 0;
19461965
Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
19471966
Mtm->nodes[i].con = MtmConnections[i];
@@ -2581,7 +2600,7 @@ _PG_init(void)
25812600
* resources in mtm_shmem_startup().
25822601
*/
25832602
RequestAddinShmemSpace(MTM_SHMEM_SIZE + MtmQueueSize);
2584-
RequestNamedLWLockTranche(MULTIMASTER_NAME, 1 + MtmMaxNodes);
2603+
RequestNamedLWLockTranche(MULTIMASTER_NAME, 1 + MtmMaxNodes*2);
25852604

25862605
BgwPoolStart(MtmWorkers, MtmPoolConstructor);
25872606

@@ -3238,7 +3257,7 @@ Datum mtm_dump_lock_graph(PG_FUNCTION_ARGS)
32383257
for (i = 0; i < Mtm->nAllNodes; i++)
32393258
{
32403259
size_t size;
3241-
char *data = RaftableGet(psprintf("lock-graph-%d", i+1), &size, NULL, false);
3260+
char* data = RaftableGet(psprintf("lock-graph-%d", i+1), &size, NULL, false);
32423261
if (data) {
32433262
GlobalTransactionId *gtid = (GlobalTransactionId *)data;
32443263
GlobalTransactionId *last = (GlobalTransactionId *)(data + size);
@@ -3630,12 +3649,28 @@ static bool MtmProcessDDLCommand(char const* queryString)
36303649
}
36313650

36323651
MTM_LOG1("Sending utility: %s", queryWithContext);
3633-
LogLogicalMessage("MTM:GUC", queryWithContext, strlen(queryWithContext), true);
3652+
LogLogicalMessage("G", queryWithContext, strlen(queryWithContext)+1, true);
36343653

36353654
MtmTx.containsDML = true;
36363655
return false;
36373656
}
36383657

3658+
void MtmUpdateLockGraph(int nodeId, void const* messageBody, int messageSize)
3659+
{
3660+
int allocated;
3661+
MtmLockNode(nodeId + MtmMaxNodes, LW_EXCLUSIVE);
3662+
allocated = Mtm->nodes[nodeId-1].lockGraphAllocated;
3663+
if (messageSize > allocated) {
3664+
allocated = Max(Max(MULTIMASTER_LOCK_BUF_INIT_SIZE, allocated*2), messageSize);
3665+
Mtm->nodes[nodeId-1].lockGraphData = ShmemAlloc(allocated);
3666+
Mtm->nodes[nodeId-1].lockGraphAllocated = allocated;
3667+
}
3668+
memcpy(Mtm->nodes[nodeId-1].lockGraphData, messageBody, messageSize);
3669+
Mtm->nodes[nodeId-1].lockGraphUsed = messageSize;
3670+
MtmUnlockNode(nodeId + MtmMaxNodes);
3671+
MTM_LOG1("Update deadlock graph for node %d size %d", nodeId, messageSize);
3672+
}
3673+
36393674
static void MtmProcessUtility(Node *parsetree, const char *queryString,
36403675
ProcessUtilityContext context, ParamListInfo params,
36413676
DestReceiver *dest, char *completionTag)
@@ -3953,20 +3988,19 @@ MtmSerializeLock(PROCLOCK* proclock, void* arg)
39533988
}
39543989

39553990
static bool
3956-
MtmDetectGlobalDeadLock(PGPROC* proc)
3991+
MtmDetectGlobalDeadLockFortXid(TransactionId xid)
39573992
{
3958-
ByteBuffer buf;
3959-
PGXACT* pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
39603993
bool hasDeadlock = false;
3961-
3962-
if (TransactionIdIsValid(pgxact->xid)) {
3994+
if (TransactionIdIsValid(xid)) {
3995+
ByteBuffer buf;
39633996
MtmGraph graph;
39643997
GlobalTransactionId gtid;
39653998
int i;
39663999

39674000
ByteBufferAlloc(&buf);
39684001
EnumerateLocks(MtmSerializeLock, &buf);
39694002
RaftableSet(psprintf("lock-graph-%d", MtmNodeId), buf.data, buf.used, false);
4003+
MtmSleep(MSEC_TO_USEC(DeadlockTimeout));
39704004
MtmGraphInit(&graph);
39714005
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data, buf.used/sizeof(GlobalTransactionId));
39724006
ByteBufferFree(&buf);
@@ -3981,9 +4015,9 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
39814015
}
39824016
}
39834017
}
3984-
MtmGetGtid(pgxact->xid, &gtid);
4018+
MtmGetGtid(xid, &gtid);
39854019
hasDeadlock = MtmGraphFindLoop(&graph, &gtid);
3986-
elog(WARNING, "Distributed deadlock check for %u:%u = %d", gtid.node, gtid.xid, hasDeadlock);
4020+
elog(WARNING, "Distributed deadlock check by backend %d for %u:%u = %d", MyProcPid, gtid.node, gtid.xid, hasDeadlock);
39874021
if (!hasDeadlock) {
39884022
/* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
39894023
* can not be applied just because there are no vacant workers and it causes additional dependency between transactions which is not
@@ -3994,8 +4028,27 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
39944028
hasDeadlock = true;
39954029
elog(WARNING, "Apply workers were blocked more than %d msec",
39964030
(int)USEC_TO_MSEC(MtmGetSystemTime() - lastPeekTime));
4031+
} else {
4032+
MTM_LOG1("Enable deadlock timeout in backend %d for transaction %d", MyProcPid, xid);
4033+
enable_timeout_after(DEADLOCK_TIMEOUT, DeadlockTimeout);
39974034
}
39984035
}
39994036
}
40004037
return hasDeadlock;
40014038
}
4039+
4040+
static bool
4041+
MtmDetectGlobalDeadLock(PGPROC* proc)
4042+
{
4043+
PGXACT* pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
4044+
4045+
MTM_LOG1("Detect global deadlock for %d by backend %d", pgxact->xid, MyProcPid);
4046+
4047+
return MtmDetectGlobalDeadLockFortXid(pgxact->xid);
4048+
}
4049+
4050+
Datum mtm_check_deadlock(PG_FUNCTION_ARGS)
4051+
{
4052+
TransactionId xid = PG_GETARG_INT32(0);
4053+
PG_RETURN_BOOL(MtmDetectGlobalDeadLockFortXid(xid));
4054+
}

contrib/mmts/multimaster.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#define MULTIMASTER_MAX_HOST_NAME_SIZE 64
5454
#define MULTIMASTER_MAX_LOCAL_TABLES 256
5555
#define MULTIMASTER_MAX_CTL_STR_SIZE 256
56+
#define MULTIMASTER_LOCK_BUF_INIT_SIZE 4096
5657
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
5758
#define MULTIMASTER_ADMIN "mtm_admin"
5859

@@ -138,8 +139,9 @@ typedef struct
138139
TransactionId sxid; /* Transaction ID at sender node */
139140
XidStatus status; /* Transaction status */
140141
csn_t csn; /* Local CSN in case of sending data from replica to master, global CSN master->replica */
141-
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes at the sender of message */
142142
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
143+
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes at the sender of message */
144+
nodemask_t connectivityMask; /* Connectivity bitmask at the sender of message */
143145
pgid_t gid; /* Global transaction identifier */
144146
} MtmArbiterMessage;
145147

@@ -179,13 +181,18 @@ typedef struct
179181
timestamp_t receiverStartTime;
180182
timestamp_t senderStartTime;
181183
timestamp_t lastHeartbeat;
184+
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes received from this node */
185+
nodemask_t connectivityMask; /* Connectivity mask at this node */
182186
int senderPid;
183187
int receiverPid;
184188
XLogRecPtr flushPos;
185-
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
189+
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
186190
XLogRecPtr restartLsn;
187191
RepOriginId originId;
188192
int timeline;
193+
void* lockGraphData;
194+
int lockGraphAllocated;
195+
int lockGraphUsed;
189196
} MtmNodeInfo;
190197

191198
typedef struct MtmTransState
@@ -223,7 +230,7 @@ typedef struct
223230
MtmNodeStatus status; /* Status of this node */
224231
int recoverySlot; /* NodeId of recovery slot or 0 if none */
225232
volatile slock_t spinlock; /* spinlock used to protect access to hash table */
226-
PGSemaphoreData sendSemaphore; /* semaphore used to notify mtm-sender about new responses to coordinator */
233+
PGSemaphoreData sendSemaphore; /* semaphore used to notify mtm-sender about new responses to coordinator */
227234
LWLockPadded *locks; /* multimaster lock tranche */
228235
TransactionId oldestXid; /* XID of oldest transaction visible by any active transaction (local or global) */
229236
nodemask_t disabledNodeMask; /* bitmask of disabled nodes */
@@ -310,7 +317,7 @@ extern void MtmAdjustSubtransactions(MtmTransState* ts);
310317
extern void MtmBroadcastPollMessage(MtmTransState* ts);
311318
extern void MtmLock(LWLockMode mode);
312319
extern void MtmUnlock(void);
313-
extern void MtmLockNode(int nodeId);
320+
extern void MtmLockNode(int nodeId, LWLockMode mode);
314321
extern void MtmUnlockNode(int nodeId);
315322
extern void MtmDropNode(int nodeId, bool dropSlot);
316323
extern void MtmRecoverNode(int nodeId);
@@ -340,6 +347,7 @@ extern XLogRecPtr MtmGetFlushPosition(int nodeId);
340347
extern bool MtmWatchdog(timestamp_t now);
341348
extern void MtmCheckHeartbeat(void);
342349
extern void MtmResetTransaction(void);
350+
extern void MtmUpdateLockGraph(int nodeId, void const* messageBody, int messageSize);
343351
extern PGconn *PQconnectdb_safe(const char *conninfo);
344352

345353

0 commit comments

Comments
 (0)