Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit a570eb0

Browse files
committed
node management functions: join_node and detect basebackups
1 parent 9fae1e8 commit a570eb0

10 files changed

+414
-88
lines changed

multimaster--1.0.sql

+29-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ LANGUAGE C;
2424
CREATE TABLE mtm.nodes(
2525
id int primary key not null,
2626
conninfo text not null,
27-
is_self bool not null
27+
is_self bool not null,
28+
init_done bool not null default 'f'
2829
);
2930

3031
CREATE FUNCTION mtm.after_node_create()
@@ -47,6 +48,11 @@ CREATE TRIGGER on_node_drop
4748
FOR EACH ROW
4849
EXECUTE FUNCTION mtm.after_node_drop();
4950

51+
CREATE FUNCTION mtm.join_node(node_id int)
52+
RETURNS VOID
53+
AS 'MODULE_PATHNAME','mtm_join_node'
54+
LANGUAGE C;
55+
5056
---
5157
--- User facing API for node management.
5258
---
@@ -57,6 +63,7 @@ BEGIN
5763
IF node_id <= 0 OR node_id > least(16, array_length(connstrs, 1)) THEN
5864
RAISE EXCEPTION 'node_id should be in range [1 .. length(connstrs)]';
5965
END IF;
66+
-- XXX
6067
EXECUTE 'SET mtm.emerging_node_id = ' || node_id || ';';
6168
INSERT INTO mtm.nodes SELECT
6269
ordinality::int as id,
@@ -69,6 +76,27 @@ END
6976
$$
7077
LANGUAGE plpgsql;
7178

79+
CREATE OR REPLACE FUNCTION mtm.add_node(connstr text) RETURNS void AS
80+
$$
81+
DECLARE
82+
new_node_id int;
83+
BEGIN
84+
-- XXX: add only to a configured mm?
85+
86+
INSERT INTO mtm.nodes SELECT
87+
min(unused_ids.id), connstr, 'false'
88+
FROM (
89+
SELECT id FROM generate_series(1,16) id
90+
EXCEPT
91+
SELECT id FROM mtm.nodes
92+
) unused_ids
93+
RETURNING id INTO new_node_id;
94+
95+
-- SELECT mtm.node_join(new_node_id, connstr);
96+
END
97+
$$
98+
LANGUAGE plpgsql;
99+
72100
---
73101
--- Various
74102
---

src/bgwpool.c

+12
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,18 @@ BgwPoolStart(BgwPool* pool, char *poolName, Oid db_id, Oid user_id)
199199
strncpy(pool->poolName, poolName, MAX_NAME_LEN);
200200
pool->db_id = db_id;
201201
pool->user_id = user_id;
202+
pool->nWorkers = 0;
203+
pool->shutdown = false;
204+
pool->producerBlocked = false;
205+
pool->head = 0;
206+
pool->tail = 0;
207+
pool->active = 0;
208+
pool->pending = 0;
209+
pool->lastPeakTime = 0;
210+
pool->lastDynamicWorkerStartTime = 0;
211+
212+
PGSemaphoreReset(pool->available);
213+
PGSemaphoreReset(pool->overflow);
202214
}
203215

204216
size_t BgwPoolGetQueueSize(BgwPool* pool)

src/commit.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ MtmTwoPhaseCommit()
248248
dmq_stream_unsubscribe(stream);
249249
mtm_log(MtmTxTrace, "%s unsubscribed for %s", gid, stream);
250250

251-
MaybeLogSyncpoint();
251+
MaybeLogSyncpoint(false);
252252

253253
return true;
254254
}

src/include/logger.h

+3
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ typedef enum MtmLogTag
7575
/* syncpoints */
7676
SyncpointCreated = LOG,
7777
SyncpointApply = LOG,
78+
79+
/* Node add/drop */
80+
NodeMgmt = LOG
7881
} MtmLogTag;
7982

8083
// XXX: also meaningful process name would be cool

src/include/multimaster.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,12 @@ typedef uint64 nodemask_t;
5656
/*
5757
* Definitions for the "mtm.nodes" table.
5858
*/
59-
#define MTM_NODES "nodes"
59+
#define MTM_NODES "mtm.nodes"
6060
#define Natts_mtm_nodes 3
6161
#define Anum_mtm_nodes_id 1 /* node_id, same accross cluster */
6262
#define Anum_mtm_nodes_connifo 2 /* connection string */
6363
#define Anum_mtm_nodes_is_self 3 /* is that tuple for our node? */
64+
#define Anum_mtm_nodes_init_done 4 /* did monitor already create slots? */
6465

6566

6667
/* Identifier of global transaction */
@@ -145,6 +146,7 @@ typedef struct
145146
int node_id;
146147
char *conninfo;
147148
RepOriginId origin_id;
149+
bool init_done;
148150
} MtmNode;
149151

150152
typedef struct
@@ -213,6 +215,7 @@ extern bool MtmBackgroundWorker;
213215
extern bool MtmIsLogicalReceiver;
214216
extern bool MtmIsReceiver;
215217
extern bool MtmIsPoolWorker;
218+
extern bool MtmIsMonitorWorker;
216219

217220
/* GUCs */
218221
extern int MtmTransSpillThreshold;

src/include/syncpoint.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ typedef struct
3333
XLogRecPtr origin_lsn;
3434
} FilterEntry;
3535

36-
extern void MaybeLogSyncpoint(void);
36+
extern void MaybeLogSyncpoint(bool force);
3737
extern void SyncpointRegister(int node_id, XLogRecPtr origin_lsn,
3838
XLogRecPtr local_lsn, XLogRecPtr trim_lsn);
3939
extern Syncpoint SyncpointGetLatest(int node_id);

0 commit comments

Comments
 (0)