Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 6849188

Browse files
committed
decouple raftable
1 parent a7ec13e commit 6849188

File tree

4 files changed

+44
-192
lines changed

4 files changed

+44
-192
lines changed

Cluster.pm

Lines changed: 13 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -40,10 +40,10 @@ sub new
4040
foreach my $i (1..$nodenum)
4141
{
4242
my $host = "127.0.0.1";
43-
my ($pgport, $raftport) = allocate_ports($host, 2);
43+
my ($pgport, $arbiter_port) = allocate_ports($host, 2);
4444
my $node = new PostgresNode("node$i", $host, $pgport);
4545
$node->{id} = $i;
46-
$node->{raftport} = $raftport;
46+
$node->{arbiter_port} = $arbiter_port;
4747
push(@$nodes, $node);
4848
}
4949

@@ -71,16 +71,16 @@ sub configure
7171
{
7272
my ($self) = @_;
7373
my $nodes = $self->{nodes};
74+
my $nnodes = scalar @{ $nodes };
7475

75-
my $connstr = join(',', map { "${ \$_->connstr('postgres') }" } @$nodes);
76-
my $raftpeers = join(',', map { join(':', $_->{id}, $_->host, $_->{raftport}) } @$nodes);
76+
my $connstr = join(', ', map { "${ \$_->connstr('postgres') } arbiter_port=${ \$_->{arbiter_port} }" } @$nodes);
7777

7878
foreach my $node (@$nodes)
7979
{
8080
my $id = $node->{id};
8181
my $host = $node->host;
8282
my $pgport = $node->port;
83-
my $raftport = $node->{raftport};
83+
my $arbiter_port = $node->{arbiter_port};
8484

8585
$node->append_conf("postgresql.conf", qq(
8686
log_statement = none
@@ -94,20 +94,22 @@ sub configure
9494
fsync = off
9595
max_wal_senders = 10
9696
wal_sender_timeout = 0
97-
default_transaction_isolation = 'repeatable read'
97+
default_transaction_isolation = 'repeatable read'
9898
max_replication_slots = 10
99-
shared_preload_libraries = 'raftable,multimaster'
99+
shared_preload_libraries = 'multimaster'
100+
101+
multimaster.arbiter_port = $arbiter_port
100102
multimaster.workers = 10
101103
multimaster.queue_size = 10485760 # 10mb
102104
multimaster.node_id = $id
103105
multimaster.conn_strings = '$connstr'
104-
multimaster.use_raftable = false
105106
multimaster.heartbeat_recv_timeout = 1000
106107
multimaster.heartbeat_send_timeout = 250
107-
multimaster.max_nodes = 3
108+
multimaster.max_nodes = $nnodes
108109
multimaster.ignore_tables_without_pk = true
109-
multimaster.twopc_min_timeout = 2000
110-
log_line_prefix = '%t: '
110+
multimaster.twopc_min_timeout = 50000
111+
multimaster.min_2pc_timeout = 50000
112+
log_line_prefix = '%t: '
111113
));
112114

113115
$node->append_conf("pg_hba.conf", qq(

Makefile

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,14 +1,12 @@
11
MODULE_big = multimaster
2-
OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o
2+
OBJS = multimaster.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o
33

44
ifndef RAFTABLE_PATH
55
RAFTABLE_PATH = ../raftable
66
endif
77

88
override CPPFLAGS += -I$(RAFTABLE_PATH) -I$(RAFTABLE_PATH)/raft/include
99

10-
EXTRA_INSTALL = contrib/raftable contrib/mmts
11-
1210
EXTENSION = multimaster
1311
DATA = multimaster--1.0.sql
1412

multimaster.c

Lines changed: 30 additions & 90 deletions
Original file line number | Diff line number | Diff line change
@@ -72,9 +72,6 @@
7272

7373
#include "multimaster.h"
7474
#include "ddd.h"
75-
#include "raftable_wrapper.h"
76-
#include "raftable.h"
77-
#include "worker.h"
7875

7976
typedef struct {
8077
TransactionId xid; /* local transaction ID */
@@ -215,7 +212,6 @@ int MtmNodes;
215212
int MtmNodeId;
216213
int MtmReplicationNodeId;
217214
int MtmArbiterPort;
218-
int MtmRaftablePort;
219215
int MtmConnectTimeout;
220216
int MtmReconnectTimeout;
221217
int MtmNodeDisableDelay;
@@ -225,7 +221,6 @@ int MtmHeartbeatSendTimeout;
225221
int MtmHeartbeatRecvTimeout;
226222
int MtmMin2PCTimeout;
227223
int MtmMax2PCRatio;
228-
bool MtmUseRaftable;
229224
bool MtmUseDtm;
230225
bool MtmPreserveCommitOrder;
231226
bool MtmVolksWagenMode;
@@ -1745,7 +1740,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
17451740

17461741
for (i = 0; i < n; i++) {
17471742
if (i+1 != MtmNodeId) {
1748-
void* data = RaftableGet(psprintf("node-mask-%d", i+1), NULL, NULL, nowait);
1743+
void *data = &Mtm->nodes[i].connectivityMask;
17491744
if (data == NULL) {
17501745
return false;
17511746
}
@@ -1901,17 +1896,6 @@ void MtmOnNodeDisconnect(int nodeId)
19011896
MTM_LOG1("Disconnect node %d connectivity mask %llx", nodeId, (long long) Mtm->connectivityMask);
19021897
MtmUnlock();
19031898

1904-
if (!RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false))
1905-
{
1906-
elog(WARNING, "Disable node which is in minority according to RAFT");
1907-
MtmLock(LW_EXCLUSIVE);
1908-
if (Mtm->status == MTM_ONLINE) {
1909-
MtmSwitchClusterMode(MTM_IN_MINORITY);
1910-
}
1911-
MtmUnlock();
1912-
return;
1913-
}
1914-
19151899
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
19161900
MtmRefreshClusterStatus(false);
19171901
}
@@ -1922,9 +1906,6 @@ void MtmOnNodeConnect(int nodeId)
19221906
BIT_CLEAR(Mtm->connectivityMask, nodeId-1);
19231907
BIT_CLEAR(Mtm->reconnectMask, nodeId-1);
19241908
MtmUnlock();
1925-
1926-
MTM_LOG1("Reconnect node %d, connectivityMask=%llx", nodeId, (long long) Mtm->connectivityMask);
1927-
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
19281909
}
19291910

19301911

@@ -2034,21 +2015,6 @@ static void MtmLoadLocalTables(void)
20342015
heap_close(rel, RowExclusiveLock);
20352016
}
20362017
}
2037-
2038-
static void MtmRaftableInitialize()
2039-
{
2040-
int i;
2041-
2042-
for (i = 0; i < MtmNodes; i++)
2043-
{
2044-
int port = MtmConnections[i].raftablePort;
2045-
if (port == 0) {
2046-
port = MtmRaftablePort + i;
2047-
}
2048-
raftable_peer(i, MtmConnections[i].hostName, port);
2049-
}
2050-
raftable_start(MtmNodeId - 1);
2051-
}
20522018

20532019
static void MtmCheckControlFile(void)
20542020
{
@@ -2197,19 +2163,10 @@ void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr)
21972163
memcpy(conn->hostName, host, hostLen);
21982164
conn->hostName[hostLen] = '\0';
21992165

2200-
port = strstr(connStr, "raftport=");
2166+
port = strstr(connStr, "arbiter_port=");
22012167
if (port != NULL) {
2202-
if (sscanf(port+9, "%d", &conn->raftablePort) != 1) {
2203-
elog(ERROR, "Invalid raftable port: %s", port+9);
2204-
}
2205-
} else {
2206-
conn->raftablePort = 0;
2207-
}
2208-
2209-
port = strstr(connStr, "arbiterport=");
2210-
if (port != NULL) {
2211-
if (sscanf(port+12, "%d", &conn->arbiterPort) != 1) {
2212-
elog(ERROR, "Invalid arbiter port: %s", port+12);
2168+
if (sscanf(port+13, "%d", &conn->arbiterPort) != 1) {
2169+
elog(ERROR, "Invalid arbiter port: %s", port+13);
22132170
}
22142171
} else {
22152172
conn->arbiterPort = MULTIMASTER_DEFAULT_ARBITER_PORT;
@@ -2538,19 +2495,6 @@ _PG_init(void)
25382495
NULL
25392496
);
25402497

2541-
DefineCustomBoolVariable(
2542-
"multimaster.use_raftable",
2543-
"Use raftable plugin for internode communication",
2544-
NULL,
2545-
&MtmUseRaftable,
2546-
true,
2547-
PGC_BACKEND,
2548-
0,
2549-
NULL,
2550-
NULL,
2551-
NULL
2552-
);
2553-
25542498
DefineCustomBoolVariable(
25552499
"multimaster.ignore_tables_without_pk",
25562500
"Do not replicate tables withpout primary key",
@@ -2708,21 +2652,6 @@ _PG_init(void)
27082652
NULL
27092653
);
27102654

2711-
DefineCustomIntVariable(
2712-
"multimaster.raftable_port",
2713-
"Base value for assigning raftable ports",
2714-
NULL,
2715-
&MtmRaftablePort,
2716-
6543,
2717-
0,
2718-
INT_MAX,
2719-
PGC_BACKEND,
2720-
0,
2721-
NULL,
2722-
NULL,
2723-
NULL
2724-
);
2725-
27262655
DefineCustomStringVariable(
27272656
"multimaster.conn_strings",
27282657
"Multimaster node connection strings separated by commas, i.e. 'replication=database dbname=postgres host=localhost port=5001,replication=database dbname=postgres host=localhost port=5002'",
@@ -2813,9 +2742,6 @@ _PG_init(void)
28132742

28142743
BgwPoolStart(MtmWorkers, MtmPoolConstructor);
28152744

2816-
if (MtmUseRaftable) {
2817-
MtmRaftableInitialize();
2818-
}
28192745
MtmArbiterInitialize();
28202746

28212747
/*
@@ -3516,8 +3442,7 @@ PGconn *PQconnectdb_safe(const char *conninfo)
35163442
{
35173443
PGconn *conn;
35183444
char *safe_connstr = pstrdup(conninfo);
3519-
erase_option_from_connstr("raftport", safe_connstr);
3520-
erase_option_from_connstr("arbiterport", safe_connstr);
3445+
erase_option_from_connstr("arbiter_port", safe_connstr);
35213446

35223447
conn = PQconnectdb(safe_connstr);
35233448

@@ -3626,11 +3551,17 @@ Datum mtm_dump_lock_graph(PG_FUNCTION_ARGS)
36263551
int i;
36273552
for (i = 0; i < Mtm->nAllNodes; i++)
36283553
{
3629-
size_t size;
3630-
char* data = RaftableGet(psprintf("lock-graph-%d", i+1), &size, NULL, false);
3631-
if (data) {
3632-
GlobalTransactionId *gtid = (GlobalTransactionId *)data;
3633-
GlobalTransactionId *last = (GlobalTransactionId *)(data + size);
3554+
size_t lockGraphSize;
3555+
char *lockGraphData;
3556+
MtmLockNode(i + MtmMaxNodes, LW_SHARED);
3557+
lockGraphSize = Mtm->nodes[i].lockGraphUsed;
3558+
lockGraphData = palloc(lockGraphSize);
3559+
memcpy(lockGraphData, Mtm->nodes[i].lockGraphData, lockGraphSize);
3560+
MtmUnlockNode(i + MtmMaxNodes);
3561+
3562+
if (lockGraphData) {
3563+
GlobalTransactionId *gtid = (GlobalTransactionId *) lockGraphData;
3564+
GlobalTransactionId *last = (GlobalTransactionId *) (lockGraphData + lockGraphSize);
36343565
appendStringInfo(s, "node-%d lock graph: ", i+1);
36353566
while (gtid != last) {
36363567
GlobalTransactionId *src = gtid++;
@@ -4543,19 +4474,28 @@ MtmDetectGlobalDeadLockFortXid(TransactionId xid)
45434474

45444475
ByteBufferAlloc(&buf);
45454476
EnumerateLocks(MtmSerializeLock, &buf);
4546-
RaftableSet(psprintf("lock-graph-%d", MtmNodeId), buf.data, buf.used, false);
4477+
4478+
Assert(replorigin_session_origin == InvalidRepOriginId);
4479+
XLogFlush(LogLogicalMessage("L", buf.data, buf.used, false));
4480+
45474481
MtmSleep(MSEC_TO_USEC(DeadlockTimeout));
45484482
MtmGraphInit(&graph);
45494483
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data, buf.used/sizeof(GlobalTransactionId));
45504484
ByteBufferFree(&buf);
45514485
for (i = 0; i < Mtm->nAllNodes; i++) {
45524486
if (i+1 != MtmNodeId && !BIT_CHECK(Mtm->disabledNodeMask, i)) {
4553-
size_t size;
4554-
void* data = RaftableGet(psprintf("lock-graph-%d", i+1), &size, NULL, false);
4555-
if (data == NULL) {
4487+
size_t lockGraphSize;
4488+
void* lockGraphData;
4489+
MtmLockNode(i + MtmMaxNodes, LW_SHARED);
4490+
lockGraphSize = Mtm->nodes[i].lockGraphUsed;
4491+
lockGraphData = palloc(lockGraphSize);
4492+
memcpy(lockGraphData, Mtm->nodes[i].lockGraphData, lockGraphSize);
4493+
MtmUnlockNode(i + MtmMaxNodes);
4494+
4495+
if (lockGraphData == NULL) {
45564496
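return true; /* NOTE(review): leftover from the Raftable path; lockGraphData now comes from palloc(), which reports an error rather than returning NULL, so this branch looks unreachable - confirm */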
45574497
} else {
4558-
MtmGraphAdd(&graph, (GlobalTransactionId*)data, size/sizeof(GlobalTransactionId));
4498+
MtmGraphAdd(&graph, (GlobalTransactionId*)lockGraphData, lockGraphSize/sizeof(GlobalTransactionId));
45594499
}
45604500
}
45614501
}

raftable.c

Lines changed: 0 additions & 88 deletions
This file was deleted.

0 commit comments

Comments
 (0)