
Commit 29ebc48

Merge remote-tracking branch 'origin/master'
* Sane destination choosing in rebalance and set_replevel.
* synchronous_commit on -> local in both the data and copy subscriptions. This is safe because we are moving to the spider model and don't have synchronous streaming replication anywhere (during a replica move we don't need synchronous replication).
* The replica-protecting triggers stay commented out, as the fix is not yet merged into PG.
2 parents 21213ed + 3db6d54 commit 29ebc48
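
One way to confirm the subscription change described above (an illustrative sketch, not part of this commit): on PG 10-era catalogs each subscription's synchronous_commit value is exposed in pg_subscription.subsynccommit, so after the shardman data/copy subscriptions are recreated it should read 'local'.

    -- Sketch: list subscriptions with their synchronous_commit setting.
    -- 'subsynccommit' is the pg_subscription column holding that value.
    SELECT subname, subsynccommit
    FROM pg_subscription;
    -- expected: 'local' for the shardman data and copy subscriptions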

File tree

5 files changed: +48, -7 lines

shard.sql

Lines changed: 2 additions & 2 deletions
@@ -157,7 +157,7 @@ BEGIN
 	PERFORM shardman.eliminate_sub(prev_lname);
 	EXECUTE format(
 		'CREATE SUBSCRIPTION %I connection %L
-		 PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false, synchronous_commit = on);',
+		 PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false, synchronous_commit = local);',
 		prev_lname, prev_connstr, prev_lname, prev_lname);
 	-- If we have prev, we are replica
 	PERFORM shardman.readonly_replica_on(p_name::regclass);
@@ -177,7 +177,7 @@ BEGIN
 	PERFORM shardman.eliminate_sub(lname);
 	EXECUTE format(
 		'CREATE SUBSCRIPTION %I connection %L
-		 PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false, synchronous_commit = on);',
+		 PUBLICATION %I with (create_slot = false, slot_name = %L, copy_data = false, synchronous_commit = local);',
 		lname, dst_connstr, lname, lname);
 END $$ LANGUAGE plpgsql STRICT;

src/copypart.c

Lines changed: 1 addition & 1 deletion
@@ -363,7 +363,7 @@ init_cp_state(CopyPartState *cps)
 		" including storage);"
 		" drop subscription if exists %s cascade;"
 		" create subscription %s connection '%s' publication %s with"
-		" (create_slot = false, slot_name = '%s');",
+		" (create_slot = false, slot_name = '%s', synchronous_commit = local);",
 		cps->part_name,
 		cps->part_name, cps->relation,
 		cps->logname,

src/include/pg_shardman.h

Lines changed: 1 addition & 1 deletion
@@ -131,5 +131,5 @@ extern int32 get_prev_node(const char *part_name, int32 node_id, bool *part_exists);
 extern char *get_partition_relation(const char *part_name);
 extern Partition *get_parts(const char *relation, uint64 *num_parts);
 extern RepCount *get_repcount(const char *relation, uint64 *num_parts);
-
+extern bool node_has_partition(int32 node, const char *part_name);
 #endif /* PG_SHARDMAN_H */

src/pg_shardman.c

Lines changed: 31 additions & 0 deletions
@@ -1121,6 +1121,37 @@ get_prev_node(const char *part_name, int32 node_id, bool *part_exists)
 	return prev;
 }
 
+/*
+ * Node 'node' holds primary or replica of part_name?
+ */
+bool
+node_has_partition(int32 node, const char *part_name)
+{
+	char *sql;
+	int64 count = 0;
+	SPI_XACT_STATUS;
+
+	SPI_PROLOG;
+
+	sql = psprintf("select count(*) from shardman.partitions"
+				   " where part_name = '%s' and owner = %d", part_name, node);
+
+	if (SPI_execute(sql, true, 0) < 0)
+	{
+		shmn_elog(FATAL, "Stmt failed : %s", sql);
+	}
+	if (SPI_processed != 0)
+	{
+		bool isnull;
+		count = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
+											SPI_tuptable->tupdesc,
+											1,
+											&isnull));
+	}
+	SPI_EPILOG;
+	return count != 0;
+}
+
 /*
  * Get relation name of partition part_name. Memory is palloc'ed.
  * NULL is returned, if there is no such partition.
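
For reference, a standalone version of the metadata check that node_has_partition wraps (a sketch with hypothetical values: partition 'orders_0' and node id 3; the real function builds this query with psprintf and runs it over SPI):

    -- Sketch: does node 3 hold a primary or replica of 'orders_0'?
    -- The values are hypothetical; the C code interpolates part_name and node.
    select count(*) <> 0 as has_partition
    from shardman.partitions
    where part_name = 'orders_0' and owner = 3;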

src/shard.c

Lines changed: 13 additions & 3 deletions
@@ -220,12 +220,16 @@ rebalance(Cmd *cmd)
 	for (part_idx = 0, worker_idx = 0; part_idx < num_parts; part_idx++)
 	{
 		Partition part = parts[part_idx];
-		int32 worker = workers[worker_idx];
+		int32 worker;
 		MovePartState *mps = palloc0(sizeof(MovePartState));
 
+		do {
+			worker = workers[worker_idx];
+			worker_idx = (worker_idx + 1) % num_workers;
+		} while (node_has_partition(worker, part.part_name));
+
 		init_mp_state(mps, part.part_name, part.owner, worker);
 		tasks[part_idx] = (CopyPartState *) mps;
-		worker_idx = (worker_idx + 1) % num_workers;
 	}
 
 	exec_tasks(tasks, num_parts);
@@ -286,8 +290,14 @@ set_replevel(Cmd *cmd)
 	if (rc.count < replevel)
 	{
 		CreateReplicaState *crs = palloc0(sizeof(CreateReplicaState));
-		int32 dst_node = workers[rand() % num_workers];
+		int32 dst_node;
+
+		do {
+			dst_node = workers[rand() % num_workers];
+		} while (node_has_partition(dst_node, rc.part_name));
+
 		init_cr_state(crs, rc.part_name, dst_node);
+
 		tasks[ntasks] = (CopyPartState *) crs;
 		ntasks++;
 		shmn_elog(DEBUG1, "Adding replica for shard %s on node %d",
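
The loops above keep drawing candidates until they find a node that does not already hold the partition, so after rebalance or set_replevel a node should not end up with two copies of the same shard. A hedged sanity check, assuming shardman.partitions keeps one row per copy (primary or replica) with its owner:

    -- Sketch: a (partition, node) pair listed more than once would mean a
    -- primary and its replica are colocated on one node; expect zero rows.
    select part_name, owner, count(*)
    from shardman.partitions
    group by part_name, owner
    having count(*) > 1;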
