Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 2b5b567

Browse files
danolivoMikhail Rutman
authored and
Mikhail Rutman
committed
Fix a fragility issue in the DMQ sender service. Restart the Campaigner when the DMQ sender service exits, to ensure that critical data is cleaned up before the next voting.
Tags: Multimaster
1 parent 02f0b9e commit 2b5b567

File tree

5 files changed

+42
-50
lines changed

5 files changed

+42
-50
lines changed

src/dmq.c

+12-20
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
#include "logger.h"
3333
#include "compat.h"
3434
#include "mtm_utils.h"
35+
#include "multimaster.h"
36+
#include "state.h"
3537

3638
#include "access/transam.h"
3739
#include "libpq/libpq.h"
@@ -52,23 +54,6 @@
5254
#define DMQ_MQ_SIZE ((Size) 65536)
5355
#define DMQ_MQ_MAGIC 0x646d71
5456

55-
/* XXX: move to common */
56-
#define BIT_CLEAR(mask, bit) ((mask) &= ~((uint64)1 << (bit)))
57-
#define BIT_CHECK(mask, bit) (((mask) & ((uint64)1 << (bit))) != 0)
58-
static int
59-
first_set_bit(uint64 mask)
60-
{
61-
int i;
62-
63-
for (i = 0; i < DMQ_N_MASK_POS; i++)
64-
{
65-
if (BIT_CHECK(mask, i))
66-
return i;
67-
}
68-
return -1;
69-
}
70-
71-
7257
/*
7358
* Shared data structures to hold current connections topology.
7459
* All that stuff can be moved to persistent tables to avoid hardcoded
@@ -497,6 +482,12 @@ dmq_sender_at_exit(int status, Datum arg)
497482
}
498483
}
499484
LWLockRelease(dmq_state->lock);
485+
486+
/*
487+
* Restart the Campaigner to be sure that all critical data is reset before the
488+
* next voting.
489+
*/
490+
CampaignerStop();
500491
}
501492

502493
void
@@ -1390,7 +1381,7 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
13901381
void *extra = NULL;
13911382

13921383
/*
1393-
* We do not call MtmDisbaleTimeouts() here because of connection to this
1384+
* We do not call MtmDisableTimeouts() here because the connection to this
13941385
* client is made by MtmPQconnectPoll() that sets all needed timeouts.
13951386
*/
13961387

@@ -1660,7 +1651,7 @@ dmq_push_buffer(DmqDestinationId dest_id, char *stream_name, const void *payload
16601651
res = shm_mq_send(dmq_local.mq_outh, buf.len, buf.data, false);
16611652
pfree(buf.data);
16621653
if (res != SHM_MQ_SUCCESS)
1663-
mtm_log(WARNING, "[DMQ] dmq_push: can't send to queue");
1654+
mtm_log(ERROR, "[DMQ] dmq_push: can't send to queue");
16641655
}
16651656

16661657
/*
@@ -1776,7 +1767,8 @@ dmq_reattach_shm_mq(int handle_id)
17761767
* from which receivers caller wants to get message and filters inhandles
17771768
* through it.
17781769
*/
1779-
void dmq_attach_receiver(char *sender_name, int8 mask_pos)
1770+
void
1771+
dmq_attach_receiver(char *sender_name, int8 mask_pos)
17801772
{
17811773
int i;
17821774
int handle_id = -1;

src/include/state.h

+1
Original file line numberDiff line numberDiff line change
@@ -149,5 +149,6 @@ extern void MtmMonitorStart(Oid db_id, Oid user_id);
149149
extern void MtmRefreshClusterStatus(void);
150150
extern nodemask_t MtmGetDisabledNodeMask(void);
151151
extern nodemask_t MtmGetEnabledNodeMask(bool ignore_disabled);
152+
extern void CampaignerStop(void);
152153

153154
#endif

src/mtm_utils.c

+12-29
Original file line numberDiff line numberDiff line change
@@ -22,42 +22,36 @@
2222
*
2323
* These timeouts, when set in the postgres config file, affect all processes.
2424
* The multimaster needs its sessions not to be interrupted, so we disable
25-
* these timeouts.
25+
* these timeouts.
2626
*
2727
* This function raises an error if PQexec fails.
2828
*/
29-
static void
29+
static bool
3030
disable_client_timeouts(PGconn *conn)
3131
{
3232
PGresult *res;
3333

3434
res = PQexec(conn, "SET statement_timeout = 0");
3535
if (PQresultStatus(res) != PGRES_COMMAND_OK)
36-
{
37-
char *msg = pchomp(PQerrorMessage(conn));
38-
mtm_log(ERROR, "failed to set statement_timeout: %s", msg);
39-
}
36+
mtm_log(ERROR, "failed to set statement_timeout: %s",
37+
pchomp(PQerrorMessage(conn)));
4038

4139
res = PQexec(conn, "SET lock_timeout = 0");
4240
if (PQresultStatus(res) != PGRES_COMMAND_OK)
43-
{
44-
char *msg = pchomp(PQerrorMessage(conn));
45-
mtm_log(ERROR, "failed to set lock_timeout: %s", msg);
46-
}
41+
mtm_log(ERROR, "failed to set lock_timeout: %s",
42+
pchomp(PQerrorMessage(conn)));
4743

4844
res = PQexec(conn, "SET idle_in_transaction_session_timeout = 0");
4945
if (PQresultStatus(res) != PGRES_COMMAND_OK)
50-
{
51-
char *msg = pchomp(PQerrorMessage(conn));
52-
mtm_log(ERROR, "failed to set idle_in_transaction_session_timeout: %s", msg);
53-
}
46+
mtm_log(ERROR, "failed to set idle_in_transaction_session_timeout: %s",
47+
pchomp(PQerrorMessage(conn)));
5448

5549
res = PQexec(conn, "SET idle_session_timeout = 0");
5650
if (PQresultStatus(res) != PGRES_COMMAND_OK)
57-
{
58-
char *msg = pchomp(PQerrorMessage(conn));
59-
mtm_log(ERROR, "failed to set idle_session_timeout: %s", msg);
60-
}
51+
mtm_log(ERROR, "failed to set idle_session_timeout: %s",
52+
pchomp(PQerrorMessage(conn)));
53+
54+
return true;
6155
}
6256

6357
/*
@@ -73,24 +67,13 @@ extern void
7367
MtmDisableTimeouts(void)
7468
{
7569
if (get_timeout_active(STATEMENT_TIMEOUT))
76-
{
7770
disable_timeout(STATEMENT_TIMEOUT, false);
78-
}
79-
8071
if (get_timeout_active(LOCK_TIMEOUT))
81-
{
8272
disable_timeout(LOCK_TIMEOUT, false);
83-
}
84-
8573
if (get_timeout_active(IDLE_IN_TRANSACTION_SESSION_TIMEOUT))
86-
{
8774
disable_timeout(IDLE_IN_TRANSACTION_SESSION_TIMEOUT, false);
88-
}
89-
9075
if (get_timeout_active(IDLE_SESSION_TIMEOUT))
91-
{
9276
disable_timeout(IDLE_SESSION_TIMEOUT, false);
93-
}
9477
}
9578

9679
/*

src/multimaster.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ static void MtmShmemStartup(void);
9999

100100
static void launcher_init(void);
101101
void launcher_main(Datum main_arg);
102-
void drop_node_entry(int node_id);
103102

104103
MtmShared *Mtm;
105104

@@ -1863,6 +1862,7 @@ launcher_main(Datum main_arg)
18631862
/* init this worker */
18641863
pqsignal(SIGTERM, die);
18651864
BackgroundWorkerUnblockSignals();
1865+
MtmBackgroundWorker = true;
18661866

18671867
memset(&hash_info, 0, sizeof(hash_info));
18681868
hash_info.entrysize = hash_info.keysize = sizeof(Oid);

src/state.c

+16
Original file line numberDiff line numberDiff line change
@@ -882,6 +882,14 @@ CampaignerWake(void)
882882
kill(mtm_state->campaigner_pid, SIGHUP);
883883
}
884884

885+
/* Service to restart a campaigner process. */
886+
void
887+
CampaignerStop(void)
888+
{
889+
if (mtm_state->campaigner_pid != 0)
890+
kill(mtm_state->campaigner_pid, SIGTERM);
891+
}
892+
885893
/* campaigner never rereads PG config, but currently it hardly needs to */
886894
static void
887895
CampaignerSigHupHandler(SIGNAL_ARGS)
@@ -3856,6 +3864,12 @@ stop_node_workers(int node_id, MtmConfig *new_cfg, Datum arg)
38563864
pfree(bgws[node_id - 1].handle);
38573865
bgws[node_id - 1].handle = NULL;
38583866

3867+
/*
3868+
* Only clearing the name field guarantees that the monitor won't restart this
3869+
* receiver.
3870+
*/
3871+
bgws[node_id - 1].name[0] = '\0';
3872+
38593873
/* delete recovery slot, was acquired by receiver */
38603874
ReplicationSlotDrop(filter_slot_name, true);
38613875

@@ -4102,6 +4116,8 @@ MtmMonitor(Datum arg)
41024116
* Note that if for some reason monitor wasn't running
41034117
* (e.g. process killed) during node drop, cleanup in
41044118
* stop_node_workers will be skipped. Very unlikely, but not nice.
4119+
* XXX: Should rethink this code, because the problem described above
4120+
* was caught in PGPRO-6146.
41054121
*/
41064122
mtm_cfg = MtmReloadConfig(mtm_cfg, start_node_workers,
41074123
stop_node_workers, PointerGetDatum(bgws),

0 commit comments

Comments
 (0)