Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 3edfdff

Browse files
knizhnikkelvich
authored andcommitted
Add information about BGW to node status
1 parent 5b20bea commit 3edfdff

File tree

5 files changed

+23
-5
lines changed

5 files changed

+23
-5
lines changed

multimaster--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ AS 'MODULE_PATHNAME','mtm_get_last_csn'
3636
LANGUAGE C;
3737

3838

39-
CREATE TYPE mtm.node_state AS ("id" integer, "disabled" bool, "disconnected" bool, "catchUp" bool, "slotLag" bigint, "avgTransDelay" bigint, "lastStatusChange" timestamp, "oldestSnapshot" bigint, "connStr" text);
39+
CREATE TYPE mtm.node_state AS ("id" integer, "disabled" bool, "disconnected" bool, "catchUp" bool, "slotLag" bigint, "avgTransDelay" bigint, "lastStatusChange" timestamp, "oldestSnapshot" bigint, "SenderPid" integer, "SenderStartTime" timestamp, "ReceiverPid" integer, "ReceiverStartTime" timestamp, "connStr" text);
4040

4141
CREATE FUNCTION mtm.get_nodes_state() RETURNS SETOF mtm.node_state
4242
AS 'MODULE_PATHNAME','mtm_get_nodes_state'

multimaster.c

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2073,7 +2073,8 @@ void MtmDropNode(int nodeId, bool dropSlot)
20732073
static void
20742074
MtmOnProcExit(int code, Datum arg)
20752075
{
2076-
if (MtmReplicationNodeId >= 0) {
2076+
if (MtmReplicationNodeId > 0) {
2077+
Mtm->nodes[MtmReplicationNodeId-1].senderPid = -1;
20772078
MTM_LOG1("WAL-sender to %d is terminated", MtmReplicationNodeId);
20782079
MtmOnNodeDisconnect(MtmReplicationNodeId);
20792080
}
@@ -2085,6 +2086,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
20852086
ListCell *param;
20862087
bool recoveryCompleted = false;
20872088
MtmIsRecoverySession = false;
2089+
Mtm->nodes[MtmReplicationNodeId-1].senderPid = MyProcPid;
2090+
Mtm->nodes[MtmReplicationNodeId-1].senderStartTime = MtmGetSystemTime();
20882091
foreach(param, args->in_params)
20892092
{
20902093
DefElem *elem = lfirst(param);
@@ -2377,7 +2380,11 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
23772380
usrfctx->values[5] = Int64GetDatum(Mtm->transCount ? Mtm->nodes[usrfctx->nodeId-1].transDelay/Mtm->transCount : 0);
23782381
usrfctx->values[6] = TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime/USECS_PER_SEC));
23792382
usrfctx->values[7] = Int64GetDatum(Mtm->nodes[usrfctx->nodeId-1].oldestSnapshot);
2380-
usrfctx->values[8] = CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
2383+
usrfctx->values[8] = Int32GetDatum(Mtm->nodes[usrfctx->nodeId-1].senderPid);
2384+
usrfctx->values[9] = Int64GetDatum(Mtm->nodes[usrfctx->nodeId-1].senderStartTime);
2385+
usrfctx->values[10] = Int32GetDatum(Mtm->nodes[usrfctx->nodeId-1].receiverPid);
2386+
usrfctx->values[11] = Int64GetDatum(Mtm->nodes[usrfctx->nodeId-1].receiverStartTime);
2387+
usrfctx->values[12] = CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
23812388
usrfctx->nodeId += 1;
23822389

23832390
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(heap_form_tuple(usrfctx->desc, usrfctx->values, usrfctx->nulls)));

multimaster.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
#define Anum_mtm_local_tables_rel_name 2
6161

6262
#define Natts_mtm_cluster_state 16
63-
#define Natts_mtm_nodes_state 9
63+
#define Natts_mtm_nodes_state 13
6464

6565
typedef uint64 csn_t; /* commit serial number */
6666
#define INVALID_CSN ((csn_t)-1)
@@ -125,8 +125,12 @@ typedef struct
125125
MtmConnectionInfo con;
126126
timestamp_t transDelay;
127127
timestamp_t lastStatusChangeTime;
128+
timestamp_t receiverStartTime;
129+
timestamp_t senderStartTime;
130+
int senderPid;
131+
int receiverPid;
128132
XLogRecPtr flushPos;
129-
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
133+
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
130134
} MtmNodeInfo;
131135

132136
typedef struct MtmTransState

pglogical_proto.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
146146
} else {
147147
csn_t csn = MtmTransactionSnapshot(txn->xid);
148148
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
149+
/*
150+
* INVALID_CSN means replicated transaction (transaction initiated by some other nodes).
151+
* We do not need to send such transactions unless we perform recovery
152+
*/
149153
if (csn == INVALID_CSN && !isRecovery) {
150154
return;
151155
}

pglogical_receiver.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,9 @@ pglogical_receiver_main(Datum main_arg)
226226

227227
MtmCreateSpillDirectory(nodeId);
228228

229+
Mtm->nodes[nodeId-1].senderPid = MyProcPid;
230+
Mtm->nodes[nodeId-1].senderStartTime = MtmGetSystemTime();
231+
229232
sprintf(worker_proc, "mtm_pglogical_receiver_%d_%d", MtmNodeId, nodeId);
230233

231234
/* We're now ready to receive signals */

0 commit comments

Comments
 (0)