@@ -49,14 +49,16 @@ CREATE TABLE tables (
 -- Main partitions
 CREATE TABLE partitions (
     part_name text PRIMARY KEY,
-    node_id int NOT NULL REFERENCES nodes(id) ON DELETE CASCADE, -- node on which partition lies
+    -- node on which partition lies
+    node_id int NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
     relation text NOT NULL REFERENCES tables(relation) ON DELETE CASCADE
 );
 
 -- Partition replicas
 CREATE TABLE replicas (
     part_name text NOT NULL REFERENCES partitions(part_name) ON DELETE CASCADE,
-    node_id int NOT NULL REFERENCES nodes(id) ON DELETE CASCADE, -- node on which partition lies
+    -- node on which partition lies
+    node_id int NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
     relation text NOT NULL REFERENCES tables(relation) ON DELETE CASCADE,
     PRIMARY KEY (part_name,node_id)
 );
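These two catalog tables drive placement: a row in partitions records the node holding the primary copy of each partition, and replicas holds one row per extra copy. A quick way to inspect placement is a join like the sketch below (the table name pgbench_accounts is only an example):

    -- Illustrative only: where each partition of a sharded table lives,
    -- together with the nodes holding its replicas.
    SELECT p.part_name,
           p.node_id            AS primary_node,
           array_agg(r.node_id) AS replica_nodes
    FROM shardman.partitions p
    LEFT JOIN shardman.replicas r USING (part_name)
    WHERE p.relation = 'pgbench_accounts'
    GROUP BY p.part_name, p.node_id
    ORDER BY p.part_name;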
@@ -590,7 +592,8 @@ DECLARE
     subs text = '';
     sub_options text = '';
 BEGIN
-    IF shardman.redirect_to_shardlord(format('set_redundancy(%L, %L)', rel_name, redundancy))
+    IF shardman.redirect_to_shardlord(format('set_redundancy(%L, %L)', rel_name,
+                                             redundancy))
     THEN
         RETURN;
     END IF;
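When this is called on an ordinary worker node, redirect_to_shardlord forwards the formatted call text to the shardlord and the local invocation simply returns. Since %L quotes its argument as a literal, the forwarded command looks roughly like the sketch below (table name and redundancy level are invented):

    -- What format('set_redundancy(%L, %L)', rel_name, redundancy) produces
    -- for rel_name = 'pgbench_accounts' and redundancy = 2:
    --   set_redundancy('pgbench_accounts', '2')
    SELECT format('set_redundancy(%L, %L)', 'pgbench_accounts', 2);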
@@ -605,18 +608,21 @@ BEGIN
     FOR part IN SELECT * FROM shardman.partitions WHERE relation=rel_name
     LOOP
         -- Count number of replicas of this partition
-        SELECT count(*) INTO n_replicas FROM shardman.replicas WHERE part_name=part.part_name;
+        SELECT count(*) INTO n_replicas FROM shardman.replicas
+            WHERE part_name=part.part_name;
         IF n_replicas < redundancy
         THEN -- If it is smaller than requested...
-            SELECT replication_group INTO repl_group FROM shardman.nodes where id=part.node_id;
+            SELECT replication_group INTO repl_group FROM shardman.nodes
+                WHERE id=part.node_id;
             -- ...then add requested number of replicas in corresponding replication group
             FOR repl_node IN SELECT id FROM shardman.nodes
                 WHERE replication_group=repl_group AND id<>part.node_id AND NOT EXISTS
                     (SELECT * FROM shardman.replicas WHERE node_id=id AND part_name=part.part_name)
                 ORDER by random() LIMIT redundancy-n_replicas
             LOOP
                 -- Insert information about new replica in replicas table
-                INSERT INTO shardman.replicas (part_name, node_id, relation) VALUES (part.part_name, repl_node, rel_name);
+                INSERT INTO shardman.replicas (part_name, node_id, relation)
+                    VALUES (part.part_name, repl_node, rel_name);
                 -- Establish publications and subscriptions for this partition
                 pubs := format('%s%s:ALTER PUBLICATION node_%s ADD TABLE %I;',
                                pubs, part.node_id, repl_node, part.part_name);
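The loop accumulates per-node commands into pubs (and, symmetrically, subs) in the broadcast syntax described later in this file, i.e. node-id:statement;. With the format string above, one iteration might append a line like the sketch below (node ids and the partition name are hypothetical):

    -- One accumulated pubs entry, assuming the primary copy of partition
    -- pgbench_accounts_0 lives on node 1 and a new replica is added on node 3:
    --   1:ALTER PUBLICATION node_3 ADD TABLE pgbench_accounts_0;
    SELECT format('%s%s:ALTER PUBLICATION node_%s ADD TABLE %I;',
                  '', 1, 3, 'pgbench_accounts_0');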
@@ -629,7 +635,10 @@ BEGIN
     -- Broadcast alter publication commands
     PERFORM shardman.broadcast(pubs, super_connstr => true);
     -- Broadcast alter subscription commands
-    PERFORM shardman.broadcast(subs, synchronous => copy_data, super_connstr => true);
+    -- Initial tablesync creates temporary repslots, which wait until all xacts
+    -- started before snapshot creation end; because of that we must alter subs
+    -- sequentially, or deadlocks are possible.
+    PERFORM shardman.broadcast(subs, sequential => copy_data, super_connstr => true);
 
     -- Maintain change log to be able to synchronize replicas after primary node failure
     IF redundancy > 1
@@ -643,7 +652,8 @@
 $$ LANGUAGE plpgsql;
 
 -- Wait completion of initial table sync for all replication subscriptions.
--- This function can be used after set_redundancy to ensure that partitions are copied to replicas.
+-- This function can be used after set_redundancy to ensure that partitions are
+-- copied to replicas.
 CREATE FUNCTION ensure_redundancy() RETURNS void AS $$
 DECLARE
     src_node_id int;
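Taken together with the comment above, the typical sequence is to raise the redundancy of a table and then wait for the initial copy to finish. A minimal usage sketch (the table name is only an example):

    -- Ask for one replica per partition of pgbench_accounts...
    SELECT shardman.set_redundancy('pgbench_accounts', 1);
    -- ...and wait until initial table sync has completed on all subscriptions.
    SELECT shardman.ensure_redundancy();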
@@ -704,7 +714,8 @@ BEGIN
         drop1 := format('%s%s:DROP TABLE %I CASCADE;',
                         drop1, node_id, rel_name);
         -- Drop replicas and stub tables (which are replaced with foreign tables)
-        FOR pname IN SELECT part_name FROM shardman.partitions WHERE relation=rel_name
+        FOR pname IN SELECT part_name FROM shardman.partitions WHERE
+            relation = rel_name
         LOOP
             drop2 := format('%s%s:DROP TABLE IF EXISTS %I CASCADE;',
                             drop2, node_id, pname);
@@ -745,7 +756,8 @@ DECLARE
     repl_node_id int;
     drop_slots text = '';
 BEGIN
-    IF shardman.redirect_to_shardlord(format('mv_partition(%L, %L)', mv_part_name, dst_node_id))
+    IF shardman.redirect_to_shardlord(format('mv_partition(%L, %L)', mv_part_name,
+                                             dst_node_id))
     THEN
         RETURN;
     END IF;
@@ -757,7 +769,9 @@ BEGIN
     END IF;
     src_node_id := part.node_id;
 
-    SELECT replication_group, super_connection_string INTO src_repl_group, conn_string FROM shardman.nodes WHERE id=src_node_id;
+    SELECT replication_group, super_connection_string
+        INTO src_repl_group, conn_string
+        FROM shardman.nodes WHERE id=src_node_id;
     SELECT replication_group INTO dst_repl_group FROM shardman.nodes WHERE id=dst_node_id;
 
     IF src_node_id = dst_node_id THEN
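mv_partition is driven the same way: a worker forwards the call to the shardlord, which looks up the source node of the partition and checks that source and destination differ before orchestrating the move. A hedged usage example (the partition name and node id are hypothetical):

    -- Move partition pgbench_accounts_0 to node 3, assuming such a partition
    -- and node exist in shardman.partitions / shardman.nodes.
    SELECT shardman.mv_partition('pgbench_accounts_0', 3);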
@@ -1759,27 +1773,29 @@ RETURNS text AS 'pg_shardman' LANGUAGE C STRICT;
 -- prefix: node-id:sql-statement;
 -- To run multiple statements on node, wrap them in {}:
 -- {node-id:statement; statement;}
+-- All statements are run in one transaction.
+-- Don't specify node id twice with 2pc, we use only one prepared_xact name.
 -- Node id '0' means shardlord, shardlord_connstring guc is used.
--- Don't specify them separately with 2pc, we use only one prepared_xact name.
 -- No escaping is performed, so ';', '{' and '}' inside queries are not supported.
--- By default functions throws error is execution is failed at some of the
--- nodes, with ignore_errors=true errors are ignored and function returns string
--- with "Error:" prefix containing list of errors terminated by dots with
--- nodes prefixes.
--- In case of normal completion this function return list with node prefixes
--- separated by columns with single result for select queries or number of
--- affected rows for other commands.
--- If two_phase parameter is true, then each statement is wrapped in blocked and
--- prepared with subsequent commit or rollback of prepared transaction at second
--- phase of two phase commit.
+-- By default the function throws an error if execution has failed on some of the
+-- nodes. With ignore_errors=true errors are ignored and each error is appended
+-- to the function result in the form "<error>${node_id}:Something bad</error>".
+-- In case of normal completion this function returns comma-separated results
+-- for each run. A "result" is the first column of the first row of the last
+-- statement.
+-- If two_phase parameter is true, then each statement is first prepared, with
+-- subsequent commit or rollback of the prepared transaction at the second
+-- phase of two-phase commit.
 -- If sync_commit_on is false, we set session synchronous_commit to local.
+-- If sequential is true, we send a text cmd only when at least one statement for
+-- the previous one has already been executed.
 -- If super_connstr is true, super connstring is used everywhere, usual
 -- connstr otherwise.
 CREATE FUNCTION broadcast(cmds text,
                           ignore_errors bool = false,
                           two_phase bool = false,
                           sync_commit_on bool = false,
-                          synchronous bool = false,
+                          sequential bool = false,
                           super_connstr bool = false)
 RETURNS text AS 'pg_shardman' LANGUAGE C STRICT;
 
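To make the command syntax from the comment concrete, here is a sketch of two broadcast calls using those conventions (the node ids and statements are invented; only the '0:SELECT shardman.recover_xacts();' command appears elsewhere in this file):

    -- One statement per node; node 0 addresses the shardlord itself.
    SELECT shardman.broadcast('1:SELECT 42;2:SELECT 43;0:SELECT shardman.recover_xacts();');

    -- Several statements on one node run in a single transaction when wrapped in {}.
    -- sequential => true processes the nodes one by one, which set_redundancy
    -- above relies on when altering subscriptions.
    SELECT shardman.broadcast('{1:CREATE TABLE t(i int); INSERT INTO t VALUES (1);}',
                              sequential => true, super_connstr => true);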
@@ -2118,34 +2134,54 @@ CREATE FUNCTION get_system_identifier()
 -- Type to represent vertex in lock graph
 create type process as (node int, pid int);
 
--- View to build lock graph which can be used to detect global deadlock
-CREATE VIEW lock_graph (wait,hold) AS
+-- View to build lock graph which can be used to detect global deadlock.
+-- Application_name is assumed pgfdw:$system_id:$coord_pid
+-- gid is assumed $pid:$count:$sys_id:$xid:$participants_count
+CREATE VIEW lock_graph (wait, hold) AS
+    -- If xact is already prepared, we take node and pid of the coordinator.
+    -- local dependencies
     SELECT
         ROW(shardman.get_my_id(),
             wait.pid)::shardman.process,
-        ROW(CASE WHEN hold.pid IS NOT NULL THEN shardman.get_my_id() ELSE shardman.get_node_by_sysid(split_part(gid,':',3)::bigint) END,
-            COALESCE(hold.pid, split_part(gid,':',1)::int))::shardman.process
-    FROM pg_locks wait, pg_locks hold LEFT OUTER JOIN pg_prepared_xacts twopc ON twopc.transaction = hold.transactionid
+        CASE WHEN hold.pid IS NOT NULL THEN
+            ROW(shardman.get_my_id(), hold.pid)::shardman.process
+        ELSE -- prepared
+            ROW(shardman.get_node_by_sysid(split_part(gid, ':', 3)::bigint),
+                split_part(gid, ':', 1)::int)::shardman.process
+        END
+    FROM pg_locks wait, pg_locks hold LEFT OUTER JOIN pg_prepared_xacts twopc
+        ON twopc.transaction = hold.transactionid
     WHERE
         NOT wait.granted AND wait.pid IS NOT NULL AND hold.granted
-        AND (wait.transactionid = hold.transactionid OR (wait.page = hold.page AND wait.tuple = hold.tuple))
-        AND (hold.pid IS NOT NULL OR twopc.gid IS NOT NULL)
+        -- this select captures waits on xids and on tuples
+        AND (wait.transactionid = hold.transactionid OR
+             (wait.page = hold.page AND wait.tuple = hold.tuple))
+        AND (hold.pid IS NOT NULL OR twopc.gid IS NOT NULL) -- ???
     UNION ALL
-    SELECT ROW(shardman.get_node_by_sysid(split_part(application_name,':',2)::bigint),
+    -- if this fdw backend is busy, potentially waiting, add edge coordinator -> fdw
+    SELECT ROW(shardman.get_node_by_sysid(split_part(application_name, ':', 2)::bigint),
               split_part(application_name,':',3)::int)::shardman.process,
           ROW(shardman.get_my_id(),
               pid)::shardman.process
    FROM pg_stat_activity WHERE application_name LIKE 'pgfdw:%' AND wait_event<>'ClientRead'
    UNION ALL
+    -- otherwise, coordinator itself is busy, potentially waiting, so add fdw ->
+    -- coordinator edge
    SELECT ROW(shardman.get_my_id(),
               pid)::shardman.process,
           ROW(shardman.get_node_by_sysid(split_part(application_name,':',2)::bigint),
               split_part(application_name,':',3)::int)::shardman.process
    FROM pg_stat_activity WHERE application_name LIKE 'pgfdw:%' AND wait_event='ClientRead';
 
--- Pack lock graph into string
+-- Pack lock graph into comma-separated string of edges like "2:17439->4:30046",
+-- i.e. pid 17439 on node 2 waits for pid 30046 on node 4
 CREATE FUNCTION serialize_lock_graph() RETURNS TEXT AS $$
-    SELECT COALESCE(string_agg((wait).node||':'||(wait).pid||'->'||(hold).node||':'||(hold).pid, ','),'') FROM shardman.lock_graph;
+    SELECT COALESCE(
        string_agg((wait).node || ':' || (wait).pid || '->' ||
                   (hold).node || ':' || (hold).pid,
                   ','),
        '')
+    FROM shardman.lock_graph;
 $$ LANGUAGE sql;
 
 -- Unpack lock graph from string
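The rewritten view leans on the two naming conventions stated in its comment: postgres_fdw backends are assumed to set application_name to pgfdw:$system_id:$coord_pid, and prepared transactions to use a gid of the form $pid:$count:$sys_id:$xid:$participants_count. The split_part calls simply pick fields out of those strings, for example (the identifiers below are made up):

    -- Field extraction as used by the lock_graph view, on invented identifiers:
    SELECT split_part('pgfdw:6871253579046647471:30046', ':', 2)::bigint AS sys_id,
           split_part('pgfdw:6871253579046647471:30046', ':', 3)::int    AS coord_pid,
           split_part('17439:5:6871253579046647471:612:2', ':', 1)::int  AS coord_pid_from_gid;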
@@ -2154,7 +2190,7 @@ CREATE FUNCTION deserialize_lock_graph(edges text) RETURNS SETOF shardman.lock_g
               split_part(split_part(edge, '->', 1), ':', 2)::int)::shardman.process AS wait,
           ROW(split_part(split_part(edge, '->', 2), ':', 1)::int,
               split_part(split_part(edge, '->', 2), ':', 2)::int)::shardman.process AS hold
-    FROM regexp_split_to_table(edges, ',') edge WHERE edge<>'';
+    FROM regexp_split_to_table(edges, ',') edge WHERE edge <> '';
 $$ LANGUAGE sql;
 
 -- Collect lock graphs from all nodes
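serialize_lock_graph and deserialize_lock_graph are intended as inverses: the first flattens the local view into the edge-list format shown above, the second turns such a string (possibly concatenated from several nodes) back into pairs of process values. A round-trip sketch with a fabricated edge string:

    -- A fabricated two-edge graph: pid 17439 on node 2 waits for pid 30046 on
    -- node 4, which in turn waits back on pid 17439 on node 2 (a loop).
    SELECT * FROM shardman.deserialize_lock_graph('2:17439->4:30046,4:30046->2:17439');
    -- On a live node the local graph can be flattened the other way round:
    SELECT shardman.serialize_lock_graph();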
@@ -2179,23 +2215,30 @@ BEGIN
 END;
 $$ LANGUAGE plpgsql;
 
-
--- Detect distributed deadlock and returns path in the lock graph forming deadlock loop
+-- Find all distributed deadlocks and for a random one return the path in the
+-- lock graph containing the deadlock loop.
+-- We go from each vertex in all directions until either there is nowhere to go
+-- or a loop is discovered. It means that for each n-vertex loop, n paths
+-- starting at different vertices are actually found, though we return only one.
+-- Note that it doesn't necessarily return a path containing ONLY the loop:
+-- a non-empty initial tail is perfectly possible.
 CREATE FUNCTION detect_deadlock(lock_graph text) RETURNS shardman.process[] AS $$
     WITH RECURSIVE LinkTable AS (SELECT wait AS Parent, hold AS Child FROM shardman.deserialize_lock_graph(lock_graph)),
     cte AS (
         SELECT Child, Parent, ARRAY[Child] AS AllParents, false AS Loop
         FROM LinkTable
         UNION ALL
-        SELECT c.Child, c.Parent, p.AllParents || c.Child, c.Child = ANY(p.AllParents)
+        SELECT c.Child, c.Parent, p.AllParents || c.Child, c.Child = ANY(p.AllParents)
         FROM LinkTable c JOIN cte p ON c.Parent = p.Child AND NOT p.Loop
     )
-    SELECT AllParents FROM cte WHERE Loop;
+    SELECT AllParents FROM cte WHERE Loop LIMIT 1;
 $$ LANGUAGE sql;
 
 -- Monitor cluster for presence of distributed deadlocks and node failures.
--- Tries to cancel queries causing deadlock and exclude unavailable nodes from the cluster.
-CREATE FUNCTION monitor(deadlock_check_timeout_sec int = 5, rm_node_timeout_sec int = 60) RETURNS void AS $$
+-- Tries to cancel queries causing deadlock and exclude unavailable nodes from
+-- the cluster.
+CREATE FUNCTION monitor(check_timeout_sec int = 5,
                        rm_node_timeout_sec int = 60) RETURNS void AS $$
 DECLARE
     prev_deadlock_path shardman.process[];
     deadlock_path shardman.process[];
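As the new comment warns, the array returned by detect_deadlock may begin with a tail that merely leads into the loop; monitor() below trims it by locating the first occurrence of the path's last element. A fabricated illustration (which of the equivalent candidate paths comes out depends on the LIMIT 1):

    -- Fabricated input: pid 100 on node 1 waits for pid 200 on node 2, which
    -- waits for pid 300 on node 3, which waits back on pid 200 on node 2.
    SELECT shardman.detect_deadlock('1:100->2:200,2:200->3:300,3:300->2:200');
    -- A plausible result is {"(2,200)","(3,300)","(2,200)"}; with a longer
    -- waiting chain the leading elements form a non-loop tail that monitor()
    -- cuts off via array_position(deadlock_path, deadlock_path[loop_end]).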
@@ -2210,30 +2253,30 @@ DECLARE
     error_end int;
     error_msg text;
     error_node_id int;
-    failed_node_id int;
+    failed_node_id int := null;
     failure_timestamp timestamp with time zone;
 BEGIN
-    IF shardman.redirect_to_shardlord(format('monitor(%s, %s)', deadlock_check_timeout_sec, rm_node_timeout_sec))
+    IF shardman.redirect_to_shardlord(format('monitor(%s, %s)', check_timeout_sec, rm_node_timeout_sec))
     THEN
         RETURN;
     END IF;
 
-    RAISE NOTICE 'Start cluster monitor...';
+    RAISE NOTICE 'Start cluster monitoring...';
 
     LOOP
         resp := shardman.global_lock_graph();
         error_begin := position('<error>' IN resp);
-        IF error_begin<>0
+        IF error_begin <> 0
         THEN
             error_end := position('</error>' IN resp);
             sep := position(':' IN resp);
             error_node_id := substring(resp FROM error_begin+7 FOR sep-error_begin-7)::int;
             error_msg := substring(resp FROM sep+1 FOR error_end-sep-1);
-            IF error_node_id = failed_node_id
+            IF error_node_id = failed_node_id and rm_node_timeout_sec IS NOT NULL
             THEN
                 IF clock_timestamp() > failure_timestamp + rm_node_timeout_sec * interval '1 sec'
                 THEN
-                    RAISE NOTICE 'Remove node % because of % timeout expiration', failed_node_id, rm_node_timeout_sec;
+                    RAISE NOTICE 'Removing node % because of % timeout expiration', failed_node_id, rm_node_timeout_sec;
                     PERFORM shardman.broadcast(format('0:SELECT shardman.rm_node(%s, force=>true);', failed_node_id));
                     PERFORM shardman.broadcast('0:SELECT shardman.recover_xacts();');
                     failed_node_id := null;
@@ -2247,13 +2290,16 @@ BEGIN
         ELSE
             failed_node_id := null;
             deadlock_path := shardman.detect_deadlock(resp);
+            -- pick out the loop itself
             loop_end := array_upper(deadlock_path, 1);
             loop_begin := array_position(deadlock_path, deadlock_path[loop_end]);
             -- Check if old and new lock graph contain the same subgraph.
             -- Because we can not make consistent distributed snapshot,
             -- collected global local graph can contain "false" loops.
             -- So we report deadlock only if detected loop persists during
             -- deadlock detection period.
+            -- We count on the fact that not only the loop, but the sequence
+            -- of visited nodes is the same
             IF prev_deadlock_path IS NOT NULL
                AND loop_end - loop_begin = prev_loop_end - prev_loop_begin
                AND deadlock_path[loop_begin:loop_end] = prev_deadlock_path[prev_loop_begin:prev_loop_end]
@@ -2268,7 +2314,7 @@ BEGIN
             prev_loop_begin := loop_begin;
             prev_loop_end := loop_end;
         END IF;
-        PERFORM pg_sleep(deadlock_check_timeout_sec);
+        PERFORM pg_sleep(check_timeout_sec);
     END LOOP;
 END;
 $$ LANGUAGE plpgsql;
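With the renamed parameter, the monitoring loop is typically started like the sketch below; judging by the new rm_node_timeout_sec IS NOT NULL check, passing NULL for that parameter now disables automatic node removal while keeping deadlock detection:

    -- Check for deadlocks every 5 seconds and drop a node that keeps failing
    -- for more than 60 seconds:
    SELECT shardman.monitor(check_timeout_sec => 5, rm_node_timeout_sec => 60);
    -- Deadlock detection only, never remove nodes automatically:
    SELECT shardman.monitor(rm_node_timeout_sec => NULL);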