Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 8930610

Browse files
committed
restart receivers on node disables too
1 parent b666274 commit 8930610

File tree

4 files changed

+25
-7
lines changed

4 files changed

+25
-7
lines changed

Cluster.pm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ sub new
1616

1717
my $self = {
1818
nodes => \@nodes,
19-
recv_timeout => 3
19+
recv_timeout => 6
2020
};
2121

2222
bless $self, $class;

src/include/state.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ extern nodemask_t MtmGetDisabledNodeMask(void);
6262
extern nodemask_t MtmGetConnectedNodeMask(void);
6363
extern nodemask_t MtmGetEnabledNodeMask(void);
6464
extern int MtmGetRecoveryCount(void);
65+
extern int MtmGetNodeDisableCount(int node_id);
6566

6667
extern nodemask_t MtmGetDisabledNodeMask(void);
6768

src/pglogical_receiver.c

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,8 @@ pglogical_receiver_main(Datum main_arg)
564564
*/
565565
for(;;)
566566
{
567-
int count;
567+
int count,
568+
counterpart_disable_count;
568569
XLogRecPtr remote_start = InvalidXLogRecPtr;
569570
Syncpoint *spvector = NULL;
570571
HTAB *filter_map = NULL;
@@ -586,11 +587,8 @@ pglogical_receiver_main(Datum main_arg)
586587
Mtm->peers[nodeId - 1].receiver_mode = mode;
587588
LWLockRelease(Mtm->lock);
588589

589-
// XXX: delete unnecessary modes
590-
Assert(mode == REPLMODE_RECOVERY || mode == REPLMODE_RECOVERED);
591-
592-
// XXX: avoid that
593590
count = MtmGetRecoveryCount();
591+
counterpart_disable_count = MtmGetNodeDisableCount(nodeId);
594592

595593
/* Establish connection to remote server */
596594
conn = receiver_connect(MtmNodeById(receiver_mtm_cfg, nodeId)->conninfo);
@@ -715,12 +713,17 @@ pglogical_receiver_main(Datum main_arg)
715713
proc_exit(1);
716714
}
717715

718-
719716
if (count != MtmGetRecoveryCount()) {
720717
ereport(LOG, (MTM_ERRMSG("%s: restart WAL receiver because node was recovered", worker_proc)));
721718
goto OnError;
722719
}
723720

721+
if (counterpart_disable_count != MtmGetNodeDisableCount(nodeId))
722+
{
723+
ereport(LOG, (MTM_ERRMSG("%s: restart WAL receiver because counterpart was disabled", worker_proc)));
724+
goto OnError;
725+
}
726+
724727
/*
725728
* Receive data.
726729
*/

src/state.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ struct MtmState
7171
int recovery_slot;
7272

7373
int recovery_count;
74+
int node_recovery_count[MTM_MAX_NODES];
7475

7576
LWLock *lock;
7677

@@ -449,6 +450,7 @@ MtmDisableNode(int node_id)
449450
mtm_log(MtmStateMessage, "[STATE] Node %i: disabled", node_id);
450451

451452
BIT_CLEAR(mtm_state->enabled_mask, node_id - 1);
453+
mtm_state->node_recovery_count[node_id - 1] += 1;
452454

453455
// XXX my_node_id
454456
if (mtm_state->status == MTM_ONLINE)
@@ -1077,6 +1079,18 @@ MtmGetRecoveryCount()
10771079
return mtm_state->recovery_count;
10781080
}
10791081

1082+
int
1083+
MtmGetNodeDisableCount(int node_id)
1084+
{
1085+
int count;
1086+
1087+
LWLockAcquire(mtm_state->lock, LW_SHARED);
1088+
count = mtm_state->node_recovery_count[node_id - 1];
1089+
LWLockRelease(mtm_state->lock);
1090+
1091+
return count;
1092+
}
1093+
10801094
// XXX: During evaluation of (mtm.node_info(id)).* this function is called
10811095
// once per column for every row. So maybe just rewrite it as an SRF.
10821096
Datum

0 commit comments

Comments
 (0)