@@ -389,12 +389,14 @@ BEGIN
389
389
END IF;
390
390
391
391
-- Is there some replica of this node?
392
- SELECT node_id INTO new_master_id FROM shardman .replicas WHERE part_name= part .part_name ORDER BY random() LIMIT 1 ;
392
+ SELECT node_id INTO new_master_id FROM shardman .replicas
393
+ WHERE part_name= part .part_name ORDER BY random() LIMIT 1 ;
393
394
IF new_master_id IS NOT NULL
394
395
THEN -- exists some replica for this node: redirect foreign table to
395
396
-- this replica and refresh LR channels for this replication group
396
397
-- Update partitions table: now replica is promoted to master...
397
- UPDATE shardman .partitions SET node_id= new_master_id WHERE part_name= part .part_name ;
398
+ UPDATE shardman .partitions SET node_id= new_master_id
399
+ WHERE part_name= part .part_name ;
398
400
-- ... and is not a replica any more
399
401
DELETE FROM shardman .replicas WHERE part_name= part .part_name AND node_id= new_master_id;
400
402
@@ -415,9 +417,12 @@ BEGIN
415
417
PERFORM shardman .broadcast (pubs, super_connstr => true);
416
418
-- Broadcast refresh alter subscription commands
417
419
PERFORM shardman .broadcast (subs, super_connstr => true);
418
- ELSE -- there is no replica: we have to create new empty partition at random mode and redirect all FDWs to it
419
- SELECT id INTO new_master_id FROM shardman .nodes WHERE id<> rm_node_id ORDER BY random() LIMIT 1 ;
420
- INSERT INTO shardman .partitions (part_name,node_id,relation) VALUES (part .part .name,new_master_id,part .relation );
420
+ ELSE -- there is no replica: we have to create new empty partition at
421
+ -- random mode and redirect all FDWs to it
422
+ SELECT id INTO new_master_id FROM shardman .nodes
423
+ WHERE id<> rm_node_id ORDER BY random() LIMIT 1 ;
424
+ INSERT INTO shardman .partitions (part_name,node_id,relation)
425
+ VALUES (part .part .name,new_master_id,part .relation );
421
426
END IF;
422
427
423
428
-- Update pathman partition map at all nodes
@@ -434,7 +439,7 @@ BEGIN
434
439
-- At all other nodes adjust foreign server for foreign table to
435
440
-- refer to new master node.
436
441
prts := format(
437
- ' %s%s:SELECT shardman.alter_ftable_set_server(%L, ' ' node_%s' ' );' ,
442
+ ' %s%s:SELECT shardman.alter_ftable_set_server(%L, ' ' node_%s' ' , true );' ,
438
443
prts, node .id , fdw_part_name, new_master_id);
439
444
END IF;
440
445
END LOOP;
@@ -455,9 +460,10 @@ BEGIN
455
460
END
456
461
$$ LANGUAGE plpgsql;
457
462
458
- -- Since PG doesn't support it, mess with catalogs directly. If no more foreign
459
- -- tables use old server, drop it.
460
- CREATE FUNCTION alter_ftable_set_server (ftable name, new_fserver name) RETURNS void AS $$
463
+ -- Since PG doesn't support it, mess with catalogs directly. If asked and no one
464
+ -- uses this server, drop it.
465
+ CREATE FUNCTION alter_ftable_set_server (ftable name, new_fserver name,
466
+ server_not_needed bool DEFAULT false) RETURNS void AS $$
461
467
DECLARE
462
468
new_fserver_oid oid := oid FROM pg_foreign_server WHERE srvname = new_fserver;
463
469
old_fserver name := srvname FROM pg_foreign_server
@@ -467,7 +473,8 @@ BEGIN
467
473
UPDATE pg_foreign_table SET ftserver = new_fserver_oid WHERE ftrelid = ftable::regclass;
468
474
UPDATE pg_depend SET refobjid = new_fserver_oid
469
475
WHERE objid = ftable::regclass AND refobjid = old_fserver_oid;
470
- IF (SELECT count (* ) FROM pg_foreign_table WHERE ftserver = old_fserver_oid) = 0
476
+ IF server_not_needed AND
477
+ ((SELECT count (* ) FROM pg_foreign_table WHERE ftserver = old_fserver_oid) = 0 )
471
478
THEN
472
479
EXECUTE format(' DROP SERVER %s CASCADE' , old_fserver);
473
480
END IF;
@@ -862,8 +869,8 @@ BEGIN
862
869
drop_fdws := format(' %s%s:DROP FOREIGN TABLE %I;' ,
863
870
drop_fdws, node .id , fdw_part_name);
864
871
ELSE
865
- replace_parts := format(' %s%s:UPDATE pg_foreign_table SET ftserver = (SELECT oid FROM pg_foreign_server WHERE srvname = ' ' node_%s' ' ) WHERE ftrelid = (SELECT oid FROM pg_class WHERE relname=%L );' ,
866
- replace_parts, node .id , dst_node_id, fdw_part_name );
872
+ replace_parts := format(' %s%s:SELECT shardman.alter_ftable_set_server(%L, ' ' node_%s' ' );' ,
873
+ replace_parts, node .id , fdw_part_name, dst_node_id );
867
874
END IF;
868
875
END LOOP;
869
876
@@ -2032,40 +2039,46 @@ BEGIN
2032
2039
IF seqno <> max_seqno
2033
2040
THEN
2034
2041
RAISE NOTICE ' Advance node % from %' , replica .node_id , advanced_node;
2035
- PERFORM shardman .remote_copy (replica .relation , replica .part_name , replica .node_id , advanced_node, seqno);
2042
+ PERFORM shardman .remote_copy (replica .relation , pname, replica .node_id ,
2043
+ advanced_node, seqno);
2036
2044
END IF;
2037
2045
END LOOP;
2038
2046
END;
2039
2047
$$ LANGUAGE plpgsql;
2040
2048
2041
- -- Get relation primary key. There can be table with no primary key or with compound primary key.
2042
- -- But logical replication and hash partitioning in any case requires single primary key.
2049
+ -- Get relation primary key. There can be table with no primary key or with
2050
+ -- compound primary key. But logical replication and hash partitioning in any
2051
+ -- case requires single primary key.
2043
2052
CREATE FUNCTION get_primary_key (rel regclass, out pk_name text , out pk_type text ) AS $$
2044
- SELECT a .attname ::text ,a .atttypid ::regtype::text FROM pg_index i
2053
+ SELECT a .attname ::text , a .atttypid ::regtype::text FROM pg_index i
2045
2054
JOIN pg_attribute a ON a .attrelid = i .indrelid
2046
2055
AND a .attnum = ANY(i .indkey )
2047
2056
WHERE i .indrelid = rel
2048
2057
AND i .indisprimary ;
2049
2058
$$ LANGUAGE sql;
2050
2059
2051
- -- Copy missing data from one node to another. This function us using change_log table to determine records which need to be copied.
2060
+ -- Copy missing data from one node to another. This function uses change_log
2061
+ -- table to determine records which need to be copied.
2052
2062
-- See explanations in synchronize_replicas.
2053
2063
-- Parameters:
2054
2064
-- rel_name: name of parent relation
2055
2065
-- part_name: synchronized partition name
2056
2066
-- dst_node: lagging node
2057
2067
-- src_node: advanced node
2058
2068
-- last_seqno: maximal seqno at lagging node
2059
- CREATE FUNCTION remote_copy (rel_name text , part_name text , dst_node int , src_node int , last_seqno bigint ) RETURNS void AS $$
2069
+ CREATE FUNCTION remote_copy (rel_name text , part_name text , dst_node int ,
2070
+ src_node int , last_seqno bigint ) RETURNS void AS $$
2060
2071
DECLARE
2061
2072
script text ;
2062
2073
conn_string text ;
2063
2074
pk_name text ;
2064
2075
pk_type text ;
2065
2076
BEGIN
2066
- SELECT * FROM shardman .get_primary_key (rel_name) INTO pk_name,pk_type;
2077
+ SELECT * FROM shardman .get_primary_key (rel_name) INTO pk_name, pk_type;
2078
+ ASSERT pk_name IS NOT NULL , ' Can' ' t sync replicas without primary key' ;
2067
2079
SELECT connection_string INTO conn_string FROM shardman .nodes WHERE id= src_node;
2068
- -- We need to execute all this three statements in one transaction to exclude inconsistencies in case of failure
2080
+ -- We need to execute all this three statements in one transaction to
2081
+ -- exclude inconsistencies in case of failure
2069
2082
script := format(' {%s:COPY %s_change_log FROM PROGRAM ' ' psql "%s" -c "COPY (SELECT * FROM %s_change_log WHERE seqno>%s) TO stdout"' ' ;
2070
2083
DELETE FROM %I USING %s_change_log cl WHERE cl.seqno>%s AND cl.old_pk=%I;
2071
2084
COPY %I FROM PROGRAM ' ' psql "%s" -c "COPY (SELECT DISTINCT ON (%I) %I.* FROM %I,%s_change_log cl WHERE cl.seqno>%s AND cl.new_pk=%I ORDER BY %I) TO stdout"' ' }' ,
0 commit comments