Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d2a0aad

Browse files
committed
Merge remote-tracking branch 'origin/master'
Merging partition removal.
2 parents e92d228 + eba355c commit d2a0aad

File tree

3 files changed

+122
-1
lines changed

3 files changed

+122
-1
lines changed

init.sql

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,16 @@ END
7272
$$ LANGUAGE plpgsql;
7373

7474
-- Remove node. Its state will be reset, all shardman data lost.
75-
CREATE FUNCTION rm_node(node_id int) RETURNS int AS $$
75+
CREATE FUNCTION rm_node(node_id int, force bool default false) RETURNS int AS $$
7676
DECLARE
7777
c_id int;
7878
BEGIN
7979
INSERT INTO @extschema@.cmd_log VALUES (DEFAULT, 'rm_node')
8080
RETURNING id INTO c_id;
8181
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, node_id);
82+
IF force THEN
83+
INSERT INTO @extschema@.cmd_opts VALUES (DEFAULT, c_id, 'force');
84+
END IF;
8285
RETURN c_id;
8386
END
8487
$$ LANGUAGE plpgsql;

shard.sql

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,89 @@ CREATE TRIGGER part_moved AFTER UPDATE ON shardman.partitions
251251
-- fire trigger only on worker nodes
252252
ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER part_moved;
253253

254+
255+
-- Partition removed: drop old LR channels.
256+
CREATE FUNCTION part_removed() RETURNS TRIGGER AS $$
257+
DECLARE
258+
me int := shardman.my_id();
259+
prev_src_lname text;
260+
src_next_lname text;
261+
new_primary partitions;
262+
drop_slot_delay int := 2000; -- two seconds
263+
BEGIN
264+
RAISE DEBUG '[SHMN] part_removed trigger called for part %, owner %',
265+
OLD.part_name, OLD.owner;
266+
267+
ASSERT (OLD.prv IS NULL OR OLD.nxt IS NULL), 'We currently do not support redundancy level > 2';
268+
269+
IF OLD.prv IS NOT NULL THEN
270+
prev_src_lname := shardman.get_data_lname(OLD.part_name, OLD.prv, OLD.owner);
271+
ELSE
272+
-- Primaty is moved
273+
select * from shardman.partitions where owner=OLD.nxt and part_name=OLD.part_name into new_primary;
274+
END IF;
275+
IF OLD.nxt IS NOT NULL THEN
276+
src_next_lname := shardman.get_data_lname(OLD.part_name, OLD.owner, OLD.nxt);
277+
END IF;
278+
279+
280+
IF me = OLD.owner THEN -- src node
281+
-- If primary part was moved, replace on src node its partition with
282+
-- foreign one
283+
IF OLD.prv IS NULL THEN
284+
PERFORM shardman.replace_usual_part_with_foreign(new_primary);
285+
ELSE
286+
-- On the other hand, if prev replica existed, drop sub for old
287+
-- channel prev -> src
288+
PERFORM shardman.eliminate_sub(prev_src_lname);
289+
END IF;
290+
IF OLD.nxt IS NOT NULL THEN
291+
-- If next replica existed, drop pub for old channel src -> next
292+
-- Wait sometime to let other node first remove subscription
293+
PERFORM pg_sleep(drop_slot_delay);
294+
PERFORM shardman.drop_repslot_and_pub(src_next_lname);
295+
PERFORM shardman.remove_sync_standby(src_next_lname);
296+
END IF;
297+
-- Drop old table anyway
298+
EXECUTE format('DROP TABLE IF EXISTS %I', OLD.part_name);
299+
ELSEIF me = OLD.prv THEN -- node with prev replica
300+
-- Wait sometime to let other node first remove subscription
301+
PERFORM pg_sleep(drop_slot_delay);
302+
-- Drop pub for old channel prev -> src
303+
PERFORM shardman.drop_repslot_and_pub(prev_src_lname);
304+
PERFORM shardman.remove_sync_standby(prev_src_lname);
305+
-- Update L2-list (TODO: change replication model from chain to star)
306+
PERFORM update shardman.partitions set nxt=OLD.nxt where owner=me and part_name=OLD.part_name;
307+
ELSEIF me = OLD.nxt THEN -- node with next replica
308+
-- Drop sub for old channel src -> next
309+
PERFORM shardman.eliminate_sub(src_next_lname);
310+
-- Update L2-list (TODO: change replication model from chain to star)
311+
PERFORM update shardman.partitions set prv=OLD.prv where owner=me and part_name=OLD.part_name;
312+
-- This replica is promoted to primary node, so drop trigger disabling writes to the table
313+
PERFORM readonly_replica_off(part_name);
314+
-- Replace FDW with local partition
315+
PERFORM shardman.replace_foreign_part_with_usual(new_primary);
316+
END IF;
317+
318+
-- If primary was moved
319+
IF OLD.prv IS NULL THEN
320+
-- then update fdw almost everywhere
321+
PERFORM shardman.update_fdw_server(new_primary);
322+
END IF;
323+
324+
RETURN NULL;
325+
END
326+
$$ LANGUAGE plpgsql;
327+
328+
CREATE TRIGGER part_removed AFTER REMOVE ON shardman.partitions
329+
FOR EACH ROW
330+
EXECUTE PROCEDURE part_removed();
331+
-- fire trigger only on worker nodes
332+
ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER part_removed;
333+
334+
335+
336+
254337
-- Executed on newtail node, see cr_rebuild_lr
255338
CREATE FUNCTION replica_created_drop_cp_sub(
256339
part_name name, oldtail int, newtail int) RETURNS void AS $$

src/pg_shardman.c

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,6 +768,41 @@ rm_node(Cmd *cmd)
768768
{
769769
int32 node_id = atoi(cmd->opts[0]);
770770
char *sql;
771+
char **opts;
772+
bool force = false;
773+
int e;
774+
775+
for (opts = cmd->opts; *opts; opts++)
776+
{
777+
if (strcmp(*opts, "force") == 0)
778+
{
779+
force = true;
780+
break;
781+
}
782+
}
783+
784+
SPI_PROLOG;
785+
if (force)
786+
{
787+
sql = psprintf("delete from shardman.partitions where owner=%d", node_id);
788+
void_spi(sql);
789+
}
790+
else
791+
{
792+
bool isnull;
793+
sql = psprintf("select count(*) from shardman.partitions where owner=%d", node_id);
794+
e = SPI_execute(sql, true, 0);
795+
if (e < 0)
796+
shmn_elog(FATAL, "Stmt failed: %s", sql);
797+
Assert(SPI_processed == 1);
798+
if (DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull)) != 0)
799+
{
800+
ereport(ERROR, (errmsg("Can not remove node with existed partitions"),
801+
errhint("Add \"force\" option to remove node with existed partitions.")));
802+
}
803+
}
804+
pfree(sql);
805+
SPI_EPILOG;
771806

772807
elog(INFO, "Removing node %d ", node_id);
773808
if (!node_in_cluster(node_id))

0 commit comments

Comments
 (0)