Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit dd1634f

Browse files
knizhnik authored and kelvich committed
Send deadlock graph using logical messages
1 parent 496a138 commit dd1634f

9 files changed

+186
-65
lines changed

arbiter.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ static void MtmSendHeartbeat()
341341
timestamp_t now = MtmGetSystemTime();
342342
msg.code = MSG_HEARTBEAT;
343343
msg.disabledNodeMask = Mtm->disabledNodeMask;
344+
msg.connectivityMask = Mtm->connectivityMask;
344345
msg.oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
345346
msg.node = MtmNodeId;
346347
msg.csn = now;
@@ -464,6 +465,7 @@ static int MtmConnectSocket(int node, int port, int timeout)
464465
req.hdr.sxid = ShmemVariableCache->nextXid;
465466
req.hdr.csn = MtmGetCurrentTime();
466467
req.hdr.disabledNodeMask = Mtm->disabledNodeMask;
468+
req.hdr.connectivityMask = Mtm->connectivityMask;
467469
strcpy(req.connStr, Mtm->nodes[MtmNodeId-1].con.connStr);
468470
if (!MtmWriteSocket(sd, &req, sizeof req)) {
469471
elog(WARNING, "Arbiter failed to send handshake message to %s:%d: %d", host, port, errno);
@@ -600,6 +602,7 @@ static void MtmAcceptOneConnection()
600602

601603
resp.code = MSG_STATUS;
602604
resp.disabledNodeMask = Mtm->disabledNodeMask;
605+
resp.connectivityMask = Mtm->connectivityMask;
603606
resp.dxid = HANDSHAKE_MAGIC;
604607
resp.sxid = ShmemVariableCache->nextXid;
605608
resp.csn = MtmGetCurrentTime();
@@ -882,6 +885,8 @@ static void MtmReceiver(Datum arg)
882885

883886
Assert(node > 0 && node <= nNodes && node != MtmNodeId);
884887
Mtm->nodes[node-1].oldestSnapshot = msg->oldestSnapshot;
888+
Mtm->nodes[node-1].disabledNodeMask = msg->disabledNodeMask;
889+
Mtm->nodes[node-1].connectivityMask = msg->connectivityMask;
885890
Mtm->nodes[node-1].lastHeartbeat = MtmGetSystemTime();
886891

887892
switch (msg->code) {

bgwpool.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ static void BgwStartExtraWorker(BgwPool* pool)
130130
{
131131
if (pool->nWorkers < MtmMaxWorkers) {
132132
timestamp_t now = MtmGetSystemTime();
133-
if (pool->lastDynamicWorkerStartTime + MULTIMASTER_BGW_RESTART_TIMEOUT*USECS_PER_SEC < now) {
133+
/*if (pool->lastDynamicWorkerStartTime + MULTIMASTER_BGW_RESTART_TIMEOUT*USECS_PER_SEC < now)*/
134+
{
134135
BackgroundWorker worker;
135136
BackgroundWorkerHandle* handle;
136137
MemSet(&worker, 0, sizeof(BackgroundWorker));

multimaster--1.0.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ CREATE FUNCTION mtm.inject_2pc_error(stage integer) RETURNS void
7070
AS 'MODULE_PATHNAME','mtm_inject_2pc_error'
7171
LANGUAGE C;
7272

73+
CREATE FUNCTION mtm.check_deadlock(xid integer) RETURNS boolean
74+
AS 'MODULE_PATHNAME','mtm_check_deadlock'
75+
LANGUAGE C;
76+
7377
-- CREATE TABLE IF NOT EXISTS public.ddl_log (issued timestamp with time zone not null, query text);
7478

7579
-- CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema text, rel_name text, primary key(rel_schema, rel_name));

multimaster.c

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
#include "access/twophase.h"
3939
#include "utils/guc.h"
4040
#include "utils/hsearch.h"
41+
#include "utils/timeout.h"
4142
#include "utils/tqual.h"
4243
#include "utils/array.h"
4344
#include "utils/builtins.h"
@@ -93,7 +94,7 @@ typedef enum
9394
#define MTM_MAP_SIZE MTM_HASH_SIZE
9495
#define MIN_WAIT_TIMEOUT 1000
9596
#define MAX_WAIT_TIMEOUT 100000
96-
#define MAX_WAIT_LOOPS 100
97+
#define MAX_WAIT_LOOPS 100 // 1000000
9798
#define STATUS_POLL_DELAY USECS_PER_SEC
9899

99100
void _PG_init(void);
@@ -116,6 +117,7 @@ PG_FUNCTION_INFO_V1(mtm_get_cluster_info);
116117
PG_FUNCTION_INFO_V1(mtm_make_table_local);
117118
PG_FUNCTION_INFO_V1(mtm_dump_lock_graph);
118119
PG_FUNCTION_INFO_V1(mtm_inject_2pc_error);
120+
PG_FUNCTION_INFO_V1(mtm_check_deadlock);
119121

120122
static Snapshot MtmGetSnapshot(Snapshot snapshot);
121123
static void MtmInitialize(void);
@@ -274,15 +276,15 @@ void MtmUnlock(void)
274276
Mtm->lastLockHolder = 0;
275277
}
276278

277-
void MtmLockNode(int nodeId)
279+
void MtmLockNode(int nodeId, LWLockMode mode)
278280
{
279-
Assert(nodeId > 0 && nodeId <= Mtm->nAllNodes);
280-
LWLockAcquire((LWLockId)&Mtm->locks[nodeId], LW_EXCLUSIVE);
281+
Assert(nodeId > 0 && nodeId <= MtmMaxNodes*2);
282+
LWLockAcquire((LWLockId)&Mtm->locks[nodeId], mode);
281283
}
282284

283285
void MtmUnlockNode(int nodeId)
284286
{
285-
Assert(nodeId > 0 && nodeId <= Mtm->nAllNodes);
287+
Assert(nodeId > 0 && nodeId <= MtmMaxNodes*2);
286288
LWLockRelease((LWLockId)&Mtm->locks[nodeId]);
287289
}
288290

@@ -437,6 +439,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
437439
static timestamp_t totalSleepTime;
438440
static timestamp_t maxSleepTime;
439441
#endif
442+
timestamp_t start = MtmGetSystemTime();
440443
timestamp_t delay = MIN_WAIT_TIMEOUT;
441444
int i;
442445
Assert(xid != InvalidTransactionId);
@@ -460,7 +463,10 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
460463
if (ts->csn > MtmTx.snapshot) {
461464
MTM_LOG4("%d: tuple with xid=%d(csn=%ld) is invisibile in snapshot %ld",
462465
MyProcPid, xid, ts->csn, MtmTx.snapshot);
463-
MtmUnlock();
466+
if (MtmGetSystemTime() - start > USECS_PER_SEC) {
467+
elog(WARNING, "Backend %d waits for transaction %x status %ld usecs", MyProcPid, xid, MtmGetSystemTime() - start);
468+
}
469+
MtmUnlock();
464470
return true;
465471
}
466472
if (ts->status == TRANSACTION_STATUS_UNKNOWN)
@@ -499,6 +505,9 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
499505
MTM_LOG4("%d: tuple with xid=%d(csn= %ld) is %s in snapshot %ld",
500506
MyProcPid, xid, ts->csn, invisible ? "rollbacked" : "committed", MtmTx.snapshot);
501507
MtmUnlock();
508+
if (MtmGetSystemTime() - start > USECS_PER_SEC) {
509+
elog(WARNING, "Backend %d waits for %s transaction %x %ld usecs", MyProcPid, invisible ? "rollbacked" : "committed", xid, MtmGetSystemTime() - start);
510+
}
502511
return invisible;
503512
}
504513
}
@@ -510,7 +519,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
510519
}
511520
}
512521
MtmUnlock();
513-
elog(ERROR, "Failed to get status of XID %d", xid);
522+
elog(ERROR, "Failed to get status of XID %d in %ld usec", xid, MtmGetSystemTime() - start);
514523
return true;
515524
}
516525

@@ -1091,6 +1100,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
10911100
msg.sxid = ts->xid;
10921101
msg.csn = ts->csn;
10931102
msg.disabledNodeMask = Mtm->disabledNodeMask;
1103+
msg.connectivityMask = Mtm->connectivityMask;
10941104
msg.oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
10951105
memcpy(msg.gid, ts->gid, MULTIMASTER_MAX_GID_SIZE);
10961106

@@ -1118,6 +1128,7 @@ void MtmBroadcastPollMessage(MtmTransState* ts)
11181128
MtmArbiterMessage msg;
11191129
msg.code = MSG_POLL_REQUEST;
11201130
msg.disabledNodeMask = Mtm->disabledNodeMask;
1131+
msg.connectivityMask = Mtm->connectivityMask;
11211132
msg.oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
11221133
memcpy(msg.gid, ts->gid, MULTIMASTER_MAX_GID_SIZE);
11231134

@@ -1703,8 +1714,6 @@ void MtmCheckQuorum(void)
17031714

17041715
void MtmOnNodeDisconnect(int nodeId)
17051716
{
1706-
MtmTransState *ts;
1707-
17081717
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
17091718
{
17101719
/* Node is already disabled */
@@ -1733,11 +1742,13 @@ void MtmOnNodeDisconnect(int nodeId)
17331742
}
17341743

17351744
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
1736-
1745+
#if 0
17371746
if (!MtmUseRaftable)
17381747
{
17391748
MtmLock(LW_EXCLUSIVE);
17401749
if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
1750+
MtmTransState *ts;
1751+
17411752
MtmDisableNode(nodeId);
17421753
MtmCheckQuorum();
17431754
/* Interrupt voting for active transaction and abort them */
@@ -1751,7 +1762,10 @@ void MtmOnNodeDisconnect(int nodeId)
17511762
}
17521763
}
17531764
MtmUnlock();
1754-
} else {
1765+
}
1766+
else
1767+
#endif
1768+
{
17551769
MtmRefreshClusterStatus(false, 0);
17561770
}
17571771
}
@@ -1964,6 +1978,11 @@ static void MtmInitialize()
19641978
Mtm->freeQueue = NULL;
19651979
for (i = 0; i < MtmNodes; i++) {
19661980
Mtm->nodes[i].oldestSnapshot = 0;
1981+
Mtm->nodes[i].disabledNodeMask = 0;
1982+
Mtm->nodes[i].connectivityMask = 0;
1983+
Mtm->nodes[i].lockGraphUsed = 0;
1984+
Mtm->nodes[i].lockGraphAllocated = 0;
1985+
Mtm->nodes[i].lockGraphData = NULL;
19671986
Mtm->nodes[i].transDelay = 0;
19681987
Mtm->nodes[i].lastStatusChangeTime = MtmGetSystemTime();
19691988
Mtm->nodes[i].con = MtmConnections[i];
@@ -2603,7 +2622,7 @@ _PG_init(void)
26032622
* resources in mtm_shmem_startup().
26042623
*/
26052624
RequestAddinShmemSpace(MTM_SHMEM_SIZE + MtmQueueSize);
2606-
RequestNamedLWLockTranche(MULTIMASTER_NAME, 1 + MtmMaxNodes);
2625+
RequestNamedLWLockTranche(MULTIMASTER_NAME, 1 + MtmMaxNodes*2);
26072626

26082627
BgwPoolStart(MtmWorkers, MtmPoolConstructor);
26092628

@@ -3260,7 +3279,7 @@ Datum mtm_dump_lock_graph(PG_FUNCTION_ARGS)
32603279
for (i = 0; i < Mtm->nAllNodes; i++)
32613280
{
32623281
size_t size;
3263-
char *data = RaftableGet(psprintf("lock-graph-%d", i+1), &size, NULL, false);
3282+
char* data = RaftableGet(psprintf("lock-graph-%d", i+1), &size, NULL, false);
32643283
if (data) {
32653284
GlobalTransactionId *gtid = (GlobalTransactionId *)data;
32663285
GlobalTransactionId *last = (GlobalTransactionId *)(data + size);
@@ -3672,12 +3691,28 @@ static bool MtmProcessDDLCommand(char const* queryString)
36723691
}
36733692

36743693
MTM_LOG1("Sending utility: %s", queryWithContext);
3675-
LogLogicalMessage("MTM:GUC", queryWithContext, strlen(queryWithContext), true);
3694+
LogLogicalMessage("G", queryWithContext, strlen(queryWithContext)+1, true);
36763695

36773696
MtmTx.containsDML = true;
36783697
return false;
36793698
}
36803699

3700+
void MtmUpdateLockGraph(int nodeId, void const* messageBody, int messageSize)
3701+
{
3702+
int allocated;
3703+
MtmLockNode(nodeId + MtmMaxNodes, LW_EXCLUSIVE);
3704+
allocated = Mtm->nodes[nodeId-1].lockGraphAllocated;
3705+
if (messageSize > allocated) {
3706+
allocated = Max(Max(MULTIMASTER_LOCK_BUF_INIT_SIZE, allocated*2), messageSize);
3707+
Mtm->nodes[nodeId-1].lockGraphData = ShmemAlloc(allocated);
3708+
Mtm->nodes[nodeId-1].lockGraphAllocated = allocated;
3709+
}
3710+
memcpy(Mtm->nodes[nodeId-1].lockGraphData, messageBody, messageSize);
3711+
Mtm->nodes[nodeId-1].lockGraphUsed = messageSize;
3712+
MtmUnlockNode(nodeId + MtmMaxNodes);
3713+
MTM_LOG1("Update deadlock graph for node %d size %d", nodeId, messageSize);
3714+
}
3715+
36813716
static void MtmProcessUtility(Node *parsetree, const char *queryString,
36823717
ProcessUtilityContext context, ParamListInfo params,
36833718
DestReceiver *dest, char *completionTag)
@@ -3996,20 +4031,19 @@ MtmSerializeLock(PROCLOCK* proclock, void* arg)
39964031
}
39974032

39984033
static bool
3999-
MtmDetectGlobalDeadLock(PGPROC* proc)
4034+
MtmDetectGlobalDeadLockFortXid(TransactionId xid)
40004035
{
4001-
ByteBuffer buf;
4002-
PGXACT* pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
40034036
bool hasDeadlock = false;
4004-
4005-
if (TransactionIdIsValid(pgxact->xid)) {
4037+
if (TransactionIdIsValid(xid)) {
4038+
ByteBuffer buf;
40064039
MtmGraph graph;
40074040
GlobalTransactionId gtid;
40084041
int i;
40094042

40104043
ByteBufferAlloc(&buf);
40114044
EnumerateLocks(MtmSerializeLock, &buf);
40124045
RaftableSet(psprintf("lock-graph-%d", MtmNodeId), buf.data, buf.used, false);
4046+
MtmSleep(MSEC_TO_USEC(DeadlockTimeout));
40134047
MtmGraphInit(&graph);
40144048
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data, buf.used/sizeof(GlobalTransactionId));
40154049
ByteBufferFree(&buf);
@@ -4024,9 +4058,9 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
40244058
}
40254059
}
40264060
}
4027-
MtmGetGtid(pgxact->xid, &gtid);
4061+
MtmGetGtid(xid, &gtid);
40284062
hasDeadlock = MtmGraphFindLoop(&graph, &gtid);
4029-
elog(WARNING, "Distributed deadlock check for %u:%u = %d", gtid.node, gtid.xid, hasDeadlock);
4063+
elog(WARNING, "Distributed deadlock check by backend %d for %u:%u = %d", MyProcPid, gtid.node, gtid.xid, hasDeadlock);
40304064
if (!hasDeadlock) {
40314065
/* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
40324066
* can not be applied just because there are no vacant workers and it causes additional dependency between transactions which is not
@@ -4037,8 +4071,27 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
40374071
hasDeadlock = true;
40384072
elog(WARNING, "Apply workers were blocked more than %d msec",
40394073
(int)USEC_TO_MSEC(MtmGetSystemTime() - lastPeekTime));
4074+
} else {
4075+
MTM_LOG1("Enable deadlock timeout in backend %d for transaction %d", MyProcPid, xid);
4076+
enable_timeout_after(DEADLOCK_TIMEOUT, DeadlockTimeout);
40404077
}
40414078
}
40424079
}
40434080
return hasDeadlock;
40444081
}
4082+
4083+
static bool
4084+
MtmDetectGlobalDeadLock(PGPROC* proc)
4085+
{
4086+
PGXACT* pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
4087+
4088+
MTM_LOG1("Detect global deadlock for %d by backend %d", pgxact->xid, MyProcPid);
4089+
4090+
return MtmDetectGlobalDeadLockFortXid(pgxact->xid);
4091+
}
4092+
4093+
Datum mtm_check_deadlock(PG_FUNCTION_ARGS)
4094+
{
4095+
TransactionId xid = PG_GETARG_INT32(0);
4096+
PG_RETURN_BOOL(MtmDetectGlobalDeadLockFortXid(xid));
4097+
}

multimaster.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#define MULTIMASTER_MAX_HOST_NAME_SIZE 64
5454
#define MULTIMASTER_MAX_LOCAL_TABLES 256
5555
#define MULTIMASTER_MAX_CTL_STR_SIZE 256
56+
#define MULTIMASTER_LOCK_BUF_INIT_SIZE 4096
5657
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
5758
#define MULTIMASTER_ADMIN "mtm_admin"
5859

@@ -138,8 +139,9 @@ typedef struct
138139
TransactionId sxid; /* Transaction ID at sender node */
139140
XidStatus status; /* Transaction status */
140141
csn_t csn; /* Local CSN in case of sending data from replica to master, global CSN master->replica */
141-
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes at the sender of message */
142142
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
143+
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes at the sender of message */
144+
nodemask_t connectivityMask; /* Connectivity bitmask at the sender of message */
143145
pgid_t gid; /* Global transaction identifier */
144146
} MtmArbiterMessage;
145147

@@ -179,13 +181,18 @@ typedef struct
179181
timestamp_t receiverStartTime;
180182
timestamp_t senderStartTime;
181183
timestamp_t lastHeartbeat;
184+
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes received from this node */
185+
nodemask_t connectivityMask; /* Connectivity mask at this node */
182186
int senderPid;
183187
int receiverPid;
184188
XLogRecPtr flushPos;
185-
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
189+
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
186190
XLogRecPtr restartLsn;
187191
RepOriginId originId;
188192
int timeline;
193+
void* lockGraphData;
194+
int lockGraphAllocated;
195+
int lockGraphUsed;
189196
} MtmNodeInfo;
190197

191198
typedef struct MtmTransState
@@ -223,7 +230,7 @@ typedef struct
223230
MtmNodeStatus status; /* Status of this node */
224231
int recoverySlot; /* NodeId of recovery slot or 0 if none */
225232
volatile slock_t spinlock; /* spinlock used to protect access to hash table */
226-
PGSemaphoreData sendSemaphore; /* semaphore used to notify mtm-sender about new responses to coordinator */
233+
PGSemaphoreData sendSemaphore; /* semaphore used to notify mtm-sender about new responses to coordinator */
227234
LWLockPadded *locks; /* multimaster lock tranche */
228235
TransactionId oldestXid; /* XID of oldest transaction visible by any active transaction (local or global) */
229236
nodemask_t disabledNodeMask; /* bitmask of disabled nodes */
@@ -310,7 +317,7 @@ extern void MtmAdjustSubtransactions(MtmTransState* ts);
310317
extern void MtmBroadcastPollMessage(MtmTransState* ts);
311318
extern void MtmLock(LWLockMode mode);
312319
extern void MtmUnlock(void);
313-
extern void MtmLockNode(int nodeId);
320+
extern void MtmLockNode(int nodeId, LWLockMode mode);
314321
extern void MtmUnlockNode(int nodeId);
315322
extern void MtmDropNode(int nodeId, bool dropSlot);
316323
extern void MtmRecoverNode(int nodeId);
@@ -340,6 +347,7 @@ extern XLogRecPtr MtmGetFlushPosition(int nodeId);
340347
extern bool MtmWatchdog(timestamp_t now);
341348
extern void MtmCheckHeartbeat(void);
342349
extern void MtmResetTransaction(void);
350+
extern void MtmUpdateLockGraph(int nodeId, void const* messageBody, int messageSize);
343351
extern PGconn *PQconnectdb_safe(const char *conninfo);
344352

345353

0 commit comments

Comments
 (0)