@@ -127,6 +127,11 @@ typedef struct
 
     int         num_requests;  /* current # of requests */
     int         max_requests;  /* allocated array size */
+
+    int         head;           /* Index of the first request in the ring
+                                 * buffer */
+    int         tail;           /* Index of the last request in the ring
+                                 * buffer */
     CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
 } CheckpointerShmemStruct;
 
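Taken together, head, tail, and num_requests turn the fixed-size requests[] array into a ring buffer: entries are consumed at head, appended at tail, and both indexes wrap modulo max_requests. The standalone sketch below (plain C, invented helper names, only the bookkeeping fields and no request payload) shows the invariant the rest of the patch relies on; note that it treats tail as the next free slot, which is how the insertion code further down uses it.

#include <assert.h>
#include <stdbool.h>

/* Toy version of the head/tail/num_requests bookkeeping above. */
typedef struct
{
    int         head;           /* index of the oldest queued entry */
    int         tail;           /* index where the next entry will be stored */
    int         num_requests;   /* current occupancy */
    int         max_requests;   /* capacity of the backing array */
} RingState;

static bool
ring_push(RingState *r)
{
    if (r->num_requests >= r->max_requests)
        return false;           /* full: caller must compact or wait */
    r->tail = (r->tail + 1) % r->max_requests;
    r->num_requests++;
    return true;
}

static bool
ring_pop(RingState *r)
{
    if (r->num_requests == 0)
        return false;           /* empty */
    r->head = (r->head + 1) % r->max_requests;
    r->num_requests--;
    return true;
}

int
main(void)
{
    RingState   r = {.head = 0, .tail = 0, .num_requests = 0, .max_requests = 4};

    for (int i = 0; i < 4; i++)
        assert(ring_push(&r));
    assert(!ring_push(&r));     /* full: indexes have wrapped, head == tail */
    assert(ring_pop(&r));       /* frees the oldest slot */
    assert(ring_push(&r));      /* and it can be reused immediately */
    return 0;
}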
@@ -135,6 +140,12 @@ static CheckpointerShmemStruct *CheckpointerShmem;
 /* interval for calling AbsorbSyncRequests in CheckpointWriteDelay */
 #define WRITES_PER_ABSORB 1000
 
+/* Maximum number of checkpointer requests to process in one batch */
+#define CKPT_REQ_BATCH_SIZE 10000
+
+/* Max number of requests the checkpointer request queue can hold */
+#define MAX_CHECKPOINT_REQUESTS 10000000
+
 /*
  * GUC parameters
  */
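For a rough sense of scale behind these constants, assuming a CheckpointerRequest of a few dozen bytes (the stand-in struct below is illustrative, not the real definition): a batch of CKPT_REQ_BATCH_SIZE entries copied out by the checkpointer stays in the hundreds of kilobytes, far below the roughly 1 GB palloc ceiling referenced in the new comment in AbsorbSyncRequests further down, while MAX_CHECKPOINT_REQUESTS caps the shared queue itself even when NBuffers is very large.

#include <stdio.h>

/* Illustrative stand-in for CheckpointerRequest: the real struct holds a
 * FileTag plus a SyncRequestType, i.e. roughly this order of magnitude. */
typedef struct
{
    unsigned char tag[20];
    int         type;
} FakeRequest;

#define CKPT_REQ_BATCH_SIZE 10000

int
main(void)
{
    /* upper bound on the checkpointer's local copy per batch */
    size_t      batch_bytes = (size_t) CKPT_REQ_BATCH_SIZE * sizeof(FakeRequest);

    printf("per-batch copy: %zu bytes (~%zu kB)\n", batch_bytes, batch_bytes / 1024);
    return 0;
}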
@@ -970,7 +981,8 @@ CheckpointerShmemInit(void)
      */
     MemSet(CheckpointerShmem, 0, size);
     SpinLockInit(&CheckpointerShmem->ckpt_lck);
-    CheckpointerShmem->max_requests = NBuffers;
+    CheckpointerShmem->max_requests = Min(NBuffers, MAX_CHECKPOINT_REQUESTS);
+    CheckpointerShmem->head = CheckpointerShmem->tail = 0;
     ConditionVariableInit(&CheckpointerShmem->start_cv);
     ConditionVariableInit(&CheckpointerShmem->done_cv);
 }
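Because both indexes start at zero and wrap, head == tail describes the queue both when it is empty and when it is completely full; the explicit num_requests counter is what tells those two states apart. Continuing the RingState sketch above (same invented helper naming):

/* Continuing the RingState sketch above: head == tail alone cannot
 * distinguish "empty" from "full", so occupancy is tracked explicitly. */
static bool
ring_is_empty(const RingState *r)
{
    return r->num_requests == 0;
}

static bool
ring_is_full(const RingState *r)
{
    return r->num_requests == r->max_requests;
}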
@@ -1148,6 +1160,7 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
 {
     CheckpointerRequest *request;
     bool        too_full;
+    int         insert_pos;
 
     if (!IsUnderPostmaster)
         return false;           /* probably shouldn't even get here */
@@ -1171,10 +1184,14 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
     }
 
     /* OK, insert request */
-    request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++];
+    insert_pos = CheckpointerShmem->tail;
+    request = &CheckpointerShmem->requests[insert_pos];
     request->ftag = *ftag;
     request->type = type;
 
+    CheckpointerShmem->tail = (CheckpointerShmem->tail + 1) % CheckpointerShmem->max_requests;
+    CheckpointerShmem->num_requests++;
+
     /* If queue is more than half full, nudge the checkpointer to empty it */
     too_full = (CheckpointerShmem->num_requests >=
                 CheckpointerShmem->max_requests / 2);
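The enqueue path now reserves the slot at tail, fills it, and only then publishes it by advancing tail and bumping num_requests, all while CheckpointerCommLock is held exclusively. The sketch below shows the same shape in isolation, using a pthread mutex as a stand-in for the LWLock and invented type and function names; it is a simplification for illustration, not the PostgreSQL code.

#include <pthread.h>
#include <stdbool.h>

#define QUEUE_CAP 8

typedef struct
{
    char        payload[16];
} Item;

typedef struct
{
    pthread_mutex_t lock;       /* stand-in for CheckpointerCommLock */
    int         head;
    int         tail;
    int         count;
    Item        items[QUEUE_CAP];
} ItemQueue;

/* Enqueue in the same shape as the code above: take the lock, fill the slot
 * at tail, then publish it by advancing tail and the occupancy counter. */
static bool
enqueue(ItemQueue *q, const Item *it)
{
    bool        ok = false;

    pthread_mutex_lock(&q->lock);
    if (q->count < QUEUE_CAP)
    {
        q->items[q->tail] = *it;
        q->tail = (q->tail + 1) % QUEUE_CAP;
        q->count++;
        ok = true;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}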
@@ -1216,12 +1233,16 @@ CompactCheckpointerRequestQueue(void)
     struct CheckpointerSlotMapping
     {
         CheckpointerRequest request;
-        int         slot;
+        int         ring_idx;
     };
 
-    int         n,
-                preserve_count;
+    int         n;
     int         num_skipped = 0;
+    int         head;
+    int         max_requests;
+    int         num_requests;
+    int         read_idx,
+                write_idx;
     HASHCTL     ctl;
     HTAB       *htab;
     bool       *skip_slot;
@@ -1233,8 +1254,13 @@ CompactCheckpointerRequestQueue(void)
     if (CritSectionCount > 0)
         return false;
 
+    max_requests = CheckpointerShmem->max_requests;
+    num_requests = CheckpointerShmem->num_requests;
+
     /* Initialize skip_slot array */
-    skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests);
+    skip_slot = palloc0(sizeof(bool) * max_requests);
+
+    head = CheckpointerShmem->head;
 
     /* Initialize temporary hash table */
     ctl.keysize = sizeof(CheckpointerRequest);
@@ -1258,7 +1284,8 @@ CompactCheckpointerRequestQueue(void)
      * away preceding entries that would end up being canceled anyhow), but
      * it's not clear that the extra complexity would buy us anything.
      */
-    for (n = 0; n < CheckpointerShmem->num_requests; n++)
+    read_idx = head;
+    for (n = 0; n < num_requests; n++)
     {
         CheckpointerRequest *request;
         struct CheckpointerSlotMapping *slotmap;
@@ -1271,16 +1298,19 @@ CompactCheckpointerRequestQueue(void)
          * CheckpointerShmemInit.  Note also that RelFileLocator had better
          * contain no pad bytes.
          */
-        request = &CheckpointerShmem->requests[n];
+        request = &CheckpointerShmem->requests[read_idx];
         slotmap = hash_search(htab, request, HASH_ENTER, &found);
         if (found)
         {
             /* Duplicate, so mark the previous occurrence as skippable */
-            skip_slot[slotmap->slot] = true;
+            skip_slot[slotmap->ring_idx] = true;
             num_skipped++;
         }
         /* Remember slot containing latest occurrence of this request value */
-        slotmap->slot = n;
+        slotmap->ring_idx = read_idx;
+
+        /* Move to the next request in the ring buffer */
+        read_idx = (read_idx + 1) % max_requests;
     }
 
     /* Done with the hash table. */
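The first compaction pass walks the ring from head for num_requests steps and keeps, per distinct request value, the ring index of its latest occurrence in a hash table; whenever a value repeats, the earlier slot is flagged in skip_slot. The sketch below reproduces that marking logic on a small ring of ints, with a linear "last seen" scan standing in for the dynahash table keyed by the whole request; sizes and names are invented for illustration.

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define CAP 8

/* Pass 1 of the compaction: flag earlier duplicates, keep the latest. */
static int
mark_duplicates(const int *ring, int head, int count, bool *skip)
{
    int         last_idx[CAP];  /* ring index of the latest occurrence per key */
    int         seen = 0;
    int         num_skipped = 0;
    int         read_idx = head;

    memset(skip, 0, sizeof(bool) * CAP);
    for (int n = 0; n < count; n++)
    {
        int         j;

        for (j = 0; j < seen; j++)
            if (ring[last_idx[j]] == ring[read_idx])
                break;
        if (j < seen)
        {
            skip[last_idx[j]] = true;   /* drop the earlier occurrence */
            num_skipped++;
            last_idx[j] = read_idx;     /* remember the latest one */
        }
        else
            last_idx[seen++] = read_idx;

        read_idx = (read_idx + 1) % CAP;    /* walk the ring from head */
    }
    return num_skipped;
}

int
main(void)
{
    int         ring[CAP] = {0};
    bool        skip[CAP];

    /* four queued values starting at slot 6 and wrapping: 5, 7, 5, 9 */
    ring[6] = 5;
    ring[7] = 7;
    ring[0] = 5;
    ring[1] = 9;
    printf("skipped %d duplicate(s)\n", mark_duplicates(ring, 6, 4, skip));
    return 0;
}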
@@ -1294,17 +1324,34 @@ CompactCheckpointerRequestQueue(void)
     }
 
     /* We found some duplicates; remove them. */
-    preserve_count = 0;
-    for (n = 0; n < CheckpointerShmem->num_requests; n++)
+    read_idx = write_idx = head;
+    for (n = 0; n < num_requests; n++)
     {
-        if (skip_slot[n])
-            continue;
-        CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n];
+        /* If this slot is NOT skipped, keep it */
+        if (!skip_slot[read_idx])
+        {
+            /* If the read and write positions are different, copy the request */
+            if (write_idx != read_idx)
+                CheckpointerShmem->requests[write_idx] =
+                    CheckpointerShmem->requests[read_idx];
+
+            /* Advance the write position */
+            write_idx = (write_idx + 1) % max_requests;
+        }
+
+        read_idx = (read_idx + 1) % max_requests;
     }
+
+    /*
+     * Update ring buffer state: head remains the same, tail moves, count
+     * decreases
+     */
+    CheckpointerShmem->tail = write_idx;
+    CheckpointerShmem->num_requests -= num_skipped;
+
     ereport(DEBUG1,
             (errmsg_internal("compacted fsync request queue from %d entries to %d entries",
-                             CheckpointerShmem->num_requests, preserve_count)));
-    CheckpointerShmem->num_requests = preserve_count;
+                             num_requests, CheckpointerShmem->num_requests)));
 
     /* Cleanup. */
     pfree(skip_slot);
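The second pass then squeezes the flagged slots out in place: read_idx and write_idx both start at head, survivors are copied down only when the two positions diverge, and afterwards head is unchanged while tail and num_requests shrink by the number of skipped entries. A minimal sketch of that pass, again on a ring of ints with invented names:

#include <stdbool.h>

#define CAP 8

typedef struct
{
    int         head;
    int         tail;
    int         count;
    int         slots[CAP];
} IntRing;

/* Pass 2 of the compaction: drop the slots flagged in skip[], preserving
 * arrival order; num_skipped is the number of true entries in skip[]. */
static void
compact_ring(IntRing *r, const bool *skip, int num_skipped)
{
    int         read_idx = r->head;
    int         write_idx = r->head;

    for (int n = 0; n < r->count; n++)
    {
        if (!skip[read_idx])
        {
            if (write_idx != read_idx)
                r->slots[write_idx] = r->slots[read_idx];
            write_idx = (write_idx + 1) % CAP;
        }
        read_idx = (read_idx + 1) % CAP;
    }
    r->tail = write_idx;        /* head stays put, tail and count shrink */
    r->count -= num_skipped;
}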
@@ -1325,40 +1372,61 @@ AbsorbSyncRequests(void)
 {
     CheckpointerRequest *requests = NULL;
     CheckpointerRequest *request;
-    int         n;
+    int         n,
+                i;
+    bool        loop;
 
     if (!AmCheckpointerProcess())
         return;
 
-    LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
-    /*
-     * We try to avoid holding the lock for a long time by copying the request
-     * array, and processing the requests after releasing the lock.
-     *
-     * Once we have cleared the requests from shared memory, we have to PANIC
-     * if we then fail to absorb them (eg, because our hashtable runs out of
-     * memory).  This is because the system cannot run safely if we are unable
-     * to fsync what we have been told to fsync.  Fortunately, the hashtable
-     * is so small that the problem is quite unlikely to arise in practice.
-     */
-    n = CheckpointerShmem->num_requests;
-    if (n > 0)
+    do
     {
-        requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
-        memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
-    }
+        LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
+
+        /*
+         * We try to avoid holding the lock for a long time by copying the
+         * request array, and processing the requests after releasing the
+         * lock.
+         *
+         * Once we have cleared the requests from shared memory, we have to
+         * PANIC if we then fail to absorb them (eg, because our hashtable
+         * runs out of memory).  This is because the system cannot run safely
+         * if we are unable to fsync what we have been told to fsync.
+         * Fortunately, the hashtable is so small that the problem is quite
+         * unlikely to arise in practice.
+         *
+         * Note: we could not palloc more than 1Gb of memory, thus make sure
+         * that the maximum number of elements will fit in the requests
+         * buffer.
+         */
+        n = Min(CheckpointerShmem->num_requests, CKPT_REQ_BATCH_SIZE);
+        if (n > 0)
+        {
+            if (!requests)
+                requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
 
-    START_CRIT_SECTION();
+            for (i = 0; i < n; i++)
+            {
+                requests[i] = CheckpointerShmem->requests[CheckpointerShmem->head];
+                CheckpointerShmem->head = (CheckpointerShmem->head + 1) % CheckpointerShmem->max_requests;
+            }
 
-    CheckpointerShmem->num_requests = 0;
+            CheckpointerShmem->num_requests -= n;
 
-    LWLockRelease(CheckpointerCommLock);
+        }
+
+        START_CRIT_SECTION();
+
+        /* Are there any requests in the queue? If so, keep going. */
+        loop = CheckpointerShmem->num_requests != 0;
+
+        LWLockRelease(CheckpointerCommLock);
 
-    for (request = requests; n > 0; request++, n--)
-        RememberSyncRequest(&request->ftag, request->type);
+        for (request = requests; n > 0; request++, n--)
+            RememberSyncRequest(&request->ftag, request->type);
 
-    END_CRIT_SECTION();
+        END_CRIT_SECTION();
+    } while (loop);
 
     if (requests)
         pfree(requests);
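The rewritten AbsorbSyncRequests drains the ring in bounded batches: under the lock it copies at most CKPT_REQ_BATCH_SIZE requests out of shared memory and advances head, then releases the lock and processes the local copy, looping while the queue is still non-empty. The sketch below, continuing the ItemQueue example above, shows that batching shape with a pthread mutex and a stack buffer; it deliberately leaves out the reuse of a single palloc'd buffer and the PANIC-on-failure critical section of the real function.

#define BATCH 4

/* Drain in bounded batches, holding the lock only while copying entries out
 * and advancing head; process the local copy after releasing it. */
static void
drain(ItemQueue *q, void (*process) (const Item *))
{
    Item        batch[BATCH];
    bool        more;

    do
    {
        int         n;

        pthread_mutex_lock(&q->lock);
        n = q->count < BATCH ? q->count : BATCH;
        for (int i = 0; i < n; i++)
        {
            batch[i] = q->items[q->head];
            q->head = (q->head + 1) % QUEUE_CAP;
        }
        q->count -= n;
        more = (q->count != 0);
        pthread_mutex_unlock(&q->lock);

        for (int i = 0; i < n; i++)
            process(&batch[i]);
    } while (more);
}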