@@ -90,14 +90,10 @@ DECLARE
90
90
master_node_id int ;
91
91
sys_id bigint ;
92
92
conn_string_effective text = COALESCE(conn_string, super_conn_string);
93
- redirected bool;
94
93
BEGIN
95
- SELECT * FROM shardman .redirect_to_shardlord_with_res (
96
- format(' add_node(%L, %L, %L)' , super_conn_string, conn_string, repl_group))
97
- INTO new_node_id, redirected;
98
- IF redirected
94
+ IF NOT shardman .is_shardlord ()
99
95
THEN
100
- RETURN new_node_id ;
96
+ RETURN shardman . broadcast (format( ' 0:SELECT shardman.add_node(%L, %L, %L) ' , super_conn_string, conn_string, repl_group)):: int ;
101
97
END IF;
102
98
103
99
-- Insert new node in nodes table
@@ -357,7 +353,7 @@ BEGIN
357
353
super_connstr => true);
358
354
PERFORM shardman .broadcast (conf, ignore_errors:= true, super_connstr => true);
359
355
END IF;
360
- /* To correctly remove foreign servers we need to update pf_depend table, otherwise
356
+ /* To correctly remove foreign servers we need to update pg_depend table, otherwise
361
357
* our hack with direct update pg_foreign_table leaves deteriorated dependencies
362
358
-- Remove foreign servers at all nodes for the removed node
363
359
FOR node IN SELECT * FROM shardman.nodes WHERE id<>rm_node_id
@@ -1508,9 +1504,145 @@ BEGIN
1508
1504
END
1509
1505
$$ LANGUAGE plpgsql;
1510
1506
1511
- -- -------------------------------------------------------------------
1512
- -- Utility functions
1513
- -- -------------------------------------------------------------------
1507
+
1508
+ -- Commit or rollback not completed distributed transactions
1509
+ CREATE FUNCTION recover_xacts () RETURNS void AS $$
1510
+ DECLARE
1511
+ node_id int ;
1512
+ xacts text [];
1513
+ xact_node_id int ;
1514
+ xact text ;
1515
+ cmds text = ' ' ;
1516
+ gid text ;
1517
+ xid bigint ;
1518
+ sysid bigint ;
1519
+ counter text ;
1520
+ coordinator int ;
1521
+ status text ;
1522
+ n_participants int ;
1523
+ n_prepared int ;
1524
+ resp text ;
1525
+ do_commit bool;
1526
+ do_rollback bool;
1527
+ finish text = ' ' ;
1528
+ BEGIN
1529
+ IF shardman .redirect_to_shardlord (' recover_xacts()' )
1530
+ THEN
1531
+ RETURN;
1532
+ END IF;
1533
+
1534
+ FOR node_id IN SELECT id FROM shardman .nodes
1535
+ LOOP
1536
+ cmds := format(' %s%s:SELECT string_agg(' ' %s=>' ' ||gid, ' ' ,' ' ) FROM pg_prepared_xacts;' , cmds, node_id, node_id);
1537
+ END LOOP;
1538
+
1539
+ -- Collected prepared xacts from all nodes
1540
+ SELECT string_to_array(shardman .broadcast (cmds), ' ,' ) INTO xacts;
1541
+
1542
+ FOREACH xact IN ARRAY xacts
1543
+ LOOP
1544
+ xact_node_id := split_part(xact, ' =>' , 1 );
1545
+ gid := split_part(xact, ' =>' , 2 );
1546
+ sysid := split_part(gid, ' :' , 3 )::bigint ;
1547
+ xid := split_part(gid, ' :' , 4 )::bigint ;
1548
+ SELECT id INTO coordinator FROM shardman .nodes WHERE system_id= sysid;
1549
+ IF coordinator IS NULL
1550
+ THEN
1551
+ -- Coordinator node is not available
1552
+ RAISE NOTICE ' Coordinator of transaction % is not available' , gid;
1553
+ n_participants := split_part(gid, ' :' , 5 )::int ;
1554
+ IF n_participants > 1
1555
+ THEN
1556
+ -- Poll all participants.
1557
+ -- First of all try portable way: get information from pg_prepared_xacts.
1558
+ cmds := ' ' ;
1559
+ FOR node_id IN SELECT id FROM shardman .nodes
1560
+ LOOP
1561
+ cmds := format(' %s%s:SELECT COUNT(*) FROM pg_prepared_xacts WHERE gid=%L;' , cmds, node_id, gid);
1562
+ END LOOP;
1563
+ SELECT shardman .broadcast (cmds) INTO resp;
1564
+
1565
+ n_prepared := 0 ;
1566
+ FOREACH counter IN ARRAY string_to_array(resp, ' ,' )
1567
+ LOOP
1568
+ n_prepared := n_prepared + counter::int ;
1569
+ END LOOP;
1570
+
1571
+ IF n_prepared= n_participants
1572
+ THEN
1573
+ RAISE NOTICE ' Commit distributed transaction % which is prepared at all participant nodes' , gid;
1574
+ finish := format(' %s%s:COMMIT PREPARED %L;' , finish, xact_node_id, gid);
1575
+ ELSE
1576
+ RAISE NOTICE ' Distributed transaction % is prepared at % nodes from %' ,
1577
+ gid, n_prepared, n_participants;
1578
+
1579
+ IF EXISTS (SELECT * FROM pg_proc WHERE proname= ' pg_prepared_xact_status' )
1580
+ THEN
1581
+ -- Without coordinator there is no standard way to get status of this distributed transaction.
1582
+ -- Use PGPRO-EE pg_prepared_xact_status() function if available
1583
+ cmds := ' ' ;
1584
+ FOR node_id IN SELECT id FROM shardman .nodes
1585
+ LOOP
1586
+ cmds := format(' %s%s:SELECT pg_prepared_xact_status(%L);' , cmds, node_id, gid);
1587
+ END LOOP;
1588
+ SELECT shardman .broadcast (cmds) INTO resp;
1589
+
1590
+ -- Collect information about distributed transaction status at all nodes
1591
+ do_commit := false;
1592
+ do_rollback := false;
1593
+ FOREACH status IN ARRAY string_to_array(resp, ' ,' )
1594
+ LOOP
1595
+ IF status= ' committed'
1596
+ THEN
1597
+ do_commit := true;
1598
+ ELSIF status= ' aborted'
1599
+ THEN
1600
+ do_rollback := true;
1601
+ END IF;
1602
+ END LOOP;
1603
+
1604
+ IF do_commit
1605
+ THEN
1606
+ IF do_rollack
1607
+ THEN
1608
+ RAISE NOTICE ' Inconsistent state of transaction %' , gid;
1609
+ ELSE
1610
+ RAISE NOTICE ' Commit transaction %s at node % because if was committed at one of participants' ,
1611
+ gid, xact_node_id;
1612
+ finish := format(' %s%s:COMMIT PREPARED %L;' , finish, xact_node_id, gid);
1613
+ END IF;
1614
+ ELSIF do_rollback
1615
+ THEN
1616
+ RAISE NOTICE ' Abort transaction %s at node % because if was aborted at one of participants' ,
1617
+ gid, xact_node_id;
1618
+ finish := format(' %s%s:ROLLBACK PREPARED %L;' , finish, xact_node_id, gid);
1619
+ ELSE
1620
+ RAISE NOTICE ' Can not make any decision concerning distributes transaction %' , gid;
1621
+ END IF;
1622
+ END IF;
1623
+ END IF;
1624
+ ELSE
1625
+ RAISE NOTICE ' Commit transaction % with single participant %' , gid, xact_node_id;
1626
+ finish := format(' %s%s:COMMIT PREPARED %L;' , finish, xact_node_id, gid);
1627
+ END IF;
1628
+ ELSE
1629
+ -- Check status of transaction at coordinator
1630
+ SELECT shardman .broadcast (format(' %s:SELECT txid_status(%s);' , coordinator, xid)) INTO status;
1631
+ RAISE NOTICE ' Status of distributed transaction % is % at coordinator %' , gid, status, coordinator;
1632
+ IF status= ' committed'
1633
+ THEN
1634
+ finish := format(' %s%s:COMMIT PREPARED %L;' , finish, xact_node_id, gid);
1635
+ ELSIF status= ' aborted'
1636
+ THEN
1637
+ finish := format(' %s%s:ROLLBACK PREPARED %L;' , finish, xact_node_id, gid);
1638
+ END IF;
1639
+ END IF;
1640
+ END LOOP;
1641
+
1642
+ -- Finish all prepared transactions for which decision was made
1643
+ PERFORM shardman .broadcast (finish, sync_commit_on => true);
1644
+ END
1645
+ $$ LANGUAGE plpgsql;
1514
1646
1515
1647
-- Generate rules for redirecting updates for shared table
1516
1648
CREATE FUNCTION gen_create_rules_sql (rel_name text , fdw_name text ) RETURNS text AS $$
@@ -1559,27 +1691,17 @@ END
1559
1691
$$ LANGUAGE plpgsql;
1560
1692
1561
1693
-- Execute command at shardlord
1562
- CREATE FUNCTION redirect_to_shardlord_with_res (cmd text , out res int ,
1563
- out redirected bool) AS $$
1694
+ CREATE FUNCTION redirect_to_shardlord (cmd text ) RETURNS bool AS $$
1564
1695
BEGIN
1565
- IF NOT shardman .is_shardlord () THEN
1696
+ IF NOT shardman .is_shardlord ()
1697
+ THEN
1566
1698
RAISE NOTICE ' Redirect command "%" to shardlord' ,cmd;
1567
- res = (shardman .broadcast (format(' 0:SELECT shardman.%s;' , cmd)))::int ;
1568
- redirected = true;
1699
+ RETURN true;
1569
1700
ELSE
1570
- redirected = false;
1701
+ RETURN false;
1571
1702
END IF;
1572
1703
END
1573
1704
$$ LANGUAGE plpgsql;
1574
- -- same, but don't care for the result -- to avoid changing all calls to
1575
- -- redirect_to_shardlord to '.redirected'
1576
- CREATE FUNCTION redirect_to_shardlord (cmd text ) RETURNS bool AS $$
1577
- DECLARE
1578
- BEGIN
1579
- RETURN redirected FROM shardman .redirect_to_shardlord_with_res (cmd);
1580
- END
1581
- $$ LANGUAGE plpgsql;
1582
-
1583
1705
1584
1706
-- Generate based on information from catalog SQL statement creating this table
1585
1707
CREATE FUNCTION gen_create_table_sql (relation text )
@@ -1642,7 +1764,7 @@ BEGIN
1642
1764
um_opts := ' ' ;
1643
1765
SELECT * FROM shardman .pq_conninfo_parse (conn_string)
1644
1766
INTO conn_string_keywords, conn_string_vals;
1645
- FOR i IN 1 ..( SELECT array_upper(conn_string_keywords, 1 ) ) LOOP
1767
+ FOR i IN 1 ..array_upper(conn_string_keywords, 1 ) LOOP
1646
1768
IF conn_string_keywords[i] = ' client_encoding' OR
1647
1769
conn_string_keywords[i] = ' fallback_application_name' THEN
1648
1770
CONTINUE; /* not allowed in postgres_fdw */
@@ -1999,7 +2121,8 @@ DECLARE
1999
2121
poll text = ' ' ;
2000
2122
graph text ;
2001
2123
BEGIN
2002
- IF NOT shardman .is_shardlord () THEN
2124
+ IF NOT shardman .is_shardlord ()
2125
+ THEN
2003
2126
RETURN shardman .broadcast (' 0:SELECT shardman.global_lock_graph();' );
2004
2127
END IF;
2005
2128
@@ -2028,7 +2151,7 @@ CREATE FUNCTION detect_deadlock(lock_graph text) RETURNS shardman.process[] AS $
2028
2151
$$ LANGUAGE sql;
2029
2152
2030
2153
-- Monitor cluster for presence of distributed deadlocks and node failures.
2031
- -- Tries to cancel queries causing deadlock and exclude unavailable nodes from the cluser .
2154
+ -- Tries to cancel queries causing deadlock and exclude unavailable nodes from the cluster .
2032
2155
CREATE FUNCTION monitor (deadlock_check_timeout_sec int = 5 , rm_node_timeout_sec int = 60 ) RETURNS void AS $$
2033
2156
DECLARE
2034
2157
prev_deadlock_path shardman .process [];
@@ -2069,6 +2192,7 @@ BEGIN
2069
2192
THEN
2070
2193
RAISE NOTICE ' Remove node % because of % timeout expiration' , failed_node_id, rm_node_timeout_sec;
2071
2194
PERFORM shardman .broadcast (format(' 0:SELECT shardman.rm_node(%s, force=>true);' , failed_node_id));
2195
+ PERFORM shardman .broadcast (' 0:SELECT shardman.recover_xacts();' );
2072
2196
failed_node_id := null ;
2073
2197
END IF;
2074
2198
ELSE
0 commit comments