Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 9915de6

Browse files
committed
Fix race conditions in replication slot operations
It is relatively easy to get a replication slot to look as still active while one process is in the process of getting rid of it; when some other process tries to "acquire" the slot, it would fail with an error message of "replication slot XYZ is active for PID N". The error message in itself is fine, except that when the intention is to drop the slot, it is unhelpful: the useful behavior would be to wait until the slot is no longer acquired, so that the drop can proceed. To implement this, we use a condition variable so that slot acquisition can be told to wait on that condition variable if the slot is already acquired, and we make any change in active_pid broadcast a signal on the condition variable. Thus, as soon as the slot is released, the drop will proceed properly. Reported by: Tom Lane Discussion: https://postgr.es/m/11904.1499039688@sss.pgh.pa.us Authors: Petr Jelínek, Álvaro Herrera
1 parent 4132dbe commit 9915de6

File tree

5 files changed

+118
-56
lines changed

5 files changed

+118
-56
lines changed

src/backend/replication/logical/logicalfuncs.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
244244
else
245245
end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
246246

247-
ReplicationSlotAcquire(NameStr(*name));
247+
ReplicationSlotAcquire(NameStr(*name), true);
248248

249249
PG_TRY();
250250
{

src/backend/replication/slot.c

+91-31
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ ReplicationSlotsShmemInit(void)
157157
/* everything else is zeroed by the memset above */
158158
SpinLockInit(&slot->mutex);
159159
LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
160+
ConditionVariableInit(&slot->active_cv);
160161
}
161162
}
162163
}
@@ -313,61 +314,102 @@ ReplicationSlotCreate(const char *name, bool db_specific,
313314
LWLockRelease(ReplicationSlotControlLock);
314315

315316
/*
316-
* Now that the slot has been marked as in_use and in_active, it's safe to
317+
* Now that the slot has been marked as in_use and active, it's safe to
317318
* let somebody else try to allocate a slot.
318319
*/
319320
LWLockRelease(ReplicationSlotAllocationLock);
321+
322+
/* Let everybody know we've modified this slot */
323+
ConditionVariableBroadcast(&slot->active_cv);
320324
}
321325

322326
/*
323327
* Find a previously created slot and mark it as used by this backend.
324328
*/
325329
void
326-
ReplicationSlotAcquire(const char *name)
330+
ReplicationSlotAcquire(const char *name, bool nowait)
327331
{
328-
ReplicationSlot *slot = NULL;
332+
ReplicationSlot *slot;
333+
int active_pid;
329334
int i;
330-
int active_pid = 0; /* Keep compiler quiet */
331335

336+
retry:
332337
Assert(MyReplicationSlot == NULL);
333338

334-
/* Search for the named slot and mark it active if we find it. */
339+
/*
340+
* Search for the named slot and mark it active if we find it. If the
341+
* slot is already active, we exit the loop with active_pid set to the PID
342+
* of the backend that owns it.
343+
*/
344+
active_pid = 0;
345+
slot = NULL;
335346
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
336347
for (i = 0; i < max_replication_slots; i++)
337348
{
338349
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
339350

340351
if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
341352
{
353+
/*
354+
* This is the slot we want. We don't know yet if it's active,
355+
* so get ready to sleep on it in case it is. (We may end up not
356+
* sleeping, but we don't want to do this while holding the
357+
* spinlock.)
358+
*/
359+
ConditionVariablePrepareToSleep(&s->active_cv);
360+
342361
SpinLockAcquire(&s->mutex);
362+
343363
active_pid = s->active_pid;
344364
if (active_pid == 0)
345365
active_pid = s->active_pid = MyProcPid;
366+
346367
SpinLockRelease(&s->mutex);
347368
slot = s;
369+
348370
break;
349371
}
350372
}
351373
LWLockRelease(ReplicationSlotControlLock);
352374

353-
/* If we did not find the slot or it was already active, error out. */
375+
/* If we did not find the slot, error out. */
354376
if (slot == NULL)
355377
ereport(ERROR,
356378
(errcode(ERRCODE_UNDEFINED_OBJECT),
357379
errmsg("replication slot \"%s\" does not exist", name)));
380+
381+
/*
382+
* If we found the slot but it's already active in another backend, we
383+
* either error out or retry after a short wait, as caller specified.
384+
*/
358385
if (active_pid != MyProcPid)
359-
ereport(ERROR,
360-
(errcode(ERRCODE_OBJECT_IN_USE),
361-
errmsg("replication slot \"%s\" is active for PID %d",
362-
name, active_pid)));
386+
{
387+
if (nowait)
388+
ereport(ERROR,
389+
(errcode(ERRCODE_OBJECT_IN_USE),
390+
errmsg("replication slot \"%s\" is active for PID %d",
391+
name, active_pid)));
392+
393+
/* Wait here until we get signaled, and then restart */
394+
ConditionVariableSleep(&slot->active_cv, PG_WAIT_LOCK);
395+
ConditionVariableCancelSleep();
396+
goto retry;
397+
}
398+
else
399+
ConditionVariableCancelSleep(); /* no sleep needed after all */
400+
401+
/* Let everybody know we've modified this slot */
402+
ConditionVariableBroadcast(&slot->active_cv);
363403

364404
/* We made this slot active, so it's ours now. */
365405
MyReplicationSlot = slot;
366406
}
367407

368408
/*
369-
* Release a replication slot, this or another backend can ReAcquire it
370-
* later. Resources this slot requires will be preserved.
409+
* Release the replication slot that this backend considers to own.
410+
*
411+
* This or another backend can re-acquire the slot later.
412+
* Resources this slot requires will be preserved.
371413
*/
372414
void
373415
ReplicationSlotRelease(void)
@@ -385,17 +427,6 @@ ReplicationSlotRelease(void)
385427
*/
386428
ReplicationSlotDropAcquired();
387429
}
388-
else if (slot->data.persistency == RS_PERSISTENT)
389-
{
390-
/*
391-
* Mark persistent slot inactive. We're not freeing it, just
392-
* disconnecting.
393-
*/
394-
SpinLockAcquire(&slot->mutex);
395-
slot->active_pid = 0;
396-
SpinLockRelease(&slot->mutex);
397-
}
398-
399430

400431
/*
401432
* If slot needed to temporarily restrain both data and catalog xmin to
@@ -412,6 +443,18 @@ ReplicationSlotRelease(void)
412443
ReplicationSlotsComputeRequiredXmin(false);
413444
}
414445

446+
if (slot->data.persistency == RS_PERSISTENT)
447+
{
448+
/*
449+
* Mark persistent slot inactive. We're not freeing it, just
450+
* disconnecting, but wake up others that may be waiting for it.
451+
*/
452+
SpinLockAcquire(&slot->mutex);
453+
slot->active_pid = 0;
454+
SpinLockRelease(&slot->mutex);
455+
ConditionVariableBroadcast(&slot->active_cv);
456+
}
457+
415458
MyReplicationSlot = NULL;
416459

417460
/* might not have been set when we've been a plain slot */
@@ -430,32 +473,43 @@ ReplicationSlotCleanup(void)
430473

431474
Assert(MyReplicationSlot == NULL);
432475

433-
/*
434-
* No need for locking as we are only interested in slots active in
435-
* current process and those are not touched by other processes.
436-
*/
476+
restart:
477+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
437478
for (i = 0; i < max_replication_slots; i++)
438479
{
439480
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
440481

482+
if (!s->in_use)
483+
continue;
484+
485+
SpinLockAcquire(&s->mutex);
441486
if (s->active_pid == MyProcPid)
442487
{
443-
Assert(s->in_use && s->data.persistency == RS_TEMPORARY);
488+
Assert(s->data.persistency == RS_TEMPORARY);
489+
SpinLockRelease(&s->mutex);
490+
LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
444491

445492
ReplicationSlotDropPtr(s);
493+
494+
ConditionVariableBroadcast(&s->active_cv);
495+
goto restart;
446496
}
497+
else
498+
SpinLockRelease(&s->mutex);
447499
}
500+
501+
LWLockRelease(ReplicationSlotControlLock);
448502
}
449503

450504
/*
451505
* Permanently drop replication slot identified by the passed in name.
452506
*/
453507
void
454-
ReplicationSlotDrop(const char *name)
508+
ReplicationSlotDrop(const char *name, bool nowait)
455509
{
456510
Assert(MyReplicationSlot == NULL);
457511

458-
ReplicationSlotAcquire(name);
512+
ReplicationSlotAcquire(name, nowait);
459513

460514
ReplicationSlotDropAcquired();
461515
}
@@ -527,6 +581,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
527581
slot->active_pid = 0;
528582
SpinLockRelease(&slot->mutex);
529583

584+
/* wake up anyone waiting on this slot */
585+
ConditionVariableBroadcast(&slot->active_cv);
586+
530587
ereport(fail_softly ? WARNING : ERROR,
531588
(errcode_for_file_access(),
532589
errmsg("could not rename file \"%s\" to \"%s\": %m",
@@ -535,15 +592,18 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
535592

536593
/*
537594
* The slot is definitely gone. Lock out concurrent scans of the array
538-
* long enough to kill it. It's OK to clear the active flag here without
595+
* long enough to kill it. It's OK to clear the active PID here without
539596
* grabbing the mutex because nobody else can be scanning the array here,
540597
* and nobody can be attached to this slot and thus access it without
541598
* scanning the array.
599+
*
600+
* Also wake up processes waiting for it.
542601
*/
543602
LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
544603
slot->active_pid = 0;
545604
slot->in_use = false;
546605
LWLockRelease(ReplicationSlotControlLock);
606+
ConditionVariableBroadcast(&slot->active_cv);
547607

548608
/*
549609
* Slot is dead and doesn't prevent resource removal anymore, recompute

src/backend/replication/slotfuncs.c

+16-18
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
171171

172172
CheckSlotRequirements();
173173

174-
ReplicationSlotDrop(NameStr(*name));
174+
ReplicationSlotDrop(NameStr(*name), false);
175175

176176
PG_RETURN_VOID();
177177
}
@@ -221,6 +221,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
221221

222222
MemoryContextSwitchTo(oldcontext);
223223

224+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
224225
for (slotno = 0; slotno < max_replication_slots; slotno++)
225226
{
226227
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
@@ -238,25 +239,21 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
238239
NameData plugin;
239240
int i;
240241

241-
SpinLockAcquire(&slot->mutex);
242242
if (!slot->in_use)
243-
{
244-
SpinLockRelease(&slot->mutex);
245243
continue;
246-
}
247-
else
248-
{
249-
xmin = slot->data.xmin;
250-
catalog_xmin = slot->data.catalog_xmin;
251-
database = slot->data.database;
252-
restart_lsn = slot->data.restart_lsn;
253-
confirmed_flush_lsn = slot->data.confirmed_flush;
254-
namecpy(&slot_name, &slot->data.name);
255-
namecpy(&plugin, &slot->data.plugin);
256-
257-
active_pid = slot->active_pid;
258-
persistency = slot->data.persistency;
259-
}
244+
245+
SpinLockAcquire(&slot->mutex);
246+
247+
xmin = slot->data.xmin;
248+
catalog_xmin = slot->data.catalog_xmin;
249+
database = slot->data.database;
250+
restart_lsn = slot->data.restart_lsn;
251+
confirmed_flush_lsn = slot->data.confirmed_flush;
252+
namecpy(&slot_name, &slot->data.name);
253+
namecpy(&plugin, &slot->data.plugin);
254+
active_pid = slot->active_pid;
255+
persistency = slot->data.persistency;
256+
260257
SpinLockRelease(&slot->mutex);
261258

262259
memset(nulls, 0, sizeof(nulls));
@@ -309,6 +306,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
309306

310307
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
311308
}
309+
LWLockRelease(ReplicationSlotControlLock);
312310

313311
tuplestore_donestoring(tupstore);
314312

src/backend/replication/walsender.c

+3-3
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ StartReplication(StartReplicationCmd *cmd)
541541

542542
if (cmd->slotname)
543543
{
544-
ReplicationSlotAcquire(cmd->slotname);
544+
ReplicationSlotAcquire(cmd->slotname, true);
545545
if (SlotIsLogical(MyReplicationSlot))
546546
ereport(ERROR,
547547
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1028,7 +1028,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
10281028
static void
10291029
DropReplicationSlot(DropReplicationSlotCmd *cmd)
10301030
{
1031-
ReplicationSlotDrop(cmd->slotname);
1031+
ReplicationSlotDrop(cmd->slotname, false);
10321032
EndCommand("DROP_REPLICATION_SLOT", DestRemote);
10331033
}
10341034

@@ -1046,7 +1046,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
10461046

10471047
Assert(!MyReplicationSlot);
10481048

1049-
ReplicationSlotAcquire(cmd->slotname);
1049+
ReplicationSlotAcquire(cmd->slotname, true);
10501050

10511051
/*
10521052
* Force a disconnect, so that the decoding code doesn't need to care

src/include/replication/slot.h

+7-3
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,15 @@
1212
#include "fmgr.h"
1313
#include "access/xlog.h"
1414
#include "access/xlogreader.h"
15+
#include "storage/condition_variable.h"
1516
#include "storage/lwlock.h"
1617
#include "storage/shmem.h"
1718
#include "storage/spin.h"
1819

1920
/*
2021
* Behaviour of replication slots, upon release or crash.
2122
*
22-
* Slots marked as PERSISTENT are crashsafe and will not be dropped when
23+
* Slots marked as PERSISTENT are crash-safe and will not be dropped when
2324
* released. Slots marked as EPHEMERAL will be dropped when released or after
2425
* restarts.
2526
*
@@ -117,6 +118,9 @@ typedef struct ReplicationSlot
117118
/* is somebody performing io on this slot? */
118119
LWLock io_in_progress_lock;
119120

121+
/* Condition variable signalled when active_pid changes */
122+
ConditionVariable active_cv;
123+
120124
/* all the remaining data is only used for logical slots */
121125

122126
/*
@@ -162,9 +166,9 @@ extern void ReplicationSlotsShmemInit(void);
162166
extern void ReplicationSlotCreate(const char *name, bool db_specific,
163167
ReplicationSlotPersistency p);
164168
extern void ReplicationSlotPersist(void);
165-
extern void ReplicationSlotDrop(const char *name);
169+
extern void ReplicationSlotDrop(const char *name, bool nowait);
166170

167-
extern void ReplicationSlotAcquire(const char *name);
171+
extern void ReplicationSlotAcquire(const char *name, bool nowait);
168172
extern void ReplicationSlotRelease(void);
169173
extern void ReplicationSlotCleanup(void);
170174
extern void ReplicationSlotSave(void);

0 commit comments

Comments
 (0)