Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7889c2f

Browse files
knizhnikkelvich
authored and committed
Fix issues with deadlock detection
1 parent 38f712d commit 7889c2f

File tree

6 files changed

+81
-27
lines changed

6 files changed

+81
-27
lines changed

arbiter.c

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -335,7 +335,7 @@ static void MtmOpenConnections()
335335
if (i+1 != MtmNodeId) {
336336
sockets[i] = MtmConnectSocket(host, MtmArbiterPort + i + 1, MtmConnectAttempts);
337337
if (sockets[i] < 0) {
338-
MtmOnLostConnection(i+1);
338+
MtmOnNodeDisconnect(i+1);
339339
}
340340
} else {
341341
sockets[i] = -1;
@@ -360,7 +360,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
360360
}
361361
sockets[node] = MtmConnectSocket(hosts[node], MtmArbiterPort + node + 1, MtmReconnectAttempts);
362362
if (sockets[node] < 0) {
363-
MtmOnLostConnection(node+1);
363+
MtmOnNodeDisconnect(node+1);
364364
return false;
365365
}
366366
}
@@ -403,7 +403,7 @@ static void MtmAcceptOneConnection()
403403
} else {
404404
elog(NOTICE, "Arbiter established connection with node %d", msg.node);
405405
BIT_CLEAR(ds->connectivityMask, msg.node-1);
406-
MtmOnConnectNode(msg.node);
406+
MtmOnNodeConnect(msg.node);
407407
MtmRegisterSocket(fd, msg.node-1);
408408
sockets[msg.node-1] = fd;
409409
}

bkb.c

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
#include "postgres.h"
1+
#include <stdint.h>
22
#include "bkb.h"
33

44
/*
@@ -12,7 +12,6 @@ typedef struct {
1212

1313
static void list_append(NodeList* list, int n)
1414
{
15-
Assert(list->size < MAX_NODES);
1615
list->nodes[list->size++] = n;
1716
}
1817

@@ -44,7 +43,7 @@ static void findMaximumIndependentSet(NodeList* cur, NodeList* result, nodemask_
4443
int pos = -1;
4544

4645
for (j = ne; j < ce; j++) {
47-
if (!BIT_CHECK(graph[p], oldSet[j])) {
46+
if (BIT_CHECK(graph[p], oldSet[j])) {
4847
if (++cnt == minnod) {
4948
break;
5049
}
@@ -71,13 +70,13 @@ static void findMaximumIndependentSet(NodeList* cur, NodeList* result, nodemask_
7170

7271
newne = 0;
7372
for (i = 0; i < ne; i++) {
74-
if (BIT_CHECK(graph[sel], oldSet[i])) {
73+
if (!BIT_CHECK(graph[sel], oldSet[i])) {
7574
newSet[newne++] = oldSet[i];
7675
}
7776
}
7877
newce = newne;
7978
for (i = ne + 1; i < ce; i++) {
80-
if (BIT_CHECK(graph[sel], oldSet[i])) {
79+
if (!BIT_CHECK(graph[sel], oldSet[i])) {
8180
newSet[newce++] = oldSet[i];
8281
}
8382
}
@@ -92,8 +91,9 @@ static void findMaximumIndependentSet(NodeList* cur, NodeList* result, nodemask_
9291
}
9392
}
9493
cur->size -= 1;
94+
ne += 1;
9595
if (k > 1) {
96-
for (s = ++ne; BIT_CHECK(graph[fixp], oldSet[s]); s++);
96+
for (s = ++ne; !BIT_CHECK(graph[fixp], oldSet[s]); s++);
9797
}
9898
}
9999
}
@@ -114,7 +114,7 @@ nodemask_t MtmFindMaxClique(nodemask_t* graph, int n_nodes)
114114
findMaximumIndependentSet(&tmp, &result, graph, all, 0, n_nodes);
115115
mask = 0;
116116
for (i = 0; i < result.size; i++) {
117-
mask |= (nodemask_t)1 << result.nodes[i];
117+
BIT_SET(mask, result.nodes[i]);
118118
}
119119
return mask;
120120
}

bkbtest.c

Lines changed: 16 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,16 @@
1+
#include <stdio.h>
2+
#include <stdint.h>
3+
#include "bkb.h"
4+
5+
int main() {
6+
nodemask_t matrix[64] = {0};
7+
nodemask_t clique;
8+
matrix[0] = 6;
9+
matrix[1] = 4;
10+
matrix[2] = 1;
11+
matrix[4] = 3;
12+
clique = MtmFindMaxClique(matrix, 64);
13+
printf("Clique=%lx\n", clique);
14+
return 0;
15+
}
16+

multimaster.c

Lines changed: 52 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -141,6 +141,7 @@ int MtmArbiterPort;
141141
int MtmNodes;
142142
int MtmConnectAttempts;
143143
int MtmConnectTimeout;
144+
int MtmKeepaliveTimeout;
144145
int MtmReconnectAttempts;
145146

146147
static int MtmQueueSize;
@@ -986,6 +987,21 @@ _PG_init(void)
986987
NULL
987988
);
988989

990+
DefineCustomIntVariable(
991+
"multimaster.keepalive_timeout",
992+
"Multimaster keepalive interval for sockets",
993+
"Timeout in microseconds before polling state of nodes",
994+
&MtmKeepaliveTimeout,
995+
1000000,
996+
1,
997+
INT_MAX,
998+
PGC_BACKEND,
999+
0,
1000+
NULL,
1001+
NULL,
1002+
NULL
1003+
);
1004+
9891005
DefineCustomIntVariable(
9901006
"multimaster.connect_attempts",
9911007
"Multimaster number of connect attemts",
@@ -1528,8 +1544,12 @@ MtmGetGtid(TransactionId xid, GlobalTransactionId* gtid)
15281544

15291545
MtmLock(LW_SHARED);
15301546
ts = (MtmTransState*)hash_search(xid2state, &xid, HASH_FIND, NULL);
1531-
Assert(ts != NULL);
1532-
*gtid = ts->gtid;
1547+
if (ts != NULL) {
1548+
*gtid = ts->gtid;
1549+
} else {
1550+
gtid->node = MtmNodeId;
1551+
gtid->xid = xid;
1552+
}
15331553
MtmUnlock();
15341554
}
15351555

@@ -1601,15 +1621,19 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
16011621

16021622
ByteBufferAlloc(&buf);
16031623
EnumerateLocks(MtmSerializeLock, &buf);
1604-
ByteBufferFree(&buf);
1605-
PaxosSet(psprintf("lock-graph-%d", MtmNodeId), buf.data, buf.size);
1624+
PaxosSet(psprintf("lock-graph-%d", MtmNodeId), buf.data, buf.used);
16061625
MtmGraphInit(&graph);
1607-
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data, buf.size/sizeof(GlobalTransactionId));
1626+
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data, buf.used/sizeof(GlobalTransactionId));
1627+
ByteBufferFree(&buf);
16081628
for (i = 0; i < MtmNodes; i++) {
16091629
if (i+1 != MtmNodeId && !BIT_CHECK(dtm->disabledNodeMask, i)) {
16101630
int size;
16111631
void* data = PaxosGet(psprintf("lock-graph-%d", i+1), &size, NULL);
1612-
MtmGraphAdd(&graph, (GlobalTransactionId*)data, size/sizeof(GlobalTransactionId));
1632+
if (data == NULL) {
1633+
hasDeadlock = true; /* Just temporary hack until no Paxos */
1634+
} else {
1635+
MtmGraphAdd(&graph, (GlobalTransactionId*)data, size/sizeof(GlobalTransactionId));
1636+
}
16131637
}
16141638
}
16151639
MtmGetGtid(pgxact->xid, &gtid);
@@ -1636,42 +1660,56 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix)
16361660

16371661
void MtmUpdateClusterStatus(void)
16381662
{
1639-
nodemask_t mask, clique;
1663+
nodemask_t mask, clique, disconnectedMask;
16401664
nodemask_t matrix[MAX_NODES];
16411665
int i;
16421666

16431667
MtmBuildConnectivityMatrix(matrix);
16441668

16451669
clique = MtmFindMaxClique(matrix, MtmNodes);
1646-
1670+
disconnectedMask = ~clique & (((nodemask_t)1 << MtmNodes)-1);
16471671
MtmLock(LW_EXCLUSIVE);
1648-
mask = clique & ~dtm->disabledNodeMask;
1672+
mask = disconnectedMask & ~dtm->disabledNodeMask;
16491673
for (i = 0; mask != 0; i++, mask >>= 1) {
16501674
if (mask & 1) {
16511675
dtm->nNodes -= 1;
16521676
BIT_SET(dtm->disabledNodeMask, i);
16531677
}
16541678
}
1655-
if (dtm->disabledNodeMask != clique) {
1656-
dtm->disabledNodeMask |= clique;
1679+
if (dtm->disabledNodeMask != disconnectedMask) {
1680+
dtm->disabledNodeMask |= disconnectedMask;
16571681
PaxosSet(psprintf("node-mask-%d", MtmNodeId), &dtm->disabledNodeMask, sizeof dtm->disabledNodeMask);
16581682
}
16591683
MtmUnlock();
16601684
}
16611685

1662-
void MtmOnLostConnection(int nodeId)
1686+
void MtmOnNodeDisconnect(int nodeId)
16631687
{
16641688
BIT_SET(dtm->connectivityMask, nodeId-1);
16651689
PaxosSet(psprintf("node-mask-%d", MtmNodeId), &dtm->connectivityMask, sizeof dtm->connectivityMask);
16661690

16671691
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
1668-
MtmSleep(MtmConnectTimeout);
1692+
MtmSleep(MtmKeepaliveTimeout);
16691693

16701694
MtmUpdateClusterStatus();
16711695
}
16721696

1673-
void MtmOnConnectNode(int nodeId)
1697+
void MtmOnNodeConnect(int nodeId)
16741698
{
16751699
BIT_CLEAR(dtm->connectivityMask, nodeId-1);
16761700
PaxosSet(psprintf("node-mask-%d", MtmNodeId), &dtm->connectivityMask, sizeof dtm->connectivityMask);
16771701
}
1702+
1703+
/*
1704+
* Paxos function stubs (until they are implemented)
1705+
*/
1706+
void* PaxosGet(char const* key, int* size, PaxosTimestamp* ts)
1707+
{
1708+
if (size != NULL) {
1709+
*size = 0;
1710+
}
1711+
return NULL;
1712+
}
1713+
1714+
void PaxosSet(char const* key, void const* value, int size)
1715+
{}

multimaster.h

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -141,8 +141,8 @@ extern void MtmAdjustSubtransactions(MtmTransState* ts);
141141
extern void MtmLock(LWLockMode mode);
142142
extern void MtmUnlock(void);
143143
extern void MtmDropNode(int nodeId, bool dropSlot);
144-
extern void MtmOnLostConnection(int nodeId);
145-
extern void MtmOnConnectNode(int nodeId);
144+
extern void MtmOnNodeDisconnect(int nodeId);
145+
extern void MtmOnNodeConnect(int nodeId);
146146
extern MtmState* MtmGetState(void);
147147
extern timestamp_t MtmGetCurrentTime(void);
148148
extern void MtmSleep(timestamp_t interval);

tests/dtmbench.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -195,7 +195,7 @@ void initializeDatabase()
195195
exec(txn, "insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1, 0);
196196
txn.commit();
197197
}
198-
printf("Initialization completed in %f seconds\n", (start - getCurrentTime())/100000.0);
198+
printf("Initialization completed in %f seconds\n", (getCurrentTime() - start)/100000.0);
199199
}
200200

201201
int main (int argc, char* argv[])

0 commit comments

Comments
 (0)