
Commit 7acdef8

rm_node deletes partitions.

This is a refactor of commit e0762a9. Partition deletion is implemented with a trigger on row removal. We are in the process of switching the replication model from chain ("sausage") to star ("spider"), and this code assumes that each partition has no more than two copies, in which case the two models are equivalent. Since we assume only 2 copies of data, after primary removal there is no need to choose a new primary and rebuild logical channels from it to the other copies. Because of that, 'promotion' is implemented in the trigger on partition removal: we simply mark the surviving copy as primary, replace fdw -> usual on the node with the replica and usual -> fdw on the node with the old primary, and update the fdw server everywhere else.
1 parent d2a0aad commit 7acdef8
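
For orientation, the promotion described above condenses to roughly the following. This is a hedged sketch, not the committed code: promote_single_replica is a hypothetical wrapper (the real logic lives in the part_removed() trigger in shard.sql below and is split across the owner, replica and other worker nodes), while the shardman helpers it calls are the ones that appear in this commit's diff.

-- Hypothetical condensation of the two-copy promotion path; assumes the
-- shardman extension and the helpers from shard.sql. Not the actual trigger.
CREATE FUNCTION promote_single_replica(removed shardman.partitions)
RETURNS void AS $$
DECLARE
    new_primary shardman.partitions;
BEGIN
    -- the only surviving copy sits on the removed primary's 'nxt' node
    SELECT * FROM shardman.partitions
        WHERE owner = removed.nxt AND part_name = removed.part_name
        INTO new_primary;

    -- done on the replica's node in the real trigger:
    -- re-enable writes and swap fdw -> usual table
    PERFORM shardman.readonly_replica_off(new_primary.part_name);
    PERFORM shardman.replace_foreign_part_with_usual(new_primary);

    -- done on the old primary's node: swap usual -> fdw pointing at new primary
    PERFORM shardman.replace_usual_part_with_foreign(new_primary);

    -- done on every other worker: retarget fdw servers to the new primary
    PERFORM shardman.update_fdw_server(new_primary);
END $$ LANGUAGE plpgsql;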

4 files changed: +102 -61 lines changed

membership.sql

Lines changed: 12 additions & 0 deletions
@@ -72,6 +72,18 @@ BEGIN
     UPDATE shardman.local_meta SET v = my_id WHERE k = 'my_id';
 END $$ LANGUAGE plpgsql STRICT;
 
+-- This node is shardlord?
+CREATE FUNCTION me_lord() RETURNS bool AS $$
+BEGIN
+    RETURN shardlord FROM shardman.nodes WHERE id = shardman.my_id();
+END $$ LANGUAGE plpgsql STRICT;
+
+-- This node is worker node?
+CREATE FUNCTION me_worker() RETURNS bool AS $$
+BEGIN
+    RETURN worker_status = 'active' FROM shardman.nodes WHERE id = shardman.my_id();
+END $$ LANGUAGE plpgsql STRICT;
+
 -- Get local node connstr regardless of its state. Returns NULL if node is not
 -- in cluster and never was in one.
 CREATE FUNCTION my_connstr() RETURNS text AS $$

readme.txt

Lines changed: 6 additions & 3 deletions
@@ -83,9 +83,12 @@ managed by current shardlord), this state will be lost.
 my_id()
     Get this node's id.
 
-rm_node(node_id int)
-    Remove node from the cluster. Its shardman state will be reset. We don't delete
-    tables with data and foreign tables though.
+rm_node(node_id int, force bool default false)
+    Remove node from the cluster. If 'force' is true, we don't care whether node
+    contains any partitions. Otherwise we won't allow to rm node holding shards.
+    shardman's stuff (pubs, subs, repslots) on deleted node will most probably be
+    reset if node is alive, but that is not guaranteed. We never delete tables with
+    data and foreign tables.
 
 You can see all cluster nodes at any time by examining shardman.nodes table:
 -- active is the normal mode, removed means node removed, rm_in_progress is
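
As the readme fragment notes, cluster membership can be inspected directly. A query like the following (the id, shardlord and worker_status columns of shardman.nodes all appear in this commit's SQL) shows who is lord and which workers are active:

-- inspect cluster membership on any node
SELECT id, shardlord, worker_status
FROM shardman.nodes
ORDER BY id;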

shard.sql

Lines changed: 66 additions & 49 deletions
@@ -252,84 +252,101 @@ CREATE TRIGGER part_moved AFTER UPDATE ON shardman.partitions
 ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER part_moved;
 
 
--- Partition removed: drop old LR channels.
+-- Partition removed: drop LR channel and promote replica if primary was
+-- removed. Since for now we support only 2 copies (1 replica), we promote
+-- replica immediately if needed. Case with several replicas is much more
+-- complex because we need to rebuild LR channels, so later we will have
+-- separate cmd promote_replica(), while part deletion will just perform
+-- cleanup. Here we do nothing if we are removing the last copy of data, the
+-- caller is responsible for tracking that.
 CREATE FUNCTION part_removed() RETURNS TRIGGER AS $$
 DECLARE
+    replica_removed bool := OLD.prv IS NOT NULL; -- replica or primary removed?
+    -- if primary removed, is there replica that we will promote?
+    replica_exists bool := OLD.nxt IS NOT NULL;
+    prim_repl_lname text; -- channel between primary and replica
     me int := shardman.my_id();
-    prev_src_lname text;
-    src_next_lname text;
-    new_primary partitions;
-    drop_slot_delay int := 2000; -- two seconds
+    new_primary shardman.partitions;
+    drop_slot_delay int := 2; -- two seconds
 BEGIN
     RAISE DEBUG '[SHMN] part_removed trigger called for part %, owner %',
         OLD.part_name, OLD.owner;
 
     ASSERT (OLD.prv IS NULL OR OLD.nxt IS NULL), 'We currently do not support redundancy level > 2';
 
-    IF OLD.prv IS NOT NULL THEN
-        prev_src_lname := shardman.get_data_lname(OLD.part_name, OLD.prv, OLD.owner);
-    ELSE
-        -- Primaty is moved
-        select * from shardman.partitions where owner=OLD.nxt and part_name=OLD.part_name into new_primary;
-    END IF;
-    IF OLD.nxt IS NOT NULL THEN
-        src_next_lname := shardman.get_data_lname(OLD.part_name, OLD.owner, OLD.nxt);
+    -- get log channel name and part we will promote, if any
+    IF replica_removed THEN
+        prim_repl_lname := shardman.get_data_lname(OLD.part_name, OLD.prv, OLD.owner);
+    ELSE -- Primary is removed
+        IF replica_exists THEN -- Primary removed, and it has replica
+            prim_repl_lname := shardman.get_data_lname(OLD.part_name, OLD.owner,
+                                                       OLD.nxt);
+            -- This replica is new primary
+            SELECT * FROM shardman.partitions
+                WHERE owner = OLD.nxt AND part_name = OLD.part_name INTO new_primary;
+            -- whole record nullability seems to be non-working
+            ASSERT new_primary.part_name IS NOT NULL;
+        END IF;
     END IF;
 
-
-    IF me = OLD.owner THEN -- src node
-        -- If primary part was moved, replace on src node its partition with
-        -- foreign one
-        IF OLD.prv IS NULL THEN
-            PERFORM shardman.replace_usual_part_with_foreign(new_primary);
-        ELSE
-            -- On the other hand, if prev replica existed, drop sub for old
-            -- channel prev -> src
-            PERFORM shardman.eliminate_sub(prev_src_lname);
-        END IF;
-        IF OLD.nxt IS NOT NULL THEN
-            -- If next replica existed, drop pub for old channel src -> next
-            -- Wait sometime to let other node first remove subscription
-            PERFORM pg_sleep(drop_slot_delay);
-            PERFORM shardman.drop_repslot_and_pub(src_next_lname);
-            PERFORM shardman.remove_sync_standby(src_next_lname);
+    IF me = OLD.owner THEN -- part dropped on us
+        IF replica_removed THEN -- replica removed on us
+            PERFORM shardman.eliminate_sub(prim_repl_lname);
+        ELSE -- primary removed on us
+            IF replica_exists IS NOT NULL THEN
+                -- If next replica existed, drop pub & rs for data channel
+                -- Wait sometime to let replica first remove subscription
+                PERFORM pg_sleep(drop_slot_delay);
+                PERFORM shardman.drop_repslot_and_pub(prim_repl_lname);
+                PERFORM shardman.remove_sync_standby(prim_repl_lname);
+                -- replace removed table with foreign one on promoted replica
+                PERFORM shardman.replace_usual_part_with_foreign(new_primary);
+            END IF;
         END IF;
         -- Drop old table anyway
         EXECUTE format('DROP TABLE IF EXISTS %I', OLD.part_name);
-    ELSEIF me = OLD.prv THEN -- node with prev replica
+    ELSEIF me = OLD.prv THEN -- node with primary for which replica was dropped
         -- Wait sometime to let other node first remove subscription
         PERFORM pg_sleep(drop_slot_delay);
-        -- Drop pub for old channel prev -> src
-        PERFORM shardman.drop_repslot_and_pub(prev_src_lname);
-        PERFORM shardman.remove_sync_standby(prev_src_lname);
-        -- Update L2-list (TODO: change replication model from chain to star)
-        PERFORM update shardman.partitions set nxt=OLD.nxt where owner=me and part_name=OLD.part_name;
-    ELSEIF me = OLD.nxt THEN -- node with next replica
-        -- Drop sub for old channel src -> next
-        PERFORM shardman.eliminate_sub(src_next_lname);
-        -- Update L2-list (TODO: change replication model from chain to star)
-        PERFORM update shardman.partitions set prv=OLD.prv where owner=me and part_name=OLD.part_name;
-        -- This replica is promoted to primary node, so drop trigger disabling writes to the table
-        PERFORM readonly_replica_off(part_name);
+        -- Drop pub & rs for data channel
+        PERFORM shardman.drop_repslot_and_pub(prim_repl_lname);
+        PERFORM shardman.remove_sync_standby(prim_repl_lname);
+    ELSEIF me = OLD.nxt THEN -- node with replica for which primary was dropped
+        -- Drop sub for data channel
+        PERFORM shardman.eliminate_sub(prim_repl_lname);
+        -- This replica is promoted to primary node, so drop trigger disabling
+        -- writes to the table and replace fdw with normal part
+        PERFORM shardman.readonly_replica_off(OLD.part_name);
         -- Replace FDW with local partition
         PERFORM shardman.replace_foreign_part_with_usual(new_primary);
     END IF;
 
-    -- If primary was moved
-    IF OLD.prv IS NULL THEN
-        -- then update fdw almost everywhere
+    IF NOT replica_removed AND shardman.me_worker() THEN
+        -- update fdw almost everywhere
         PERFORM shardman.update_fdw_server(new_primary);
     END IF;
 
+    IF shardman.me_lord() THEN
+        -- update partitions table: promote replica immediately after primary
+        -- removal or remove link to dropped replica.
+        IF replica_removed THEN
+            UPDATE shardman.partitions SET nxt = NULL WHERE owner = OLD.prv AND
+                part_name = OLD.part_name;
+        ELSE
+            UPDATE shardman.partitions SET prv = NULL
+            WHERE owner = OLD.nxt AND part_name = OLD.part_name;
+        END IF;
+    END IF;
+
     RETURN NULL;
 END
 $$ LANGUAGE plpgsql;
 
-CREATE TRIGGER part_removed AFTER REMOVE ON shardman.partitions
+CREATE TRIGGER part_removed AFTER DELETE ON shardman.partitions
     FOR EACH ROW
     EXECUTE PROCEDURE part_removed();
--- fire trigger only on worker nodes
-ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER part_removed;
+-- fire trigger only on either shardlord and worker nodes
+ALTER TABLE shardman.partitions ENABLE ALWAYS TRIGGER part_removed;
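A note on the last hunk above: switching the trigger from ENABLE REPLICA to ENABLE ALWAYS is what lets part_removed run on the shardlord too. In PostgreSQL, a trigger marked ENABLE REPLICA fires only while session_replication_role = 'replica' (i.e. during logical replication apply on workers), whereas ENABLE ALWAYS also fires for ordinary statements, such as the originating DELETE on the shardlord. A minimal, self-contained illustration (table, function and trigger names are made up):

-- a REPLICA trigger fires only in 'replica' role; an ALWAYS trigger in any role
CREATE TABLE t(i int);
CREATE FUNCTION note_del() RETURNS trigger LANGUAGE plpgsql AS
    $$ BEGIN RAISE NOTICE 'fired'; RETURN NULL; END $$;
CREATE TRIGGER trg AFTER DELETE ON t FOR EACH ROW EXECUTE PROCEDURE note_del();

ALTER TABLE t ENABLE REPLICA TRIGGER trg;
INSERT INTO t VALUES (1);
DELETE FROM t;                          -- origin role: trigger does NOT fire
SET session_replication_role = 'replica';
INSERT INTO t VALUES (1);
DELETE FROM t;                          -- replica role: trigger fires
RESET session_replication_role;

ALTER TABLE t ENABLE ALWAYS TRIGGER trg;
INSERT INTO t VALUES (1);
DELETE FROM t;                          -- fires regardless of role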

src/pg_shardman.c

Lines changed: 18 additions & 9 deletions
@@ -771,6 +771,7 @@ rm_node(Cmd *cmd)
     char **opts;
     bool force = false;
     int e;
+    int64 parts_on_node;
 
     for (opts = cmd->opts; *opts; opts++)
     {
@@ -781,28 +782,36 @@
         }
     }
 
-    SPI_PROLOG;
     if (force)
     {
-        sql = psprintf("delete from shardman.partitions where owner=%d", node_id);
+        sql = psprintf("delete from shardman.partitions where owner=%d",
+                       node_id);
         void_spi(sql);
     }
     else
     {
         bool isnull;
-        sql = psprintf("select count(*) from shardman.partitions where owner=%d", node_id);
+        sql = psprintf(
+            "select count(*) from shardman.partitions where owner=%d",
+            node_id);
+        SPI_PROLOG;
         e = SPI_execute(sql, true, 0);
         if (e < 0)
            shmn_elog(FATAL, "Stmt failed: %s", sql);
         Assert(SPI_processed == 1);
-        if (DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &isnull)) != 0)
-        {
-            ereport(ERROR, (errmsg("Can not remove node with existed partitions"),
-                    errhint("Add \"force\" option to remove node with existed partitions.")));
-        }
+        parts_on_node =
+            DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
+                                        SPI_tuptable->tupdesc, 1, &isnull));
+        SPI_EPILOG;
     }
     pfree(sql);
-    SPI_EPILOG;
+    if (parts_on_node != 0)
+    {
+        shmn_elog(WARNING, "Can't remove node %d with existing shards. Add \"force\" option to ignore this",
+                  node_id);
+        update_cmd_status(cmd->id, "failed");
+        return;
+    }
 
     elog(INFO, "Removing node %d ", node_id);
     if (!node_in_cluster(node_id))
