Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
aio: Implement support for reads in smgr/md/fd
authorAndres Freund <andres@anarazel.de>
Sat, 29 Mar 2025 17:38:35 +0000 (13:38 -0400)
committerAndres Freund <andres@anarazel.de>
Sat, 29 Mar 2025 17:38:35 +0000 (13:38 -0400)
This implements the following:

1) An smgr AIO target, for AIO on smgr files. This should be usable not just
   for md.c but also other SMGR implementation if we ever get them.
2) readv support in fd.c, which requires a small bit of infrastructure work in
   fd.c
3) smgr.c and md.c support for readv

There still is nothing performing AIO, but as of this commit it would be
possible.

As part of this change FileGetRawDesc() actually ensures that the file is
opened - previously it was basically not usable. It's used to reopen a file in
IO workers.

Reviewed-by: Noah Misch <noah@leadboat.com>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
Discussion: https://postgr.es/m/20210223100344.llw5an2aklengrmn@alap3.anarazel.de
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m

src/backend/storage/aio/aio_callback.c
src/backend/storage/aio/aio_target.c
src/backend/storage/file/fd.c
src/backend/storage/smgr/md.c
src/backend/storage/smgr/smgr.c
src/include/storage/aio.h
src/include/storage/aio_types.h
src/include/storage/fd.h
src/include/storage/md.h
src/include/storage/smgr.h

index 413ea3247c2bef62727a9ad4522abc0ebacfa452..53db6e194afdede2190200121191ee89a3107e58 100644 (file)
@@ -18,6 +18,7 @@
 #include "miscadmin.h"
 #include "storage/aio.h"
 #include "storage/aio_internal.h"
+#include "storage/md.h"
 
 
 /* just to have something to put into aio_handle_cbs */
@@ -37,6 +38,8 @@ typedef struct PgAioHandleCallbacksEntry
 static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
 #define CALLBACK_ENTRY(id, callback)  [id] = {.cb = &callback, .name = #callback}
    CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
+
+   CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb),
 #undef CALLBACK_ENTRY
 };
 
index 536c7f91f5e79a56f4d5ed8de3e806f6b06af74f..ac6c74f4ff264cd9322ebb479a59dfbdef3585a8 100644 (file)
@@ -16,6 +16,7 @@
 
 #include "storage/aio.h"
 #include "storage/aio_internal.h"
+#include "storage/smgr.h"
 
 
 /*
@@ -25,6 +26,7 @@ static const PgAioTargetInfo *pgaio_target_info[] = {
    [PGAIO_TID_INVALID] = &(PgAioTargetInfo) {
        .name = "invalid",
    },
+   [PGAIO_TID_SMGR] = &aio_smgr_target_info,
 };
 
 
index 84873f4148874f02a0c09c335e1e65f05807bce1..0e8299dd55646eb46abdaa922d2cfd463aeb8d84 100644 (file)
@@ -94,6 +94,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/startup.h"
+#include "storage/aio.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "utils/guc.h"
@@ -1296,6 +1297,8 @@ LruDelete(File file)
 
    vfdP = &VfdCache[file];
 
+   pgaio_closing_fd(vfdP->fd);
+
    /*
     * Close the file.  We aren't expecting this to fail; if it does, better
     * to leak the FD than to mess up our internal state.
@@ -1989,6 +1992,8 @@ FileClose(File file)
 
    if (!FileIsNotOpen(file))
    {
+       pgaio_closing_fd(vfdP->fd);
+
        /* close the file */
        if (close(vfdP->fd) != 0)
        {
@@ -2212,6 +2217,32 @@ retry:
    return returnCode;
 }
 
+int
+FileStartReadV(PgAioHandle *ioh, File file,
+              int iovcnt, off_t offset,
+              uint32 wait_event_info)
+{
+   int         returnCode;
+   Vfd        *vfdP;
+
+   Assert(FileIsValid(file));
+
+   DO_DB(elog(LOG, "FileStartReadV: %d (%s) " INT64_FORMAT " %d",
+              file, VfdCache[file].fileName,
+              (int64) offset,
+              iovcnt));
+
+   returnCode = FileAccess(file);
+   if (returnCode < 0)
+       return returnCode;
+
+   vfdP = &VfdCache[file];
+
+   pgaio_io_start_readv(ioh, vfdP->fd, iovcnt, offset);
+
+   return 0;
+}
+
 ssize_t
 FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset,
           uint32 wait_event_info)
@@ -2500,6 +2531,12 @@ FilePathName(File file)
 int
 FileGetRawDesc(File file)
 {
+   int         returnCode;
+
+   returnCode = FileAccess(file);
+   if (returnCode < 0)
+       return returnCode;
+
    Assert(FileIsValid(file));
    return VfdCache[file].fd;
 }
@@ -2780,6 +2817,7 @@ FreeDesc(AllocateDesc *desc)
            result = closedir(desc->desc.dir);
            break;
        case AllocateDescRawFD:
+           pgaio_closing_fd(desc->desc.fd);
            result = close(desc->desc.fd);
            break;
        default:
@@ -2848,6 +2886,8 @@ CloseTransientFile(int fd)
    /* Only get here if someone passes us a file not in allocatedDescs */
    elog(WARNING, "fd passed to CloseTransientFile was not obtained from OpenTransientFile");
 
+   pgaio_closing_fd(fd);
+
    return close(fd);
 }
 
index f3220f98dc4295db8ca4932d8c1a745b3a690253..b55892b94051c02fd6a30393de23ceebdaf18adb 100644 (file)
@@ -31,6 +31,7 @@
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "pgstat.h"
+#include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/md.h"
@@ -152,6 +153,15 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forknum,
 static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
                              MdfdVec *seg);
 
+static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data);
+static void md_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);
+
+const PgAioHandleCallbacks aio_md_readv_cb = {
+   .complete_shared = md_readv_complete,
+   .report = md_readv_report,
+};
+
+
 static inline int
 _mdfd_open_flags(void)
 {
@@ -937,6 +947,69 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
    }
 }
 
+/*
+ * mdstartreadv() -- Asynchronous version of mdreadv().
+ */
+void
+mdstartreadv(PgAioHandle *ioh,
+            SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+            void **buffers, BlockNumber nblocks)
+{
+   off_t       seekpos;
+   MdfdVec    *v;
+   BlockNumber nblocks_this_segment;
+   struct iovec *iov;
+   int         iovcnt;
+   int         ret;
+
+   v = _mdfd_getseg(reln, forknum, blocknum, false,
+                    EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
+
+   seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+
+   Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+
+   nblocks_this_segment =
+       Min(nblocks,
+           RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)));
+
+   if (nblocks_this_segment != nblocks)
+       elog(ERROR, "read crossing segment boundary");
+
+   iovcnt = pgaio_io_get_iovec(ioh, &iov);
+
+   Assert(nblocks <= iovcnt);
+
+   iovcnt = buffers_to_iovec(iov, buffers, nblocks_this_segment);
+
+   Assert(iovcnt <= nblocks_this_segment);
+
+   if (!(io_direct_flags & IO_DIRECT_DATA))
+       pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED);
+
+   pgaio_io_set_target_smgr(ioh,
+                            reln,
+                            forknum,
+                            blocknum,
+                            nblocks,
+                            false);
+   pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_READV, 0);
+
+   ret = FileStartReadV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_READ);
+   if (ret != 0)
+       ereport(ERROR,
+               (errcode_for_file_access(),
+                errmsg("could not start reading blocks %u..%u in file \"%s\": %m",
+                       blocknum,
+                       blocknum + nblocks_this_segment - 1,
+                       FilePathName(v->mdfd_vfd))));
+
+   /*
+    * The error checks corresponding to the post-read checks in mdreadv() are
+    * in md_readv_complete().
+    */
+}
+
 /*
  * mdwritev() -- Write the supplied blocks at the appropriate location.
  *
@@ -1365,6 +1438,21 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
    }
 }
 
+int
+mdfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off)
+{
+   MdfdVec    *v = mdopenfork(reln, forknum, EXTENSION_FAIL);
+
+   v = _mdfd_getseg(reln, forknum, blocknum, false,
+                    EXTENSION_FAIL);
+
+   *off = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+
+   Assert(*off < (off_t) BLCKSZ * RELSEG_SIZE);
+
+   return FileGetRawDesc(v->mdfd_vfd);
+}
+
 /*
  * register_dirty_segment() -- Mark a relation segment as needing fsync
  *
@@ -1841,3 +1929,111 @@ mdfiletagmatches(const FileTag *ftag, const FileTag *candidate)
     */
    return ftag->rlocator.dbOid == candidate->rlocator.dbOid;
 }
+
+/*
+ * AIO completion callback for mdstartreadv().
+ */
+static PgAioResult
+md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data)
+{
+   PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+   PgAioResult result = prior_result;
+
+   if (prior_result.result < 0)
+   {
+       result.status = PGAIO_RS_ERROR;
+       result.id = PGAIO_HCB_MD_READV;
+       /* For "hard" errors, track the error number in error_data */
+       result.error_data = -prior_result.result;
+       result.result = 0;
+
+       /*
+        * Immediately log a message about the IO error, but only to the
+        * server log. The reason to do so immediately is that the originator
+        * might not process the query result immediately (because it is busy
+        * doing another part of query processing) or at all (e.g. if it was
+        * cancelled or errored out due to another IO also failing).  The
+        * issuer of the IO will emit an ERROR when processing the IO's
+        * results
+        */
+       pgaio_result_report(result, td, LOG_SERVER_ONLY);
+
+       return result;
+   }
+
+   /*
+    * As explained above smgrstartreadv(), the smgr API operates on the level
+    * of blocks, rather than bytes. Convert.
+    */
+   result.result /= BLCKSZ;
+
+   Assert(result.result <= td->smgr.nblocks);
+
+   if (result.result == 0)
+   {
+       /* consider 0 blocks read a failure */
+       result.status = PGAIO_RS_ERROR;
+       result.id = PGAIO_HCB_MD_READV;
+       result.error_data = 0;
+
+       /* see comment above the "hard error" case */
+       pgaio_result_report(result, td, LOG_SERVER_ONLY);
+
+       return result;
+   }
+
+   if (result.status != PGAIO_RS_ERROR &&
+       result.result < td->smgr.nblocks)
+   {
+       /* partial reads should be retried at upper level */
+       result.status = PGAIO_RS_PARTIAL;
+       result.id = PGAIO_HCB_MD_READV;
+   }
+
+   return result;
+}
+
+/*
+ * AIO error reporting callback for mdstartreadv().
+ *
+ * Errors are encoded as follows:
+ * - PgAioResult.error_data != 0 encodes IO that failed with that errno
+ * - PgAioResult.error_data == 0 encodes IO that didn't read all data
+ */
+static void
+md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel)
+{
+   RelPathStr  path;
+
+   path = relpathbackend(td->smgr.rlocator,
+                         td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
+                         td->smgr.forkNum);
+
+   if (result.error_data != 0)
+   {
+       /* for errcode_for_file_access() and %m */
+       errno = result.error_data;
+
+       ereport(elevel,
+               errcode_for_file_access(),
+               errmsg("could not read blocks %u..%u in file \"%s\": %m",
+                      td->smgr.blockNum,
+                      td->smgr.blockNum + td->smgr.nblocks - 1,
+                      path.str));
+   }
+   else
+   {
+       /*
+        * NB: This will typically only be output in debug messages, while
+        * retrying a partial IO.
+        */
+       ereport(elevel,
+               errcode(ERRCODE_DATA_CORRUPTED),
+               errmsg("could not read blocks %u..%u in file \"%s\": read only %zu of %zu bytes",
+                      td->smgr.blockNum,
+                      td->smgr.blockNum + td->smgr.nblocks - 1,
+                      path.str,
+                      result.result * (size_t) BLCKSZ,
+                      td->smgr.nblocks * (size_t) BLCKSZ));
+   }
+}
index af74f54b43bc50d5677ca4828e8c1ee51d55c32f..320dc04d83ae4a7d98be692603d4ed3bbc3b063c 100644 (file)
@@ -66,6 +66,7 @@
 #include "access/xlogutils.h"
 #include "lib/ilist.h"
 #include "miscadmin.h"
+#include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/ipc.h"
 #include "storage/md.h"
@@ -106,6 +107,10 @@ typedef struct f_smgr
    void        (*smgr_readv) (SMgrRelation reln, ForkNumber forknum,
                               BlockNumber blocknum,
                               void **buffers, BlockNumber nblocks);
+   void        (*smgr_startreadv) (PgAioHandle *ioh,
+                                   SMgrRelation reln, ForkNumber forknum,
+                                   BlockNumber blocknum,
+                                   void **buffers, BlockNumber nblocks);
    void        (*smgr_writev) (SMgrRelation reln, ForkNumber forknum,
                                BlockNumber blocknum,
                                const void **buffers, BlockNumber nblocks,
@@ -117,6 +122,7 @@ typedef struct f_smgr
                                  BlockNumber old_blocks, BlockNumber nblocks);
    void        (*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
    void        (*smgr_registersync) (SMgrRelation reln, ForkNumber forknum);
+   int         (*smgr_fd) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off);
 } f_smgr;
 
 static const f_smgr smgrsw[] = {
@@ -134,12 +140,14 @@ static const f_smgr smgrsw[] = {
        .smgr_prefetch = mdprefetch,
        .smgr_maxcombine = mdmaxcombine,
        .smgr_readv = mdreadv,
+       .smgr_startreadv = mdstartreadv,
        .smgr_writev = mdwritev,
        .smgr_writeback = mdwriteback,
        .smgr_nblocks = mdnblocks,
        .smgr_truncate = mdtruncate,
        .smgr_immedsync = mdimmedsync,
        .smgr_registersync = mdregistersync,
+       .smgr_fd = mdfd,
    }
 };
 
@@ -157,6 +165,16 @@ static dlist_head unpinned_relns;
 static void smgrshutdown(int code, Datum arg);
 static void smgrdestroy(SMgrRelation reln);
 
+static void smgr_aio_reopen(PgAioHandle *ioh);
+static char *smgr_aio_describe_identity(const PgAioTargetData *sd);
+
+
+const PgAioTargetInfo aio_smgr_target_info = {
+   .name = "smgr",
+   .reopen = smgr_aio_reopen,
+   .describe_identity = smgr_aio_describe_identity,
+};
+
 
 /*
  * smgrinit(), smgrshutdown() -- Initialize or shut down storage
@@ -709,6 +727,30 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
    RESUME_INTERRUPTS();
 }
 
+/*
+ * smgrstartreadv() -- asynchronous version of smgrreadv()
+ *
+ * This starts an asynchronous readv IO using the IO handle `ioh`. Other than
+ * `ioh` all parameters are the same as smgrreadv().
+ *
+ * Completion callbacks above smgr will be passed the result as the number of
+ * successfully read blocks if the read [partially] succeeds (Buffers for
+ * blocks not successfully read might bear unspecified modifications, up to
+ * the full nblocks). This maintains the abstraction that smgr operates on the
+ * level of blocks, rather than bytes.
+ */
+void
+smgrstartreadv(PgAioHandle *ioh,
+              SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+              void **buffers, BlockNumber nblocks)
+{
+   HOLD_INTERRUPTS();
+   smgrsw[reln->smgr_which].smgr_startreadv(ioh,
+                                            reln, forknum, blocknum, buffers,
+                                            nblocks);
+   RESUME_INTERRUPTS();
+}
+
 /*
  * smgrwritev() -- Write the supplied buffers out.
  *
@@ -917,6 +959,29 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
    RESUME_INTERRUPTS();
 }
 
+/*
+ * Return fd for the specified block number and update *off to the appropriate
+ * position.
+ *
+ * This is only to be used for when AIO needs to perform the IO in a different
+ * process than where it was issued (e.g. in an IO worker).
+ */
+static int
+smgrfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off)
+{
+   int         fd;
+
+   /*
+    * The caller needs to prevent interrupts from being processed, otherwise
+    * the FD could be closed prematurely.
+    */
+   Assert(!INTERRUPTS_CAN_BE_PROCESSED());
+
+   fd = smgrsw[reln->smgr_which].smgr_fd(reln, forknum, blocknum, off);
+
+   return fd;
+}
+
 /*
  * AtEOXact_SMgr
  *
@@ -945,3 +1010,99 @@ ProcessBarrierSmgrRelease(void)
    smgrreleaseall();
    return true;
 }
+
+/*
+ * Set target of the IO handle to be smgr and initialize all the relevant
+ * pieces of data.
+ */
+void
+pgaio_io_set_target_smgr(PgAioHandle *ioh,
+                        SMgrRelationData *smgr,
+                        ForkNumber forknum,
+                        BlockNumber blocknum,
+                        int nblocks,
+                        bool skip_fsync)
+{
+   PgAioTargetData *sd = pgaio_io_get_target_data(ioh);
+
+   pgaio_io_set_target(ioh, PGAIO_TID_SMGR);
+
+   /* backend is implied via IO owner */
+   sd->smgr.rlocator = smgr->smgr_rlocator.locator;
+   sd->smgr.forkNum = forknum;
+   sd->smgr.blockNum = blocknum;
+   sd->smgr.nblocks = nblocks;
+   sd->smgr.is_temp = SmgrIsTemp(smgr);
+   /* Temp relations should never be fsync'd */
+   sd->smgr.skip_fsync = skip_fsync && !SmgrIsTemp(smgr);
+}
+
+/*
+ * Callback for the smgr AIO target, to reopen the file (e.g. because the IO
+ * is executed in a worker).
+ */
+static void
+smgr_aio_reopen(PgAioHandle *ioh)
+{
+   PgAioTargetData *sd = pgaio_io_get_target_data(ioh);
+   PgAioOpData *od = pgaio_io_get_op_data(ioh);
+   SMgrRelation reln;
+   ProcNumber  procno;
+   uint32      off;
+
+   /*
+    * The caller needs to prevent interrupts from being processed, otherwise
+    * the FD could be closed again before we get to executing the IO.
+    */
+   Assert(!INTERRUPTS_CAN_BE_PROCESSED());
+
+   if (sd->smgr.is_temp)
+       procno = pgaio_io_get_owner(ioh);
+   else
+       procno = INVALID_PROC_NUMBER;
+
+   reln = smgropen(sd->smgr.rlocator, procno);
+   switch (pgaio_io_get_op(ioh))
+   {
+       case PGAIO_OP_INVALID:
+           pg_unreachable();
+           break;
+       case PGAIO_OP_READV:
+           od->read.fd = smgrfd(reln, sd->smgr.forkNum, sd->smgr.blockNum, &off);
+           Assert(off == od->read.offset);
+           break;
+       case PGAIO_OP_WRITEV:
+           od->write.fd = smgrfd(reln, sd->smgr.forkNum, sd->smgr.blockNum, &off);
+           Assert(off == od->write.offset);
+           break;
+   }
+}
+
+/*
+ * Callback for the smgr AIO target, describing the target of the IO.
+ */
+static char *
+smgr_aio_describe_identity(const PgAioTargetData *sd)
+{
+   RelPathStr  path;
+   char       *desc;
+
+   path = relpathbackend(sd->smgr.rlocator,
+                         sd->smgr.is_temp ?
+                         MyProcNumber : INVALID_PROC_NUMBER,
+                         sd->smgr.forkNum);
+
+   if (sd->smgr.nblocks == 0)
+       desc = psprintf(_("file \"%s\""), path.str);
+   else if (sd->smgr.nblocks == 1)
+       desc = psprintf(_("block %u in file \"%s\""),
+                       sd->smgr.blockNum,
+                       path.str);
+   else
+       desc = psprintf(_("blocks %u..%u in file \"%s\""),
+                       sd->smgr.blockNum,
+                       sd->smgr.blockNum + sd->smgr.nblocks - 1,
+                       path.str);
+
+   return desc;
+}
index cc987556e147c354df44da30c0c35a3a7e9b7283..6b08304094330c222148caa7a769f336e9fd45f2 100644 (file)
@@ -117,9 +117,10 @@ typedef enum PgAioTargetID
 {
    /* intentionally the zero value, to help catch zeroed memory etc */
    PGAIO_TID_INVALID = 0,
+   PGAIO_TID_SMGR,
 } PgAioTargetID;
 
-#define PGAIO_TID_COUNT (PGAIO_TID_INVALID + 1)
+#define PGAIO_TID_COUNT (PGAIO_TID_SMGR + 1)
 
 
 /*
@@ -191,6 +192,8 @@ struct PgAioTargetInfo
 typedef enum PgAioHandleCallbackID
 {
    PGAIO_HCB_INVALID,
+
+   PGAIO_HCB_MD_READV,
 } PgAioHandleCallbackID;
 
 
index dddda3a3e2f57b815360a9fecfc3699749229aca..debe8163d4e11df167c75157e9e1d3d2d5398f73 100644 (file)
@@ -60,11 +60,15 @@ typedef struct PgAioWaitRef
  */
 typedef union PgAioTargetData
 {
-   /* just as an example placeholder for later */
    struct
    {
-       uint32      queue_id;
-   }           wal;
+       RelFileLocator rlocator;    /* physical relation identifier */
+       BlockNumber blockNum;   /* blknum relative to begin of reln */
+       BlockNumber nblocks;
+       ForkNumber  forkNum:8;  /* don't waste 4 byte for four values */
+       bool        is_temp:1;  /* proc can be inferred by owning AIO */
+       bool        skip_fsync:1;
+   }           smgr;
 } PgAioTargetData;
 
 
index e3067ab659778486f91d816b9c18bee5707abe80..b77d8e5e30e3a7fd619403fbc749a1f9e235ad3b 100644 (file)
@@ -101,6 +101,8 @@ extern PGDLLIMPORT int max_safe_fds;
  * prototypes for functions in fd.c
  */
 
+struct PgAioHandle;
+
 /* Operations on virtual Files --- equivalent to Unix kernel file ops */
 extern File PathNameOpenFile(const char *fileName, int fileFlags);
 extern File PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode);
@@ -109,6 +111,7 @@ extern void FileClose(File file);
 extern int FilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event_info);
 extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
 extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
+extern int FileStartReadV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
 extern int FileSync(File file, uint32 wait_event_info);
 extern int FileZero(File file, off_t offset, off_t amount, uint32 wait_event_info);
 extern int FileFallocate(File file, off_t offset, off_t amount, uint32 wait_event_info);
index 05bf537066ebda66ffed222c434692b0b44f9c99..9d7131eff4384cc40340139dca94e92cb553bb20 100644 (file)
 #ifndef MD_H
 #define MD_H
 
+#include "storage/aio_types.h"
 #include "storage/block.h"
 #include "storage/relfilelocator.h"
 #include "storage/smgr.h"
 #include "storage/sync.h"
 
+extern const PgAioHandleCallbacks aio_md_readv_cb;
+
 /* md storage manager functionality */
 extern void mdinit(void);
 extern void mdopen(SMgrRelation reln);
@@ -36,6 +39,9 @@ extern uint32 mdmaxcombine(SMgrRelation reln, ForkNumber forknum,
                           BlockNumber blocknum);
 extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
                    void **buffers, BlockNumber nblocks);
+extern void mdstartreadv(PgAioHandle *ioh,
+                        SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+                        void **buffers, BlockNumber nblocks);
 extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
                     BlockNumber blocknum,
                     const void **buffers, BlockNumber nblocks, bool skipFsync);
@@ -46,6 +52,7 @@ extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
                       BlockNumber old_blocks, BlockNumber nblocks);
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
 extern void mdregistersync(SMgrRelation reln, ForkNumber forknum);
+extern int mdfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off);
 
 extern void ForgetDatabaseSyncRequests(Oid dbid);
 extern void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo);
index 4016b206ad66c7ec0e2b664a4a1257b771c24427..856ebcda350e9552fee95bbbfe6a0fdfb80ab76c 100644 (file)
@@ -15,6 +15,7 @@
 #define SMGR_H
 
 #include "lib/ilist.h"
+#include "storage/aio_types.h"
 #include "storage/block.h"
 #include "storage/relfilelocator.h"
 
@@ -73,6 +74,8 @@ typedef SMgrRelationData *SMgrRelation;
 #define SmgrIsTemp(smgr) \
    RelFileLocatorBackendIsTemp((smgr)->smgr_rlocator)
 
+extern const PgAioTargetInfo aio_smgr_target_info;
+
 extern void smgrinit(void);
 extern SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend);
 extern bool smgrexists(SMgrRelation reln, ForkNumber forknum);
@@ -97,6 +100,10 @@ extern uint32 smgrmaxcombine(SMgrRelation reln, ForkNumber forknum,
 extern void smgrreadv(SMgrRelation reln, ForkNumber forknum,
                      BlockNumber blocknum,
                      void **buffers, BlockNumber nblocks);
+extern void smgrstartreadv(PgAioHandle *ioh,
+                          SMgrRelation reln, ForkNumber forknum,
+                          BlockNumber blocknum,
+                          void **buffers, BlockNumber nblocks);
 extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
                       BlockNumber blocknum,
                       const void **buffers, BlockNumber nblocks,
@@ -127,4 +134,11 @@ smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
    smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
 }
 
+extern void pgaio_io_set_target_smgr(PgAioHandle *ioh,
+                                    SMgrRelationData *smgr,
+                                    ForkNumber forknum,
+                                    BlockNumber blocknum,
+                                    int nblocks,
+                                    bool skip_fsync);
+
 #endif                         /* SMGR_H */