Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 07c2f34

Browse files
committed
Fix waiting for completion of partition move
1 parent 079ddab commit 07c2f34

File tree

1 file changed

+43
-17
lines changed

1 file changed

+43
-17
lines changed

pg_shardman--1.0.sql

Lines changed: 43 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -597,7 +597,7 @@ BEGIN
597597
PERFORM shardman.broadcast(subs, super_connstr => true);
598598

599599
-- Wait for completion of the partition copy, then prohibit access to this partition
600-
PERFORM shardman.wait_copy_completion(src_node_id, mv_part_name);
600+
PERFORM shardman.wait_copy_completion(src_node_id, dst_node_id, mv_part_name);
601601

602602
RAISE NOTICE 'Copy of partition % from node % to % is completed',
603603
mv_part_name, src_node_id, dst_node_id;
@@ -659,7 +659,7 @@ BEGIN
659659
PERFORM shardman.broadcast(drop_fdws);
660660

661661
-- Truncate partition table and restore access to it at source node
662-
PERFORM shardman.complete_partition_move(src_node_id, mv_part_name);
662+
PERFORM shardman.complete_partition_move(src_node_id, dst_node_id, mv_part_name);
663663
END
664664
$$ LANGUAGE plpgsql;
665665

@@ -886,40 +886,66 @@ CREATE FUNCTION is_shardlord()
886886
RETURNS bool AS 'pg_shardman' LANGUAGE C STRICT;
887887

888888
-- Wait for completion of partition copy using logical replication (LR)
889-
CREATE FUNCTION wait_copy_completion(src_node_id int, part_name text) RETURNS void AS $$
889+
CREATE FUNCTION wait_copy_completion(src_node_id int, dst_node_id int, part_name text) RETURNS void AS $$
890890
DECLARE
891891
slot text = format('copy_%s', part_name);
892892
lag bigint;
893893
response text;
894894
caughtup_threshold bigint = 1024*1024;
895895
timeout_sec int = 1;
896896
locked bool = false;
897+
synced bool = false;
898+
wal_lsn text;
897899
BEGIN
898900
LOOP
899-
response := shardman.broadcast(format('%s:SELECT confirmed_flush_lsn - pg_current_wal_lsn() FROM pg_replication_slots WHERE slot_name=%L;', src_node_id, slot));
900-
lag := response::bigint;
901-
902-
RAISE DEBUG 'Replication lag %', lag;
903-
IF locked THEN
904-
IF lag <= 0 THEN
905-
RETURN;
901+
IF NOT synced
902+
THEN
903+
response := shardman.broadcast(format(
904+
'%s:SELECT srsubstate FROM pg_subscription_rel srel
905+
JOIN pg_subscription s on srel.srsubid = s.oid where subname=%L;',
906+
dst_node_id, slot));
907+
IF response='r' THEN
908+
synced := true;
909+
RAISE DEBUG 'Table % sync completed', part_name;
910+
CONTINUE;
911+
END IF;
912+
ELSE
913+
IF NOT locked THEN
914+
response := shardman.broadcast(format('%s:SELECT pg_current_wal_lsn() - confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name=%L;', src_node_id, slot));
915+
ELSE
916+
response := shardman.broadcast(format('%s:SELECT %L - confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name=%L;', src_node_id, wal_lsn, slot));
917+
END IF;
918+
lag := response::bigint;
919+
920+
RAISE DEBUG 'Replication lag %', lag;
921+
IF locked THEN
922+
IF lag<=0 THEN
923+
RETURN;
924+
END IF;
925+
ELSIF lag < caughtup_threshold THEN
926+
PERFORM shardman.broadcast(format('%s:CREATE TRIGGER write_protection BEFORE INSERT OR UPDATE OR DELETE ON %I FOR EACH STATEMENT EXECUTE PROCEDURE shardman.deny_access();',
927+
src_node_id, part_name));
928+
SELECT shardman.broadcast(format('%s:SELECT pg_current_wal_lsn();', src_node_id)) INTO wal_lsn;
929+
locked := true;
930+
CONTINUE;
906931
END IF;
907-
ELSIF lag < caughtup_threshold THEN
908-
PERFORM shardman.broadcast(format('%s:REVOKE SELECT,INSERT,UPDATE,DELETE ON %I FROM PUBLIC;',
909-
src_node_id, part_name));
910-
locked := true;
911-
CONTINUE;
912932
END IF;
913933
PERFORM pg_sleep(timeout_sec);
914934
END LOOP;
915935
END
916936
$$ LANGUAGE plpgsql;
917937

918-
CREATE FUNCTION complete_partition_move(src_node_id int, part_name text) RETURNS void AS $$
938+
CREATE FUNCTION complete_partition_move(src_node_id int, dst_node_id int, part_name text) RETURNS void AS $$
919939
BEGIN
920940
PERFORM shardman.broadcast(format('%s:TRUNCATE TABLE %I;',
921941
src_node_id, part_name));
922-
PERFORM shardman.broadcast(format('%s:GRANT SELECT,INSERT,UPDATE,DELETE ON %I TO PUBLIC;',
942+
PERFORM shardman.broadcast(format('%s:DROP TRIGGER write_protection ON %I;',
923943
src_node_id, part_name));
924944
END
925945
$$ LANGUAGE plpgsql;
946+
947+
CREATE FUNCTION deny_access() RETURNS trigger AS $$
948+
BEGIN
949+
RAISE EXCEPTION 'Access to moving parition is temporary denied';
950+
END
951+
$$ LANGUAGE plpgsql;

0 commit comments

Comments
 (0)