@@ -105,6 +105,13 @@ CREATE TABLE partitions (
105
105
owner int REFERENCES nodes(id) -- node on which partition lies
106
106
);
107
107
108
+ -- We use _fdw suffix for foreign tables to avoid interleaving with real
109
+ -- ones.
110
+ CREATE FUNCTION get_fdw_part_name (part_name name) RETURNS name AS $$
111
+ BEGIN
112
+ RETURN format(' %s_fdw' , part_name);
113
+ END $$ LANGUAGE plpgsql STRICT;
114
+
108
115
-- Replace existing hash partition with foreign, assuming 'partition' shows
109
116
-- where it is stored. Existing partition is dropped.
110
117
CREATE FUNCTION replace_usual_part_with_foreign (part partitions)
@@ -128,9 +135,7 @@ BEGIN
128
135
part .part_name );
129
136
EXECUTE format(' CREATE USER MAPPING FOR CURRENT_USER SERVER %I
130
137
%s;' , part .part_name , um_opts);
131
- -- We use _fdw suffix for foreign tables to avoid interleaving with real
132
- -- ones.
133
- SELECT format(' %s_fdw' , part .part_name ) INTO fdw_part_name;
138
+ SELECT shardman .get_fdw_part_name (part .part_name ) INTO fdw_part_name;
134
139
EXECUTE format(' DROP FOREIGN TABLE IF EXISTS %I;' , fdw_part_name);
135
140
136
141
-- Generate and execute CREATE FOREIGN TABLE sql statement which will
@@ -179,7 +184,13 @@ ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER new_partition;
179
184
CREATE FUNCTION replace_foreign_part_with_usual (part partitions)
180
185
RETURNS void AS $$
181
186
DECLARE
187
+ fdw_part_name name;
182
188
BEGIN
189
+ ASSERT to_regclass(part .part_name ) IS NOT NULL ;
190
+ SELECT shardman .get_fdw_part_name (part .part_name ) INTO fdw_part_name;
191
+ EXECUTE format(' SELECT replace_hash_partition(%L, %L);' ,
192
+ fdw_part_name, part .part_name );
193
+ EXECUTE format(' DROP FOREIGN TABLE %I;' , fdw_part_name);
183
194
END $$ LANGUAGE plpgsql;
184
195
185
196
-- Update metadata according to partition move
@@ -188,14 +199,23 @@ END $$ LANGUAGE plpgsql;
188
199
CREATE FUNCTION partition_moved () RETURNS TRIGGER AS $$
189
200
DECLARE
190
201
movepart_logname text ; -- name of logical pub, sub, repslot for copying, etc
202
+ my_id int ;
191
203
BEGIN
192
204
ASSERT NEW .owner != OLD .owner , ' partition_moved handles only moved parts' ;
193
205
movepart_logname := format(' shardman_copy_%s_%s_%s' ,
194
206
OLD .part_name , OLD .owner , NEW .owner );
195
- IF OLD .owner == (SELECT shardman .get_node_id ()) THEN -- src node
196
- -- Drop
207
+ my_id := (SELECT shardman .get_node_id ());
208
+ IF my_id = OLD .owner THEN -- src node
209
+ -- Drop publication & repslot used for copy
210
+ EXECUTE format(' DROP PUBLICATION IF EXISTS %I' , movepart_logname);
211
+ PERFORM shardman .drop_repslot (movepart_logname, true);
197
212
-- On src node, replace its partition with foreign one
198
- PERFORM replace_usual_part_with_foreign(NEW);
213
+ PERFORM shardman .replace_usual_part_with_foreign (NEW);
214
+ ELSEIF my_id = NEW .owner THEN -- dst node
215
+ -- Drop subscription used for copy
216
+ PERFORM shardman .eliminate_sub (movepart_logname);
217
+ PERFORM shardman .replace_foreign_part_with_usual (NEW);
218
+ ELSE -- other nodes
199
219
END IF;
200
220
RETURN NULL ;
201
221
END
@@ -321,6 +341,7 @@ CREATE FUNCTION drop_repslot(slot_name text, with_fire bool DEFAULT false)
321
341
DECLARE
322
342
slot_exists bool;
323
343
BEGIN
344
+ RAISE DEBUG ' Dropping repslot %' , slot_name;
324
345
EXECUTE format(' SELECT EXISTS (SELECT * FROM pg_replication_slots
325
346
WHERE slot_name = %L)' , slot_name) INTO slot_exists;
326
347
IF slot_exists THEN
@@ -340,6 +361,25 @@ BEGIN
340
361
pg_replication_slots WHERE slot_name = %L' , slot_name);
341
362
END $$ LANGUAGE plpgsql STRICT;
342
363
364
+ -- If sub exists, disable it, detach repslot from it and possibly drop. We
365
+ -- manage repslots ourselves, so it is essential to detach rs before dropping
366
+ -- sub, and repslots can't be detached while subscription is active.
367
+ CREATE FUNCTION eliminate_sub (subname name, drop_sub bool DEFAULT true)
368
+ RETURNS void AS $$
369
+ DECLARE
370
+ sub_exists bool;
371
+ BEGIN
372
+ EXECUTE format(' SELECT count(*) > 0 FROM pg_subscription WHERE subname
373
+ = %L' , subname) INTO sub_exists;
374
+ IF sub_exists THEN
375
+ EXECUTE format(' ALTER SUBSCRIPTION %I DISABLE' , subname);
376
+ EXECUTE format(' ALTER SUBSCRIPTION %I SET (slot_name = NONE)' , subname);
377
+ IF drop_sub THEN
378
+ EXECUTE format(' DROP SUBSCRIPTION %I' , subname);
379
+ END IF;
380
+ END IF;
381
+ END $$ LANGUAGE plpgsql STRICT;
382
+
343
383
-- Remove all our logical replication stuff in case of drop extension.
344
384
-- Dropping extension cleanup is not that easy:
345
385
-- - pg offers event triggers sql_drop, dd_command_end and ddl_command_start
@@ -363,12 +403,7 @@ BEGIN
363
403
EXECUTE format(' DROP PUBLICATION %I' , pub .pubname );
364
404
END LOOP;
365
405
FOR sub IN SELECT subname FROM pg_subscription WHERE subname LIKE ' shardman_%' LOOP
366
- -- we are managing rep slots manually, so we need to detach it beforehand
367
- EXECUTE format(' ALTER SUBSCRIPTION %I DISABLE' , sub .subname );
368
- EXECUTE format(' ALTER SUBSCRIPTION %I SET (slot_name = NONE)' , sub .subname );
369
- IF drop_subs THEN
370
- EXECUTE format(' DROP SUBSCRIPTION %I' , sub .subname );
371
- END IF;
406
+ PERFORM shardman .eliminate_sub (sub .subname , drop_subs);
372
407
END LOOP;
373
408
-- TODO: drop repslots gracefully? For that we should iterate over all active
374
409
-- subscribers and turn off subscriptions first.
0 commit comments