@@ -287,6 +287,7 @@ DECLARE
287
287
fdw_part_name text ;
288
288
new_master_id int ;
289
289
sync_standbys text [];
290
+ sync_standby_names text ;
290
291
repl_group text ;
291
292
master_node_id int ;
292
293
BEGIN
@@ -322,45 +323,42 @@ BEGIN
322
323
rm_node_id, node .id ,
323
324
node .id , rm_node_id,
324
325
rm_node_id, node .id );
325
- -- Subscription with associated slot can not be dropped inside block, but if we do not override synchronous_commit policy,
326
- -- then this command will be blocked waiting for sync replicas. So we need first do unbound slot from subscription.
327
- -- But it is possible only for disabled subscriptions. So we have to perform three steps: disable subscription, unbound slot, drop subscription.
328
- alts := format(' %s{%s:ALTER SUBSCRIPTION sub_%s_%s DISABLE;ALTER SUBSCRIPTION sub_%s_%s SET (slot_name=NONE)}{%s:ALTER SUBSCRIPTION sub_%s_%s DISABLE;ALTER SUBSCRIPTION sub_%s_%s SET (slot_name=NONE)}' ,
329
- alts, rm_node_id, rm_node_id, node .id , rm_node_id, node .id ,
330
- node .id , node .id , rm_node_id, node .id , rm_node_id);
331
- subs := format(' %s%s:DROP SUBSCRIPTION sub_%s_%s;
332
- %s:DROP SUBSCRIPTION sub_%s_%s;' ,
333
- subs, rm_node_id, rm_node_id, node .id ,
334
- node .id , node .id , rm_node_id);
326
+
327
+ subs := format(' %s%s:SELECT shardman.eliminate_sub(' ' sub_%s_%s' ' );
328
+ %s:SELECT shardman.eliminate_sub(' ' sub_%s_%s' ' );' ,
329
+ alts, rm_node_id, rm_node_id, node .id ,
330
+ node .id , node .id , rm_node_id);
335
331
336
332
-- Construct new synchronous standby list
333
+ -- TODO: now we construct it differently in 3 places. This should be
334
+ -- united.
337
335
sync_standbys :=
338
- coalesce(ARRAY(SELECT format(' sub_%s_%s' , id, node .id ) FROM shardman .nodes
339
- WHERE replication_group = repl_group AND id <> node .id AND
340
- id<> rm_node_id),
341
- ' {}' ::text []);
342
- sync := format(' %s%s:ALTER SYSTEM SET synchronous_standby_names to ' ' FIRST %s (%s)' ' ;' ,
343
- sync, node .id , array_length(sync_standbys, 1 ),
344
- array_to_string(sync_standbys, ' ,' ));
336
+ ARRAY(SELECT format(' sub_%s_%s' , id, node .id ) FROM shardman .nodes
337
+ WHERE replication_group = repl_group AND id <> node .id AND
338
+ id<> rm_node_id);
339
+ sync_standby_names := CASE WHEN sync_standbys = ' {}' ::text [] THEN
340
+ ' '
341
+ ELSE
342
+ format(' FIRST %s (%s)' , array_length(sync_standbys, 1 ),
343
+ array_to_string(sync_standbys, ' ,' ))
344
+ END;
345
+ sync := format(' %s%s:ALTER SYSTEM SET synchronous_standby_names to %L;' ,
346
+ sync, node .id , sync_standby_names);
345
347
conf := format(' %s%s:SELECT pg_reload_conf();' , conf, node .id );
346
348
END LOOP;
347
349
348
350
-- Drop shared tables subscriptions
349
- FOR master_node_id IN SELECT DISTINCT master_node from shardman .tables WHERE master_node IS NOT NULL
351
+ FOR master_node_id IN SELECT DISTINCT master_node FROM shardman .tables
352
+ WHERE master_node IS NOT NULL
350
353
LOOP
351
- alts := format(' %s{%s:ALTER SUBSCRIPTION share_%s_%s DISABLE;ALTER SUBSCRIPTION share_%s_%s SET (slot_name=NONE)}' ,
352
- alts, rm_node_id, rm_node_id, master_node_id, rm_node_id, master_node_id);
353
- subs := format(' %s%s:DROP SUBSCRIPTION share_%s_%s;' ,
354
- subs, rm_node_id, rm_node_id, master_node_id);
354
+ subs := format(' %s%s:SELECT shardman.eliminate_sub(share_%s_%s);' ,
355
+ subs, rm_node_id, rm_node_id, master_node_id);
355
356
pubs := format(' %s%s:SELECT pg_drop_replication_slot(' ' share_%s_%s' ' );' ,
356
- pubs, master_node_id, rm_node_id, master_node_id);
357
+ pubs, master_node_id, rm_node_id, master_node_id);
357
358
END LOOP;
358
359
359
- -- Broadcast alter subscription commands, ignore errors because removed node may be not available
360
- PERFORM shardman .broadcast (alts,
361
- ignore_errors => true,
362
- super_connstr => true);
363
- -- Broadcast drop subscription commands, ignore errors because removed node may be not available
360
+ -- Broadcast drop subscription commands, ignore errors because removed node
361
+ -- might be not available
364
362
PERFORM shardman .broadcast (subs,
365
363
ignore_errors => true,
366
364
super_connstr => true);
@@ -380,44 +378,30 @@ BEGIN
380
378
super_connstr => true);
381
379
PERFORM shardman .broadcast (conf, ignore_errors:= true, super_connstr => true);
382
380
END IF;
383
- /* To correctly remove foreign servers we need to update pg_depend table, otherwise
384
- * our hack with direct update pg_foreign_table leaves deteriorated dependencies
385
- -- Remove foreign servers at all nodes for the removed node
386
- FOR node IN SELECT * FROM shardman.nodes WHERE id<>rm_node_id
387
- LOOP
388
- -- Drop server for all nodes at the removed node and drop server at all nodes for the removed node
389
- fdws := format('%s%s:DROP SERVER node_%s;
390
- %s:DROP SERVER node_%s;',
391
- fdws, node.id, rm_node_id,
392
- rm_node_id, node.id);
393
- drps := format('%s%s:DROP USER MAPPING FOR CURRENT_USER SERVER node_%s;
394
- %s:DROP USER MAPPING FOR CURRENT_USER SERVER node_%s;',
395
- drps, node.id, rm_node_id,
396
- rm_node_id, node.id);
397
- END LOOP;
398
- */
399
- -- Exclude partitions of removed node
400
- FOR part in SELECT * from shardman .partitions where node_id= rm_node_id
381
+
382
+ -- Exclude partitions of removed node, promote them on replicas, if any
383
+ FOR part IN SELECT * from shardman .partitions WHERE node_id= rm_node_id
401
384
LOOP
402
385
-- If there are more than one replica of this partition, we need to synchronize them
403
- IF shardman .get_redundancy_of_partition (part .part_name )> 1
386
+ IF shardman .get_redundancy_of_partition (part .part_name ) > 1
404
387
THEN
405
388
PERFORM shardman .synchronize_replicas (part .part_name );
406
389
END IF;
407
390
408
391
-- Is there some replica of this node?
409
392
SELECT node_id INTO new_master_id FROM shardman .replicas WHERE part_name= part .part_name ORDER BY random() LIMIT 1 ;
410
393
IF new_master_id IS NOT NULL
411
- THEN -- exists some replica for this node: redirect foreign table to this replica and refresh LR channels for this replication group
412
- -- Update partitions table: now replica is promoted to master...
394
+ THEN -- exists some replica for this node: redirect foreign table to
395
+ -- this replica and refresh LR channels for this replication group
396
+ -- Update partitions table: now replica is promoted to master...
413
397
UPDATE shardman .partitions SET node_id= new_master_id WHERE part_name= part .part_name ;
414
398
-- ... and is not a replica any more
415
399
DELETE FROM shardman .replicas WHERE part_name= part .part_name AND node_id= new_master_id;
416
400
417
401
pubs := ' ' ;
418
402
subs := ' ' ;
419
403
-- Refresh LR channels for this replication group
420
- FOR repl in SELECT * FROM shardman .replicas WHERE part_name= part .part_name
404
+ FOR repl IN SELECT * FROM shardman .replicas WHERE part_name= part .part_name
421
405
LOOP
422
406
-- Publish this partition at new master
423
407
pubs := format(' %s%s:ALTER PUBLICATION node_%s ADD TABLE %I;' ,
@@ -447,24 +431,49 @@ BEGIN
447
431
fdws := format(' %s%s:DROP FOREIGN TABLE %I;' ,
448
432
fdws, node .id , fdw_part_name);
449
433
ELSE
450
- -- At all other nodes adjust foreign server for foreign table to refer to new master node.
451
- -- It is not possible to alter foreign server for foreign table so we have to do it in such "hackers" way:
452
- prts := format(' %s%s:UPDATE pg_foreign_table SET ftserver = (SELECT oid FROM pg_foreign_server WHERE srvname = ' ' node_%s' ' ) WHERE ftrelid = (SELECT oid FROM pg_class WHERE relname=%L);' ,
453
- prts, node .id , new_master_id, fdw_part_name);
434
+ -- At all other nodes adjust foreign server for foreign table to
435
+ -- refer to new master node.
436
+ prts := format(
437
+ ' %s%s:SELECT shardman.alter_ftable_set_server(%L, ' ' node_%s' ' );' ,
438
+ prts, node .id , fdw_part_name, new_master_id);
454
439
END IF;
455
440
END LOOP;
456
441
END LOOP;
457
442
458
443
-- Broadcast changes of pathman mapping
459
- PERFORM shardman .broadcast (prts, ignore_errors: = true);
444
+ PERFORM shardman .broadcast (prts, ignore_errors : = true);
460
445
-- Broadcast drop server commands
461
- PERFORM shardman .broadcast (fdws, ignore_errors:= true);
446
+ PERFORM shardman .broadcast (fdws, ignore_errors := true);
447
+
448
+ -- Clean removed node, if it is reachable
449
+ PERFORM shardman .broadcast (format(' %s:SELECT shardman.wipe_state();' ,
450
+ rm_node_id),
451
+ ignore_errors := true);
462
452
463
453
-- Finally delete node from nodes table and all dependent tables
464
454
DELETE from shardman .nodes WHERE id= rm_node_id;
465
455
END
466
456
$$ LANGUAGE plpgsql;
467
457
458
-- Re-point foreign table 'ftable' at foreign server 'new_fserver'.
--
-- PostgreSQL offers no ALTER FOREIGN TABLE ... SERVER, so the catalogs are
-- edited directly: pg_foreign_table.ftserver is switched to the new server and
-- the table's pg_depend entry is moved along with it, so that the old server
-- can later be dropped without taking the table with it. Once no foreign table
-- references the old server any more, the old server is dropped (CASCADE, so
-- its user mappings go too).
--
-- Parameters:
--   ftable      - name of the foreign table; resolved with a regclass cast
--   new_fserver - name of an existing foreign server
-- Raises an exception if 'new_fserver' does not exist or 'ftable' is not a
-- foreign table. Must be run by a role allowed to modify system catalogs.
CREATE FUNCTION alter_ftable_set_server(ftable name, new_fserver name) RETURNS void AS $$
DECLARE
    -- Resolve the relation once; the cast errors out if it does not exist.
    ftable_oid oid := ftable::regclass;
    new_fserver_oid oid;
    old_fserver_oid oid;
    old_fserver name;
BEGIN
    SELECT oid INTO new_fserver_oid
    FROM pg_foreign_server
    WHERE srvname = new_fserver;
    -- Never write a NULL server oid into the catalog.
    IF new_fserver_oid IS NULL THEN
        RAISE EXCEPTION 'foreign server "%" does not exist', new_fserver;
    END IF;

    SELECT s.oid, s.srvname INTO old_fserver_oid, old_fserver
    FROM pg_foreign_table AS t
    INNER JOIN pg_foreign_server AS s ON s.oid = t.ftserver
    WHERE t.ftrelid = ftable_oid;
    IF old_fserver_oid IS NULL THEN
        RAISE EXCEPTION '"%" is not a foreign table', ftable;
    END IF;

    -- Already pointing at the requested server: nothing to rewrite or drop.
    IF old_fserver_oid = new_fserver_oid THEN
        RETURN;
    END IF;

    UPDATE pg_foreign_table
    SET ftserver = new_fserver_oid
    WHERE ftrelid = ftable_oid;

    -- Oids are unique only within one catalog, so pin both sides of the
    -- dependency to their catalogs to avoid touching unrelated rows.
    UPDATE pg_depend
    SET refobjid = new_fserver_oid
    WHERE classid = 'pg_class'::regclass
      AND objid = ftable_oid
      AND refclassid = 'pg_foreign_server'::regclass
      AND refobjid = old_fserver_oid;

    -- Drop the old server once it is no longer used by any foreign table.
    IF NOT EXISTS (SELECT 1 FROM pg_foreign_table WHERE ftserver = old_fserver_oid)
    THEN
        -- %I quotes the identifier, so mixed-case or unusual names survive.
        EXECUTE format('DROP SERVER %I CASCADE', old_fserver);
    END IF;
END
$$ LANGUAGE plpgsql;

476
+
468
477
-- Bail out with ERROR if some replication group doesn't have 'redundancy'
469
478
-- replicas
470
479
CREATE FUNCTION check_max_replicas (redundancy int ) RETURNS void AS $$
0 commit comments