Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4d3ccd3

Browse files
committed
Make it possible to specify redundancy in create_hash_partition
1 parent dd15e6e commit 4d3ccd3

File tree

2 files changed

+87
-27
lines changed

2 files changed

+87
-27
lines changed

pg_shardman--1.0.sql

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -115,14 +115,15 @@ BEGIN
115115
THEN
116116
-- Take all nodes in replicationg group excluding myself
117117
FOR node IN SELECT * FROM shardman.nodes WHERE replication_group = repl_group LOOP
118-
sync_standbys :=
119-
coalesce(ARRAY(SELECT format('sub_%s_%s', id, node.id) FROM shardman.nodes
118+
-- Construct list of synchronous standbyes=subscriptions to this node
119+
sync_standbys :=
120+
coalesce(ARRAY(SELECT format('sub_%s_%s', id, node.id) FROM shardman.nodes
120121
WHERE replication_group = repl_group AND id <> node.id), '{}'::text[]);
121-
sync := format('%s%s:ALTER SYSTEM SET synchronous_standby_names to ''FIRST %s (%s)'';',
122-
sync, node.id, array_length(sync_standbys, 1),
123-
array_to_string(sync_standbys, ','));
124-
conf := format('%s%s:SELECT pg_reload_conf();', conf, node.id);
125-
END LOOP;
122+
sync := format('%s%s:ALTER SYSTEM SET synchronous_standby_names to ''FIRST %s (%s)'';',
123+
sync, node.id, array_length(sync_standbys, 1),
124+
array_to_string(sync_standbys, ','));
125+
conf := format('%s%s:SELECT pg_reload_conf();', conf, node.id);
126+
END LOOP;
126127

127128
PERFORM shardman.broadcast(sync, sync_commit => true);
128129
PERFORM shardman.broadcast(conf);
@@ -299,7 +300,7 @@ BEGIN
299300
pubs := format('%s%s:ALTER PUBLICATION node_%s ADD TABLE %I;',
300301
pubs, new_master_id, repl.node_id, part.part_name);
301302
-- And refresh subscriptions and replicas
302-
subs := format('%s%s:ALTER SUBSCRIPTION sub_%s_%s REFRESH PUBLICATION;',
303+
subs := format('%s%s:ALTER SUBSCRIPTION sub_%s_%s REFRESH PUBLICATION WITH (copy_data=false);',
303304
subs, repl.node_id, repl.node_id, new_master_id);
304305
END LOOP;
305306

@@ -343,7 +344,7 @@ $$ LANGUAGE plpgsql;
343344
-- It also scatter partitions through all nodes.
344345
-- This function expects that empty table is created at shardlord.
345346
-- So it can be executed only at shardlord and there is no need to redirect this function to shardlord.
346-
CREATE FUNCTION create_hash_partitions(rel regclass, expr text, part_count int)
347+
CREATE FUNCTION create_hash_partitions(rel regclass, expr text, part_count int, redundancy int = 0)
347348
RETURNS void AS $$
348349
DECLARE
349350
create_table text;
@@ -362,7 +363,7 @@ DECLARE
362363
i int;
363364
n_nodes int;
364365
BEGIN
365-
IF EXISTS(SELECT relation FROM shardman.partitions WHERE relation = rel_name)
366+
IF EXISTS(SELECT relation FROM shardman.tables WHERE relation = rel_name)
366367
THEN
367368
RAISE EXCEPTION 'Table % is already sharded', rel_name;
368369
END IF;
@@ -419,13 +420,18 @@ BEGIN
419420
PERFORM shardman.broadcast(create_fdws);
420421
-- Broadcast replace hash partition commands
421422
PERFORM shardman.broadcast(replace_parts);
423+
424+
IF redundancy <> 0
425+
THEN
426+
PERFORM shardman.set_redundancy(rel, redundancy, copy_data => false);
427+
END IF;
422428
END
423429
$$ LANGUAGE plpgsql;
424430

425431
-- Provide requested level of redundancy. 0 means no redundancy.
426432
-- If existing level of redundancy is greater than specified, then right now this
427433
-- function does nothing.
428-
CREATE FUNCTION set_redundancy(rel regclass, redundancy int)
434+
CREATE FUNCTION set_redundancy(rel regclass, redundancy int, copy_data bool = true)
429435
RETURNS void AS $$
430436
DECLARE
431437
part shardman.partitions;
@@ -435,12 +441,17 @@ DECLARE
435441
pubs text = '';
436442
subs text = '';
437443
rel_name text = rel::text;
444+
sub_options text = '';
438445
BEGIN
439446
IF shardman.redirect_to_shardlord(format('set_redundancy(%L, %L)', rel_name, redundancy))
440447
THEN
441448
RETURN;
442449
END IF;
443450

451+
IF NOT copy_data THEN
452+
sub_options := ' WITH (copy_data=false)';
453+
END IF;
454+
444455
-- Loop through all partitions of this table
445456
FOR part IN SELECT * from shardman.partitions where relation=rel_name
446457
LOOP
@@ -460,16 +471,16 @@ BEGIN
460471
-- Establish publications and subscriptions for this partition
461472
pubs := format('%s%s:ALTER PUBLICATION node_%s ADD TABLE %I;',
462473
pubs, part.node_id, repl_node, part.part_name);
463-
subs := format('%s%s:ALTER SUBSCRIPTION sub_%s_%s REFRESH PUBLICATION;',
464-
subs, repl_node, repl_node, part.node_id);
474+
subs := format('%s%s:ALTER SUBSCRIPTION sub_%s_%s REFRESH PUBLICATION%s;',
475+
subs, repl_node, repl_node, part.node_id, sub_options);
465476
END LOOP;
466477
END IF;
467478
END LOOP;
468479

469480
-- Broadcast alter publication commands
470481
PERFORM shardman.broadcast(pubs);
471482
-- Broadcast alter subscription commands
472-
PERFORM shardman.broadcast(subs);
483+
PERFORM shardman.broadcast(subs, synchronous => copy_data);
473484

474485
-- This function doesn't wait completion of replication sync
475486
END
@@ -585,8 +596,8 @@ BEGIN
585596
%s:ALTER PUBLICATION node_%s ADD TABLE %I;',
586597
pubs, src_node_id, repl_node_id, mv_part_name,
587598
dst_node_id, repl_node_id, mv_part_name);
588-
subs := format('%s%s:ALTER SUBSCRIPTION sub_%s_%s REFRESH PUBLICATION;
589-
%s:ALTER SUBSCRIPTION sub_%s_%s REFRESH PUBLICATION;',
599+
subs := format('%s%s:ALTER SUBSCRIPTION sub_%s_%s REFRESH PUBLICATION WITH (copy_data=false);
600+
%s:ALTER SUBSCRIPTION sub_%s_%s REFRESH PUBLICATION WITH (copy_data=false);',
590601
subs, repl_node_id, repl_node_id, src_node_id,
591602
repl_node_id, repl_node_id, dst_node_id);
592603
END LOOP;
@@ -691,8 +702,11 @@ RETURNS text AS 'pg_shardman' LANGUAGE C STRICT;
691702
-- prepared with subsequent commit or rollback of prepared transaction at second
692703
-- phase of two phase commit.
693704
-- If sync_commit is false, we do set session synchronous_commit to local;
694-
CREATE FUNCTION broadcast(cmds text, ignore_errors bool = false,
695-
two_phase bool = false, sync_commit bool = false)
705+
CREATE FUNCTION broadcast(cmds text,
706+
ignore_errors bool = false,
707+
two_phase bool = false,
708+
sync_commit bool = false,
709+
synchronous bool = false)
696710
RETURNS text AS 'pg_shardman' LANGUAGE C STRICT;
697711

698712
-- Options to postgres_fdw are specified in two places: user & password in user

pg_shardman.c

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111
#include "miscadmin.h"
1212
#include "executor/spi.h"
1313
#include "funcapi.h"
14+
#include "pgstat.h"
1415
#include "utils/guc.h"
1516
#include "utils/rel.h"
1617
#include "utils/builtins.h"
1718
#include "utils/lsyscache.h"
1819
#include "catalog/pg_type.h"
1920
#include "access/htup_details.h"
21+
#include "storage/latch.h"
2022

2123
/* ensure that extension won't load against incompatible version of Postgres */
2224
PG_MODULE_MAGIC;
@@ -28,6 +30,7 @@ PG_FUNCTION_INFO_V1(reconstruct_table_attrs);
2830
PG_FUNCTION_INFO_V1(pq_conninfo_parse);
2931

3032
/* GUC variables */
33+
static bool is_shardlord;
3134
static bool sync_replication;
3235
static char *shardlord_connstring;
3336

@@ -49,6 +52,16 @@ _PG_init()
4952
0,
5053
NULL, NULL, NULL);
5154

55+
DefineCustomBoolVariable(
56+
"shardman.shardlord",
57+
"This node is the shardlord?",
58+
NULL,
59+
&is_shardlord,
60+
false,
61+
PGC_SUSET,
62+
0,
63+
NULL, NULL, NULL);
64+
5265
DefineCustomStringVariable(
5366
"shardman.shardlord_connstring",
5467
"Active only if shardman.shardlord is on. Connstring to reach shardlord from"
@@ -73,13 +86,38 @@ synchronous_replication(PG_FUNCTION_ARGS)
7386
PG_RETURN_BOOL(sync_replication);
7487
}
7588

89+
static bool
90+
wait_command_completion(PGconn* conn)
91+
{
92+
while (PQisBusy(conn))
93+
{
94+
/* Sleep until there's something to do */
95+
int wc = WaitLatchOrSocket(MyLatch,
96+
WL_LATCH_SET | WL_SOCKET_READABLE,
97+
PQsocket(conn),
98+
-1L, PG_WAIT_EXTENSION);
99+
ResetLatch(MyLatch);
100+
101+
CHECK_FOR_INTERRUPTS();
102+
103+
/* Data available in socket? */
104+
if (wc & WL_SOCKET_READABLE)
105+
{
106+
if (!PQconsumeInput(conn))
107+
return false;
108+
}
109+
}
110+
return true;
111+
}
112+
76113
Datum
77114
broadcast(PG_FUNCTION_ARGS)
78115
{
79116
char* sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
80117
bool ignore_errors = PG_GETARG_BOOL(1);
81118
bool two_phase = PG_GETARG_BOOL(2);
82119
bool sync_commit = PG_GETARG_BOOL(3);
120+
bool sequential = PG_GETARG_BOOL(4);
83121
char* sep;
84122
PGresult *res;
85123
char* fetch_node_connstr;
@@ -113,16 +151,22 @@ broadcast(PG_FUNCTION_ARGS)
113151
elog(ERROR, "SHARDMAN: Invalid command string: %s", sql);
114152
}
115153
sql += n;
116-
fetch_node_connstr = psprintf("select connection_string from shardman.nodes where id=%d", node_id);
117-
if (SPI_exec(fetch_node_connstr, 0) < 0 || SPI_processed != 1)
154+
if (node_id != 0)
118155
{
119-
elog(ERROR, "SHARDMAN: Failed to fetch connection string for node %d",
120-
node_id);
121-
}
122-
pfree(fetch_node_connstr);
123-
124-
conn_str = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1);
156+
fetch_node_connstr = psprintf("select connection_string from shardman.nodes where id=%d", node_id);
157+
if (SPI_exec(fetch_node_connstr, 0) < 0 || SPI_processed != 1)
158+
{
159+
elog(ERROR, "SHARDMAN: Failed to fetch connection string for node %d",
160+
node_id);
161+
}
162+
pfree(fetch_node_connstr);
125163

164+
conn_str = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1);
165+
}
166+
else
167+
{
168+
conn_str = shardlord_connstring;
169+
}
126170
if (n_cmds >= n_cons)
127171
{
128172
conn = (PGconn**) repalloc(conn, sizeof(PGconn*) * (n_cons *= 2));
@@ -154,7 +198,8 @@ broadcast(PG_FUNCTION_ARGS)
154198
}
155199
}
156200
elog(DEBUG1, "Send command '%s' to node %d", sql, node_id);
157-
if (!PQsendQuery(conn[n_cmds - 1], sql))
201+
if (!PQsendQuery(conn[n_cmds - 1], sql)
202+
|| (sequential && !wait_command_completion(conn[n_cmds - 1])))
158203
{
159204
if (ignore_errors)
160205
{
@@ -169,6 +214,7 @@ broadcast(PG_FUNCTION_ARGS)
169214
}
170215
if (!sync_commit)
171216
pfree(sql);
217+
172218
sql = sep + 1;
173219
}
174220

0 commit comments

Comments
 (0)