Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 0f3866d

Browse files
committed
Basic tests, add_node existing node check, wipe_state func.
1 parent 6f84422 commit 0f3866d

File tree

7 files changed

+617
-144
lines changed

7 files changed

+617
-144
lines changed

conf.add

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
shared_preload_libraries='pg_pathman'
1+
shared_preload_libraries='pg_shardman, pg_pathman'

pg_shardman--0.0.2.sql

Lines changed: 135 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,25 @@
1111
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
1212
\echo Use "CREATE EXTENSION pg_shardman" to load this file. \quit
1313

14+
-- The library defines several GUCs (a user may still see them in SHOW without
-- the library loaded, if she sets them explicitly) and has to tell pathman
-- that we want shardman's COPY FROM, so it makes sense to require that the
-- library is loaded at server start.
DO $$
DECLARE
	preloaded_libs text := current_setting('shared_preload_libraries');
BEGIN
	-- Yes, a malicious user might have another library whose name contains
	-- 'pg_shardman'... It would probably be better to just call a no-op
	-- function from the library.
	IF position('pg_shardman' IN preloaded_libs) = 0 THEN
		RAISE EXCEPTION 'pg_shardman must be loaded via shared_preload_libraries. Refusing to proceed.';
	END IF;
END
$$;
26+
1427
-- Shardman tables
1528

1629
-- List of nodes present in the cluster
1730
CREATE TABLE nodes (
1831
id serial PRIMARY KEY,
19-
system_id bigint NOT NULL,
32+
system_id bigint NOT NULL UNIQUE,
2033
super_connection_string text UNIQUE NOT NULL,
2134
connection_string text UNIQUE NOT NULL,
2235
replication_group text NOT NULL -- group of nodes within which shard replicas are allocated
@@ -62,7 +75,6 @@ CREATE FUNCTION add_node(super_conn_string text, conn_string text = NULL,
6275
repl_group text = 'default') RETURNS int AS $$
6376
DECLARE
6477
new_node_id int;
65-
system_id bigint;
6678
node shardman.nodes;
6779
part shardman.partitions;
6880
t shardman.tables;
@@ -93,21 +105,31 @@ DECLARE
93105
BEGIN
94106
IF NOT shardman.is_shardlord()
95107
THEN
96-
RETURN shardman.broadcast(format('0:SELECT shardman.add_node(%L, %L, %L)', super_conn_string, conn_string, repl_group))::int;
108+
RETURN shardman.broadcast(
109+
format('0:SELECT shardman.add_node(%L, %L, %L)',
110+
super_conn_string, conn_string, repl_group))::int;
97111
END IF;
98112

99113
-- Insert new node in nodes table
100-
INSERT INTO shardman.nodes (system_id, super_connection_string, connection_string, replication_group)
114+
INSERT INTO shardman.nodes (system_id, super_connection_string,
115+
connection_string, replication_group)
101116
VALUES (0, super_conn_string, conn_string_effective, repl_group)
102-
RETURNING id INTO new_node_id;
103-
104-
-- We have to update system_id after insert, because otherwise broadcast will not work
105-
sys_id := shardman.broadcast(format('%s:SELECT shardman.get_system_identifier();', new_node_id))::bigint;
117+
RETURNING id INTO new_node_id;
118+
119+
-- We have to update system_id after insert, because otherwise broadcast
120+
-- will not work
121+
sys_id := shardman.broadcast(
122+
format('%s:SELECT shardman.get_system_identifier();',
123+
new_node_id))::bigint;
124+
IF EXISTS(SELECT 1 FROM shardman.nodes where system_id = sys_id) THEN
125+
RAISE EXCEPTION 'Node with system id % is already in the cluster', sys_id;
126+
END IF;
106127
UPDATE shardman.nodes SET system_id=sys_id WHERE id=new_node_id;
107128

108129
-- Adjust replication channels within replication group.
109130
-- We need all-to-all replication channels between all group members.
110-
FOR node IN SELECT * FROM shardman.nodes WHERE replication_group = repl_group AND id <> new_node_id
131+
FOR node IN SELECT * FROM shardman.nodes WHERE replication_group = repl_group
132+
AND id <> new_node_id
111133
LOOP
112134
-- Add to new node publications for all existing nodes and add
113135
-- publication for new node to all existing nodes
@@ -443,8 +465,10 @@ $$ LANGUAGE plpgsql;
443465
-- Shard table with hash partitions. Parameters are the same as in pathman.
444466
-- It also scatters partitions across all nodes.
445467
-- This function expects that empty table is created at shardlord.
446-
-- So it can be executed only at shardlord and there is no need to redirect this function to shardlord.
447-
CREATE FUNCTION create_hash_partitions(rel regclass, expr text, part_count int, redundancy int = 0)
468+
-- It can be executed only at shardlord and there is no need to redirect this
469+
-- function to shardlord.
470+
CREATE FUNCTION create_hash_partitions(rel regclass, expr text, part_count int,
471+
redundancy int = 0)
448472
RETURNS void AS $$
449473
DECLARE
450474
create_table text;
@@ -1857,7 +1881,7 @@ BEGIN
18571881
dst_node_id, slot));
18581882
IF response::bool THEN
18591883
synced := true;
1860-
RAISE DEBUG 'Table % sync completed', part_name;
1884+
RAISE DEBUG '[SHMN] Table % sync completed', part_name;
18611885
CONTINUE;
18621886
END IF;
18631887
ELSE
@@ -1868,7 +1892,7 @@ BEGIN
18681892
END IF;
18691893
lag := response::bigint;
18701894

1871-
RAISE DEBUG 'Replication lag %', lag;
1895+
RAISE DEBUG '[SHMN] Replication lag %', lag;
18721896
IF locked THEN
18731897
IF lag<=0 THEN
18741898
RETURN;
@@ -2245,3 +2269,101 @@ CREATE VIEW replication_lag(pubnode, subnode, lag) AS
22452269
-- be explicitly excluded by filter condition, otherwise error will be reported.
22462270
CREATE VIEW replication_state(part_name, node_id, last_seqno) AS
	SELECT
		r.part_name,
		r.node_id,
		-- One broadcast per replica row: the command is prefixed with the
		-- replica's node id and reads max(seqno) from '<part_name>_change_log'.
		shardman.broadcast(
			format('%s:SELECT max(seqno) FROM %s_change_log;',
				   r.node_id, r.part_name))::bigint
	FROM shardman.replicas AS r;
2272+
2273+
2274+
-- Drop replication slot 'slot_name' if it exists; do nothing otherwise.
--
-- About the 'with_fire' option: pg_drop_replication_slot bails out with ERROR
-- while the slot's connection is active, so a slot cannot simply be dropped.
-- The caller must either ensure that the connection is dead (e.g. drop the
-- subscription on the far end) or pass 'true' as 'with_fire', which performs
-- the following dirty hack: the active walsender is killed several times with
-- a short pause in between. After the first kill the replica immediately
-- tries to reconnect, so the connection resurrects instantly. After a second
-- kill, however, the replica won't try to reconnect until
-- wal_retrieve_retry_interval (5 secs by default) has passed since its first
-- reaction. Of course, this is not reliable and should be redesigned.
CREATE FUNCTION drop_repslot(slot_name text, with_fire bool DEFAULT true)
	RETURNS void AS $$
DECLARE
	found_slot bool;
	kills_total CONSTANT int := 3; -- how many times the walsender is killed
	attempt int := 0;
BEGIN
	RAISE DEBUG '[SHMN] Dropping repslot %', slot_name;
	-- Dynamic SQL keeps the catalog column 'slot_name' apart from the
	-- function parameter of the same name.
	EXECUTE format('SELECT EXISTS (SELECT * FROM pg_replication_slots
		WHERE slot_name = %L)', slot_name) INTO found_slot;
	IF NOT found_slot THEN
		RETURN; -- nothing to drop
	END IF;

	IF with_fire THEN -- kill walsender several times
		RAISE DEBUG '[SHMN] Killing repslot % with fire', slot_name;
		WHILE attempt < kills_total LOOP
			attempt := attempt + 1;
			RAISE DEBUG '[SHMN] Killing walsender for slot %', slot_name;
			PERFORM shardman.terminate_repslot_walsender(slot_name);
			-- pause between kills, but not after the last one
			IF attempt < kills_total THEN
				PERFORM pg_sleep(0.05);
			END IF;
		END LOOP;
	END IF;

	EXECUTE format('SELECT pg_drop_replication_slot(%L)', slot_name);
END
$$ LANGUAGE plpgsql STRICT;
2309+
-- Terminate the backend recorded as active_pid of replication slot
-- 'slot_name'. Dynamic SQL is used because the parameter shares its name
-- with the catalog column.
CREATE FUNCTION terminate_repslot_walsender(slot_name text) RETURNS void AS $$
DECLARE
	kill_query text;
BEGIN
	kill_query := format('SELECT pg_terminate_backend(active_pid) FROM
		pg_replication_slots WHERE slot_name = %L', slot_name);
	EXECUTE kill_query;
END
$$ LANGUAGE plpgsql STRICT;
2315+
2316+
-- Drop subscription 'subname' unilaterally: if it exists in the current
-- database, disable it, detach the replication slot from it and drop it.
-- Does nothing if there is no such subscription in this database.
CREATE FUNCTION eliminate_sub(subname name)
	RETURNS void AS $$
DECLARE
	sub_exists bool;
BEGIN
	-- pg_subscription is shared across all databases of the cluster, while
	-- ALTER/DROP SUBSCRIPTION only see subscriptions of the current database.
	-- A same-named subscription living in another database must therefore not
	-- count as existing, or the ALTER below would fail.
	-- Dynamic SQL keeps the catalog column 'subname' apart from the parameter
	-- of the same name.
	EXECUTE format('SELECT EXISTS (SELECT 1 FROM pg_subscription
		WHERE subname = %L
		AND subdbid = (SELECT oid FROM pg_database
					   WHERE datname = current_database()))', subname)
		INTO sub_exists;
	IF sub_exists THEN
		EXECUTE format('ALTER SUBSCRIPTION %I DISABLE', subname);
		-- With the slot detached, DROP SUBSCRIPTION does not touch the slot on
		-- the publisher; the subscription has to be disabled first.
		EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE)', subname);
		EXECUTE format('DROP SUBSCRIPTION %I', subname);
	END IF;
END
$$ LANGUAGE plpgsql STRICT;
2332+
2333+
2334+
-- Remove all shardman state from this node: foreign servers, publications
-- and logical replication slots whose names start with 'node_', subscriptions
-- of the current database whose names start with 'sub_', and
-- synchronous_standby_names.
-- If drop_slots_with_fire is true, we will kill walsenders before dropping LR
-- slots.
-- synchronous_standby_names is reset to empty string after commit; this is a
-- non-transactional action and might not be performed.
CREATE OR REPLACE FUNCTION wipe_state(drop_slots_with_fire bool DEFAULT true)
	RETURNS void AS $$
DECLARE
	srv record;
	pub record;
	sub record;
	rs record;
BEGIN
	-- otherwise we might hang
	-- NOTE(review): presumably on commit, waiting for the synchronous
	-- standbys being torn down here -- confirm.
	SET LOCAL synchronous_commit TO LOCAL;

	-- In the patterns below '_' is escaped with a backslash (LIKE's default
	-- escape character): a bare '_' matches any single character, so
	-- 'node_%' would also catch unrelated names such as 'nodes1'.
	FOR srv IN SELECT srvname FROM pg_foreign_server
		WHERE srvname LIKE 'node\_%' LOOP
		EXECUTE format('DROP SERVER %I CASCADE', srv.srvname);
	END LOOP;

	FOR pub IN SELECT pubname FROM pg_publication
		WHERE pubname LIKE 'node\_%' LOOP
		EXECUTE format('DROP PUBLICATION %I', pub.pubname);
	END LOOP;

	-- pg_subscription is shared across all databases of the cluster; only
	-- subscriptions of the current database can be altered and dropped here.
	FOR sub IN SELECT s.subname FROM pg_subscription AS s
		INNER JOIN pg_database AS d ON d.oid = s.subdbid
		WHERE s.subname LIKE 'sub\_%' AND d.datname = current_database() LOOP
		PERFORM shardman.eliminate_sub(sub.subname);
	END LOOP;

	FOR rs IN SELECT slot_name FROM pg_replication_slots
		WHERE slot_name LIKE 'node\_%' AND slot_type = 'logical' LOOP
		PERFORM shardman.drop_repslot(rs.slot_name, drop_slots_with_fire);
	END LOOP;

	-- TODO: remove only shardman's standbys
	PERFORM shardman.reset_synchronous_standby_names_on_commit();
END;
$$ LANGUAGE plpgsql;
2368+
-- SQL-level declaration of the C function of the same name exported by the
-- pg_shardman library.
CREATE FUNCTION reset_synchronous_standby_names_on_commit() RETURNS void
	AS 'pg_shardman'
	LANGUAGE C STRICT;

pg_shardman.c

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,22 @@
77
* -------------------------------------------------------------------------
88
*/
99
#include "postgres.h"
10-
#include "libpq-fe.h"
11-
#include "miscadmin.h"
10+
11+
#include "access/htup_details.h"
12+
#include "access/xact.h"
13+
#include "access/xlog.h"
14+
#include "catalog/pg_type.h"
15+
#include "commands/event_trigger.h"
1216
#include "executor/spi.h"
1317
#include "funcapi.h"
18+
#include "libpq-fe.h"
19+
#include "miscadmin.h"
1420
#include "pgstat.h"
21+
#include "storage/latch.h"
1522
#include "utils/guc.h"
1623
#include "utils/rel.h"
1724
#include "utils/builtins.h"
1825
#include "utils/lsyscache.h"
19-
#include "catalog/pg_type.h"
20-
#include "access/htup_details.h"
21-
#include "access/xlog.h"
22-
#include "storage/latch.h"
2326

2427
/* ensure that extension won't load against incompatible version of Postgres */
2528
PG_MODULE_MAGIC;
@@ -31,6 +34,7 @@ PG_FUNCTION_INFO_V1(broadcast);
3134
PG_FUNCTION_INFO_V1(reconstruct_table_attrs);
3235
PG_FUNCTION_INFO_V1(pq_conninfo_parse);
3336
PG_FUNCTION_INFO_V1(get_system_identifier);
37+
PG_FUNCTION_INFO_V1(reset_synchronous_standby_names_on_commit);
3438

3539
/* GUC variables */
3640
static bool is_lord;
@@ -39,6 +43,11 @@ static char *shardlord_connstring;
3943

4044
extern void _PG_init(void);
4145

46+
static bool reset_ssn_callback_set = false;
47+
static bool reset_ssn_requested = false;
48+
49+
static void reset_ssn_xact_callback(XactEvent event, void *arg);
50+
4251
/*
4352
* Entrypoint of the module. Define GUCs.
4453
*/
@@ -75,6 +84,15 @@ _PG_init()
7584
PGC_SUSET,
7685
0,
7786
NULL, NULL, NULL);
87+
88+
/*
89+
* Tell pathman that we want it to do shardman-specific COPY FROM: that
90+
* is, support copy to foreign partitions by copying to foreign parent.
91+
* For now we just ask to do it always. Better to turn on this in copy
92+
* hook and turn it off afterwards; however, for that we need metadata on all nodes.
93+
*/
94+
*find_rendezvous_variable(
95+
"shardman_pathman_copy_from_rendezvous") = DatumGetPointer(1);
7896
}
7997

8098
Datum
@@ -605,3 +623,45 @@ get_system_identifier(PG_FUNCTION_ARGS)
605623
{
606624
PG_RETURN_INT64(GetSystemIdentifier());
607625
}
626+
627+
/*
628+
* Execute "ALTER SYSTEM SET synchronous_standby_names = '' on commit"
629+
*/
630+
Datum
631+
reset_synchronous_standby_names_on_commit(PG_FUNCTION_ARGS)
632+
{
633+
if (!reset_ssn_callback_set)
634+
RegisterXactCallback(reset_ssn_xact_callback, NULL);
635+
reset_ssn_requested = true;
636+
PG_RETURN_VOID();
637+
}
638+
639+
static void
640+
reset_ssn_xact_callback(XactEvent event, void *arg)
641+
{
642+
if (reset_ssn_requested)
643+
{
644+
/* I just wanted to practice a bit with PG nodes and lists */
645+
A_Const *aconst = makeNode(A_Const);
646+
List *set_stmt_args = list_make1(aconst);
647+
VariableSetStmt setstmt;
648+
AlterSystemStmt altersysstmt;
649+
650+
aconst->val.type = T_String;
651+
aconst->val.val.str = ""; /* set it to empty value */
652+
aconst->location = -1;
653+
654+
setstmt.type = T_VariableSetStmt;
655+
setstmt.kind = VAR_SET_VALUE;
656+
setstmt.name = "synchronous_standby_names";
657+
setstmt.args = set_stmt_args;
658+
659+
altersysstmt.type = T_AlterSystemStmt;
660+
altersysstmt.setstmt = &setstmt;
661+
AlterSystemSetConfigFile(&altersysstmt);
662+
pg_reload_conf(NULL);
663+
664+
list_free_deep(setstmt.args);
665+
reset_ssn_requested = false;
666+
}
667+
}

postgresql.conf.common

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
shared_preload_libraries = 'pg_pathman'
1+
shared_preload_libraries = 'pg_shardman, pg_pathman'
22

33
shardman.shardlord_connstring = 'port=5432' # shardlord's connstring
44

readme.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ Returns id of the new node.
348348
```plpgsql
349349
get_my_id()
350350
```
351-
Get this node's id. Executed on any node.
351+
Get this worker's id. Executed on any worker node. Fails on shardlord.
352352

353353
```plpgsql
354354
rm_node(rm_node_id int, force bool = false)
@@ -499,6 +499,16 @@ Function `shardman.recover_xacts` can be also implicitly invoked by database adm
499499
not completed distributed transactions. First of all it tries to obtain status of distributed transaction from its coordinator and only
500500
if it is not available, performs voting among all nodes.
501501

502+
```plpgsql
503+
wipe_state(drop_slots_with_fire bool DEFAULT true)
504+
```
505+
Unilaterally remove all foreign servers, publications, subscriptions and replication slots
506+
created on the worker node by `pg_shardman`. PostgreSQL forbids dropping a
507+
replication slot with active connection; if `drop_slots_with_fire` is true, we
508+
will try to kill the walsenders before dropping the slots. Also, immediately
509+
after transaction commit set `synchronous_standby_names` GUC to empty string --
510+
this is a non-transactional action and there is a very small chance it won't be
511+
completed. You probably want to run it before `DROP EXTENSION pg_shardman`.
502512

503513
## Transactions
504514
When using vanilla PostgreSQL, local changes are handled by PostgreSQL as usual

tests/python/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@
33
mkfile_dir := $(shell dirname $(shell readlink -f $(abspath $(lastword $(MAKEFILE_LIST)))))
44

55
all:
6-
cd $(mkfile_dir) && python3 -m unittest tests.py -v -f
6+
cd $(mkfile_dir) && python3 tests.py

0 commit comments

Comments
 (0)