Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit ab16f67

Browse files
committed
Add create_shared_table function
1 parent 07c2f34 commit ab16f67

File tree

1 file changed

+136
-4
lines changed

1 file changed

+136
-4
lines changed

pg_shardman--1.0.sql

Lines changed: 136 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ CREATE TABLE nodes (
2424
-- List of sharded tables
2525
CREATE TABLE tables (
2626
relation text PRIMARY KEY, -- table name
27-
sharding_key text NOT NULL, -- expression by which table is sharded
27+
sharding_key text, -- expression by which table is sharded
28+
master_node integer REFERENCES nodes(id) ON DELETE CASCADE,
2829
partitions_count int NOT NULL, -- number of partitions
29-
create_sql text NOT NULL -- sql to create the table
30+
create_sql text NOT NULL, -- sql to create the table
31+
create_rules_sql text -- sql to create rules for shared table
3032
);
3133

3234
-- Main partitions
@@ -77,11 +79,13 @@ DECLARE
7779
create_tables text = '';
7880
create_partitions text = '';
7981
create_fdws text = '';
82+
create_rules text = '';
8083
replace_parts text = '';
8184
fdw_part_name text;
8285
table_attrs text;
8386
srv_name text;
84-
conn_string_effective text = COALESCE(conn_string, super_conn_string);
87+
rules text = '';
88+
conn_string_effective text = COALESCE(conn_string, super_conn_string);
8589
BEGIN
8690
IF shardman.redirect_to_shardlord(
8791
format('add_node(%L, %L, %L)', super_conn_string, conn_string, repl_group))
@@ -170,7 +174,7 @@ BEGIN
170174
PERFORM shardman.broadcast(usms);
171175

172176
-- Create FDWs at new node for all existed partitions
173-
FOR t IN SELECT * from shardman.tables
177+
FOR t IN SELECT * from shardman.tables WHERE sharding_key IS NOT NULL
174178
LOOP
175179
create_tables := format('%s{%s:%s}',
176180
create_tables, new_node_id, t.create_sql);
@@ -190,6 +194,24 @@ BEGIN
190194
END LOOP;
191195
END LOOP;
192196

197+
-- Create subscriptions for all shared tables
198+
subs := '';
199+
FOR t IN SELECT * from shardman.tables WHERE master_node IS NOT NULL
200+
LOOP
201+
SELECT connection_string INTO conn_string from shardman.nodes WHERE id=t.master_node;
202+
create_tables := format('%s{%s:%s}',
203+
create_tables, new_node_id, t.create_sql);
204+
SELECT shardman.reconstruct_table_attrs(t.relation) INTO table_attrs;
205+
srv_name := format('node_%s', t.master_node);
206+
fdw_part_name := format('%s_fdw', t.relation);
207+
create_fdws := format('%s%s:CREATE FOREIGN TABLE %I %s SERVER %s OPTIONS (table_name %L);',
208+
create_fdws, new_node_id, fdw_part_name, table_attrs, srv_name, t.relation);
209+
subs := format('%s%s:CREATE SUBSCRIPTION share_%s CONNECTION %L PUBLICATION share_%s with (create_slot=false, copy_data=false, slot_name=''share_%s'', synchronous_commit=local);',
210+
subs, new_node_id, t.relation, conn_string, t.relation, t.relation);
211+
create_rules := format('%s{%s:%s}',
212+
create_rules, new_node_id, t.create_rules_sql);
213+
END LOOP;
214+
193215
-- Broadcast create table commands
194216
PERFORM shardman.broadcast(create_tables);
195217
-- Broadcast create hash partitions command
@@ -198,6 +220,10 @@ BEGIN
198220
PERFORM shardman.broadcast(create_fdws);
199221
-- Broadcast replace hash partition commands
200222
PERFORM shardman.broadcast(replace_parts);
223+
-- Broadcast create subscriptions for shared tables
224+
PERFORM shardman.broadcast(subs);
225+
-- Broadcast create rules for shared tables
226+
PERFORM shardman.broadcast(create_rules);
201227
END
202228
$$ LANGUAGE plpgsql;
203229

@@ -210,6 +236,7 @@ DECLARE
210236
node shardman.nodes;
211237
part shardman.partitions;
212238
repl shardman.replicas;
239+
t shardman.tables;
213240
pubs text = '';
214241
subs text = '';
215242
fdws text = '';
@@ -243,6 +270,7 @@ BEGIN
243270
-- Remove all subscriptions and publications of this node
244271
FOR node IN SELECT * FROM shardman.nodes WHERE replication_group=repl_group AND id<>rm_node_id
245272
LOOP
273+
-- Drop publication and subscriptions for replicas
246274
pubs := format('%s%s:DROP PUBLICATION node_%s;
247275
%s:DROP PUBLICATION node_%s;',
248276
pubs, node.id, rm_node_id,
@@ -264,6 +292,13 @@ BEGIN
264292
conf := format('%s%s:SELECT pg_reload_conf();', conf, node.id);
265293
END LOOP;
266294

295+
-- Drop shared tables subscriptions
296+
FOR t IN SELECT * from shardman.tables WHERE master_node IS NOT NULL
297+
LOOP
298+
subs := format('%s%s:DROP SUBSCRIPTION share_%s;',
299+
subs, rm_node_id, t.relation);
300+
END LOOP;
301+
267302
-- Broadcast drop subscription commands, ignore errors because removed node may be not available
268303
PERFORM shardman.broadcast(subs, ignore_errors:=true, sync_commit_on => true,
269304
super_connst => true);
@@ -383,6 +418,8 @@ BEGIN
383418
THEN
384419
RAISE EXCEPTION 'Table % is already sharded', rel_name;
385420
END IF;
421+
422+
-- Generate SQL statement creating this table
386423
SELECT shardman.gen_create_table_sql(rel_name) INTO create_table;
387424

388425
INSERT INTO shardman.tables (relation,sharding_key,partitions_count,create_sql) values (rel_name,expr,part_count,create_table);
@@ -759,6 +796,101 @@ BEGIN
759796
END
760797
$$ LANGUAGE plpgsql;
761798

799+
-- Share table between all nodes. This function should be executed at shardlord. The empty table should be present at shardlord,
800+
-- but not at nodes.
801+
CREATE FUNCTION create_shared_table(rel regclass, master_node_id int) RETURNS void AS $$
802+
DECLARE
803+
node shardman.nodes;
804+
subs text = '';
805+
fdws text = '';
806+
rules text = '';
807+
conn_string text;
808+
create_table text;
809+
create_tables text;
810+
create_rules text;
811+
table_attrs text;
812+
rel_name text = rel::text;
813+
fdw_name text = format('%s_fdw', rel_name);
814+
srv_name text = format('node_%s', master_node_id);
815+
pk text;
816+
dst text;
817+
src text;
818+
BEGIN
819+
-- Check if valid node ID is passed and get connectoin string of this node
820+
SELECT connection_string INTO conn_string FROM shardman.nodes WHERE id=master_node_id;
821+
IF conn_string IS NULL THEN
822+
RAISE EXCEPTION 'There is no node with ID % in the cluster',master_node_id;
823+
END IF;
824+
825+
-- Generate SQL statement creating this table
826+
SELECT shardman.gen_create_table_sql(rel_name) INTO create_table;
827+
828+
create_rules := format('CREATE RULE on_update_to_%s AS ON UPDATE TO %I DO INSTEAD UPDATE %I SET (%s) = (%s) WHERE %s;
829+
CREATE RULE on_insert_to_%s AS ON INSERT TO %I DO INSTEAD INSERT INTO %I (%s) VALUES (%s);
830+
CREATE RULE on_delete_from_%s AS ON DELETE TO %I DO INSTEAD DELETE FROM %I WHERE %s;',
831+
rel_name, rel_name, fdw_rel_name, dst, src, pk,
832+
rel_name, rel_name, fdw_rel_name, dst, src,
833+
rel_name, rel_name, fdw_rel_name, pk);
834+
835+
INSERT INTO shardman.tables (relation,master_node,create_sql,create_rules_sql) values (rel_name,master_node_id,create_table,create_rules);
836+
837+
FOR node IN SELECT * FROM shardman.nodes
838+
LOOP
839+
create_tables := format('%s{%s:%s}',
840+
create_tables, node.id, create_table);
841+
END LOOP;
842+
843+
-- Broadcast create table command
844+
PERFORM shardman.broadcast(create_tables);
845+
846+
-- Create publication at master node
847+
PERFORM shardman.broadcast(format('%s:CREATE PUBLICATION share_%s FOR TABLE %I;
848+
%s:SELECT pg_create_logical_replication_slot(''share_%s'', ''pgoutput'');',
849+
master_node_id, rel_name, rel_name,
850+
master_node_id, rel_name));
851+
852+
-- Construct list of attributes of the table for update/insert
853+
SELECT INTO dst, src
854+
string_agg(quote_ident(attname), ', '),
855+
string_agg('NEW.' || quote_ident(attname), ', ')
856+
FROM pg_attribute
857+
WHERE attrelid = rel
858+
AND NOT attisdropped -- no dropped (dead) columns
859+
AND attnum > 0;
860+
861+
-- Construct primary key condition for update
862+
SELECT INTO pk
863+
string_agg(quote_ident(a.attname) || '=NEW.'|| quote_ident(a.attname), ' AND ')
864+
FROM pg_index i
865+
JOIN pg_attribute a ON a.attrelid = i.indrelid
866+
AND a.attnum = ANY(i.indkey)
867+
WHERE i.indrelid = rel
868+
AND i.indisprimary;
869+
870+
-- Construct table attributes for create foreign table
871+
SELECT shardman.reconstruct_table_attrs(rel) INTO table_attrs;
872+
873+
-- Create subscriptions, foreign tables and rules at all nodes
874+
FOR node IN SELECT * FROM shardman.nodes WHERE id<>master_node_id
875+
LOOP
876+
subs := format('%s%s:CREATE SUBSCRIPTION share_%s CONNECTION %L PUBLICATION share_%s with (create_slot=false, copy_data=false, slot_name=''share_%s'', synchronous_commit=local);',
877+
subs, node.id, rel_name, conn_string, rel_name, rel_name);
878+
fdws := format('%s%s:CREATE FOREIGN TABLE %I %s SERVER %s OPTIONS (table_name %L);',
879+
fdws, node.id, fdw_rel_name, table_attrs, srv_name, rel_name);
880+
rules := format('%s{%s:%s}',
881+
rules, node.id, create_rules);
882+
END LOOP;
883+
884+
-- Create subscriptions at all nodes
885+
PERFORM shardman.broadcast(subs);
886+
-- Create foreign tables at all nodes
887+
PERFORM shardman.broadcast(fdws);
888+
-- Create redirect rules at all nodes
889+
PERFORM shardman.broadcast(rules);
890+
END
891+
$$ LANGUAGE plpgsql;
892+
893+
762894
---------------------------------------------------------------------
763895
-- Utility functions
764896
---------------------------------------------------------------------

0 commit comments

Comments
 (0)