Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 9fae1e8

Browse files
committed
node management functions: low-level add/drop functions, config reload in workers
1 parent d2df714 commit 9fae1e8

16 files changed

+1047
-463
lines changed

Cluster.pm

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,10 +176,16 @@ sub start
176176
foreach my $node (@$nodes)
177177
{
178178
$node->start();
179+
note( "Starting node with connstr 'port=@{[ $node->port() ]} host=@{[ $node->host() ]}'");
180+
}
181+
182+
$self->await_nodes( (0..$self->{nodenum}-1) );
183+
184+
foreach my $node (@$nodes)
185+
{
179186
$node->safe_psql($node->{dbname}, "create extension multimaster;");
180187
$node->safe_psql($node->{dbname}, "select mtm.init_node($node_id, '{$connstrs}');");
181188
$node_id = $node_id + 1;
182-
note( "Starting node with connstr 'port=@{[ $node->port() ]} host=@{[ $node->host() ]}'");
183189
}
184190
}
185191

multimaster--1.0.sql

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,71 @@ BEGIN
1010
END
1111
$$;
1212

13-
13+
-- message queue receiver, for internal use only
1414
CREATE FUNCTION mtm.dmq_receiver_loop(sender_name text) RETURNS void
1515
AS 'MODULE_PATHNAME','dmq_receiver_loop'
1616
LANGUAGE C;
1717

18+
---
19+
--- Plumbering of node management: internal tables and triggers.
20+
--- Not indended to be used directly by users but rather through add/drop/init
21+
--- functions.
22+
---
1823

19-
CREATE FUNCTION mtm.make_table_local(relation regclass) RETURNS void
20-
AS 'MODULE_PATHNAME','mtm_make_table_local'
24+
CREATE TABLE mtm.nodes(
25+
id int primary key not null,
26+
conninfo text not null,
27+
is_self bool not null
28+
);
29+
30+
CREATE FUNCTION mtm.after_node_create()
31+
RETURNS TRIGGER
32+
AS 'MODULE_PATHNAME','mtm_after_node_create'
2133
LANGUAGE C;
2234

23-
CREATE FUNCTION mtm.init_node(node_id integer, connstrs text[]) RETURNS void
24-
AS 'MODULE_PATHNAME','mtm_init_node'
35+
CREATE TRIGGER on_node_create
36+
AFTER INSERT ON mtm.nodes
37+
FOR EACH ROW
38+
EXECUTE FUNCTION mtm.after_node_create();
39+
40+
CREATE FUNCTION mtm.after_node_drop()
41+
RETURNS TRIGGER
42+
AS 'MODULE_PATHNAME','mtm_after_node_drop'
43+
LANGUAGE C;
44+
45+
CREATE TRIGGER on_node_drop
46+
AFTER DELETE ON mtm.nodes
47+
FOR EACH ROW
48+
EXECUTE FUNCTION mtm.after_node_drop();
49+
50+
---
51+
--- User facing API for node management.
52+
---
53+
54+
CREATE OR REPLACE FUNCTION mtm.init_node(node_id integer, connstrs text[]) RETURNS void AS
55+
$$
56+
BEGIN
57+
IF node_id <= 0 OR node_id > least(16, array_length(connstrs, 1)) THEN
58+
RAISE EXCEPTION 'node_id should be in range [1 .. length(connstrs)]';
59+
END IF;
60+
EXECUTE 'SET mtm.emerging_node_id = ' || node_id || ';';
61+
INSERT INTO mtm.nodes SELECT
62+
ordinality::int as id,
63+
unnest as conninfo,
64+
ordinality = current_setting('mtm.emerging_node_id')::int as is_self
65+
FROM
66+
unnest(connstrs)
67+
WITH ORDINALITY;
68+
END
69+
$$
70+
LANGUAGE plpgsql;
71+
72+
---
73+
--- Various
74+
---
75+
76+
CREATE FUNCTION mtm.make_table_local(relation regclass) RETURNS void
77+
AS 'MODULE_PATHNAME','mtm_make_table_local'
2578
LANGUAGE C;
2679

2780
CREATE FUNCTION mtm.dump_lock_graph() RETURNS text
@@ -50,11 +103,6 @@ CREATE TABLE mtm.syncpoints(
50103
primary key(node_id, origin_lsn)
51104
);
52105

53-
CREATE TABLE mtm.nodes(
54-
id int primary key not null,
55-
conninfo text not null,
56-
is_self bool not null
57-
);
58106

59107
CREATE OR REPLACE FUNCTION mtm.alter_sequences() RETURNS boolean AS
60108
$$

src/bgwpool.c

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
#include "tcop/pquery.h"
1414
#include "utils/guc.h"
1515
#include "tcop/tcopprot.h"
16+
#include "utils/syscache.h"
17+
#include "utils/inval.h"
1618

1719
#include "bgwpool.h"
1820
#include "multimaster.h"
@@ -40,6 +42,13 @@ BgwShutdownHandler(int sig)
4042
die(sig);
4143
}
4244

45+
static void
46+
subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
47+
{
48+
receiver_mtm_cfg_valid = false;
49+
}
50+
51+
4352
static void
4453
BgwPoolMainLoop(BgwPool* pool)
4554
{
@@ -70,6 +79,10 @@ BgwPoolMainLoop(BgwPool* pool)
7079
ActivePortal->sourceText = "";
7180

7281
receiver_mtm_cfg = MtmLoadConfig();
82+
/* Keep us informed about subscription changes. */
83+
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
84+
subscription_change_cb,
85+
(Datum) 0);
7386

7487
while (true)
7588
{
@@ -209,7 +222,7 @@ static void BgwStartExtraWorker(BgwPool* pool)
209222
MemSet(&worker, 0, sizeof(BackgroundWorker));
210223
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
211224
worker.bgw_start_time = BgWorkerStart_ConsistentState;
212-
worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
225+
worker.bgw_restart_time = BGW_NEVER_RESTART;
213226
worker.bgw_main_arg = PointerGetDatum(pool);
214227
sprintf(worker.bgw_library_name, "multimaster");
215228
sprintf(worker.bgw_function_name, "BgwPoolDynamicWorkerMainLoop");

src/commit.c

Lines changed: 38 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
#include "access/transam.h"
1515
#include "storage/proc.h"
1616
#include "utils/guc.h"
17+
#include "utils/syscache.h"
18+
#include "utils/inval.h"
1719
#include "miscadmin.h"
1820
#include "commands/dbcommands.h"
1921
#include "catalog/pg_subscription.h"
@@ -26,7 +28,9 @@
2628
#include "state.h"
2729
#include "syncpoint.h"
2830

29-
static bool backend_init_done;
31+
static bool subchange_cb_registered;
32+
static bool config_valid;
33+
// XXX: change dmq api and avoid that
3034
static int sender_to_node[MTM_MAX_NODES];
3135
static MtmConfig *mtm_cfg;
3236

@@ -38,22 +42,24 @@ static void GatherPrecommits(TransactionId xid, nodemask_t participantsMask,
3842
MtmMessageCode code);
3943

4044
static void
41-
backend_init()
45+
pubsub_change_cb(Datum arg, int cacheid, uint32 hashvalue)
4246
{
43-
int i,
44-
sender_id = 0;
47+
config_valid = false;
48+
}
4549

46-
/* load mm config */
47-
mtm_cfg = MtmLoadConfig();
50+
static void
51+
attach_node(int node_id, MtmConfig *new_cfg, Datum arg)
52+
{
53+
int sender_id = dmq_attach_receiver(psprintf(MTM_DMQNAME_FMT, node_id),
54+
node_id - 1);
55+
sender_to_node[sender_id] = node_id;
56+
}
4857

49-
/* attach ourself to receiver queues and fill out sender_to_node[] */
50-
for (i = 0; i < mtm_cfg->n_nodes; i++)
51-
{
52-
if (i + 1 == mtm_cfg->my_node_id)
53-
continue;
54-
dmq_attach_receiver(psprintf("node%d", i + 1), i);
55-
sender_to_node[sender_id++] = i + 1;
56-
}
58+
static void
59+
detach_node(int node_id, MtmConfig *new_cfg, Datum arg)
60+
{
61+
/* detach incoming queues from this node */
62+
dmq_detach_receiver(psprintf(MTM_DMQNAME_FMT, node_id));
5763
}
5864

5965
void
@@ -103,11 +109,23 @@ MtmBeginTransaction()
103109
return;
104110
}
105111

106-
// XXX: reload on new node. (origin inval callback?)
107-
if (!backend_init_done)
112+
if (!subchange_cb_registered)
113+
{
114+
/* Keep us informed about subscription changes. */
115+
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
116+
pubsub_change_cb,
117+
(Datum) 0);
118+
CacheRegisterSyscacheCallback(PUBLICATIONOID,
119+
pubsub_change_cb,
120+
(Datum) 0);
121+
subchange_cb_registered = true;
122+
}
123+
124+
AcceptInvalidationMessages();
125+
if (!config_valid)
108126
{
109-
backend_init();
110-
backend_init_done = true;
127+
mtm_cfg = MtmReloadConfig(mtm_cfg, attach_node, detach_node, (Datum) NULL);
128+
config_valid = true;
111129
}
112130

113131
/* Reset MtmTx */
@@ -120,7 +138,6 @@ MtmBeginTransaction()
120138

121139
MtmDDLResetStatement();
122140

123-
124141
/* Application name can be changed using PGAPPNAME environment variable */
125142
if (Mtm->status != MTM_ONLINE
126143
&& strcmp(application_name, MULTIMASTER_ADMIN) != 0
@@ -191,9 +208,8 @@ MtmTwoPhaseCommit()
191208
LWLockAcquire(MtmCommitBarrier, LW_SHARED);
192209

193210
MtmLock(LW_SHARED);
194-
participantsMask = (((nodemask_t)1 << mtm_cfg->n_nodes) - 1) &
195-
~Mtm->disabledNodeMask &
196-
~((nodemask_t)1 << (mtm_cfg->my_node_id-1));
211+
participantsMask = ~Mtm->disabledNodeMask &
212+
~((nodemask_t)1 << (mtm_cfg->my_node_id-1));
197213
if (Mtm->status != MTM_ONLINE)
198214
mtm_log(ERROR, "This node became offline during current transaction");
199215
MtmUnlock();
@@ -242,8 +258,6 @@ GatherPrepares(TransactionId xid, nodemask_t participantsMask, int *failed_at)
242258
{
243259
bool prepared = true;
244260

245-
Assert(participantsMask != 0);
246-
247261
while (participantsMask != 0)
248262
{
249263
bool ret;
@@ -311,8 +325,6 @@ GatherPrepares(TransactionId xid, nodemask_t participantsMask, int *failed_at)
311325
static void
312326
GatherPrecommits(TransactionId xid, nodemask_t participantsMask, MtmMessageCode code)
313327
{
314-
Assert(participantsMask != 0);
315-
316328
while (participantsMask != 0)
317329
{
318330
bool ret;

0 commit comments

Comments
 (0)