Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 97a3e42

Browse files
knizhnikkelvich
authored and committed
Add mtm.get_cluster_info function
1 parent abd79ad commit 97a3e42

File tree

5 files changed

+88
-29
lines changed

5 files changed

+88
-29
lines changed

bgwpool.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ typedef struct
1616
int id;
1717
} BgwPoolExecutorCtx;
1818

19-
size_t n_snapshots;
20-
size_t n_active;
21-
2219
static void BgwPoolMainLoop(Datum arg)
2320
{
2421
BgwPoolExecutorCtx* ctx = (BgwPoolExecutorCtx*)arg;
@@ -36,7 +33,8 @@ static void BgwPoolMainLoop(Datum arg)
3633
size = *(int*)&pool->queue[pool->head];
3734
Assert(size < pool->size);
3835
work = malloc(size);
39-
pool->active -= 1;
36+
pool->pending -= 1;
37+
pool->active += 1;
4038
if (pool->head + size + 4 > pool->size) {
4139
memcpy(work, pool->queue, size);
4240
pool->head = INTALIGN(size);
@@ -54,6 +52,9 @@ static void BgwPoolMainLoop(Datum arg)
5452
SpinLockRelease(&pool->lock);
5553
pool->executor(id, work, size);
5654
free(work);
55+
SpinLockAcquire(&pool->lock);
56+
pool->active -= 1;
57+
SpinLockRelease(&pool->lock);
5758
}
5859
}
5960

@@ -71,6 +72,7 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, si
7172
pool->tail = 0;
7273
pool->size = queueSize;
7374
pool->active = 0;
75+
pool->pending = 0;
7476
strcpy(pool->dbname, dbname);
7577
}
7678

@@ -126,9 +128,7 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
126128
PGSemaphoreLock(&pool->overflow);
127129
SpinLockAcquire(&pool->lock);
128130
} else {
129-
pool->active += 1;
130-
n_snapshots += 1;
131-
n_active += pool->active;
131+
pool->pending += 1;
132132
*(int*)&pool->queue[pool->tail] = size;
133133
if (pool->size - pool->tail >= size + 4) {
134134
memcpy(&pool->queue[pool->tail+4], work, size);

bgwpool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ typedef struct
2020
size_t tail;
2121
size_t size;
2222
size_t active;
23+
size_t pending;
2324
bool producerBlocked;
2425
char dbname[MAX_DBNAME_LEN];
2526
char* queue;

multimaster--1.0.sql

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,22 @@ AS 'MODULE_PATHNAME','mtm_get_snapshot'
2424
LANGUAGE C;
2525

2626

27-
CREATE TYPE mtm.node_state AS (id integer, disabled bool, disconnected bool, catchUp bool, slotLag bigint, avgTransDelay bigint, lastStatusChange timestamp, connStr text);
27+
CREATE TYPE mtm.node_state AS ("id" integer, "disabled" bool, "disconnected" bool, "catchUp" bool, "slotLag" bigint, "avgTransDelay" bigint, "lastStatusChange" timestamp, "connStr" text);
2828

2929
CREATE FUNCTION mtm.get_nodes_state() RETURNS SETOF mtm.node_state
3030
AS 'MODULE_PATHNAME','mtm_get_nodes_state'
3131
LANGUAGE C;
3232

33-
CREATE TYPE mtm.cluster_state AS (status text, disabledNodeMask bigint, disconnectedNodeMask bigint, catchUpNodeMask bigint, nNodes integer, nActiveQueries integer, queueSize bigint, transCount bigint, timeShift bigint, recoverySlot integer);
33+
CREATE TYPE mtm.cluster_state AS ("status" text, "disabledNodeMask" bigint, "disconnectedNodeMask" bigint, "catchUpNodeMask" bigint, "nNodes" integer, "nActiveQueries" integer, "nPendingQueries" integer, "queueSize" bigint, "transCount" bigint, "timeShift" bigint, "recoverySlot" integer);
3434

3535
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
3636
AS 'MODULE_PATHNAME','mtm_get_cluster_state'
3737
LANGUAGE C;
3838

39+
CREATE FUNCTION mtm.get_cluster_info() RETURNS SETOF mtm.cluster_state
40+
AS 'MODULE_PATHNAME','mtm_get_cluster_info'
41+
LANGUAGE C;
42+
3943
CREATE FUNCTION mtm.make_table_local(relation regclass) RETURNS void
4044
AS 'MODULE_PATHNAME','mtm_make_table_local'
4145
LANGUAGE C;

multimaster.c

Lines changed: 71 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
107107
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
108108
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
109109
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
110+
PG_FUNCTION_INFO_V1(mtm_get_cluster_info);
110111
PG_FUNCTION_INFO_V1(mtm_make_table_local);
111112
PG_FUNCTION_INFO_V1(mtm_dump_lock_graph);
112113

@@ -1608,7 +1609,7 @@ _PG_init(void)
16081609
"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes",
16091610
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
16101611
&Mtm2PCMinTimeout,
1611-
10000,
1612+
100000, /* 100 seconds */
16121613
0,
16131614
INT_MAX,
16141615
PGC_BACKEND,
@@ -1623,7 +1624,7 @@ _PG_init(void)
16231624
"Percent of prepare time for maximal time of second phase of two-pahse commit",
16241625
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
16251626
&Mtm2PCPrepareRatio,
1626-
100,
1627+
1000, /* 10 times */
16271628
0,
16281629
INT_MAX,
16291630
PGC_BACKEND,
@@ -2177,10 +2178,9 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
21772178
typedef struct
21782179
{
21792180
int nodeId;
2180-
char* connStrPtr;
21812181
TupleDesc desc;
2182-
Datum values[8];
2183-
bool nulls[8];
2182+
Datum values[Natts_mtm_nodes_state];
2183+
bool nulls[Natts_mtm_nodes_state];
21842184
} MtmGetNodeStateCtx;
21852185

21862186
Datum
@@ -2189,7 +2189,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
21892189
FuncCallContext* funcctx;
21902190
MtmGetNodeStateCtx* usrfctx;
21912191
MemoryContext oldcontext;
2192-
char* p;
21932192
int64 lag;
21942193
bool is_first_call = SRF_IS_FIRSTCALL();
21952194

@@ -2199,7 +2198,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
21992198
usrfctx = (MtmGetNodeStateCtx*)palloc(sizeof(MtmGetNodeStateCtx));
22002199
get_call_result_type(fcinfo, NULL, &usrfctx->desc);
22012200
usrfctx->nodeId = 1;
2202-
usrfctx->connStrPtr = pstrdup(MtmConnStrs);
22032201
memset(usrfctx->nulls, false, sizeof(usrfctx->nulls));
22042202
funcctx->user_fctx = usrfctx;
22052203
MemoryContextSwitchTo(oldcontext);
@@ -2218,23 +2216,19 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22182216
usrfctx->nulls[4] = lag < 0;
22192217
usrfctx->values[5] = Int64GetDatum(Mtm->transCount ? Mtm->nodes[usrfctx->nodeId-1].transDelay/Mtm->transCount : 0);
22202218
usrfctx->values[6] = TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[usrfctx->nodeId-1].lastStatusChangeTime));
2221-
p = strchr(usrfctx->connStrPtr, ',');
2222-
if (p != NULL) {
2223-
*p++ = '\0';
2224-
}
2225-
usrfctx->values[7] = CStringGetTextDatum(usrfctx->connStrPtr);
2226-
usrfctx->connStrPtr = p;
2219+
usrfctx->values[7] = CStringGetTextDatum(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
22272220
usrfctx->nodeId += 1;
22282221

22292222
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(heap_form_tuple(usrfctx->desc, usrfctx->values, usrfctx->nulls)));
22302223
}
22312224

2225+
22322226
Datum
22332227
mtm_get_cluster_state(PG_FUNCTION_ARGS)
22342228
{
22352229
TupleDesc desc;
2236-
Datum values[10];
2237-
bool nulls[10] = {false};
2230+
Datum values[Natts_mtm_cluster_state];
2231+
bool nulls[Natts_mtm_cluster_state] = {false};
22382232
get_call_result_type(fcinfo, NULL, &desc);
22392233

22402234
values[0] = CStringGetTextDatum(MtmNodeStatusMnem[Mtm->status]);
@@ -2243,16 +2237,73 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
22432237
values[3] = Int64GetDatum(Mtm->nodeLockerMask);
22442238
values[4] = Int32GetDatum(Mtm->nNodes);
22452239
values[5] = Int32GetDatum((int)Mtm->pool.active);
2246-
values[6] = Int64GetDatum(BgwPoolGetQueueSize(&Mtm->pool));
2247-
values[7] = Int64GetDatum(Mtm->transCount);
2248-
values[8] = Int64GetDatum(Mtm->timeShift);
2249-
values[9] = Int32GetDatum(Mtm->recoverySlot);
2250-
nulls[9] = Mtm->recoverySlot == 0;
2240+
values[6] = Int32GetDatum((int)Mtm->pool.pending);
2241+
values[7] = Int64GetDatum(BgwPoolGetQueueSize(&Mtm->pool));
2242+
values[8] = Int64GetDatum(Mtm->transCount);
2243+
values[9] = Int64GetDatum(Mtm->timeShift);
2244+
values[10] = Int32GetDatum(Mtm->recoverySlot);
22512245

22522246
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc, values, nulls)));
22532247
}
22542248

22552249

2250+
typedef struct
2251+
{
2252+
int nodeId;
2253+
} MtmGetClusterInfoCtx;
2254+
2255+
2256+
Datum
2257+
mtm_get_cluster_info(PG_FUNCTION_ARGS)
2258+
{
2259+
2260+
FuncCallContext* funcctx;
2261+
MtmGetClusterInfoCtx* usrfctx;
2262+
MemoryContext oldcontext;
2263+
TupleDesc desc;
2264+
bool is_first_call = SRF_IS_FIRSTCALL();
2265+
int i;
2266+
PGconn* conn;
2267+
PGresult *result;
2268+
char* values[Natts_mtm_cluster_state];
2269+
HeapTuple tuple;
2270+
2271+
if (is_first_call) {
2272+
funcctx = SRF_FIRSTCALL_INIT();
2273+
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
2274+
usrfctx = (MtmGetClusterInfoCtx*)palloc(sizeof(MtmGetNodeStateCtx));
2275+
get_call_result_type(fcinfo, NULL, &desc);
2276+
funcctx->attinmeta = TupleDescGetAttInMetadata(desc);
2277+
usrfctx->nodeId = 1;
2278+
funcctx->user_fctx = usrfctx;
2279+
MemoryContextSwitchTo(oldcontext);
2280+
}
2281+
funcctx = SRF_PERCALL_SETUP();
2282+
usrfctx = (MtmGetClusterInfoCtx*)funcctx->user_fctx;
2283+
if (usrfctx->nodeId > MtmNodes) {
2284+
SRF_RETURN_DONE(funcctx);
2285+
}
2286+
conn = PQconnectdb(Mtm->nodes[usrfctx->nodeId-1].con.connStr);
2287+
if (PQstatus(conn) != CONNECTION_OK) {
2288+
elog(ERROR, "Failed to establish connection '%s' to node %d", Mtm->nodes[usrfctx->nodeId-1].con.connStr, usrfctx->nodeId);
2289+
}
2290+
result = PQexec(conn, "select * from mtm.get_cluster_state()");
2291+
2292+
if (PQresultStatus(result) != PGRES_TUPLES_OK || PQntuples(result) != 1) {
2293+
elog(ERROR, "Failed to receive data from %d", usrfctx->nodeId);
2294+
}
2295+
2296+
for (i = 0; i < Natts_mtm_cluster_state; i++) {
2297+
values[i] = PQgetvalue(result, 0, i);
2298+
}
2299+
tuple = BuildTupleFromCStrings(funcctx->attinmeta, values);
2300+
PQclear(result);
2301+
PQfinish(conn);
2302+
usrfctx->nodeId += 1;
2303+
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
2304+
}
2305+
2306+
22562307
Datum mtm_make_table_local(PG_FUNCTION_ARGS)
22572308
{
22582309
Oid reloid = PG_GETARG_OID(1);

multimaster.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@
5656
#define Anum_mtm_local_tables_rel_schema 1
5757
#define Anum_mtm_local_tables_rel_name 2
5858

59+
#define Natts_mtm_cluster_state 11
60+
#define Natts_mtm_nodes_state 8
61+
5962
typedef uint64 csn_t; /* commit serial number */
6063
#define INVALID_CSN ((csn_t)-1)
6164

0 commit comments

Comments
 (0)