Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit fb7562c

Browse files
committed
disable node on broken dmq sender connection
1 parent 6b687aa commit fb7562c

File tree

5 files changed

+52
-8
lines changed

5 files changed

+52
-8
lines changed

src/dmq.c

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,8 +136,10 @@ static volatile sig_atomic_t got_SIGHUP = false;
136136

137137
static shmem_startup_hook_type PreviousShmemStartupHook;
138138

139-
dmq_receiver_hook_type dmq_receiver_start_hook;
140-
dmq_receiver_hook_type dmq_receiver_stop_hook;
139+
dmq_hook_type dmq_receiver_start_hook;
140+
dmq_hook_type dmq_receiver_stop_hook;
141+
dmq_hook_type dmq_sender_connect_hook;
142+
dmq_hook_type dmq_sender_disconnect_hook;
141143

142144
void dmq_sender_main(Datum main_arg);
143145

@@ -461,6 +463,8 @@ dmq_sender_main(Datum main_arg)
461463
"[DMQ] failed to send message to %s: %s",
462464
conns[conn_id].receiver_name,
463465
PQerrorMessage(conns[conn_id].pgconn));
466+
467+
dmq_sender_disconnect_hook(conns[conn_id].receiver_name);
464468
}
465469
else
466470
{
@@ -577,6 +581,8 @@ dmq_sender_main(Datum main_arg)
577581
"[DMQ] failed to send heartbeat to %s: %s",
578582
conns[conn_id].receiver_name,
579583
PQerrorMessage(conns[conn_id].pgconn));
584+
585+
dmq_sender_disconnect_hook(conns[conn_id].receiver_name);
580586
}
581587
}
582588
}
@@ -681,6 +687,8 @@ dmq_sender_main(Datum main_arg)
681687
mtm_log(DmqStateFinal,
682688
"[DMQ] Connected to %s",
683689
conns[conn_id].receiver_name);
690+
691+
dmq_sender_connect_hook(conns[conn_id].receiver_name);
684692
}
685693
break;
686694

@@ -695,6 +703,8 @@ dmq_sender_main(Datum main_arg)
695703
"[DMQ] connection error with %s: %s",
696704
conns[conn_id].receiver_name,
697705
PQerrorMessage(conns[conn_id].pgconn));
706+
707+
dmq_sender_disconnect_hook(conns[conn_id].receiver_name);
698708
}
699709
break;
700710
}

src/include/dmq.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ extern bool dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask);
2929
extern void dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg);
3030
extern void dmq_push_buffer(DmqDestinationId dest_id, char *stream_name, const void *buffer, size_t len);
3131

32-
typedef void (*dmq_receiver_hook_type) (char *);
33-
extern dmq_receiver_hook_type dmq_receiver_start_hook;
34-
extern dmq_receiver_hook_type dmq_receiver_stop_hook;
32+
typedef void (*dmq_hook_type) (char *);
33+
extern dmq_hook_type dmq_receiver_start_hook;
34+
extern dmq_hook_type dmq_receiver_stop_hook;
35+
extern dmq_hook_type dmq_sender_connect_hook;
36+
extern dmq_hook_type dmq_sender_disconnect_hook;
3537

3638
#endif

src/include/state.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ extern void MtmStateProcessEvent(MtmEvent ev, bool locked);
4848

4949
extern void MtmOnNodeDisconnect(char *node_name);
5050
extern void MtmOnNodeConnect(char *node_name);
51+
extern void MtmOnDmqSenderConnect(char *node_name);
52+
extern void MtmOnDmqSenderDisconnect(char *node_name);
5153

5254
extern void MtmRefreshClusterStatus(void);
5355

src/multimaster.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,8 @@ _PG_init(void)
505505
dmq_init();
506506
dmq_receiver_start_hook = MtmOnNodeConnect;
507507
dmq_receiver_stop_hook = MtmOnNodeDisconnect;
508+
dmq_sender_connect_hook = MtmOnDmqSenderConnect;
509+
dmq_sender_disconnect_hook = MtmOnDmqSenderDisconnect;
508510

509511
ResolverInit();
510512

src/state.c

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,10 @@ struct MtmState
5858
{
5959
nodemask_t configured_mask;
6060
nodemask_t connected_mask;
61-
nodemask_t enabled_mask;
61+
nodemask_t dmq_senders_mask;
6262
nodemask_t receivers_mask;
6363
nodemask_t senders_mask;
64+
nodemask_t enabled_mask;
6465
nodemask_t clique;
6566

6667
bool referee_grant;
@@ -155,6 +156,7 @@ MtmStateFill(MtmConfig *cfg)
155156
mtm_state->recovery_slot = cfg->backup_node_id;
156157

157158
BIT_SET(mtm_state->connected_mask, cfg->my_node_id - 1);
159+
BIT_SET(mtm_state->dmq_senders_mask, cfg->my_node_id - 1);
158160

159161
/* re-create configured_mask */
160162
mtm_state->configured_mask = 0;
@@ -217,7 +219,7 @@ MtmCheckState(void)
217219
bool isEnabledState;
218220
MtmNodeStatus old_status;
219221
int nEnabled = popcount(mtm_state->enabled_mask);
220-
int nConnected = popcount(mtm_state->connected_mask);
222+
int nConnected = popcount(mtm_state->connected_mask & mtm_state->dmq_senders_mask);
221223
int nReceivers = popcount(mtm_state->receivers_mask);
222224
int nSenders = popcount(mtm_state->senders_mask);
223225
int nConfigured = popcount(mtm_state->configured_mask);
@@ -227,9 +229,10 @@ MtmCheckState(void)
227229
old_status = mtm_state->status;
228230

229231
mtm_log(MtmStateMessage,
230-
"[STATE] Status = (enabled=%s, connected=%s, clique=%s, receivers=%s, senders=%s, total=%i, referee_grant=%d)",
232+
"[STATE] Status = (enabled=%s, connected=%s, dmq_senders=%s, clique=%s, receivers=%s, senders=%s, total=%i, referee_grant=%d)",
231233
maskToString(mtm_state->enabled_mask),
232234
maskToString(mtm_state->connected_mask),
235+
maskToString(mtm_state->dmq_senders_mask),
233236
maskToString(mtm_state->clique),
234237
maskToString(mtm_state->receivers_mask),
235238
maskToString(mtm_state->senders_mask),
@@ -493,6 +496,31 @@ MtmOnNodeDisconnect(char *node_name)
493496
LWLockRelease(mtm_state->lock);
494497
}
495498

499+
void
500+
MtmOnDmqSenderConnect(char *node_name)
501+
{
502+
int node_id;
503+
sscanf(node_name, MTM_DMQNAME_FMT, &node_id);
504+
505+
LWLockAcquire(mtm_state->lock, LW_EXCLUSIVE);
506+
BIT_SET(mtm_state->dmq_senders_mask, node_id - 1);
507+
MtmCheckState();
508+
LWLockRelease(mtm_state->lock);
509+
}
510+
511+
void
512+
MtmOnDmqSenderDisconnect(char *node_name)
513+
{
514+
int node_id;
515+
sscanf(node_name, MTM_DMQNAME_FMT, &node_id);
516+
517+
LWLockAcquire(mtm_state->lock, LW_EXCLUSIVE);
518+
BIT_CLEAR(mtm_state->dmq_senders_mask, node_id - 1);
519+
MtmDisableNode(node_id);
520+
MtmCheckState();
521+
LWLockRelease(mtm_state->lock);
522+
}
523+
496524
// XXXX: make that event too
497525
void MtmOnNodeConnect(char *node_name)
498526
{

0 commit comments

Comments
 (0)