Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 2292f73

Browse files
committed
Rework init and config infrastructure.
Move the nodes' conninfo and IDs to tables in the extension schema. Also change the startup sequence so that multimaster (mm) can be started in different databases.
1 parent 3c1e5f3 commit 2292f73

22 files changed

+803
-1727
lines changed

Cluster.pm

+13-20
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ sub new
6262
foreach my $i (1..$nodenum)
6363
{
6464
my $host = "127.0.0.1";
65-
my ($pgport, $arbiter_port) = allocate_ports($host, 2);
65+
my ($pgport) = allocate_ports($host, 1);
6666

6767
if(defined $ENV{MMPORT}) {
6868
$pgport = $ENV{MMPORT};
@@ -72,8 +72,7 @@ sub new
7272
my $node = new PostgresNode("node$i", $host, $pgport);
7373
$node->{id} = $i;
7474
$node->{dbname} = 'postgres';
75-
$node->{arbiter_port} = $arbiter_port;
76-
$node->{mmconnstr} = "${ \$node->connstr($node->{dbname}) } arbiter_port=${ \$node->{arbiter_port} }";
75+
$node->{mmconnstr} = $node->connstr($node->{dbname});
7776
push(@$nodes, $node);
7877
}
7978

@@ -102,7 +101,7 @@ sub all_connstrs
102101
{
103102
my ($self) = @_;
104103
my $nodes = $self->{nodes};
105-
return join(', ', map { "${ \$_->connstr($_->{dbname}) } arbiter_port=${ \$_->{arbiter_port} }" } @$nodes);
104+
return join(', ', map { "\" ${ \$_->connstr($_->{dbname}) } \"" } @$nodes);
106105
}
107106

108107

@@ -126,15 +125,11 @@ sub configure
126125
}
127126
}
128127

129-
my $connstr = $self->all_connstrs();
130-
$connstr =~ s/'//gms;
131-
132128
foreach my $node (@$nodes)
133129
{
134130
my $id = $node->{id};
135131
my $host = $node->host;
136132
my $pgport = $node->port;
137-
my $arbiter_port = $node->{arbiter_port};
138133
my $unix_sock_dir = $ENV{PGHOST};
139134

140135
$node->append_conf("postgresql.conf", qq(
@@ -148,18 +143,16 @@ sub configure
148143
wal_level = logical
149144
max_wal_senders = 6
150145
wal_sender_timeout = 0
151-
max_replication_slots = 6
146+
max_replication_slots = 12
152147
shared_preload_libraries = 'multimaster'
153148
shared_buffers = 16MB
154149
155-
multimaster.node_id = $id
156-
multimaster.conn_strings = '$connstr'
157150
multimaster.heartbeat_recv_timeout = 8050
158151
multimaster.heartbeat_send_timeout = 250
159152
multimaster.max_nodes = 6
160153
# XXX try without ignore_tables_without_pk
161154
multimaster.ignore_tables_without_pk = true
162-
# multimaster.volkswagen_mode = 1
155+
multimaster.volkswagen_mode = 1
163156
log_line_prefix = '%t [%p]: '
164157
));
165158

@@ -175,11 +168,17 @@ sub start
175168
{
176169
my ($self) = @_;
177170
my $nodes = $self->{nodes};
171+
my $node_id = 1;
172+
173+
my $connstrs = $self->all_connstrs();
174+
$connstrs =~ s/'/''/gms;
178175

179176
foreach my $node (@$nodes)
180177
{
181178
$node->start();
182179
$node->safe_psql($node->{dbname}, "create extension multimaster;");
180+
$node->safe_psql($node->{dbname}, "select mtm.init_node($node_id, '{$connstrs}');");
181+
$node_id = $node_id + 1;
183182
note( "Starting node with connstr 'port=@{[ $node->port() ]} host=@{[ $node->host() ]}'");
184183
}
185184
}
@@ -370,23 +369,20 @@ sub add_node()
370369
my ($self, %params) = @_;
371370

372371
my $pgport;
373-
my $arbiter_port;
374372
my $connstrs;
375373
my $node_id;
376374

377375
if (defined $params{node_id})
378376
{
379377
$node_id = $params{node_id};
380378
$pgport = $params{port};
381-
$arbiter_port = $params{arbiter_port};
382379
$connstrs = $self->all_connstrs();
383380
}
384381
else
385382
{
386383
$node_id = scalar(@{$self->{nodes}}) + 1;
387384
$pgport = (allocate_ports('127.0.0.1', 1))[0];
388-
$arbiter_port = (allocate_ports('127.0.0.1', 1))[0];
389-
$connstrs = $self->all_connstrs() . ", dbname=${ \$self->{nodes}->[0]->{dbname} } host=127.0.0.1 port=$pgport arbiter_port=$arbiter_port";
385+
$connstrs = $self->all_connstrs() . ", dbname=${ \$self->{nodes}->[0]->{dbname} } host=127.0.0.1 port=$pgport";
390386
}
391387

392388
$connstrs =~ s/'//gms;
@@ -402,12 +398,9 @@ sub add_node()
402398
$node->{_port} = $pgport;
403399
$node->{port} = $pgport;
404400
$node->{host} = '127.0.0.1';
405-
$node->{arbiter_port} = $arbiter_port;
406-
$node->{mmconnstr} = "${ \$node->connstr($node->{dbname}) } arbiter_port=${ \$node->{arbiter_port} }";
401+
$node->{mmconnstr} = $node->connstr($node->{dbname});
407402
$node->append_conf("postgresql.conf", qq(
408-
multimaster.arbiter_port = $arbiter_port
409403
multimaster.conn_strings = '$connstrs'
410-
multimaster.node_id = $node_id
411404
port = $pgport
412405
));
413406
$node->append_conf("pg_hba.conf", qq(

multimaster--1.0.sql

+9-108
Original file line numberDiff line numberDiff line change
@@ -16,123 +16,18 @@ AS 'MODULE_PATHNAME','dmq_receiver_loop'
1616
LANGUAGE C;
1717

1818

19-
-- Stop replication to the node. The node is disabled. If drop_slot is true, then the replication slot is dropped and the node can be recovered using basebackup and the recover_node function.
20-
-- If drop_slot is false and limit for maximal slot gap was not reached, then node can be restarted using resume_node function.
21-
CREATE FUNCTION mtm.stop_node(node integer, drop_slot bool default false) RETURNS void
22-
AS 'MODULE_PATHNAME','mtm_stop_node'
23-
LANGUAGE C;
24-
25-
-- Add a new node to the cluster. The number of nodes should not exceed the maximal number of nodes in the cluster.
26-
CREATE FUNCTION mtm.add_node(conn_str text) RETURNS void
27-
AS 'MODULE_PATHNAME','mtm_add_node'
28-
LANGUAGE C;
29-
30-
-- Create a replication slot for the node which was previously stalled (its replication slot was deleted)
31-
CREATE FUNCTION mtm.recover_node(node integer) RETURNS void
32-
AS 'MODULE_PATHNAME','mtm_recover_node'
33-
LANGUAGE C;
34-
35-
-- Resume previously stopped node with live replication slot. If node was not stopped, this function has no effect.
36-
-- It doesn't create a slot and returns an error if the node is stalled (slot was dropped)
37-
CREATE FUNCTION mtm.resume_node(node integer) RETURNS void
38-
AS 'MODULE_PATHNAME','mtm_resume_node'
39-
LANGUAGE C;
40-
41-
42-
CREATE TYPE mtm.node_state AS (
43-
"id" integer,
44-
"enabled" bool,
45-
"connected" bool,
46-
"slot_active" bool,
47-
"stopped" bool,
48-
"catch_up" bool,
49-
"slot_lag" bigint,
50-
"avg_trans_delay" bigint,
51-
"last_status_change" timestamp,
52-
"oldest_snapshot" bigint,
53-
"sender_pid" integer,
54-
"sender_start_time" timestamp,
55-
"receiver_pid" integer,
56-
"receiver_start_time" timestamp,
57-
"conn_str" text,
58-
"connectivity_mask" bigint,
59-
"n_heartbeats" bigint
60-
);
61-
62-
CREATE FUNCTION mtm.get_nodes_state() RETURNS SETOF mtm.node_state
63-
AS 'MODULE_PATHNAME','mtm_get_nodes_state'
64-
LANGUAGE C;
65-
66-
CREATE TYPE mtm.cluster_state AS (
67-
"id" integer,
68-
"status" text,
69-
"disabled_node_mask" bigint,
70-
"disconnected_node_mask" bigint,
71-
"catch_up_node_mask" bigint,
72-
"live_nodes" integer,
73-
"all_nodes" integer,
74-
"n_active_queries" integer,
75-
"n_pending_queries" integer,
76-
"queue_size" bigint,
77-
"trans_count" bigint,
78-
"time_shift" bigint,
79-
"recovery_slot" integer,
80-
"xid_hash_size" bigint,
81-
"gid_hash_size" bigint,
82-
"oldest_xid" bigint,
83-
"config_changes" integer,
84-
"stalled_node_mask" bigint,
85-
"stopped_node_mask" bigint,
86-
"last_status_change" timestamp
87-
);
88-
89-
CREATE TYPE mtm.trans_state AS (
90-
"status" text,
91-
"gid" text,
92-
"xid" bigint,
93-
"coordinator" integer,
94-
"gxid" bigint,
95-
"csn" timestamp,
96-
"snapshot" timestamp,
97-
"local" boolean,
98-
"prepared" boolean,
99-
"active" boolean,
100-
"twophase" boolean,
101-
"voting_completed" boolean,
102-
"participants" bigint,
103-
"voted" bigint,
104-
"config_changes" integer
105-
);
106-
107-
108-
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
109-
AS 'MODULE_PATHNAME','mtm_get_cluster_state'
110-
LANGUAGE C;
111-
112-
CREATE FUNCTION mtm.collect_cluster_info() RETURNS SETOF mtm.cluster_state
113-
AS 'MODULE_PATHNAME','mtm_collect_cluster_info'
114-
LANGUAGE C;
115-
11619
CREATE FUNCTION mtm.make_table_local(relation regclass) RETURNS void
11720
AS 'MODULE_PATHNAME','mtm_make_table_local'
11821
LANGUAGE C;
11922

120-
-- CREATE FUNCTION mtm.broadcast_table(source_table regclass) RETURNS void
121-
-- AS 'MODULE_PATHNAME','mtm_broadcast_table'
122-
-- LANGUAGE C;
123-
124-
-- CREATE FUNCTION mtm.copy_table(source_table regclass, target_node_id integer) RETURNS void
125-
-- AS 'MODULE_PATHNAME','mtm_copy_table'
126-
-- LANGUAGE C;
23+
CREATE FUNCTION mtm.init_node(node_id integer, connstrs text[]) RETURNS void
24+
AS 'MODULE_PATHNAME','mtm_init_node'
25+
LANGUAGE C;
12726

12827
CREATE FUNCTION mtm.dump_lock_graph() RETURNS text
12928
AS 'MODULE_PATHNAME','mtm_dump_lock_graph'
13029
LANGUAGE C;
13130

132-
CREATE FUNCTION mtm.poll_node(node_id integer, no_wait boolean default FALSE) RETURNS boolean
133-
AS 'MODULE_PATHNAME','mtm_poll_node'
134-
LANGUAGE C;
135-
13631
CREATE FUNCTION mtm.check_deadlock(xid bigint) RETURNS boolean
13732
AS 'MODULE_PATHNAME','mtm_check_deadlock'
13833
LANGUAGE C;
@@ -155,6 +50,12 @@ CREATE TABLE mtm.syncpoints(
15550
primary key(node_id, origin_lsn)
15651
);
15752

53+
CREATE TABLE mtm.nodes(
54+
id int primary key not null,
55+
conninfo text not null,
56+
is_self bool not null
57+
);
58+
15859
CREATE OR REPLACE FUNCTION mtm.alter_sequences() RETURNS boolean AS
15960
$$
16061
DECLARE

src/bgwpool.c

+11-20
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ int MtmMaxWorkers;
2525

2626
static BgwPool* MtmPool;
2727

28-
void BgwPoolStaticWorkerMainLoop(Datum arg);
2928
void BgwPoolDynamicWorkerMainLoop(Datum arg);
3029

3130
static void
@@ -65,11 +64,13 @@ BgwPoolMainLoop(BgwPool* pool)
6564
pqsignal(SIGHUP, PostgresSigHupHandler);
6665

6766
BackgroundWorkerUnblockSignals();
68-
BackgroundWorkerInitializeConnection(pool->dbname, pool->dbuser, 0);
67+
BackgroundWorkerInitializeConnectionByOid(pool->db_id, pool->user_id, 0);
6968
ActivePortal = &fakePortal;
7069
ActivePortal->status = PORTAL_ACTIVE;
7170
ActivePortal->sourceText = "";
7271

72+
receiver_mtm_cfg = MtmLoadConfig();
73+
7374
while (true)
7475
{
7576
if (ConfigReloadPending)
@@ -138,7 +139,10 @@ BgwPoolMainLoop(BgwPool* pool)
138139
mtm_log(BgwPoolEvent, "Shutdown background worker %d", MyProcPid);
139140
}
140141

141-
void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, char const* dbuser, size_t queueSize, size_t nWorkers)
142+
// XXX: this is called during _PG_init because we need to allocate queue.
143+
// Better to use DSM, so that this can be done dynamically.
144+
void
145+
BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, size_t queueSize, size_t nWorkers)
142146
{
143147
MtmPool = pool;
144148

@@ -164,37 +168,24 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, c
164168
pool->nWorkers = nWorkers;
165169
pool->lastPeakTime = 0;
166170
pool->lastDynamicWorkerStartTime = 0;
167-
strncpy(pool->dbname, dbname, MAX_DBNAME_LEN);
168-
strncpy(pool->dbuser, dbuser, MAX_DBUSER_LEN);
169171
}
170172

171173
timestamp_t BgwGetLastPeekTime(BgwPool* pool)
172174
{
173175
return pool->lastPeakTime;
174176
}
175177

176-
void BgwPoolStaticWorkerMainLoop(Datum arg)
177-
{
178-
BgwPoolMainLoop((BgwPool*)DatumGetPointer(arg));
179-
}
180-
181178
void BgwPoolDynamicWorkerMainLoop(Datum arg)
182179
{
183180
BgwPoolMainLoop((BgwPool*)DatumGetPointer(arg));
184181
}
185182

186-
void BgwPoolStart(BgwPool* pool, char *poolName)
183+
void
184+
BgwPoolStart(BgwPool* pool, char *poolName, Oid db_id, Oid user_id)
187185
{
188-
BackgroundWorker worker;
189-
190-
MemSet(&worker, 0, sizeof(BackgroundWorker));
191-
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
192-
worker.bgw_start_time = BgWorkerStart_ConsistentState;
193-
sprintf(worker.bgw_library_name, "multimaster");
194-
sprintf(worker.bgw_function_name, "BgwPoolStaticWorkerMainLoop");
195-
worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
196-
197186
strncpy(pool->poolName, poolName, MAX_NAME_LEN);
187+
pool->db_id = db_id;
188+
pool->user_id = user_id;
198189
}
199190

200191
size_t BgwPoolGetQueueSize(BgwPool* pool)

0 commit comments

Comments
 (0)