Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4444c22

Browse files
knizhnikkelvich
authored andcommitted
Add mtm.resume_node function
1 parent bcc04ea commit 4444c22

File tree

3 files changed

+61
-6
lines changed

3 files changed

+61
-6
lines changed

multimaster--1.0.sql

+13-4
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,28 @@ CREATE FUNCTION mtm.stop_replication() RETURNS void
1818
AS 'MODULE_PATHNAME','mtm_stop_replication'
1919
LANGUAGE C;
2020

21+
-- Stop replication to the node. Node is didsabled, If drop_slot is true, then replication slot is dropped and node can be recovered using basebackup and recover_node function.
22+
-- If drop_slot is false and limit for maximal slot gap was not reached, then node can be restarted using resume_node function.
2123
CREATE FUNCTION mtm.stop_node(node integer, drop_slot bool default false) RETURNS void
2224
AS 'MODULE_PATHNAME','mtm_stop_node'
2325
LANGUAGE C;
2426

27+
-- Add new node to the cluster. Number of nodes should not exeed maximal number of nodes in the cluster.
2528
CREATE FUNCTION mtm.add_node(conn_str text) RETURNS void
2629
AS 'MODULE_PATHNAME','mtm_add_node'
2730
LANGUAGE C;
2831

29-
-- Create replication slot for the node which was previously stopped
32+
-- Create replication slot for the node which was previously stalled (its replicatoin slot was deleted)
3033
CREATE FUNCTION mtm.recover_node(node integer) RETURNS void
3134
AS 'MODULE_PATHNAME','mtm_recover_node'
3235
LANGUAGE C;
3336

37+
-- Resume previously stopped node with live replication slot. If node was not stopped, this function has no effect.
38+
-- It doesn't create slot and returns error if node is stalled (slot eas dropped)
39+
CREATE FUNCTION mtm.resume_node(node integer) RETURNS void
40+
AS 'MODULE_PATHNAME','mtm_resume_node'
41+
LANGUAGE C;
42+
3443

3544
CREATE FUNCTION mtm.get_snapshot() RETURNS bigint
3645
AS 'MODULE_PATHNAME','mtm_get_snapshot'
@@ -63,11 +72,11 @@ CREATE FUNCTION mtm.get_trans_by_xid(xid bigint) RETURNS mtm.trans_state
6372
AS 'MODULE_PATHNAME','mtm_get_trans_by_xid'
6473
LANGUAGE C;
6574

66-
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
75+
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
6776
AS 'MODULE_PATHNAME','mtm_get_cluster_state'
6877
LANGUAGE C;
6978

70-
CREATE FUNCTION mtm.collect_cluster_info() RETURNS SETOF mtm.cluster_state
79+
CREATE FUNCTION mtm.collect_cluster_info() RETURNS SETOF mtm.cluster_state
7180
AS 'MODULE_PATHNAME','mtm_collect_cluster_info'
7281
LANGUAGE C;
7382

@@ -105,7 +114,7 @@ LANGUAGE C;
105114

106115
CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema name, rel_name name, primary key(rel_schema, rel_name));
107116

108-
CREATE OR REPLACE FUNCTION mtm.alter_sequences() RETURNS boolean AS
117+
CREATE OR REPLACE FUNCTION mtm.alter_sequences() RETURNS boolean AS
109118
$$
110119
DECLARE
111120
seq_class record;

multimaster.c

+47-2
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ PG_FUNCTION_INFO_V1(mtm_stop_node);
117117
PG_FUNCTION_INFO_V1(mtm_add_node);
118118
PG_FUNCTION_INFO_V1(mtm_poll_node);
119119
PG_FUNCTION_INFO_V1(mtm_recover_node);
120+
PG_FUNCTION_INFO_V1(mtm_resume_node);
120121
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
121122
PG_FUNCTION_INFO_V1(mtm_get_csn);
122123
PG_FUNCTION_INFO_V1(mtm_get_trans_by_gid);
@@ -2002,7 +2003,7 @@ static void MtmDisableNode(int nodeId)
20022003
*/
20032004
static void MtmEnableNode(int nodeId)
20042005
{
2005-
if (BIT_SET(Mtm->disabledNodeMask, nodeId-1)) {
2006+
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
20062007
BIT_CLEAR(Mtm->disabledNodeMask, nodeId-1);
20072008
BIT_CLEAR(Mtm->reconnectMask, nodeId-1);
20082009
BIT_SET(Mtm->recoveredNodeMask, nodeId-1);
@@ -3678,7 +3679,7 @@ void MtmRecoverNode(int nodeId)
36783679
MTM_ELOG(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nAllNodes);
36793680
}
36803681
MtmLock(LW_EXCLUSIVE);
3681-
if (BIT_SET(Mtm->stoppedNodeMask, nodeId-1))
3682+
if (BIT_CHECK(Mtm->stoppedNodeMask, nodeId-1))
36823683
{
36833684
Assert(BIT_CHECK(Mtm->disabledNodeMask, nodeId-1));
36843685
BIT_CLEAR(Mtm->stoppedNodeMask, nodeId-1);
@@ -3693,6 +3694,36 @@ void MtmRecoverNode(int nodeId)
36933694
}
36943695
}
36953696

3697+
/*
3698+
* Resume previosly stopped node.
3699+
* This function creates logical replication slot for the node which will collect
3700+
* all changes which should be sent to this node from this moment.
3701+
*/
3702+
void MtmResumeNode(int nodeId)
3703+
{
3704+
if (nodeId <= 0 || nodeId > Mtm->nAllNodes)
3705+
{
3706+
MTM_ELOG(ERROR, "NodeID %d is out of range [1,%d]", nodeId, Mtm->nAllNodes);
3707+
}
3708+
MtmLock(LW_EXCLUSIVE);
3709+
if (BIT_CHECK(Mtm->stalledNodeMask, nodeId-1))
3710+
{
3711+
MtmUnlock();
3712+
MTM_ELOG(ERROR, "Node %d can not be resumed because it's replication slot is dropped", nodeId);
3713+
}
3714+
if (BIT_CHECK(Mtm->stoppedNodeMask, nodeId-1))
3715+
{
3716+
Assert(BIT_CHECK(Mtm->disabledNodeMask, nodeId-1));
3717+
BIT_CLEAR(Mtm->stoppedNodeMask, nodeId-1);
3718+
}
3719+
MtmUnlock();
3720+
3721+
if (!MtmIsBroadcast())
3722+
{
3723+
MtmBroadcastUtilityStmt(psprintf("select mtm.resume_node(%d)", nodeId), true);
3724+
}
3725+
}
3726+
36963727
/*
36973728
* Permanently exclude node from the cluster. Node will not participate in voting and can not be automatically recovered
36983729
* until MtmRecoverNode is invoked.
@@ -3785,6 +3816,12 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
37853816
}
37863817
MTM_LOG1("Startup of logical replication to node %d", MtmReplicationNodeId);
37873818
MtmLock(LW_EXCLUSIVE);
3819+
3820+
if (BIT_CHECK(Mtm->stalledNodeMask, MtmReplicationNodeId-1)) {
3821+
MtmUnlock();
3822+
MTM_ELOG(ERROR, "Stalled node %d tries to initiate recovery", MtmReplicationNodeId);
3823+
}
3824+
37883825
if (BIT_CHECK(Mtm->stoppedNodeMask, MtmReplicationNodeId-1)) {
37893826
MTM_ELOG(WARNING, "Stopped node %d tries to initiate recovery", MtmReplicationNodeId);
37903827
do {
@@ -4171,6 +4208,14 @@ mtm_recover_node(PG_FUNCTION_ARGS)
41714208
PG_RETURN_VOID();
41724209
}
41734210

4211+
Datum
4212+
mtm_resume_node(PG_FUNCTION_ARGS)
4213+
{
4214+
int nodeId = PG_GETARG_INT32(0);
4215+
MtmResumeNode(nodeId);
4216+
PG_RETURN_VOID();
4217+
}
4218+
41744219
Datum
41754220
mtm_get_snapshot(PG_FUNCTION_ARGS)
41764221
{

multimaster.h

+1
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ extern void MtmUnlockNode(int nodeId);
400400
extern void MtmStopNode(int nodeId, bool dropSlot);
401401
extern void MtmReconnectNode(int nodeId);
402402
extern void MtmRecoverNode(int nodeId);
403+
extern void MtmResumeNode(int nodeId);
403404
extern void MtmOnNodeDisconnect(int nodeId);
404405
extern void MtmOnNodeConnect(int nodeId);
405406
extern void MtmWakeUpBackend(MtmTransState* ts);

0 commit comments

Comments
 (0)