@@ -99,6 +99,9 @@ ReplicationSlot *MyReplicationSlot = NULL;
99
99
int max_replication_slots = 0 ; /* the maximum number of replication
100
100
* slots */
101
101
102
+ static ReplicationSlot * SearchNamedReplicationSlot (const char * name );
103
+ static int ReplicationSlotAcquireInternal (ReplicationSlot * slot ,
104
+ const char * name , SlotAcquireBehavior behavior );
102
105
static void ReplicationSlotDropAcquired (void );
103
106
static void ReplicationSlotDropPtr (ReplicationSlot * slot );
104
107
@@ -322,102 +325,142 @@ ReplicationSlotCreate(const char *name, bool db_specific,
322
325
}
323
326
324
327
/*
325
- * Find a previously created slot and mark it as used by this backend .
328
+ * Search for the named replication slot .
326
329
*
327
- * The return value is only useful if behavior is SAB_Inquire, in which
328
- * it's zero if we successfully acquired the slot, or the PID of the
329
- * owning process otherwise. If behavior is SAB_Error, then trying to
330
- * acquire an owned slot is an error. If SAB_Block, we sleep until the
331
- * slot is released by the owning process.
330
+ * Return the replication slot if found, otherwise NULL.
331
+ *
332
+ * The caller must hold ReplicationSlotControlLock in shared mode.
332
333
*/
333
- int
334
- ReplicationSlotAcquire (const char * name , SlotAcquireBehavior behavior )
334
+ static ReplicationSlot *
335
+ SearchNamedReplicationSlot (const char * name )
335
336
{
336
- ReplicationSlot * slot ;
337
- int active_pid ;
338
337
int i ;
338
+ ReplicationSlot * slot = NULL ;
339
339
340
- retry :
341
- Assert ( MyReplicationSlot == NULL );
340
+ Assert ( LWLockHeldByMeInMode ( ReplicationSlotControlLock ,
341
+ LW_SHARED ) );
342
342
343
- /*
344
- * Search for the named slot and mark it active if we find it. If the
345
- * slot is already active, we exit the loop with active_pid set to the PID
346
- * of the backend that owns it.
347
- */
348
- active_pid = 0 ;
349
- slot = NULL ;
350
- LWLockAcquire (ReplicationSlotControlLock , LW_SHARED );
351
343
for (i = 0 ; i < max_replication_slots ; i ++ )
352
344
{
353
345
ReplicationSlot * s = & ReplicationSlotCtl -> replication_slots [i ];
354
346
355
347
if (s -> in_use && strcmp (name , NameStr (s -> data .name )) == 0 )
356
348
{
357
- /*
358
- * This is the slot we want; check if it's active under some other
359
- * process. In single user mode, we don't need this check.
360
- */
361
- if (IsUnderPostmaster )
362
- {
363
- /*
364
- * Get ready to sleep on it in case it is active. (We may end
365
- * up not sleeping, but we don't want to do this while holding
366
- * the spinlock.)
367
- */
368
- ConditionVariablePrepareToSleep (& s -> active_cv );
369
-
370
- SpinLockAcquire (& s -> mutex );
371
-
372
- active_pid = s -> active_pid ;
373
- if (active_pid == 0 )
374
- active_pid = s -> active_pid = MyProcPid ;
375
-
376
- SpinLockRelease (& s -> mutex );
377
- }
378
- else
379
- active_pid = MyProcPid ;
380
349
slot = s ;
381
-
382
350
break ;
383
351
}
384
352
}
385
- LWLockRelease (ReplicationSlotControlLock );
386
353
387
- /* If we did not find the slot, error out. */
388
- if (slot == NULL )
354
+ return slot ;
355
+ }
356
+
357
+ /*
358
+ * Find a previously created slot and mark it as used by this process.
359
+ *
360
+ * The return value is only useful if behavior is SAB_Inquire, in which
361
+ * it's zero if we successfully acquired the slot, -1 if the slot no longer
362
+ * exists, or the PID of the owning process otherwise. If behavior is
363
+ * SAB_Error, then trying to acquire an owned slot is an error.
364
+ * If SAB_Block, we sleep until the slot is released by the owning process.
365
+ */
366
+ int
367
+ ReplicationSlotAcquire (const char * name , SlotAcquireBehavior behavior )
368
+ {
369
+ return ReplicationSlotAcquireInternal (NULL , name , behavior );
370
+ }
371
+
372
+ /*
373
+ * Mark the specified slot as used by this process.
374
+ *
375
+ * Only one of slot and name can be specified.
376
+ * If slot == NULL, search for the slot with the given name.
377
+ *
378
+ * See comments about the return value in ReplicationSlotAcquire().
379
+ */
380
+ static int
381
+ ReplicationSlotAcquireInternal (ReplicationSlot * slot , const char * name ,
382
+ SlotAcquireBehavior behavior )
383
+ {
384
+ ReplicationSlot * s ;
385
+ int active_pid ;
386
+
387
+ AssertArg ((slot == NULL ) ^ (name == NULL ));
388
+
389
+ retry :
390
+ Assert (MyReplicationSlot == NULL );
391
+
392
+ LWLockAcquire (ReplicationSlotControlLock , LW_SHARED );
393
+
394
+ /*
395
+ * Search for the slot with the specified name if the slot to acquire is
396
+ * not given. If the slot is not found, we either return -1 or error out.
397
+ */
398
+ s = slot ? slot : SearchNamedReplicationSlot (name );
399
+ if (s == NULL || !s -> in_use )
400
+ {
401
+ LWLockRelease (ReplicationSlotControlLock );
402
+
403
+ if (behavior == SAB_Inquire )
404
+ return -1 ;
389
405
ereport (ERROR ,
390
406
(errcode (ERRCODE_UNDEFINED_OBJECT ),
391
- errmsg ("replication slot \"%s\" does not exist" , name )));
407
+ errmsg ("replication slot \"%s\" does not exist" ,
408
+ name ? name : NameStr (slot -> data .name ))));
409
+ }
392
410
393
411
/*
394
- * If we found the slot but it's already active in another backend, we
395
- * either error out or retry after a short wait, as caller specified.
412
+ * This is the slot we want; check if it's active under some other
413
+ * process. In single user mode, we don't need this check.
414
+ */
415
+ if (IsUnderPostmaster )
416
+ {
417
+ /*
418
+ * Get ready to sleep on the slot in case it is active if SAB_Block.
419
+ * (We may end up not sleeping, but we don't want to do this while
420
+ * holding the spinlock.)
421
+ */
422
+ if (behavior == SAB_Block )
423
+ ConditionVariablePrepareToSleep (& s -> active_cv );
424
+
425
+ SpinLockAcquire (& s -> mutex );
426
+ if (s -> active_pid == 0 )
427
+ s -> active_pid = MyProcPid ;
428
+ active_pid = s -> active_pid ;
429
+ SpinLockRelease (& s -> mutex );
430
+ }
431
+ else
432
+ active_pid = MyProcPid ;
433
+ LWLockRelease (ReplicationSlotControlLock );
434
+
435
+ /*
436
+ * If we found the slot but it's already active in another process, we
437
+ * either error out, return the PID of the owning process, or retry
438
+ * after a short wait, as caller specified.
396
439
*/
397
440
if (active_pid != MyProcPid )
398
441
{
399
442
if (behavior == SAB_Error )
400
443
ereport (ERROR ,
401
444
(errcode (ERRCODE_OBJECT_IN_USE ),
402
445
errmsg ("replication slot \"%s\" is active for PID %d" ,
403
- name , active_pid )));
446
+ NameStr ( s -> data . name ) , active_pid )));
404
447
else if (behavior == SAB_Inquire )
405
448
return active_pid ;
406
449
407
450
/* Wait here until we get signaled, and then restart */
408
- ConditionVariableSleep (& slot -> active_cv ,
451
+ ConditionVariableSleep (& s -> active_cv ,
409
452
WAIT_EVENT_REPLICATION_SLOT_DROP );
410
453
ConditionVariableCancelSleep ();
411
454
goto retry ;
412
455
}
413
- else
414
- ConditionVariableCancelSleep (); /* no sleep needed after all */
456
+ else if ( behavior == SAB_Block )
457
+ ConditionVariableCancelSleep (); /* no sleep needed after all */
415
458
416
459
/* Let everybody know we've modified this slot */
417
- ConditionVariableBroadcast (& slot -> active_cv );
460
+ ConditionVariableBroadcast (& s -> active_cv );
418
461
419
462
/* We made this slot active, so it's ours now. */
420
- MyReplicationSlot = slot ;
463
+ MyReplicationSlot = s ;
421
464
422
465
/* success */
423
466
return 0 ;
@@ -1100,43 +1143,82 @@ InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
1100
1143
ReplicationSlot * s = & ReplicationSlotCtl -> replication_slots [i ];
1101
1144
XLogRecPtr restart_lsn = InvalidXLogRecPtr ;
1102
1145
NameData slotname ;
1146
+ int wspid ;
1147
+ int last_signaled_pid = 0 ;
1103
1148
1104
1149
if (!s -> in_use )
1105
1150
continue ;
1106
1151
1107
1152
SpinLockAcquire (& s -> mutex );
1108
- if (s -> data .restart_lsn == InvalidXLogRecPtr ||
1109
- s -> data .restart_lsn >= oldestLSN )
1110
- {
1111
- SpinLockRelease (& s -> mutex );
1112
- continue ;
1113
- }
1114
-
1115
1153
slotname = s -> data .name ;
1116
1154
restart_lsn = s -> data .restart_lsn ;
1117
-
1118
1155
SpinLockRelease (& s -> mutex );
1156
+
1157
+ if (XLogRecPtrIsInvalid (restart_lsn ) || restart_lsn >= oldestLSN )
1158
+ continue ;
1119
1159
LWLockRelease (ReplicationSlotControlLock );
1120
1160
1161
+ /* Get ready to sleep on the slot in case it is active */
1162
+ ConditionVariablePrepareToSleep (& s -> active_cv );
1163
+
1121
1164
for (;;)
1122
1165
{
1123
- int wspid = ReplicationSlotAcquire (NameStr (slotname ),
1124
- SAB_Inquire );
1166
+ /*
1167
+ * Try to mark this slot as used by this process.
1168
+ *
1169
+ * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
1170
+ * should not cancel the prepared condition variable
1171
+ * if this slot is active in other process. Because in this case
1172
+ * we have to wait on that CV for the process owning
1173
+ * the slot to be terminated, later.
1174
+ */
1175
+ wspid = ReplicationSlotAcquireInternal (s , NULL , SAB_Inquire );
1125
1176
1126
- /* no walsender? success! */
1127
- if (wspid == 0 )
1177
+ /*
1178
+ * Exit the loop if we successfully acquired the slot or
1179
+ * the slot was dropped during waiting for the owning process
1180
+ * to be terminated. For example, the latter case is likely to
1181
+ * happen when the slot is temporary because it's automatically
1182
+ * dropped by the termination of the owning process.
1183
+ */
1184
+ if (wspid <= 0 )
1128
1185
break ;
1129
1186
1130
- ereport (LOG ,
1131
- (errmsg ("terminating walsender %d because replication slot \"%s\" is too far behind" ,
1132
- wspid , NameStr (slotname ))));
1133
- (void ) kill (wspid , SIGTERM );
1187
+ /*
1188
+ * Signal to terminate the process that owns the slot.
1189
+ *
1190
+ * There is the race condition where other process may own
1191
+ * the slot after the process using it was terminated and before
1192
+ * this process owns it. To handle this case, we signal again
1193
+ * if the PID of the owning process is changed than the last.
1194
+ *
1195
+ * XXX This logic assumes that the same PID is not reused
1196
+ * very quickly.
1197
+ */
1198
+ if (last_signaled_pid != wspid )
1199
+ {
1200
+ ereport (LOG ,
1201
+ (errmsg ("terminating process %d because replication slot \"%s\" is too far behind" ,
1202
+ wspid , NameStr (slotname ))));
1203
+ (void ) kill (wspid , SIGTERM );
1204
+ last_signaled_pid = wspid ;
1205
+ }
1134
1206
1135
1207
ConditionVariableTimedSleep (& s -> active_cv , 10 ,
1136
1208
WAIT_EVENT_REPLICATION_SLOT_DROP );
1137
1209
}
1138
1210
ConditionVariableCancelSleep ();
1139
1211
1212
+ /*
1213
+ * Do nothing here and start from scratch if the slot has
1214
+ * already been dropped.
1215
+ */
1216
+ if (wspid == -1 )
1217
+ {
1218
+ CHECK_FOR_INTERRUPTS ();
1219
+ goto restart ;
1220
+ }
1221
+
1140
1222
ereport (LOG ,
1141
1223
(errmsg ("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size" ,
1142
1224
NameStr (slotname ),
0 commit comments