Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 833bddd

Browse files
committed
Enabling/disabling synchronous replication.
But now we need to rewrite things, because enabling has to be performed in separate transaction to avoid hangup. Also, get_node_id renamed to my_id. Also, I have found func for text -> cstring conversion, yeah.
1 parent eecaf8d commit 833bddd

File tree

5 files changed

+194
-28
lines changed

5 files changed

+194
-28
lines changed

init.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ BEGIN
141141
raise INFO 'Booting master';
142142
PERFORM shardman.create_meta_pub();
143143

144-
master_id := shardman.get_node_id();
144+
master_id := shardman.my_id();
145145
IF master_id IS NULL THEN
146146
SELECT pg_settings.setting into master_connstring from pg_settings
147147
WHERE NAME = 'shardman.master_connstring';

membership.sql

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ CREATE TABLE nodes (
3131
-- of transaction. Later we should delete subscriptions fully.
3232
CREATE FUNCTION rm_node_worker_side() RETURNS TRIGGER AS $$
3333
BEGIN
34-
IF OLD.id = (SELECT shardman.get_node_id()) THEN
34+
IF OLD.id = (SELECT shardman.my_id()) THEN
3535
RAISE DEBUG '[SHARDMAN] rm_node_worker_side called';
3636
PERFORM shardman.pg_shardman_cleanup(false);
3737
END IF;
@@ -53,7 +53,7 @@ CREATE TABLE local_meta (
5353
INSERT INTO shardman.local_meta VALUES ('node_id', NULL);
5454

5555
-- Get local node id. NULL means node is not in the cluster yet.
56-
CREATE FUNCTION get_node_id() RETURNS int AS $$
56+
CREATE FUNCTION my_id() RETURNS int AS $$
5757
SELECT v::int FROM shardman.local_meta WHERE k = 'node_id';
5858
$$ LANGUAGE sql;
5959

@@ -95,9 +95,20 @@ BEGIN
9595
END
9696
$$ LANGUAGE plpgsql;
9797

98+
-- Get local node connstr. Throws an error, if node is not in cluster
99+
CREATE FUNCTION my_connstr() RETURNS text AS $$
100+
DECLARE
101+
connstr text := connstring FROM shardman.nodes WHERE id = shardman.my_id();
102+
BEGIN
103+
IF connstr IS NULL THEN
104+
RAISE EXCEPTION 'Node not in cluster, can''t get its connstring';
105+
END IF;
106+
RETURN connstr;
107+
END $$ LANGUAGE plpgsql;
108+
98109
-- Get connstr of worker node with id node_id. ERROR is raised if there isn't
99110
-- one.
100-
CREATE OR REPLACE FUNCTION get_worker_node_connstr(node_id int) RETURNS text as $$
111+
CREATE FUNCTION get_worker_node_connstr(node_id int) RETURNS text AS $$
101112
DECLARE
102113
connstr text := connstring FROM shardman.nodes WHERE id = node_id AND worker;
103114
BEGIN

shard.sql

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ DECLARE
2929
partition_names text[];
3030
pname text;
3131
BEGIN
32-
IF NEW.initial_node != (SELECT shardman.get_node_id()) THEN
32+
IF NEW.initial_node != (SELECT shardman.my_id()) THEN
3333
EXECUTE format('DROP TABLE IF EXISTS %I CASCADE;', NEW.relation);
3434
partition_names :=
3535
(SELECT ARRAY(SELECT part_name FROM shardman.gen_part_names(
@@ -94,8 +94,8 @@ CREATE TABLE partitions (
9494
CREATE FUNCTION new_primary() RETURNS TRIGGER AS $$
9595
BEGIN
9696
RAISE DEBUG '[SHARDMAN] new_primary trigger called on node % for part %, owner %',
97-
shardman.get_node_id(), NEW.part_name, NEW.owner;
98-
IF NEW.owner != shardman.get_node_id() THEN
97+
shardman.my_id(), NEW.part_name, NEW.owner;
98+
IF NEW.owner != shardman.my_id() THEN
9999
PERFORM shardman.replace_usual_part_with_foreign(NEW);
100100
END IF;
101101
RETURN NULL;
@@ -110,7 +110,7 @@ ALTER TABLE shardman.partitions ENABLE REPLICA TRIGGER new_primary;
110110
CREATE FUNCTION part_moved_prev(p_name name, src int, dst int)
111111
RETURNS void AS $$
112112
DECLARE
113-
me int := shardman.get_node_id();
113+
me int := shardman.my_id();
114114
lname text := shardman.get_data_lname(p_name, me, dst);
115115
BEGIN
116116
PERFORM shardman.create_repslot(lname);
@@ -132,7 +132,7 @@ DECLARE
132132
prev_lname text;
133133
prev_connstr text;
134134
BEGIN
135-
ASSERT dst = shardman.get_node_id(), 'part_moved_dst must be called on dst';
135+
ASSERT dst = shardman.my_id(), 'part_moved_dst must be called on dst';
136136
IF next_rep IS NOT NULL THEN -- we need to setup channel dst -> next replica
137137
next_lname := shardman.get_data_lname(p_name, dst, next_rep);
138138
-- This must be first write in the transaction!
@@ -159,7 +159,7 @@ END $$ LANGUAGE plpgsql STRICT;
159159
CREATE FUNCTION part_moved_next(p_name name, src int, dst int)
160160
RETURNS void AS $$
161161
DECLARE
162-
me int := shardman.get_node_id();
162+
me int := shardman.my_id();
163163
lname text := shardman.get_data_lname(p_name, dst, me);
164164
dst_connstr text := shardman.get_worker_node_connstr(dst);
165165
BEGIN
@@ -176,7 +176,7 @@ END $$ LANGUAGE plpgsql STRICT;
176176
CREATE FUNCTION part_moved() RETURNS TRIGGER AS $$
177177
DECLARE
178178
cp_logname text := shardman.get_cp_logname(NEW.part_name, OLD.owner, NEW.owner);
179-
me int := shardman.get_node_id();
179+
me int := shardman.my_id();
180180
prev_src_lname text;
181181
src_next_lname text;
182182
BEGIN
@@ -297,7 +297,7 @@ END $$ LANGUAGE plpgsql;
297297
-- CREATE FUNCTION replica_created_update_fdw() RETURNS TRIGGER AS $$
298298
-- BEGIN
299299
-- RAISE DEBUG '[SHARDMAN] replica_created_update_fdw trigger called';
300-
-- IF shardman.get_node_id() != NEW.prv THEN -- don't update on oldtail node
300+
-- IF shardman.my_id() != NEW.prv THEN -- don't update on oldtail node
301301
-- PERFORM shardman.update_fdw_server(NEW);
302302
-- END IF;
303303
-- RETURN NULL;
@@ -372,11 +372,11 @@ DECLARE
372372
connstring text;
373373
server_opts text;
374374
um_opts text;
375-
me int := shardman.get_node_id();
375+
me int := shardman.my_id();
376376
my_part shardman.partitions;
377377
BEGIN
378378
RAISE DEBUG '[SHARDMAN %] update_fdw_server called for part %, owner %',
379-
shardman.get_node_id(), part.part_name, part.owner;
379+
shardman.my_id(), part.part_name, part.owner;
380380

381381
SELECT * FROM shardman.partitions WHERE part_name = part.part_name AND
382382
owner = me INTO my_part;
@@ -451,7 +451,7 @@ BEGIN
451451
-- change reached us? We might also use it with local table (create
452452
-- foreign server pointing to it, etc), but that's just ugly.
453453
RAISE DEBUG '[SHARDMAN] my id: %, creating ft %',
454-
shardman.get_node_id(), part.part_name;
454+
shardman.my_id(), part.part_name;
455455
EXECUTE format('CREATE FOREIGN TABLE %I %s SERVER %I OPTIONS (table_name %L)',
456456
fdw_part_name,
457457
(SELECT
@@ -628,7 +628,7 @@ BEGIN
628628
END $$ LANGUAGE plpgsql STRICT;
629629

630630
CREATE FUNCTION gen_create_table_sql(relation text, connstring text) RETURNS text
631-
AS 'pg_shardman' LANGUAGE C;
631+
AS 'pg_shardman' LANGUAGE C STRICT;
632632

633633
CREATE FUNCTION reconstruct_table_attrs(relation regclass)
634634
RETURNS text AS 'pg_shardman' LANGUAGE C STRICT;
@@ -670,13 +670,32 @@ END $$ LANGUAGE plpgsql STRICT;
670670
-- Make sure that standby_name is present in synchronous_standby_names. If not,
671671
-- add it via ALTER SYSTEM and SIGHUP postmaster to reread conf.
672672
CREATE FUNCTION ensure_sync_standby(standby text) RETURNS void as $$
673+
DECLARE
674+
newval text := shardman.ensure_sync_standby_c(standby);
673675
BEGIN
674-
RAISE DEBUG '[SHARDMAN] imagine standby % added', standby;
676+
IF newval IS NOT NULL THEN
677+
RAISE DEBUG '[SHARDMAN] Adding standby %, new value is %', standby, newval;
678+
PERFORM shardman.set_sync_standbys(newval);
679+
PERFORM pg_reload_conf();
680+
END IF;
675681
END $$ LANGUAGE plpgsql STRICT;
682+
CREATE FUNCTION ensure_sync_standby_c(standby text) RETURNS text
683+
AS 'pg_shardman' LANGUAGE C;
676684

677685
-- Remove 'standby' from in synchronous_standby_names, if it is there, and SIGHUP
678686
-- postmaster.
679687
CREATE FUNCTION remove_sync_standby(standby text) RETURNS void as $$
688+
DECLARE
689+
newval text := shardman.remove_sync_standby_c(standby);
680690
BEGIN
681-
RAISE DEBUG '[SHARDMAN] imagine standby % removed', standby;
691+
IF newval IS NOT NULL THEN
692+
RAISE DEBUG '[SHARDMAN] Removing standby %, new value is %', standby, newval;
693+
PERFORM shardman.set_sync_standbys(newval);
694+
PERFORM pg_reload_conf();
695+
END IF;
682696
END $$ LANGUAGE plpgsql STRICT;
697+
CREATE FUNCTION remove_sync_standby_c(standby text) RETURNS text
698+
AS 'pg_shardman' LANGUAGE C;
699+
700+
CREATE FUNCTION set_sync_standbys(standby text) RETURNS void
701+
AS 'pg_shardman' LANGUAGE C;

src/pg_shardman.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ add_node(Cmd *cmd)
504504
{
505505
/* extension is installed, so we have to check whether this node
506506
* is already in the cluster */
507-
res = PQexec(conn, "select shardman.get_node_id();");
507+
res = PQexec(conn, "select shardman.my_id();");
508508
if (PQresultStatus(res) != PGRES_TUPLES_OK)
509509
{
510510
shmn_elog(NOTICE, "Failed to get node id, %s", PQerrorMessage(conn));

src/udf.c

Lines changed: 145 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "catalog/pg_type.h"
1919
#include "storage/lmgr.h"
2020
#include "replication/logicalworker.h"
21+
#include "replication/syncrep.h"
2122
#include "libpq-fe.h"
2223

2324
#include "pg_shardman.h"
@@ -138,8 +139,6 @@ reconstruct_table_attrs(PG_FUNCTION_ARGS)
138139
Relation local_rel = heap_open(relid, AccessExclusiveLock);
139140
TupleDesc local_descr = RelationGetDescr(local_rel);
140141
int i;
141-
text *text_query;
142-
int32 text_query_size;
143142

144143
initStringInfo(&query);
145144
appendStringInfoChar(&query, '(');
@@ -165,15 +164,9 @@ reconstruct_table_attrs(PG_FUNCTION_ARGS)
165164

166165
appendStringInfoChar(&query, ')');
167166

168-
/* Now prepare result as 'text' */
169-
text_query_size = VARHDRSZ + query.len;
170-
text_query = (text *) palloc(text_query_size);
171-
SET_VARSIZE(text_query, text_query_size);
172-
memcpy(VARDATA(text_query), query.data, query.len);
173-
174167
/* Let xact unlock this */
175168
heap_close(local_rel, NoLock);
176-
PG_RETURN_TEXT_P(text_query);
169+
PG_RETURN_TEXT_P(cstring_to_text(query.data));
177170
}
178171

179172
/*
@@ -297,9 +290,152 @@ reset_node_id_c(PG_FUNCTION_ARGS)
297290
PG_RETURN_VOID();
298291
}
299292

293+
/* Are we a logical apply worker? */
300294
PG_FUNCTION_INFO_V1(inside_apply_worker);
301295
Datum
302296
inside_apply_worker(PG_FUNCTION_ARGS)
303297
{
304298
PG_RETURN_BOOL(IsLogicalWorker());
305299
}
300+
301+
/*
302+
* Check whether 'standby' is present in current value of
303+
* synchronous_standby_names. If yes, return NULL. Otherwise, form properly
304+
* quoted new value of the setting with 'standby' appended. Currently we
305+
* support only the case when *all* standbys must agree on commit, so FIRST or
306+
* ANY doesn't matter here. '*' wildcard is not supported too.
307+
*/
308+
PG_FUNCTION_INFO_V1(ensure_sync_standby_c);
309+
Datum
310+
ensure_sync_standby_c(PG_FUNCTION_ARGS)
311+
{
312+
char *standby = text_to_cstring(PG_GETARG_TEXT_PP(0));
313+
char *cur_standby_name;
314+
StringInfoData standby_list;
315+
char *newval;
316+
int processed = 0;
317+
318+
initStringInfo(&standby_list);
319+
if (SyncRepConfig != NULL)
320+
{
321+
Assert(SyncRepConfig->num_sync == SyncRepConfig->nmembers);
322+
323+
cur_standby_name = SyncRepConfig->member_names;
324+
for (; processed < SyncRepConfig->nmembers; processed++)
325+
{
326+
Assert(strcmp(cur_standby_name, "*") != 0);
327+
if (pg_strcasecmp(cur_standby_name, standby) == 0)
328+
{
329+
PG_RETURN_NULL(); /* already set */
330+
}
331+
332+
if (processed != 0)
333+
appendStringInfoString(&standby_list, ", ");
334+
appendStringInfoString(&standby_list, quote_identifier(cur_standby_name));
335+
336+
cur_standby_name += strlen(cur_standby_name) + 1;
337+
}
338+
}
339+
340+
if (processed != 0)
341+
appendStringInfoString(&standby_list, ", ");
342+
appendStringInfoString(&standby_list, quote_identifier(standby));
343+
344+
newval = psprintf("FIRST %d (%s)", processed + 1, standby_list.data);
345+
PG_RETURN_TEXT_P(cstring_to_text(newval));
346+
}
347+
348+
/*
349+
* Check whether 'standby' is present in current value of
350+
* synchronous_standby_names. If no, return NULL. Otherwise, form properly
351+
* quoted new value of the setting with 'standby' removed. Currently we
352+
* support only the case when *all* standbys must agree on commit, so FIRST or
353+
* ANY doesn't matter here. '*' wildcard is not supported too.
354+
* All entries are removed.
355+
*/
356+
PG_FUNCTION_INFO_V1(remove_sync_standby_c);
357+
Datum
358+
remove_sync_standby_c(PG_FUNCTION_ARGS)
359+
{
360+
char *standby = text_to_cstring(PG_GETARG_TEXT_PP(0));
361+
char *cur_standby_name;
362+
StringInfoData standby_list;
363+
char *newval;
364+
int processed;
365+
int num_sync;
366+
367+
if (SyncRepConfig == NULL)
368+
PG_RETURN_NULL();
369+
370+
initStringInfo(&standby_list);
371+
Assert(SyncRepConfig->num_sync == SyncRepConfig->nmembers);
372+
373+
cur_standby_name = SyncRepConfig->member_names;
374+
num_sync = 0;
375+
for (processed = 0; processed < SyncRepConfig->nmembers; processed++)
376+
{
377+
Assert(strcmp(cur_standby_name, "*") != 0);
378+
if (pg_strcasecmp(cur_standby_name, standby) != 0)
379+
{
380+
if (num_sync != 0)
381+
appendStringInfoString(&standby_list, ", ");
382+
appendStringInfoString(&standby_list, quote_identifier(cur_standby_name));
383+
num_sync++;
384+
}
385+
386+
cur_standby_name += strlen(cur_standby_name) + 1;
387+
}
388+
389+
if (SyncRepConfig->num_sync == num_sync)
390+
PG_RETURN_NULL(); /* nothing was removed */
391+
392+
if (num_sync > 0)
393+
newval = psprintf("FIRST %d (%s)", num_sync, standby_list.data);
394+
else
395+
newval = "";
396+
PG_RETURN_TEXT_P(cstring_to_text(newval));
397+
}
398+
399+
/*
400+
* Execute ALTER SYSTEM SET synchronous_standby_names TO 'arg'. We can't do
401+
* that from usual function, because ALTER SYSTEM cannon be executed within
402+
* transaction, so we resort to another exquisite hack: we connect to
403+
* ourselves via libpq and perform the job.
404+
*/
405+
PG_FUNCTION_INFO_V1(set_sync_standbys);
406+
Datum
407+
set_sync_standbys(PG_FUNCTION_ARGS)
408+
{
409+
char *standbys = quote_literal_cstr(text_to_cstring(PG_GETARG_TEXT_PP(0)));
410+
char *cmd = psprintf("alter system set synchronous_standby_names to %s",
411+
standbys);
412+
char *my_id_sql = "select shardman.my_connstr();";
413+
char *connstr;
414+
PGconn *conn = NULL;
415+
PGresult *res = NULL;
416+
417+
SPI_connect();
418+
if (SPI_execute(my_id_sql, true, 0) < 0 || SPI_processed != 1)
419+
elog(FATAL, "Stmt failed: %s", my_id_sql);
420+
connstr = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1);
421+
422+
conn = PQconnectdb(connstr);
423+
if (PQstatus(conn) != CONNECTION_OK)
424+
{
425+
PQfinish(conn);
426+
elog(ERROR, "Connection to myself with connstr %s failed", connstr);
427+
428+
}
429+
res = PQexec(conn, cmd);
430+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
431+
{
432+
PQclear(res);
433+
PQfinish(conn);
434+
elog(ERROR, "setting sync standby namesfailed");
435+
}
436+
437+
PQclear(res);
438+
PQfinish(conn);
439+
SPI_finish(); /* Can't do earlier since connstr is allocated there */
440+
PG_RETURN_VOID();
441+
}

0 commit comments

Comments
 (0)