Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5f7a3ec

Browse files
knizhnikkelvich
authored andcommitted
Add mtm.get_cluster_state and mtm.get_nodes_state
1 parent f051d35 commit 5f7a3ec

File tree

4 files changed

+116
-2
lines changed

4 files changed

+116
-2
lines changed

bgwpool.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,16 @@ void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
9797
}
9898
}
9999

100+
size_t BgwPoolGetQueueSize(BgwPool* pool)
101+
{
102+
size_t used;
103+
SpinLockAcquire(&pool->lock);
104+
used = pool->head <= pool->tail ? pool->tail - pool->head : pool->size - pool->head + pool->tail;
105+
SpinLockRelease(&pool->lock);
106+
return used;
107+
}
108+
109+
100110
void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
101111
{
102112
Assert(size+4 <= pool->size);

bgwpool.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,6 @@ extern void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbn
3333

3434
extern void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
3535

36+
extern size_t BgwPoolGetQueueSize(BgwPool* pool);
37+
3638
#endif

multimaster--1.0.sql

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,17 @@ CREATE FUNCTION mtm.get_snapshot() RETURNS bigint
2323
AS 'MODULE_PATHNAME','mtm_get_snapshot'
2424
LANGUAGE C;
2525

26+
27+
CREATE TYPE mtm.node_state(id integer, disabled bool, disconnected bool, catchUp boolean, slotLag bigint, connStr text);
28+
29+
CREATE FUNCTION mtm.get_nodes_state() RETURNS SETOF mtm.node_state
30+
AS 'MODULE_PATHNAME','mtm_get_nodes_state'
31+
LANGUAGE C;
32+
33+
CREATE TYPE mtm.cluster_state(status text, disabledNodeMask bigint, disconnectedNodeMask bigint, catchUpNodeMask bigint, nNodes integer, nActiveQueries integer, queueSize bigint);
34+
35+
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
36+
AS 'MODULE_PATHNAME','mtm_get_cluster_state'
37+
LANGUAGE C;
38+
2639
CREATE TABLE IF NOT EXISTS mtm.ddl_log (issued timestamp with time zone not null, query text);

multimaster.c

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <time.h>
1111

1212
#include "postgres.h"
13+
#include "funcapi.h"
1314
#include "fmgr.h"
1415
#include "miscadmin.h"
1516
#include "libpq-fe.h"
@@ -91,6 +92,8 @@ PG_FUNCTION_INFO_V1(mtm_stop_replication);
9192
PG_FUNCTION_INFO_V1(mtm_drop_node);
9293
PG_FUNCTION_INFO_V1(mtm_recover_node);
9394
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
95+
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
96+
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
9497

9598
static Snapshot MtmGetSnapshot(Snapshot snapshot);
9699
static void MtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
@@ -683,6 +686,22 @@ static void MtmCheckSlots()
683686
}
684687
}
685688

689+
static int64 MtmGetSlotLag(int nodeId)
690+
{
691+
int i;
692+
for (i = 0; i < max_replication_slots; i++) {
693+
ReplicationSlot* slot = &ReplicationSlotCtl->replication_slots[i];
694+
int node;
695+
if (slot->in_use
696+
&& sscanf(slot->data.name.data, MULTIMASTER_SLOT_PATTERN, &node) == 1
697+
&& node == nodeId)
698+
{
699+
return GetXLogInsertRecPtr() - slot->data.restart_lsn;
700+
}
701+
}
702+
return -1;
703+
}
704+
686705
static void
687706
MtmEndTransaction(MtmCurrentTrans* x, bool commit)
688707
{
@@ -932,8 +951,8 @@ _PG_init(void)
932951
"Multimaster queue size",
933952
NULL,
934953
&MtmQueueSize,
935-
1024*1024,
936-
1024,
954+
256*1024*1024,
955+
1024*1024,
937956
INT_MAX,
938957
PGC_BACKEND,
939958
0,
@@ -1258,6 +1277,76 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
12581277
PG_RETURN_INT64(dtmTx.snapshot);
12591278
}
12601279

1280+
typedef struct
1281+
{
1282+
int nodeId;
1283+
char* connStrPtr;
1284+
TupleDesc desc;
1285+
Datum values[6];
1286+
bool nulls[6];
1287+
} MtmGetNodeStateCtx;
1288+
1289+
Datum
1290+
mtm_get_nodes_state(PG_FUNCTION_ARGS)
1291+
{
1292+
FuncCallContext* funcctx;
1293+
MtmGetNodeStateCtx* usrfctx;
1294+
MemoryContext oldcontext;
1295+
char* p;
1296+
bool is_first_call = SRF_IS_FIRSTCALL();
1297+
1298+
if (is_first_call) {
1299+
funcctx = SRF_FIRSTCALL_INIT();
1300+
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1301+
usrfctx = (MtmGetNodeStateCtx*)palloc(sizeof(MtmGetNodeStateCtx));
1302+
get_call_result_type(fcinfo, NULL, &usrfctx->desc);
1303+
usrfctx->nodeId = 1;
1304+
usrfctx->connStrPtr = pstrdup(MtmConnStrs);
1305+
memset(usrfctx->nulls, false, sizeof(usrfctx->nulls));
1306+
funcctx->user_fctx = usrfctx;
1307+
MemoryContextSwitchTo(oldcontext);
1308+
}
1309+
funcctx = SRF_PERCALL_SETUP();
1310+
usrfctx = (MtmGetNodeStateCtx*)funcctx->user_fctx;
1311+
if (usrfctx->nodeId > MtmNodes) {
1312+
SRF_RETURN_DONE(funcctx);
1313+
}
1314+
usrfctx->values[0] = Int32GetDatum(usrfctx->nodeId);
1315+
usrfctx->values[1] = BoolGetDatum(BIT_CHECK(dtm->disabledNodeMask, usrfctx->nodeId-1));
1316+
usrfctx->values[2] = BoolGetDatum(BIT_CHECK(dtm->connectivityMask, usrfctx->nodeId-1));
1317+
usrfctx->values[3] = BoolGetDatum(BIT_CHECK(dtm->nodeLockerMask, usrfctx->nodeId-1));
1318+
usrfctx->values[4] = Int64GetDatum(MtmGetSlotLag(usrfctx->nodeId));
1319+
p = strchr(usrfctx->connStrPtr, ',');
1320+
if (p != NULL) {
1321+
*p++ = '\0';
1322+
}
1323+
usrfctx->values[5] = CStringGetTextDatum(usrfctx->connStrPtr);
1324+
usrfctx->connStrPtr = p;
1325+
usrfctx->nodeId += 1;
1326+
1327+
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(heap_form_tuple(usrfctx->desc, usrfctx->values, usrfctx->nulls)));
1328+
}
1329+
1330+
Datum
1331+
mtm_get_cluster_state(PG_FUNCTION_ARGS)
1332+
{
1333+
TupleDesc desc;
1334+
Datum values[7];
1335+
bool nulls[7] = {false};
1336+
1337+
get_call_result_type(fcinfo, NULL, &desc);
1338+
1339+
values[0] = CStringGetTextDatum(MtmNodeStatusMnem[dtm->status]);
1340+
values[1] = Int64GetDatum(dtm->disabledNodeMask);
1341+
values[2] = Int64GetDatum(dtm->connectivityMask);
1342+
values[3] = Int64GetDatum(dtm->nodeLockerMask);
1343+
values[4] = Int32GetDatum(dtm->nNodes);
1344+
values[5] = Int32GetDatum((int)dtm->pool.active);
1345+
values[6] = Int64GetDatum(BgwPoolGetQueueSize(&dtm->pool));
1346+
1347+
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc, values, nulls)));
1348+
}
1349+
12611350
/*
12621351
* Execute statement with specified parameters and check its result
12631352
*/

0 commit comments

Comments
 (0)