@@ -648,6 +648,8 @@ static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
648
648
649
649
static bool XLogCheckBuffer (XLogRecData * rdata , bool doPageWrites ,
650
650
XLogRecPtr * lsn , BkpBlock * bkpb );
651
+ static Buffer RestoreBackupBlockContents (XLogRecPtr lsn , BkpBlock bkpb ,
652
+ char * blk , bool get_cleanup_lock , bool keep_buffer );
651
653
static bool AdvanceXLInsertBuffer (bool new_segment );
652
654
static bool XLogCheckpointNeeded (XLogSegNo new_segno );
653
655
static void XLogWrite (XLogwrtRqst WriteRqst , bool flexible , bool xlog_switch );
@@ -731,7 +733,6 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
731
733
bool updrqst ;
732
734
bool doPageWrites ;
733
735
bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH );
734
- bool isHint = (rmid == RM_XLOG_ID && info == XLOG_HINT );
735
736
uint8 info_orig = info ;
736
737
static XLogRecord * rechdr ;
737
738
@@ -1001,18 +1002,6 @@ begin:;
1001
1002
goto begin ;
1002
1003
}
1003
1004
1004
- /*
1005
- * If this is a hint record and we don't need a backup block then
1006
- * we have no more work to do and can exit quickly without inserting
1007
- * a WAL record at all. In that case return InvalidXLogRecPtr.
1008
- */
1009
- if (isHint && !(info & XLR_BKP_BLOCK_MASK ))
1010
- {
1011
- LWLockRelease (WALInsertLock );
1012
- END_CRIT_SECTION ();
1013
- return InvalidXLogRecPtr ;
1014
- }
1015
-
1016
1005
/*
1017
1006
* If the current page is completely full, the record goes to the next
1018
1007
* page, right after the page header.
@@ -3158,8 +3147,6 @@ Buffer
3158
3147
RestoreBackupBlock (XLogRecPtr lsn , XLogRecord * record , int block_index ,
3159
3148
bool get_cleanup_lock , bool keep_buffer )
3160
3149
{
3161
- Buffer buffer ;
3162
- Page page ;
3163
3150
BkpBlock bkpb ;
3164
3151
char * blk ;
3165
3152
int i ;
@@ -3177,42 +3164,8 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
3177
3164
if (i == block_index )
3178
3165
{
3179
3166
/* Found it, apply the update */
3180
- buffer = XLogReadBufferExtended (bkpb .node , bkpb .fork , bkpb .block ,
3181
- RBM_ZERO );
3182
- Assert (BufferIsValid (buffer ));
3183
- if (get_cleanup_lock )
3184
- LockBufferForCleanup (buffer );
3185
- else
3186
- LockBuffer (buffer , BUFFER_LOCK_EXCLUSIVE );
3187
-
3188
- page = (Page ) BufferGetPage (buffer );
3189
-
3190
- if (bkpb .hole_length == 0 )
3191
- {
3192
- memcpy ((char * ) page , blk , BLCKSZ );
3193
- }
3194
- else
3195
- {
3196
- memcpy ((char * ) page , blk , bkpb .hole_offset );
3197
- /* must zero-fill the hole */
3198
- MemSet ((char * ) page + bkpb .hole_offset , 0 , bkpb .hole_length );
3199
- memcpy ((char * ) page + (bkpb .hole_offset + bkpb .hole_length ),
3200
- blk + bkpb .hole_offset ,
3201
- BLCKSZ - (bkpb .hole_offset + bkpb .hole_length ));
3202
- }
3203
-
3204
- /*
3205
- * Any checksum set on this page will be invalid. We don't need
3206
- * to reset it here since it will be set before being written.
3207
- */
3208
-
3209
- PageSetLSN (page , lsn );
3210
- MarkBufferDirty (buffer );
3211
-
3212
- if (!keep_buffer )
3213
- UnlockReleaseBuffer (buffer );
3214
-
3215
- return buffer ;
3167
+ return RestoreBackupBlockContents (lsn , bkpb , blk , get_cleanup_lock ,
3168
+ keep_buffer );
3216
3169
}
3217
3170
3218
3171
blk += BLCKSZ - bkpb .hole_length ;
@@ -3223,6 +3176,56 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
3223
3176
return InvalidBuffer ; /* keep compiler quiet */
3224
3177
}
3225
3178
3179
+ /*
3180
+ * Workhorse for RestoreBackupBlock usable without an xlog record
3181
+ *
3182
+ * Restores a full-page image from BkpBlock and a data pointer.
3183
+ */
3184
+ static Buffer
3185
+ RestoreBackupBlockContents (XLogRecPtr lsn , BkpBlock bkpb , char * blk ,
3186
+ bool get_cleanup_lock , bool keep_buffer )
3187
+ {
3188
+ Buffer buffer ;
3189
+ Page page ;
3190
+
3191
+ buffer = XLogReadBufferExtended (bkpb .node , bkpb .fork , bkpb .block ,
3192
+ RBM_ZERO );
3193
+ Assert (BufferIsValid (buffer ));
3194
+ if (get_cleanup_lock )
3195
+ LockBufferForCleanup (buffer );
3196
+ else
3197
+ LockBuffer (buffer , BUFFER_LOCK_EXCLUSIVE );
3198
+
3199
+ page = (Page ) BufferGetPage (buffer );
3200
+
3201
+ if (bkpb .hole_length == 0 )
3202
+ {
3203
+ memcpy ((char * ) page , blk , BLCKSZ );
3204
+ }
3205
+ else
3206
+ {
3207
+ memcpy ((char * ) page , blk , bkpb .hole_offset );
3208
+ /* must zero-fill the hole */
3209
+ MemSet ((char * ) page + bkpb .hole_offset , 0 , bkpb .hole_length );
3210
+ memcpy ((char * ) page + (bkpb .hole_offset + bkpb .hole_length ),
3211
+ blk + bkpb .hole_offset ,
3212
+ BLCKSZ - (bkpb .hole_offset + bkpb .hole_length ));
3213
+ }
3214
+
3215
+ /*
3216
+ * The checksum value on this page is currently invalid. We don't
3217
+ * need to reset it here since it will be set before being written.
3218
+ */
3219
+
3220
+ PageSetLSN (page , lsn );
3221
+ MarkBufferDirty (buffer );
3222
+
3223
+ if (!keep_buffer )
3224
+ UnlockReleaseBuffer (buffer );
3225
+
3226
+ return buffer ;
3227
+ }
3228
+
3226
3229
/*
3227
3230
* Attempt to read an XLOG record.
3228
3231
*
@@ -7635,45 +7638,86 @@ XLogRestorePoint(const char *rpName)
7635
7638
* Write a backup block if needed when we are setting a hint. Note that
7636
7639
* this may be called for a variety of page types, not just heaps.
7637
7640
*
7638
- * Deciding the "if needed" part is delicate and requires us to either
7639
- * grab WALInsertLock or check the info_lck spinlock. If we check the
7640
- * spinlock and it says Yes then we will need to get WALInsertLock as well,
7641
- * so the design choice here is to just go straight for the WALInsertLock
7642
- * and trust that calls to this function are minimised elsewhere.
7643
- *
7644
7641
* Callable while holding just share lock on the buffer content.
7645
7642
*
7646
- * Possible that multiple concurrent backends could attempt to write
7647
- * WAL records. In that case, more than one backup block may be recorded
7648
- * though that isn't important to the outcome and the backup blocks are
7649
- * likely to be identical anyway.
7643
+ * We can't use the plain backup block mechanism since that relies on the
7644
+ * Buffer being exclusively locked. Since some modifications (setting LSN, hint
7645
+ * bits) are allowed in a sharelocked buffer that can lead to wal checksum
7646
+ * failures. So instead we copy the page and insert the copied data as normal
7647
+ * record data.
7648
+ *
7649
+ * We only need to do something if page has not yet been full page written in
7650
+ * this checkpoint round. The LSN of the inserted wal record is returned if we
7651
+ * had to write, InvalidXLogRecPtr otherwise.
7652
+ *
7653
+ * It is possible that multiple concurrent backends could attempt to write WAL
7654
+ * records. In that case, multiple copies of the same block would be recorded
7655
+ * in separate WAL records by different backends, though that is still OK from
7656
+ * a correctness perspective.
7657
+ *
7658
+ * Note that this only works for buffers that fit the standard page model,
7659
+ * i.e. those for which buffer_std == true
7650
7660
*/
7651
- #define XLOG_HINT_WATERMARK 13579
7652
7661
XLogRecPtr
7653
7662
XLogSaveBufferForHint (Buffer buffer )
7654
7663
{
7664
+ XLogRecPtr recptr = InvalidXLogRecPtr ;
7665
+ XLogRecPtr lsn ;
7666
+ XLogRecData rdata [2 ];
7667
+ BkpBlock bkpb ;
7668
+
7655
7669
/*
7656
- * Make an XLOG entry reporting the hint
7670
+ * Ensure no checkpoint can change our view of RedoRecPtr.
7657
7671
*/
7658
- XLogRecData rdata [2 ];
7659
- int watermark = XLOG_HINT_WATERMARK ;
7672
+ Assert (MyPgXact -> delayChkpt );
7673
+
7674
+ /*
7675
+ * Update RedoRecPtr so XLogCheckBuffer can make the right decision
7676
+ */
7677
+ GetRedoRecPtr ();
7660
7678
7661
7679
/*
7662
- * Not allowed to have zero-length records, so use a small watermark
7680
+ * Setup phony rdata element for use within XLogCheckBuffer only.
7681
+ * We reuse and reset rdata for any actual WAL record insert.
7663
7682
*/
7664
- rdata [0 ].data = (char * ) (& watermark );
7665
- rdata [0 ].len = sizeof (int );
7666
- rdata [0 ].buffer = InvalidBuffer ;
7667
- rdata [0 ].buffer_std = false;
7668
- rdata [0 ].next = & (rdata [1 ]);
7683
+ rdata [0 ].buffer = buffer ;
7684
+ rdata [0 ].buffer_std = true;
7685
+
7686
+ if (XLogCheckBuffer (rdata , true, & lsn , & bkpb ))
7687
+ {
7688
+ char copied_buffer [BLCKSZ ];
7689
+ char * origdata = (char * ) BufferGetBlock (buffer );
7690
+
7691
+ /*
7692
+ * Copy buffer so we don't have to worry about concurrent hint bit or
7693
+ * lsn updates. We assume pd_lower/upper cannot be changed without an
7694
+ * exclusive lock, so the contents bkp are not racy.
7695
+ */
7696
+ memcpy (copied_buffer , origdata , bkpb .hole_offset );
7697
+ memcpy (copied_buffer + bkpb .hole_offset ,
7698
+ origdata + bkpb .hole_offset + bkpb .hole_length ,
7699
+ BLCKSZ - bkpb .hole_offset - bkpb .hole_length );
7700
+
7701
+ /*
7702
+ * Header for backup block.
7703
+ */
7704
+ rdata [0 ].data = (char * ) & bkpb ;
7705
+ rdata [0 ].len = sizeof (BkpBlock );
7706
+ rdata [0 ].buffer = InvalidBuffer ;
7707
+ rdata [0 ].next = & (rdata [1 ]);
7669
7708
7670
- rdata [1 ].data = NULL ;
7671
- rdata [1 ].len = 0 ;
7672
- rdata [1 ].buffer = buffer ;
7673
- rdata [1 ].buffer_std = true;
7674
- rdata [1 ].next = NULL ;
7709
+ /*
7710
+ * Save copy of the buffer.
7711
+ */
7712
+ rdata [1 ].data = copied_buffer ;
7713
+ rdata [1 ].len = BLCKSZ - bkpb .hole_length ;
7714
+ rdata [1 ].buffer = InvalidBuffer ;
7715
+ rdata [1 ].next = NULL ;
7675
7716
7676
- return XLogInsert (RM_XLOG_ID , XLOG_HINT , rdata );
7717
+ recptr = XLogInsert (RM_XLOG_ID , XLOG_HINT , rdata );
7718
+ }
7719
+
7720
+ return recptr ;
7677
7721
}
7678
7722
7679
7723
/*
@@ -7842,8 +7886,8 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
7842
7886
{
7843
7887
uint8 info = record -> xl_info & ~XLR_INFO_MASK ;
7844
7888
7845
- /* Backup blocks are not used in most xlog records */
7846
- Assert (info == XLOG_HINT || !(record -> xl_info & XLR_BKP_BLOCK_MASK ));
7889
+ /* Backup blocks are not used by XLOG rmgr */
7890
+ Assert (!(record -> xl_info & XLR_BKP_BLOCK_MASK ));
7847
7891
7848
7892
if (info == XLOG_NEXTOID )
7849
7893
{
@@ -8038,31 +8082,27 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
8038
8082
}
8039
8083
else if (info == XLOG_HINT )
8040
8084
{
8041
- #ifdef USE_ASSERT_CHECKING
8042
- int * watermark = (int * ) XLogRecGetData (record );
8043
- #endif
8044
-
8045
- /* Check the watermark is correct for the hint record */
8046
- Assert (* watermark == XLOG_HINT_WATERMARK );
8047
-
8048
- /* Backup blocks must be present for smgr hint records */
8049
- Assert (record -> xl_info & XLR_BKP_BLOCK_MASK );
8085
+ char * data ;
8086
+ BkpBlock bkpb ;
8050
8087
8051
8088
/*
8052
- * Hint records have no information that needs to be replayed.
8053
- * The sole purpose of them is to ensure that a hint bit does
8054
- * not cause a checksum invalidation if a hint bit write should
8055
- * cause a torn page. So the body of the record is empty but
8056
- * there must be one backup block.
8089
+ * Hint bit records contain a backup block stored "inline" in the normal
8090
+ * data since the locking when writing hint records isn't sufficient to
8091
+ * use the normal backup block mechanism, which assumes exclusive lock
8092
+ * on the buffer supplied.
8057
8093
*
8058
- * Since the only change in the backup block is a hint bit,
8059
- * there is no confict with Hot Standby .
8094
+ * Since the only change in these backup block are hint bits, there are
8095
+ * no recovery conflicts generated .
8060
8096
*
8061
8097
* This also means there is no corresponding API call for this,
8062
8098
* so an smgr implementation has no need to implement anything.
8063
8099
* Which means nothing is needed in md.c etc
8064
8100
*/
8065
- RestoreBackupBlock (lsn , record , 0 , false, false);
8101
+ data = XLogRecGetData (record );
8102
+ memcpy (& bkpb , data , sizeof (BkpBlock ));
8103
+ data += sizeof (BkpBlock );
8104
+
8105
+ RestoreBackupBlockContents (lsn , bkpb , data , false, false);
8066
8106
}
8067
8107
else if (info == XLOG_BACKUP_END )
8068
8108
{
0 commit comments