|
| 1 | +/* ------------------------------------------------------------------------ |
| 2 | + * |
| 3 | + * init.sql |
| 4 | + * Commands infrastructure, interface funcs, common utility funcs. |
| 5 | + * |
| 6 | + * ------------------------------------------------------------------------ |
| 7 | + */ |
| 8 | + |
| 9 | +-- complain if script is sourced in psql, rather than via CREATE EXTENSION |
| 10 | +\echo Use "CREATE EXTENSION pg_shardman" to load this file. \quit |
| 11 | + |
-- Functions here use some GUCs defined in the .so, so we have to ensure that
-- the library is actually loaded.
DO $$
BEGIN
    -- This is a plain substring test, so any preloaded library whose name
    -- merely contains 'pg_shardman' passes it as well. Calling a no-op
    -- function exported by the library would probably be a better check.
    IF strpos(current_setting('shared_preload_libraries'), 'pg_shardman') = 0 THEN
        -- The message is kept on a single line on purpose: a line break inside
        -- the literal would be emitted verbatim, along with the indentation
        -- following it.
        RAISE EXCEPTION 'pg_shardman must be loaded via shared_preload_libraries. Refusing to proceed.';
    END IF;
END
$$;
| 24 | + |
-- Commands understood by the shardman master.
CREATE TYPE cmd AS ENUM (
    'add_node',
    'rm_node',
    'create_hash_partitions',
    'move_primary'
);

-- Life cycle states of a queued command.
CREATE TYPE cmd_status AS ENUM (
    'waiting',
    'canceled',
    'failed',
    'in progress',
    'success'
);
| 31 | + |
-- Queue of commands: one row per command, new rows start as 'waiting'.
CREATE TABLE cmd_log (
    id       bigserial  PRIMARY KEY,
    cmd_type cmd        NOT NULL,
    status   cmd_status NOT NULL DEFAULT 'waiting'
);
| 37 | + |
-- Tell the shardman master bgw that new commands have been queued. The
-- notification goes to channel 'shardman_cmd_log_update' and carries no
-- payload.
CREATE FUNCTION notify_shardmaster() RETURNS trigger
LANGUAGE plpgsql AS $$
BEGIN
    PERFORM pg_notify('shardman_cmd_log_update', '');
    -- Fired as a statement-level AFTER trigger, there is no row to return.
    RETURN NULL;
END
$$;

-- One notification per INSERT statement, however many rows it adds.
CREATE TRIGGER cmd_log_inserts
    AFTER INSERT
    ON cmd_log
    FOR EACH STATEMENT
    EXECUTE PROCEDURE notify_shardmaster();
| 48 | + |
-- Options (arguments) of queued commands, one row per option. Keeping them in
-- an array column would probably be nicer, but working with arrays from libpq
-- is not very handy. Options must be inserted sequentially, in argument
-- order, because we order them by id.
CREATE TABLE cmd_opts (
    id     bigserial PRIMARY KEY,
    cmd_id bigint    REFERENCES cmd_log (id),
    opt    text      NOT NULL
);
| 57 | + |
| 58 | +-- Interface functions |
| 59 | + |
-- Add a node. Its state will be reset, all shardman data lost.
-- Queues an 'add_node' command in cmd_log with a single option:
--   'connstring' is stored as is in cmd_opts
CREATE FUNCTION add_node(connstring text) RETURNS void AS $$
DECLARE
    -- cmd_log.id is bigserial, so the variable holding it must be bigint too
    c_id bigint;
BEGIN
    -- id and status are left to their column defaults
    INSERT INTO @extschema@.cmd_log (cmd_type) VALUES ('add_node')
        RETURNING id INTO c_id;
    INSERT INTO @extschema@.cmd_opts (cmd_id, opt) VALUES (c_id, connstring);
END
$$ LANGUAGE plpgsql;
| 70 | + |
-- Remove node. Its state will be reset, all shardman data lost.
-- Queues an 'rm_node' command in cmd_log with a single option:
--   'node_id' is stored in cmd_opts in its text form
CREATE FUNCTION rm_node(node_id int) RETURNS void AS $$
DECLARE
    -- cmd_log.id is bigserial, so the variable holding it must be bigint too
    c_id bigint;
BEGIN
    -- id and status are left to their column defaults
    INSERT INTO @extschema@.cmd_log (cmd_type) VALUES ('rm_node')
        RETURNING id INTO c_id;
    -- cmd_opts.opt is text, convert explicitly
    INSERT INTO @extschema@.cmd_opts (cmd_id, opt) VALUES (c_id, node_id::text);
END
$$ LANGUAGE plpgsql;
| 81 | + |
-- Shard table with hash partitions. Params as in pathman, except for relation
-- (master doesn't know oid of the table), which is therefore passed by name.
-- Queues a 'create_hash_partitions' command in cmd_log; its options are stored
-- in cmd_opts in this order: node_id, expr, relation, partitions_count.
CREATE FUNCTION create_hash_partitions(
    node_id int, expr text, relation text, partitions_count int)
    RETURNS void AS $$
DECLARE
    -- cmd_log.id is bigserial, so the variable holding it must be bigint too
    c_id bigint;
BEGIN
    -- id and status are left to their column defaults
    INSERT INTO @extschema@.cmd_log (cmd_type) VALUES ('create_hash_partitions')
        RETURNING id INTO c_id;
    -- One INSERT per option: options are ordered by cmd_opts.id, so they have
    -- to be inserted sequentially. Integers are converted to text explicitly.
    INSERT INTO @extschema@.cmd_opts (cmd_id, opt) VALUES (c_id, node_id::text);
    INSERT INTO @extschema@.cmd_opts (cmd_id, opt) VALUES (c_id, expr);
    INSERT INTO @extschema@.cmd_opts (cmd_id, opt) VALUES (c_id, relation);
    INSERT INTO @extschema@.cmd_opts (cmd_id, opt) VALUES (c_id, partitions_count::text);
END
$$ LANGUAGE plpgsql;
| 98 | + |
-- Move master partition to another node. Params:
-- 'part_name' is name of the partition to move
-- 'dest' is id of the destination node
-- Queues a 'move_primary' command in cmd_log, stores part_name and dest (in
-- this order) in cmd_opts and returns the id of the queued command.
CREATE FUNCTION move_primary(part_name text, dest int) RETURNS int AS $$
DECLARE
    -- cmd_log.id is bigserial, so the variable holding it must be bigint too.
    -- The declared return type stays int for compatibility with existing
    -- callers, so RETURN converts the id back to int.
    c_id bigint;
BEGIN
    -- id and status are left to their column defaults
    INSERT INTO @extschema@.cmd_log (cmd_type) VALUES ('move_primary')
        RETURNING id INTO c_id;
    -- Options are ordered by cmd_opts.id, so insert them sequentially
    INSERT INTO @extschema@.cmd_opts (cmd_id, opt) VALUES (c_id, part_name);
    INSERT INTO @extschema@.cmd_opts (cmd_id, opt) VALUES (c_id, dest::text);
    RETURN c_id;
END $$ LANGUAGE plpgsql;
| 112 | + |
| 113 | +-- Internal functions |
| 114 | + |
-- Called on shardmaster bgw start. Add itself to nodes table, set id, create
-- publication.
-- All extension objects are referenced through @extschema@, as the interface
-- functions do, instead of mixing it with a hard-coded schema name.
CREATE FUNCTION master_boot() RETURNS void AS $$
DECLARE
    -- If we have never booted as a master before, we have some work to do
    init_master bool DEFAULT false;
    master_connstring text;
    master_id int;
BEGIN
    RAISE INFO 'Booting master';
    PERFORM @extschema@.create_meta_pub();

    master_id := @extschema@.get_node_id();
    IF master_id IS NULL THEN
        -- No node id yet: register ourselves in the nodes table, using the
        -- connection string from the shardman.master_connstring setting, and
        -- remember the generated id.
        SELECT pg_settings.setting INTO master_connstring FROM pg_settings
            WHERE name = 'shardman.master_connstring';
        -- NOTE(review): the nodes table is defined outside of this file, so
        -- the positional VALUES list is kept exactly as it was.
        EXECUTE format(
            'INSERT INTO @extschema@.nodes VALUES (DEFAULT, %L, NULL, false, true)
            RETURNING id', master_connstring) INTO master_id;
        PERFORM @extschema@.set_node_id(master_id);
        init_master := true;
    ELSE
        -- We already have an id: initialization is needed only if this node
        -- was not marked as master so far. Mark it as master in any case.
        EXECUTE 'SELECT NOT (SELECT master FROM @extschema@.nodes WHERE id = $1)'
            INTO init_master USING master_id;
        EXECUTE 'UPDATE @extschema@.nodes SET master = true WHERE id = $1'
            USING master_id;
    END IF;
    IF init_master THEN
        -- TODO: set up lr channels
    END IF;
END $$ LANGUAGE plpgsql;
| 145 | + |
-- Create publication 'shardman_meta_pub' over the nodes, tables and partitions
-- tables, unless it exists already. These tables will be replicated to worker
-- nodes, notifying them about changes. Called on master.
CREATE FUNCTION create_meta_pub() RETURNS void
LANGUAGE plpgsql AS $$
BEGIN
    -- Nothing to do if the publication is already there
    IF EXISTS (SELECT * FROM pg_publication WHERE pubname = 'shardman_meta_pub') THEN
        RETURN;
    END IF;

    CREATE PUBLICATION shardman_meta_pub
        FOR TABLE shardman.nodes, shardman.tables, shardman.partitions;
END;
$$;
| 156 | + |
-- (Re)create a logical replication slot with the given name using the
-- pgoutput plugin. A slot with this name which exists already is dropped
-- first.
CREATE FUNCTION create_repslot(slot_name text) RETURNS void
LANGUAGE plpgsql AS $$
BEGIN
    -- get rid of the old incarnation, if any
    PERFORM shardman.drop_repslot(slot_name);
    PERFORM pg_create_logical_replication_slot(slot_name::name, 'pgoutput'::name);
END
$$;
| 165 | + |
-- Drop replication slot, if it exists.
-- About the 'with_fire' option: we can't just drop replication slots, because
-- pg_drop_replication_slot will bail out with ERROR if the connection is
-- active. So the caller must either make sure that the connection is dead
-- (e.g. drop subscription on the far end) or pass 'true' as 'with_fire', which
-- does the following dirty hack: the active walsender is killed twice with a
-- 1 second interval. After the first kill the replica immediately tries to
-- reconnect, so the connection resurrects instantly. After the second kill,
-- however, the replica won't try to reconnect until
-- wal_retrieve_retry_interval (5 secs by default) has passed since its first
-- reaction. Of course, this is not reliable and should be redesigned.
CREATE FUNCTION drop_repslot(slot_name text, with_fire bool DEFAULT false)
    RETURNS void
LANGUAGE plpgsql STRICT AS $$
DECLARE
    slot_found bool;
BEGIN
    RAISE DEBUG 'Dropping repslot %', slot_name;

    -- The parameter is named like the column of pg_replication_slots, so the
    -- lookup is built as dynamic SQL with the value inlined as a literal.
    EXECUTE format(
        'SELECT EXISTS (SELECT * FROM pg_replication_slots
                        WHERE slot_name = %L)',
        slot_name) INTO slot_found;

    -- no such slot, nothing to drop
    IF NOT slot_found THEN
        RETURN;
    END IF;

    IF with_fire THEN -- kill walsender twice
        RAISE DEBUG 'Killing repslot % with fire', slot_name;
        PERFORM shardman.terminate_repslot_walsender(slot_name);
        PERFORM pg_sleep(1);
        PERFORM shardman.terminate_repslot_walsender(slot_name);
    END IF;

    PERFORM pg_drop_replication_slot(slot_name::name);
END
$$;
-- Terminate the backend recorded as active_pid of the replication slot with
-- the given name. The statement is built dynamically since the parameter is
-- named like the slot_name column of pg_replication_slots.
CREATE FUNCTION terminate_repslot_walsender(slot_name text) RETURNS void
LANGUAGE plpgsql STRICT AS $$
DECLARE
    kill_stmt text;
BEGIN
    kill_stmt := format(
        'SELECT pg_terminate_backend(active_pid)
         FROM pg_replication_slots
         WHERE slot_name = %L',
        slot_name);
    EXECUTE kill_stmt;
END
$$;
| 201 | + |
-- If sub exists, disable it, detach repslot from it and possibly drop. We
-- manage repslots ourselves, so it is essential to detach rs before dropping
-- sub, and repslots can't be detached while subscription is active.
-- Params:
-- 'subname' is the name of the subscription
-- 'drop_sub' tells whether to drop the subscription after detaching the slot
CREATE FUNCTION eliminate_sub(subname name, drop_sub bool DEFAULT true)
    RETURNS void AS $$
DECLARE
    sub_exists bool;
BEGIN
    -- pg_subscription is shared by all databases of the cluster, while ALTER
    -- and DROP SUBSCRIPTION see only subscriptions of the current database.
    -- So look only at our own database, otherwise a namesake subscription
    -- living elsewhere would make the commands below fail.
    -- Dynamic SQL is used because the parameter is named like the column.
    EXECUTE format(
        'SELECT EXISTS (SELECT 1 FROM pg_subscription
                        WHERE subname = %L
                          AND subdbid = (SELECT oid FROM pg_database
                                         WHERE datname = current_database()))',
        subname) INTO sub_exists;
    IF sub_exists THEN
        EXECUTE format('ALTER SUBSCRIPTION %I DISABLE', subname);
        EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE)', subname);
        IF drop_sub THEN
            EXECUTE format('DROP SUBSCRIPTION %I', subname);
        END IF;
    END IF;
END $$ LANGUAGE plpgsql STRICT;
| 220 | + |
-- Remove all our logical replication stuff in case of drop extension.
-- Dropping extension cleanup is not that easy:
--  - pg offers event triggers sql_drop, ddl_command_end and ddl_command_start
--  - sql_drop looks like what we need, but we can't do it from deleting
--    extension itself -- the trigger will be already deleted at the moment we
--    need it.
--  - same with ddl_command_end
--  - ddl_command_start apparently doesn't provide us with info what exactly
--    is happening, I mean it's impossible to learn with plpgsql what extension
--    is being deleted.
--  - because of that I resort to C function which examines parse tree and if
--    it is our extension that is being deleted, it calls plpgsql cleanup func
--
-- 'drop_subs' is passed to eliminate_sub: if false, subscriptions are only
-- disabled and detached from their slots, but not dropped.
CREATE OR REPLACE FUNCTION pg_shardman_cleanup(drop_subs bool DEFAULT true)
    RETURNS void AS $$
DECLARE
    pub record;
    sub record;
    rs record;
BEGIN
    -- In the patterns below the underscore is escaped: an unescaped '_' is a
    -- LIKE wildcard matching any single character, so 'shardman_%' would also
    -- catch foreign objects named like 'shardmanX...'.
    FOR pub IN SELECT pubname FROM pg_publication
               WHERE pubname LIKE 'shardman!_%' ESCAPE '!' LOOP
        EXECUTE format('DROP PUBLICATION %I', pub.pubname);
    END LOOP;
    -- pg_subscription is shared by all databases of the cluster, but only
    -- subscriptions of the current database can be altered or dropped here.
    FOR sub IN SELECT subname FROM pg_subscription
               WHERE subname LIKE 'shardman!_%' ESCAPE '!'
                 AND subdbid = (SELECT oid FROM pg_database
                                WHERE datname = current_database()) LOOP
        PERFORM shardman.eliminate_sub(sub.subname, drop_subs);
    END LOOP;
    -- TODO: drop repslots gracefully? For that we should iterate over all active
    -- subscribers and turn off subscriptions first.
    -- NOTE(review): pg_replication_slots is cluster-wide too; consider
    -- restricting this loop to slots of the current database.
    FOR rs IN SELECT slot_name FROM pg_replication_slots
              WHERE slot_name LIKE 'shardman!_%' ESCAPE '!'
                AND slot_type = 'logical' LOOP
        PERFORM shardman.drop_repslot(rs.slot_name, true);
    END LOOP;

    PERFORM shardman.reset_node_id();
END;
$$ LANGUAGE plpgsql;
-- Event trigger function implemented in C in the pg_shardman library.
CREATE FUNCTION pg_shardman_cleanup_c() RETURNS event_trigger
    LANGUAGE C
    AS 'pg_shardman';

-- Run it at the start of every DROP EXTENSION command.
CREATE EVENT TRIGGER cleanup_lr_trigger
    ON ddl_command_start
    WHEN TAG IN ('DROP EXTENSION')
    EXECUTE PROCEDURE pg_shardman_cleanup_c();
0 commit comments