Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 91f2cae

Browse files
committed
Read WAL directly from WAL buffers.
If available, read directly from WAL buffers, avoiding the need to go through the filesystem. This is only used for physical replication for now, but can be expanded to other callers. It is done in preparation for replicating unflushed WAL data.

Author: Bharath Rupireddy
Discussion: https://postgr.es/m/CALj2ACXKKK%3DwbiG5_t6dGao5GoecMwRkhr7GjVBM_jg54%2BNa%3DQ%40mail.gmail.com
Reviewed-by: Andres Freund, Alvaro Herrera, Nathan Bossart, Dilip Kumar, Nitin Jadhav, Melih Mutlu, Kyotaro Horiguchi
1 parent 09eb633 commit 91f2cae

File tree

4 files changed

+134
-4
lines changed

4 files changed

+134
-4
lines changed

src/backend/access/transam/xlog.c

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1705,6 +1705,126 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
17051705
return cachedPos + ptr % XLOG_BLCKSZ;
17061706
}
17071707

1708+
/*
1709+
* Read WAL data directly from WAL buffers, if available. Returns the number
1710+
* of bytes read successfully.
1711+
*
1712+
* Fewer than 'count' bytes may be read if some of the requested WAL data has
1713+
* already been evicted from the WAL buffers, or if the caller requests data
1714+
* that is not yet available.
1715+
*
1716+
* No locks are taken.
1717+
*
1718+
* The 'tli' argument is only used as a convenient safety check so that
1719+
* callers do not read from WAL buffers on a historical timeline.
1720+
*/
1721+
Size
1722+
WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
1723+
TimeLineID tli)
1724+
{
1725+
char *pdst = dstbuf;
1726+
XLogRecPtr recptr = startptr;
1727+
XLogRecPtr upto;
1728+
Size nbytes;
1729+
1730+
if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
1731+
return 0;
1732+
1733+
Assert(!XLogRecPtrIsInvalid(startptr));
1734+
1735+
/*
1736+
* Don't read past the available WAL data.
1737+
*
1738+
* Check using local copy of LogwrtResult. Ordinarily it's been updated by
1739+
* the caller when determining how far to read; but if not, it just means
1740+
* we'll read less data.
1741+
*
1742+
* XXX: the available WAL could be extended to the WAL insert pointer by
1743+
* calling WaitXLogInsertionsToFinish().
1744+
*/
1745+
upto = Min(startptr + count, LogwrtResult.Write);
1746+
nbytes = upto - startptr;
1747+
1748+
/*
1749+
* Loop through the buffers without a lock. For each buffer, atomically
1750+
* read and verify the end pointer, then copy the data out, and finally
1751+
* re-read and re-verify the end pointer.
1752+
*
1753+
* Once a page is evicted, it never returns to the WAL buffers, so if the
1754+
* end pointer matches the expected end pointer before and after we copy
1755+
* the data, then the right page must have been present during the data
1756+
* copy. Read barriers are necessary to ensure that the data copy actually
1757+
* happens between the two verification steps.
1758+
*
1759+
* If either verification fails, we simply terminate the loop and return
1760+
* with the data that had been already copied out successfully.
1761+
*/
1762+
while (nbytes > 0)
1763+
{
1764+
uint32 offset = recptr % XLOG_BLCKSZ;
1765+
int idx = XLogRecPtrToBufIdx(recptr);
1766+
XLogRecPtr expectedEndPtr;
1767+
XLogRecPtr endptr;
1768+
const char *page;
1769+
const char *psrc;
1770+
Size npagebytes;
1771+
1772+
/*
1773+
* Calculate the end pointer we expect in the xlblocks array if the
1774+
* correct page is present.
1775+
*/
1776+
expectedEndPtr = recptr + (XLOG_BLCKSZ - offset);
1777+
1778+
/*
1779+
* First verification step: check that the correct page is present in
1780+
* the WAL buffers.
1781+
*/
1782+
endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
1783+
if (expectedEndPtr != endptr)
1784+
break;
1785+
1786+
/*
1787+
* The correct page is present (or was at the time the endptr was
1788+
* read; must re-verify later). Calculate pointer to source data and
1789+
* determine how much data to read from this page.
1790+
*/
1791+
page = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
1792+
psrc = page + offset;
1793+
npagebytes = Min(nbytes, XLOG_BLCKSZ - offset);
1794+
1795+
/*
1796+
* Ensure that the data copy and the first verification step are not
1797+
* reordered.
1798+
*/
1799+
pg_read_barrier();
1800+
1801+
/* data copy */
1802+
memcpy(pdst, psrc, npagebytes);
1803+
1804+
/*
1805+
* Ensure that the data copy and the second verification step are not
1806+
* reordered.
1807+
*/
1808+
pg_read_barrier();
1809+
1810+
/*
1811+
* Second verification step: check that the page we read from wasn't
1812+
* evicted while we were copying the data.
1813+
*/
1814+
endptr = pg_atomic_read_u64(&XLogCtl->xlblocks[idx]);
1815+
if (expectedEndPtr != endptr)
1816+
break;
1817+
1818+
pdst += npagebytes;
1819+
recptr += npagebytes;
1820+
nbytes -= npagebytes;
1821+
}
1822+
1823+
Assert(pdst - dstbuf <= count);
1824+
1825+
return pdst - dstbuf;
1826+
}
1827+
17081828
/*
17091829
* Converts a "usable byte position" to XLogRecPtr. A usable byte position
17101830
* is the position starting from the beginning of WAL, excluding all WAL

src/backend/access/transam/xlogreader.c

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1500,9 +1500,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
15001500
*
15011501
* Returns true if succeeded, false if an error occurs, in which case
15021502
* 'errinfo' receives error details.
1503-
*
1504-
* XXX probably this should be improved to suck data directly from the
1505-
* WAL buffers when possible.
15061503
*/
15071504
bool
15081505
WALRead(XLogReaderState *state,

src/backend/replication/walsender.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2966,6 +2966,7 @@ XLogSendPhysical(void)
29662966
Size nbytes;
29672967
XLogSegNo segno;
29682968
WALReadError errinfo;
2969+
Size rbytes;
29692970

29702971
/* If requested switch the WAL sender to the stopping state. */
29712972
if (got_STOPPING)
@@ -3181,7 +3182,16 @@ XLogSendPhysical(void)
31813182
enlargeStringInfo(&output_message, nbytes);
31823183

31833184
retry:
3184-
if (!WALRead(xlogreader,
3185+
/* attempt to read WAL from WAL buffers first */
3186+
rbytes = WALReadFromBuffers(&output_message.data[output_message.len],
3187+
startptr, nbytes, xlogreader->seg.ws_tli);
3188+
output_message.len += rbytes;
3189+
startptr += rbytes;
3190+
nbytes -= rbytes;
3191+
3192+
/* now read the remaining WAL from WAL file */
3193+
if (nbytes > 0 &&
3194+
!WALRead(xlogreader,
31853195
&output_message.data[output_message.len],
31863196
startptr,
31873197
nbytes,

src/include/access/xlog.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,9 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
252252

253253
extern void SetWalWriterSleeping(bool sleeping);
254254

255+
extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
256+
TimeLineID tli);
257+
255258
/*
256259
* Routines used by xlogrecovery.c to call back into xlog.c during recovery.
257260
*/

0 commit comments

Comments
 (0)