Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7fd38fb

Browse files
committed
fix mtm.nodes() to be function and add rest of the info needed
1 parent 2193763 commit 7fd38fb

10 files changed

+89
-32
lines changed

Cluster.pm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ sub configure
151151
multimaster.heartbeat_send_timeout = 250
152152
multimaster.max_nodes = 6
153153
# XXX try without ignore_tables_without_pk
154-
multimaster.ignore_tables_without_pk = true
154+
# multimaster.ignore_tables_without_pk = true
155155
multimaster.volkswagen_mode = 1
156156
log_line_prefix = '%t [%p]: '
157157
));

multimaster--1.0.sql

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,18 @@ CREATE TYPE mtm.node_info AS (
6262
"receiver_status" text
6363
);
6464

65+
CREATE TYPE mtm.node AS (
66+
"id" int,
67+
"conninfo" text,
68+
"is_self" bool,
69+
"enabled" bool,
70+
"connected" bool,
71+
"sender_pid" int,
72+
"receiver_pid" int,
73+
"n_workers" int,
74+
"receiver_status" text
75+
);
76+
6577
---
6678
--- User facing API for node info and management.
6779
---
@@ -84,8 +96,13 @@ RETURNS mtm.cluster_status
8496
AS 'MODULE_PATHNAME','mtm_status'
8597
LANGUAGE C;
8698

87-
CREATE VIEW mtm.nodes AS
88-
SELECT id, conninfo, is_self, (mtm.node_info(id)).* FROM mtm.cluster_nodes;
99+
CREATE OR REPLACE FUNCTION mtm.nodes() RETURNS SETOF mtm.node AS
100+
$$
101+
SELECT id, conninfo, is_self, (mtm.node_info(id)).*
102+
FROM mtm.cluster_nodes
103+
ORDER BY id;
104+
$$
105+
LANGUAGE sql;
89106

90107
CREATE OR REPLACE FUNCTION mtm.add_node(connstr text) RETURNS void AS
91108
$$

src/include/multimaster.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,12 @@ typedef struct
215215
int recoveryCount; /* Number of completed recoveries */
216216
int donorNodeId; /* Cluster node from which this node was
217217
* populated */
218-
int dmq_dest_ids[MTM_MAX_NODES];
218+
struct {
219+
MtmReplicationMode receiver_mode;
220+
pid_t sender_pid;
221+
pid_t receiver_pid;
222+
int dmq_dest_id;
223+
} peers[MTM_MAX_NODES];
219224
BgwPool pools[FLEXIBLE_ARRAY_MEMBER]; /* [Mtm->nAllNodes]: per-node data */
220225
} MtmState;
221226

src/include/receiver.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,14 @@ typedef struct
1010
XLogRecPtr end_lsn;
1111
} MtmReceiverContext;
1212

13+
typedef enum
14+
{
15+
REPLMODE_RECOVERY, /* perform recovery of the node by applying all data from the slot from specified point */
16+
REPLMODE_RECOVERED /* recovery of receiver node is completed so drop old slot and restart replication from the current position in WAL */
17+
} MtmReplicationMode;
18+
19+
extern char const *const MtmReplicationModeName[];
20+
1321
extern BackgroundWorkerHandle *MtmStartReceiver(int nodeId, Oid db_id, Oid user_id, pid_t monitor_pid);
1422

1523
extern void MtmExecutor(void* work, size_t size, MtmReceiverContext *rctx);

src/multimaster.c

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ MtmStateShmemStartup()
332332
MemSet(Mtm, 0, sizeof(MtmState));
333333
Mtm->stop_new_commits = false;
334334
Mtm->recovered = false;
335-
Mtm->status = MTM_DISABLED; //MTM_INITIALIZATION;
335+
Mtm->status = MTM_DISABLED;
336336
Mtm->recoverySlot = 0;
337337
Mtm->locks = GetNamedLWLockTranche(MULTIMASTER_NAME);
338338
Mtm->disabledNodeMask = (nodemask_t) -1;
@@ -347,10 +347,13 @@ MtmStateShmemStartup()
347347
Mtm->localTablesHashLoaded = false;
348348
Mtm->latestSyncpoint = InvalidXLogRecPtr;
349349

350-
// XXX: change to dsa and make it per-receiver
351350
for (i = 0; i < MtmMaxNodes; i++)
352351
{
353-
Mtm->dmq_dest_ids[i] = -1;
352+
Mtm->peers[i].receiver_pid = InvalidPid;
353+
Mtm->peers[i].sender_pid = InvalidPid;
354+
Mtm->peers[i].dmq_dest_id = -1;
355+
356+
// XXX: change to dsa and make it per-receiver
354357
BgwPoolInit(&Mtm->pools[i], MtmExecutor, MtmQueueSize, 0);
355358
}
356359
}
@@ -838,6 +841,8 @@ mtm_init_cluster(PG_FUNCTION_ARGS)
838841
PG_RETURN_VOID();
839842
}
840843

844+
// XXX: During evaluation of (mtm.node_info(id)).* this function called
845+
// once each columnt for every row. So may be just rewrite to SRF.
841846
Datum
842847
mtm_node_info(PG_FUNCTION_ARGS)
843848
{
@@ -847,14 +852,38 @@ mtm_node_info(PG_FUNCTION_ARGS)
847852
bool nulls[Natts_mtm_node_info] = {false};
848853

849854
MtmLock(LW_SHARED);
855+
850856
values[Anum_mtm_node_info_enabled - 1] =
851857
BoolGetDatum(!BIT_CHECK(Mtm->disabledNodeMask, node_id - 1));
852858
values[Anum_mtm_node_info_connected - 1] =
853859
BoolGetDatum(!BIT_CHECK(Mtm->selfConnectivityMask, node_id - 1));
854-
values[Anum_mtm_node_info_sender_pid - 1] = Int32GetDatum(0);
855-
values[Anum_mtm_node_info_receiver_pid - 1] = Int32GetDatum(0);
856-
values[Anum_mtm_node_info_n_workers - 1] = Int32GetDatum(0);
857-
values[Anum_mtm_node_info_receiver_status - 1] = CStringGetTextDatum("unimplemented");
860+
861+
if (Mtm->peers[node_id - 1].sender_pid != InvalidPid)
862+
{
863+
values[Anum_mtm_node_info_sender_pid - 1] =
864+
Int32GetDatum(Mtm->peers[node_id - 1].sender_pid);
865+
}
866+
else
867+
{
868+
nulls[Anum_mtm_node_info_sender_pid - 1] = true;
869+
}
870+
871+
if (Mtm->peers[node_id - 1].receiver_pid != InvalidPid)
872+
{
873+
values[Anum_mtm_node_info_receiver_pid - 1] =
874+
Int32GetDatum(Mtm->peers[node_id - 1].receiver_pid);
875+
values[Anum_mtm_node_info_n_workers - 1] =
876+
Int32GetDatum(Mtm->pools[node_id - 1].nWorkers);
877+
values[Anum_mtm_node_info_receiver_status - 1] =
878+
CStringGetTextDatum(MtmReplicationModeName[Mtm->peers[node_id - 1].receiver_mode]);
879+
}
880+
else
881+
{
882+
nulls[Anum_mtm_node_info_receiver_pid - 1] = true;
883+
nulls[Anum_mtm_node_info_n_workers - 1] = true;
884+
nulls[Anum_mtm_node_info_receiver_status - 1] = true;
885+
}
886+
858887
MtmUnlock();
859888

860889
get_call_result_type(fcinfo, NULL, &desc);

src/pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -852,7 +852,7 @@ mtm_send_reply(TransactionId xid, int node_id, MtmMessageCode msg_code)
852852
MtmArbiterMessage msg;
853853

854854
MtmLock(LW_SHARED);
855-
dest_id = Mtm->dmq_dest_ids[node_id - 1];
855+
dest_id = Mtm->peers[node_id - 1].dmq_dest_id;
856856
MtmUnlock();
857857
Assert(dest_id >= 0);
858858

src/pglogical_proto.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
783783
mtm_log(ERROR, "Out-of-clique node %d tries to connect",
784784
MtmReplicationNodeId);
785785
}
786+
Mtm->peers[MtmReplicationNodeId - 1].sender_pid = MyProcPid;
786787
MtmUnlock();
787788

788789
if (hooks_data->is_recovery)
@@ -843,6 +844,7 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
843844
Assert(MtmReplicationNodeId >= 0);
844845

845846
MtmLock(LW_EXCLUSIVE);
847+
Mtm->peers[MtmReplicationNodeId - 1].sender_pid = InvalidPid;
846848
if (BIT_CHECK(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1))
847849
{
848850
BIT_CLEAR(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1);

src/pglogical_receiver.c

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,6 @@
6666

6767
bool MtmIsReceiver;
6868

69-
typedef enum
70-
{
71-
REPLMODE_EXIT, /* receiver should exit */
72-
REPLMODE_RECOVERED, /* recovery of receiver node is completed so drop old slot and restart replication from the current position in WAL */
73-
REPLMODE_RECOVERY, /* perform recovery of the node by applying all data from the slot from specified point */
74-
REPLMODE_CREATE_NEW, /* destination node is recovered: drop old slot and restart from roveredLsn position */
75-
REPLMODE_OPEN_EXISTED /* normal mode: use existed slot or create new one and start receiving data from it from the remembered position */
76-
} MtmReplicationMode;
77-
7869
typedef struct MtmFlushPosition
7970
{
8071
dlist_node node;
@@ -83,13 +74,10 @@ typedef struct MtmFlushPosition
8374
XLogRecPtr remote_end;
8475
} MtmFlushPosition;
8576

86-
static char const* const MtmReplicationModeName[] =
77+
char const* const MtmReplicationModeName[] =
8778
{
88-
"exit",
89-
"recovered", /* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
90-
"recovery", /* perform recorvery of the node by applying all data from theslot from specified point */
91-
"create_new", /* destination node is recovered: drop old slot and restart from roveredLsn position */
92-
"open_existed" /* normal mode: use existed slot or create new one and start receiving data from it from the rememered position */
79+
"recovery", /* perform recorvery of the node by applying all data from theslot from specified point */
80+
"recovered" /* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
9381
};
9482

9583
static dlist_head MtmLsnMapping = DLIST_STATIC_INIT(MtmLsnMapping);
@@ -550,6 +538,9 @@ pglogical_receiver_at_exit(int status, Datum arg)
550538
{
551539
int node_id = DatumGetInt32(arg);
552540
BgwPoolCancel(&Mtm->pools[node_id - 1]);
541+
MtmLock(LW_EXCLUSIVE);
542+
Mtm->peers[node_id - 1].receiver_pid = InvalidPid;
543+
MtmUnlock();
553544
}
554545

555546
void
@@ -639,6 +630,11 @@ pglogical_receiver_main(Datum main_arg)
639630
*/
640631
mode = MtmGetReplicationMode(nodeId);
641632

633+
MtmLock(LW_EXCLUSIVE);
634+
Mtm->peers[nodeId - 1].receiver_pid = MyProcPid;
635+
Mtm->peers[nodeId - 1].receiver_mode = mode;
636+
MtmUnlock();
637+
642638
// XXX: delete unnecessary modes
643639
Assert(mode == REPLMODE_RECOVERY || mode == REPLMODE_RECOVERED);
644640

src/resolver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ scatter_status_requests(MtmConfig *mtm_cfg)
407407

408408
// XXX: we need here to await destination
409409
MtmLock(LW_SHARED);
410-
dest_id = Mtm->dmq_dest_ids[node_id - 1];
410+
dest_id = Mtm->peers[node_id - 1].dmq_dest_id;
411411
MtmUnlock();
412412
Assert(dest_id >= 0);
413413

src/state.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -981,7 +981,7 @@ check_status_requests(MtmConfig *mtm_cfg)
981981
msg->node = Mtm->my_node_id;
982982

983983
MtmLock(LW_SHARED);
984-
dest_id = Mtm->dmq_dest_ids[sender_node_id - 1];
984+
dest_id = Mtm->peers[sender_node_id - 1].dmq_dest_id;
985985
MtmUnlock();
986986
Assert(dest_id >= 0);
987987

@@ -1128,7 +1128,7 @@ start_node_workers(int node_id, MtmConfig *new_cfg, Datum arg)
11281128
dest = dmq_destination_add(dmq_connstr, dmq_my_name, dmq_node_name,
11291129
MtmHeartbeatSendTimeout);
11301130
MtmLock(LW_EXCLUSIVE);
1131-
Mtm->dmq_dest_ids[node_id - 1] = dest;
1131+
Mtm->peers[node_id - 1].dmq_dest_id = dest;
11321132
MtmUnlock();
11331133

11341134
/* Attach receiver so we can collect tx requests */
@@ -1179,7 +1179,7 @@ stop_node_workers(int node_id, MtmConfig *new_cfg, Datum arg)
11791179
dmq_destination_drop(dmq_name);
11801180

11811181
MtmLock(LW_EXCLUSIVE);
1182-
Mtm->dmq_dest_ids[node_id - 1] = -1;
1182+
Mtm->peers[node_id - 1].dmq_dest_id = -1;
11831183
MtmUnlock();
11841184

11851185
/*
@@ -1354,7 +1354,7 @@ MtmMonitor(Datum arg)
13541354
}
13551355

13561356
/* Launch resolver after we added dmq destinations */
1357-
// XXX: that's because of current use of Mtm->dmq_dest_ids[]
1357+
// XXX: that's because of current use of Mtm->peers[].dmq_dest_id
13581358
if (resolver == NULL)
13591359
resolver = ResolverStart(db_id, user_id);
13601360

0 commit comments

Comments
 (0)