#include "miscadmin.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
+#include "storage/md.h"
/* just to have something to put into aio_handle_cbs */
static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
#define CALLBACK_ENTRY(id, callback) [id] = {.cb = &callback, .name = #callback}
	CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
+
+	/* completion/reporting callbacks for reads started by md.c's mdstartreadv() */
+	CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb),
#undef CALLBACK_ENTRY
};
#include "storage/aio.h"
#include "storage/aio_internal.h"
+#include "storage/smgr.h"
/*
[PGAIO_TID_INVALID] = &(PgAioTargetInfo) {
.name = "invalid",
},
+ [PGAIO_TID_SMGR] = &aio_smgr_target_info,
};
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/startup.h"
+#include "storage/aio.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "utils/guc.h"
vfdP = &VfdCache[file];
+ pgaio_closing_fd(vfdP->fd);
+
/*
* Close the file. We aren't expecting this to fail; if it does, better
* to leak the FD than to mess up our internal state.
if (!FileIsNotOpen(file))
{
+ pgaio_closing_fd(vfdP->fd);
+
/* close the file */
if (close(vfdP->fd) != 0)
{
return returnCode;
}
+/*
+ * FileStartReadV - start an asynchronous vectored read on a virtual File.
+ *
+ * Makes sure the kernel FD behind `file` is open (via FileAccess()), then
+ * passes it, together with `iovcnt` and `offset`, to pgaio_io_start_readv()
+ * using the AIO handle `ioh`.
+ *
+ * Returns 0 once the read has been handed to the AIO subsystem, or the
+ * negative value returned by FileAccess() if the file could not be opened.
+ * `wait_event_info` is currently unused.
+ */
+int
+FileStartReadV(PgAioHandle *ioh, File file,
+			   int iovcnt, off_t offset,
+			   uint32 wait_event_info)
+{
+	int			rc;
+
+	Assert(FileIsValid(file));
+
+	DO_DB(elog(LOG, "FileStartReadV: %d (%s) " INT64_FORMAT " %d",
+			   file, VfdCache[file].fileName,
+			   (int64) offset,
+			   iovcnt));
+
+	/* (re)open the underlying kernel FD if the VFD is not currently open */
+	rc = FileAccess(file);
+	if (rc < 0)
+		return rc;
+
+	pgaio_io_start_readv(ioh, VfdCache[file].fd, iovcnt, offset);
+
+	return 0;
+}
+
ssize_t
FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset,
uint32 wait_event_info)
int
FileGetRawDesc(File file)
{
+	int			returnCode;
+
	Assert(FileIsValid(file));
+
+	/*
+	 * Validate `file` (above) before FileAccess() indexes VfdCache with it.
+	 * FileAccess() makes sure the kernel FD is actually open; if that fails,
+	 * its negative result is returned instead of an FD, so callers must be
+	 * prepared for a negative return value.
+	 */
+	returnCode = FileAccess(file);
+	if (returnCode < 0)
+		return returnCode;
+
	return VfdCache[file].fd;
}
result = closedir(desc->desc.dir);
break;
case AllocateDescRawFD:
+ pgaio_closing_fd(desc->desc.fd);
result = close(desc->desc.fd);
break;
default:
/* Only get here if someone passes us a file not in allocatedDescs */
elog(WARNING, "fd passed to CloseTransientFile was not obtained from OpenTransientFile");
+ pgaio_closing_fd(fd);
+
return close(fd);
}
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
+#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "storage/md.h"
static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
MdfdVec *seg);
+static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data);
+static void md_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);
+
+/*
+ * AIO callbacks for reads started by mdstartreadv(): md_readv_complete()
+ * interprets the result on completion, md_readv_report() turns an error
+ * result into a log/error message.
+ */
+const PgAioHandleCallbacks aio_md_readv_cb = {
+	.report = md_readv_report,
+	.complete_shared = md_readv_complete,
+};
+
+
static inline int
_mdfd_open_flags(void)
{
}
}
+/*
+ * mdstartreadv() -- Asynchronous version of mdreadv().
+ *
+ * Prepares the AIO handle `ioh` for a vectored read of `nblocks` blocks of
+ * fork `forknum`, starting at `blocknum`, into `buffers`, and starts it via
+ * FileStartReadV().  The requested range must lie within a single segment
+ * file, otherwise an ERROR is raised; an ERROR is also raised if the read
+ * cannot be started.  Problems with the read itself (IO errors, short
+ * reads) are only detected on completion, in md_readv_complete().
+ */
+void
+mdstartreadv(PgAioHandle *ioh,
+			 SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			 void **buffers, BlockNumber nblocks)
+{
+	off_t		seekpos;
+	MdfdVec    *v;
+	BlockNumber nblocks_this_segment;
+	struct iovec *iov;
+	int			iovcnt;
+	int			ret;
+
+	/* the segment whose file will be read from */
+	v = _mdfd_getseg(reln, forknum, blocknum, false,
+					 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
+
+	/* byte offset of blocknum within that segment file */
+	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+
+	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+
+	/*
+	 * The IO is issued against a single segment file, so a range reaching
+	 * past the end of this segment cannot be handled here.
+	 */
+	nblocks_this_segment =
+		Min(nblocks,
+			RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)));
+
+	if (nblocks_this_segment != nblocks)
+		elog(ERROR, "read crossing segment boundary");
+
+	/* iovcnt here is the capacity of the handle's iovec array */
+	iovcnt = pgaio_io_get_iovec(ioh, &iov);
+
+	Assert(nblocks <= iovcnt);
+
+	/*
+	 * From here on iovcnt is the number of iovecs actually used; it may be
+	 * smaller than the number of blocks (presumably because contiguous
+	 * buffers are merged -- see buffers_to_iovec()).
+	 */
+	iovcnt = buffers_to_iovec(iov, buffers, nblocks_this_segment);
+
+	Assert(iovcnt <= nblocks_this_segment);
+
+	/* unless direct IO is enabled for data files, the IO is buffered */
+	if (!(io_direct_flags & IO_DIRECT_DATA))
+		pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED);
+
+	/*
+	 * Describe the target and register the md read callbacks before the IO
+	 * is started, so that completion and error reporting can identify what
+	 * was read.
+	 */
+	pgaio_io_set_target_smgr(ioh,
+							 reln,
+							 forknum,
+							 blocknum,
+							 nblocks,
+							 false);
+	pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_READV, 0);
+
+	ret = FileStartReadV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_READ);
+	if (ret != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not start reading blocks %u..%u in file \"%s\": %m",
+						blocknum,
+						blocknum + nblocks_this_segment - 1,
+						FilePathName(v->mdfd_vfd))));
+
+	/*
+	 * The error checks corresponding to the post-read checks in mdreadv() are
+	 * in md_readv_complete().
+	 */
+}
+
/*
* mdwritev() -- Write the supplied blocks at the appropriate location.
*
}
}
+/*
+ * mdfd() -- return the kernel FD of the segment file containing `blocknum`
+ * and store the block's byte offset within that segment in *off.
+ *
+ * The result comes from FileGetRawDesc(), so it may be negative if the
+ * segment file could not be (re)opened.
+ */
+int
+mdfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off)
+{
+	MdfdVec    *v;
+	off_t		seekpos;
+
+	/*
+	 * No separate mdopenfork() call first: its result would be overwritten
+	 * right away, and mdstartreadv() likewise relies on _mdfd_getseg() alone.
+	 */
+	v = _mdfd_getseg(reln, forknum, blocknum, false,
+					 EXTENSION_FAIL);
+
+	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+
+	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+
+	/*
+	 * *off is only 32 bits wide.  Check the full-width value, so that a
+	 * segment size above 4GB is caught instead of being silently truncated.
+	 */
+	Assert(seekpos <= (off_t) PG_UINT32_MAX);
+	*off = (uint32) seekpos;
+
+	return FileGetRawDesc(v->mdfd_vfd);
+}
+
/*
* register_dirty_segment() -- Mark a relation segment as needing fsync
*
*/
return ftag->rlocator.dbOid == candidate->rlocator.dbOid;
}
+
+/*
+ * AIO completion callback for mdstartreadv().
+ *
+ * Translates the byte-level result of the read into the smgr convention of
+ * "number of blocks read", and classifies the outcome:
+ *
+ * - a negative raw result is a "hard" error: status becomes PGAIO_RS_ERROR,
+ *   error_data holds the negated value (reported as an errno by
+ *   md_readv_report()) and result is reset to 0;
+ * - zero complete blocks read is also PGAIO_RS_ERROR, with error_data 0;
+ * - fewer blocks than requested, with no earlier error, becomes
+ *   PGAIO_RS_PARTIAL so the issuer can retry the remainder.
+ *
+ * Both error cases are logged to the server log right away.
+ */
+static PgAioResult
+md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data)
+{
+	PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+	PgAioResult result = prior_result;
+
+	if (prior_result.result < 0)
+	{
+		result.status = PGAIO_RS_ERROR;
+		result.id = PGAIO_HCB_MD_READV;
+		/* For "hard" errors, track the error number in error_data */
+		result.error_data = -prior_result.result;
+		result.result = 0;
+
+		/*
+		 * Immediately log a message about the IO error, but only to the
+		 * server log. The reason to do so immediately is that the originator
+		 * might not process the query result immediately (because it is busy
+		 * doing another part of query processing) or at all (e.g. if it was
+		 * cancelled or errored out due to another IO also failing). The
+		 * issuer of the IO will emit an ERROR when processing the IO's
+		 * results.
+		 */
+		pgaio_result_report(result, td, LOG_SERVER_ONLY);
+
+		return result;
+	}
+
+	/*
+	 * As explained above smgrstartreadv(), the smgr API operates on the level
+	 * of blocks, rather than bytes. Convert. Because this is integer
+	 * division, a trailing partially-read block does not count as read.
+	 */
+	result.result /= BLCKSZ;
+
+	Assert(result.result <= td->smgr.nblocks);
+
+	if (result.result == 0)
+	{
+		/* consider 0 blocks read a failure */
+		result.status = PGAIO_RS_ERROR;
+		result.id = PGAIO_HCB_MD_READV;
+		/* error_data == 0 distinguishes this from a "hard" (errno) error */
+		result.error_data = 0;
+
+		/* see comment above the "hard error" case */
+		pgaio_result_report(result, td, LOG_SERVER_ONLY);
+
+		return result;
+	}
+
+	if (result.status != PGAIO_RS_ERROR &&
+		result.result < td->smgr.nblocks)
+	{
+		/* partial reads should be retried at upper level */
+		result.status = PGAIO_RS_PARTIAL;
+		result.id = PGAIO_HCB_MD_READV;
+	}
+
+	return result;
+}
+
+/*
+ * AIO error reporting callback for mdstartreadv().
+ *
+ * Emits a message at `elevel` for an error result built by
+ * md_readv_complete():
+ * - error_data == 0: the read returned fewer bytes than requested;
+ * - error_data != 0: the read failed, error_data being the errno.
+ *
+ * NOTE(review): for temp relations the path is built with MyProcNumber,
+ * which is only the owning backend when this runs in the issuing process --
+ * confirm for IOs completed elsewhere.
+ */
+static void
+md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel)
+{
+	RelPathStr	path;
+	BlockNumber first_block = td->smgr.blockNum;
+	BlockNumber last_block = first_block + td->smgr.nblocks - 1;
+
+	path = relpathbackend(td->smgr.rlocator,
+						  td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
+						  td->smgr.forkNum);
+
+	if (result.error_data == 0)
+	{
+		/*
+		 * Short read.  Typically this only shows up in debug output, while a
+		 * partial IO is being retried.
+		 */
+		ereport(elevel,
+				errcode(ERRCODE_DATA_CORRUPTED),
+				errmsg("could not read blocks %u..%u in file \"%s\": read only %zu of %zu bytes",
+					   first_block,
+					   last_block,
+					   path.str,
+					   result.result * (size_t) BLCKSZ,
+					   td->smgr.nblocks * (size_t) BLCKSZ));
+		return;
+	}
+
+	/* errno feeds errcode_for_file_access() and %m */
+	errno = result.error_data;
+
+	ereport(elevel,
+			errcode_for_file_access(),
+			errmsg("could not read blocks %u..%u in file \"%s\": %m",
+				   first_block,
+				   last_block,
+				   path.str));
+}
#include "access/xlogutils.h"
#include "lib/ilist.h"
#include "miscadmin.h"
+#include "storage/aio.h"
#include "storage/bufmgr.h"
#include "storage/ipc.h"
#include "storage/md.h"
void (*smgr_readv) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum,
void **buffers, BlockNumber nblocks);
+ void (*smgr_startreadv) (PgAioHandle *ioh,
+ SMgrRelation reln, ForkNumber forknum,
+ BlockNumber blocknum,
+ void **buffers, BlockNumber nblocks);
void (*smgr_writev) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum,
const void **buffers, BlockNumber nblocks,
BlockNumber old_blocks, BlockNumber nblocks);
void (*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
void (*smgr_registersync) (SMgrRelation reln, ForkNumber forknum);
+ int (*smgr_fd) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off);
} f_smgr;
static const f_smgr smgrsw[] = {
.smgr_prefetch = mdprefetch,
.smgr_maxcombine = mdmaxcombine,
.smgr_readv = mdreadv,
+ .smgr_startreadv = mdstartreadv,
.smgr_writev = mdwritev,
.smgr_writeback = mdwriteback,
.smgr_nblocks = mdnblocks,
.smgr_truncate = mdtruncate,
.smgr_immedsync = mdimmedsync,
.smgr_registersync = mdregistersync,
+ .smgr_fd = mdfd,
}
};
static void smgrshutdown(int code, Datum arg);
static void smgrdestroy(SMgrRelation reln);
+static void smgr_aio_reopen(PgAioHandle *ioh);
+static char *smgr_aio_describe_identity(const PgAioTargetData *sd);
+
+
+/*
+ * AIO target description for IOs on smgr relations, i.e. IOs whose target
+ * was set with pgaio_io_set_target_smgr().
+ */
+const PgAioTargetInfo aio_smgr_target_info = {
+	.describe_identity = smgr_aio_describe_identity,
+	.reopen = smgr_aio_reopen,
+	.name = "smgr",
+};
+
/*
* smgrinit(), smgrshutdown() -- Initialize or shut down storage
RESUME_INTERRUPTS();
}
+/*
+ * smgrstartreadv() -- start an asynchronous version of smgrreadv()
+ *
+ * Starts an asynchronous vectored read using the AIO handle `ioh`.  Apart
+ * from `ioh`, the arguments mean the same as for smgrreadv().  Interrupts
+ * are held while the storage manager sets up and starts the IO.
+ *
+ * To keep smgr's block-level abstraction, completion callbacks registered
+ * above smgr receive the result as the number of blocks successfully read
+ * when the read fully or partially succeeds.  Buffers for blocks that were
+ * not read successfully (up to the full nblocks) may have been modified in
+ * unspecified ways.
+ */
+void
+smgrstartreadv(PgAioHandle *ioh,
+			   SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			   void **buffers, BlockNumber nblocks)
+{
+	const f_smgr *sw = &smgrsw[reln->smgr_which];
+
+	HOLD_INTERRUPTS();
+	sw->smgr_startreadv(ioh, reln, forknum, blocknum, buffers, nblocks);
+	RESUME_INTERRUPTS();
+}
+
/*
* smgrwritev() -- Write the supplied buffers out.
*
RESUME_INTERRUPTS();
}
+/*
+ * smgrfd() -- kernel FD for the segment holding `blocknum`; *off receives
+ * the block's byte offset within that file.
+ *
+ * Only meant for AIO that is executed in a process other than the one that
+ * issued it (e.g. an IO worker).
+ */
+static int
+smgrfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off)
+{
+	/*
+	 * Processing interrupts could close the FD before the caller uses it, so
+	 * the caller must be holding them off.
+	 */
+	Assert(!INTERRUPTS_CAN_BE_PROCESSED());
+
+	return smgrsw[reln->smgr_which].smgr_fd(reln, forknum, blocknum, off);
+}
+
/*
* AtEOXact_SMgr
*
smgrreleaseall();
return true;
}
+
+/*
+ * Set the target of IO handle `ioh` to an smgr relation and fill in the
+ * target data: relation locator, fork, first block, number of blocks,
+ * whether the relation is temporary, and whether fsync is to be skipped.
+ *
+ * The backend owning a temporary relation is not stored; it is implied by
+ * the IO's owner.
+ */
+void
+pgaio_io_set_target_smgr(PgAioHandle *ioh,
+						 SMgrRelationData *smgr,
+						 ForkNumber forknum,
+						 BlockNumber blocknum,
+						 int nblocks,
+						 bool skip_fsync)
+{
+	PgAioTargetData *sd = pgaio_io_get_target_data(ioh);
+	bool		is_temp = SmgrIsTemp(smgr);
+
+	pgaio_io_set_target(ioh, PGAIO_TID_SMGR);
+
+	sd->smgr.rlocator = smgr->smgr_rlocator.locator;
+	sd->smgr.forkNum = forknum;
+	sd->smgr.blockNum = blocknum;
+	sd->smgr.nblocks = nblocks;
+	sd->smgr.is_temp = is_temp;
+
+	/*
+	 * Temp relations should never be fsync'd, so fsync is always skipped for
+	 * them, regardless of what the caller asked for.
+	 */
+	sd->smgr.skip_fsync = skip_fsync || is_temp;
+}
+
+/*
+ * "reopen" callback of the smgr AIO target.
+ *
+ * Used when the IO is executed by a process other than the one that issued
+ * it (e.g. an IO worker): opens the relation again and stores a file
+ * descriptor valid in the current process into the IO's operation data.
+ * Temporary relations are opened with the IO owner's proc number.
+ */
+static void
+smgr_aio_reopen(PgAioHandle *ioh)
+{
+	PgAioTargetData *sd = pgaio_io_get_target_data(ioh);
+	PgAioOpData *od = pgaio_io_get_op_data(ioh);
+	ProcNumber	procno;
+	SMgrRelation reln;
+	uint32		off;
+
+	/*
+	 * Interrupts must be held off by the caller, otherwise the FD could be
+	 * closed again before the IO gets executed.
+	 */
+	Assert(!INTERRUPTS_CAN_BE_PROCESSED());
+
+	procno = sd->smgr.is_temp ? pgaio_io_get_owner(ioh) : INVALID_PROC_NUMBER;
+	reln = smgropen(sd->smgr.rlocator, procno);
+
+	switch (pgaio_io_get_op(ioh))
+	{
+		case PGAIO_OP_READV:
+			od->read.fd = smgrfd(reln, sd->smgr.forkNum, sd->smgr.blockNum, &off);
+			Assert(off == od->read.offset);
+			break;
+		case PGAIO_OP_WRITEV:
+			od->write.fd = smgrfd(reln, sd->smgr.forkNum, sd->smgr.blockNum, &off);
+			Assert(off == od->write.offset);
+			break;
+		case PGAIO_OP_INVALID:
+			pg_unreachable();
+			break;
+	}
+}
+
+/*
+ * "describe_identity" callback of the smgr AIO target: returns a
+ * psprintf()-allocated, translated description of the file (and, if any,
+ * the block range) targeted by the IO.
+ *
+ * NOTE(review): temp relation paths are built with MyProcNumber, which is
+ * only the owning backend when called in the issuing process
+ * (smgr_aio_reopen() uses pgaio_io_get_owner() instead) -- confirm callers.
+ */
+static char *
+smgr_aio_describe_identity(const PgAioTargetData *sd)
+{
+	RelPathStr	path;
+	BlockNumber first = sd->smgr.blockNum;
+	BlockNumber count = sd->smgr.nblocks;
+
+	path = relpathbackend(sd->smgr.rlocator,
+						  sd->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
+						  sd->smgr.forkNum);
+
+	if (count == 0)
+		return psprintf(_("file \"%s\""), path.str);
+
+	if (count == 1)
+		return psprintf(_("block %u in file \"%s\""),
+						first,
+						path.str);
+
+	return psprintf(_("blocks %u..%u in file \"%s\""),
+					first,
+					first + count - 1,
+					path.str);
+}
{
/* intentionally the zero value, to help catch zeroed memory etc */
PGAIO_TID_INVALID = 0,
+ PGAIO_TID_SMGR,
} PgAioTargetID;
-#define PGAIO_TID_COUNT (PGAIO_TID_INVALID + 1)
+#define PGAIO_TID_COUNT (PGAIO_TID_SMGR + 1)
/*
typedef enum PgAioHandleCallbackID
{
	PGAIO_HCB_INVALID,
+
+	/* md.c: completion and error reporting for reads from mdstartreadv() */
+	PGAIO_HCB_MD_READV,
} PgAioHandleCallbackID;
*/
typedef union PgAioTargetData
{
-	/* just as an example placeholder for later */
	struct
	{
-		uint32		queue_id;
-	}			wal;
+		RelFileLocator rlocator;	/* physical relation identifier */
+		BlockNumber blockNum;	/* blknum relative to begin of reln */
+		BlockNumber nblocks;	/* number of blocks covered by the IO */
+		ForkNumber	forkNum:8;	/* don't waste 4 bytes for four values */
+		bool		is_temp:1;	/* proc can be inferred by owning AIO */
+		bool		skip_fsync:1;	/* see pgaio_io_set_target_smgr() */
+	}			smgr;			/* target data for PGAIO_TID_SMGR */
} PgAioTargetData;
* prototypes for functions in fd.c
*/
+struct PgAioHandle;
+
/* Operations on virtual Files --- equivalent to Unix kernel file ops */
extern File PathNameOpenFile(const char *fileName, int fileFlags);
extern File PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode);
extern int FilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event_info);
extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
+extern int FileStartReadV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
extern int FileSync(File file, uint32 wait_event_info);
extern int FileZero(File file, off_t offset, off_t amount, uint32 wait_event_info);
extern int FileFallocate(File file, off_t offset, off_t amount, uint32 wait_event_info);
#ifndef MD_H
#define MD_H
+#include "storage/aio_types.h"
#include "storage/block.h"
#include "storage/relfilelocator.h"
#include "storage/smgr.h"
#include "storage/sync.h"
+extern const PgAioHandleCallbacks aio_md_readv_cb;
+
/* md storage manager functionality */
extern void mdinit(void);
extern void mdopen(SMgrRelation reln);
BlockNumber blocknum);
extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks);
+extern void mdstartreadv(PgAioHandle *ioh,
+ SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+ void **buffers, BlockNumber nblocks);
extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum,
const void **buffers, BlockNumber nblocks, bool skipFsync);
BlockNumber old_blocks, BlockNumber nblocks);
extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
extern void mdregistersync(SMgrRelation reln, ForkNumber forknum);
+extern int mdfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off);
extern void ForgetDatabaseSyncRequests(Oid dbid);
extern void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo);
#define SMGR_H
#include "lib/ilist.h"
+#include "storage/aio_types.h"
#include "storage/block.h"
#include "storage/relfilelocator.h"
#define SmgrIsTemp(smgr) \
RelFileLocatorBackendIsTemp((smgr)->smgr_rlocator)
+extern const PgAioTargetInfo aio_smgr_target_info;
+
extern void smgrinit(void);
extern SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend);
extern bool smgrexists(SMgrRelation reln, ForkNumber forknum);
extern void smgrreadv(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum,
void **buffers, BlockNumber nblocks);
+extern void smgrstartreadv(PgAioHandle *ioh,
+ SMgrRelation reln, ForkNumber forknum,
+ BlockNumber blocknum,
+ void **buffers, BlockNumber nblocks);
extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum,
const void **buffers, BlockNumber nblocks,
smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
}
+extern void pgaio_io_set_target_smgr(PgAioHandle *ioh,
+ SMgrRelationData *smgr,
+ ForkNumber forknum,
+ BlockNumber blocknum,
+ int nblocks,
+ bool skip_fsync);
+
#endif /* SMGR_H */