Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5c22332

Browse files
committed
create_replica almost done in plpgsql, now it needs to be rewritten.
Replication slots can't be created from transactions that wrote something, and this is another reason to manage LR channels entirely from libpq. At least repslots creation must be moved out of trigger. Also I have removed partition numbering as there is apparently no reason to keep it: partitions can refer to nodes keeping its previous and next replica.
1 parent a9da45b commit 5c22332

File tree

7 files changed

+173
-85
lines changed

7 files changed

+173
-85
lines changed

init.sql

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,14 @@ BEGIN
213213
pg_replication_slots WHERE slot_name = %L', slot_name);
214214
END $$ LANGUAGE plpgsql STRICT;
215215

216+
-- Drop with fire repslot and publication with the same name. Useful for 1-to-1
217+
-- pub-sub.
218+
CREATE FUNCTION drop_repslot_and_pub(pub name) RETURNS void AS $$
219+
BEGIN
220+
EXECUTE format('DROP PUBLICATION IF EXISTS %I', pub);
221+
PERFORM shardman.drop_repslot(pub, true);
222+
END $$ LANGUAGE plpgsql STRICT;
223+
216224
-- If sub exists, disable it, detach repslot from it and possibly drop. We
217225
-- manage repslots ourselves, so it is essential to detach rs before dropping
218226
-- sub, and repslots can't be detached while subscription is active.

membership.sql

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,15 @@ BEGIN
9494
RETURN n_id;
9595
END
9696
$$ LANGUAGE plpgsql;
97+
98+
-- Get connstr of worker node with id node_id. ERROR is raised if there isn't
99+
-- one.
100+
CREATE OR REPLACE FUNCTION get_worker_node_connstr(node_id int) RETURNS text as $$
101+
DECLARE
102+
connstr text := connstring FROM shardman.nodes WHERE id = node_id AND worker;
103+
BEGIN
104+
IF connstr IS NULL THEN
105+
RAISE EXCEPTION 'Worker node with id % not found', node_id;
106+
END IF;
107+
RETURN connstr;
108+
END $$ LANGUAGE plpgsql STRICT;

postgresql.conf.master.template

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
# master-specific part
22
shardman.master = on
33
shardman.master_dbname = ars
4-
shardman.master_connstring = 'port=5432'
4+
shardman.master_connstring = 'port=5432'
5+
shardman.cmd_retry_naptime = 3000
6+
shardman.poll_interval = 3000

shard.sql

Lines changed: 122 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ ALTER TABLE shardman.tables ENABLE REPLICA TRIGGER new_table_worker_side;
5454
CREATE FUNCTION new_table_master_side() RETURNS TRIGGER AS $$
5555
BEGIN
5656
INSERT INTO shardman.partitions
57-
SELECT part_name, 0, NULL, NULL, NEW.relation AS relation, NEW.initial_node AS owner
57+
SELECT part_name, NEW.initial_node AS owner, NULL, NULL, NEW.relation AS relation
5858
FROM (SELECT part_name FROM shardman.gen_part_names(
5959
NEW.relation, NEW.partitions_count))
6060
AS partnames;
@@ -68,21 +68,18 @@ CREATE TRIGGER new_table_master_side AFTER INSERT ON shardman.tables
6868
-- Partitions
6969
------------------------------------------------------------
7070

71-
-- Primary shard and its replicas compose a doubly-linked list with 0 shard in
72-
-- the beginning.
71+
-- Primary shard and its replicas compose a doubly-linked list: nxt refers to
72+
-- the node containing next replica, prv to node with previous replica (or
73+
-- primary, if we are the first replica). If prv is NULL, this is primary
74+
-- replica. We don't number parts separately since we are not ever going to
75+
-- allow several copies of the same partition on one node.
7376
CREATE TABLE partitions (
7477
part_name text,
75-
-- Shard number. 0 means primary shard.
76-
num serial,
77-
nxt int,
78-
prv int,
78+
owner int NOT NULL REFERENCES nodes(id), -- node on which partition lies
79+
prv int REFERENCES nodes(id),
80+
nxt int REFERENCES nodes(id),
7981
relation text NOT NULL REFERENCES tables(relation),
80-
owner int REFERENCES nodes(id), -- node on which partition lies
81-
PRIMARY KEY (part_name, num),
82-
FOREIGN KEY (part_name, nxt) REFERENCES shardman.partitions(part_name, num),
83-
FOREIGN KEY (part_name, prv) REFERENCES shardman.partitions(part_name, num),
84-
-- primary has no prv, replica must have prv
85-
CONSTRAINT prv_existence CHECK (num = 0 OR prv IS NOT NULL)
82+
PRIMARY KEY (part_name, owner)
8683
);
8784

8885
------------------------------------------------------------
@@ -96,43 +93,31 @@ CREATE TABLE partitions (
9693
-- it.
9794
CREATE FUNCTION new_primary() RETURNS TRIGGER AS $$
9895
BEGIN
99-
IF NEW.owner != (SELECT shardman.get_node_id()) THEN
96+
RAISE DEBUG '[SHARDMAN] new_primary trigger called for part %, owner %',
97+
NEW.part_name, NEW.owner;
98+
IF NEW.owner != shardman.get_node_id() THEN
10099
PERFORM shardman.replace_usual_part_with_foreign(NEW);
101100
END IF;
102101
RETURN NULL;
103102
END
104103
$$ LANGUAGE plpgsql;
105104
CREATE TRIGGER new_primary AFTER INSERT ON shardman.partitions
106-
FOR EACH ROW WHEN (NEW.num = 0) EXECUTE PROCEDURE new_primary();
105+
FOR EACH ROW WHEN (NEW.prv IS NULL) EXECUTE PROCEDURE new_primary();
107106
-- fire trigger only on worker nodes
108107
ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER new_primary;
109108

110-
-- Replace foreign table-partition with local. The latter must exist!
111-
-- Foreign table will be dropped.
112-
CREATE FUNCTION replace_foreign_part_with_usual(part partitions)
113-
RETURNS void AS $$
114-
DECLARE
115-
fdw_part_name name;
116-
BEGIN
117-
ASSERT to_regclass(part.part_name) IS NOT NULL;
118-
SELECT shardman.get_fdw_part_name(part.part_name) INTO fdw_part_name;
119-
EXECUTE format('SELECT replace_hash_partition(%L, %L);',
120-
fdw_part_name, part.part_name);
121-
EXECUTE format('DROP FOREIGN TABLE %I;', fdw_part_name);
122-
END $$ LANGUAGE plpgsql;
123-
124109
-- Update metadata according to primary move
125110
CREATE FUNCTION primary_moved() RETURNS TRIGGER AS $$
126111
DECLARE
127-
cp_logname text := format('shardman_copy_%s_%s_%s',
128-
OLD.part_name, OLD.owner, NEW.owner);
112+
cp_logname text := shardman.get_cp_logname(OLD.part_name, OLD.owner, NEW.owner);
129113
my_id int := shardman.get_node_id();
130114
BEGIN
115+
RAISE DEBUG '[SHARDMAN] primary_moved trigger called for part %, owner %->%',
116+
NEW.part_name, OLD.owner, NEW.owner;
131117
ASSERT NEW.owner != OLD.owner, 'primary_moved handles only moved parts';
132118
IF my_id = OLD.owner THEN -- src node
133119
-- Drop publication & repslot used for copy
134-
EXECUTE format('DROP PUBLICATION IF EXISTS %I', cp_logname);
135-
PERFORM shardman.drop_repslot(cp_logname, true);
120+
PERFORM shardman.drop_repslot_and_pub(cp_logname);
136121
-- On src node, replace its partition with foreign one
137122
PERFORM shardman.replace_usual_part_with_foreign(NEW);
138123
ELSEIF my_id = NEW.owner THEN -- dst node
@@ -148,44 +133,75 @@ BEGIN
148133
END
149134
$$ LANGUAGE plpgsql;
150135
CREATE TRIGGER primary_moved AFTER UPDATE ON shardman.partitions
151-
FOR EACH ROW EXECUTE PROCEDURE primary_moved();
136+
FOR EACH ROW WHEN (OLD.prv is NULL AND NEW.prv IS NULL -- it is primary
137+
AND OLD.owner != NEW.owner -- and it is really moved
138+
AND OLD.part_name = NEW.part_name) -- sanity check
139+
EXECUTE PROCEDURE primary_moved();
152140
-- fire trigger only on worker nodes
153141
ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER primary_moved;
154142

155143
-- Update metadata according to new replica creation.
144+
-- Old tail part is still read-only when this called. There are two main jobs
145+
-- to do: set up LR sync channel between old tail and new replica and update fdw
146+
-- everywhere. For the former we could configure already existing channel used
147+
-- for partition copy, but we will not do that, because
148+
-- * It is not easier than creating new pub & sub: we have to rename pub, drop
149+
-- and create repslot (there is no way to rename it), rename sub, alter sub's
150+
-- slot_name, alter sub's publication, probably rename sub application name,
151+
-- probably run REFRESH (which requires alive pub just as CREATE SUBSCRIPTION)
152+
-- and hope that everything will be ok. Not sure about refreshing, though -- I
153+
-- don't know is it ok not doing it if tables didn't change. Doc says it
154+
-- should be executed.
155+
-- * Since it is not possible to rename repslot and and it is not possible to
156+
-- specify since which lsn start replication, tables must be synced anyway
157+
-- during these operations, so what the point of reusing old sub? And copypart
158+
-- in shard.c really cares that tables are synced at this moment and src is
159+
-- read-only.
156160
CREATE FUNCTION replica_created() RETURNS TRIGGER AS $$
157161
DECLARE
158-
cp_logname text := format('shardman_copy_%s_%s_%s',
159-
NEW.part_name, NEW.prv, NEW.owner);
162+
cp_logname text := shardman.get_cp_logname(NEW.part_name, NEW.prv, NEW.owner);
163+
oldtail_pubname name := shardman.get_data_pubname(NEW.part_name, NEW.prv);
164+
oldtail_connstr text := shardman.get_worker_node_connstr(NEW.prv);
165+
newtail_subname name := shardman.get_data_subname(NEW.part_name, NEW.prv, NEW.owner);
160166
my_id int := shardman.get_node_id();
161167
BEGIN
162-
-- ASSERT NEW.owner != OLD.owner, 'partition_moved handles only moved parts';
163-
-- cp_logname := format('shardman_copy_%s_%s_%s',
164-
-- OLD.part_name, OLD.owner, NEW.owner);
165-
-- my_id := (SELECT shardman.get_node_id());
166-
-- IF my_id = OLD.owner THEN -- src node
167-
-- -- Drop publication & repslot used for copy
168-
-- EXECUTE format('DROP PUBLICATION IF EXISTS %I', cp_logname);
169-
-- PERFORM shardman.drop_repslot(cp_logname, true);
170-
-- -- On src node, replace its partition with foreign one
171-
-- PERFORM shardman.replace_usual_part_with_foreign(NEW);
172-
-- ELSEIF my_id = NEW.owner THEN -- dst node
173-
-- -- Drop subscription used for copy
174-
-- PERFORM shardman.eliminate_sub(cp_logname);
175-
-- PERFORM shardman.replace_foreign_part_with_usual(NEW);
176-
-- ELSE -- other nodes
177-
-- -- just update foreign server
178-
-- PERFORM shardman.update_fdw_server(NEW);
179-
-- END IF;
180-
-- RETURN NULL;
168+
RAISE DEBUG '[SHARDMAN] replica_created trigger called';
169+
IF my_id = NEW.prv THEN -- old tail node
170+
-- Drop publication & repslot used for copy
171+
PERFORM shardman.drop_repslot_and_pub(cp_logname);
172+
-- Create publication & repslot for new data channel
173+
PERFORM shardman.create_repslot(oldtail_pubname);
174+
EXECUTE format('DROP PUBLICATION IF EXISTS %I', oldtail_pubname);
175+
EXECUTE format('CREATE PUBLICATION %I FOR TABLE %I',
176+
oldtail_pubname, NEW.part_name);
177+
-- Make this channel sync
178+
PERFORM shardman.ensure_sync_standby(newtail_subname);
179+
-- Now it is safe to make old tail writable again
180+
PERFORM shardman.readonly_table_off(relation);
181+
ELSEIF my_id = NEW.owner THEN -- created replica, i.e. new tail node
182+
-- Drop subscription used for copy
183+
PERFORM shardman.eliminate_sub(cp_logname);
184+
-- And create subscription for new data channel
185+
-- It should never exist at this moment, but just in case...
186+
PERFORM shardman.eliminate_sub(newtail_subname);
187+
EXECUTE format(
188+
'CREATE SUBSCRIPTION %I connection %L
189+
PUBLICATION %I with (create_slot = false, slot_name = %L);',
190+
newtail_subname, oldtail_connstr, oldtail_pubname, oldtail_pubname);
191+
-- Now fdw connstring to this part should include only primary and myself
192+
PERFORM shardman.update_fdw_server(NEW);
193+
ELSE -- other nodes
194+
-- just update fdw connstr to add new replica
195+
PERFORM shardman.update_fdw_server(NEW);
196+
END IF;
197+
RETURN NULL;
181198
END
182199
$$ LANGUAGE plpgsql;
183200
CREATE TRIGGER replica_created AFTER INSERT ON shardman.partitions
184-
FOR EACH ROW WHEN (NEW.num != 0) EXECUTE PROCEDURE replica_created();
201+
FOR EACH ROW WHEN (NEW.prv IS NOT NULL) EXECUTE PROCEDURE replica_created();
185202
-- fire trigger only on worker nodes
186203
ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER replica_created;
187204

188-
189205
-- Otherwise partitioned tables on worker nodes not will be dropped properly,
190206
-- see pathman's docs.
191207
ALTER EVENT TRIGGER pathman_ddl_trigger ENABLE ALWAYS;
@@ -194,8 +210,8 @@ ALTER EVENT TRIGGER pathman_ddl_trigger ENABLE ALWAYS;
194210
-- Funcs related to fdw
195211
------------------------------------------------------------
196212

197-
-- We use _fdw suffix for foreign tables to avoid interleaving with real
198-
-- ones.
213+
-- Convention: we use _fdw suffix for foreign tables to avoid interleaving with
214+
-- real ones.
199215
CREATE FUNCTION get_fdw_part_name(part_name name) RETURNS name AS $$
200216
BEGIN
201217
RETURN format('%s_fdw', part_name);
@@ -324,6 +340,20 @@ BEGIN
324340
EXECUTE format('DROP TABLE %I', part.part_name);
325341
END $$ LANGUAGE plpgsql;
326342

343+
-- Replace foreign table-partition with local. The latter must exist!
344+
-- Foreign table will be dropped.
345+
CREATE FUNCTION replace_foreign_part_with_usual(part partitions)
346+
RETURNS void AS $$
347+
DECLARE
348+
fdw_part_name name;
349+
BEGIN
350+
ASSERT to_regclass(part.part_name) IS NOT NULL;
351+
SELECT shardman.get_fdw_part_name(part.part_name) INTO fdw_part_name;
352+
EXECUTE format('SELECT replace_hash_partition(%L, %L);',
353+
fdw_part_name, part.part_name);
354+
EXECUTE format('DROP FOREIGN TABLE %I;', fdw_part_name);
355+
END $$ LANGUAGE plpgsql;
356+
327357
-- Options to postgres_fdw are specified in two places: user & password in user
328358
-- mapping and everything else in create server. The problem is that we use
329359
-- single connstring, however user mapping and server doesn't understand this
@@ -493,3 +523,36 @@ BEGIN
493523
AS range(num)) AS range;
494524
END
495525
$$ LANGUAGE plpgsql;
526+
527+
-- Convention about pub, sub and repslot name used for copying part part_name
528+
-- from src node to dst node.
529+
CREATE FUNCTION get_cp_logname(part_name text, src int, dst int)
530+
RETURNS name AS $$
531+
BEGIN
532+
RETURN format('shardman_copy_%s_%s_%s', part_name, src, dst);
533+
END $$ LANGUAGE plpgsql STRICT;
534+
535+
-- Convention about pub and repslot name used for data replication from part
536+
-- on pub_node node to any part. We don't change pub and repslot while
537+
-- switching subs, so sub node is not included here.
538+
CREATE FUNCTION get_data_pubname(part_name text, pub_node int)
539+
RETURNS name AS $$
540+
BEGIN
541+
RETURN format('shardman_data_%s_%s', part_name, pub_node);
542+
END $$ LANGUAGE plpgsql STRICT;
543+
544+
-- Convention about sub and application_name used for data replication. We do
545+
-- recreate sub while switching pub, so pub node is included here.
546+
-- See comment to replica_created on why we don't reuse subs.
547+
CREATE FUNCTION get_data_subname(part_name text, pub_node int, sub_node int)
548+
RETURNS name AS $$
549+
BEGIN
550+
RETURN format('shardman_data_%s_%s_%s', part_name, pub_node, sub_node);
551+
END $$ LANGUAGE plpgsql STRICT;
552+
553+
-- Make sure that standby_name is present in synchronous_standby_names. If not,
554+
-- add it via ALTER SYSTEM and SIGHUP postmaster to reread conf.
555+
CREATE FUNCTION ensure_sync_standby(newtail_subname text) RETURNS void as $$
556+
BEGIN
557+
RAISE DEBUG '[SHARDMAN] imagine standby updated';
558+
END $$ LANGUAGE plpgsql STRICT;

src/include/pg_shardman.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,9 @@ extern void check_for_sigterm(void);
4545
extern uint64 void_spi(char *sql);
4646
extern void update_cmd_status(int64 id, const char *new_status);
4747
extern void cmd_canceled(Cmd *cmd);
48-
extern char *get_worker_node_connstr(int node_id);
48+
extern char *get_worker_node_connstr(int32 node_id);
4949
extern int32 get_primary_owner(const char *part_name);
50-
extern int32 get_reptail_owner(const char *part_name, int32 *owner,
51-
int32 *partnum);
50+
extern int32 get_reptail_owner(const char *part_name);
5251
extern char *get_partition_relation(const char *part_name);
5352

5453
#endif /* PG_SHARDMAN_H */

src/pg_shardman.c

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,7 @@ rm_node(Cmd *cmd)
707707
* NULL is returned, if there is no such node.
708708
*/
709709
char*
710-
get_worker_node_connstr(int node_id)
710+
get_worker_node_connstr(int32 node_id)
711711
{
712712
MemoryContext oldcxt = CurrentMemoryContext;
713713
char *sql = psprintf("select connstring from shardman.nodes where id = %d"
@@ -753,7 +753,7 @@ get_primary_owner(const char *part_name)
753753

754754
SPI_PROLOG;
755755
sql = psprintf( /* allocated in SPI ctxt, freed with ctxt release */
756-
"select owner from shardman.partitions where part_name = '%s' and num = 0;",
756+
"select owner from shardman.partitions where part_name = '%s' and prv IS NULL;",
757757
part_name);
758758

759759
if (SPI_execute(sql, true, 0) < 0)
@@ -774,15 +774,14 @@ get_primary_owner(const char *part_name)
774774

775775
/*
776776
* Get node id on which the last replica in the 'part_name' replica chain
777-
* resides, and its partnum. -1 is returned if such partition doesn't exist
778-
* at all.
777+
* resides. -1 is returned if such partition doesn't exist at all.
779778
*/
780779
int32
781-
get_reptail_owner(const char *part_name, int32 *owner, int32 *partnum)
780+
get_reptail_owner(const char *part_name)
782781
{
783782
char *sql;
784783
bool isnull;
785-
int result = 0;
784+
int owner;
786785

787786
SPI_PROLOG;
788787
sql = psprintf( /* allocated in SPI ctxt, freed with ctxt release */
@@ -793,19 +792,16 @@ get_reptail_owner(const char *part_name, int32 *owner, int32 *partnum)
793792
shmn_elog(FATAL, "Stmt failed : %s", sql);
794793

795794
if (SPI_processed == 0)
796-
result = -1;
795+
owner = -1;
797796
else
798797
{
799-
*owner = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
800-
SPI_tuptable->tupdesc,
801-
1, &isnull));
802-
*partnum = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
803-
SPI_tuptable->tupdesc,
804-
2, &isnull));
798+
owner = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
799+
SPI_tuptable->tupdesc,
800+
1, &isnull));
805801
}
806802

807803
SPI_EPILOG;
808-
return result;
804+
return owner;
809805
}
810806

811807
/*

0 commit comments

Comments
 (0)