Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b2651f3

Browse files
committed
fix MtmUpdateLsnMapping() to filter flushpos for current node
1 parent ca43925 commit b2651f3

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

src/pglogical_receiver.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,8 @@ MtmUpdateLsnMapping(int node_id, lsn_t end_lsn)
256256
lsn_t local_flush = GetFlushRecPtr();
257257
MemoryContext old_context = MemoryContextSwitchTo(TopMemoryContext);
258258

259+
Assert(MtmIsReceiver && !MtmIsPoolWorker);
260+
259261
if (end_lsn != INVALID_LSN)
260262
{
261263
/* Track commit lsn */
@@ -266,13 +268,16 @@ MtmUpdateLsnMapping(int node_id, lsn_t end_lsn)
266268
dlist_push_tail(&MtmLsnMapping, &flushpos->node);
267269
}
268270

271+
// XXX: now we use this only in receiver, but in case of recovery
272+
// one receiver can set flushPos for several nodes
269273
MtmLock(LW_EXCLUSIVE);
270274
dlist_foreach_modify(iter, &MtmLsnMapping)
271275
{
272276
flushpos = dlist_container(MtmFlushPosition, node, iter.cur);
273277
if (flushpos->local_end <= local_flush)
274278
{
275-
if (Mtm->nodes[node_id-1].flushPos < flushpos->remote_end)
279+
// XXX: clean on restart?
280+
if (Mtm->nodes[node_id-1].flushPos < flushpos->remote_end && node_id == flushpos->node_id)
276281
Mtm->nodes[node_id-1].flushPos = flushpos->remote_end;
277282
dlist_delete(iter.cur);
278283
pfree(flushpos);

0 commit comments

Comments
 (0)