Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 6cd5575

Browse files
knizhnikkelvich
authored andcommitted
Prepare to start raftable from multimaster
1 parent 4a3fa8f commit 6cd5575

File tree

3 files changed

+57
-3
lines changed

3 files changed

+57
-3
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ ifndef RAFTABLE_PATH
55
RAFTABLE_PATH = ../raftable
66
endif
77

8-
override CPPFLAGS += -I$(RAFTABLE_PATH)
8+
override CPPFLAGS += -I$(RAFTABLE_PATH) -I$(RAFTABLE_PATH)/raft/include
99

1010
EXTRA_INSTALL = contrib/raftable contrib/mmts
1111

arbiter.c

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,16 @@ static void MtmOpenConnections()
488488
}
489489
for (i = 0; i < nNodes; i++) {
490490
if (i+1 != MtmNodeId && i < Mtm->nAllNodes) {
491-
sockets[i] = MtmConnectSocket(Mtm->nodes[i].con.hostName, MtmArbiterPort + i + 1, MtmConnectTimeout);
491+
int arbiterPort;
492+
char const* arbiterPortStr = strstr(Mtm->nodes[i].con.connStr, "arbiterport=");
493+
if (arbiterPortStr != NULL) {
494+
if (sscanf(arbiterPortStr+12, "%d", &arbiterPort) != 1) {
495+
elog(ERROR, "Invalid arbiter port: %s", arbiterPortStr+12);
496+
}
497+
} else {
498+
arbiterPort = MtmArbiterPort + i + 1;
499+
}
500+
sockets[i] = MtmConnectSocket(Mtm->nodes[i].con.hostName, arbiterPort, MtmConnectTimeout);
492501
if (sockets[i] < 0) {
493502
MtmOnNodeDisconnect(i+1);
494503
}

multimaster.c

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
#include "ddd.h"
6262
#include "raftable_wrapper.h"
6363
#include "raftable.h"
64+
#include "worker.h"
6465

6566
typedef struct {
6667
TransactionId xid; /* local transaction ID */
@@ -195,6 +196,7 @@ int MtmNodes;
195196
int MtmNodeId;
196197
int MtmReplicationNodeId;
197198
int MtmArbiterPort;
199+
int MtmRaftablePort;
198200
int MtmConnectTimeout;
199201
int MtmReconnectTimeout;
200202
int MtmNodeDisableDelay;
@@ -1795,6 +1797,33 @@ static void MtmSplitConnStrs(void)
17951797
pfree(copy);
17961798
}
17971799

1800+
static void MtmRaftableInitialize()
1801+
{
1802+
int i;
1803+
WorkerConfig wcfg;
1804+
1805+
for (i = 0; i < RAFTABLE_PEERS_MAX; i++)
1806+
{
1807+
wcfg.peers[i].up = false;
1808+
}
1809+
1810+
for (i = 0; i < MtmNodes; i++)
1811+
{
1812+
char const* raftport = strstr(MtmConnections[i].connStr, "raftport=");
1813+
if (raftport != NULL) {
1814+
if (sscanf(raftport+9, "%d", &wcfg.peers[i].port) != 1) {
1815+
elog(ERROR, "Invalid raftable port: %s", raftport+9);
1816+
}
1817+
} else {
1818+
wcfg.peers[i].port = MtmRaftablePort + i;
1819+
}
1820+
wcfg.peers[i].up = true;
1821+
strncpy(wcfg.peers[i].host, MtmConnections[i].hostName, sizeof(wcfg.peers[i].host));
1822+
}
1823+
wcfg.id = MtmNodeId-1;
1824+
worker_register(&wcfg);
1825+
}
1826+
17981827
void
17991828
_PG_init(void)
18001829
{
@@ -2051,7 +2080,22 @@ _PG_init(void)
20512080
"Base value for assigning arbiter ports",
20522081
NULL,
20532082
&MtmArbiterPort,
2054-
54321,
2083+
54320,
2084+
0,
2085+
INT_MAX,
2086+
PGC_BACKEND,
2087+
0,
2088+
NULL,
2089+
NULL,
2090+
NULL
2091+
);
2092+
2093+
DefineCustomIntVariable(
2094+
"multimaster.raftable_port",
2095+
"Base value for assigning raftable ports",
2096+
NULL,
2097+
&MtmRaftablePort,
2098+
6543,
20552099
0,
20562100
INT_MAX,
20572101
PGC_BACKEND,
@@ -2132,6 +2176,7 @@ _PG_init(void)
21322176

21332177
BgwPoolStart(MtmWorkers, MtmPoolConstructor);
21342178

2179+
//MtmRaftableInitialize();
21352180
MtmArbiterInitialize();
21362181

21372182
/*

0 commit comments

Comments
 (0)