Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit eecaf8d

Browse files
committed
move_part debugged
1 parent 0aae49e commit eecaf8d

File tree

5 files changed

+62
-43
lines changed

5 files changed

+62
-43
lines changed

bin/common.sh

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ master_port=5432
2121
# declare -a worker_datadirs=("${HOME}/postgres/data2" "${HOME}/postgres/data3")
2222
# declare -a worker_ports=("5433" "5434")
2323

24-
declare -a worker_datadirs=("${HOME}/postgres/data2" "${HOME}/postgres/data3" "${HOME}/postgres/data4")
25-
declare -a worker_ports=("5433" "5434" "5435")
24+
# declare -a worker_datadirs=("${HOME}/postgres/data2" "${HOME}/postgres/data3" "${HOME}/postgres/data4")
25+
# declare -a worker_ports=("5433" "5434" "5435")
26+
27+
declare -a worker_datadirs=("${HOME}/postgres/data2" "${HOME}/postgres/data3" "${HOME}/postgres/data4" "${HOME}/postgres/data5")
28+
declare -a worker_ports=("5433" "5434" "5435" "5436")
2629

2730
#------------------------------------------------------------
2831
PATH="$PATH:${pgpath}bin/"
@@ -64,10 +67,12 @@ function run_demo()
6467
psql -p 5433 -c "INSERT INTO pt SELECT generate_series(1, 10), random();"
6568
psql -c "select shardman.add_node('port=5433');"
6669
psql -c "select shardman.add_node('port=5434');"
70+
psql -c "select shardman.add_node('port=5435');"
71+
psql -c "select shardman.add_node('port=5436');"
72+
6773
psql -p 5433 -c "drop table if exists pt_0;" # drop replica
6874
psql -c "select shardman.create_hash_partitions(2, 'pt', 'id', 2);"
69-
70-
psql -c "select shardman.add_node('port=5435');"
71-
# psql -c "select shardman.move_primary('pt_0', 4);"
72-
# psql -c "select shardman.create_replica('pt_0', 2);"
75+
psql -c "select shardman.create_replica('pt_0', 3);"
76+
psql -c "select shardman.create_replica('pt_0', 5);"
77+
psql -c "select shardman.move_part('pt_0', 4, 3);"
7378
}

init.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ CREATE TRIGGER cmd_log_inserts
5252
CREATE TABLE cmd_opts (
5353
id bigserial PRIMARY KEY,
5454
cmd_id bigint REFERENCES cmd_log(id),
55-
opt text NOT NULL
55+
opt text
5656
);
5757

5858
-- Interface functions

shard.sql

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,8 @@ CREATE TABLE partitions (
9393
-- it.
9494
CREATE FUNCTION new_primary() RETURNS TRIGGER AS $$
9595
BEGIN
96-
RAISE DEBUG '[SHARDMAN] new_primary trigger called for part %, owner %',
97-
NEW.part_name, NEW.owner;
96+
RAISE DEBUG '[SHARDMAN] new_primary trigger called on node % for part %, owner %',
97+
shardman.get_node_id(), NEW.part_name, NEW.owner;
9898
IF NEW.owner != shardman.get_node_id() THEN
9999
PERFORM shardman.replace_usual_part_with_foreign(NEW);
100100
END IF;
@@ -150,7 +150,7 @@ BEGIN
150150
PERFORM shardman.eliminate_sub(prev_lname);
151151
EXECUTE format(
152152
'CREATE SUBSCRIPTION %I connection %L
153-
PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false);',
153+
PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false, synchronous_commit = on);',
154154
prev_lname, prev_connstr, prev_lname, prev_lname);
155155
END IF;
156156
END $$ LANGUAGE plpgsql STRICT;
@@ -168,7 +168,7 @@ BEGIN
168168
PERFORM shardman.eliminate_sub(lname);
169169
EXECUTE format(
170170
'CREATE SUBSCRIPTION %I connection %L
171-
PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false);',
171+
PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false, synchronous_commit = on);',
172172
lname, dst_connstr, lname, lname);
173173
END $$ LANGUAGE plpgsql STRICT;
174174

@@ -204,7 +204,7 @@ BEGIN
204204
ELSE
205205
-- On the other hand, if prev replica existed, drop sub for old
206206
-- channel prev -> src
207-
PERFORM shardman.eliminate_sub(src_next_lname);
207+
PERFORM shardman.eliminate_sub(prev_src_lname);
208208
END IF;
209209
IF NEW.nxt IS NOT NULL THEN
210210
-- If next replica existed, drop pub for old channel src -> next
@@ -218,7 +218,7 @@ BEGIN
218218
-- Drop subscription used for copy
219219
PERFORM shardman.eliminate_sub(cp_logname);
220220
-- If primary part was moved, replace moved table with foreign one
221-
IF NEW.prev IS NULL THEN
221+
IF NEW.prv IS NULL THEN
222222
PERFORM shardman.replace_foreign_part_with_usual(NEW);
223223
END IF;
224224
ELSEIF me = NEW.prv THEN -- node with prev replica
@@ -236,8 +236,7 @@ BEGIN
236236
END
237237
$$ LANGUAGE plpgsql;
238238
CREATE TRIGGER part_moved AFTER UPDATE ON shardman.partitions
239-
FOR EACH ROW WHEN (OLD.prv is NULL AND NEW.prv IS NULL -- it is primary
240-
AND OLD.owner != NEW.owner -- and it is really moved
239+
FOR EACH ROW WHEN (OLD.owner != NEW.owner -- part is really moved
241240
AND OLD.part_name = NEW.part_name) -- sanity check
242241
EXECUTE PROCEDURE part_moved();
243242
-- fire trigger only on worker nodes
@@ -287,7 +286,7 @@ BEGIN
287286
PERFORM shardman.eliminate_sub(lname);
288287
EXECUTE format(
289288
'CREATE SUBSCRIPTION %I connection %L
290-
PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false);',
289+
PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false, synchronous_commit = on);',
291290
lname, oldtail_connstr, lname, lname);
292291
END $$ LANGUAGE plpgsql;
293292

@@ -324,18 +323,19 @@ BEGIN
324323
END $$ LANGUAGE plpgsql STRICT;
325324

326325
-- Drop all foreign server's options. Yes, I don't know simpler ways.
327-
CREATE FUNCTION reset_foreign_server_opts(srvname name) RETURNS void AS $$
326+
CREATE FUNCTION reset_foreign_server_opts(sname name) RETURNS void AS $$
328327
DECLARE
329328
opts text[];
330329
opt text;
331330
opt_key text;
332331
BEGIN
333-
EXECUTE format($q$select coalesce(srvoptions, '{}'::text[]) FROM
332+
ASSERT EXISTS (SELECT 1 FROM pg_foreign_server WHERE srvname = sname);
333+
EXECUTE format($q$SELECT coalesce(srvoptions, '{}'::text[]) FROM
334334
pg_foreign_server WHERE srvname = %L$q$,
335-
srvname) INTO opts;
335+
sname) INTO opts;
336336
FOREACH opt IN ARRAY opts LOOP
337337
opt_key := regexp_replace(substring(opt from '^.*?='), '=$', '');
338-
EXECUTE format('ALTER SERVER %I OPTIONS (DROP %s);', srvname, opt_key);
338+
EXECUTE format('ALTER SERVER %I OPTIONS (DROP %s);', sname, opt_key);
339339
END LOOP;
340340
END $$ LANGUAGE plpgsql STRICT;
341341
-- Same for resetting user mapping opts
@@ -359,29 +359,49 @@ BEGIN
359359
END LOOP;
360360
END $$ LANGUAGE plpgsql STRICT;
361361

362-
-- Update foreign server and user mapping params according to partition part, so
363-
-- this is expected to be called on server/um params change. We use dedicated
364-
-- server for each partition because we plan to use multiple hosts/ports in
365-
-- connstrings for transient fallback to replica if server with main partition
366-
-- fails. FDW server, user mapping, foreign table and (obviously) parent partition
367-
-- must exist when called.
362+
-- Update foreign server and user mapping params on current node according to
363+
-- partition part, so this is expected to be called on server/um params
364+
-- change. We use dedicated server for each partition because we plan to use
365+
-- multiple hosts/ports in connstrings for transient fallback to replica if
366+
-- server with main partition fails. FDW server, user mapping, foreign table and
367+
-- (obviously) parent partition must exist when called if they need to be
368+
-- updated; however, it is ok to call this func on nodes which don't need fdw
369+
-- setup for this part because they hold primary partition.
368370
CREATE FUNCTION update_fdw_server(part partitions) RETURNS void AS $$
369371
DECLARE
370372
connstring text;
371373
server_opts text;
372374
um_opts text;
375+
me int := shardman.get_node_id();
376+
my_part shardman.partitions;
373377
BEGIN
374-
-- ALTER FOREIGN TABLE doesn't support changing server, ALTER SERVER doesn't
375-
-- support dropping all params, and I don't want to recreate foreign table
376-
-- each time server params change, so resorting to these hacks.
377-
PERFORM shardman.reset_foreign_server_opts(part.part_name);
378-
PERFORM shardman.reset_um_opts(part.part_name, current_user::regrole);
378+
RAISE DEBUG '[SHARDMAN %] update_fdw_server called for part %, owner %',
379+
shardman.get_node_id(), part.part_name, part.owner;
380+
381+
SELECT * FROM shardman.partitions WHERE part_name = part.part_name AND
382+
owner = me INTO my_part;
383+
IF my_part.part_name IS NOT NULL THEN -- we are holding the part
384+
IF my_part.prv IS NULL THEN
385+
RAISE DEBUG '[SHARDMAN %] we are holding primary for part %s, not updating fdw server for it',
386+
me, part.part_name;
387+
RETURN;
388+
ELSE
389+
RAISE DEBUG '[SHARDMAN %] we are holding replica for part %s, updating fdw server for it',
390+
me, part.part_name;
391+
END IF;
392+
END IF;
379393

380394
SELECT nodes.connstring FROM shardman.nodes WHERE id = part.owner
381395
INTO connstring;
382396
SELECT * FROM shardman.conninfo_to_postgres_fdw_opts(connstring, 'ADD ')
383397
INTO server_opts, um_opts;
384398

399+
-- ALTER FOREIGN TABLE doesn't support changing server, ALTER SERVER doesn't
400+
-- support dropping all params, and I don't want to recreate foreign table
401+
-- each time server params change, so resorting to these hacks.
402+
PERFORM shardman.reset_foreign_server_opts(part.part_name);
403+
PERFORM shardman.reset_um_opts(part.part_name, current_user::regrole);
404+
385405
IF server_opts != '' THEN
386406
EXECUTE format('ALTER SERVER %I %s', part.part_name, server_opts);
387407
END IF;

src/pg_shardman.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -856,7 +856,7 @@ get_prev_node(const char *part_name, int32 node_id, bool *part_exists)
856856

857857
SPI_PROLOG;
858858
sql = psprintf( /* allocated in SPI ctxt, freed with ctxt release */
859-
"select prev from shardman.partitions where part_name = '%s'"
859+
"select prv from shardman.partitions where part_name = '%s'"
860860
" and owner = %d;", part_name, node_id);
861861

862862
if (SPI_execute(sql, true, 0) < 0)

src/shard.c

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,6 @@ static void exec_move_part(MovePartState *cps);
251251
static void exec_create_replica(CreateReplicaState *cps);
252252
static int mp_rebuild_lr(MovePartState *cps);
253253
static int cr_rebuild_lr(CreateReplicaState *cps);
254-
static void exec_move_replica(CopyPartState *cps);
255254
static int cp_start_tablesync(CopyPartState *cpts);
256255
static int cp_start_finalsync(CopyPartState *cpts);
257256
static int cp_finalize(CopyPartState *cpts);
@@ -439,12 +438,13 @@ init_mp_state(MovePartState *mps, const char *part_name, int32 src_node,
439438
return;
440439
}
441440
mps->cp.type = COPYPARTTASK_MOVE_PRIMARY;
441+
mps->prev_node = SHMN_INVALID_NODE_ID;
442442
}
443443
else
444444
{
445445
bool part_exists;
446446
/*
447-
* Make sure that part exists on the node and get prev at the same
447+
* Make sure that part exists on src node and get prev at the same
448448
* time to see whether it is a primary or no.
449449
*/
450450
mps->prev_node = get_prev_node(part_name, src_node, &part_exists);
@@ -457,7 +457,10 @@ init_mp_state(MovePartState *mps, const char *part_name, int32 src_node,
457457
}
458458
mps->cp.src_node = src_node;
459459
if (mps->prev_node == SHMN_INVALID_NODE_ID)
460+
{
460461
mps->cp.type = COPYPARTTASK_MOVE_PRIMARY;
462+
mps->prev_node = SHMN_INVALID_NODE_ID;
463+
}
461464
else
462465
{
463466
mps->cp.type = COPYPARTTASK_MOVE_REPLICA;
@@ -1060,15 +1063,6 @@ cr_rebuild_lr(CreateReplicaState *crs)
10601063
return 0;
10611064
}
10621065

1063-
/*
1064-
* One iteration of move replica task execution
1065-
*/
1066-
void
1067-
exec_move_replica(CopyPartState *cps)
1068-
{
1069-
1070-
}
1071-
10721066
/*
10731067
* Actually run CopyPartState state machine. On return, cps values say when (if
10741068
* ever) we want to be executed again.

0 commit comments

Comments
 (0)