@@ -106,91 +106,149 @@ CREATE TRIGGER new_primary AFTER INSERT ON shardman.partitions
106
106
-- fire trigger only on worker nodes
107
107
ALTER TABLE shardman .partitions ENABLE REPLICA TRIGGER new_primary;
108
108
109
-- Runs on the node holding the previous replica after a partition move (see
-- mp_rebuild_lr). Sets up the publishing side of the new data channel
-- prev replica -> dst: replication slot, publication and sync standby entry,
-- all under the same logical name.
--   p_name  name of the moved partition
--   src     node the partition was moved from (not used in the body)
--   dst     node the partition was moved to
-- Returns NULL without doing anything if any argument is NULL (STRICT).
CREATE FUNCTION part_moved_prev(p_name name, src int, dst int)
    RETURNS void AS $$
DECLARE
    this_node int;
    channel text;
BEGIN
    this_node := shardman.get_node_id();
    channel := shardman.get_data_lname(p_name, this_node, dst);

    -- The replication slot is created before anything else is touched.
    PERFORM shardman.create_repslot(channel);
    -- Recreate the publication for the channel from scratch.
    EXECUTE format(' DROP PUBLICATION IF EXISTS %I', channel);
    EXECUTE format(' CREATE PUBLICATION %I FOR TABLE %I', channel, p_name);
    -- Make this channel sync.
    PERFORM shardman.ensure_sync_standby(channel);
END $$ LANGUAGE plpgsql STRICT;
127
122
128
-- Runs on the node that received the partition (dst), see mp_rebuild_lr.
-- Looks up the neighbours of the moved part in the row still owned by src and
--   * if a next replica exists, creates repslot + publication for the channel
--     dst -> next replica and registers it as a sync standby;
--   * if a prev replica exists, (re)creates the subscription for the channel
--     prev replica -> dst.
--   p_name  name of the moved partition
--   src     node the partition was moved from
--   dst     node the partition was moved to; must be the current node
-- Returns NULL without doing anything if any argument is NULL (STRICT).
CREATE FUNCTION part_moved_dst(p_name name, src int, dst int)
    RETURNS void AS $$
DECLARE
    next_rep int;
    prev_rep int;
    pub_lname text;
    sub_lname text;
    upstream_connstr text;
BEGIN
    next_rep := nxt FROM shardman.partitions
        WHERE part_name = p_name AND owner = src;
    prev_rep := prv FROM shardman.partitions
        WHERE part_name = p_name AND owner = src;

    ASSERT dst = shardman.get_node_id(), ' part_moved_dst must be called on dst';

    -- Outgoing channel: dst -> next replica
    IF next_rep IS NOT NULL THEN
        pub_lname := shardman.get_data_lname(p_name, dst, next_rep);
        -- Slot creation has to be the first write of the transaction.
        PERFORM shardman.create_repslot(pub_lname);
        EXECUTE format(' DROP PUBLICATION IF EXISTS %I', pub_lname);
        EXECUTE format(' CREATE PUBLICATION %I FOR TABLE %I', pub_lname, p_name);
        -- Make this channel sync
        PERFORM shardman.ensure_sync_standby(pub_lname);
    END IF;

    -- Incoming channel: prev replica -> dst
    IF prev_rep IS NOT NULL THEN
        sub_lname := shardman.get_data_lname(p_name, prev_rep, dst);
        upstream_connstr := shardman.get_worker_node_connstr(prev_rep);
        -- Get rid of a leftover subscription with this name, if any.
        PERFORM shardman.eliminate_sub(sub_lname);
        -- Subscription, publication and slot all share one name.
        EXECUTE format(
            ' CREATE SUBSCRIPTION %I connection %L
            PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false);',
            sub_lname, upstream_connstr, sub_lname, sub_lname);
    END IF;
END $$ LANGUAGE plpgsql STRICT;
157
+
158
-- Runs on the node holding the next replica after a partition move (see
-- mp_rebuild_lr). Creates the subscription for the new data channel
-- dst -> next replica, reusing the slot that already exists on dst
-- (create_slot = false) and skipping the initial copy (copy_data = false).
--   p_name  name of the moved partition
--   src     node the partition was moved from (not used in the body)
--   dst     node the partition was moved to
-- Returns NULL without doing anything if any argument is NULL (STRICT).
CREATE FUNCTION part_moved_next(p_name name, src int, dst int)
    RETURNS void AS $$
DECLARE
    this_node int;
    channel text;
    upstream_connstr text;
BEGIN
    this_node := shardman.get_node_id();
    channel := shardman.get_data_lname(p_name, dst, this_node);
    upstream_connstr := shardman.get_worker_node_connstr(dst);

    -- The subscription should never exist at this moment, but just in case...
    PERFORM shardman.eliminate_sub(channel);
    -- Subscription, publication and slot all share one name.
    EXECUTE format(
        ' CREATE SUBSCRIPTION %I connection %L
        PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false);',
        channel, upstream_connstr, channel, channel);
END $$ LANGUAGE plpgsql STRICT;
150
174
151
175
-- Trigger function reacting to a partition (primary or replica) changing its
-- owner. Depending on which node it runs on, it tears down the pieces of the
-- old replication channels and of the copy channel:
--   src node (OLD.owner):  drop copy pub/repslot; for a primary, replace the
--                          local part with a foreign one, otherwise drop the
--                          sub of the old channel prev -> src; drop pub/repslot
--                          of the old channel src -> next; drop the old table.
--   dst node (NEW.owner):  drop copy sub; for a primary, replace the foreign
--                          part with the usual (moved) one.
--   prev replica node:     drop pub/repslot of the old channel prev -> src.
--   next replica node:     drop sub of the old channel src -> next.
-- Afterwards the fdw server is updated on every node.
-- Only one link of the chain may change per update: prv and nxt must stay the
-- same. Always returns NULL (AFTER trigger).
CREATE FUNCTION part_moved() RETURNS TRIGGER AS $$
DECLARE
    cp_logname text := shardman.get_cp_logname(NEW.part_name, OLD.owner, NEW.owner);
    me int := shardman.get_node_id();
    prev_src_lname text; -- old channel prev replica -> src, NULL if no prev
    src_next_lname text; -- old channel src -> next replica, NULL if no next
BEGIN
    ASSERT NEW.owner != OLD.owner, ' part_moved handles only moved parts';
    RAISE DEBUG ' [SHARDMAN] part_moved trigger called for part %, owner % -> %',
        NEW.part_name, OLD.owner, NEW.owner;
    -- NULL-safe comparison: neighbours must be untouched by this update
    ASSERT NEW.nxt IS NOT DISTINCT FROM OLD.nxt,
        ' both part and replica must not be moved in one update';
    ASSERT NEW.prv IS NOT DISTINCT FROM OLD.prv,
        ' both part and replica must not be moved in one update';

    IF NEW.prv IS NOT NULL THEN
        prev_src_lname := shardman.get_data_lname(NEW.part_name, NEW.prv, OLD.owner);
    END IF;
    IF NEW.nxt IS NOT NULL THEN
        src_next_lname := shardman.get_data_lname(NEW.part_name, OLD.owner, NEW.nxt);
    END IF;

    IF me = OLD.owner THEN -- src node
        -- Drop publication & repslot used for copy
        PERFORM shardman.drop_repslot_and_pub(cp_logname);
        IF NEW.prv IS NULL THEN
            -- Primary part was moved: replace on src node its partition with
            -- foreign one
            PERFORM shardman.replace_usual_part_with_foreign(NEW);
        ELSE
            -- Prev replica exists: drop sub for old channel prev -> src
            PERFORM shardman.eliminate_sub(prev_src_lname);
        END IF;
        IF NEW.nxt IS NOT NULL THEN
            -- Next replica exists: drop pub for old channel src -> next
            PERFORM shardman.drop_repslot_and_pub(src_next_lname);
            PERFORM shardman.remove_sync_standby(src_next_lname);
        END IF;
        -- Drop old table anyway
        EXECUTE format(' DROP TABLE IF EXISTS %I', NEW.part_name);

    ELSIF me = NEW.owner THEN -- dst node
        -- Drop subscription used for copy
        PERFORM shardman.eliminate_sub(cp_logname);
        -- Primary part was moved: replace foreign table with the moved one
        IF NEW.prv IS NULL THEN
            PERFORM shardman.replace_foreign_part_with_usual(NEW);
        END IF;

    ELSIF me = NEW.prv THEN -- node with prev replica
        -- Drop pub for old channel prev -> src
        PERFORM shardman.drop_repslot_and_pub(prev_src_lname);
        PERFORM shardman.remove_sync_standby(prev_src_lname);

    ELSIF me = NEW.nxt THEN -- node with next replica
        -- Drop sub for old channel src -> next
        PERFORM shardman.eliminate_sub(src_next_lname);
    END IF;

    -- And update fdw almost everywhere
    PERFORM shardman.update_fdw_server(NEW);
    RETURN NULL;
END
$$ LANGUAGE plpgsql;

-- No filter on prv here: part_moved handles both primaries (prv IS NULL) and
-- replicas, so the trigger must fire for any part whose owner changed.
CREATE TRIGGER part_moved AFTER UPDATE ON shardman.partitions
    FOR EACH ROW WHEN (OLD.owner != NEW.owner -- part is really moved
                       AND OLD.part_name = NEW.part_name) -- sanity check
    EXECUTE PROCEDURE part_moved();
-- fire trigger only on worker nodes
ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER part_moved;
186
245
187
246
-- Runs on the newtail node, see cr_rebuild_lr. Removes the subscription that
-- was used to copy the partition from oldtail to newtail.
--   part_name  name of the replicated partition
--   oldtail    node the data was copied from
--   newtail    node the data was copied to
CREATE FUNCTION replica_created_drop_cp_sub(
    part_name name, oldtail int, newtail int) RETURNS void AS $$
BEGIN
    -- Drop subscription used for copy
    PERFORM shardman.eliminate_sub(
        shardman.get_cp_logname(part_name, oldtail, newtail));
END $$ LANGUAGE plpgsql;
@@ -200,39 +258,37 @@ CREATE FUNCTION replica_created_create_data_pub(
200
258
part_name name, oldtail int , newtail int ) RETURNS void AS $$
201
259
DECLARE
202
260
cp_logname text := shardman .get_cp_logname (part_name, oldtail, newtail);
203
- oldtail_pubname name := shardman .get_data_pubname (part_name, oldtail);
204
- newtail_subname name := shardman .get_data_subname (part_name, oldtail, newtail);
261
+ lname name := shardman .get_data_lname (part_name, oldtail, newtail);
205
262
BEGIN
206
263
-- Repslot for new data channel. Must be first, since we "cannot create
207
264
-- logical replication slot in transaction that has performed writes"
208
- PERFORM shardman .create_repslot (oldtail_pubname );
265
+ PERFORM shardman .create_repslot (lname );
209
266
-- Drop publication & repslot used for copy
210
267
PERFORM shardman .drop_repslot_and_pub (cp_logname);
211
268
-- Create publication for new data channel
212
- EXECUTE format(' DROP PUBLICATION IF EXISTS %I' , oldtail_pubname);
213
- EXECUTE format(' CREATE PUBLICATION %I FOR TABLE %I' ,
214
- oldtail_pubname, part_name);
269
+ EXECUTE format(' DROP PUBLICATION IF EXISTS %I' , lname);
270
+ EXECUTE format(' CREATE PUBLICATION %I FOR TABLE %I' , lname, part_name);
215
271
-- Make this channel sync
216
- PERFORM shardman .ensure_sync_standby (newtail_subname );
272
+ PERFORM shardman .ensure_sync_standby (lname );
217
273
-- Now it is safe to make old tail writable again
218
274
PERFORM shardman .readonly_table_off (part_name::regclass);
219
275
END $$ LANGUAGE plpgsql;
220
276
221
-- Runs on the newtail node, see cr_rebuild_lr. Switches the new replica to
-- read-only mode and subscribes it to the data channel oldtail -> newtail,
-- reusing the slot already present on oldtail (create_slot = false) and
-- without an initial copy (copy_data = false).
--   part_name  name of the replicated partition
--   oldtail    publishing node
--   newtail    subscribing node
CREATE FUNCTION replica_created_create_data_sub(
    part_name name, oldtail int, newtail int) RETURNS void AS $$
DECLARE
    channel name;
    upstream_connstr text;
BEGIN
    channel := shardman.get_data_lname(part_name, oldtail, newtail);
    upstream_connstr := shardman.get_worker_node_connstr(oldtail);

    PERFORM shardman.readonly_replica_on(part_name::regclass);
    -- Create subscription for new data channel.
    -- It should never exist at this moment, but just in case...
    PERFORM shardman.eliminate_sub(channel);
    -- Subscription, publication and slot all share one name.
    EXECUTE format(
        ' CREATE SUBSCRIPTION %I connection %L
        PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false);',
        channel, upstream_connstr, channel, channel);
END $$ LANGUAGE plpgsql;
237
293
238
294
-- TODO
@@ -581,27 +637,26 @@ BEGIN
581
637
RETURN format(' shardman_copy_%s_%s_%s' , part_name, src, dst);
582
638
END $$ LANGUAGE plpgsql STRICT;
583
639
584
-- Naming convention for a data replication channel. The same name is used for
-- the publication, the replication slot, the subscription and the
-- application_name of the channel. Both the publishing and the subscribing
-- node are part of the name, because the sub is recreated when the pub
-- switches and the pub is recreated when the sub switches. See comment to
-- replica_created on why we don't reuse pubs and subs.
--   part_name  name of the replicated partition
--   pub_node   id of the publishing node
--   sub_node   id of the subscribing node
-- Returns 'shardman_data_<part_name>_<pub_node>_<sub_node>', or NULL if any
-- argument is NULL (STRICT). The result must not contain whitespace, since it
-- is also used as a replication slot name.
CREATE FUNCTION get_data_lname(part_name text, pub_node int, sub_node int)
    RETURNS name AS $$
BEGIN
    RETURN format('shardman_data_%s_%s_%s', part_name, pub_node, sub_node);
END $$ LANGUAGE plpgsql STRICT;
601
649
602
650
-- Meant to make sure that 'standby' is present in synchronous_standby_names,
-- adding it via ALTER SYSTEM and SIGHUPing postmaster to reread conf if not.
-- Currently a placeholder: it only emits a DEBUG message.
-- Does nothing if 'standby' is NULL (STRICT).
CREATE FUNCTION ensure_sync_standby(standby text) RETURNS void AS $$
BEGIN
    RAISE DEBUG
        ' [SHARDMAN] imagine standby % added',
        standby;
END $$ LANGUAGE plpgsql STRICT;
656
+
657
-- Meant to remove 'standby' from synchronous_standby_names, if it is there,
-- and SIGHUP postmaster. Currently a placeholder: it only emits a DEBUG
-- message. Does nothing if 'standby' is NULL (STRICT).
CREATE FUNCTION remove_sync_standby(standby text) RETURNS void AS $$
BEGIN
    RAISE DEBUG
        ' [SHARDMAN] imagine standby % removed ',
        standby;
END $$ LANGUAGE plpgsql STRICT;
0 commit comments