Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b78ceb9

Browse files
committed
Fix several new syncpoint machinery - add/stop node jogs.
1) Encode join_node syncpoints in hex; 2) Set Syncpoint.local_lsn of existing nodes in new node pulling from donor (otherwise their changes are skipped); 3) Flush WAL before filter loading (tests demonstrated empty xact doesn't flush WAL, so filter skips and we eventually get it twice); 4) Accurately wait for slot creation in 005_add_stop_node.pl tests before basebackup.
1 parent 224cba5 commit b78ceb9

6 files changed

+62
-15
lines changed

Cluster.pm

+10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package Cluster;
2+
23
use strict;
34
use warnings;
5+
46
use PostgresNode;
57
use TestLib;
68
use Test::More;
@@ -193,6 +195,14 @@ sub safe_psql
193195
return $node->safe_psql($node->{dbname}, $query);
194196
}
195197

198+
sub poll_query_until
199+
{
200+
my ($self, $node_off, $query) = @_;
201+
my $node = $self->{nodes}->[$node_off];
202+
note("polling query $query");
203+
return $node->poll_query_until($node->{dbname}, $query);
204+
}
205+
196206
sub connstr
197207
{
198208
my ($self, $node_off) = @_;

multimaster--1.0.sql

+2-2
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,8 @@ ALTER TABLE mtm.syncpoints ENABLE ALWAYS TRIGGER syncpoints_trigger;
280280
* just a mark that node have applied everything up to origin_lsn.
281281
* 2) emitpoint: means all origin_node transactions with start_lsn >= origin_lsn
282282
* at receiver have or will have start_lsn >= receiver_lsn. So recovering
283-
* nodes should translate origin_lsn they need to the appropriate emitpoint of
284-
* donor and request streaming since it.
283+
* node should translate origin_lsn it needs to the appropriate emitpoint of
284+
* donor and request streaming since it (and ack it in its reports).
285285
* In the current blocking implementation single record represents both
286286
* absorbpoint and emitpoint. To make future development easier, hints are put
287287
* in places where the distinction would matter.

src/multimaster.c

+6-4
Original file line numberDiff line numberDiff line change
@@ -1167,8 +1167,6 @@ mtm_join_node(PG_FUNCTION_ARGS)
11671167
while (!MtmAllApplyWorkersFinished())
11681168
MtmSleep(USECS_PER_SEC / 10);
11691169

1170-
curr_lsn = GetXLogWriteRecPtr();
1171-
11721170
for (i = 0; i < cfg->n_nodes; i++)
11731171
{
11741172
char *ro_name;
@@ -1183,7 +1181,7 @@ mtm_join_node(PG_FUNCTION_ARGS)
11831181
ro_id = replorigin_by_name(ro_name, false);
11841182
olsn = replorigin_get_progress(ro_id, false);
11851183

1186-
msg = psprintf("F_%d_" UINT64_FORMAT,
1184+
msg = psprintf("F_%d_%" INT64_MODIFIER "X",
11871185
cfg->nodes[i].node_id, olsn);
11881186

11891187
replorigin_session_origin = cfg->nodes[i].origin_id;
@@ -1225,7 +1223,11 @@ mtm_join_node(PG_FUNCTION_ARGS)
12251223
}
12261224
ConditionVariableBroadcast(&Mtm->receiver_barrier_cv);
12271225

1228-
/* as we going to write that lsn on a new node, let's sync it */
1226+
/*
1227+
* Ensure new node will get 'Z' only after eating all syncpoints written
1228+
* above.
1229+
*/
1230+
curr_lsn = GetXLogWriteRecPtr();
12291231
XLogFlush(curr_lsn);
12301232

12311233
/* fill MTM_NODES with a adjusted list of nodes */

src/pglogical_receiver.c

+18-2
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,8 @@ MtmFilterTransaction(char *record, int size, Syncpoint *spvector,
305305
default:
306306
Assert(false);
307307
}
308-
} else if (msgtype == 'M')
308+
}
309+
else if (msgtype == 'M')
309310
{
310311
char action = pq_getmsgbyte(&s);
311312
int messageSize;
@@ -671,6 +672,8 @@ pglogical_receiver_main(Datum main_arg)
671672
*/
672673
if (receiver_mtm_cfg->backup_node_id > 0)
673674
{
675+
int i;
676+
674677
spvector = palloc0(MTM_MAX_NODES * sizeof(Syncpoint));
675678
/*
676679
* Immediately after basebackup we don't have any
@@ -681,6 +684,15 @@ pglogical_receiver_main(Datum main_arg)
681684
*/
682685
spvector[sender - 1] = SyncpointGetLatest(sender);
683686
remote_start = receiver_mtm_cfg->backup_end_lsn;
687+
/*
688+
* all other origins are also filtered from filter position
689+
* to donor
690+
*/
691+
for (i = 0; i < receiver_mtm_cfg->n_nodes; i++)
692+
{
693+
spvector[receiver_mtm_cfg->nodes[i].node_id - 1].local_lsn =
694+
spvector[sender - 1].local_lsn;
695+
}
684696
/*
685697
* we must filter out all xacts since sp to donor in this
686698
* special add_node case, so MtmInvalidNodeId
@@ -721,7 +733,11 @@ pglogical_receiver_main(Datum main_arg)
721733
appendStringInfo(message, ", syncpoint_vector (origin/local) = {");
722734
for (i = 0; i < MTM_MAX_NODES; i++)
723735
{
724-
if (spvector[i].origin_lsn != InvalidXLogRecPtr || spvector[i].local_lsn != InvalidXLogRecPtr)
736+
/*
737+
* local_lsn must be always (even before first syncpoint)
738+
* non-zero in used cell; it is filter slot position.
739+
*/
740+
if (spvector[i].local_lsn != InvalidXLogRecPtr)
725741
{
726742
appendStringInfo(message, "%d: " LSN_FMT "/" LSN_FMT ", ",
727743
i + 1,

src/syncpoint.c

+20-5
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,9 @@ SyncpointGetLatest(int origin_node_id)
232232
Assert(TupleDescAttr(tupdesc, 0)->atttypid == LSNOID);
233233

234234
sp.local_lsn = DatumGetLSN(SPI_getbinval(tup, tupdesc, 2, &isnull));
235-
Assert(!isnull);
235+
/* xxx c.f. similar in SyncpointGetAllLatest */
236+
if (isnull)
237+
mtm_log(ERROR, "fill_filter_since is null for node %d", origin_node_id);
236238
Assert(TupleDescAttr(tupdesc, 1)->atttypid == LSNOID);
237239
}
238240
else if (rc == SPI_OK_SELECT && SPI_processed == 0)
@@ -307,12 +309,18 @@ SyncpointGetAllLatest(int sender_node_id)
307309
origin_lsn = DatumGetLSN(SPI_getbinval(tup, tupdesc, 2, &isnull));
308310
Assert(!isnull);
309311
Assert(TupleDescAttr(tupdesc, 1)->atttypid == LSNOID);
312+
spvector[node_id - 1].origin_lsn = origin_lsn;
310313

311314
local_lsn = DatumGetLSN(SPI_getbinval(tup, tupdesc, 3, &isnull));
312-
Assert(!isnull);
313315
Assert(TupleDescAttr(tupdesc, 2)->atttypid == LSNOID);
314-
315-
spvector[node_id - 1].origin_lsn = origin_lsn;
316+
/*
317+
* XXX: there is no barrier between filter slot creation and
318+
* receiver start, so in theory (really unlikely) fastest receiver
319+
* may spin up before some slot is created by monitor and
320+
* fill_filter_since would be null. Shout in this case.
321+
*/
322+
if (isnull)
323+
mtm_log(ERROR, "fill_filter_since is null for node %d", node_id);
316324
spvector[node_id - 1].local_lsn = local_lsn;
317325
}
318326
}
@@ -392,9 +400,16 @@ RecoveryFilterLoad(int filter_node_id, Syncpoint *spvector, MtmConfig *mtm_cfg)
392400
HTAB *filter_map;
393401
int estimate_size;
394402
XLogRecPtr start_lsn = PG_UINT64_MAX;
395-
XLogRecPtr current_last_lsn = GetFlushRecPtr();
403+
XLogRecPtr current_last_lsn;
396404
int i;
397405

406+
/*
407+
* ensure we will scan everything written up to this point
408+
* (if xact was empty it hadn't performed flush on commit)
409+
*/
410+
XLogFlush(GetXLogWriteRecPtr());
411+
current_last_lsn = GetFlushRecPtr();
412+
398413
Assert(current_last_lsn != InvalidXLogRecPtr);
399414

400415
/* start from minimal among all of syncpoints */

t/005_add_stop_node.pl

+6-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use strict;
22
use warnings;
3+
4+
use Carp;
35
use PostgresNode;
46
use Cluster;
57
use TestLib;
@@ -101,8 +103,10 @@
101103
{
102104
$cluster->pgbench(0, ('-N', '-n', -t => '100') );
103105
}
104-
sleep(2); # XXX sleep a bit to ensure monitor creates slot for new node on donor
105-
# which will be used by basebackup
106+
# ensure monitor creates slot for new node on donor which will be used by
107+
# basebackup before proceeding
108+
$cluster->poll_query_until(0, "select exists(select * from pg_replication_slots where slot_name = 'mtm_filter_slot_${new_node_id}');")
109+
or croak "timed out waiting for slot creation";
106110
my $end_lsn = $cluster->backup_and_init(0, $new_node_off, $new_node_id);
107111
$cluster->release_socket($sock);
108112
$cluster->{nodes}->[$new_node_off]->append_conf('postgresql.conf', q{unix_socket_directories = ''

0 commit comments

Comments
 (0)