Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 3a62970

Browse files
knizhnikkelvich
authored andcommitted
Distributed deadlock detections
1 parent e8cf78f commit 3a62970

File tree

5 files changed

+166
-12
lines changed

5 files changed

+166
-12
lines changed

arbiter.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ static void MtmOpenConnections()
335335
if (i+1 != MtmNodeId) {
336336
sockets[i] = MtmConnectSocket(host, MtmArbiterPort + i + 1, MtmConnectAttempts);
337337
if (sockets[i] < 0) {
338-
MtmDropNode(i+1, false);
338+
MtmOnLostConnection(i+1);
339339
}
340340
} else {
341341
sockets[i] = -1;
@@ -358,7 +358,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
358358
close(sockets[node]);
359359
sockets[node] = MtmConnectSocket(hosts[node], MtmArbiterPort + node + 1, MtmReconnectAttempts);
360360
if (sockets[node] < 0) {
361-
MtmDropNode(node+1, false);
361+
MtmOnLostConnection(node+1);
362362
return false;
363363
}
364364
}

ddd.c

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#include <stddef.h>
2+
#include <stdlib.h>
3+
#include <string.h>
4+
#include "ddd.h"
5+
6+
7+
void MtmGraphInit(MtmGraph* graph)
8+
{
9+
memset(graph->hashtable, 0, sizeof(graph->hashtable));
10+
}
11+
12+
static inline MtmVertex* findVertex(MtmGraph* graph, GlobalTransactionId* gtid)
13+
{
14+
xid_t h = gtid->xid % MAX_TRANSACTIONS;
15+
MtmVertex* v;
16+
for (v = graph->hashtable[h]; v != NULL; v = v->next) {
17+
if (v->gtid == *gtid) {
18+
return v;
19+
}
20+
}
21+
v = (MtmVertex*)palloc(sizeof(MtmVertex));
22+
v->gtid = *gtid;
23+
v->outgoingEdges = NULL;
24+
v->collision = graph->hashtable[h];
25+
graph->hashtable[h] = v;
26+
return v;
27+
}
28+
29+
void MtmGraphAdd(MtmGraph* graph, GlobalTransactionId* gtid, int size)
30+
{
31+
GlobalTransactionId* last = gtid + size;
32+
MtmEdge *e, *next, *edges = NULL;
33+
while (gtid != last) {
34+
Vertex* src = findVertex(graph, gtid++);
35+
while (gtid->node != 0) {
36+
Vertex* dst = findVertex(graph, gtid++);
37+
e = (MtmEdge*)palloc(sizeof(MtmEdge));
38+
dst->nIncomingEdges += 1;
39+
e->dst = dst;
40+
e->src = src;
41+
e->next = v->outgoingEdges;
42+
v->outgoingEdges = e;
43+
}
44+
gtid += 1;
45+
}
46+
}
47+
48+
static bool recursiveTraverseGraph(MtmVertex* root, MtmVertex* v)
49+
{
50+
Edge* e;
51+
v->node = VISITED_NODE_MARK;
52+
for (e = v->outgoingEdges; e != NULL; e = e->next) {
53+
if (e->dst == root) {
54+
return true;
55+
} else if (e->dst->node != VISITED_NODE_MAR && recursiveTraverseGraph(root, e->dst)) { /* loop */
56+
return true;
57+
}
58+
}
59+
return false;
60+
}
61+
62+
bool MtmGraphFindLoop(MtmGraph* graph, GlobalTransactionId* root)
63+
{
64+
Vertex* v;
65+
for (v = graph->hashtable[root->xid % MAX_TRANSACTIONS]; v != NULL; v = v->next) {
66+
if (v->gtid == *root) {
67+
if (recursiveTraverseGraph(v, v)) {
68+
return true;
69+
}
70+
break;
71+
}
72+
}
73+
return false;
74+
}

ddd.h

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#ifndef __DDD_H__
2+
3+
#include "multimaster.h"
4+
5+
#define MAX_TRANSACTIONS 1024
6+
#define VISITED_NODE_MARK 0
7+
8+
typedef struct MtmEdge {
9+
struct MtmEdge* next; /* list of outgoing edges */
10+
struct MtmVertex* dst;
11+
struct MtmVertex* src;
12+
} MtmEdge;
13+
14+
typedef struct MtmVertex
15+
{
16+
struct MtmEdge* outgoingEdges;
17+
struct MtmVertex* collision;
18+
GlobalTransactionId gtid;
19+
} MtmVertex;
20+
21+
typedef struct MtmGraph
22+
{
23+
MtmVertex* hashtable[MTM_MAX_TRANSACTIONS];
24+
} Graph;
25+
26+
extern void MtmGraphInit(MtmGraph* graph);
27+
extern void MtmGraphAdd(MtmGraph* graph, GlobalTransactionId* subgraph, int size);
28+
extern bool MtmGraphFindLoop(MtmGraph* graph, GlobalTransactionId* root);
29+
30+
#endif

multimaster.c

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ static BgwPool* MtmPoolConstructor(void);
112112
static bool MtmRunUtilityStmt(PGconn* conn, char const* sql);
113113
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError);
114114
static void MtmVoteForTransaction(MtmTransState* ts);
115+
static void MtmSerializeLockGraph(ByteBuffer* buf)
115116

116117
static HTAB* xid2state;
117118
static MtmCurrentTrans dtmTx;
@@ -805,13 +806,6 @@ MtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids,
805806
PgTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
806807
}
807808

808-
static bool
809-
MtmDetectGlobalDeadLock(PGPROC* proc)
810-
{
811-
elog(WARNING, "Global deadlock?");
812-
return true;
813-
}
814-
815809
static void
816810
MtmShmemStartup(void)
817811
{
@@ -1582,7 +1576,63 @@ MtmSerializeLock(PROCLOCK* proclock, void* arg)
15821576
}
15831577
}
15841578

1585-
void MtmSerializeLockGraph(ByteBuffer* buf)
1579+
/* Stubs */
1580+
typedef PaxosTimestamp {
1581+
time_t time; /* local time at master */
1582+
uint32 naster; /* master node for this operation */
1583+
uint32 psn; /* PAXOS serial number */
1584+
} PaxosTimestamp;
1585+
1586+
void* PaxosGet(char const* key, int* size, PaxosTimestamp* ts);
1587+
void PaxosSet(char const* key, void* value, int size);
1588+
1589+
1590+
static bool
1591+
MtmDetectGlobalDeadLock(PGPROC* proc)
1592+
{
1593+
bool hasDeadlock = false;
1594+
ByteBuffer buf;
1595+
PGXACT* pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
1596+
bool hasDeadlock = false;
1597+
if (TransactionIdIsValid(pgxact->xid)) {
1598+
MtmGraph graph;
1599+
GlobalTransactionId gtid;
1600+
int i;
1601+
1602+
ByteBufferAlloc(&buf);
1603+
EnumerateLocks(DtmSerializeLock, &buf);
1604+
ByteBufferFree(&buf);
1605+
PaxosPut(dsprintf("lock-graph-%d", MMNodeId), buf.data, buf.size);
1606+
MtmGraphInit(&graph);
1607+
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data, buf.size/sizeof(GlobalTransactionId));
1608+
for (i = 0; i < MtmNodes; i++) {
1609+
if (i+1 MtmNodeId && !BIT_CHECK(dtm->disabledNodeMask, i)) {
1610+
int size;
1611+
void* data = PaxosGet(dsprintf("lock-graph-%d", i+1), &size, NULL);
1612+
MtmGraphAdd(&graph, (GlobalTransactionId*)data, size/sizeof(GlobalTransactoinId));
1613+
}
1614+
}
1615+
MtmGetGtid(pgxact->xid, &gtid);
1616+
hasDeadlock = MtmGraphFindLoop(&graph, &gtid);
1617+
elog(WARNING, "Distributed deadlock check for %u:%u = %d", gtid.node, gtid.xid, hasDeadlock);
1618+
}
1619+
return hasDeadlock;
1620+
}
1621+
1622+
void MtmOnLostConnection(int nodeId)
15861623
{
1587-
EnumerateLocks(MtmSerializeLock, buf);
1624+
int i;
1625+
nodemask_t mask = dtm->disabledNodeMask;
1626+
nodemask_t matrix = (nodemask_t*)palloc0(sizeof(nodemask_t)*MtmNodes);
1627+
1628+
mask |= (nodemask_t)1 << (nodeId-1);
1629+
PaxosPut(dsprintf("node-mask-%d", MMNodeId), &mask, sizeof mask);
1630+
matrix[MtmNodeId-1] = mask;
1631+
MtmSleep(MtmConnectTimeout);
1632+
for (i = 0; i < MtnNodes; i++) {
1633+
if (i+1 != MtmNodeId && !BIT_CHECK(dtm->disabledNodeMask, i)) {
1634+
void* data = PaxosGet(dsprintf("node-mask-%d", i+1), NULL, NULL);
1635+
matrix[i] = *(nodemask_t*)data;
1636+
}
1637+
}
15881638
}

multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,10 @@ extern void MtmAdjustSubtransactions(MtmTransState* ts);
141141
extern void MtmLock(LWLockMode mode);
142142
extern void MtmUnlock(void);
143143
extern void MtmDropNode(int nodeId, bool dropSlot);
144+
extern void MtmOnLostConnection(int nodeId);
144145
extern MtmState* MtmGetState(void);
145146
extern timestamp_t MtmGetCurrentTime(void);
146147
extern void MtmSleep(timestamp_t interval);
147148
extern bool MtmIsRecoveredNode(int nodeId);
148-
extern void MtmSerializeLockGraph(ByteBuffer* buf);
149149

150150
#endif

0 commit comments

Comments
 (0)