1
1
/* ------------------------------------------------------------------------
2
2
*
3
- * shardman .sql
4
- * Commands infrastructure, interface funcs, common utility funcs.
3
+ * pg_shardman.sql
4
+ * Sharding and replication using pg_pathman, postgres_fdw and logical replication
5
5
*
6
6
* Copyright (c) 2017, Postgres Professional
7
7
*
16
16
-- list of nodes present in the cluster
17
17
CREATE TABLE nodes (
18
18
id serial PRIMARY KEY ,
19
- conn_string text UNIQUE NOT NULL ,
19
+ connection_string text UNIQUE NOT NULL ,
20
20
replication_group text NOT NULL -- group of nodes within which shard replicas are allocated
21
21
);
22
22
@@ -39,11 +39,10 @@ CREATE TABLE replicas (
39
39
40
40
41
41
-- Add a node: adjust logical replication channels in replication group and create foreign servers
42
- CREATE FUNCTION add_node (conn_string text , repl_group text ) RETURNS void AS $$
42
+ CREATE FUNCTION add_node (conn_string text , repl_group text DEFAULT ' all ' ) RETURNS void AS $$
43
43
DECLARE
44
- new_node_id integer ;
45
- node nodes;
46
- shardlord_connstr text ;
44
+ new_node_id int ;
45
+ node nodes;
47
46
pubs text := ' ' ;
48
47
subs text := ' ' ;
49
48
sync text := ' ' ;
@@ -55,12 +54,13 @@ DECLARE
55
54
new_server_opts text ;
56
55
new_um_opts text ;
57
56
sync_standbys text ;
58
- sync_repl boolean := shardman .synchronous_replication ();
57
+ shardlord_conn_string text ;
58
+ sync_repl bool := shardman .synchronous_replication ();
59
59
BEGIN
60
60
-- Insert new node in nodes table
61
- INSERT INTO shardman .nodes (conn_string ,replication_group) values (conn_string, repl_group) returning id into new_node_id;
61
+ INSERT INTO shardman .nodes (connection_string ,replication_group) VALUES (conn_string, repl_group) RETURNING id INTO new_node_id;
62
62
63
- -- Construct list of sychronous standbyes (subscriptions have name node_$ID)
63
+ -- Construct list of synchronous standbys (subscriptions are named node_$ID)
64
64
SELECT string_agg(' node_' || id, ' ,' ) INTO sync_standbys from nodes;
65
65
66
66
-- Construct foreign server options from connection string of new node
@@ -72,54 +72,55 @@ BEGIN
72
72
CREATE PUBLICATION shardman_meta_pub FOR TABLE shardman .nodes ,shardman .partitions ,shardman .replicas ;
73
73
END IF;
74
74
75
- -- Adjust replication channels within repplication group.
76
- -- We need all-to-all replication channels, so add
75
+ -- Adjust replication channels within replication group.
76
+ -- We need all-to-all replication channels between all group members.
77
77
FOR node IN SELECT * FROM shardman .nodes WHERE replication_group= repl_group AND id<> new_node_id
78
78
LOOP
79
79
-- On the new node create a publication for each existing node, and on each existing node create a publication for the new node
80
80
pubs := format(' %s%s:CREATE PUBLICATION node_%s;
81
81
%s:CREATE PUBLICATION node_%s;' ,
82
- pubs, node .id , new_node_id,
83
- new_node_id, node .id );
84
- -- Add to new node subscriptions for to existed nodes and add subsciption to new node to all existed nodes
82
+ pubs, node .id , new_node_id,
83
+ new_node_id, node .id );
84
+ -- Subscribe each existing node to the new node, and subscribe the new node to each existing node
85
85
subs := format(' %s%s:CREATE SUBSCRIPTION node_%s CONNECTION %L PUBLICATION node_%s with (synchronous_commit = local);
86
86
%s:CREATE SUBSCRIPTION node_%s CONNECTION %L PUBLICATION node_%s with (synchronous_commit = local);' ,
87
87
subs, node .id , new_node_id, conn_string, node .id ,
88
- new_node_id, node .id , node .conn_string , new_node_id);
88
+ new_node_id, node .id , node .connection_string , new_node_id);
89
89
90
90
-- Adjust synchronous standby list at all nodes
91
91
sync := format(' %s%s:ALTER SYSTEM SET synchronous_standby_names to %L;' ,
92
- sync, node .id , sync_standbys);
92
+ sync, node .id , sync_standbys);
93
93
conf := format(' %s%s:SELECT pg_reload_conf();' ,
94
- conf, node .id );
94
+ conf, node .id );
95
95
END LOOP;
96
96
97
97
-- Create subscription to shardlord for metadata
98
- SELECT shardman .shardlord_connection_string () INTO shardlord_connstr ;
98
+ SELECT shardman .shardlord_connection_string () INTO shardlord_conn_string ;
99
99
subs := format(' %s%s:CREATE SUBSCRIPTION shardman_meta_sub CONNECTION %L PUBLICATION shardman_meta_pub with (synchronous_commit = local);' ,
100
100
subs, new_node_id, shardlord_conn_string);
101
101
102
- -- Broadcasr create publication commands
102
+ -- Broadcast create publication commands
103
103
PERFORM shardman .broadcast (pubs);
104
- -- Broadcasr create subscription commands
104
+ -- Broadcast create subscription commands
105
105
PERFORM shardman .broadcast (subs);
106
106
107
- -- In case of synchronous replication broadcasr commands updating synchronous standby list
107
+ -- If synchronous replication is enabled, broadcast the commands that update the synchronous standby list
108
108
IF sync_repl THEN
109
109
-- Adjust synchronous standby list at the new node
110
110
sync := format(' %s%s:ALTER SYSTEM SET synchronous_standby_names to %L;' ,
111
- sync, new_node_id, sync_stanbys);
111
+ sync, new_node_id, sync_stanbys);
112
+ -- Reload configuration at new node
112
113
conf := format(' %s%s:SELECT pg_reload_conf();' ,
113
- conf, new_node_id);
114
- PERFORM shardman .broadcast (sync);
115
- PERFORM shardman .broadcast (conf);
114
+ conf, new_node_id);
115
+ PERFORM shardman .broadcast (sync);
116
+ PERFORM shardman .broadcast (conf);
116
117
END IF;
117
118
118
- -- Add foriegn servers for connection with new node and backward
119
+ -- Add foreign servers for connection to the new node and backward
119
120
FOR node IN SELECT * FROM shardman .nodes WHERE id<> new_node_id
120
121
LOOP
121
122
-- Construct foreign server options from connection string of this node
122
- SELECT * FROM shardman .conninfo_to_postgres_fdw_opts (node .conn_string ) INTO server_opts, um_opts;
123
+ SELECT * FROM shardman .conninfo_to_postgres_fdw_opts (node .connection_string ) INTO server_opts, um_opts;
123
124
124
125
-- Create foreign server for new node at all other nodes and servers at new node for all other nodes
125
126
fdws := format(' %s%s:CREATE SERVER node_%s FOREIGN DATA WRAPPER postgres_fdw %s;
@@ -134,7 +135,7 @@ BEGIN
134
135
node .id , new_node_id, new_um_ops);
135
136
END LOOP;
136
137
137
- -- Broadcast command for creating foriegn servers
138
+ -- Broadcast command for creating foreign servers
138
139
PERFORM shardman .broadcast (fdws);
139
140
-- Broadcast commands creating user mappings for these servers
140
141
PERFORM shardman .broadcast (usms);
@@ -144,23 +145,23 @@ $$ LANGUAGE plpgsql;
144
145
145
146
-- Remove a node: try to choose a replacement among the replicas of this node's partitions, exclude the node from replication channels and remove foreign servers.
146
147
-- To remove a node that still holds partitions, pass force=true
147
- CREATE FUNCTION rm_node (rm_node_id int , force boolean DEFAULT false) RETURNS void AS $$
148
+ CREATE FUNCTION rm_node (rm_node_id int , force bool DEFAULT false) RETURNS void AS $$
148
149
DECLARE
149
150
node nodes;
150
151
part partitions;
151
152
pubs text := ' ' ;
152
153
subs text := ' ' ;
153
154
fdws text := ' ' ;
154
155
fdw_part_name text ;
155
- new_master_id integer ;
156
+ new_master_id int ;
156
157
sync_standbys text ;
157
- sync_repl boolean := shardman .synchronous_replication ();
158
+ sync_repl bool := shardman .synchronous_replication ();
158
159
BEGIN
159
- -- If it is not forced remove, check if there are no parittions at this node
160
+ -- Unless removal is forced, check that there are no partitions at this node
160
161
IF NOT force THEN
161
162
IF EXISTS (SELECT * FROM shardman .partitions WHERE node= rm_node_id)
162
163
THEN
163
- RAISE ERROR ' Use forse =true to remove non-empty node' ;
164
+ RAISE ERROR ' Use force =true to remove non-empty node' ;
164
165
END IF;
165
166
END IF;
166
167
@@ -194,8 +195,10 @@ BEGIN
194
195
-- In case of synchronous replication update synchronous standbys list
195
196
IF sync_repl
196
197
THEN
198
+ -- Update synchronous standbys list at the removed node
199
+ sync := format(' %s%s:SELECT shardman_set_sync_standbys(%L);' ,
200
+ sync, rm_node_id, sync_standbys);
197
201
PERFORM shardman .broadcast (sync, ignore_errors:= true);
198
- PERFORM shardman .broadcast (fdws, ignore_errors:= true);
199
202
END IF;
200
203
201
204
-- Remove foreign servers at all nodes for the removed node
@@ -212,10 +215,10 @@ BEGIN
212
215
PERFORM shardman .broadcast (fdws, ignore_errors:= true);
213
216
fdws := ' ' ;
214
217
215
- -- Exclude parittions of removed node
218
+ -- Exclude partitions of removed node
216
219
FOR part in SELECT * from shardman .partitions where node= rm_node_id
217
220
LOOP
218
- -- Is there some replica of thi node
221
+ -- Is there some replica of this partition?
219
222
SELECT node INTO new_master_id FROM shardman .replicas WHERE part_name= part .part_name ORDER BY random() LIMIT 1 ;
220
223
IF new_master_id IS NOT NULL
221
224
THEN -- exists some replica for this node: redirect foreign table to this replica and refresh LR channels for this replication group
@@ -232,7 +235,7 @@ BEGIN
232
235
-- Publish this partition at new master
233
236
pubs := format(' %s%s:ALTER PUBLICATION node_%s ADD TABLE %I;' ,
234
237
pubs, new_master_id, repl .node , part .part_name );
235
- -- And refresg subscriptions and replicas
238
+ -- And refresh subscriptions and replicas
236
239
subs := format(' %s%s:ALTER SUBSCRIPTION node_%s REFRESH PUBLICATION' ,
237
240
subs, repl .node , new_master_id);
238
241
END LOOP;
@@ -251,12 +254,12 @@ BEGIN
251
254
LOOP
252
255
fdw_part_name := format(' %s_fdw' , part .part_name );
253
256
IF node .id = new_master_id THEN
254
- -- At new master node replace foreign link with local paritition
257
+ -- At new master node replace foreign link with local partition
255
258
fdws := format(' %s%d:SELECT replace_hash_partition(%L,%L);' ,
256
259
fdws, node .id , fdw_part_name, part .part_name );
257
260
ELSE
258
261
-- At all other nodes adjust foreign server for foreign table to refer to new master node.
259
- -- It is noty possible to alter foreign server for foreign table so we have to do it in such "hackers" way:
262
+ -- It is not possible to alter the foreign server of a foreign table, so we have to do it in this "hacker's" way:
260
263
fdws := format(' %s%s:UPDATE pg_foreign_table SET ftserver = (SELECT oid FROM pg_foreign_server WHERE srvname = ' ' node_%s' ' ) WHERE ftrelid = (SELECT oid FROM pg_class WHERE relname=%L);' ,
261
264
fdws, node .id , new_master_id, fdw_part_name);
262
265
END IF;
@@ -281,8 +284,8 @@ RETURNS void AS $$
281
284
DECLARE
282
285
create_table text ;
283
286
node nodes;
284
- node_ids integer [];
285
- node_id integer ;
287
+ node_ids int [];
288
+ node_id int ;
286
289
part_name text ;
287
290
fdw_part_name text ;
288
291
table_attrs text ;
@@ -291,9 +294,8 @@ DECLARE
291
294
create_partitions text := ' ' ;
292
295
create_fdws text := ' ' ;
293
296
replace_parts text := ' ' ;
294
- i integer ;
295
- -- Do not drop partitions replaced with foreign tables, because them can be used for replicas
296
- -- drop_parts text := '';
297
+ i int ;
298
+ n_nodes int ;
297
299
BEGIN
298
300
IF EXISTS(SELECT relation from shardman .tables where relation = rel_name)
299
301
THEN
@@ -307,62 +309,60 @@ BEGIN
307
309
-- Create parent table at all nodes
308
310
create_tables := format(' %s%s:%s;' ,
309
311
create_tables, node .id , create_table);
310
- -- Create parittions using pathman at all nodes
311
- create_partitions := format(' %s%s:select create_hash_partitions(%L,%L,%s );' ,
312
+ -- Create partitions using pathman at all nodes
313
+ create_partitions := format(' %s%s:select create_hash_partitions(%L,%L,%L );' ,
312
314
create_partitions, node .id , rel_name, expr, partitions_count);
313
315
END LOOP;
314
316
315
- -- Broadcase create table commands
317
+ -- Broadcast create table commands
316
318
PERFORM shardman .broadcast (create_tables);
317
- -- Broadcase create hash partitions command
319
+ -- Broadcast create hash partitions command
318
320
PERFORM shardman .broadcast (create_partitions);
319
321
320
322
-- Get list of nodes in random order
321
323
SELECT ARRAY(SELECT id from shardman .nodes ORDER BY random()) INTO node_ids;
324
+ n_nodes := array_length(node_ids, 1 );
322
325
323
- -- Reconstruct table attribures from parent table
326
+ -- Reconstruct table attributes from parent table
324
327
SELECT shardman .reconstruct_table_attrs (rel_nam) INTO table_attr;
325
328
326
329
FOR i IN 1 ..partitions_count
327
330
LOOP
328
331
-- Choose location of new partition
329
- node_id := node_ids[1 + (i % partitions_count )]; -- round robin
332
+ node_id := node_ids[1 + (i % n_nodes )]; -- round robin
330
333
part_name := format(' %s_%s' , rel_name, i);
331
334
fdw_part_name := format(' %s_fdw' , part_name);
332
335
-- Insert information about new partition in partitions table
333
336
INSERT INTO shardman .partitions (part_name,node,relation) VALUES (part_name, node_id, rel_name);
334
- -- Construct name of the servers where paritition will be located
337
+ -- Construct name of the servers where partition will be located
335
338
srv_name := format(' node_%s' , node_id);
336
339
337
- -- Replace local partition with foreign table at all nodes excepot owner
340
+ -- Replace local partition with foreign table at all nodes except owner
338
341
FOR node IN SELECT * from shardman .nodes WHERE id<> node_id
339
342
LOOP
340
- -- Create foriegn table for this partition
343
+ -- Create foreign table for this partition
341
344
create_fdw := format(' %s%s:CREATE FOREIGN TABLE %I %s SERVER %s OPTIONS (table_name %L)' ,
342
345
create_fdws, node .id , fdw_part_name, table_attrs, srv_name, part_name);
343
346
replace_parts := format(' %s%s:SELECT replace_hash_partition(%L, %L);' ,
344
- replace_parts, node .id , part_name, fdw_part_name);
345
- -- drop_parts := format('%s%s:DROP TABKE %I;',
346
- -- drop_parts, node.id, part_name);
347
+ replace_parts, node .id , part_name, fdw_part_name);
347
348
END LOOP;
348
349
END LOOP;
349
350
350
351
-- Broadcast create foreign table commands
351
352
PERFORM shardman .broadcast (create_fdws);
352
- -- Broadcasr replace hash parition commands
353
+ -- Broadcast replace hash partition commands
353
354
PERFORM shardman .broadcast (replace_parts);
354
- -- PERFORM shardman.broadcast(drop_parts);
355
355
END
356
356
$$ LANGUAGE plpgsql;
357
357
358
358
-- Provide requested level of redundancy. 0 means no redundancy.
359
- -- If existed level of redundancy os greater than specified, then right now this function does nothing.
360
- CREATE FUNCTION set_redundancy (rel_name regclass, redundancy integer )
359
+ -- If the existing level of redundancy is greater than specified, then right now this function does nothing.
360
+ CREATE FUNCTION set_redundancy (rel_name regclass, redundancy int )
361
361
RETURNS void AS $$
362
362
DECLARE
363
363
part partitions;
364
- n_replicas integer ;
365
- repl_node integer ;
364
+ n_replicas int ;
365
+ repl_node int ;
366
366
repl_group text ;
367
367
pubs text := ' ' ;
368
368
subs text := ' ' ;
@@ -375,7 +375,7 @@ BEGIN
375
375
IF n_replicas < redundancy
376
376
THEN -- If it is smaller than requested...
377
377
SELECT replication_group INTO repl_group FROM shardman .nodes where id= part .node ;
378
- -- ...then add requested number of replicas in correspodent replication group
378
+ -- ...then add requested number of replicas in corresponding replication group
379
379
FOR repl_node IN SELECT id FROM shardman .nodes WHERE replication_group= repl_group AND NOT EXISTS(SELECT * FROM shardman .replicas WHERE node= id AND part_name= part .part_name ORDER by random() LIMIT redundancy- n_replicas
380
380
LOOP
381
381
-- Insert information about new replica in replicas table
@@ -393,20 +393,31 @@ BEGIN
393
393
PERFORM shardman .broadcast (pubs);
394
394
-- Broadcast alter subscription commands
395
395
PERFORM shardman .broadcast (fdws);
396
+
396
397
-- This function doesn't wait for completion of replication sync
397
398
END
398
399
$$ LANGUAGE plpgsql;
399
400
400
401
401
402
-- Utility functions
402
403
404
+ -- Generate the SQL statement that creates this table, based on information from the catalog
403
405
CREATE FUNCTION gen_create_table_sql (relation text )
404
406
RETURNS text AS ' pg_shardman' LANGUAGE C STRICT;
405
407
408
+ -- Reconstruct table attributes for foreign table
406
409
CREATE FUNCTION reconstruct_table_attrs (relation regclass)
407
410
RETURNS text AS ' pg_shardman' LANGUAGE C STRICT;
408
411
409
- CREATE FUNCTION broadcast (cmds text ) RETURNS text
412
+ -- Broadcast SQL commands to nodes and wait for their completion.
413
+ -- cmds is a list of SQL commands separated by semicolons, each with a node prefix: node-id:sql-statement;
414
+ -- By default the function throws an error if execution fails at any of the nodes; with ignore_errors=true errors are ignored
415
+ -- and the function returns a string with an "Error:" prefix containing a list of errors separated by semicolons, with node prefixes.
416
+ -- On normal completion this function returns a semicolon-separated list with node prefixes, holding the single result of select queries
417
+ -- or number of affected rows for other commands.
418
+ -- If the two_phase parameter is true, then each statement is wrapped in a block and prepared, with subsequent commit or rollback of the prepared transaction
419
+ -- at second phase of two phase commit.
420
+ CREATE FUNCTION broadcast (cmds text , ignore_errors bool DEFAULT false, two_phase bool DEFAULT false) RETURNS text
410
421
RETURNS text AS ' pg_shardman' LANGUAGE C STRICT;
411
422
412
423
-- Options to postgres_fdw are specified in two places: user & password in user
@@ -415,12 +426,12 @@ RETURNS text AS 'pg_shardman' LANGUAGE C STRICT;
415
426
-- format, i.e. we can't say create server ... options (dbname 'port=4848
416
427
-- host=blabla.org'). So we have to parse the opts and pass them manually. libpq
417
428
-- knows how to do it, but doesn't expose that. On the other hand, quote_literal
418
- -- (which is neccessary here) doesn't seem to have handy C API. I resorted to
429
+ -- (which is necessary here) doesn't seem to have handy C API. I resorted to
419
430
-- have C function which parses the opts and returns them in two parallel
420
431
-- arrays, and this sql function joins them with quoting. TODO: of course,
421
432
-- quote_literal_cstr exists.
422
433
-- Returns two strings: one with opts ready to pass to CREATE FOREIGN SERVER
423
- -- stmt, and one wih opts ready to pass to CREATE USER MAPPING.
434
+ -- stmt, and one with opts ready to pass to CREATE USER MAPPING.
424
435
CREATE FUNCTION conninfo_to_postgres_fdw_opts (IN conn_string text ,
425
436
OUT server_opts text , OUT um_opts text ) RETURNS record AS $$
426
437
DECLARE
@@ -464,12 +475,15 @@ BEGIN
464
475
END IF;
465
476
END $$ LANGUAGE plpgsql STRICT;
466
477
478
+ -- Parse connection string. This function is used by conninfo_to_postgres_fdw_opts to construct postgres_fdw options list.
467
479
CREATE FUNCTION pq_conninfo_parse (IN conninfo text , OUT keys text [], OUT vals text [])
468
480
RETURNS record AS ' pg_shardman' LANGUAGE C STRICT;
469
481
482
+ -- Get shardlord connection string from configuration parameters
470
483
CREATE FUNCTION shardload_connection_string ()
471
484
RETURNS text AS ' pg_shardman' LANGUAGE C STRICT;
472
485
486
+ -- Check from configuration parameters whether synchronous replication mode is enabled
473
487
CREATE FUNCTION synchronous_replication ()
474
- RETURNS boolean AS ' pg_shardman' LANGUAGE C STRICT;
488
+ RETURNS bool AS ' pg_shardman' LANGUAGE C STRICT;
475
489
0 commit comments