Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 015aa8f

Browse files
kvapkelvich
authored andcommitted
Add PQconnectdb_safe function which filters out some parameters incompatible with libpq.
1 parent fcc7ae0 commit 015aa8f

File tree

3 files changed

+35
-11
lines changed

3 files changed

+35
-11
lines changed

multimaster.c

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1859,8 +1859,6 @@ void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr)
18591859
elog(ERROR, "Invalid raftable port: %s", port+9);
18601860
}
18611861
n += 9;
1862-
memmove(port, port+n, connStrLen - n + 1);
1863-
connStrLen -= n;
18641862
} else {
18651863
conn->raftablePort = 0;
18661864
}
@@ -1872,8 +1870,6 @@ void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr)
18721870
elog(ERROR, "Invalid arbiter port: %s", port+12);
18731871
}
18741872
n += 12;
1875-
memmove(port, port+n, connStrLen - n + 1);
1876-
connStrLen -= n;
18771873
} else {
18781874
conn->arbiterPort = 0;
18791875
}
@@ -2796,6 +2792,32 @@ typedef struct
27962792
int nodeId;
27972793
} MtmGetClusterInfoCtx;
27982794

2795+
static void erase_option_from_connstr(const char *option, char *connstr)
2796+
{
2797+
char *needle = psprintf("%s=", option);
2798+
while (1) {
2799+
char *found = strstr(connstr, needle);
2800+
if (found == NULL) break;
2801+
while (*found != '\0' && *found != ' ') {
2802+
*found = ' ';
2803+
found++;
2804+
}
2805+
}
2806+
pfree(needle);
2807+
}
2808+
2809+
PGconn *PQconnectdb_safe(const char *conninfo)
2810+
{
2811+
PGconn *conn;
2812+
char *safe_connstr = pstrdup(conninfo);
2813+
erase_option_from_connstr("raftport", safe_connstr);
2814+
erase_option_from_connstr("arbiterport", safe_connstr);
2815+
2816+
conn = PQconnectdb(safe_connstr);
2817+
2818+
pfree(safe_connstr);
2819+
return conn;
2820+
}
27992821

28002822
Datum
28012823
mtm_get_cluster_info(PG_FUNCTION_ARGS)
@@ -2828,9 +2850,9 @@ mtm_get_cluster_info(PG_FUNCTION_ARGS)
28282850
if (usrfctx->nodeId > Mtm->nAllNodes) {
28292851
SRF_RETURN_DONE(funcctx);
28302852
}
2831-
conn = PQconnectdb(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
2853+
conn = PQconnectdb_safe(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
28322854
if (PQstatus(conn) != CONNECTION_OK) {
2833-
elog(ERROR, "Failed to establish connection '%s' to node %d", Mtm->nodes[usrfctx->nodeId-1].con.connStr, usrfctx->nodeId);
2855+
elog(ERROR, "Failed to establish connection '%s' to node %d: error = %s", Mtm->nodes[usrfctx->nodeId-1].con.connStr, usrfctx->nodeId, PQerrorMessage(conn));
28342856
}
28352857
result = PQexec(conn, "select * from mtm.get_cluster_state()");
28362858

@@ -3004,7 +3026,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
30043026
{
30053027
if (!BIT_CHECK(disabledNodeMask, i))
30063028
{
3007-
conns[i] = PQconnectdb(psprintf("%s application_name=%s", Mtm->nodes[i].con.connStr, MULTIMASTER_BROADCAST_SERVICE));
3029+
conns[i] = PQconnectdb_safe(psprintf("%s application_name=%s", Mtm->nodes[i].con.connStr, MULTIMASTER_BROADCAST_SERVICE));
30083030
if (PQstatus(conns[i]) != CONNECTION_OK)
30093031
{
30103032
if (ignoreError)
@@ -3016,7 +3038,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
30163038
do {
30173039
PQfinish(conns[i]);
30183040
} while (--i >= 0);
3019-
elog(ERROR, "Failed to establish connection '%s' to node %d", Mtm->nodes[failedNode].con.connStr, failedNode+1);
3041+
elog(ERROR, "Failed to establish connection '%s' to node %d, error = %s", Mtm->nodes[failedNode].con.connStr, failedNode+1, PQerrorMessage(conns[i]));
30203042
}
30213043
}
30223044
PQsetNoticeReceiver(conns[i], MtmNoticeReceiver, &i);

multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
#include "access/clog.h"
99
#include "pglogical_output/hooks.h"
10+
#include "libpq-fe.h"
1011

1112
#define DEBUG_LEVEL 0
1213

@@ -283,7 +284,7 @@ extern void MtmUpdateLsnMapping(int nodeId, XLogRecPtr endLsn);
283284
extern XLogRecPtr MtmGetFlushPosition(int nodeId);
284285
extern bool MtmWatchdog(timestamp_t now);
285286
extern void MtmCheckHeartbeat(void);
286-
287+
extern PGconn *PQconnectdb_safe(const char *conninfo);
287288

288289

289290
#endif

pglogical_receiver.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,9 @@ pglogical_receiver_main(Datum main_arg)
217217
char *copybuf = NULL;
218218
int spill_file = -1;
219219
StringInfoData spill_info;
220+
char *slotName;
220221
char* connString = psprintf("replication=database %s", Mtm->nodes[nodeId-1].con.connStr);
221-
char* slotName = psprintf(MULTIMASTER_SLOT_PATTERN, MtmNodeId);
222+
slotName = psprintf(MULTIMASTER_SLOT_PATTERN, MtmNodeId);
222223

223224
initStringInfo(&spill_info);
224225

@@ -261,7 +262,7 @@ pglogical_receiver_main(Datum main_arg)
261262
count = Mtm->recoveryCount;
262263

263264
/* Establish connection to remote server */
264-
conn = PQconnectdb(connString);
265+
conn = PQconnectdb_safe(connString);
265266
status = PQstatus(conn);
266267
if (status != CONNECTION_OK)
267268
{

0 commit comments

Comments
 (0)