@@ -840,11 +840,11 @@ BEGIN
840
840
END IF;
841
841
842
842
-- Copy partition data to new location
843
- pubs := format(' %s:CREATE PUBLICATION copy_ %s FOR TABLE %I;
844
- %s:SELECT pg_create_logical_replication_slot(' ' copy_ %s' ' , ' ' pgoutput' ' );' ,
843
+ pubs := format(' %s:CREATE PUBLICATION shardman_copy_ %s FOR TABLE %I;
844
+ %s:SELECT pg_create_logical_replication_slot(' ' shardman_copy_ %s' ' , ' ' pgoutput' ' );' ,
845
845
src_node_id, mv_part_name, mv_part_name,
846
846
src_node_id, mv_part_name);
847
- subs := format(' %s:CREATE SUBSCRIPTION copy_ %s CONNECTION %L PUBLICATION copy_ %s with (create_slot=false, slot_name=' ' copy_ %s' ' , synchronous_commit=local);' ,
847
+ subs := format(' %s:CREATE SUBSCRIPTION shardman_copy_ %s CONNECTION %L PUBLICATION shardman_copy_ %s with (create_slot=false, slot_name=' ' shardman_copy_ %s' ' , synchronous_commit=local);' ,
848
848
dst_node_id, mv_part_name, conn_string, mv_part_name, mv_part_name);
849
849
850
850
-- Create publication and slot for copying
@@ -857,11 +857,28 @@ BEGIN
857
857
RAISE NOTICE ' Copy of partition % from node % to % is completed' ,
858
858
mv_part_name, src_node_id, dst_node_id;
859
859
860
- pubs := format( ' %s:DROP PUBLICATION copy_%s; ' , src_node_id, mv_part_name) ;
860
+ pubs := ' ' ;
861
861
subs := ' ' ;
862
862
863
- -- Update replication channels
864
- FOR repl_node_id IN SELECT node_id from shardman .replicas WHERE part_name= mv_part_name
863
+ -- Drop temporary LR channel
864
+ PERFORM shardman .broadcast (format(' %s:DROP PUBLICATION shardman_copy_%s;' ,
865
+ src_node_id, mv_part_name),
866
+ super_connstr => true);
867
+ -- drop sub cannot be executed in multi-command string, so don't set
868
+ -- synchronous_commit to local
869
+ PERFORM shardman .broadcast (format(
870
+ ' %s:DROP SUBSCRIPTION shardman_copy_%s;' ,
871
+ dst_node_id, mv_part_name), super_connstr => true, sync_commit_on => true);
872
+
873
+ -- Drop old channels and establish new ones. We don't care much about the
874
+ -- order of actions: if recover() initially fixes LR channels and only then
875
+ -- repairs mappings, we will be fine anyway: all nodes currently see old
876
+ -- location which is locked for writes, and will be unlocked (if needed)
877
+ -- only after fixing channels and mappings. Ideally we should also block
878
+ -- reads of old partition to prevent returning stale data.
879
+ pubs := ' ' ;
880
+ subs := ' ' ;
881
+ FOR repl_node_id IN SELECT node_id FROM shardman .replicas WHERE part_name= mv_part_name
865
882
LOOP
866
883
pubs := format(' %s%s:ALTER PUBLICATION node_%s DROP TABLE %I;
867
884
%s:ALTER PUBLICATION node_%s ADD TABLE %I;' ,
@@ -877,9 +894,6 @@ BEGIN
877
894
PERFORM shardman .broadcast (pubs, super_connstr => true);
878
895
-- Broadcast alter subscription commands
879
896
PERFORM shardman .broadcast (subs, super_connstr => true);
880
- -- Drop copy subscription
881
- PERFORM shardman .broadcast (format(' %s:DROP SUBSCRIPTION copy_%s;' ,
882
- dst_node_id, mv_part_name), sync_commit_on => true, super_connstr => true);
883
897
884
898
-- Now, with source part locked and dst part fully copied, update owner of
885
899
-- this partition: we consider move as completed at this point. We must make
@@ -1278,7 +1292,8 @@ BEGIN
1278
1292
END
1279
1293
$$ LANGUAGE plpgsql;
1280
1294
1281
- -- Check consistency of cluster with metadata and perform recovery
1295
+ -- Check consistency of cluster against metadata and perform recovery. All nodes
1296
+ -- must be up for successfull completion.
1282
1297
CREATE FUNCTION recover () RETURNS void AS $$
1283
1298
DECLARE
1284
1299
dst_node shardman .nodes ;
@@ -1307,17 +1322,118 @@ DECLARE
1307
1322
sync_standbys text [];
1308
1323
old_sync_policy text ;
1309
1324
new_sync_policy text ;
1325
+ node record;
1326
+ prim name;
1327
+ foreign_part name;
1310
1328
BEGIN
1311
1329
IF shardman .redirect_to_shardlord (' recover()' )
1312
1330
THEN
1313
1331
RETURN;
1314
1332
END IF;
1315
1333
1334
+ -- Remove potentially hanged temporary pub and sub used for mv_partition,
1335
+ -- truncate & forbid writes to not used partitions, unlock used partitions
1336
+ -- to fix up things after suddenly failed mv_partition. Yeah, since
1337
+ -- currently we don't log executed commands, we have to do that everywhere.
1338
+ FOR node IN SELECT n .id ,
1339
+ ARRAY(SELECT prims .part_name FROM shardman .partitions prims WHERE n .id = prims .node_id ) primary_parts,
1340
+ ARRAY(SELECT part_name FROM shardman .partitions
1341
+ WHERE part_name NOT IN
1342
+ (SELECT prims .part_name FROM shardman .partitions prims WHERE n .id = prims .node_id )
1343
+ AND part_name NOT IN
1344
+ (SELECT repls .part_name FROM shardman .replicas repls WHERE n .id = repls .node_id )) foreign_parts
1345
+ FROM shardman .nodes n
1346
+ LOOP
1347
+ subs := format(' %s{%s:SELECT shardman.drop_copy_sub();' , subs, node .id );
1348
+ FOREACH prim IN ARRAY node .primary_parts LOOP -- unlock local parts
1349
+ subs := format(' %s SELECT shardman.write_protection_off(%L::regclass);' ,
1350
+ subs, prim);
1351
+ END LOOP;
1352
+ FOREACH foreign_part IN ARRAY node .foreign_parts LOOP -- lock foreign parts
1353
+ subs := format(' %s SELECT shardman.write_protection_on(%L::regclass);
1354
+ TRUNCATE %I;' ,
1355
+ subs, foreign_part, foreign_part);
1356
+ END LOOP;
1357
+ subs := subs || ' }' ;
1358
+
1359
+ pubs := format(' %s%s:SELECT shardman.drop_copy_pub();' , pubs, node .id );
1360
+ END LOOP;
1361
+ PERFORM shardman .broadcast (subs, super_connstr => true);
1362
+ PERFORM shardman .broadcast (pubs, super_connstr => true);
1363
+
1364
+ -- Fix replication channels
1365
+ pubs := ' ' ;
1366
+ subs := ' ' ;
1367
+ FOR repl_group IN SELECT DISTINCT replication_group FROM shardman .nodes
1368
+ LOOP
1369
+ FOR src_node IN SELECT * FROM shardman .nodes WHERE replication_group = repl_group
1370
+ LOOP
1371
+ FOR dst_node IN SELECT * FROM shardman .nodes
1372
+ WHERE replication_group = repl_group AND id <> src_node .id
1373
+ LOOP
1374
+ pub_name := format(' node_%s' , dst_node .id );
1375
+ sub_name := format(' sub_%s_%s' , dst_node .id , src_node .id );
1376
+
1377
+ -- Construct list of partitions which need to be published from
1378
+ -- src node to dst node
1379
+ SELECT coalesce(string_agg(pname, ' ,' ), ' ' ) INTO replicated_tables FROM
1380
+ (SELECT p .part_name pname FROM shardman .partitions p, shardman .replicas r
1381
+ WHERE p .node_id = src_node .id AND r .node_id = dst_node .id AND
1382
+ p .part_name = r .part_name ORDER BY p .part_name ) parts;
1383
+
1384
+ pubs := format(' %s%s:SELECT shardman.recover_pub(%L, %L);' ,
1385
+ pubs, src_node .id , pub_name, replicated_tables);
1386
+
1387
+ -- Create subscription if not exists
1388
+ -- FIXME: we ought to recreate sub anyway if slot/pub was
1389
+ -- recreated
1390
+ IF shardman .not_exists (dst_node .id , format(
1391
+ ' pg_subscription WHERE subname=%L' , sub_name))
1392
+ THEN
1393
+ RAISE NOTICE ' Creating subscription % at node %' , sub_name, dst_node .id ;
1394
+ subs := format(' %s%s:CREATE SUBSCRIPTION %I CONNECTION %L PUBLICATION %I WITH (copy_data=false, create_slot=false, slot_name=%L, synchronous_commit=local);' ,
1395
+ subs, dst_node .id , sub_name, src_node .connection_string , pub_name, pub_name);
1396
+ END IF;
1397
+ END LOOP;
1398
+
1399
+ -- Restore synchronous standby list
1400
+ IF shardman .synchronous_replication ()
1401
+ THEN
1402
+ new_sync_policy := shardman .construct_ssnames (src_node .id );
1403
+
1404
+ SELECT shardman .broadcast (format(
1405
+ ' %s:SELECT setting from pg_settings
1406
+ WHERE name=' ' synchronous_standby_names' ' ;' , src_node .id ))
1407
+ INTO old_sync_policy;
1408
+
1409
+ IF old_sync_policy <> new_sync_policy
1410
+ THEN
1411
+ RAISE NOTICE ' Alter synchronous_standby_names to ' ' %' ' at node %' , new_sync_policy, src_node .id ;
1412
+ sync := format(' %s%s:ALTER SYSTEM SET synchronous_standby_names to %L;' ,
1413
+ sync, src_node .id , new_sync_policy);
1414
+ conf := format(' %s%s:SELECT pg_reload_conf();' , conf, src_node .id );
1415
+ END IF;
1416
+ END IF;
1417
+ END LOOP;
1418
+ END LOOP;
1419
+
1420
+ -- Create missing publications and repslots
1421
+ PERFORM shardman .broadcast (pubs, super_connstr => true);
1422
+ -- Create missing subscriptions
1423
+ PERFORM shardman .broadcast (subs, super_connstr => true);
1424
+
1425
+ IF sync <> ' '
1426
+ THEN -- Alter synchronous_standby_names if needed
1427
+ -- alter system must be one-line command, don't set synchronous_commit
1428
+ PERFORM shardman .broadcast (sync, super_connstr => true, sync_commit_on => true);
1429
+ PERFORM shardman .broadcast (conf, super_connstr => true);
1430
+ END IF;
1431
+
1316
1432
-- Restore FDWs
1317
- FOR src_node in SELECT * FROM shardman .nodes
1433
+ FOR src_node IN SELECT * FROM shardman .nodes
1318
1434
LOOP
1319
1435
-- Restore foreign servers
1320
- FOR dst_node in SELECT * FROM shardman .nodes
1436
+ FOR dst_node IN SELECT * FROM shardman .nodes
1321
1437
LOOP
1322
1438
IF src_node .id <> dst_node .id
1323
1439
THEN
@@ -1438,72 +1554,6 @@ BEGIN
1438
1554
END LOOP;
1439
1555
END LOOP;
1440
1556
1441
- -- Restore replication channels
1442
- FOR repl_group IN SELECT DISTINCT replication_group FROM shardman .nodes
1443
- LOOP
1444
- FOR src_node IN SELECT * FROM shardman .nodes WHERE replication_group = repl_group
1445
- LOOP
1446
- FOR dst_node IN SELECT * FROM shardman .nodes
1447
- WHERE replication_group = repl_group AND id <> src_node .id
1448
- LOOP
1449
- pub_name := format(' node_%s' , dst_node .id );
1450
- sub_name := format(' sub_%s_%s' , dst_node .id , src_node .id );
1451
-
1452
- -- Construct list of partitions which need to be published from
1453
- -- src node to dst node
1454
- SELECT coalesce(string_agg(pname, ' ,' ), ' ' ) INTO replicated_tables FROM
1455
- (SELECT p .part_name pname FROM shardman .partitions p, shardman .replicas r
1456
- WHERE p .node_id = src_node .id AND r .node_id = dst_node .id AND
1457
- p .part_name = r .part_name ORDER BY p .part_name ) parts;
1458
-
1459
- pubs := format(' %s%s:SELECT shardman.recover_pub(%L, %L);' ,
1460
- pubs, src_node .id , pub_name, replicated_tables);
1461
-
1462
- -- Create subscription if not exists
1463
- -- FIXME: we ought to recreate sub anyway if slot/pub was
1464
- -- recreated
1465
- IF shardman .not_exists (dst_node .id , format(
1466
- ' pg_subscription WHERE subname=%L' , sub_name))
1467
- THEN
1468
- RAISE NOTICE ' Creating subscription % at node %' , sub_name, dst_node .id ;
1469
- subs := format(' %s%s:CREATE SUBSCRIPTION %I CONNECTION %L PUBLICATION %I WITH (copy_data=false, create_slot=false, slot_name=%L, synchronous_commit=local);' ,
1470
- subs, dst_node .id , sub_name, src_node .connection_string , pub_name, pub_name);
1471
- END IF;
1472
- END LOOP;
1473
-
1474
- -- Restore synchronous standby list
1475
- IF shardman .synchronous_replication ()
1476
- THEN
1477
- new_sync_policy := shardman .construct_ssnames (src_node .id );
1478
-
1479
- SELECT shardman .broadcast (format(
1480
- ' %s:SELECT setting from pg_settings
1481
- WHERE name=' ' synchronous_standby_names' ' ;' , src_node .id ))
1482
- INTO old_sync_policy;
1483
-
1484
- IF old_sync_policy <> new_sync_policy
1485
- THEN
1486
- RAISE NOTICE ' Alter synchronous_standby_names to ' ' %' ' at node %' , new_sync_policy, src_node .id ;
1487
- sync := format(' %s%s:ALTER SYSTEM SET synchronous_standby_names to %L;' ,
1488
- sync, src_node .id , new_sync_policy);
1489
- conf := format(' %s%s:SELECT pg_reload_conf();' , conf, src_node .id );
1490
- END IF;
1491
- END IF;
1492
- END LOOP;
1493
- END LOOP;
1494
-
1495
- -- Create missing publications
1496
- PERFORM shardman .broadcast (pubs, super_connstr => true);
1497
- -- Create missing subscriptions
1498
- PERFORM shardman .broadcast (subs, super_connstr => true);
1499
-
1500
- IF sync <> ' '
1501
- THEN -- Alter synchronous_standby_names if needed
1502
- PERFORM shardman .broadcast (sync, sync_commit_on => true, super_connstr => true);
1503
- PERFORM shardman .broadcast (conf, super_connstr => true);
1504
- END IF;
1505
-
1506
-
1507
1557
-- Restore shared tables
1508
1558
pubs := ' ' ;
1509
1559
subs := ' ' ;
@@ -1587,6 +1637,33 @@ BEGIN
1587
1637
END
1588
1638
$$ LANGUAGE plpgsql;
1589
1639
1640
+ CREATE FUNCTION drop_copy_sub () RETURNS void AS $$
1641
+ DECLARE
1642
+ sub_name name;
1643
+ BEGIN
1644
+ FOR sub_name IN SELECT subname FROM pg_subscription WHERE subname LIKE ' shardman_copy_%'
1645
+ LOOP
1646
+ PERFORM shardman .eliminate_sub (sub_name);
1647
+ END LOOP;
1648
+ END
1649
+ $$ LANGUAGE plpgsql;
1650
+
1651
+ CREATE FUNCTION drop_copy_pub () RETURNS void AS $$
1652
+ DECLARE
1653
+ pub_name name;
1654
+ slotname name;
1655
+ BEGIN
1656
+ FOR pub_name IN SELECT pubname FROM pg_publication WHERE pubname LIKE ' shardman_copy_%'
1657
+ LOOP
1658
+ EXECUTE format(' DROP PUBLICATION %I' , pub_name);
1659
+ END LOOP;
1660
+ FOR slotname IN SELECT slot_name FROM pg_replication_slots WHERE slot_name LIKE ' shardman_copy_%'
1661
+ LOOP
1662
+ PERFORM pg_drop_replication_slot(slotname);
1663
+ END LOOP;
1664
+ END
1665
+ $$ LANGUAGE plpgsql;
1666
+
1590
1667
-- Make sure pub & slot on the node exists and proper parts are published.
1591
1668
-- 'replicated_tables' is comma-separated list of tables to publish, probably
1592
1669
-- empty string. It is ordered for easier comparison.
@@ -2030,7 +2107,7 @@ $$ LANGUAGE plpgsql;
2030
2107
-- parts are fully synced and src part is locked.
2031
2108
CREATE FUNCTION wait_copy_completion (src_node_id int , dst_node_id int , part_name text ) RETURNS void AS $$
2032
2109
DECLARE
2033
- slot text = format(' copy_ %s' , part_name);
2110
+ slot text = format(' shardman_copy_ %s' , part_name);
2034
2111
lag bigint ;
2035
2112
response text ;
2036
2113
caughtup_threshold bigint = 1024 * 1024 ;
@@ -2085,7 +2162,8 @@ BEGIN
2085
2162
END
2086
2163
$$ LANGUAGE plpgsql;
2087
2164
2088
- -- Disable writes to the partition
2165
+ -- Disable writes to the partition, if we are not replica. This is handy because
2166
+ -- we use replication to copy table.
2089
2167
CREATE FUNCTION write_protection_on (part regclass) RETURNS void AS $$
2090
2168
BEGIN
2091
2169
IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = ' write_protection' AND
@@ -2110,7 +2188,7 @@ $$ LANGUAGE plpgsql;
2110
2188
-- Trigger procedure prohibiting modification of the table
2111
2189
CREATE FUNCTION deny_access () RETURNS trigger AS $$
2112
2190
BEGIN
2113
- RAISE EXCEPTION ' This partition was moved to another node. Run shardman.recovery (), if this error persists.' ;
2191
+ RAISE EXCEPTION ' This partition was moved to another node. Run shardman.recover (), if this error persists.' ;
2114
2192
END
2115
2193
$$ LANGUAGE plpgsql;
2116
2194
@@ -2527,7 +2605,7 @@ DECLARE
2527
2605
kill_ws_times int := 3 ;
2528
2606
BEGIN
2529
2607
RAISE DEBUG ' [SHMN] Dropping repslot %' , slot_name;
2530
- EXECUTE format(' SELECT EXISTS (SELECT * FROM pg_replication_slots
2608
+ EXECUTE format(' SELECT EXISTS (SELECT 1 FROM pg_replication_slots
2531
2609
WHERE slot_name = %L)' , slot_name) INTO slot_exists;
2532
2610
IF slot_exists THEN
2533
2611
IF with_fire THEN -- kill walsender several times
0 commit comments