Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f01424c

Browse files
committed
Updated LR data channel for move_primary
1 parent 511db06 commit f01424c

File tree

5 files changed

+226
-55
lines changed

5 files changed

+226
-55
lines changed

bin/common.sh

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ master_port=5432
1818
# declare -a worker_datadirs=("${HOME}/postgres/data2")
1919
# declare -a worker_ports=("5433")
2020

21-
declare -a worker_datadirs=("${HOME}/postgres/data2" "${HOME}/postgres/data3")
22-
declare -a worker_ports=("5433" "5434")
21+
# declare -a worker_datadirs=("${HOME}/postgres/data2" "${HOME}/postgres/data3")
22+
# declare -a worker_ports=("5433" "5434")
2323

24-
# declare -a worker_datadirs=("${HOME}/postgres/data2" "${HOME}/postgres/data3" "${HOME}/postgres/data4")
25-
# declare -a worker_ports=("5433" "5434" "5435")
24+
declare -a worker_datadirs=("${HOME}/postgres/data2" "${HOME}/postgres/data3" "${HOME}/postgres/data4")
25+
declare -a worker_ports=("5433" "5434" "5435")
2626

2727
#------------------------------------------------------------
2828
PATH="$PATH:${pgpath}bin/"
@@ -64,5 +64,10 @@ function run_demo()
6464
psql -p 5433 -c "INSERT INTO pt SELECT generate_series(1, 10), random();"
6565
psql -c "select shardman.add_node('port=5433');"
6666
psql -c "select shardman.add_node('port=5434');"
67+
psql -p 5433 -c "drop table if exists pt_0;" # drop replica
6768
psql -c "select shardman.create_hash_partitions(2, 'pt', 'id', 2);"
69+
70+
psql -c "select shardman.add_node('port=5435');"
71+
psql -c "select shardman.move_primary('pt_0', 4);"
72+
psql -c "select shardman.create_replica('pt_0', 2);"
6873
}

shard.sql

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,48 @@ CREATE TRIGGER new_primary AFTER INSERT ON shardman.partitions
106106
-- fire trigger only on worker nodes
107107
ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER new_primary;
108108

109+
-- Executed on node with new primary, see mp_rebuild_lr.
-- Prepares the publishing side of the new data channel for partition p_name:
-- calls shardman.create_repslot for the new publication name, (re)creates the
-- publication for table p_name and calls shardman.ensure_sync_standby for the
-- new subscription name.
--   p_name  partition name, matched against shardman.partitions.part_name
--   src     owner still recorded for the partition in shardman.partitions
--   dst     node id used to build the new publication/subscription names
-- STRICT: the body is not run and NULL is returned if any argument is NULL.
110+
CREATE FUNCTION primary_moved_create_data_pub(p_name name, src int, dst int)
111+
RETURNS void AS $$
112+
DECLARE
113+
-- Metadata is not yet updated, so nxt is read from the row still owned by src.
-- NOTE(review): replica is NULL if no such row exists or its nxt is NULL; how
-- get_data_subname handles a NULL argument is not visible here -- confirm.
114+
replica int := nxt FROM shardman.partitions
115+
WHERE part_name = p_name AND owner = src;
116+
new_pubname text := shardman.get_data_pubname(p_name, dst);
117+
new_subname text := shardman.get_data_subname(p_name, dst, replica);
118+
BEGIN
119+
-- Presumably creates a replication slot named after the publication;
-- primary_moved_create_data_sub subscribes with slot_name = new_pubname and
-- create_slot = false -- TODO confirm in create_repslot.
PERFORM shardman.create_repslot(new_pubname);
120+
-- (Re)create publication for the new data channel; any publication already
-- carrying this name is dropped first.
121+
EXECUTE format('DROP PUBLICATION IF EXISTS %I', new_pubname);
122+
EXECUTE format('CREATE PUBLICATION %I FOR TABLE %I',
123+
new_pubname, p_name);
124+
-- Make this channel sync (done by ensure_sync_standby, defined elsewhere)
125+
PERFORM shardman.ensure_sync_standby(new_subname);
126+
END $$ LANGUAGE plpgsql STRICT;
127+
128+
-- Executed on nearest replica after primary moved, see mp_rebuild_lr.
-- Rebuilds the subscribing side of the data channel for partition p_name:
-- eliminates the subscription used for the copy, eliminates any subscription
-- already carrying the new name, then creates the new subscription against
-- node dst.
--   p_name  partition name, matched against shardman.partitions.part_name
--   src     owner still recorded for the partition in shardman.partitions
--   dst     node id whose connection string the new subscription uses
-- STRICT: the body is not run and NULL is returned if any argument is NULL.
129+
CREATE FUNCTION primary_moved_create_data_sub(p_name name, src int, dst int)
130+
RETURNS void AS $$
131+
DECLARE
132+
-- Metadata is not yet updated, so nxt is read from the row still owned by src.
-- Here replica is only used to build new_subname.
133+
replica int := nxt FROM shardman.partitions
134+
WHERE part_name = p_name AND owner = src;
135+
new_pubname text := shardman.get_data_pubname(p_name, dst);
136+
new_subname text := shardman.get_data_subname(p_name, dst, replica);
137+
cp_logname text := shardman.get_cp_logname(p_name, src, dst);
138+
new_connstr text := shardman.get_worker_node_connstr(dst);
139+
BEGIN
140+
-- Drop subscription used for copy
141+
PERFORM shardman.eliminate_sub(cp_logname);
142+
-- Create subscription for new data channel.
143+
-- It should never exist at this moment, but just in case...
144+
PERFORM shardman.eliminate_sub(new_subname);
-- create_slot = false: the subscription attaches to an already existing slot,
-- named like the publication (slot_name = new_pubname).
-- copy_data = false: existing table contents are not copied again.
145+
EXECUTE format(
146+
'CREATE SUBSCRIPTION %I connection %L
147+
PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false);',
148+
new_subname, new_connstr, new_pubname, new_pubname);
149+
END $$ LANGUAGE plpgsql STRICT;
150+
109151
-- Update metadata according to primary move
110152
CREATE FUNCTION primary_moved() RETURNS TRIGGER AS $$
111153
DECLARE
@@ -115,6 +157,8 @@ BEGIN
115157
RAISE DEBUG '[SHARDMAN] primary_moved trigger called for part %, owner %->%',
116158
NEW.part_name, OLD.owner, NEW.owner;
117159
ASSERT NEW.owner != OLD.owner, 'primary_moved handles only moved parts';
160+
ASSERT NEW.nxt = OLD.nxt OR (NEW.nxt IS NULL AND OLD.nxt IS NULL),
161+
'both primary and replica must not be moved in one update';
118162
IF my_id = OLD.owner THEN -- src node
119163
-- Drop publication & repslot used for copy
120164
PERFORM shardman.drop_repslot_and_pub(cp_logname);
@@ -141,7 +185,7 @@ CREATE TRIGGER primary_moved AFTER UPDATE ON shardman.partitions
141185
ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER primary_moved;
142186

143187
-- Executed on newtail node, see cr_rebuild_lr
144-
CREATE FUNCTION replica_created_rebuild_drop_cp_sub(
188+
CREATE FUNCTION replica_created_drop_cp_sub(
145189
part_name name, oldtail int, newtail int) RETURNS void AS $$
146190
DECLARE
147191
cp_logname text := shardman.get_cp_logname(part_name, oldtail, newtail);
@@ -152,7 +196,7 @@ BEGIN
152196
END $$ LANGUAGE plpgsql;
153197

154198
-- Executed on oldtail node, see cr_rebuild_lr
155-
CREATE FUNCTION replica_created_rebuild_lr_create_data_pub(
199+
CREATE FUNCTION replica_created_create_data_pub(
156200
part_name name, oldtail int, newtail int) RETURNS void AS $$
157201
DECLARE
158202
cp_logname text := shardman.get_cp_logname(part_name, oldtail, newtail);
@@ -175,7 +219,7 @@ BEGIN
175219
END $$ LANGUAGE plpgsql;
176220

177221
-- Executed on oldtail node, see cr_rebuild_lr
178-
CREATE FUNCTION replica_created_rebuild_lr_create_data_sub(
222+
CREATE FUNCTION replica_created_create_data_sub(
179223
part_name name, oldtail int, newtail int) RETURNS void AS $$
180224
DECLARE
181225
oldtail_pubname name := shardman.get_data_pubname(part_name, oldtail);

src/include/pg_shardman.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ extern void cmd_canceled(Cmd *cmd);
4848
extern char *get_worker_node_connstr(int32 node_id);
4949
extern int32 get_primary_owner(const char *part_name);
5050
extern int32 get_reptail_owner(const char *part_name);
51+
/* Node holding the replica next to node_id in part_name's replica chain, or SHMN_INVALID_NODE_ID if there is none. */
extern int32 get_next_node(const char *part_name, int32 node_id);
5152
extern char *get_partition_relation(const char *part_name);
5253

5354
#endif /* PG_SHARDMAN_H */

src/pg_shardman.c

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -792,7 +792,7 @@ get_reptail_owner(const char *part_name)
792792
shmn_elog(FATAL, "Stmt failed : %s", sql);
793793

794794
if (SPI_processed == 0)
795-
owner = -1;
795+
owner = SHMN_INVALID_NODE_ID;
796796
else
797797
{
798798
owner = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
@@ -804,6 +804,41 @@ get_reptail_owner(const char *part_name)
804804
return owner;
805805
}
806806

807+
/*
808+
 * Return the id of the node holding the replica that follows node 'node_id'
809+
 * in the replica chain of partition 'part_name', i.e. the 'nxt' column of the
810+
 * matching shardman.partitions row. SHMN_INVALID_NODE_ID is returned if no
 * such row exists at all or if its 'nxt' is NULL (there is no next replica).
811+
 */
812+
int32
813+
get_next_node(const char *part_name, int32 node_id)
814+
{
815+
char *sql;
816+
bool isnull;
817+
int32 next;
818+
819+
/* SPI_PROLOG / SPI_EPILOG are macros defined elsewhere; presumably they
 * open and close the SPI connection -- confirm. */
SPI_PROLOG;
820+
/* NOTE(review): part_name is interpolated between single quotes without
 * escaping; a name containing a quote would break the statement. Consider
 * quote_literal_cstr() or a parameterized SPI call. */
sql = psprintf( /* allocated in SPI ctxt, freed with ctxt release */
821+
"select nxt from shardman.partitions where part_name = '%s'"
822+
" and owner = %d;", part_name, node_id);
823+
824+
/* read_only = true, count = 0 (no row limit); negative result is an error */
if (SPI_execute(sql, true, 0) < 0)
825+
shmn_elog(FATAL, "Stmt failed : %s", sql);
826+
827+
/* No row for this (part_name, node_id) pair */
if (SPI_processed == 0)
828+
next = SHMN_INVALID_NODE_ID;
829+
else
830+
{
831+
/* Column 1 is nxt, the only column selected; first row is used */
next = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
832+
SPI_tuptable->tupdesc,
833+
1, &isnull));
834+
/* Row exists but nxt IS NULL: no next replica */
if (isnull)
835+
next = SHMN_INVALID_NODE_ID;
836+
}
837+
838+
SPI_EPILOG;
839+
return next;
840+
}
841+
807842
/*
808843
* Get relation name of partition part_name. Memory is palloc'ed.
809844
* NULL is returned, if there is no such partition.

0 commit comments

Comments
 (0)