@@ -1161,116 +1161,183 @@ ReplicationSlotReserveWal(void)
1161
1161
}
1162
1162
1163
1163
/*
1164
- * Mark any slot that points to an LSN older than the given segment
1165
- * as invalid; it requires WAL that's about to be removed.
1164
+ * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
1165
+ * and marks it invalid, if necessary and possible.
1166
1166
*
1167
- * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1167
+ * Returns whether ReplicationSlotControlLock was released in the interim (and
1168
+ * in that case we're not holding the lock at return, otherwise we are).
1169
+ *
1170
+ * This is inherently racy, because we release the LWLock
1171
+ * for syscalls, so caller must restart if we return true.
1168
1172
*/
1169
- void
1170
- InvalidateObsoleteReplicationSlots ( XLogSegNo oldestSegno )
1173
+ static bool
1174
+ InvalidatePossiblyObsoleteSlot ( ReplicationSlot * s , XLogRecPtr oldestLSN )
1171
1175
{
1172
- XLogRecPtr oldestLSN ;
1173
-
1174
- XLogSegNoOffsetToRecPtr (oldestSegno , 0 , wal_segment_size , oldestLSN );
1176
+ int last_signaled_pid = 0 ;
1177
+ bool released_lock = false;
1175
1178
1176
- restart :
1177
- LWLockAcquire (ReplicationSlotControlLock , LW_SHARED );
1178
- for (int i = 0 ; i < max_replication_slots ; i ++ )
1179
+ for (;;)
1179
1180
{
1180
- ReplicationSlot * s = & ReplicationSlotCtl -> replication_slots [i ];
1181
- XLogRecPtr restart_lsn = InvalidXLogRecPtr ;
1181
+ XLogRecPtr restart_lsn ;
1182
1182
NameData slotname ;
1183
- int wspid ;
1184
- int last_signaled_pid = 0 ;
1183
+ int active_pid = 0 ;
1184
+
1185
+ Assert (LWLockHeldByMeInMode (ReplicationSlotControlLock , LW_SHARED ));
1185
1186
1186
1187
if (!s -> in_use )
1187
- continue ;
1188
+ {
1189
+ if (released_lock )
1190
+ LWLockRelease (ReplicationSlotControlLock );
1191
+ break ;
1192
+ }
1188
1193
1194
+ /*
1195
+ * Check if the slot needs to be invalidated. If it needs to be
1196
+ * invalidated, and is not currently acquired, acquire it and mark it
1197
+ * as having been invalidated. We do this with the spinlock held to
1198
+ * avoid race conditions -- for example the restart_lsn could move
1199
+ * forward, or the slot could be dropped.
1200
+ */
1189
1201
SpinLockAcquire (& s -> mutex );
1190
- slotname = s -> data . name ;
1202
+
1191
1203
restart_lsn = s -> data .restart_lsn ;
1192
- SpinLockRelease (& s -> mutex );
1193
1204
1205
+ /*
1206
+ * If the slot is already invalid or is fresh enough, we don't need to
1207
+ * do anything.
1208
+ */
1194
1209
if (XLogRecPtrIsInvalid (restart_lsn ) || restart_lsn >= oldestLSN )
1195
- continue ;
1196
- LWLockRelease (ReplicationSlotControlLock );
1197
- CHECK_FOR_INTERRUPTS ();
1210
+ {
1211
+ SpinLockRelease (& s -> mutex );
1212
+ if (released_lock )
1213
+ LWLockRelease (ReplicationSlotControlLock );
1214
+ break ;
1215
+ }
1216
+
1217
+ slotname = s -> data .name ;
1218
+ active_pid = s -> active_pid ;
1219
+
1220
+ /*
1221
+ * If the slot can be acquired, do so and mark it invalidated
1222
+ * immediately. Otherwise we'll signal the owning process, below, and
1223
+ * retry.
1224
+ */
1225
+ if (active_pid == 0 )
1226
+ {
1227
+ MyReplicationSlot = s ;
1228
+ s -> active_pid = MyProcPid ;
1229
+ s -> data .invalidated_at = restart_lsn ;
1230
+ s -> data .restart_lsn = InvalidXLogRecPtr ;
1231
+ }
1198
1232
1199
- /* Get ready to sleep on the slot in case it is active */
1200
- ConditionVariablePrepareToSleep (& s -> active_cv );
1233
+ SpinLockRelease (& s -> mutex );
1201
1234
1202
- for (;; )
1235
+ if ( active_pid != 0 )
1203
1236
{
1204
1237
/*
1205
- * Try to mark this slot as used by this process.
1206
- *
1207
- * Note that ReplicationSlotAcquireInternal(SAB_Inquire) should
1208
- * not cancel the prepared condition variable if this slot is
1209
- * active in other process. Because in this case we have to wait
1210
- * on that CV for the process owning the slot to be terminated,
1211
- * later.
1238
+ * Prepare the sleep on the slot's condition variable before
1239
+ * releasing the lock, to close a possible race condition if the
1240
+ * slot is released before the sleep below.
1212
1241
*/
1213
- wspid = ReplicationSlotAcquireInternal ( s , NULL , SAB_Inquire );
1242
+ ConditionVariablePrepareToSleep ( & s -> active_cv );
1214
1243
1215
- /*
1216
- * Exit the loop if we successfully acquired the slot or the slot
1217
- * was dropped during waiting for the owning process to be
1218
- * terminated. For example, the latter case is likely to happen
1219
- * when the slot is temporary because it's automatically dropped
1220
- * by the termination of the owning process.
1221
- */
1222
- if (wspid <= 0 )
1223
- break ;
1244
+ LWLockRelease (ReplicationSlotControlLock );
1245
+ released_lock = true;
1224
1246
1225
1247
/*
1226
- * Signal to terminate the process that owns the slot.
1248
+ * Signal to terminate the process that owns the slot, if we
1249
+ * haven't already signalled it. (Avoidance of repeated
1250
+ * signalling is the only reason for there to be a loop in this
1251
+ * routine; otherwise we could rely on caller's restart loop.)
1227
1252
*
1228
- * There is the race condition where other process may own the
1229
- * slot after the process using it was terminated and before this
1230
- * process owns it. To handle this case, we signal again if the
1231
- * PID of the owning process is changed than the last.
1232
- *
1233
- * XXX This logic assumes that the same PID is not reused very
1234
- * quickly.
1253
+ * There is a race condition in which another process may own the slot
1254
+ * after its current owner process is terminated and before this
1255
+ * process owns it. To handle that, we signal only if the PID of
1256
+ * the owning process has changed from the previous time. (This
1257
+ * logic assumes that the same PID is not reused very quickly.)
1235
1258
*/
1236
- if (last_signaled_pid != wspid )
1259
+ if (last_signaled_pid != active_pid )
1237
1260
{
1238
1261
ereport (LOG ,
1239
- (errmsg ("terminating process %d because replication slot \"%s\" is too far behind" ,
1240
- wspid , NameStr (slotname ))));
1241
- (void ) kill (wspid , SIGTERM );
1242
- last_signaled_pid = wspid ;
1262
+ (errmsg ("terminating process %d to release replication slot \"%s\"" ,
1263
+ active_pid , NameStr (slotname ))));
1264
+
1265
+ (void ) kill (active_pid , SIGTERM );
1266
+ last_signaled_pid = active_pid ;
1243
1267
}
1244
1268
1245
- ConditionVariableTimedSleep (& s -> active_cv , 10 ,
1246
- WAIT_EVENT_REPLICATION_SLOT_DROP );
1269
+ /* Wait until the slot is released. */
1270
+ ConditionVariableSleep (& s -> active_cv ,
1271
+ WAIT_EVENT_REPLICATION_SLOT_DROP );
1272
+
1273
+ /*
1274
+ * Re-acquire lock and start over; we expect to invalidate the slot
1275
+ * next time (unless another process acquires the slot in the
1276
+ * meantime).
1277
+ */
1278
+ LWLockAcquire (ReplicationSlotControlLock , LW_SHARED );
1279
+ continue ;
1247
1280
}
1248
- ConditionVariableCancelSleep ();
1281
+ else
1282
+ {
1283
+ /*
1284
+ * We hold the slot now and have already invalidated it; flush it
1285
+ * to ensure that state persists.
1286
+ *
1287
+ * Don't want to hold ReplicationSlotControlLock across file
1288
+ * system operations, so release it now but be sure to tell caller
1289
+ * to restart from scratch.
1290
+ */
1291
+ LWLockRelease (ReplicationSlotControlLock );
1292
+ released_lock = true;
1249
1293
1250
- /*
1251
- * Do nothing here and start from scratch if the slot has already been
1252
- * dropped.
1253
- */
1254
- if (wspid == -1 )
1255
- goto restart ;
1294
+ /* Make sure the invalidated state persists across server restart */
1295
+ ReplicationSlotMarkDirty ();
1296
+ ReplicationSlotSave ();
1297
+ ReplicationSlotRelease ();
1256
1298
1257
- ereport (LOG ,
1258
- (errmsg ("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size" ,
1259
- NameStr (slotname ),
1260
- LSN_FORMAT_ARGS (restart_lsn ))));
1299
+ ereport (LOG ,
1300
+ (errmsg ("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size" ,
1301
+ NameStr (slotname ),
1302
+ LSN_FORMAT_ARGS (restart_lsn ))));
1261
1303
1262
- SpinLockAcquire ( & s -> mutex );
1263
- s -> data . invalidated_at = s -> data . restart_lsn ;
1264
- s -> data . restart_lsn = InvalidXLogRecPtr ;
1265
- SpinLockRelease ( & s -> mutex );
1304
+ /* done with this slot for now */
1305
+ break ;
1306
+ }
1307
+ }
1266
1308
1267
- /* Make sure the invalidated state persists across server restart */
1268
- ReplicationSlotMarkDirty ();
1269
- ReplicationSlotSave ();
1270
- ReplicationSlotRelease ();
1309
+ Assert (released_lock == !LWLockHeldByMe (ReplicationSlotControlLock ));
1271
1310
1272
- /* if we did anything, start from scratch */
1273
- goto restart ;
1311
+ return released_lock ;
1312
+ }
1313
+
1314
+ /*
1315
+ * Mark any slot that points to an LSN older than the given segment
1316
+ * as invalid; it requires WAL that's about to be removed.
1317
+ *
1318
+ * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1319
+ */
1320
+ void
1321
+ InvalidateObsoleteReplicationSlots (XLogSegNo oldestSegno )
1322
+ {
1323
+ XLogRecPtr oldestLSN ;
1324
+
1325
+ XLogSegNoOffsetToRecPtr (oldestSegno , 0 , wal_segment_size , oldestLSN );
1326
+
1327
+ restart :
1328
+ LWLockAcquire (ReplicationSlotControlLock , LW_SHARED );
1329
+ for (int i = 0 ; i < max_replication_slots ; i ++ )
1330
+ {
1331
+ ReplicationSlot * s = & ReplicationSlotCtl -> replication_slots [i ];
1332
+
1333
+ if (!s -> in_use )
1334
+ continue ;
1335
+
1336
+ if (InvalidatePossiblyObsoleteSlot (s , oldestLSN ))
1337
+ {
1338
+ /* if the lock was released, start from scratch */
1339
+ goto restart ;
1340
+ }
1274
1341
}
1275
1342
LWLockRelease (ReplicationSlotControlLock );
1276
1343
}
0 commit comments