Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b8c0a63

Browse files
knizhnikkelvich
authored and committed
Prevent recursive broadcast by setting special application name
1 parent 16df7ec commit b8c0a63

File tree

3 files changed

+15
-20
lines changed

3 files changed

+15
-20
lines changed

multimaster.c

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -798,7 +798,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
798798
* Send notification only if ABORT happens during transaction processing at replicas,
799799
* do not send notification if ABORT is received from master
800800
*/
801-
MTM_TRACE("%d: send ABORT notification to coordinator %d\n", MyProcPid, x->gtid.node);
801+
MTM_INFO("%d: send ABORT notification abort transaction %d to coordinator %d\n", MyProcPid, x->gtid.xid, x->gtid.node);
802802
if (ts == NULL) {
803803
Assert(TransactionIdIsValid(x->xid));
804804
ts = hash_search(MtmXid2State, &x->xid, HASH_ENTER, NULL);
@@ -1602,6 +1602,11 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
16021602
return Mtm->recoverySlot ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS;
16031603
}
16041604

1605+
static bool MtmIsBroadcast()
1606+
{
1607+
return application_name != NULL && strcmp(application_name, MULTIMASTER_BROADCAST_SERVICE) == 0;
1608+
}
1609+
16051610
void MtmRecoverNode(int nodeId)
16061611
{
16071612
if (nodeId <= 0 || nodeId > Mtm->nNodes)
@@ -1611,7 +1616,7 @@ void MtmRecoverNode(int nodeId)
16111616
if (!BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
16121617
elog(ERROR, "Node %d was not disabled", nodeId);
16131618
}
1614-
if (!IsTransactionBlock())
1619+
if (!MtmIsBroadcast())
16151620
{
16161621
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')", nodeId), true);
16171622
}
@@ -1628,7 +1633,7 @@ void MtmDropNode(int nodeId, bool dropSlot)
16281633
}
16291634
BIT_SET(Mtm->disabledNodeMask, nodeId-1);
16301635
Mtm->nNodes -= 1;
1631-
if (!IsTransactionBlock())
1636+
if (!MtmIsBroadcast())
16321637
{
16331638
MtmBroadcastUtilityStmt(psprintf("select mtm.drop_node(%d,%s)", nodeId, dropSlot ? "true" : "false"), true);
16341639
}
@@ -1648,7 +1653,6 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
16481653
static bool
16491654
MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
16501655
{
1651-
elog(WARNING, "MtmReplicationTxnFilterHook: args->origin_id=%d, MtmReplicationNodeId=%d", args->origin_id, MtmReplicationNodeId);
16521656
return args->origin_id == InvalidRepOriginId || MtmIsRecoveredNode(MtmReplicationNodeId);
16531657
}
16541658

@@ -1795,7 +1799,6 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
17951799
{
17961800
PGresult *result = PQexec(conn, sql);
17971801
int status = PQresultStatus(result);
1798-
char *errstr;
17991802

18001803
bool ret = status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK;
18011804

@@ -1815,25 +1818,18 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg)
18151818

18161819
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
18171820
{
1818-
char* conn_str = pstrdup(MtmConnStrs);
1819-
char* conn_str_end = conn_str + strlen(conn_str);
18201821
int i = 0;
18211822
nodemask_t disabledNodeMask = Mtm->disabledNodeMask;
18221823
int failedNode = -1;
18231824
char const* errorMsg = NULL;
18241825
PGconn **conns = palloc0(sizeof(PGconn*)*MtmNodes);
18251826
char* utility_errmsg;
18261827

1827-
while (conn_str < conn_str_end)
1828+
for (i = 0; i < MtmNodes; i++)
18281829
{
1829-
char* p = strchr(conn_str, ',');
1830-
if (p == NULL) {
1831-
p = conn_str_end;
1832-
}
1833-
*p = '\0';
18341830
if (!BIT_CHECK(disabledNodeMask, i))
18351831
{
1836-
conns[i] = PQconnectdb(conn_str);
1832+
conns[i] = PQconnectdb(psprintf("%s application_name=%s", Mtm->nodes[i].con.connStr, MULTIMASTER_BROADCAST_SERVICE));
18371833
if (PQstatus(conns[i]) != CONNECTION_OK)
18381834
{
18391835
if (ignoreError)
@@ -1845,12 +1841,10 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
18451841
do {
18461842
PQfinish(conns[i]);
18471843
} while (--i >= 0);
1848-
elog(ERROR, "Failed to establish connection '%s' to node %d", conn_str, failedNode);
1844+
elog(ERROR, "Failed to establish connection '%s' to node %d", Mtm->nodes[i].con.connStr, failedNode);
18491845
}
18501846
}
18511847
}
1852-
conn_str = p + 1;
1853-
i += 1;
18541848
}
18551849
Assert(i == MtmNodes);
18561850

multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#define MULTIMASTER_MAX_SLOT_NAME_SIZE 16
2727
#define MULTIMASTER_MAX_CONN_STR_SIZE 128
2828
#define MULTIMASTER_MAX_HOST_NAME_SIZE 64
29+
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
2930

3031
#define USEC 1000000
3132

pglogical_apply.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -934,10 +934,10 @@ void MtmExecutor(int id, void* work, size_t size)
934934
{
935935
EmitErrorReport();
936936
FlushErrorState();
937-
MTM_TRACE("%d: REMOTE begin abort transaction %d\n", MyProcPid, MtmGetCurrentTransactionId());
938-
MtmEndSession();
937+
MTM_INFO("%d: REMOTE begin abort transaction %d\n", MyProcPid, MtmGetCurrentTransactionId());
938+
MtmEndSession(false);
939939
AbortCurrentTransaction();
940-
MTM_TRACE("%d: REMOTE end abort transaction %d\n", MyProcPid, MtmGetCurrentTransactionId());
940+
MTM_INFO("%d: REMOTE end abort transaction %d\n", MyProcPid, MtmGetCurrentTransactionId());
941941
}
942942
PG_END_TRY();
943943

0 commit comments

Comments
 (0)