
Commit c6d60df

Squashed 'contrib/pg_shardman/' changes from c350eaf..4c97bed

4c97bed Add monitor function
07ff8f9 Merge branch 'broadcast' of https://git.postgrespro.ru/a.sher/pg_shardman into broadcast
e7cfaa5 Add monitor_deadlocks function

git-subtree-dir: contrib/pg_shardman
git-subtree-split: 4c97bed

1 parent 852c8c0, commit c6d60df

2 files changed: 154 additions, 62 deletions

pg_shardman--0.0.2.sql (90 additions, 27 deletions)

@@ -2007,41 +2007,104 @@ BEGIN
     LOOP
         poll := format('%s%s:SELECT shardman.serialize_lock_graph();', poll, node_id);
     END LOOP;
-    SELECT shardman.broadcast(poll) INTO graph;
+    SELECT shardman.broadcast(poll, ignore_errors => true) INTO graph;
 
     RETURN graph;
 END;
 $$ LANGUAGE plpgsql;
 
 
--- Detect distributed deadlock and return set of process involed in deadlock. If there is no deadlock then this view ias empty.
---
--- This query is based on the algorithm described by Knuth for detecting a cycle in a linked list. In one column, keep track of the children,
--- the children's children, the children's children's children, etc. In another column, keep track of the grandchildren, the grandchildren's grandchildren,
--- the grandchildren's grandchildren's grandchildren, etc.
---
--- For the initial selection, the distance between Child and Grandchild columns is 1. Every selection from union all increases the depth of Child by 1, and that of Grandchild by 2.
--- The distance between them increases by 1.
---
--- If there is any loop, since the distance only increases by 1 each time, at some point after Child is in the loop, the distance will be a multiple of the cycle length.
--- When that happens, the Child and the Grandchild columns are the same. Use that as an additional condition to stop the recursion, and detect it in the rest of your code as an error.
-CREATE VIEW deadlock AS
-    WITH RECURSIVE LinkTable AS (SELECT wait AS Parent, hold AS Child FROM shardman.deserialize_lock_graph(shardman.global_lock_graph())),
+-- Detect distributed deadlock and returns path in the lock graph forming deadlock loop
+CREATE FUNCTION detect_deadlock(lock_graph text) RETURNS shardman.process[] AS $$
+    WITH RECURSIVE LinkTable AS (SELECT wait AS Parent, hold AS Child FROM shardman.deserialize_lock_graph(lock_graph)),
     cte AS (
-        SELECT lt1.Parent, lt1.Child, lt2.Child AS Grandchild
-        FROM LinkTable lt1
-        INNER JOIN LinkTable lt2 on lt2.Parent = lt1.Child
-        UNION ALL
-        SELECT cte.Parent, lt1.Child, lt3.Child AS Grandchild
-        FROM cte
-        INNER JOIN LinkTable lt1 ON lt1.Parent = cte.Child
-        INNER JOIN LinkTable lt2 ON lt2.Parent = cte.Grandchild
-        INNER JOIN LinkTable lt3 ON lt3.Parent = lt2.Child
-        WHERE cte.Child <> cte.Grandchild
+        SELECT Child, Parent, ARRAY[Child] AS AllParents, false AS Loop
+        FROM LinkTable
+        UNION ALL
+        SELECT c.Child, c.Parent, p.AllParents||c.Child, c.Child=ANY(p.AllParents)
+        FROM LinkTable c JOIN cte p ON c.Parent = p.Child AND NOT p.Loop
     )
-    SELECT DISTINCT Parent
-    FROM cte
-    WHERE Child = Grandchild;
+    SELECT AllParents FROM cte WHERE Loop;
+$$ LANGUAGE sql;
+
+-- Monitor cluster for presence of distributed deadlocks and node failures.
+-- Tries to cancel queries causing deadlock and exclude unavailable nodes from the cluser.
+CREATE FUNCTION monitor(deadlock_check_timeout_sec int = 5, rm_node_timeout_sec int = 60) RETURNS void AS $$
+DECLARE
+    prev_deadlock_path shardman.process[];
+    deadlock_path shardman.process[];
+    victim shardman.process;
+    loop_begin int;
+    loop_end int;
+    prev_loop_begin int;
+    prev_loop_end int;
+    sep int;
+    resp text;
+    error_begin int;
+    error_end int;
+    error_msg text;
+    error_node_id int;
+    failed_node_id int;
+    failure_timestamp timestamp with time zone;
+BEGIN
+    IF shardman.redirect_to_shardlord(format('monitor(%s, %s)', deadlock_check_timeout_sec, rm_node_timeout_sec))
+    THEN
+        RETURN;
+    END IF;
+
+    RAISE NOTICE 'Start cluster monitor...';
+
+    LOOP
+        resp := shardman.global_lock_graph();
+        error_begin := position('<error>' IN resp);
+        IF error_begin<>0
+        THEN
+            error_end := position('</error>' IN resp);
+            sep := position(':' IN resp);
+            error_node_id := substring(resp FROM error_begin+7 FOR sep-error_begin-7)::int;
+            error_msg := substring(resp FROM sep+1 FOR error_end-sep-1);
+            IF error_node_id = failed_node_id
+            THEN
+                IF clock_timestamp() > failure_timestamp + rm_node_timeout_sec * interval '1 sec'
+                THEN
+                    RAISE NOTICE 'Remove node % because of % timeout expiration', failed_node_id, rm_node_timeout_sec;
+                    PERFORM shardman.broadcast(format('0:SELECT shardman.rm_node(%s, force=>true);', failed_node_id));
+                    failed_node_id := null;
+                END IF;
+            ELSE
+                RAISE NOTICE 'Node % reports error message %', error_node_id, error_msg;
+                failed_node_id := error_node_id;
+                failure_timestamp := clock_timestamp();
+            END IF;
+            prev_deadlock_path := null;
+        ELSE
+            failed_node_id := null;
+            deadlock_path := shardman.detect_deadlock(resp);
+            loop_end := array_upper(deadlock_path, 1);
+            loop_begin := array_position(deadlock_path, deadlock_path[loop_end]);
+            -- Check if old and new lock graph contain the same subgraph.
+            -- Because we can not make consistent distributed snapshot,
+            -- collected global local graph can contain "false" loops.
+            -- So we report deadlock only if detected loop persists during
+            -- deadlock detection period.
+            IF prev_deadlock_path IS NOT NULL
+               AND loop_end - loop_begin = prev_loop_end - prev_loop_begin
+               AND deadlock_path[loop_begin:loop_end] = prev_deadlock_path[prev_loop_begin:prev_loop_end]
+            THEN
+                -- Try to cancel random node in loop
+                victim := deadlock_path[loop_begin + ((loop_end - loop_begin)*random())::integer];
+                RAISE NOTICE 'Detect deadlock: cancel process % at node %', victim.pid, victim.node;
+                PERFORM shardman.broadcast(format('%s:SELECT pg_cancel_backend(%s);',
+                                                  victim.node, victim.pid));
+            END IF;
+            prev_deadlock_path := deadlock_path;
+            prev_loop_begin := loop_begin;
+            prev_loop_end := loop_end;
+        END IF;
+        PERFORM pg_sleep(deadlock_check_timeout_sec);
+    END LOOP;
+END;
+$$ LANGUAGE plpgsql;
 
 
 -- View for monitoring logical replication lag.
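
The rewritten cycle detection is worth pausing on: the old view needed the Child/Grandchild distance to hit a multiple of the cycle length before it noticed a loop, while the new detect_deadlock simply carries the path walked so far in an array and flags a loop as soon as a vertex repeats. A minimal, self-contained sketch of the same technique on a toy wait-for table (the lock_edges table, its columns, and the sample values are illustrative, not part of the commit):

    -- Hypothetical wait-for graph: process "wait" waits for a lock held by "hold".
    CREATE TEMP TABLE lock_edges(wait int, hold int);
    INSERT INTO lock_edges VALUES (1, 2), (2, 3), (3, 1), (4, 1);

    -- As in detect_deadlock: accumulate the visited path in an array column
    -- and stop extending a branch once it closes a loop.
    WITH RECURSIVE cte AS (
        SELECT hold AS Child, wait AS Parent, ARRAY[hold] AS AllParents, false AS Loop
        FROM lock_edges
        UNION ALL
        SELECT c.hold, c.wait, p.AllParents || c.hold, c.hold = ANY(p.AllParents)
        FROM lock_edges c JOIN cte p ON c.wait = p.Child AND NOT p.Loop
    )
    SELECT AllParents FROM cte WHERE Loop;
    -- Returns rotations of the 1 -> 2 -> 3 -> 1 cycle such as {2,3,1,2}: the
    -- last element repeats an earlier one, which is what monitor() exploits
    -- when it locates loop_begin with array_position().

Because the collected lock graphs are not a consistent snapshot, monitor() additionally requires the same loop to appear in two consecutive polls before it cancels a victim.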

pg_shardman.c (64 additions, 35 deletions)

@@ -122,10 +122,17 @@ wait_command_completion(PGconn* conn)
     return true;
 }
 
+typedef struct
+{
+    PGconn* con;
+    char*   sql;
+    int     node;
+} Channel;
+
 Datum
 broadcast(PG_FUNCTION_ARGS)
 {
-    char *sql_full = text_to_cstring(PG_GETARG_TEXT_PP(0));
+    char* sql_full = text_to_cstring(PG_GETARG_TEXT_PP(0));
     char* cmd = pstrdup(sql_full);
     bool ignore_errors = PG_GETARG_BOOL(1);
     bool two_phase = PG_GETARG_BOOL(2);
@@ -143,17 +150,18 @@ broadcast(PG_FUNCTION_ARGS)
     int n_cmds = 0;
     int i;
     int n_cons = 1024;
-    PGconn** conn;
+    Channel* chan;
+    PGconn* con;
     StringInfoData resp;
 
-    char* errmsg = NULL;
+    char const* errmsg = "";
 
     elog(DEBUG1, "Broadcast commmand '%s'", cmd);
 
     initStringInfo(&resp);
 
     SPI_connect();
-    conn = (PGconn**) palloc(sizeof(PGconn*) * n_cons);
+    chan = (Channel*) palloc(sizeof(Channel) * n_cons);
 
     while ((sep = strchr(cmd, *cmd == '{' ? '}' : ';')) != NULL)
     {
@@ -163,7 +171,7 @@ broadcast(PG_FUNCTION_ARGS)
         cmd += 1;
         rc = sscanf(cmd, "%d:%n", &node_id, &n);
         if (rc != 1) {
-            elog(ERROR, "SHARDMAN: Invalid command string: %s", cmd);
+            elog(ERROR, "SHARDMAN: Invalid command string: '%s' in '%s'", cmd, sql_full);
         }
         sql = cmd + n;
         cmd = sep + 1;
@@ -191,21 +199,26 @@ broadcast(PG_FUNCTION_ARGS)
         }
         if (n_cmds >= n_cons)
         {
-            conn = (PGconn**) repalloc(conn, sizeof(PGconn*) * (n_cons *= 2));
+            chan = (Channel*) repalloc(chan, sizeof(Channel) * (n_cons *= 2));
         }
 
-        conn[n_cmds] = PQconnectdb(conn_str);
-        if (PQstatus(conn[n_cmds++]) != CONNECTION_OK)
+        con = PQconnectdb(conn_str);
+        chan[n_cmds].con = con;
+        chan[n_cmds].node = node_id;
+        chan[n_cmds].sql = sql;
+        n_cmds += 1;
+
+        if (PQstatus(con) != CONNECTION_OK)
         {
             if (ignore_errors)
             {
-                errmsg = psprintf("%s%d:Connection failure: %s.",
-                                  errmsg ? errmsg : "", node_id,
-                                  PQerrorMessage(conn[n_cmds - 1]));
+                errmsg = psprintf("%s<error>%d:Connection failure: %s</error>",
                                  errmsg, node_id, PQerrorMessage(con));
+                chan[n_cmds-1].sql = NULL;
                 continue;
             }
             errmsg = psprintf("Failed to connect to node %d: %s", node_id,
-                              PQerrorMessage(conn[n_cmds-1]));
+                              PQerrorMessage(con));
             goto cleanup;
         }
         if (!sync_commit_on)
@@ -221,18 +234,18 @@ broadcast(PG_FUNCTION_ARGS)
             }
         }
         elog(DEBUG1, "Send command '%s' to node %d", sql, node_id);
-        if (!PQsendQuery(conn[n_cmds - 1], sql)
-            || (sequential && !wait_command_completion(conn[n_cmds - 1])))
+        if (!PQsendQuery(con, sql)
+            || (sequential && !wait_command_completion(con)))
         {
             if (ignore_errors)
             {
-                errmsg = psprintf("%s%d:Failed to send query '%s': %s'.",
-                                  errmsg ? errmsg : "", node_id, sql,
-                                  PQerrorMessage(conn[n_cmds-1]));
+                errmsg = psprintf("%s<error>%d:Failed to send query '%s': %s</error>",
+                                  errmsg, node_id, sql, PQerrorMessage(con));
+                chan[n_cmds-1].sql = NULL;
                 continue;
             }
             errmsg = psprintf("Failed to send query '%s' to node %d: %s'", sql,
-                              node_id, PQerrorMessage(conn[n_cmds-1]));
+                              node_id, PQerrorMessage(con));
             goto cleanup;
         }
     }
@@ -248,7 +261,15 @@ broadcast(PG_FUNCTION_ARGS)
         PGresult* res = NULL;
         ExecStatusType status;
 
-        while ((next_res = PQgetResult(conn[i])) != NULL)
+        con = chan[i].con;
+
+        if (chan[i].sql == NULL)
+        {
+            /* Ignore commands which were not sent */
+            continue;
+        }
+
+        while ((next_res = PQgetResult(con)) != NULL)
         {
             if (res != NULL)
             {
@@ -260,10 +281,12 @@ broadcast(PG_FUNCTION_ARGS)
             {
                 if (ignore_errors)
                 {
-                    errmsg = psprintf("%s%d:Failed to received response for '%s': %s.", errmsg ? errmsg : "", node_id, sql_full, PQerrorMessage(conn[i]));
+                    errmsg = psprintf("%s<error>%d:Failed to received response for '%s': %s</error>",
+                                      errmsg, chan[i].node, chan[i].sql, PQerrorMessage(con));
                     continue;
                 }
-                errmsg = psprintf("Failed to receive response for query %s from node %d: %s", sql_full, node_id, PQerrorMessage(conn[i]));
+                errmsg = psprintf("Failed to receive response for query %s from node %d: %s",
                                  chan[i].sql, chan[i].node, PQerrorMessage(con));
                 goto cleanup;
             }
 
@@ -273,11 +296,13 @@ broadcast(PG_FUNCTION_ARGS)
             {
                 if (ignore_errors)
                 {
-                    errmsg = psprintf("%s%d:Command %s failed: %s.", errmsg ? errmsg : "", node_id, sql_full, PQerrorMessage(conn[i]));
+                    errmsg = psprintf("%s<error>%d:Command %s failed: %s</error>",
+                                      errmsg, chan[i].node, chan[i].sql, PQerrorMessage(con));
                     PQclear(res);
                     continue;
                 }
-                errmsg = psprintf("Command %s failed at node %d: %s", sql_full, node_id, PQerrorMessage(conn[i]));
+                errmsg = psprintf("Command %s failed at node %d: %s",
                                  chan[i].sql, chan[i].node, PQerrorMessage(con));
                 PQclear(res);
                 goto cleanup;
             }
@@ -292,11 +317,13 @@ broadcast(PG_FUNCTION_ARGS)
                 if (ignore_errors)
                 {
                     appendStringInfoString(&resp, "?");
-                    elog(WARNING, "SHARDMAN: Query '%s' doesn't return single tuple at node %d", sql_full, node_id);
+                    elog(WARNING, "SHARDMAN: Query '%s' doesn't return single tuple at node %d",
                         chan[i].sql, chan[i].node);
                 }
                 else
                 {
-                    errmsg = psprintf("Query '%s' doesn't return single tuple at node %d", sql_full, node_id);
+                    errmsg = psprintf("Query '%s' doesn't return single tuple at node %d",
                                      chan[i].sql, chan[i].node);
                     PQclear(res);
                     goto cleanup;
                 }
@@ -315,37 +342,39 @@ broadcast(PG_FUNCTION_ARGS)
   cleanup:
     for (i = 0; i < n_cmds; i++)
     {
+        con = chan[i].con;
        if (two_phase)
        {
-            if (errmsg)
+            if (*errmsg)
            {
-                res = PQexec(conn[i], "ROLLBACK PREPARED 'shardlord'");
+                res = PQexec(con, "ROLLBACK PREPARED 'shardlord'");
                if (PQresultStatus(res) != PGRES_COMMAND_OK)
                {
                    elog(WARNING, "SHARDMAN: Rollback of 2PC failed at node %d: %s",
-                        node_id, PQerrorMessage(conn[i]));
+                        chan[i].node, PQerrorMessage(con));
                }
                PQclear(res);
            }
            else
            {
-                res = PQexec(conn[i], "COMMIT PREPARED 'shardlord'");
+                res = PQexec(con, "COMMIT PREPARED 'shardlord'");
                if (PQresultStatus(res) != PGRES_COMMAND_OK)
                {
                    elog(WARNING, "SHARDMAN: Commit of 2PC failed at node %d: %s",
-                        node_id, PQerrorMessage(conn[i]));
+                        chan[i].node, PQerrorMessage(con));
                }
                PQclear(res);
            }
        }
-        PQfinish(conn[i]);
+        PQfinish(con);
     }
 
-    if (errmsg)
+    if (*errmsg)
     {
        if (ignore_errors)
        {
-            appendStringInfo(&resp, "Error:%s", errmsg);
+            resetStringInfo(&resp);
+            appendStringInfoString(&resp, errmsg);
            elog(WARNING, "SHARDMAN: %s", errmsg);
        }
        else
@@ -354,7 +383,7 @@ broadcast(PG_FUNCTION_ARGS)
        }
     }
 
-    pfree(conn);
+    pfree(chan);
     SPI_finish();
 
     PG_RETURN_TEXT_P(cstring_to_text(resp.data));
@@ -366,7 +395,7 @@ broadcast(PG_FUNCTION_ARGS)
  * defaults, everything. Parameter is not REGCLASS because pg_dump can't
  * handle oids anyway. Connstring must be proper libpq connstring, it is feed
  * to pg_dump.
- * TODO: actually we should have much more control on what is dumped, so we
+ * TODO: actually we should have muchmore control on what is dumped, so we
  * need to copy-paste parts of messy pg_dump or collect the needed data
  * manually walking over catalogs.
 */
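
Taken together, the C changes switch broadcast() from a bare PGconn* array to a Channel array that remembers each command's node id and SQL, so error messages name the right node, and with ignore_errors => true failures now come back inline as <error>node_id:message</error> markers, which is the format monitor() in the SQL file parses. A sketch of the command-string convention (the node ids and failure text are illustrative; judging by the rm_node call in monitor(), node id 0 appears to address the shardlord):

    -- broadcast() takes a string of 'node_id:SQL;' pairs, as global_lock_graph()
    -- builds when polling every worker for its local lock graph:
    SELECT shardman.broadcast('1:SELECT shardman.serialize_lock_graph();'
                              '2:SELECT shardman.serialize_lock_graph();',
                              ignore_errors => true);

    -- If node 2 were unreachable, the result would now carry a marker like
    --   <error>2:Connection failure: could not connect to server ...</error>
    -- monitor() recovers the node id and message with position()/substring():
    SELECT substring(r FROM position('<error>' IN r) + 7
                       FOR position(':' IN r) - position('<error>' IN r) - 7)::int AS node,
           substring(r FROM position(':' IN r) + 1
                       FOR position('</error>' IN r) - position(':' IN r) - 1) AS msg
    FROM (VALUES ('<error>2:Connection failure</error>'::text)) AS t(r);
    --  node |        msg
    -- ------+--------------------
    --     2 | Connection failure

Note the resetStringInfo() in the cleanup path: when any error was recorded under ignore_errors, the returned text contains only the <error> markers rather than partial results, so a caller can test for '<error>' with position(), exactly as monitor() does.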
