Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 511db06

Browse files
committed
create_replica itself done, update_fdw and ensure_standby still stubbed
1 parent 5c22332 commit 511db06

File tree

4 files changed

+183
-101
lines changed

4 files changed

+183
-101
lines changed

bin/common.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ function run_demo()
6161
:
6262
psql -p 5433 -c "drop table if exists pt cascade;"
6363
psql -p 5433 -c "CREATE TABLE pt(id INT NOT NULL, payload REAL);"
64-
psql -p 5433 -c "INSERT INTO pt SELECT generate_series(1, 1000), random();"
64+
psql -p 5433 -c "INSERT INTO pt SELECT generate_series(1, 10), random();"
6565
psql -c "select shardman.add_node('port=5433');"
6666
psql -c "select shardman.add_node('port=5434');"
6767
psql -c "select shardman.create_hash_partitions(2, 'pt', 'id', 2);"

init.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,12 @@ CREATE FUNCTION drop_repslot(slot_name text, with_fire bool DEFAULT false)
193193
DECLARE
194194
slot_exists bool;
195195
BEGIN
196-
RAISE DEBUG 'Dropping repslot %', slot_name;
196+
RAISE DEBUG '[SHARDMAN] Dropping repslot %', slot_name;
197197
EXECUTE format('SELECT EXISTS (SELECT * FROM pg_replication_slots
198198
WHERE slot_name = %L)', slot_name) INTO slot_exists;
199199
IF slot_exists THEN
200200
IF with_fire THEN -- kill walsender twice
201-
RAISE DEBUG 'Killing repslot % with fire', slot_name;
201+
RAISE DEBUG '[SHARDMAN] Killing repslot % with fire', slot_name;
202202
PERFORM shardman.terminate_repslot_walsender(slot_name);
203203
PERFORM pg_sleep(1);
204204
PERFORM shardman.terminate_repslot_walsender(slot_name);

shard.sql

Lines changed: 65 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ CREATE TABLE partitions (
8383
);
8484

8585
------------------------------------------------------------
86-
-- Metadata triggers
86+
-- Metadata triggers and funcs called from libpq updating metadata & LR channels
8787
------------------------------------------------------------
8888

8989
-- On adding new primary, create proper foreign server & foreign table and
@@ -140,67 +140,73 @@ CREATE TRIGGER primary_moved AFTER UPDATE ON shardman.partitions
140140
-- fire trigger only on worker nodes
141141
ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER primary_moved;
142142

143-
-- Update metadata according to new replica creation.
144-
-- Old tail part is still read-only when this called. There are two main jobs
145-
-- to do: set up LR sync channel between old tail and new replica and update fdw
146-
-- everywhere. For the former we could configure already existing channel used
147-
-- for partition copy, but we will not do that, because
148-
-- * It is not easier than creating new pub & sub: we have to rename pub, drop
149-
-- and create repslot (there is no way to rename it), rename sub, alter sub's
150-
-- slot_name, alter sub's publication, probably rename sub application name,
151-
-- probably run REFRESH (which requires alive pub just as CREATE SUBSCRIPTION)
152-
-- and hope that everything will be ok. Not sure about refreshing, though -- I
153-
-- don't know is it ok not doing it if tables didn't change. Doc says it
154-
-- should be executed.
155-
-- * Since it is not possible to rename repslot and it is not possible to
156-
-- specify since which lsn start replication, tables must be synced anyway
157-
-- during these operations, so what the point of reusing old sub? And copypart
158-
-- in shard.c really cares that tables are synced at this moment and src is
159-
-- read-only.
160-
CREATE FUNCTION replica_created() RETURNS TRIGGER AS $$
143+
-- Executed on newtail node, see cr_rebuild_lr
144+
CREATE FUNCTION replica_created_rebuild_drop_cp_sub(
145+
part_name name, oldtail int, newtail int) RETURNS void AS $$
161146
DECLARE
162-
cp_logname text := shardman.get_cp_logname(NEW.part_name, NEW.prv, NEW.owner);
163-
oldtail_pubname name := shardman.get_data_pubname(NEW.part_name, NEW.prv);
164-
oldtail_connstr text := shardman.get_worker_node_connstr(NEW.prv);
165-
newtail_subname name := shardman.get_data_subname(NEW.part_name, NEW.prv, NEW.owner);
166-
my_id int := shardman.get_node_id();
147+
cp_logname text := shardman.get_cp_logname(part_name, oldtail, newtail);
167148
BEGIN
168-
RAISE DEBUG '[SHARDMAN] replica_created trigger called';
169-
IF my_id = NEW.prv THEN -- old tail node
170-
-- Drop publication & repslot used for copy
171-
PERFORM shardman.drop_repslot_and_pub(cp_logname);
172-
-- Create publication & repslot for new data channel
173-
PERFORM shardman.create_repslot(oldtail_pubname);
174-
EXECUTE format('DROP PUBLICATION IF EXISTS %I', oldtail_pubname);
175-
EXECUTE format('CREATE PUBLICATION %I FOR TABLE %I',
176-
oldtail_pubname, NEW.part_name);
177-
-- Make this channel sync
178-
PERFORM shardman.ensure_sync_standby(newtail_subname);
179-
-- Now it is safe to make old tail writable again
180-
PERFORM shardman.readonly_table_off(relation);
181-
ELSEIF my_id = NEW.owner THEN -- created replica, i.e. new tail node
182-
-- Drop subscription used for copy
183-
PERFORM shardman.eliminate_sub(cp_logname);
184-
-- And create subscription for new data channel
185-
-- It should never exist at this moment, but just in case...
186-
PERFORM shardman.eliminate_sub(newtail_subname);
187-
EXECUTE format(
188-
'CREATE SUBSCRIPTION %I connection %L
189-
PUBLICATION %I with (create_slot = false, slot_name = %L);',
190-
newtail_subname, oldtail_connstr, oldtail_pubname, oldtail_pubname);
191-
-- Now fdw connstring to this part should include only primary and myself
192-
PERFORM shardman.update_fdw_server(NEW);
193-
ELSE -- other nodes
194-
-- just update fdw connstr to add new replica
195-
PERFORM shardman.update_fdw_server(NEW);
196-
END IF;
197-
RETURN NULL;
198-
END
199-
$$ LANGUAGE plpgsql;
200-
CREATE TRIGGER replica_created AFTER INSERT ON shardman.partitions
201-
FOR EACH ROW WHEN (NEW.prv IS NOT NULL) EXECUTE PROCEDURE replica_created();
149+
PERFORM shardman.readonly_replica_on(part_name::regclass);
150+
-- Drop subscription used for copy
151+
PERFORM shardman.eliminate_sub(cp_logname);
152+
END $$ LANGUAGE plpgsql;
153+
154+
-- Executed on oldtail node, see cr_rebuild_lr
155+
CREATE FUNCTION replica_created_rebuild_lr_create_data_pub(
156+
part_name name, oldtail int, newtail int) RETURNS void AS $$
157+
DECLARE
158+
cp_logname text := shardman.get_cp_logname(part_name, oldtail, newtail);
159+
oldtail_pubname name := shardman.get_data_pubname(part_name, oldtail);
160+
newtail_subname name := shardman.get_data_subname(part_name, oldtail, newtail);
161+
BEGIN
162+
-- Repslot for new data channel. Must be first, since we "cannot create
163+
-- logical replication slot in transaction that has performed writes"
164+
PERFORM shardman.create_repslot(oldtail_pubname);
165+
-- Drop publication & repslot used for copy
166+
PERFORM shardman.drop_repslot_and_pub(cp_logname);
167+
-- Create publication for new data channel
168+
EXECUTE format('DROP PUBLICATION IF EXISTS %I', oldtail_pubname);
169+
EXECUTE format('CREATE PUBLICATION %I FOR TABLE %I',
170+
oldtail_pubname, part_name);
171+
-- Make this channel sync
172+
PERFORM shardman.ensure_sync_standby(newtail_subname);
173+
-- Now it is safe to make old tail writable again
174+
PERFORM shardman.readonly_table_off(part_name::regclass);
175+
END $$ LANGUAGE plpgsql;
176+
177+
-- Executed on newtail node, see cr_rebuild_lr
178+
CREATE FUNCTION replica_created_rebuild_lr_create_data_sub(
179+
part_name name, oldtail int, newtail int) RETURNS void AS $$
180+
DECLARE
181+
oldtail_pubname name := shardman.get_data_pubname(part_name, oldtail);
182+
oldtail_connstr text := shardman.get_worker_node_connstr(oldtail);
183+
newtail_subname name := shardman.get_data_subname(part_name, oldtail, newtail);
184+
BEGIN
185+
-- Create subscription for new data channel
186+
-- It should never exist at this moment, but just in case...
187+
PERFORM shardman.eliminate_sub(newtail_subname);
188+
EXECUTE format(
189+
'CREATE SUBSCRIPTION %I connection %L
190+
PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false);',
191+
newtail_subname, oldtail_connstr, oldtail_pubname, oldtail_pubname);
192+
END $$ LANGUAGE plpgsql;
193+
194+
-- TODO
195+
-- Update fdw according to new replica creation. We update it on newtail node --
196+
-- its connstring to this part should include only primary and newtail itself,
197+
-- and on all other nodes except oldtail, so they learn about new replica.
198+
-- CREATE FUNCTION replica_created_update_fdw() RETURNS TRIGGER AS $$
199+
-- BEGIN
200+
-- RAISE DEBUG '[SHARDMAN] replica_created_update_fdw trigger called';
201+
-- IF shardman.get_node_id() != NEW.prv THEN -- don't update on oldtail node
202+
-- PERFORM shardman.update_fdw_server(NEW);
203+
-- END IF;
204+
-- RETURN NULL;
205+
-- END $$ LANGUAGE plpgsql;
206+
-- CREATE TRIGGER replica_created AFTER INSERT ON shardman.partitions
207+
-- FOR EACH ROW WHEN (NEW.prv IS NOT NULL) EXECUTE PROCEDURE replica_created_update_fdw();
202208
-- fire trigger only on worker nodes
203-
ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER replica_created;
209+
-- ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER replica_created;
204210

205211
-- Otherwise partitioned tables on worker nodes will not be dropped properly,
206212
-- see pathman's docs.
@@ -462,7 +468,6 @@ BEGIN
462468
'If you see this, most probably node with primary part has failed and' ||
463469
' you need to promote replica. Promotion is not yet implemented, sorry :(';
464470
END IF;
465-
raise warning 'NEW IS %', NEW;
466471
RETURN NEW;
467472
END $$ LANGUAGE plpgsql;
468473
-- And make replica writable again

0 commit comments

Comments
 (0)