@@ -26,7 +26,7 @@ CREATE TABLE tables (
26
26
relation text PRIMARY KEY , -- table name
27
27
sharding_key text , -- expression by which table is sharded
28
28
master_node integer REFERENCES nodes(id) ON DELETE CASCADE ,
29
- partitions_count int NOT NULL , -- number of partitions
29
+ partitions_count int , -- number of partitions
30
30
create_sql text NOT NULL , -- sql to create the table
31
31
create_rules_sql text -- sql to create rules for shared table
32
32
);
@@ -85,6 +85,7 @@ DECLARE
85
85
table_attrs text ;
86
86
srv_name text ;
87
87
rules text = ' ' ;
88
+ master_node_id int ;
88
89
conn_string_effective text = COALESCE(conn_string, super_conn_string);
89
90
BEGIN
90
91
IF shardman .redirect_to_shardlord (
@@ -194,8 +195,7 @@ BEGIN
194
195
END LOOP;
195
196
END LOOP;
196
197
197
- -- Create subscriptions for all shared tables
198
- subs := ' ' ;
198
+ -- Create at new node FDWs for all shared tables
199
199
FOR t IN SELECT * from shardman .tables WHERE master_node IS NOT NULL
200
200
LOOP
201
201
SELECT connection_string INTO conn_string from shardman .nodes WHERE id= t .master_node ;
@@ -206,24 +206,30 @@ BEGIN
206
206
fdw_part_name := format(' %s_fdw' , t .relation );
207
207
create_fdws := format(' %s%s:CREATE FOREIGN TABLE %I %s SERVER %s OPTIONS (table_name %L);' ,
208
208
create_fdws, new_node_id, fdw_part_name, table_attrs, srv_name, t .relation );
209
- subs := format(' %s%s:CREATE SUBSCRIPTION share_%s CONNECTION %L PUBLICATION share_%s with (create_slot=false, copy_data=false, slot_name=' ' share_%s' ' , synchronous_commit=local);' ,
210
- subs, new_node_id, t .relation , conn_string, t .relation , t .relation );
211
209
create_rules := format(' %s{%s:%s}' ,
212
210
create_rules, new_node_id, t .create_rules_sql );
213
211
END LOOP;
214
212
215
- -- Broadcast create table commands
213
+ -- Create subscriptions for all shared tables
214
+ subs := ' ' ;
215
+ FOR master_node_id IN SELECT DISTINCT master_node FROM shardman .tables WHERE master_node IS NOT NULL
216
+ LOOP
217
+ subs := format(' %s%s:CREATE SUBSCRIPTION share_%s_%s CONNECTION %L PUBLICATION shared_tables with (synchronous_commit=local);' ,
218
+ subs, new_node_id, new_node_id, t .master_node , conn_string);
219
+ END LOOP;
220
+
221
+ -- Broadcast create table commands
216
222
PERFORM shardman .broadcast (create_tables);
217
223
-- Broadcast create hash partitions command
218
224
PERFORM shardman .broadcast (create_partitions);
219
225
-- Broadcast create foreign table commands
220
226
PERFORM shardman .broadcast (create_fdws);
221
227
-- Broadcast replace hash partition commands
222
228
PERFORM shardman .broadcast (replace_parts);
223
- -- Broadcast create subscriptions for shared tables
224
- PERFORM shardman .broadcast (subs);
225
229
-- Broadcast create rules for shared tables
226
230
PERFORM shardman .broadcast (create_rules);
231
+ -- Broadcast create subscriptions for shared tables
232
+ PERFORM shardman .broadcast (subs, super_connstr => true);
227
233
END
228
234
$$ LANGUAGE plpgsql;
229
235
@@ -236,7 +242,6 @@ DECLARE
236
242
node shardman .nodes ;
237
243
part shardman .partitions ;
238
244
repl shardman .replicas ;
239
- t shardman .tables ;
240
245
pubs text = ' ' ;
241
246
subs text = ' ' ;
242
247
fdws text = ' ' ;
@@ -246,6 +251,7 @@ DECLARE
246
251
new_master_id int ;
247
252
sync_standbys text [];
248
253
repl_group text ;
254
+ master_node_id int ;
249
255
BEGIN
250
256
IF shardman .redirect_to_shardlord (format(' rm_node(%L, %L)' , rm_node_id, force))
251
257
THEN
@@ -293,10 +299,10 @@ BEGIN
293
299
END LOOP;
294
300
295
301
-- Drop shared tables subscriptions
296
- FOR t IN SELECT * from shardman .tables WHERE master_node IS NOT NULL
302
+ FOR master_node_id IN SELECT DISTINCT master_node from shardman .tables WHERE master_node IS NOT NULL
297
303
LOOP
298
- subs := format(' %s%s:DROP SUBSCRIPTION share_%s;' ,
299
- subs, rm_node_id, t . relation );
304
+ subs := format(' %s%s:DROP SUBSCRIPTION share_%s_% s;' ,
305
+ subs, rm_node_id, rm_node_id, master_node_id );
300
306
END LOOP;
301
307
302
308
-- Broadcast drop subscription commands, ignore errors because removed node may be not available
@@ -798,9 +804,10 @@ $$ LANGUAGE plpgsql;
798
804
799
805
-- Share table between all nodes. This function should be executed at shardlord. The empty table should be present at shardlord,
800
806
-- but not at nodes.
801
- CREATE FUNCTION create_shared_table (rel regclass, master_node_id int ) RETURNS void AS $$
807
+ CREATE FUNCTION create_shared_table (rel regclass, master_node_id int = 1 ) RETURNS void AS $$
802
808
DECLARE
803
809
node shardman .nodes ;
810
+ pubs text = ' ' ;
804
811
subs text = ' ' ;
805
812
fdws text = ' ' ;
806
813
rules text = ' ' ;
@@ -815,40 +822,14 @@ DECLARE
815
822
pk text ;
816
823
dst text ;
817
824
src text ;
825
+ new_master bool;
818
826
BEGIN
819
- -- Check if valid node ID is passed and get connectoin string of this node
827
+ -- Check if valid node ID is passed and get connectoin string for this node
820
828
SELECT connection_string INTO conn_string FROM shardman .nodes WHERE id= master_node_id;
821
829
IF conn_string IS NULL THEN
822
- RAISE EXCEPTION ' There is no node with ID % in the cluster' ,master_node_id;
830
+ RAISE EXCEPTION ' There is no node with ID % in the cluster' , master_node_id;
823
831
END IF;
824
832
825
- -- Generate SQL statement creating this table
826
- SELECT shardman .gen_create_table_sql (rel_name) INTO create_table;
827
-
828
- create_rules := format(' CREATE RULE on_update_to_%s AS ON UPDATE TO %I DO INSTEAD UPDATE %I SET (%s) = (%s) WHERE %s;
829
- CREATE RULE on_insert_to_%s AS ON INSERT TO %I DO INSTEAD INSERT INTO %I (%s) VALUES (%s);
830
- CREATE RULE on_delete_from_%s AS ON DELETE TO %I DO INSTEAD DELETE FROM %I WHERE %s;' ,
831
- rel_name, rel_name, fdw_rel_name, dst, src, pk,
832
- rel_name, rel_name, fdw_rel_name, dst, src,
833
- rel_name, rel_name, fdw_rel_name, pk);
834
-
835
- INSERT INTO shardman .tables (relation,master_node,create_sql,create_rules_sql) values (rel_name,master_node_id,create_table,create_rules);
836
-
837
- FOR node IN SELECT * FROM shardman .nodes
838
- LOOP
839
- create_tables := format(' %s{%s:%s}' ,
840
- create_tables, node .id , create_table);
841
- END LOOP;
842
-
843
- -- Broadcast create table command
844
- PERFORM shardman .broadcast (create_tables);
845
-
846
- -- Create publication at master node
847
- PERFORM shardman .broadcast (format(' %s:CREATE PUBLICATION share_%s FOR TABLE %I;
848
- %s:SELECT pg_create_logical_replication_slot(' ' share_%s' ' , ' ' pgoutput' ' );' ,
849
- master_node_id, rel_name, rel_name,
850
- master_node_id, rel_name));
851
-
852
833
-- Construct list of attributes of the table for update/insert
853
834
SELECT INTO dst, src
854
835
string_agg(quote_ident(attname), ' , ' ),
@@ -860,29 +841,71 @@ BEGIN
860
841
861
842
-- Construct primary key condition for update
862
843
SELECT INTO pk
863
- string_agg(quote_ident(a .attname ) || ' =NEW .' || quote_ident(a .attname ), ' AND ' )
844
+ string_agg(quote_ident(a .attname ) || ' =OLD .' || quote_ident(a .attname ), ' AND ' )
864
845
FROM pg_index i
865
846
JOIN pg_attribute a ON a .attrelid = i .indrelid
866
847
AND a .attnum = ANY(i .indkey )
867
848
WHERE i .indrelid = rel
868
849
AND i .indisprimary ;
869
850
851
+ -- Generate SQL statement creating this table
852
+ SELECT shardman .gen_create_table_sql (rel_name) INTO create_table;
853
+
870
854
-- Construct table attributes for create foreign table
871
855
SELECT shardman .reconstruct_table_attrs (rel) INTO table_attrs;
872
856
857
+ -- Create rules for redirecting updates
858
+ create_rules := format(' CREATE RULE on_update_to_%s AS ON UPDATE TO %I DO INSTEAD UPDATE %I SET (%s) = (%s) WHERE %s;
859
+ CREATE RULE on_insert_to_%s AS ON INSERT TO %I DO INSTEAD INSERT INTO %I (%s) VALUES (%s);
860
+ CREATE RULE on_delete_from_%s AS ON DELETE TO %I DO INSTEAD DELETE FROM %I WHERE %s;' ,
861
+ rel_name, rel_name, fdw_name, dst, src, pk,
862
+ rel_name, rel_name, fdw_name, dst, src,
863
+ rel_name, rel_name, fdw_name, pk);
864
+
865
+ -- Create table at all nodes
866
+ FOR node IN SELECT * FROM shardman .nodes
867
+ LOOP
868
+ create_tables := format(' %s{%s:%s}' ,
869
+ create_tables, node .id , create_table);
870
+ END LOOP;
871
+
872
+ -- Create publication at master node
873
+ IF EXISTS(SELECT * from shardman .tables WHERE master_node= master_node_id)
874
+ THEN
875
+ new_master := false;
876
+ pubs := format(' %s:ALTER PUBLICATION shared_tables ADD TABLE %I;' ,
877
+ master_node_id, rel_name);
878
+ ELSE
879
+ new_master := true;
880
+ pubs := format(' %s:CREATE PUBLICATION shared_tables FOR TABLE %I;' ,
881
+ master_node_id, rel_name);
882
+ END IF;
883
+
884
+ -- Insert information about new table in shardman.tables
885
+ INSERT INTO shardman .tables (relation,master_node,create_sql,create_rules_sql) values (rel_name,master_node_id,create_table,create_rules);
886
+
873
887
-- Create subscriptions, foreign tables and rules at all nodes
874
888
FOR node IN SELECT * FROM shardman .nodes WHERE id<> master_node_id
875
889
LOOP
876
- subs := format(' %s%s:CREATE SUBSCRIPTION share_%s CONNECTION %L PUBLICATION share_%s with (create_slot=false, copy_data=false, slot_name=' ' share_%s' ' , synchronous_commit=local);' ,
877
- subs, node .id , rel_name, conn_string, rel_name, rel_name);
890
+ IF new_master THEN
891
+ subs := format(' %s%s:CREATE SUBSCRIPTION share_%s_%s CONNECTION %L PUBLICATION shared_tables WITH (copy_data=false, synchronous_commit=local);' ,
892
+ subs, node .id , node .id , master_node_id, conn_string);
893
+ ELSE
894
+ subs := format(' %s%s:ALTER SUBSCRIPTION share_%s_%s REFRESH PUBLICATIONS WITH (copy_data=false);' ,
895
+ subs, node .id , node .id , master_node_id);
896
+ END IF;
878
897
fdws := format(' %s%s:CREATE FOREIGN TABLE %I %s SERVER %s OPTIONS (table_name %L);' ,
879
- fdws, node .id , fdw_rel_name , table_attrs, srv_name, rel_name);
898
+ fdws, node .id , fdw_name , table_attrs, srv_name, rel_name);
880
899
rules := format(' %s{%s:%s}' ,
881
900
rules, node .id , create_rules);
882
901
END LOOP;
883
902
903
+ -- Broadcast create table command
904
+ PERFORM shardman .broadcast (create_tables);
905
+ -- Create or alter publication at master node
906
+ PERFORM shardman .broadcast (pubs);
884
907
-- Create subscriptions at all nodes
885
- PERFORM shardman .broadcast (subs);
908
+ PERFORM shardman .broadcast (subs, sync_commit_on => true, super_connstr => true );
886
909
-- Create foreign tables at all nodes
887
910
PERFORM shardman .broadcast (fdws);
888
911
-- Create redirect rules at all nodes
0 commit comments