
Commit 3a90ceb

Updating metadata in-place during replica promotion.

The idea of committing each change in a separate xact was actually stupid; plpgsql doesn't take a new snapshot, so we don't see the changes. For now, just update in the usual xact, as before. Also, we now force create_hash_partitions to execute under READ COMMITTED, as pg_pathman wants.
1 parent b39ae6d commit 3a90ceb
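
As context for the snapshot remark above, a minimal standalone sketch (nothing here is from the commit: the table, the function, and the use of dblink in place of shardman.broadcast's libpq connection are all illustrative). A second backend commits a change mid-function, and the enclosing plpgsql function misses it whenever its snapshot predates that commit:

    CREATE EXTENSION IF NOT EXISTS dblink;
    CREATE TABLE meta(id int);

    CREATE FUNCTION update_and_read() RETURNS bigint LANGUAGE plpgsql AS $$
    DECLARE n bigint;
    BEGIN
        -- A second backend inserts and commits immediately, the way a
        -- broadcast to the local node would.
        PERFORM dblink_exec('dbname=' || current_database(),
                            'INSERT INTO meta VALUES (1)');
        SELECT count(*) INTO n FROM meta;
        RETURN n;
    END $$;

    BEGIN ISOLATION LEVEL REPEATABLE READ;
    SELECT update_and_read();  -- 0: our snapshot predates the other xact's commit
    COMMIT;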

File tree

  pg_shardman--0.0.2.sql
  pg_shardman.c

2 files changed: +71 -57 lines


pg_shardman--0.0.2.sql

Lines changed: 52 additions & 42 deletions
@@ -262,7 +262,7 @@ BEGIN
     -- Broadcast create table commands
     PERFORM shardman.broadcast(create_tables);
     -- Broadcast create hash partitions command
-    PERFORM shardman.broadcast(create_partitions);
+    PERFORM shardman.broadcast(create_partitions, iso_level => 'read committed');
     -- Broadcast create foreign table commands
     PERFORM shardman.broadcast(create_fdws);
     -- Broadcast replace hash partition commands
@@ -376,21 +376,11 @@ BEGIN
         rm_node_id),
         ignore_errors := true);
 
-    -- Remove node from metadata right away. We require from the user that after
-    -- calling rm_node the node must never be accessed, so it makes sense to
-    -- reflect metadata accordingly -- otherwise, we if fail somewhere down the
-    -- road below, the user would have been tempted to change her mind and not
-    -- to call rm_node again; it should be our responsibility to clean the things
-    -- up in recovery() in case of failure.
-    -- We want to see this change, so make sure we are running in READ COMMITTED.
     -- We set node_id of node's parts to NULL, meaning they are waiting for
     -- promotion. Replicas are removed with cascade.
-    ASSERT current_setting('transaction_isolation') = 'read committed',
-        'rm_node must be executed with READ COMMITTED isolation level';
-    PERFORM shardman.broadcast(format(
-        '{0:UPDATE shardman.partitions SET node_id=null WHERE node_id=%s;
-         DELETE FROM shardman.nodes WHERE id=%s;}',
-        rm_node_id, rm_node_id));
+    UPDATE shardman.partitions SET node_id = NULL WHERE node_id=rm_node_id;
+    DELETE FROM shardman.nodes WHERE id = rm_node_id;
+
 
     -- Remove all subscriptions and publications related to removed node
     FOR node IN SELECT * FROM shardman.nodes WHERE replication_group=repl_group
@@ -489,16 +479,16 @@ BEGIN
             WHERE id<>rm_node_id ORDER BY random() LIMIT 1;
         END IF;
 
-        -- Partition is successfully promoted, update metadata. It is important
-        -- to commit that before sending new mappings, because otherwise if we
-        -- fail during the latter, news about promoted replica will be lost;
-        -- next time we might choose another replica to promote with some new
-        -- data already written to previously promoted replica. Syncing
-        -- replicas doesn't help us much here if we don't lock tables.
-        PERFORM shardman.broadcast(format(
-            '{0:UPDATE shardman.partitions SET node_id=%s WHERE part_name = %L;
-             DELETE FROM shardman.replicas WHERE part_name = %L AND node_id = %s}',
-            new_master_id, part.part_name, part.part_name, new_master_id));
+        -- Partition is successfully promoted, update metadata. XXX: we should
+        -- commit that before sending new mappings, because otherwise if we fail
+        -- during the latter, news about promoted replica will be lost; next
+        -- time we might choose another replica to promote with some new data
+        -- already written to previously promoted replica. Syncing replicas
+        -- doesn't help us much here if we don't lock tables.
+        UPDATE shardman.partitions SET node_id = new_master_id
+            WHERE part_name = part.part_name;
+        DELETE FROM shardman.replicas WHERE part_name = part.part_name AND
+            node_id = new_master_id;
 
         -- Update pathman partition map at all nodes
         FOR node IN SELECT * FROM shardman.nodes WHERE id<>rm_node_id
@@ -611,15 +601,14 @@ BEGIN
         -- Create parent table at all nodes
         create_tables := format('%s{%s:%s}',
             create_tables, node.id, create_table);
-        -- Create partitions using pathman at all nodes
         create_partitions := format('%s%s:select create_hash_partitions(%L,%L,%L);',
             create_partitions, node.id, rel_name, expr, part_count);
     END LOOP;
 
     -- Broadcast create table commands
     PERFORM shardman.broadcast(create_tables);
     -- Broadcast create hash partitions command
-    PERFORM shardman.broadcast(create_partitions);
+    PERFORM shardman.broadcast(create_partitions, iso_level => 'read committed');
 
     -- Get list of nodes in random order
     SELECT ARRAY(SELECT id from shardman.nodes ORDER BY random()) INTO node_ids;
@@ -650,7 +639,7 @@ BEGIN
     END LOOP;
 
     -- Broadcast create foreign table commands
-    PERFORM shardman.broadcast(create_fdws);
+    PERFORM shardman.broadcast(create_fdws, iso_level => 'read committed');
     -- Broadcast replace hash partition commands
     PERFORM shardman.broadcast(replace_parts);
 
@@ -673,6 +662,7 @@ DECLARE
     repl_group text;
     pubs text = '';
     subs text = '';
+    sub text = '';
     sub_options text = '';
 BEGIN
     IF shardman.redirect_to_shardlord(format('set_redundancy(%L, %L)', rel_name,
@@ -709,8 +699,12 @@ BEGIN
             -- Establish publications and subscriptions for this partition
             pubs := format('%s%s:ALTER PUBLICATION node_%s ADD TABLE %I;',
                 pubs, part.node_id, repl_node, part.part_name);
-            subs := format('%s%s:ALTER SUBSCRIPTION sub_%s_%s REFRESH PUBLICATION%s;',
-                subs, repl_node, repl_node, part.node_id, sub_options);
+            sub := format('%s:ALTER SUBSCRIPTION sub_%s_%s REFRESH PUBLICATION%s;',
+                repl_node, repl_node, part.node_id, sub_options);
+            -- ignore duplicates
+            IF position(sub in subs) = 0 THEN
+                subs := subs || sub;
+            END IF;
         END LOOP;
     END IF;
 END LOOP;
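
As a standalone illustration of the dedup idiom added above (the subscription fragments are made-up samples): position() returns 0 when the needle is absent, so each command fragment is appended at most once.

    DO $$
    DECLARE
        subs text := '';
        sub  text;
    BEGIN
        FOREACH sub IN ARRAY ARRAY[
            '2:ALTER SUBSCRIPTION sub_2_1 REFRESH PUBLICATION;',
            '2:ALTER SUBSCRIPTION sub_2_1 REFRESH PUBLICATION;',  -- duplicate, skipped
            '3:ALTER SUBSCRIPTION sub_3_1 REFRESH PUBLICATION;']
        LOOP
            IF position(sub in subs) = 0 THEN   -- 0 means "not found"
                subs := subs || sub;
            END IF;
        END LOOP;
        RAISE NOTICE '%', subs;  -- each fragment appears exactly once
    END $$;
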
@@ -1410,6 +1404,7 @@ BEGIN
             WHERE r.part_name=part.part_name ORDER BY random() LIMIT 1;
         IF new_master_id IS NOT NULL
         THEN -- exists some replica for this part, promote it
+            RAISE DEBUG '[SHMN] Promoting part % on node %', part.part_name, new_master_id;
             -- If there are more than one replica of this partition, we need to
             -- synchronize them
             IF shardman.get_redundancy_of_partition(part.part_name) > 1
@@ -1425,16 +1420,20 @@ BEGIN
                 WHERE id<>rm_node_id ORDER BY random() LIMIT 1;
             END IF;
 
-            -- Update metadata. It is important to commit that before sending new
+            -- Update metadata. XXX We should commit that before sending new
             -- mappings, because otherwise if we fail during the latter, news about
             -- promoted replica will be lost; next time we might choose another
             -- replica to promote with some new data already written to previously
             -- promoted replica. Syncing replicas doesn't help us much here if we
             -- don't lock tables.
-            PERFORM shardman.broadcast(format(
-                '{0:UPDATE shardman.partitions SET node_id=%s WHERE part_name = %L;
-                 DELETE FROM shardman.replicas WHERE part_name = %L AND node_id = %s}',
-                new_master_id, part.part_name, part.part_name, new_master_id));
+            UPDATE shardman.partitions SET node_id=new_master_id
+                WHERE part_name = part.part_name;
+            DELETE FROM shardman.replicas r WHERE r.part_name = part.part_name AND
+                r.node_id = new_master_id;
+            -- PERFORM shardman.broadcast(format(
+            --     '{0:UPDATE shardman.partitions SET node_id=%s WHERE part_name = %L;
+            --      DELETE FROM shardman.replicas WHERE part_name = %L AND node_id = %s}',
+            --     new_master_id, part.part_name, part.part_name, new_master_id));
     END LOOP;
 
     -- Fix replication channels
@@ -1601,7 +1600,8 @@ BEGIN
             SELECT * INTO t FROM shardman.tables WHERE relation=part.relation;
             PERFORM shardman.broadcast(format(
                 '%s:SELECT create_hash_partitions(%L,%L,%L);',
-                src_node.id, t.relation, t.sharding_key, t.partitions_count));
+                src_node.id, t.relation, t.sharding_key, t.partitions_count),
+                iso_level => 'read committed');
         END IF;
         RAISE NOTICE 'Replace % with % at node %',
             part.part_name, fdw_part_name, src_node.id;
@@ -1628,7 +1628,8 @@ BEGIN
             SELECT * INTO t FROM shardman.tables WHERE relation=part.relation;
             PERFORM shardman.broadcast(format(
                 '%s:SELECT create_hash_partitions(%L, %L, %L);',
-                src_node.id, t.relation, t.sharding_key, t.partitions_count));
+                src_node.id, t.relation, t.sharding_key, t.partitions_count),
+                iso_level => 'read committed');
         ELSE
             RAISE NOTICE 'Replace % with % at node %',
                 fdw_part_name, part.part_name, src_node.id;
@@ -1827,7 +1828,9 @@ $$ LANGUAGE plpgsql;
 
 -- Commit or rollback not completed distributed transactions.
 -- All nodes must be alive for this to do something.
--- If coordinator is still in the cluster, we just try asking it.
+-- If coordinator is still in the cluster, we just try asking it:
+-- if xact committed on it, we commit it everywhere, if aborted, abort
+-- everywhere.
 -- If not, and there is only one participant, we simply commit the xact.
 -- If n_participants > 1, and xact is prepared everywhere, commit it.
 -- Otherwise, check WAL of every node; if COMMIT is found, COMMIT, if ABORT
@@ -1870,7 +1873,8 @@ BEGIN
             cmds, node_id, node_id);
     END LOOP;
 
-    -- Collected prepared xacts from all nodes
+    -- Collected prepared xacts from all nodes. They arrive as comma-separated
+    -- $node_id=>$gid
     xacts := string_to_array(shardman.broadcast(cmds), ',');
     -- empty string means no prepared xacts
     xacts := array_remove(xacts, '');
@@ -1880,7 +1884,7 @@ BEGIN
         xact_node_id := split_part(xact, '=>', 1);
         gid := split_part(xact, '=>', 2);
         sysid := split_part(gid, ':', 3)::bigint;
-        xid := split_part(gid, ':', 4)::bigint;
+        xid := split_part(gid, ':', 4)::bigint; -- coordinator's xid
         SELECT id INTO coordinator FROM shardman.nodes WHERE system_id=sysid;
         IF coordinator IS NULL
         THEN
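
To make the parsing above concrete, a sketch with a made-up xact string; only the field positions (node id before '=>', sysid and coordinator xid in colon-separated fields 3 and 4 of the gid) are taken from the code:

    SELECT split_part(x, '=>', 1)::int                        AS xact_node_id,
           split_part(split_part(x, '=>', 2), ':', 3)::bigint AS sysid,
           split_part(split_part(x, '=>', 2), ':', 4)::bigint AS xid
    FROM (VALUES ('3=>pgfdw:0:6528995050283063287:776:0')) v(x);
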
@@ -1982,7 +1986,7 @@ BEGIN
                 finish := format('%s%s:ROLLBACK PREPARED %L;', finish, xact_node_id, gid);
             ELSEIF status IS NULL
             THEN
-                RAISE WARNING 'Transaction % at coordinator % is too old to perform 2PC resolution',
+                RAISE WARNING 'Transaction % at coordinator % is too old to perform 2PC resolution or still in progress',
                     gid, coordinator;
             END IF;
         END IF;
@@ -2085,13 +2089,19 @@ RETURNS text AS 'pg_shardman' LANGUAGE C STRICT;
 -- previous was already executed.
 -- If super_connstr is true, super connstring is used everywhere, usual
 -- connstr otherwise.
+
+-- If iso_level is specified, cmd is wrapped in BEGIN TRANSACTION ISOLATION
+-- LEVEL iso_level; ... END;
+-- this allows to set isolation level; however you won't be able to get results
+-- this way.
 CREATE FUNCTION broadcast(cmds text,
                           ignore_errors bool = false,
                           two_phase bool = false,
                           sync_commit_on bool = false,
                           sequential bool = false,
-                          super_connstr bool = false)
-RETURNS text AS 'pg_shardman' LANGUAGE C STRICT;
+                          super_connstr bool = false,
+                          iso_level text = null)
+RETURNS text AS 'pg_shardman' LANGUAGE C;
 
 -- Options to postgres_fdw are specified in two places: user & password in user
 -- mapping and everything else in create server. The problem is that we use
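
A usage sketch of the extended signature (node ids and the relation are made up): with iso_level set, each per-node command is wrapped in an explicit READ COMMITTED transaction, which is what this commit wants for create_hash_partitions per pg_pathman, at the cost of not getting result rows back:

    SELECT shardman.broadcast(
        '1:SELECT create_hash_partitions(''users'', ''id'', 10);' ||
        '2:SELECT create_hash_partitions(''users'', ''id'', 10);',
        iso_level => 'read committed');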

pg_shardman.c

Lines changed: 19 additions & 15 deletions
@@ -161,6 +161,8 @@ broadcast(PG_FUNCTION_ARGS)
     bool sync_commit_on = PG_GETARG_BOOL(3);
     bool sequential = PG_GETARG_BOOL(4);
     bool super_connstr = PG_GETARG_BOOL(5);
+    char* iso_level = (PG_GETARG_POINTER(6) != NULL) ?
+        text_to_cstring(PG_GETARG_TEXT_PP(6)) : NULL;
     char* sep;
     char* sql;
     PGresult *res;
@@ -175,6 +177,7 @@ broadcast(PG_FUNCTION_ARGS)
     Channel* chan;
     PGconn* con;
     StringInfoData resp;
+    StringInfoData fin_sql;
 
     char const* errstr = "";
 
@@ -246,30 +249,31 @@ broadcast(PG_FUNCTION_ARGS)
                      PQerrorMessage(con));
             goto cleanup;
         }
+        /* Build the actual sql to send, mem freed with ctxt */
+        initStringInfo(&fin_sql);
         if (!sync_commit_on)
-        {
-            /* mem freed with context */
-            if (two_phase)
-            {
-                sql = psprintf("SET SESSION synchronous_commit TO local; BEGIN; %s; PREPARE TRANSACTION 'shardlord';", sql);
-            }
-            else
-            {
-                sql = psprintf("SET SESSION synchronous_commit TO local; %s", sql);
-            }
-        }
-        elog(DEBUG1, "Sending command '%s' to node %d", sql, node_id);
-        if (!PQsendQuery(con, sql)
+            appendStringInfoString(&fin_sql, "SET SESSION synchronous_commit TO local; ");
+        if (iso_level)
+            appendStringInfo(&fin_sql, "BEGIN TRANSACTION ISOLATION LEVEL %s; ", iso_level);
+        appendStringInfoString(&fin_sql, sql);
+        appendStringInfoChar(&fin_sql, ';'); /* it was removed after strchr */
+        if (two_phase)
+            appendStringInfoString(&fin_sql, "PREPARE TRANSACTION 'shardlord';");
+        else if (iso_level)
+            appendStringInfoString(&fin_sql, "END;");
+
+        elog(DEBUG1, "Sending command '%s' to node %d", fin_sql.data, node_id);
+        if (!PQsendQuery(con, fin_sql.data)
             || (sequential && !wait_command_completion(con)))
         {
             if (ignore_errors)
             {
                 errstr = psprintf("%s<error>%d:Failed to send query '%s': %s</error>",
-                                  errstr, node_id, sql, PQerrorMessage(con));
+                                  errstr, node_id, fin_sql.data, PQerrorMessage(con));
                 chan[n_cmds-1].sql = NULL;
                 continue;
             }
-            errstr = psprintf("Failed to send query '%s' to node %d: %s'", sql,
+            errstr = psprintf("Failed to send query '%s' to node %d: %s'", fin_sql.data,
                               node_id, PQerrorMessage(con));
             goto cleanup;
         }
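
One consequence visible in the SQL half of the diff: broadcast lost its STRICT marker. A STRICT C function is never called when any argument is NULL, and the new iso_level parameter defaults to null, so keeping STRICT would have turned every existing call into a silent NULL; the C side instead checks the argument for NULL itself. A tiny standalone demo of that STRICT behavior (hypothetical function, not from the commit):

    CREATE FUNCTION strict_demo(a text, b text = null) RETURNS text
        LANGUAGE sql STRICT AS $$ SELECT 'ran' $$;

    SELECT strict_demo('x');        -- NULL: body never runs because b is null
    SELECT strict_demo('x', 'y');   -- 'ran'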
