Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 1679653

Browse files
committed
SQL for moving partitions in progress, killing repslots with fire.
1 parent 33ec057 commit 1679653

File tree

1 file changed

+173
-99
lines changed

1 file changed

+173
-99
lines changed

pg_shardman--0.0.1.sql

Lines changed: 173 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -105,98 +105,66 @@ CREATE TABLE partitions (
105105
owner int REFERENCES nodes(id) -- node on which partition lies
106106
);
107107

108-
-- On adding new partition, create proper foreign server & foreign table and
109-
-- replace tmp (empty) partition with it.
110-
CREATE FUNCTION new_partition() RETURNS TRIGGER AS $$
108+
-- Replace existing hash partition with foreign, assuming 'partition' shows
109+
-- where it is stored. Existing partition is dropped.
110+
CREATE FUNCTION replace_usual_part_with_foreign(part partitions)
111+
RETURNS void AS $$
111112
DECLARE
112113
connstring text;
113-
connstring_keywords text[];
114-
connstring_vals text[];
115-
server_opts text default '';
116-
um_opts text default '';
117-
server_opts_first_time_through bool DEFAULT true;
118-
um_opts_first_time_through bool DEFAULT true;
119114
fdw_part_name text;
115+
server_opts text;
116+
um_opts text;
117+
BEGIN
118+
SELECT nodes.connstring FROM shardman.nodes WHERE id = part.owner
119+
INTO connstring;
120+
EXECUTE format('DROP SERVER IF EXISTS %I CASCADE;', part.part_name);
121+
122+
SELECT * INTO server_opts, um_opts FROM
123+
(SELECT * FROM shardman.conninfo_to_postgres_fdw_opts(connstring)) opts;
124+
125+
EXECUTE format('CREATE SERVER %I FOREIGN DATA WRAPPER
126+
postgres_fdw %s;', part.part_name, server_opts);
127+
EXECUTE format('DROP USER MAPPING IF EXISTS FOR CURRENT_USER SERVER %I;',
128+
part.part_name);
129+
EXECUTE format('CREATE USER MAPPING FOR CURRENT_USER SERVER %I
130+
%s;', part.part_name, um_opts);
131+
-- We use _fdw suffix for foreign tables to avoid interleaving with real
132+
-- ones.
133+
SELECT format('%s_fdw', part.part_name) INTO fdw_part_name;
134+
EXECUTE format('DROP FOREIGN TABLE IF EXISTS %I;', fdw_part_name);
135+
136+
-- Generate and execute CREATE FOREIGN TABLE sql statement which will
137+
-- clone the existing local table schema. In constrast to
138+
-- gen_create_table_sql, here we need only the header of the table,
139+
-- i.e. its attributes. CHECK constraint for partition will be added
140+
-- during the attachment, and other stuff doesn't seem to have much
141+
-- sense on foreign table.
142+
-- In fact, we should have CREATE FOREIGN TABLE (LIKE ...) to make this
143+
-- sane. We could also used here IMPORT FOREIGN SCHEMA, but it
144+
-- unneccessary involves network (we already have this schema locally)
145+
-- and dangerous: what if table was created and dropped before this
146+
-- change reached us? We might also use it with local table (create
147+
-- foreign server pointing to it, etc), but that's just ugly.
148+
EXECUTE format('CREATE FOREIGN TABLE %I %s SERVER %I OPTIONS (table_name %L)',
149+
fdw_part_name,
150+
(SELECT
151+
shardman.reconstruct_table_attrs(
152+
format('%I', part.relation))),
153+
part.part_name,
154+
part.part_name);
155+
-- Finally, replace local partition with foreign table
156+
EXECUTE format('SELECT replace_hash_partition(%L, %L)',
157+
part.part_name, fdw_part_name);
158+
-- And drop old table
159+
EXECUTE format('DROP TABLE %I', part.part_name);
160+
END $$ LANGUAGE plpgsql;
161+
162+
-- On adding new partition, create proper foreign server & foreign table and
163+
-- replace tmp (empty) partition with it.
164+
CREATE FUNCTION new_partition() RETURNS TRIGGER AS $$
120165
BEGIN
121166
IF NEW.owner != (SELECT shardman.get_node_id()) THEN
122-
SELECT nodes.connstring FROM shardman.nodes WHERE id = NEW.owner
123-
INTO connstring;
124-
EXECUTE format('DROP SERVER IF EXISTS %I CASCADE;', NEW.part_name);
125-
-- Options to postgres_fdw are specified in two places: user & password
126-
-- in user mapping and everything else in create server. The problem is
127-
-- that we use single connstring, however user mapping and server
128-
-- doesn't understand this format, i.e. we can't say create server
129-
-- ... options (dbname 'port=4848 host=blabla.org'). So we have to parse
130-
-- the opts and pass them manually. libpq knows how to do it, but
131-
-- doesn't expose that. On the other hand, quote_literal (which is
132-
-- neccessary here) doesn't have handy C API. I resorted to have C
133-
-- function which parses the opts and returns them in two parallel
134-
-- arrays, and here we join them with quoting.
135-
SELECT * FROM shardman.pq_conninfo_parse(connstring)
136-
INTO connstring_keywords, connstring_vals;
137-
FOR i IN 1..(SELECT array_upper(connstring_keywords, 1)) LOOP
138-
IF connstring_keywords[i] = 'client_encoding' OR
139-
connstring_keywords[i] = 'fallback_application_name' THEN
140-
CONTINUE; /* not allowed in postgres_fdw */
141-
ELSIF connstring_keywords[i] = 'user' OR
142-
connstring_keywords[i] = 'password' THEN -- user mapping option
143-
IF NOT um_opts_first_time_through THEN
144-
um_opts := um_opts || ', ';
145-
END IF;
146-
um_opts_first_time_through := false;
147-
um_opts := um_opts ||
148-
format('%s %L', connstring_keywords[i], connstring_vals[i]);
149-
ELSE -- server option
150-
IF NOT server_opts_first_time_through THEN
151-
server_opts := server_opts || ', ';
152-
END IF;
153-
server_opts_first_time_through := false;
154-
server_opts := server_opts ||
155-
format('%s %L', connstring_keywords[i], connstring_vals[i]);
156-
END IF;
157-
END LOOP;
158-
-- OPTIONS () is syntax error, so add OPTIONS only if we really have opts
159-
IF server_opts != '' THEN
160-
server_opts := format(' OPTIONS (%s)', server_opts);
161-
END IF;
162-
IF um_opts != '' THEN
163-
um_opts := format(' OPTIONS (%s)', um_opts);
164-
END IF;
165-
EXECUTE format('CREATE SERVER %I FOREIGN DATA WRAPPER
166-
postgres_fdw %s;', NEW.part_name, server_opts);
167-
EXECUTE format('DROP USER MAPPING IF EXISTS FOR CURRENT_USER SERVER %I;',
168-
NEW.part_name);
169-
EXECUTE format('CREATE USER MAPPING FOR CURRENT_USER SERVER %I
170-
%s;', NEW.part_name, um_opts);
171-
-- We use _fdw suffix for foreign tables to avoid interleaving with real
172-
-- ones.
173-
SELECT format('%s_fdw', NEW.part_name) INTO fdw_part_name;
174-
EXECUTE format('DROP FOREIGN TABLE IF EXISTS %I;', fdw_part_name);
175-
176-
-- Generate and execute CREATE FOREIGN TABLE sql statement which will
177-
-- clone the existing local table schema. In constrast to
178-
-- gen_create_table_sql, here we need only the header of the table,
179-
-- i.e. its attributes. CHECK constraint for partition will be added
180-
-- during the attachment, and other stuff doesn't seem to have much
181-
-- sense on foreign table.
182-
-- In fact, we should have CREATE FOREIGN TABLE (LIKE ...) to make this
183-
-- sane. We could also used here IMPORT FOREIGN SCHEMA, but it
184-
-- unneccessary involves network (we already have this schema locally)
185-
-- and dangerous: what if table was created and dropped before this
186-
-- change reached us?
187-
188-
EXECUTE format('CREATE FOREIGN TABLE %I %s SERVER %I OPTIONS (table_name %L)',
189-
fdw_part_name,
190-
(SELECT
191-
shardman.reconstruct_table_attrs(
192-
format('%I', NEW.part_name))),
193-
NEW.part_name,
194-
NEW.part_name);
195-
-- Finally, replace empty local tmp partition with foreign table
196-
EXECUTE format('SELECT replace_hash_partition(%L, %L)',
197-
NEW.part_name, fdw_part_name);
198-
-- And drop old empty table
199-
EXECUTE format('DROP TABLE %I', NEW.part_name);
167+
PERFORM shardman.replace_usual_part_with_foreign(NEW);
200168
END IF;
201169
RETURN NULL;
202170
END
@@ -206,6 +174,37 @@ CREATE TRIGGER new_partition AFTER INSERT ON shardman.partitions
206174
-- fire trigger only on worker nodes
207175
ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER new_partition;
208176

177+
-- Replace foreign table-partition with local. The latter must exist!
178+
-- Foreign table will be dropped.
179+
CREATE FUNCTION replace_foreign_part_with_usual(part partitions)
180+
RETURNS void AS $$
181+
DECLARE
182+
BEGIN
183+
END $$ LANGUAGE plpgsql;
184+
185+
-- Update metadata according to partition move
186+
-- On adding new partition, create proper foreign server & foreign table and
187+
-- replace tmp (empty) partition with it.
188+
CREATE FUNCTION partition_moved() RETURNS TRIGGER AS $$
189+
DECLARE
190+
movepart_logname text; -- name of logical pub, sub, repslot for copying, etc
191+
BEGIN
192+
ASSERT NEW.owner != OLD.owner, 'partition_moved handles only moved parts';
193+
movepart_logname := format('shardman_copy_%s_%s_%s',
194+
OLD.part_name, OLD.owner, NEW.owner);
195+
IF OLD.owner == (SELECT shardman.get_node_id()) THEN -- src node
196+
-- Drop
197+
-- On src node, replace its partition with foreign one
198+
PERFORM replace_usual_part_with_foreign(NEW);
199+
END IF;
200+
RETURN NULL;
201+
END
202+
$$ LANGUAGE plpgsql;
203+
CREATE TRIGGER partition_moved AFTER UPDATE ON shardman.partitions
204+
FOR EACH ROW EXECUTE PROCEDURE partition_moved();
205+
-- fire trigger only on worker nodes
206+
ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER partition_moved;
207+
209208
-- Currently it is used just to store node id, in general we can keep any local
210209
-- node metadata here. If is ever used extensively, probably hstore suits better.
211210
CREATE TABLE local_meta (
@@ -218,7 +217,8 @@ INSERT INTO @extschema@.local_meta VALUES ('node_id', NULL);
218217
CREATE TYPE cmd AS ENUM ('add_node', 'rm_node', 'create_hash_partitions',
219218
'move_mpart');
220219
-- command status
221-
CREATE TYPE cmd_status AS ENUM ('waiting', 'canceled', 'failed', 'in progress', 'success');
220+
CREATE TYPE cmd_status AS ENUM ('waiting', 'canceled', 'failed', 'in progress',
221+
'success');
222222

223223
CREATE TABLE cmd_log (
224224
id bigserial PRIMARY KEY,
@@ -305,18 +305,40 @@ BEGIN
305305
END
306306
$$ LANGUAGE plpgsql;
307307

308-
-- Drop replication slot, if it exists
309-
CREATE FUNCTION drop_repslot(slot_name text) RETURNS void AS $$
308+
-- Drop replication slot, if it exists.
309+
-- About 'hard' option: we can't just drop replication slots because
310+
-- pg_drop_replication_slot will bail out with ERROR if connection is active.
311+
-- Therefore the caller must either ensure that the connection is dead (e.g.
312+
-- drop subscription on far end) or pass 'true' to 'with_fire' option, which does
313+
-- the following dirty hack. It kills twice active walsender with 1 second
314+
-- interval. After the first kill, replica will immediately try to reconnect,
315+
-- so the connection resurrects instantly. However, if we kill it second time,
316+
-- replica won't try to reconnect until wal_retrieve_retry_interval after its
317+
-- first reaction passes, which is 5 secs by default. Of course, this is not
318+
-- reliable and should be redesigned.
319+
CREATE FUNCTION drop_repslot(slot_name text, with_fire bool DEFAULT false)
320+
RETURNS void AS $$
310321
DECLARE
311322
slot_exists bool;
312323
BEGIN
313324
EXECUTE format('SELECT EXISTS (SELECT * FROM pg_replication_slots
314325
WHERE slot_name = %L)', slot_name) INTO slot_exists;
315326
IF slot_exists THEN
327+
IF with_fire THEN -- kill walsender twice
328+
RAISE DEBUG 'Killing repslot % with fire', slot_name;
329+
PERFORM shardman.terminate_repslot_walsender(slot_name);
330+
PERFORM pg_sleep(1);
331+
PERFORM shardman.terminate_repslot_walsender(slot_name);
332+
END IF;
316333
EXECUTE format('SELECT pg_drop_replication_slot(%L)', slot_name);
317334
END IF;
318335
END
319-
$$ LANGUAGE plpgsql;
336+
$$ LANGUAGE plpgsql STRICT;
337+
CREATE FUNCTION terminate_repslot_walsender(slot_name text) RETURNS void AS $$
338+
BEGIN
339+
EXECUTE format('SELECT pg_terminate_backend(active_pid) FROM
340+
pg_replication_slots WHERE slot_name = %L', slot_name);
341+
END $$ LANGUAGE plpgsql STRICT;
320342

321343
-- Remove all our logical replication stuff in case of drop extension.
322344
-- Dropping extension cleanup is not that easy:
@@ -348,14 +370,12 @@ BEGIN
348370
EXECUTE format('DROP SUBSCRIPTION %I', sub.subname);
349371
END IF;
350372
END LOOP;
351-
-- TODO: we can't just drop replication slots because
352-
-- pg_drop_replication_slot will bail out with ERROR if connection is active.
353-
-- We should therefore iterate over all active subscribers and turn off
354-
-- subscriptions first.
355-
-- FOR rs IN SELECT slot_name FROM pg_replication_slots
356-
-- WHERE slot_name LIKE 'shardman_%' AND slot_type = 'logical' LOOP
357-
-- EXECUTE format('SELECT pg_drop_replication_slot(%L)', rs.slot_name);
358-
-- END LOOP;
373+
-- TODO: drop repslots gracefully? For that we should iterate over all active
374+
-- subscribers and turn off subscriptions first.
375+
FOR rs IN SELECT slot_name FROM pg_replication_slots
376+
WHERE slot_name LIKE 'shardman_%' AND slot_type = 'logical' LOOP
377+
PERFORM shardman.drop_repslot(rs.slot_name, true);
378+
END LOOP;
359379

360380
PERFORM shardman.reset_node_id();
361381
END;
@@ -445,6 +465,60 @@ CREATE FUNCTION gen_create_table_sql(relation text, connstring text) RETURNS tex
445465
CREATE FUNCTION reconstruct_table_attrs(relation regclass)
446466
RETURNS text AS 'pg_shardman' LANGUAGE C STRICT;
447467

468+
-- Options to postgres_fdw are specified in two places: user & password in user
469+
-- mapping and everything else in create server. The problem is that we use
470+
-- single connstring, however user mapping and server doesn't understand this
471+
-- format, i.e. we can't say create server ... options (dbname 'port=4848
472+
-- host=blabla.org'). So we have to parse the opts and pass them manually. libpq
473+
-- knows how to do it, but doesn't expose that. On the other hand, quote_literal
474+
-- (which is neccessary here) doesn't seem to have handy C API. I resorted to
475+
-- have C function which parses the opts and returns them in two parallel
476+
-- arrays, and this sql function joins them with quoting.
477+
-- Returns two strings: one with opts ready to pass to CREATE FOREIGN SERVER
478+
-- stmt, and one wih opts ready to pass to CREATE USER MAPPING.
479+
CREATE FUNCTION conninfo_to_postgres_fdw_opts(
480+
IN connstring text, OUT server_opts text, OUT um_opts text)
481+
RETURNS record AS $$
482+
DECLARE
483+
connstring_keywords text[];
484+
connstring_vals text[];
485+
server_opts_first_time_through bool DEFAULT true;
486+
um_opts_first_time_through bool DEFAULT true;
487+
BEGIN
488+
server_opts := '';
489+
um_opts := '';
490+
SELECT * FROM shardman.pq_conninfo_parse(connstring)
491+
INTO connstring_keywords, connstring_vals;
492+
FOR i IN 1..(SELECT array_upper(connstring_keywords, 1)) LOOP
493+
IF connstring_keywords[i] = 'client_encoding' OR
494+
connstring_keywords[i] = 'fallback_application_name' THEN
495+
CONTINUE; /* not allowed in postgres_fdw */
496+
ELSIF connstring_keywords[i] = 'user' OR
497+
connstring_keywords[i] = 'password' THEN -- user mapping option
498+
IF NOT um_opts_first_time_through THEN
499+
um_opts := um_opts || ', ';
500+
END IF;
501+
um_opts_first_time_through := false;
502+
um_opts := um_opts ||
503+
format('%s %L', connstring_keywords[i], connstring_vals[i]);
504+
ELSE -- server option
505+
IF NOT server_opts_first_time_through THEN
506+
server_opts := server_opts || ', ';
507+
END IF;
508+
server_opts_first_time_through := false;
509+
server_opts := server_opts ||
510+
format('%s %L', connstring_keywords[i], connstring_vals[i]);
511+
END IF;
512+
END LOOP;
513+
514+
-- OPTIONS () is syntax error, so add OPTIONS only if we really have opts
515+
IF server_opts != '' THEN
516+
server_opts := format(' OPTIONS (%s)', server_opts);
517+
END IF;
518+
IF um_opts != '' THEN
519+
um_opts := format(' OPTIONS (%s)', um_opts);
520+
END IF;
521+
END $$ LANGUAGE plpgsql STRICT;
448522
CREATE FUNCTION pq_conninfo_parse(IN conninfo text, OUT keys text[], OUT vals text[])
449523
RETURNS record AS 'pg_shardman' LANGUAGE C STRICT;
450524

0 commit comments

Comments
 (0)