@@ -371,6 +371,12 @@ BEGIN
371
371
-- Exclude partitions of removed node
372
372
FOR part in SELECT * from shardman .partitions where node_id= rm_node_id
373
373
LOOP
374
+ -- If there are more than one replica of this partition, we need to synchronize them
375
+ IF shardman .get_redundancy_of_partition (part .part_name )> 1
376
+ THEN
377
+ PERFORM shardman .synchronize_replicas (part .part_name );
378
+ END IF;
379
+
374
380
-- Is there some replica of this node?
375
381
SELECT node_id INTO new_master_id FROM shardman .replicas WHERE part_name= part .part_name ORDER BY random() LIMIT 1 ;
376
382
IF new_master_id IS NOT NULL
@@ -577,7 +583,13 @@ BEGIN
577
583
-- Broadcast alter subscription commands
578
584
PERFORM shardman .broadcast (subs, synchronous => copy_data, super_connstr => true);
579
585
580
- -- This function doesn't wait completion of replication sync.
586
+ -- Maintain change log to be able to synchronize replicas after primary node failure
587
+ IF redundancy > 1
588
+ THEN
589
+ PERFORM shardman .generate_on_change_triggers (rel_name);
590
+ END IF;
591
+
592
+ -- This function doesn't wait completion of replication sync.
581
593
-- Use wait ensure_redundancy function to wait until sync is completed
582
594
END
583
595
$$ LANGUAGE plpgsql;
@@ -598,7 +610,7 @@ BEGIN
598
610
RETURN;
599
611
END IF;
600
612
601
- -- Wait until all subscritpion switch to ready state
613
+ -- Wait until all subscriptions switch to ready state
602
614
LOOP
603
615
poll := ' ' ;
604
616
FOR src_node_id IN SELECT id FROM shardman .nodes
@@ -611,7 +623,7 @@ BEGIN
611
623
END LOOP;
612
624
END LOOP;
613
625
614
- -- Poll subsciption statuses at all nodes
626
+ -- Poll subscription statuses at all nodes
615
627
response := shardman .broadcast (poll);
616
628
617
629
-- Check if all are ready
@@ -931,16 +943,13 @@ BEGIN
931
943
RAISE EXCEPTION ' There is no node with ID % in the cluster' , master_node_id;
932
944
END IF;
933
945
934
- -- Get information about relation attributes and primary keys needed to construct CREATE TABLE statement and
935
- PERFORM shardman .get_relation_metadata (rel, dst, src, pk);
936
-
937
946
-- Generate SQL statement creating this table
938
947
SELECT shardman .gen_create_table_sql (rel_name) INTO create_table;
939
948
940
949
-- Construct table attributes for create foreign table
941
950
SELECT shardman .reconstruct_table_attrs (rel) INTO table_attrs;
942
951
943
- -- Generatate SQL statements creating instead rules for updates
952
+ -- Generate SQL statements creating instead rules for updates
944
953
SELECT shardman .gen_create_rules_sql (rel_name) INTO create_rules;
945
954
946
955
-- Create table at all nodes
@@ -1004,6 +1013,7 @@ DECLARE
1004
1013
src_repl_group text ;
1005
1014
dst_repl_group text ;
1006
1015
master_node_id int ;
1016
+ rel_name text ;
1007
1017
BEGIN
1008
1018
IF shardman .redirect_to_shardlord (format(' mv_replica(%L, %L, %L)' , mv_part_name, src_node_id, dst_node_id))
1009
1019
THEN
@@ -1038,7 +1048,7 @@ BEGIN
1038
1048
END IF;
1039
1049
1040
1050
-- Get node ID of primary partition
1041
- SELECT node_id INTO master_node_id FROM shardman .partitions WHERE part_name= mv_part_name;
1051
+ SELECT node_id,relation INTO master_node_id,rel_name FROM shardman .partitions WHERE part_name= mv_part_name;
1042
1052
1043
1053
IF master_node_id= dst_node_id
1044
1054
THEN
@@ -1064,6 +1074,14 @@ BEGIN
1064
1074
1065
1075
-- Truncate original table
1066
1076
PERFORM shardman .broadcast (format(' %s:TRUNCATE TABLE %I;' , src_node_id, mv_part_name));
1077
+
1078
+ -- If there are more than one replica, we need to maintain change_log table for it
1079
+ IF shardman .get_redundancy_of_partition (mv_part_name) > 1
1080
+ THEN
1081
+ PERFORM shardman .broadcast (format(' {%s:%s}{%s:%s}' ,
1082
+ dst_node_id, shardman .create_on_change_triggers (rel_name, mv_part_name),
1083
+ src_node_id, shardman .drop_on_change_triggers (mv_part_name)));
1084
+ END IF;
1067
1085
END
1068
1086
$$ LANGUAGE plpgsql;
1069
1087
@@ -1130,7 +1148,7 @@ $$ LANGUAGE plpgsql;
1130
1148
1131
1149
-- Get self node identifier
1132
1150
CREATE FUNCTION get_my_id () RETURNS int AS $$
1133
- DECLARE
1151
+ DECLARE
1134
1152
node_id int ;
1135
1153
BEGIN
1136
1154
SELECT shardman .broadcast (format(' 0:SELECT id FROM shardman.nodes WHERE system_id=%s;' , shardman .get_system_identifier ()))::int INTO node_id;
@@ -1426,11 +1444,14 @@ BEGIN
1426
1444
PERFORM shardman .broadcast (pubs, super_connstr => true);
1427
1445
-- Create not existed subscriptions
1428
1446
PERFORM shardman .broadcast (subs, super_connstr => true);
1447
+
1448
+ -- Create not existed on_change triggers
1449
+ PERFORM shardman .generate_on_change_triggers ();
1429
1450
END
1430
1451
$$ LANGUAGE plpgsql;
1431
1452
1432
1453
1433
- -- Alter table at sharload and all nodes
1454
+ -- Alter table at shardlord and all nodes
1434
1455
CREATE FUNCTION alter_table (rel regclass, alter_clause text ) RETURNS void AS $$
1435
1456
DECLARE
1436
1457
rel_name text = rel::text ;
@@ -1644,7 +1665,7 @@ CREATE FUNCTION synchronous_replication()
1644
1665
CREATE FUNCTION is_shardlord ()
1645
1666
RETURNS bool AS ' pg_shardman' LANGUAGE C STRICT;
1646
1667
1647
- -- Get subscription status
1668
+ -- Get subscription status
1648
1669
CREATE FUNCTION is_subscription_ready (sname text ) RETURNS bool AS $$
1649
1670
DECLARE
1650
1671
n_not_ready bigint ;
@@ -1719,6 +1740,7 @@ BEGIN
1719
1740
END
1720
1741
$$ LANGUAGE plpgsql;
1721
1742
1743
+ -- Truncate the partition at source node after copy completion and switch off write protection for this partition
1722
1744
CREATE FUNCTION complete_partition_move (src_node_id int , dst_node_id int , part_name text ) RETURNS void AS $$
1723
1745
BEGIN
1724
1746
PERFORM shardman .broadcast (format(' %s:TRUNCATE TABLE %I;' ,
@@ -1728,20 +1750,189 @@ BEGIN
1728
1750
END
1729
1751
$$ LANGUAGE plpgsql;
1730
1752
1753
+ -- Trigger procedure prohibiting modification of the table
1731
1754
CREATE FUNCTION deny_access () RETURNS trigger AS $$
1732
1755
BEGIN
1733
1756
RAISE EXCEPTION ' Access to moving partition is temporary denied' ;
1734
1757
END
1735
1758
$$ LANGUAGE plpgsql;
1736
1759
1760
+ -- In case of primary node failure ensure that all replicas are identical using change_log table.
1761
+ -- We check last seqno stored in change_log at all replicas and for each lagging replica (last_seqno < max_seqno) perform three actions:
1762
+ -- 1. Copy missing part of change_log table
1763
+ -- 2. Delete all records from partition which primary key=old_pk in change_log table with seqno>last_seqno
1764
+ -- 3. Copy from advanced replica those records which primary key=new_pk in change_log table with seqno>last_seqno
1765
+ CREATE FUNCTION synchronize_replicas (pname text ) RETURNS void AS $$
1766
+ DECLARE
1767
+ max_seqno bigint = 0 ;
1768
+ seqno bigint ;
1769
+ advanced_node int ;
1770
+ replica shardman .replicas ;
1771
+ BEGIN
1772
+ -- Select most advanced replica: replica with largest seqno
1773
+ FOR replica IN SELECT * FROM shardman .replicas WHERE part_name= pname
1774
+ LOOP
1775
+ SELECT shardman .broadcast (format(' %s:SELECT max(seqno) FROM %s_change_log;' ,
1776
+ replica .node_id , replica .part_name ))::bigint INTO seqno;
1777
+ IF seqno > max_seqno
1778
+ THEN
1779
+ max_seqno := seqno;
1780
+ advanced_node := replica .node_id ;
1781
+ END IF;
1782
+ END LOOP;
1783
+
1784
+ -- Synchronize all lagging replicas
1785
+ FOR replica IN SELECT * FROM shardman .replicas WHERE part_name= pname AND node_id<> advanced_node
1786
+ LOOP
1787
+ SELECT shardman .broadcast (format(' %s:SELECT max(seqno) FROM %s_change_log;' ,
1788
+ replica .node_id , replica .part_name ))::bigint INTO seqno;
1789
+ IF seqno <> max_seqno
1790
+ THEN
1791
+ RAISE NOTICE ' Advance node % from %' , replica .node_id , advanced_node;
1792
+ PERFORM shardman .remote_copy (replica .relation , replica .part_name , replica .node_id , advanced_node, seqno);
1793
+ END IF;
1794
+ END LOOP;
1795
+ END;
1796
+ $$ LANGUAGE plpgsql;
1797
+
1798
+ -- Get relation primary key. There can be table with no primary key or with compound primary key.
1799
+ -- But logical replication and hash partitioning in any case requires single primary key.
1800
+ CREATE FUNCTION get_primary_key (rel regclass, out pk_name text , out pk_type text ) AS $$
1801
+ SELECT a .attname ::text ,a .atttypid ::regtype::text FROM pg_index i
1802
+ JOIN pg_attribute a ON a .attrelid = i .indrelid
1803
+ AND a .attnum = ANY(i .indkey )
1804
+ WHERE i .indrelid = rel
1805
+ AND i .indisprimary ;
1806
+ $$ LANGUAGE sql;
1807
+
1808
+ -- Copy missing data from one node to another. This function us using change_log table to determine records which need to be copied.
1809
+ -- See explanations in synchronize_replicas.
1810
+ -- Parameters:
1811
+ -- rel_name: name of parent relation
1812
+ -- part_name: synchronized partition name
1813
+ -- dst_node: lagging node
1814
+ -- src_node: advanced node
1815
+ -- last_seqno: maximal seqno at lagging node
1816
+ CREATE FUNCTION remote_copy (rel_name text , part_name text , dst_node int , src_node int , last_seqno bigint ) RETURNS void AS $$
1817
+ DECLARE
1818
+ script text ;
1819
+ conn_string text ;
1820
+ pk_name text ;
1821
+ pk_type text ;
1822
+ BEGIN
1823
+ SELECT * FROM shardman .get_primary_key (rel_name) INTO pk_name,pk_type;
1824
+ SELECT connection_string INTO conn_string FROM shardman .nodes WHERE id= src_node;
1825
+ -- We need to execute all this three statements in one transaction to exclude inconsistencies in case of failure
1826
+ script := format(' {%s:COPY %s_change_log FROM PROGRAM ' ' psql "%s" -c "COPY (SELECT * FROM %s_change_log WHERE seqno>%s) TO stdout"' ' ;
1827
+ DELETE FROM %I USING %s_change_log cl WHERE cl.seqno>%s AND cl.old_pk=%I;
1828
+ COPY %I FROM PROGRAM ' ' psql "%s" -c "COPY (SELECT DISTINCT ON (%I) %I.* FROM %I,%s_change_log cl WHERE cl.seqno>%s AND cl.new_pk=%I ORDER BY %I) TO stdout"' ' }' ,
1829
+ dst_node, part_name, conn_string, part_name, last_seqno,
1830
+ part_name, part_name, last_seqno, pk_name,
1831
+ part_name, conn_string, pk_name, part_name, part_name, part_name, last_seqno, pk_name, pk_name);
1832
+ PERFORM shardman .broadcast (script);
1833
+ END;
1834
+ $$ LANGUAGE plpgsql;
1835
+
1836
+ -- Drop on_change triggers when replica is moved
1837
+ CREATE FUNCTION drop_on_change_triggers (part_name text ) RETURNS text AS $$
1838
+ BEGIN
1839
+ return format(' DROP FUNCTION on_%s_insert CASCADE;
1840
+ DROP FUNCTION on_%s_update CASCADE;
1841
+ DROP FUNCTION on_%s_delete CASCADE;' ,
1842
+ part_name, part_name, part_name);
1843
+ END;
1844
+ $$ LANGUAGE plpgsql;
1845
+
1846
+ -- Generate triggers which maintain change_log table for replica
1847
+ CREATE FUNCTION create_on_change_triggers (rel_name text , part_name text ) RETURNS text AS $$
1848
+ DECLARE
1849
+ pk_name text ;
1850
+ pk_type text ;
1851
+ change_log_limit int = 32 * 1024 ;
1852
+ BEGIN
1853
+ SELECT * FROM shardman .get_primary_key (rel_name) INTO pk_name,pk_type;
1854
+ RETURN format($triggers$
1855
+ CREATE TABLE IF NOT EXISTS %s_change_log(seqno bigserial primary key , new_pk %s, old_pk %s);
1856
+ CREATE FUNCTION on_ %s_update() RETURNS TRIGGER AS $func$
1857
+ DECLARE
1858
+ last_seqno bigint ;
1859
+ BEGIN
1860
+ INSERT INTO %s_change_log (new_pk, old_pk) values (NEW.%I, OLD.%I) RETURNING seqno INTO last_seqno;
1861
+ IF last_seqno %% %s = 0 THEN
1862
+ DELETE FROM %s_change_log WHERE seqno < last_seqno - %s;
1863
+ END IF;
1864
+ RETURN NEW;
1865
+ END; $func$ LANGUAGE plpgsql;
1866
+ CREATE FUNCTION on_ %s_insert() RETURNS TRIGGER AS $func$
1867
+ DECLARE
1868
+ last_seqno bigint ;
1869
+ BEGIN
1870
+ INSERT INTO %s_change_log (new_pk) values (NEW.%I) RETURNING seqno INTO last_seqno;
1871
+ IF last_seqno %% %s = 0 THEN
1872
+ DELETE FROM %s_change_log WHERE seqno < last_seqno - %s;
1873
+ END IF;
1874
+ RETURN NEW;
1875
+ END; $func$ LANGUAGE plpgsql;
1876
+ CREATE FUNCTION on_ %s_delete() RETURNS TRIGGER AS $func$
1877
+ DECLARE
1878
+ last_seqno bigint ;
1879
+ BEGIN
1880
+ INSERT INTO %s_change_log (old_pk) values (OLD.%I) RETURNING seqno INTO last_seqno;
1881
+ IF last_seqno %% %s = 0 THEN
1882
+ DELETE FROM %s_change_log WHERE seqno < last_seqno - %s;
1883
+ END IF;
1884
+ END; $func$ LANGUAGE plpgsql;
1885
+ CREATE TRIGGER on_insert AFTER INSERT ON %I FOR EACH ROW EXECUTE PROCEDURE on_%s_insert();
1886
+ CREATE TRIGGER on_update AFTER UPDATE ON %I FOR EACH ROW EXECUTE PROCEDURE on_%s_update();
1887
+ CREATE TRIGGER on_delete AFTER DELETE ON %I FOR EACH ROW EXECUTE PROCEDURE on_%s_delete();
1888
+ ALTER TABLE %I ENABLE REPLICA TRIGGER on_insert, ENABLE REPLICA TRIGGER on_update, ENABLE REPLICA TRIGGER on_delete;$triggers$,
1889
+ part_name, pk_type, pk_type,
1890
+ part_name, part_name, pk_name, pk_name, change_log_limit* 2 , part_name, change_log_limit,
1891
+ part_name, part_name, pk_name, change_log_limit* 2 , part_name, change_log_limit,
1892
+ part_name, part_name, pk_name, change_log_limit* 2 , part_name, change_log_limit,
1893
+ part_name, part_name,
1894
+ part_name, part_name,
1895
+ part_name, part_name,
1896
+ part_name);
1897
+ END;
1898
+ $$ LANGUAGE plpgsql;
1899
+
1900
+ -- Generate change_log triggers for partitions with more than one replica at nodes where this replicas are located
1901
+ CREATE FUNCTION generate_on_change_triggers (table_pattern text = ' %' ) RETURNS void AS $$
1902
+ DECLARE
1903
+ replica shardman .replicas ;
1904
+ create_triggers text = ' ' ;
1905
+ BEGIN
1906
+ FOR replica IN SELECT * FROM shardman .replicas WHERE relation LIKE table_pattern
1907
+ LOOP
1908
+ IF shardman .get_redundancy_of_partition (replica .part_name ) > 1
1909
+ AND shardman .not_exists (replica .node_id , format(' pg_trigger t, pg_class c WHERE tgname=' ' on_insert' ' AND t.tgrelid=c.oid AND c.relname=%L' , replica .part_name ))
1910
+ THEN
1911
+ create_triggers := format(' %s{%s:%s}' , create_triggers, replica .node_id , shardman .create_on_change_triggers (replica .relation , replica .part_name ));
1912
+ END IF;
1913
+ END LOOP;
1914
+
1915
+ -- Create triggers at all nodes
1916
+ PERFORM shardman .broadcast (create_triggers);
1917
+ END;
1918
+ $$ LANGUAGE plpgsql;
1919
+
1920
+
1737
1921
-- Returns PostgreSQL system identifier (written in control file)
1738
1922
CREATE FUNCTION get_system_identifier ()
1739
1923
RETURNS bigint AS ' pg_shardman' LANGUAGE C STRICT;
1740
1924
1741
- -- View for monitoring replication lag
1925
+ -- View for monitoring logical replication lag.
1926
+ -- Can be used only at shardlord.
1742
1927
CREATE VIEW replication_lag (pubnode, subnode, lag) AS
1743
1928
SELECT src .id AS srcnode, dst .id AS dstnode,
1744
1929
shardman .broadcast (format(' %s:SELECT pg_current_wal_lsn() - confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name=' ' node_%s' ' ;' ,
1745
1930
src .id , dst .id ))::bigint AS lag
1746
1931
FROM shardman .nodes src, shardman .nodes dst WHERE src .id <> dst .id ;
1747
1932
1933
+ -- Yet another view for replication state based on change_log table.
1934
+ -- Can be used only at shardlord only only if redundancy level is greater than 1.
1935
+ -- This functions is polling state of all nodes, if some node is offline, then it should
1936
+ -- be explicitly excluded by filter condition, otherwise error will be reported.
1937
+ CREATE VIEW replication_state (part_name, node_id, last_seqno) AS
1938
+ SELECT part_name,node_id,shardman .broadcast (format(' %s:SELECT max(seqno) FROM %s_change_log;' ,node_id,part_name))::bigint FROM shardman .replicas ;
0 commit comments