@@ -143,8 +143,8 @@ BEGIN
143
143
FOR node IN SELECT * FROM shardman .nodes WHERE replication_group = repl_group LOOP
144
144
-- Construct list of synchronous standbyes=subscriptions to this node
145
145
sync_standbys :=
146
- coalesce(ARRAY(SELECT format(' sub_%s_%s' , id, node .id ) FROM shardman .nodes
147
- WHERE replication_group = repl_group AND id <> node .id ), ' {}' ::text []);
146
+ coalesce(ARRAY(SELECT format(' sub_%s_%s' , id, node .id ) sby_name FROM shardman .nodes
147
+ WHERE replication_group = repl_group AND id <> node .id ORDER BY sby_name ), ' {}' ::text []);
148
148
sync := format(' %s%s:ALTER SYSTEM SET synchronous_standby_names to ' ' FIRST %s (%s)' ' ;' ,
149
149
sync, node .id , array_length(sync_standbys, 1 ),
150
150
array_to_string(sync_standbys, ' ,' ));
@@ -186,7 +186,7 @@ BEGIN
186
186
LOOP
187
187
create_tables := format(' %s{%s:%s}' ,
188
188
create_tables, new_node_id, t .create_sql );
189
- create_partitions := format(' %s%s:select create_hash_partitions(%L,%L,%L);' ,
189
+ create_partitions := format(' %s%s:SELECT create_hash_partitions(%L,%L,%L);' ,
190
190
create_partitions, new_node_id, t .relation , t .sharding_key , t .partitions_count );
191
191
SELECT shardman .reconstruct_table_attrs (t .relation ) INTO table_attrs;
192
192
FOR part IN SELECT * from shardman .partitions WHERE relation= t .relation
@@ -891,12 +891,12 @@ BEGIN
891
891
SELECT shardman .reconstruct_table_attrs (rel) INTO table_attrs;
892
892
893
893
-- Create rules for redirecting updates
894
- create_rules := format(' CREATE RULE on_update_to_%s AS ON UPDATE TO %I DO INSTEAD UPDATE %I SET (%s) = (%s) WHERE %s;
895
- CREATE RULE on_insert_to_%s AS ON INSERT TO %I DO INSTEAD INSERT INTO %I (%s) VALUES (%s);
896
- CREATE RULE on_delete_from_%s AS ON DELETE TO %I DO INSTEAD DELETE FROM %I WHERE %s;' ,
897
- rel_name, rel_name, fdw_name, dst, src, pk,
898
- rel_name, rel_name, fdw_name, dst, src,
899
- rel_name, rel_name, fdw_name, pk);
894
+ create_rules := format(' CREATE RULE on_update AS ON UPDATE TO %I DO INSTEAD UPDATE %I SET (%s) = (%s) WHERE %s;
895
+ CREATE RULE on_insert AS ON INSERT TO %I DO INSTEAD INSERT INTO %I (%s) VALUES (%s);
896
+ CREATE RULE on_delete AS ON DELETE TO %I DO INSTEAD DELETE FROM %I WHERE %s;' ,
897
+ rel_name, fdw_name, dst, src, pk,
898
+ rel_name, fdw_name, dst, src,
899
+ rel_name, fdw_name, pk);
900
900
901
901
-- Create table at all nodes
902
902
FOR node IN SELECT * FROM shardman .nodes
@@ -1093,10 +1093,307 @@ BEGIN
1093
1093
END
1094
1094
$$ LANGUAGE plpgsql;
1095
1095
1096
+
1097
+ -- Check consistency of cluster with metadata and perform recovery
1098
+ CREATE FUNCTION recovery () RETURNS void AS $$
1099
+ DECLARE
1100
+ dst_node shardman .nodes ;
1101
+ src_node shardman .nodes ;
1102
+ part shardman .partitions ;
1103
+ repl shardman .replicas ;
1104
+ t shardman .tables ;
1105
+ repl_group text ;
1106
+ server_opts text ;
1107
+ um_opts text ;
1108
+ table_attrs text ;
1109
+ fdw_part_name text ;
1110
+ srv_name text ;
1111
+ create_table text ;
1112
+ conn_string text ;
1113
+ pub_name text ;
1114
+ sub_name text ;
1115
+ pubs text = ' ' ;
1116
+ subs text = ' ' ;
1117
+ sync text = ' ' ;
1118
+ conf text = ' ' ;
1119
+ old_replicated_tables text ;
1120
+ new_replicated_tables text ;
1121
+ node_id int ;
1122
+ sync_standbys text [];
1123
+ old_sync_policy text ;
1124
+ new_sync_policy text ;
1125
+ BEGIN
1126
+ IF shardman .redirect_to_shardlord (' recovery()' )
1127
+ THEN
1128
+ RETURN;
1129
+ END IF;
1130
+
1131
+ -- Restore FDWs
1132
+ FOR src_node in SELECT * FROM shardman .nodes
1133
+ LOOP
1134
+ FOR dst_node in SELECT * FROM shardman .nodes
1135
+ LOOP
1136
+ IF src_node .id <> dst_node .id
1137
+ THEN
1138
+ -- Create foreign server if not exists
1139
+ srv_name := format(' node_%s' , dst_node .id );
1140
+ IF shardman .not_exists (src_node .id , format(' pg_foreign_server WHERE srvname=%L' , srv_name))
1141
+ THEN
1142
+ SELECT * FROM shardman .conninfo_to_postgres_fdw_opts (dst_node .connection_string ) INTO server_opts, um_opts;
1143
+ RAISE NOTICE ' Create foreign server % at node %' , srv_name, src_node .id ;
1144
+ PERFORM shardman .broadcast (format(' {%s:CREATE SERVER %I FOREIGN DATA WRAPPER postgres_fdw %s;
1145
+ CREATE USER MAPPING FOR CURRENT_USER SERVER %I %s}' ,
1146
+ src_node .id , srv_name, server_opts,
1147
+ srv_name, um_opts));
1148
+ END IF;
1149
+ END IF;
1150
+ END LOOP;
1151
+
1152
+ FOR part IN SELECT * from shardman .partitions
1153
+ LOOP
1154
+ -- Create parent table if not exists
1155
+ IF shardman .not_exists (src_node .id , format(' pg_class WHERE relname=%I' , part .relation ))
1156
+ THEN
1157
+ RAISE NOTICE ' Create table % at node %' , part .relation , src_node_id;
1158
+ SELECT create_sql INTO create_table FROM sharman .tables WHERE relation= part .relation ;
1159
+ PERFORM shardman .broadcast (format(' {%s:%s}' , src_node .id , create_table));
1160
+ END IF;
1161
+
1162
+ IF part .node_id <> src_node .id
1163
+ THEN -- foreign partition
1164
+ fdw_part_name := format(' %s_fdw' , part .part_name );
1165
+ srv_name := format(' node_%s' , part .node_id );
1166
+
1167
+ -- Create foreign table if not exists
1168
+ IF shardman .not_exists (src_node .id , format(' pg_class c,pg_foreign_table f WHERE c.oid=f.ftrelid AND c.relname=%L' , fdw_part_name))
1169
+ THEN
1170
+ RAISE NOTICE ' Create foreign table %I at node %' , fdw_part_name, src_node .id ;
1171
+ SELECT shardman .reconstruct_table_attrs (part .relation ) INTO table_attrs;
1172
+ PERFORM shardman .broadcast (format(' %s:CREATE FOREIGN TABLE % %s SERVER %s OPTIONS (table_name %L);' ,
1173
+ src_node .id , fdw_part_name, table_attrs, srv_name, part .relation ));
1174
+ ELSIF shardman .not_exists (src_node .id , format(' pg_class c,pg_foreign_table f,pg_foreign_server s WHERE c.oid=f.ftrelid AND c.relname=%L AND f.ftserver=s.oid AND s.srvname = %L' , fdw_part_name, srv_name))
1175
+ THEN
1176
+ RAISE NOTICE ' Bind foreign table % to server % at node %' , fdw_part_name, srv_name, src_node .id ;
1177
+ PERFORM shardman .broadcast (format(' %s:UPDATE pg_foreign_table SET ftserver = (SELECT oid FROM pg_foreign_server WHERE srvname = %L) WHERE ftrelid = (SELECT oid FROM pg_class WHERE relname=%L);' ,
1178
+ src_node .id , srv_name, fdw_part_name));
1179
+ END IF;
1180
+
1181
+ -- Check if parent table contains as child foreign table
1182
+ IF shardman .not_exists (src_node .id , format(' pg_class p,pg_inteherits i,pg_class c WHERE p.relname=%L AND p.oid=i.inhparent AND i.inhrelid=c.oid AND c.relname=%L' ,
1183
+ part .relation , fdw_part_name))
1184
+ THEN
1185
+ -- If parent table contains neither local neither foreign partititiones, then assume that table is not partitioned at all
1186
+ IF shardman .not_exists (src_node .id , format(' pg_class p,pg_inteherits i,pg_class c WHERE p.relname=%L AND p.oid=i.inhparent AND i.inhrelid=c.oid AND c.relname=%L' ,
1187
+ part .relation , part .part_name ))
1188
+ THEN
1189
+ RAISE NOTICE ' Create hash partitions for table % at node %' , part .relation , src_node .id ;
1190
+ SELECT * INTO t FROM shardman .tables WHERE relation= part .relation ;
1191
+ PERFORM shardman .broadcast (format(' %s:SELECT create_hash_partitions(%L,%L,%L);' ,
1192
+ src_node .id , t .relation , t .sharding_key , t .partitions_count ));
1193
+ END IF;
1194
+ RAISE NOTICE ' Replace % with % at node %' , part .part_name , fdw_part_name, src_node .id ;
1195
+ PERFORM shardman .broadcast (format(' %s:SELECT replace_hash_partition(%L,%L);' ,
1196
+ src_node .id , part .part_name , fdw_part_name));
1197
+ END IF;
1198
+ ELSE -- local partition
1199
+ -- Check if parent table contains as child local partition
1200
+ IF shardman .not_exists (src_node .id , format(' pg_class p,pg_inteherits i,pg_class c WHERE p.relname=%L AND p.oid=i.inhparent AND i.inhrelid=c.oid AND c.relname=%L' ,
1201
+ part .relation , part .part_name ))
1202
+ THEN
1203
+ -- If parent table contains neither local neither foreign partititiones, then assume that table is not partitioned at all
1204
+ IF shardman .not_exists (src_node .id , format(' pg_class p,pg_inteherits i,pg_class c WHERE p.relname=%L AND p.oid=i.inhparent AND i.inhrelid=c.oid AND c.relname=%L' ,
1205
+ part .relation , fdw_part_name))
1206
+ THEN
1207
+ RAISE NOTICE ' Create hash partitions for table % at node %' , part .relation , src_node .id ;
1208
+ SELECT * INTO t FROM shardman .tables WHERE relation= part .relation ;
1209
+ PERFORM shardman .broadcast (format(' %s:SELECT create_hash_partitions(%L,%L,%L);' ,
1210
+ src_node .id , t .relation , t .sharding_key , t .partitions_count ));
1211
+ END IF;
1212
+ RAISE NOTICE ' Replace % with % at node %' , fdw_part_name, part .part_name , src_node .id ;
1213
+ PERFORM shardman .broadcast (format(' %s:SELECT replace_hash_partition(%L,%L);' ,
1214
+ src_node .id , fdw_part_name, part .part_name ));
1215
+ END IF;
1216
+ END IF;
1217
+ END LOOP;
1218
+ END LOOP;
1219
+
1220
+ -- Restore replication channels
1221
+ FOR repl_group IN SELECT DISTINCT replication_group FROM shardman .nodes
1222
+ LOOP
1223
+ FOR src_node IN SELECT * from shardman .nodes WHERE replication_group = repl_group
1224
+ LOOP
1225
+ FOR dst_node IN SELECT * from shardman .nodes WHERE replication_group = repl_group AND id<> src_node .id
1226
+ LOOP
1227
+ pub_name := format(' node_%s' , dst_node .id );
1228
+ sub_name := format(' sub_%s_%s' , dst_node .id , src_node .id );
1229
+
1230
+ -- Construct list of partitions published by this node
1231
+ SELECT string_agg(pname, ' ,' ) INTO new_replicated_tables FROM
1232
+ (SELECT p .part_name pname FROM shardman .partitions p,shardman .replicas r WHERE p .node_id = src_node .id AND p .part_name = r .part_name ORDER BY p .part_name ) parts;
1233
+ SELECT string_agg(pname, ' ,' ) INTO old_replicated_tables FROM
1234
+ (SELECT c .relname pname FROM pg_publication p,pg_publication_rel r,pg_class c WHERE p .pubname = pub_name AND p .oid = r .prpubid AND r .prrelid = c .oid ORDER BY c .relname ) parts;
1235
+
1236
+ -- Create publication if not exists
1237
+ IF shardman .not_exists (src_node .id , format(' pg_publication WHERE pubname=%L' , pub_name))
1238
+ THEN
1239
+ RAISE NOTICE ' Create publication % at node %' , pub_name, src_node .id ;
1240
+ pubs := format(' %s%s:CREATE PUBLICATION %I FOR TABLE %s;' ,
1241
+ pubs, src_node .id , pub_name, new_replicated_tables);
1242
+ ELSIF new_replicated_tables<> old_replicated_tables
1243
+ THEN
1244
+ RAISE NOTICE ' Alter publication % at node %' , pub_name, src_node .id ;
1245
+ pubs := format(' %s%s:ALTER PUBLICATION %I SET TABLE %s;' ,
1246
+ pubs, src_node .id , pub_name, new_replicated_tables);
1247
+ END IF;
1248
+
1249
+ -- Create replication slot if not exists
1250
+ IF shardman .not_exists (src_node .id , format(' pg_replication_slots WHERE slot_name=%L' , pub_name))
1251
+ THEN
1252
+ RAISE NOTICE ' Create replication slot % at node %' , pub_name, src_node .id ;
1253
+ pubs := format(' %s%s:SELECT pg_create_logical_replication_slot(%L, ' ' pgoutput' ' );' ,
1254
+ pubs, src_node .id , pub_name);
1255
+ END IF;
1256
+
1257
+ -- Create subscription if not exists
1258
+ IF shardman .not_exists (dst_node .id , format(' pg_subsription WHERE subname=%L' , sub_name))
1259
+ THEN
1260
+ RAISE NOTICE ' Create subscription % at node %' , sub_name, drc_node .id ;
1261
+ subs := format(' %s%s:CREATE SUBSCRIPTION %I CONNECTION %L PUBLICATION %I WITH (copy_data=false, create_slot=false, slot_name=%L, synchronous_commit=local);' ,
1262
+ subs, dst_node .id , sub_name, src_node .connection_string , pub_name, pub_name);
1263
+ END IF;
1264
+ END LOOP;
1265
+
1266
+ -- Resotore synchronous standby list
1267
+ IF shardman .synchronous_replication ()
1268
+ THEN
1269
+ sync_standbys :=
1270
+ coalesce(ARRAY(SELECT format(' sub_%s_%s' , id, node .id ) sby_name FROM shardman .nodes
1271
+ WHERE replication_group = repl_group AND id <> src_node .id ORDER BY sby_name), ' {}' ::text []);
1272
+ new_sync_policy := format(' FIRST %s (%s)' , array_length(sync_standbys, 1 ), array_to_string(sync_standbys, ' ,' ));
1273
+
1274
+ SELECT shardman .broadcast (format(' %s:SELECT setting from pg_settings WHERE name=' ' synchronous_standby_names' ' ;' , src_node .id ))
1275
+ INTO old_sync_policy;
1276
+
1277
+ IF old_sync_policy<> new_sync_policy
1278
+ THEN
1279
+ RAISE NOTICE ' Alter synchronous_standby_names to ' ' %' ' at node %' , new_sync_policy, src_node .id ;
1280
+ sync := format(' %s%s:ALTER SYSTEM SET synchronous_standby_names to %L;' ,
1281
+ sync, src_node .id , new_sync_policy);
1282
+ conf := format(' %s%s:SELECT pg_reload_conf();' , conf, src_node .id );
1283
+ END IF;
1284
+ END IF;
1285
+ END LOOP;
1286
+ END LOOP;
1287
+
1288
+ -- Create not existed publications
1289
+ PERFORM shardman .broadcast (pubs, super_connstr => true);
1290
+ -- Create not existed subscriptions
1291
+ PERFORM shardman .broadcast (subs, super_connstr => true);
1292
+
1293
+ IF sync <> ' '
1294
+ THEN -- Alter synchronous_standby_names if needed
1295
+ PERFORM shardman .broadcast (sync, sync_commit_on => true, super_connstr => true);
1296
+ PERFORM shardman .broadcast (conf, super_connstr => true);
1297
+ END IF;
1298
+
1299
+
1300
+ -- Restore shared tables
1301
+ pubs := ' ' ;
1302
+ subs := ' ' ;
1303
+ FOR t IN SELECT * from shardman .tables WHERE master_node IS NOT NULL
1304
+ LOOP
1305
+ -- Create table if not exists
1306
+ IF shardman .not_exists (t .master_node , format(' pg_class WHERE relname=%I' , t .relation ))
1307
+ THEN
1308
+ RAISE NOTICE ' Create table % at node %' , t .relation , t .master_node ;
1309
+ PERFORM shardman .broadcast (format(' {%s:%s}' , t .master_node , t .create_sql ));
1310
+ END IF;
1311
+
1312
+ -- Construct list of shared tables at this node
1313
+ SELECT string_agg(pname, ' ,' ) INTO new_replicated_tables FROM
1314
+ (SELECT relation AS pname FROM shardman .tables WHERE master_node= t .master_node ORDER BY relation) shares;
1315
+ SELECT string_agg(pname, ' ,' ) INTO old_replicated_tables FROM
1316
+ (SELECT c .relname pname FROM pg_publication p,pg_publication_rel r,pg_class c WHERE p .pubname = ' shared_tables' AND p .oid = r .prpubid AND r .prrelid = c .oid ORDER BY c .relname ) shares;
1317
+
1318
+ -- Create publication if not exists
1319
+ IF shardman .not_exists (t .master_node , ' pg_publication WHERE pubname=' ' shared_tables' ' ' )
1320
+ THEN
1321
+ RAISE NOTICE ' Create publication shared_tables at node %' , master_node_id;
1322
+ pubs := format(' %s%s:CREATE PUBLICATION shared_tables FOR TABLE %s;' ,
1323
+ pubs, t .master_node , new_replicated_tables);
1324
+ ELSIF new_replicated_tables<> old_replicated_tables
1325
+ THEN
1326
+ RAISE NOTICE ' Alter publication shared_tables at node %' , master_node_id;
1327
+ pubs := format(' %s%s:ALTER PUBLICATION shared_tables SET TABLE %s;' ,
1328
+ pubs, t .master_node , new_replicated_tables);
1329
+ END IF;
1330
+
1331
+ SELECT connection_string INTO conn_string FROM shardman .nodes WHERE id= t .master_node ;
1332
+ srv_name := format(' node_%s' , t .master_node );
1333
+
1334
+ -- Create replicas of shared table at all nodes if not exist
1335
+ FOR node_id IN SELECT id from shardman .nodes WHERE id<> t .master_node
1336
+ LOOP
1337
+ -- Create table if not exists
1338
+ IF shardman .not_exists (node_id, format(' pg_class WHERE relname=%I' , t .relation ))
1339
+ THEN
1340
+ RAISE NOTICE ' Create table % at node %' , t .relation , node_id;
1341
+ PERFORM shardman .broadcast (format(' {%s:%s}' , node_id, t .create_sql ));
1342
+ END IF;
1343
+
1344
+ -- Create foreign table if not exists
1345
+ fdw_part_name := format(' %s_fdw' , t .relation );
1346
+ IF shardman .not_exists (node_id, format(' pg_class c,pg_foreign_table f ON c.oid=f.ftrelid WHERE c.relname=%L' , fdw_part_name))
1347
+ THEN
1348
+ RAISE NOTICE ' Create foreign table %I at node %' , fdw_part_name, node_id;
1349
+ SELECT shardman .reconstruct_table_attrs (t .relation ) INTO table_attrs;
1350
+ PERFORM shardman .broadcast (format(' %s:CREATE FOREIGN TABLE % %s SERVER %s OPTIONS (table_name %L);' ,
1351
+ node_id, fdw_part_name, table_attrs, srv_name, t .relation ));
1352
+ END IF;
1353
+
1354
+ -- Create rules if not exists
1355
+ IF shardman .not_exists (node_id, format(' pg_rules WHERE tablename=%I AND rulename=' ' on_update' ' ' , t .relation ))
1356
+ THEN
1357
+ RAISE NOTICE ' Create rules for table % at node %' , t .relation , node_id;
1358
+ PERFORM shardman .broadcast (format(' {%s:%s}' , node_id, t .create_rules_sql ));
1359
+ END IF;
1360
+
1361
+ -- Create subscription to master if not exists
1362
+ sub_name := format(' share_%s_%s' , node_id, t .master_node );
1363
+ IF shardman .not_exists (node .id , format(' pg_subscription WHERE slot_name=%L' , sub_name))
1364
+ THEN
1365
+ RAISE NOTICE ' Create subscription % at node %' , sub_name, node_id;
1366
+ subs := format(' %s%s:CREATE SUBSCRIPTION %I CONNECTION %L PUBLICATION shared_tables with (copy_data=false, synchronous_commit=local);' ,
1367
+ subs, node_id, sub_name, conn_string);
1368
+ END IF;
1369
+ END LOOP;
1370
+ END LOOP;
1371
+
1372
+ -- Create not existed publications
1373
+ PERFORM shardman .broadcast (pubs, super_connstr => true);
1374
+ -- Create not existed subscriptions
1375
+ PERFORM shardman .broadcast (subs, super_connstr => true);
1376
+ END
1377
+ $$ LANGUAGE plpgsql;
1378
+
1379
+
1380
+
1096
1381
-- -------------------------------------------------------------------
1097
1382
-- Utility functions
1098
1383
-- -------------------------------------------------------------------
1099
1384
1385
+ -- Check if resource exists at remote node
1386
+ CREATE FUNCTION not_exists (node_id int , what text ) RETURNS bool AS $$
1387
+ DECLARE
1388
+ req text ;
1389
+ resp text ;
1390
+ BEGIN
1391
+ req := format(' SELECT count(*) FROM %s;' , what);
1392
+ SELECT shardman .boradcast (req) INTO resp;
1393
+ return resp::bigint = 0 ;
1394
+ END
1395
+ $$ LANGUAGE plpgsql;
1396
+
1100
1397
-- Execute command at shardlord
1101
1398
CREATE FUNCTION redirect_to_shardlord (cmd text ) RETURNS bool AS $$
1102
1399
BEGIN
0 commit comments