Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit a58f600

Browse files
committed
Check for pressence of partition at destination node in rebalance and set_replelvel
1 parent 21e0505 commit a58f600

File tree

5 files changed

+49
-11
lines changed

5 files changed

+49
-11
lines changed

shard.sql

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ BEGIN
157157
PERFORM shardman.eliminate_sub(prev_lname);
158158
EXECUTE format(
159159
'CREATE SUBSCRIPTION %I connection %L
160-
PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false, synchronous_commit = on);',
160+
PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false, synchronous_commit = local);',
161161
prev_lname, prev_connstr, prev_lname, prev_lname);
162162
-- If we have prev, we are replica
163163
PERFORM shardman.readonly_replica_on(p_name::regclass);
@@ -177,7 +177,7 @@ BEGIN
177177
PERFORM shardman.eliminate_sub(lname);
178178
EXECUTE format(
179179
'CREATE SUBSCRIPTION %I connection %L
180-
PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false, synchronous_commit = on);',
180+
PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false, synchronous_commit = local);',
181181
lname, dst_connstr, lname, lname);
182182
END $$ LANGUAGE plpgsql STRICT;
183183

@@ -664,8 +664,8 @@ CREATE FUNCTION readonly_table_on(relation regclass)
664664
RETURNS void AS $$
665665
BEGIN
666666
-- Create go away trigger to prevent any modifications
667-
-- PERFORM shardman.readonly_table_off(relation);
668-
-- PERFORM shardman.create_modification_triggers(relation, 'shardman_readonly', 'shardman.go_away()');
667+
PERFORM shardman.readonly_table_off(relation);
668+
PERFORM shardman.create_modification_triggers(relation, 'shardman_readonly', 'shardman.go_away()');
669669
END
670670
$$ LANGUAGE plpgsql STRICT;
671671
CREATE FUNCTION go_away() RETURNS TRIGGER AS $$
@@ -688,8 +688,8 @@ CREATE FUNCTION readonly_replica_on(relation regclass)
688688
BEGIN
689689
RAISE DEBUG '[SHMN] table % made read-only for all but apply workers',
690690
relation;
691-
-- PERFORM shardman.readonly_replica_off(relation);
692-
-- PERFORM shardman.create_modification_triggers(relation, 'shardman_readonly_replica', 'shardman.ror_go_away()');
691+
PERFORM shardman.readonly_replica_off(relation);
692+
PERFORM shardman.create_modification_triggers(relation, 'shardman_readonly_replica', 'shardman.ror_go_away()');
693693
END $$ LANGUAGE plpgsql STRICT;
694694
-- This function is impudent because it is used as both stmt and row trigger.
695695
-- The idea is that we must never reach RETURN NEW after stmt row trigger,

src/copypart.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ init_cp_state(CopyPartState *cps)
370370
" including storage);"
371371
" drop subscription if exists %s cascade;"
372372
" create subscription %s connection '%s' publication %s with"
373-
" (create_slot = false, slot_name = '%s');",
373+
" (create_slot = false, slot_name = '%s', synchronous_commit = local);",
374374
cps->part_name,
375375
cps->part_name, cps->relation,
376376
cps->logname,

src/include/pg_shardman.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,5 +131,5 @@ extern int32 get_prev_node(const char *part_name, int32 node_id, bool *part_exis
131131
extern char *get_partition_relation(const char *part_name);
132132
extern Partition *get_parts(const char *relation, uint64 *num_parts);
133133
extern RepCount *get_repcount(const char *relation, uint64 *num_parts);
134-
134+
extern bool node_has_partition(int32 node, const char *part_name);
135135
#endif /* PG_SHARDMAN_H */

src/pg_shardman.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1121,6 +1121,34 @@ get_prev_node(const char *part_name, int32 node_id, bool *part_exists)
11211121
return prev;
11221122
}
11231123

1124+
bool
1125+
node_has_partition(int32 node, const char *part_name)
1126+
{
1127+
char *sql;
1128+
int64 count = 0;
1129+
SPI_XACT_STATUS;
1130+
1131+
SPI_PROLOG;
1132+
1133+
sql = psprintf("select count(*) from shardman.partitions"
1134+
" where part_name = '%s' and owner = %d", part_name, node);
1135+
1136+
if (SPI_execute(sql, true, 0) < 0)
1137+
{
1138+
shmn_elog(FATAL, "Stmt failed : %s", sql);
1139+
}
1140+
if (SPI_processed != 0)
1141+
{
1142+
bool isnull;
1143+
count = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
1144+
SPI_tuptable->tupdesc,
1145+
1,
1146+
&isnull));
1147+
}
1148+
SPI_EPILOG;
1149+
return count != 0;
1150+
}
1151+
11241152
/*
11251153
* Get relation name of partition part_name. Memory is palloc'ed.
11261154
* NULL is returned, if there is no such partition.

src/shard.c

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,16 @@ rebalance(Cmd *cmd)
220220
for (part_idx = 0, worker_idx = 0; part_idx < num_parts; part_idx++)
221221
{
222222
Partition part = parts[part_idx];
223-
int32 worker = workers[worker_idx];
223+
int32 worker;
224224
MovePartState *mps = palloc0(sizeof(MovePartState));
225225

226+
do {
227+
worker = workers[worker_idx];
228+
worker_idx = (worker_idx + 1) % num_workers;
229+
} while (node_has_partition(worker, part.part_name));
230+
226231
init_mp_state(mps, part.part_name, part.owner, worker);
227232
tasks[part_idx] = (CopyPartState *) mps;
228-
worker_idx = (worker_idx + 1) % num_workers;
229233
}
230234

231235
exec_tasks(tasks, num_parts);
@@ -286,8 +290,14 @@ set_replevel(Cmd *cmd)
286290
if (rc.count < replevel)
287291
{
288292
CreateReplicaState *crs = palloc0(sizeof(CreateReplicaState));
289-
int32 dst_node = workers[rand() % num_workers];
293+
int32 dst_node;
294+
295+
do {
296+
dst_node = workers[rand() % num_workers];
297+
} while (node_has_partition(dst_node, rc.part_name));
298+
290299
init_cr_state(crs, rc.part_name, dst_node);
300+
291301
tasks[ntasks] = (CopyPartState *) crs;
292302
ntasks++;
293303
shmn_elog(DEBUG1, "Adding replica for shard %s on node %d",

0 commit comments

Comments
 (0)