8
8
*
9
9
*
10
10
* IDENTIFICATION
11
- * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.270 2008/11/19 10:34:50 heikki Exp $
11
+ * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.271 2008/12/03 13:05:22 heikki Exp $
12
12
*
13
13
*
14
14
* INTERFACE ROUTINES
47
47
#include "access/transam.h"
48
48
#include "access/tuptoaster.h"
49
49
#include "access/valid.h"
50
+ #include "access/visibilitymap.h"
50
51
#include "access/xact.h"
51
52
#include "access/xlogutils.h"
52
53
#include "catalog/catalog.h"
@@ -195,6 +196,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
195
196
int ntup ;
196
197
OffsetNumber lineoff ;
197
198
ItemId lpp ;
199
+ bool all_visible ;
198
200
199
201
Assert (page < scan -> rs_nblocks );
200
202
@@ -233,20 +235,32 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
233
235
lines = PageGetMaxOffsetNumber (dp );
234
236
ntup = 0 ;
235
237
238
+ /*
239
+ * If the all-visible flag indicates that all tuples on the page are
240
+ * visible to everyone, we can skip the per-tuple visibility tests.
241
+ */
242
+ all_visible = PageIsAllVisible (dp );
243
+
236
244
for (lineoff = FirstOffsetNumber , lpp = PageGetItemId (dp , lineoff );
237
245
lineoff <= lines ;
238
246
lineoff ++ , lpp ++ )
239
247
{
240
248
if (ItemIdIsNormal (lpp ))
241
249
{
242
- HeapTupleData loctup ;
243
250
bool valid ;
244
251
245
- loctup .t_data = (HeapTupleHeader ) PageGetItem ((Page ) dp , lpp );
246
- loctup .t_len = ItemIdGetLength (lpp );
247
- ItemPointerSet (& (loctup .t_self ), page , lineoff );
252
+ if (all_visible )
253
+ valid = true;
254
+ else
255
+ {
256
+ HeapTupleData loctup ;
257
+
258
+ loctup .t_data = (HeapTupleHeader ) PageGetItem ((Page ) dp , lpp );
259
+ loctup .t_len = ItemIdGetLength (lpp );
260
+ ItemPointerSet (& (loctup .t_self ), page , lineoff );
248
261
249
- valid = HeapTupleSatisfiesVisibility (& loctup , snapshot , buffer );
262
+ valid = HeapTupleSatisfiesVisibility (& loctup , snapshot , buffer );
263
+ }
250
264
if (valid )
251
265
scan -> rs_vistuples [ntup ++ ] = lineoff ;
252
266
}
@@ -1860,6 +1874,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
1860
1874
TransactionId xid = GetCurrentTransactionId ();
1861
1875
HeapTuple heaptup ;
1862
1876
Buffer buffer ;
1877
+ bool all_visible_cleared = false;
1863
1878
1864
1879
if (relation -> rd_rel -> relhasoids )
1865
1880
{
@@ -1920,6 +1935,12 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
1920
1935
1921
1936
RelationPutHeapTuple (relation , buffer , heaptup );
1922
1937
1938
+ if (PageIsAllVisible (BufferGetPage (buffer )))
1939
+ {
1940
+ all_visible_cleared = true;
1941
+ PageClearAllVisible (BufferGetPage (buffer ));
1942
+ }
1943
+
1923
1944
/*
1924
1945
* XXX Should we set PageSetPrunable on this page ?
1925
1946
*
@@ -1943,6 +1964,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
1943
1964
Page page = BufferGetPage (buffer );
1944
1965
uint8 info = XLOG_HEAP_INSERT ;
1945
1966
1967
+ xlrec .all_visible_cleared = all_visible_cleared ;
1946
1968
xlrec .target .node = relation -> rd_node ;
1947
1969
xlrec .target .tid = heaptup -> t_self ;
1948
1970
rdata [0 ].data = (char * ) & xlrec ;
@@ -1994,6 +2016,11 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
1994
2016
1995
2017
UnlockReleaseBuffer (buffer );
1996
2018
2019
+ /* Clear the bit in the visibility map if necessary */
2020
+ if (all_visible_cleared )
2021
+ visibilitymap_clear (relation ,
2022
+ ItemPointerGetBlockNumber (& (heaptup -> t_self )));
2023
+
1997
2024
/*
1998
2025
* If tuple is cachable, mark it for invalidation from the caches in case
1999
2026
* we abort. Note it is OK to do this after releasing the buffer, because
@@ -2070,6 +2097,7 @@ heap_delete(Relation relation, ItemPointer tid,
2070
2097
Buffer buffer ;
2071
2098
bool have_tuple_lock = false;
2072
2099
bool iscombo ;
2100
+ bool all_visible_cleared = false;
2073
2101
2074
2102
Assert (ItemPointerIsValid (tid ));
2075
2103
@@ -2216,6 +2244,12 @@ heap_delete(Relation relation, ItemPointer tid,
2216
2244
*/
2217
2245
PageSetPrunable (page , xid );
2218
2246
2247
+ if (PageIsAllVisible (page ))
2248
+ {
2249
+ all_visible_cleared = true;
2250
+ PageClearAllVisible (page );
2251
+ }
2252
+
2219
2253
/* store transaction information of xact deleting the tuple */
2220
2254
tp .t_data -> t_infomask &= ~(HEAP_XMAX_COMMITTED |
2221
2255
HEAP_XMAX_INVALID |
@@ -2237,6 +2271,7 @@ heap_delete(Relation relation, ItemPointer tid,
2237
2271
XLogRecPtr recptr ;
2238
2272
XLogRecData rdata [2 ];
2239
2273
2274
+ xlrec .all_visible_cleared = all_visible_cleared ;
2240
2275
xlrec .target .node = relation -> rd_node ;
2241
2276
xlrec .target .tid = tp .t_self ;
2242
2277
rdata [0 ].data = (char * ) & xlrec ;
@@ -2281,6 +2316,10 @@ heap_delete(Relation relation, ItemPointer tid,
2281
2316
*/
2282
2317
CacheInvalidateHeapTuple (relation , & tp );
2283
2318
2319
+ /* Clear the bit in the visibility map if necessary */
2320
+ if (all_visible_cleared )
2321
+ visibilitymap_clear (relation , BufferGetBlockNumber (buffer ));
2322
+
2284
2323
/* Now we can release the buffer */
2285
2324
ReleaseBuffer (buffer );
2286
2325
@@ -2388,6 +2427,8 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
2388
2427
bool have_tuple_lock = false;
2389
2428
bool iscombo ;
2390
2429
bool use_hot_update = false;
2430
+ bool all_visible_cleared = false;
2431
+ bool all_visible_cleared_new = false;
2391
2432
2392
2433
Assert (ItemPointerIsValid (otid ));
2393
2434
@@ -2763,6 +2804,12 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
2763
2804
MarkBufferDirty (newbuf );
2764
2805
MarkBufferDirty (buffer );
2765
2806
2807
+ /*
2808
+ * Note: we mustn't clear PD_ALL_VISIBLE flags before writing the WAL
2809
+ * record, because log_heap_update looks at those flags to set the
2810
+ * corresponding flags in the WAL record.
2811
+ */
2812
+
2766
2813
/* XLOG stuff */
2767
2814
if (!relation -> rd_istemp )
2768
2815
{
@@ -2778,6 +2825,18 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
2778
2825
PageSetTLI (BufferGetPage (buffer ), ThisTimeLineID );
2779
2826
}
2780
2827
2828
+ /* Clear PD_ALL_VISIBLE flags */
2829
+ if (PageIsAllVisible (BufferGetPage (buffer )))
2830
+ {
2831
+ all_visible_cleared = true;
2832
+ PageClearAllVisible (BufferGetPage (buffer ));
2833
+ }
2834
+ if (newbuf != buffer && PageIsAllVisible (BufferGetPage (newbuf )))
2835
+ {
2836
+ all_visible_cleared_new = true;
2837
+ PageClearAllVisible (BufferGetPage (newbuf ));
2838
+ }
2839
+
2781
2840
END_CRIT_SECTION ();
2782
2841
2783
2842
if (newbuf != buffer )
@@ -2791,6 +2850,12 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
2791
2850
*/
2792
2851
CacheInvalidateHeapTuple (relation , & oldtup );
2793
2852
2853
+ /* Clear bits in visibility map */
2854
+ if (all_visible_cleared )
2855
+ visibilitymap_clear (relation , BufferGetBlockNumber (buffer ));
2856
+ if (all_visible_cleared_new )
2857
+ visibilitymap_clear (relation , BufferGetBlockNumber (newbuf ));
2858
+
2794
2859
/* Now we can release the buffer(s) */
2795
2860
if (newbuf != buffer )
2796
2861
ReleaseBuffer (newbuf );
@@ -3411,6 +3476,11 @@ heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
3411
3476
3412
3477
LockBuffer (* buffer , BUFFER_LOCK_UNLOCK );
3413
3478
3479
+ /*
3480
+ * Don't update the visibility map here. Locking a tuple doesn't
3481
+ * change visibility info.
3482
+ */
3483
+
3414
3484
/*
3415
3485
* Now that we have successfully marked the tuple as locked, we can
3416
3486
* release the lmgr tuple lock, if we had it.
@@ -3916,7 +3986,9 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
3916
3986
3917
3987
xlrec .target .node = reln -> rd_node ;
3918
3988
xlrec .target .tid = from ;
3989
+ xlrec .all_visible_cleared = PageIsAllVisible (BufferGetPage (oldbuf ));
3919
3990
xlrec .newtid = newtup -> t_self ;
3991
+ xlrec .new_all_visible_cleared = PageIsAllVisible (BufferGetPage (newbuf ));
3920
3992
3921
3993
rdata [0 ].data = (char * ) & xlrec ;
3922
3994
rdata [0 ].len = SizeOfHeapUpdate ;
@@ -4185,13 +4257,25 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
4185
4257
OffsetNumber offnum ;
4186
4258
ItemId lp = NULL ;
4187
4259
HeapTupleHeader htup ;
4260
+ BlockNumber blkno ;
4261
+
4262
+ blkno = ItemPointerGetBlockNumber (& (xlrec -> target .tid ));
4263
+
4264
+ /*
4265
+ * The visibility map always needs to be updated, even if the heap page
4266
+ * is already up-to-date.
4267
+ */
4268
+ if (xlrec -> all_visible_cleared )
4269
+ {
4270
+ Relation reln = CreateFakeRelcacheEntry (xlrec -> target .node );
4271
+ visibilitymap_clear (reln , blkno );
4272
+ FreeFakeRelcacheEntry (reln );
4273
+ }
4188
4274
4189
4275
if (record -> xl_info & XLR_BKP_BLOCK_1 )
4190
4276
return ;
4191
4277
4192
- buffer = XLogReadBuffer (xlrec -> target .node ,
4193
- ItemPointerGetBlockNumber (& (xlrec -> target .tid )),
4194
- false);
4278
+ buffer = XLogReadBuffer (xlrec -> target .node , blkno , false);
4195
4279
if (!BufferIsValid (buffer ))
4196
4280
return ;
4197
4281
page = (Page ) BufferGetPage (buffer );
@@ -4223,6 +4307,9 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
4223
4307
/* Mark the page as a candidate for pruning */
4224
4308
PageSetPrunable (page , record -> xl_xid );
4225
4309
4310
+ if (xlrec -> all_visible_cleared )
4311
+ PageClearAllVisible (page );
4312
+
4226
4313
/* Make sure there is no forward chain link in t_ctid */
4227
4314
htup -> t_ctid = xlrec -> target .tid ;
4228
4315
PageSetLSN (page , lsn );
@@ -4249,11 +4336,22 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
4249
4336
Size freespace ;
4250
4337
BlockNumber blkno ;
4251
4338
4339
+ blkno = ItemPointerGetBlockNumber (& (xlrec -> target .tid ));
4340
+
4341
+ /*
4342
+ * The visibility map always needs to be updated, even if the heap page
4343
+ * is already up-to-date.
4344
+ */
4345
+ if (xlrec -> all_visible_cleared )
4346
+ {
4347
+ Relation reln = CreateFakeRelcacheEntry (xlrec -> target .node );
4348
+ visibilitymap_clear (reln , blkno );
4349
+ FreeFakeRelcacheEntry (reln );
4350
+ }
4351
+
4252
4352
if (record -> xl_info & XLR_BKP_BLOCK_1 )
4253
4353
return ;
4254
4354
4255
- blkno = ItemPointerGetBlockNumber (& (xlrec -> target .tid ));
4256
-
4257
4355
if (record -> xl_info & XLOG_HEAP_INIT_PAGE )
4258
4356
{
4259
4357
buffer = XLogReadBuffer (xlrec -> target .node , blkno , true);
@@ -4307,6 +4405,10 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
4307
4405
4308
4406
PageSetLSN (page , lsn );
4309
4407
PageSetTLI (page , ThisTimeLineID );
4408
+
4409
+ if (xlrec -> all_visible_cleared )
4410
+ PageClearAllVisible (page );
4411
+
4310
4412
MarkBufferDirty (buffer );
4311
4413
UnlockReleaseBuffer (buffer );
4312
4414
@@ -4347,6 +4449,18 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
4347
4449
uint32 newlen ;
4348
4450
Size freespace ;
4349
4451
4452
+ /*
4453
+ * The visibility map always needs to be updated, even if the heap page
4454
+ * is already up-to-date.
4455
+ */
4456
+ if (xlrec -> all_visible_cleared )
4457
+ {
4458
+ Relation reln = CreateFakeRelcacheEntry (xlrec -> target .node );
4459
+ visibilitymap_clear (reln ,
4460
+ ItemPointerGetBlockNumber (& xlrec -> target .tid ));
4461
+ FreeFakeRelcacheEntry (reln );
4462
+ }
4463
+
4350
4464
if (record -> xl_info & XLR_BKP_BLOCK_1 )
4351
4465
{
4352
4466
if (samepage )
@@ -4411,6 +4525,9 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
4411
4525
/* Mark the page as a candidate for pruning */
4412
4526
PageSetPrunable (page , record -> xl_xid );
4413
4527
4528
+ if (xlrec -> all_visible_cleared )
4529
+ PageClearAllVisible (page );
4530
+
4414
4531
/*
4415
4532
* this test is ugly, but necessary to avoid thinking that insert change
4416
4533
* is already applied
@@ -4426,6 +4543,17 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
4426
4543
4427
4544
newt :;
4428
4545
4546
+ /*
4547
+ * The visibility map always needs to be updated, even if the heap page
4548
+ * is already up-to-date.
4549
+ */
4550
+ if (xlrec -> new_all_visible_cleared )
4551
+ {
4552
+ Relation reln = CreateFakeRelcacheEntry (xlrec -> target .node );
4553
+ visibilitymap_clear (reln , ItemPointerGetBlockNumber (& xlrec -> newtid ));
4554
+ FreeFakeRelcacheEntry (reln );
4555
+ }
4556
+
4429
4557
if (record -> xl_info & XLR_BKP_BLOCK_2 )
4430
4558
return ;
4431
4559
@@ -4504,6 +4632,9 @@ newsame:;
4504
4632
if (offnum == InvalidOffsetNumber )
4505
4633
elog (PANIC , "heap_update_redo: failed to add tuple" );
4506
4634
4635
+ if (xlrec -> new_all_visible_cleared )
4636
+ PageClearAllVisible (page );
4637
+
4507
4638
freespace = PageGetHeapFreeSpace (page ); /* needed to update FSM below */
4508
4639
4509
4640
PageSetLSN (page , lsn );
0 commit comments