@@ -577,10 +577,43 @@ BEGIN
577
577
-- Broadcast alter subscription commands
578
578
PERFORM shardman .broadcast (subs, synchronous => copy_data, super_connstr => true);
579
579
580
- -- This function doesn't wait completion of replication sync
580
+ -- This function doesn't wait completion of replication sync.
581
+ -- Use wait ensure_redundancy function to wait until sync is completed
581
582
END
582
583
$$ LANGUAGE plpgsql;
583
584
585
+ -- Wait completion of initial table sync for all replication subscriptions.
586
+ -- This function can be used after set_redundancy to ensure that partitions are copied to replicas.
587
+ CREATE FUNCTION ensure_redundancy () RETURNS void AS $$
588
+ DECLARE
589
+ src_node_id int ;
590
+ dst_node_id int ;
591
+ timeout_sec int = 1 ;
592
+ sub_name text ;
593
+ poll text ;
594
+ response text ;
595
+ BEGIN
596
+ LOOP
597
+ poll := ' '
598
+ FOR src_node_id IN SELECT id FROM shardman .nodes
599
+ LOOP
600
+ FOR dst_node_id IN SELECT id FROM shardman .nodes WHERE id<> src_node_id
601
+ LOOP
602
+ sub_name := format(' sub_%s_%s' , dst_node_id, src_node_id);
603
+ poll := format(' %s%s:SELECT shardman.is_subscription_ready(%L);' ,
604
+ poll, dst_node_id, sub_name);
605
+ END LOOP;
606
+ END LOOP;
607
+
608
+ response := shardman .broadcast (poll);
609
+ EXIT WHEN POSITION(' f' IN response)= 0 ;
610
+
611
+ PERFORM pg_sleep(timeout_sec);
612
+ END LOOP;
613
+ END
614
+ $$ LANGUAGE plpgsql;
615
+
616
+
584
617
-- Remove table from all nodes.
585
618
CREATE FUNCTION rm_table (rel regclass)
586
619
RETURNS void AS $$
@@ -1537,10 +1570,15 @@ CREATE FUNCTION is_shardlord()
1537
1570
RETURNS bool AS ' pg_shardman' LANGUAGE C STRICT;
1538
1571
1539
1572
-- Get subscription status
1540
- CREATE FUNCTION get_subscription_status (sname text ) RETURNS " char" AS $$
1541
- SELECT srsubstate FROM pg_subscription_rel srel
1542
- JOIN pg_subscription s on srel .srsubid = s .oid where subname= sname;
1543
- $$ LANGUAGE sql;
1573
+ CREATE FUNCTION is_subscription_ready (sname text ) RETURNS bool AS $$
1574
+ DECLARE
1575
+ n_not_ready bigint ;
1576
+ BEGIN
1577
+ SELECT count (* ) INTO n_not_ready FROM pg_subscription_rel srel
1578
+ JOIN pg_subscription s ON srel .srsubid = s .oid WHERE subname= sname AND srsubstate<> ' r' ;
1579
+ RETURN n_not_ready= 0 ;
1580
+ END
1581
+ $$ LANGUAGE plpgsql;
1544
1582
1545
1583
-- Wait initial sync completion
1546
1584
CREATE FUNCTION wait_sync_completion (src_node_id int , dst_node_id int ) RETURNS void AS $$
@@ -1549,9 +1587,9 @@ DECLARE
1549
1587
response text ;
1550
1588
BEGIN
1551
1589
LOOP
1552
- response := shardman .broadcast (format(' %s:SELECT shardman.get_subscription_status (' ' sub_%s_%s' ' );' ,
1590
+ response := shardman .broadcast (format(' %s:SELECT shardman.is_subscription_ready (' ' sub_%s_%s' ' );' ,
1553
1591
dst_node_id, dst_node_id, src_node_id));
1554
- EXIT WHEN response= ' r ' ;
1592
+ EXIT WHEN response::bool ;
1555
1593
PERFORM pg_sleep(timeout_sec);
1556
1594
END LOOP;
1557
1595
END
@@ -1573,9 +1611,9 @@ BEGIN
1573
1611
LOOP
1574
1612
IF NOT synced
1575
1613
THEN
1576
- response := shardman .broadcast (format(' %s:SELECT shardman.get_subscription_status (%L);' ,
1614
+ response := shardman .broadcast (format(' %s:SELECT shardman.is_subscription_ready (%L);' ,
1577
1615
dst_node_id, slot));
1578
- IF response= ' r ' THEN
1616
+ IF response::bool THEN
1579
1617
synced := true;
1580
1618
RAISE DEBUG ' Table % sync completed' , part_name;
1581
1619
CONTINUE;
0 commit comments