Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d67b021

Browse files
committed
Add mtm.resume_node function
1 parent 8514306 commit d67b021

File tree

3 files changed

+61
-6
lines changed

3 files changed

+61
-6
lines changed

contrib/mmts/multimaster--1.0.sql

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,28 @@ CREATE FUNCTION mtm.stop_replication() RETURNS void
1818
AS 'MODULE_PATHNAME','mtm_stop_replication'
1919
LANGUAGE C;
2020

21+
-- Stop replication to the node. The node is disabled. If drop_slot is true, then the replication slot is dropped and the node can be recovered using basebackup and the recover_node function.
22+
-- If drop_slot is false and limit for maximal slot gap was not reached, then node can be restarted using resume_node function.
2123
CREATE FUNCTION mtm.stop_node(node integer, drop_slot bool default false) RETURNS void
2224
AS 'MODULE_PATHNAME','mtm_stop_node'
2325
LANGUAGE C;
2426

27+
-- Add new node to the cluster. Number of nodes should not exceed maximal number of nodes in the cluster.
2528
CREATE FUNCTION mtm.add_node(conn_str text) RETURNS void
2629
AS 'MODULE_PATHNAME','mtm_add_node'
2730
LANGUAGE C;
2831

29-
-- Create replication slot for the node which was previously stopped
32+
-- Create replication slot for the node which was previously stalled (its replication slot was deleted)
3033
CREATE FUNCTION mtm.recover_node(node integer) RETURNS void
3134
AS 'MODULE_PATHNAME','mtm_recover_node'
3235
LANGUAGE C;
3336

37+
-- Resume previously stopped node with live replication slot. If node was not stopped, this function has no effect.
38+
-- It doesn't create a slot and returns an error if the node is stalled (its slot was dropped)
39+
CREATE FUNCTION mtm.resume_node(node integer) RETURNS void
40+
AS 'MODULE_PATHNAME','mtm_resume_node'
41+
LANGUAGE C;
42+
3443

3544
CREATE FUNCTION mtm.get_snapshot() RETURNS bigint
3645
AS 'MODULE_PATHNAME','mtm_get_snapshot'
@@ -63,11 +72,11 @@ CREATE FUNCTION mtm.get_trans_by_xid(xid bigint) RETURNS mtm.trans_state
6372
AS 'MODULE_PATHNAME','mtm_get_trans_by_xid'
6473
LANGUAGE C;
6574

66-
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
75+
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
6776
AS 'MODULE_PATHNAME','mtm_get_cluster_state'
6877
LANGUAGE C;
6978

70-
CREATE FUNCTION mtm.collect_cluster_info() RETURNS SETOF mtm.cluster_state
79+
CREATE FUNCTION mtm.collect_cluster_info() RETURNS SETOF mtm.cluster_state
7180
AS 'MODULE_PATHNAME','mtm_collect_cluster_info'
7281
LANGUAGE C;
7382

@@ -105,7 +114,7 @@ LANGUAGE C;
105114

106115
CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema text, rel_name text, primary key(rel_schema, rel_name));
107116

108-
CREATE OR REPLACE FUNCTION mtm.alter_sequences() RETURNS boolean AS
117+
CREATE OR REPLACE FUNCTION mtm.alter_sequences() RETURNS boolean AS
109118
$$
110119
DECLARE
111120
seq_class record;

contrib/mmts/multimaster.c

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ PG_FUNCTION_INFO_V1(mtm_stop_node);
117117
PG_FUNCTION_INFO_V1(mtm_add_node);
118118
PG_FUNCTION_INFO_V1(mtm_poll_node);
119119
PG_FUNCTION_INFO_V1(mtm_recover_node);
120+
PG_FUNCTION_INFO_V1(mtm_resume_node);
120121
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
121122
PG_FUNCTION_INFO_V1(mtm_get_csn);
122123
PG_FUNCTION_INFO_V1(mtm_get_trans_by_gid);
@@ -1997,7 +1998,7 @@ static void MtmDisableNode(int nodeId)
19971998
*/
19981999
static void MtmEnableNode(int nodeId)
19992000
{
2000-
if (BIT_SET(Mtm->disabledNodeMask, nodeId-1)) {
2001+
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
20012002
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
20022003
BIT_CLEAR(Mtm->reconnectMask, nodeId-1);
20032004
BIT_SET(Mtm->recoveredNodeMask, nodeId-1);
@@ -3656,7 +3657,7 @@ void MtmRecoverNode(int nodeId)
36563657
MTM_ELOG(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nAllNodes);
36573658
}
36583659
MtmLock(LW_EXCLUSIVE);
3659-
if (BIT_SET(Mtm->stoppedNodeMask, nodeId-1))
3660+
if (BIT_CHECK(Mtm->stoppedNodeMask, nodeId-1))
36603661
{
36613662
Assert(BIT_CHECK(Mtm->disabledNodeMask, nodeId-1));
36623663
BIT_CLEAR(Mtm->stoppedNodeMask, nodeId-1);
@@ -3671,6 +3672,36 @@ void MtmRecoverNode(int nodeId)
36713672
}
36723673
}
36733674

3675+
/*
3676+
* Resume previously stopped node.
3677+
* This function does not create a replication slot: it reports an error if the node is stalled
3678+
* (its replication slot was dropped); otherwise it clears the node's stopped flag, if that flag is set.
3679+
*/
3680+
void MtmResumeNode(int nodeId)
3681+
{
3682+
if (nodeId <= 0 || nodeId > Mtm->nAllNodes)
3683+
{
3684+
MTM_ELOG(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nAllNodes);
3685+
}
3686+
MtmLock(LW_EXCLUSIVE);
3687+
if (BIT_CHECK(Mtm->stalledNodeMask, nodeId-1))
3688+
{
3689+
MtmUnlock();
3690+
MTM_ELOG(ERROR, "Node %d can not be resumed because it's replication slot is dropped", nodeId);
3691+
}
3692+
if (BIT_CHECK(Mtm->stoppedNodeMask, nodeId-1))
3693+
{
3694+
Assert(BIT_CHECK(Mtm->disabledNodeMask, nodeId-1));
3695+
BIT_CLEAR(Mtm->stoppedNodeMask, nodeId-1);
3696+
}
3697+
MtmUnlock();
3698+
3699+
if (!MtmIsBroadcast())
3700+
{
3701+
MtmBroadcastUtilityStmt(psprintf("select mtm.resume_node(%d)", nodeId), true);
3702+
}
3703+
}
3704+
36743705
/*
36753706
* Permanently exclude node from the cluster. Node will not participate in voting and can not be automatically recovered
36763707
* until MtmRecoverNode is invoked.
@@ -3763,6 +3794,12 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
37633794
}
37643795
MTM_LOG1("Startup of logical replication to node %d", MtmReplicationNodeId);
37653796
MtmLock(LW_EXCLUSIVE);
3797+
3798+
if (BIT_CHECK(Mtm->stalledNodeMask, MtmReplicationNodeId-1)) {
3799+
MtmUnlock();
3800+
MTM_ELOG(ERROR, "Stalled node %d tries to initiate recovery", MtmReplicationNodeId);
3801+
}
3802+
37663803
if (BIT_CHECK(Mtm->stoppedNodeMask, MtmReplicationNodeId-1)) {
37673804
MTM_ELOG(WARNING, "Stopped node %d tries to initiate recovery", MtmReplicationNodeId);
37683805
do {
@@ -4149,6 +4186,14 @@ mtm_recover_node(PG_FUNCTION_ARGS)
41494186
PG_RETURN_VOID();
41504187
}
41514188

4189+
Datum
4190+
mtm_resume_node(PG_FUNCTION_ARGS)
4191+
{
4192+
int nodeId = PG_GETARG_INT32(0);
4193+
MtmResumeNode(nodeId);
4194+
PG_RETURN_VOID();
4195+
}
4196+
41524197
Datum
41534198
mtm_get_snapshot(PG_FUNCTION_ARGS)
41544199
{

contrib/mmts/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ extern void MtmUnlockNode(int nodeId);
398398
extern void MtmStopNode(int nodeId, bool dropSlot);
399399
extern void MtmReconnectNode(int nodeId);
400400
extern void MtmRecoverNode(int nodeId);
401+
extern void MtmResumeNode(int nodeId);
401402
extern void MtmOnNodeDisconnect(int nodeId);
402403
extern void MtmOnNodeConnect(int nodeId);
403404
extern void MtmWakeUpBackend(MtmTransState* ts);

0 commit comments

Comments
 (0)