Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit a7d518a

Browse files
anarazelCommitfest Bot
authored and
Commitfest Bot
committed
aio: Implement smgr/md/fd write support
TODO: - Right now the sync.c integration with smgr.c/md.c isn't properly safe to use in a critical section The only reason it doesn't immediately fail is that it's reasonably rare that RegisterSyncRequest() fails *and* either: - smgropen()->hash_search(HASH_ENTER) decides to resize the hash table, even though the lookup is guaranteed to succeed for io_method=worker. - an io_method=uring completion is run in a different backend and smgropen() needs to build a new entry and thus needs to allocate memory For a bit I thought this could be worked around easily enough by not doing an smgropen() in mdsyncfiletag(), or adding a "fallible" smgropen() and instead just opening the file directly. That actually does kinda solve the problem, but only because the memory allocation in PathNameOpenFile() uses malloc(), not palloc() and thus doesn't trigger - temp_file_limit implementation
1 parent 615ce93 commit a7d518a

File tree

8 files changed

+269
-0
lines changed

8 files changed

+269
-0
lines changed

src/backend/storage/aio/aio_callback.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
4141
CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
4242

4343
CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb),
44+
CALLBACK_ENTRY(PGAIO_HCB_MD_WRITEV, aio_md_writev_cb),
4445

4546
CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb),
4647

src/backend/storage/file/fd.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2348,6 +2348,34 @@ FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset,
23482348
return returnCode;
23492349
}
23502350

2351+
int
2352+
FileStartWriteV(PgAioHandle *ioh, File file,
2353+
int iovcnt, off_t offset,
2354+
uint32 wait_event_info)
2355+
{
2356+
int returnCode;
2357+
Vfd *vfdP;
2358+
2359+
Assert(FileIsValid(file));
2360+
2361+
DO_DB(elog(LOG, "FileStartWriteV: %d (%s) " INT64_FORMAT " %d",
2362+
file, VfdCache[file].fileName,
2363+
(int64) offset,
2364+
iovcnt));
2365+
2366+
returnCode = FileAccess(file);
2367+
if (returnCode < 0)
2368+
return returnCode;
2369+
2370+
vfdP = &VfdCache[file];
2371+
2372+
/* FIXME: think about / reimplement temp_file_limit */
2373+
2374+
pgaio_io_start_writev(ioh, vfdP->fd, iovcnt, offset);
2375+
2376+
return 0;
2377+
}
2378+
23512379
int
23522380
FileSync(File file, uint32 wait_event_info)
23532381
{

src/backend/storage/smgr/md.c

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,12 +155,19 @@ static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
155155

156156
static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data);
157157
static void md_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);
158+
static PgAioResult md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data);
159+
static void md_writev_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);
158160

159161
const PgAioHandleCallbacks aio_md_readv_cb = {
160162
.complete_shared = md_readv_complete,
161163
.report = md_readv_report,
162164
};
163165

166+
const PgAioHandleCallbacks aio_md_writev_cb = {
167+
.complete_shared = md_writev_complete,
168+
.report = md_writev_report,
169+
};
170+
164171

165172
static inline int
166173
_mdfd_open_flags(void)
@@ -1143,6 +1150,64 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
11431150
}
11441151
}
11451152

1153+
/*
1154+
* mdstartwritev() -- Asynchronous version of mdrwritev().
1155+
*/
1156+
void
1157+
mdstartwritev(PgAioHandle *ioh,
1158+
SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
1159+
const void **buffers, BlockNumber nblocks, bool skipFsync)
1160+
{
1161+
off_t seekpos;
1162+
MdfdVec *v;
1163+
BlockNumber nblocks_this_segment;
1164+
struct iovec *iov;
1165+
int iovcnt;
1166+
int ret;
1167+
1168+
v = _mdfd_getseg(reln, forknum, blocknum, false,
1169+
EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
1170+
1171+
seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
1172+
1173+
Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
1174+
1175+
nblocks_this_segment =
1176+
Min(nblocks,
1177+
RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)));
1178+
1179+
if (nblocks_this_segment != nblocks)
1180+
elog(ERROR, "write crossing segment boundary");
1181+
1182+
iovcnt = pgaio_io_get_iovec(ioh, &iov);
1183+
1184+
Assert(nblocks <= iovcnt);
1185+
1186+
iovcnt = buffers_to_iovec(iov, unconstify(void **, buffers), nblocks_this_segment);
1187+
1188+
Assert(iovcnt <= nblocks_this_segment);
1189+
1190+
if (!(io_direct_flags & IO_DIRECT_DATA))
1191+
pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED);
1192+
1193+
pgaio_io_set_target_smgr(ioh,
1194+
reln,
1195+
forknum,
1196+
blocknum,
1197+
nblocks,
1198+
skipFsync);
1199+
pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_WRITEV, 0);
1200+
1201+
ret = FileStartWriteV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
1202+
if (ret != 0)
1203+
ereport(ERROR,
1204+
(errcode_for_file_access(),
1205+
errmsg("could not start writing blocks %u..%u in file \"%s\": %m",
1206+
blocknum,
1207+
blocknum + nblocks_this_segment - 1,
1208+
FilePathName(v->mdfd_vfd))));
1209+
}
1210+
11461211

11471212
/*
11481213
* mdwriteback() -- Tell the kernel to write pages back to storage.
@@ -1531,6 +1596,40 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
15311596
}
15321597
}
15331598

1599+
/*
1600+
* Like register_dirty_segment(), except for use by AIO. In the completion
1601+
* callback we don't have access to the MdfdVec (the completion callback might
1602+
* be executed in a different backend than the issuing backend), therefore we
1603+
* have to implement this slightly differently.
1604+
*/
1605+
static void
1606+
register_dirty_segment_aio(RelFileLocator locator, ForkNumber forknum, uint64 segno)
1607+
{
1608+
FileTag tag;
1609+
1610+
INIT_MD_FILETAG(tag, locator, forknum, segno);
1611+
1612+
/*
1613+
* Can't block here waiting for checkpointer to accept our sync request,
1614+
* as checkpointer might be waiting for this AIO to finish if offloaded to
1615+
* a worker.
1616+
*/
1617+
if (!RegisterSyncRequest(&tag, SYNC_REQUEST, false /* retryOnError */ ))
1618+
{
1619+
char path[MAXPGPATH];
1620+
1621+
ereport(DEBUG1,
1622+
(errmsg_internal("could not forward fsync request because request queue is full")));
1623+
1624+
/* reuse mdsyncfiletag() to avoid duplicating code */
1625+
if (mdsyncfiletag(&tag, path))
1626+
ereport(data_sync_elevel(ERROR),
1627+
(errcode_for_file_access(),
1628+
errmsg("could not fsync file \"%s\": %m",
1629+
path)));
1630+
}
1631+
}
1632+
15341633
/*
15351634
* register_unlink_segment() -- Schedule a file to be deleted after next checkpoint
15361635
*/
@@ -2065,3 +2164,103 @@ md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel)
20652164
td->smgr.nblocks * (size_t) BLCKSZ));
20662165
}
20672166
}
2167+
2168+
/*
2169+
* AIO completion callback for mdstartwritev().
2170+
*/
2171+
static PgAioResult
2172+
md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data)
2173+
{
2174+
PgAioTargetData *td = pgaio_io_get_target_data(ioh);
2175+
PgAioResult result = prior_result;
2176+
2177+
if (prior_result.result < 0)
2178+
{
2179+
result.status = PGAIO_RS_ERROR;
2180+
result.id = PGAIO_HCB_MD_WRITEV;
2181+
/* For "hard" errors, track the error number in error_data */
2182+
result.error_data = -prior_result.result;
2183+
result.result = 0;
2184+
2185+
pgaio_result_report(result, td, LOG);
2186+
2187+
return result;
2188+
}
2189+
2190+
/*
2191+
* As explained above smgrstartwritev(), the smgr API operates on the
2192+
* level of blocks, rather than bytes. Convert.
2193+
*/
2194+
result.result /= BLCKSZ;
2195+
2196+
Assert(result.result <= td->smgr.nblocks);
2197+
2198+
if (result.result == 0)
2199+
{
2200+
/* consider 0 blocks written a failure */
2201+
result.status = PGAIO_RS_ERROR;
2202+
result.id = PGAIO_HCB_MD_WRITEV;
2203+
result.error_data = 0;
2204+
2205+
pgaio_result_report(result, td, LOG);
2206+
2207+
return result;
2208+
}
2209+
2210+
if (result.status != PGAIO_RS_ERROR &&
2211+
result.result < td->smgr.nblocks)
2212+
{
2213+
/* partial writes should be retried at upper level */
2214+
result.status = PGAIO_RS_PARTIAL;
2215+
result.id = PGAIO_HCB_MD_WRITEV;
2216+
}
2217+
2218+
if (!td->smgr.skip_fsync)
2219+
register_dirty_segment_aio(td->smgr.rlocator, td->smgr.forkNum,
2220+
td->smgr.blockNum / ((BlockNumber) RELSEG_SIZE));
2221+
2222+
return result;
2223+
}
2224+
2225+
/*
2226+
* AIO error reporting callback for mdstartwritev().
2227+
*/
2228+
static void
2229+
md_writev_report(PgAioResult result, const PgAioTargetData *td, int elevel)
2230+
{
2231+
RelPathStr path;
2232+
2233+
path = relpathbackend(td->smgr.rlocator,
2234+
td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
2235+
td->smgr.forkNum);
2236+
2237+
if (result.error_data != 0)
2238+
{
2239+
errno = result.error_data; /* for errcode_for_file_access() */
2240+
2241+
ereport(elevel,
2242+
errcode_for_file_access(),
2243+
errmsg("could not write blocks %u..%u in file \"%s\": %m",
2244+
td->smgr.blockNum,
2245+
td->smgr.blockNum + td->smgr.nblocks,
2246+
path.str)
2247+
);
2248+
}
2249+
else
2250+
{
2251+
/*
2252+
* NB: This will typically only be output in debug messages, while
2253+
* retrying a partial IO.
2254+
*/
2255+
ereport(elevel,
2256+
errcode(ERRCODE_DATA_CORRUPTED),
2257+
errmsg("could not write blocks %u..%u in file \"%s\": wrote only %zu of %zu bytes",
2258+
td->smgr.blockNum,
2259+
td->smgr.blockNum + td->smgr.nblocks - 1,
2260+
path.str,
2261+
result.result * (size_t) BLCKSZ,
2262+
td->smgr.nblocks * (size_t) BLCKSZ
2263+
)
2264+
);
2265+
}
2266+
}

src/backend/storage/smgr/smgr.c

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ typedef struct f_smgr
115115
BlockNumber blocknum,
116116
const void **buffers, BlockNumber nblocks,
117117
bool skipFsync);
118+
void (*smgr_startwritev) (PgAioHandle *ioh,
119+
SMgrRelation reln, ForkNumber forknum,
120+
BlockNumber blocknum,
121+
const void **buffers, BlockNumber nblocks,
122+
bool skipFsync);
118123
void (*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
119124
BlockNumber blocknum, BlockNumber nblocks);
120125
BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -142,6 +147,7 @@ static const f_smgr smgrsw[] = {
142147
.smgr_readv = mdreadv,
143148
.smgr_startreadv = mdstartreadv,
144149
.smgr_writev = mdwritev,
150+
.smgr_startwritev = mdstartwritev,
145151
.smgr_writeback = mdwriteback,
146152
.smgr_nblocks = mdnblocks,
147153
.smgr_truncate = mdtruncate,
@@ -795,6 +801,29 @@ smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
795801
RESUME_INTERRUPTS();
796802
}
797803

804+
/*
805+
* smgrstartwritev() -- asynchronous version of smgrwritev()
806+
*
807+
* This starts an asynchronous writev IO using the IO handle `ioh`. Other than
808+
* `ioh` all parameters are the same as smgrwritev().
809+
*
810+
* Completion callbacks above smgr will be passed the result as the number of
811+
* successfully written blocks if the write [partially] succeeds. This
812+
* maintains the abstraction that smgr operates on the level of blocks, rather
813+
* than bytes.
814+
*/
815+
void
816+
smgrstartwritev(PgAioHandle *ioh,
817+
SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
818+
const void **buffers, BlockNumber nblocks, bool skipFsync)
819+
{
820+
HOLD_INTERRUPTS();
821+
smgrsw[reln->smgr_which].smgr_startwritev(ioh,
822+
reln, forknum, blocknum, buffers,
823+
nblocks, skipFsync);
824+
RESUME_INTERRUPTS();
825+
}
826+
798827
/*
799828
* smgrwriteback() -- Trigger kernel writeback for the supplied range of
800829
* blocks.

src/include/storage/aio.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ typedef enum PgAioHandleCallbackID
194194
PGAIO_HCB_INVALID = 0,
195195

196196
PGAIO_HCB_MD_READV,
197+
PGAIO_HCB_MD_WRITEV,
197198

198199
PGAIO_HCB_SHARED_BUFFER_READV,
199200

src/include/storage/fd.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ extern int FilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event
112112
extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
113113
extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
114114
extern int FileStartReadV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
115+
extern int FileStartWriteV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
115116
extern int FileSync(File file, uint32 wait_event_info);
116117
extern int FileZero(File file, off_t offset, off_t amount, uint32 wait_event_info);
117118
extern int FileFallocate(File file, off_t offset, off_t amount, uint32 wait_event_info);

src/include/storage/md.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "storage/sync.h"
2222

2323
extern const PgAioHandleCallbacks aio_md_readv_cb;
24+
extern const PgAioHandleCallbacks aio_md_writev_cb;
2425

2526
/* md storage manager functionality */
2627
extern void mdinit(void);
@@ -45,6 +46,10 @@ extern void mdstartreadv(PgAioHandle *ioh,
4546
extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
4647
BlockNumber blocknum,
4748
const void **buffers, BlockNumber nblocks, bool skipFsync);
49+
extern void mdstartwritev(PgAioHandle *ioh,
50+
SMgrRelation reln, ForkNumber forknum,
51+
BlockNumber blocknum,
52+
const void **buffers, BlockNumber nblocks, bool skipFsync);
4853
extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
4954
BlockNumber blocknum, BlockNumber nblocks);
5055
extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);

src/include/storage/smgr.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
108108
BlockNumber blocknum,
109109
const void **buffers, BlockNumber nblocks,
110110
bool skipFsync);
111+
extern void smgrstartwritev(PgAioHandle *ioh,
112+
SMgrRelation reln, ForkNumber forknum,
113+
BlockNumber blocknum,
114+
const void **buffers, BlockNumber nblocks,
115+
bool skipFsync);
111116
extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
112117
BlockNumber blocknum, BlockNumber nblocks);
113118
extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);

0 commit comments

Comments
 (0)