Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Put the logic to wait for WAL in standby mode to a separate function.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Tue, 9 Oct 2012 16:20:17 +0000 (19:20 +0300)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Tue, 9 Oct 2012 16:20:17 +0000 (19:20 +0300)
This is just refactoring with no user-visible effect, to make the code more
readable.

src/backend/access/transam/xlog.c

index 3e4da46ac24a8817b8f4cf03053e74a81aea5c90..17ceda3b1ad311776e108307a3f4cc2f1247a2b9 100644 (file)
@@ -157,6 +157,9 @@ HotStandbyState standbyState = STANDBY_DISABLED;
 
 static XLogRecPtr LastRec;
 
+/* Local copy of WalRcv->receivedUpto */
+static XLogRecPtr receivedUpto = 0;
+
 /*
  * During recovery, lastFullPageWrites keeps track of full_page_writes that
  * the replayed WAL records indicate. It's initialized with full_page_writes
@@ -538,6 +541,7 @@ static int  readFile = -1;
 static XLogSegNo readSegNo = 0;
 static uint32 readOff = 0;
 static uint32 readLen = 0;
+static bool    readFileHeaderValidated = false;
 static int readSource = 0;     /* XLOG_FROM_* code */
 
 /*
@@ -628,6 +632,8 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
 static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, int sources);
 static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
             bool randAccess);
+static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
+                           bool fetching_ckpt);
 static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr);
 static void XLogFileClose(void);
 static void PreallocXlogFiles(XLogRecPtr endptr);
@@ -2685,6 +2691,9 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
        if (source != XLOG_FROM_STREAM)
            XLogReceiptTime = GetCurrentTimestamp();
 
+       /* The file header needs to be validated on first access */
+       readFileHeaderValidated = false;
+
        return fd;
    }
    if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
@@ -9233,12 +9242,9 @@ static bool
 XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
             bool randAccess)
 {
-   static XLogRecPtr receivedUpto = 0;
-   bool        switched_segment = false;
    uint32      targetPageOff;
    uint32      targetRecOff;
    XLogSegNo   targetSegNo;
-   static pg_time_t last_fail_time = 0;
 
    XLByteToSeg(*RecPtr, targetSegNo);
    targetPageOff = (((*RecPtr) % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
@@ -9283,208 +9289,9 @@ retry:
    {
        if (StandbyMode)
        {
-           /*
-            * In standby mode, wait for the requested record to become
-            * available, either via restore_command succeeding to restore the
-            * segment, or via walreceiver having streamed the record.
-            */
-           for (;;)
-           {
-               if (WalRcvInProgress())
-               {
-                   bool        havedata;
-
-                   /*
-                    * If we find an invalid record in the WAL streamed from
-                    * master, something is seriously wrong. There's little
-                    * chance that the problem will just go away, but PANIC is
-                    * not good for availability either, especially in hot
-                    * standby mode. Disconnect, and retry from
-                    * archive/pg_xlog again. The WAL in the archive should be
-                    * identical to what was streamed, so it's unlikely that
-                    * it helps, but one can hope...
-                    */
-                   if (failedSources & XLOG_FROM_STREAM)
-                   {
-                       ShutdownWalRcv();
-                       continue;
-                   }
-
-                   /*
-                    * Walreceiver is active, so see if new data has arrived.
-                    *
-                    * We only advance XLogReceiptTime when we obtain fresh
-                    * WAL from walreceiver and observe that we had already
-                    * processed everything before the most recent "chunk"
-                    * that it flushed to disk.  In steady state where we are
-                    * keeping up with the incoming data, XLogReceiptTime will
-                    * be updated on each cycle.  When we are behind,
-                    * XLogReceiptTime will not advance, so the grace time
-                    * alloted to conflicting queries will decrease.
-                    */
-                   if (XLByteLT(*RecPtr, receivedUpto))
-                       havedata = true;
-                   else
-                   {
-                       XLogRecPtr  latestChunkStart;
-
-                       receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
-                       if (XLByteLT(*RecPtr, receivedUpto))
-                       {
-                           havedata = true;
-                           if (!XLByteLT(*RecPtr, latestChunkStart))
-                           {
-                               XLogReceiptTime = GetCurrentTimestamp();
-                               SetCurrentChunkStartTime(XLogReceiptTime);
-                           }
-                       }
-                       else
-                           havedata = false;
-                   }
-                   if (havedata)
-                   {
-                       /*
-                        * Great, streamed far enough. Open the file if it's
-                        * not open already.  Use XLOG_FROM_STREAM so that
-                        * source info is set correctly and XLogReceiptTime
-                        * isn't changed.
-                        */
-                       if (readFile < 0)
-                       {
-                           readFile =
-                               XLogFileRead(readSegNo, PANIC,
-                                            recoveryTargetTLI,
-                                            XLOG_FROM_STREAM, false);
-                           Assert(readFile >= 0);
-                           switched_segment = true;
-                       }
-                       else
-                       {
-                           /* just make sure source info is correct... */
-                           readSource = XLOG_FROM_STREAM;
-                           XLogReceiptSource = XLOG_FROM_STREAM;
-                       }
-                       break;
-                   }
-
-                   /*
-                    * Data not here yet, so check for trigger then sleep for
-                    * five seconds like in the WAL file polling case below.
-                    */
-                   if (CheckForStandbyTrigger())
-                       goto retry;
-
-                   /*
-                    * Wait for more WAL to arrive, or timeout to be reached
-                    */
-                   WaitLatch(&XLogCtl->recoveryWakeupLatch,
-                             WL_LATCH_SET | WL_TIMEOUT,
-                             5000L);
-                   ResetLatch(&XLogCtl->recoveryWakeupLatch);
-               }
-               else
-               {
-                   int         sources;
-                   pg_time_t   now;
-
-                   /*
-                    * Until walreceiver manages to reconnect, poll the
-                    * archive.
-                    */
-                   if (readFile >= 0)
-                   {
-                       close(readFile);
-                       readFile = -1;
-                   }
-                   /* Reset curFileTLI if random fetch. */
-                   if (randAccess)
-                       curFileTLI = 0;
-
-                   /*
-                    * Try to restore the file from archive, or read an
-                    * existing file from pg_xlog.
-                    */
-                   sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
-                   if (!(sources & ~failedSources))
-                   {
-                       /*
-                        * We've exhausted all options for retrieving the
-                        * file. Retry.
-                        */
-                       failedSources = 0;
-
-                       /*
-                        * Before we sleep, re-scan for possible new timelines
-                        * if we were requested to recover to the latest
-                        * timeline.
-                        */
-                       if (recoveryTargetIsLatest)
-                       {
-                           if (rescanLatestTimeLine())
-                               continue;
-                       }
-
-                       /*
-                        * If it hasn't been long since last attempt, sleep to
-                        * avoid busy-waiting.
-                        */
-                       now = (pg_time_t) time(NULL);
-                       if ((now - last_fail_time) < 5)
-                       {
-                           pg_usleep(1000000L * (5 - (now - last_fail_time)));
-                           now = (pg_time_t) time(NULL);
-                       }
-                       last_fail_time = now;
-
-                       /*
-                        * If primary_conninfo is set, launch walreceiver to
-                        * try to stream the missing WAL, before retrying to
-                        * restore from archive/pg_xlog.
-                        *
-                        * If fetching_ckpt is TRUE, RecPtr points to the
-                        * initial checkpoint location. In that case, we use
-                        * RedoStartLSN as the streaming start position
-                        * instead of RecPtr, so that when we later jump
-                        * backwards to start redo at RedoStartLSN, we will
-                        * have the logs streamed already.
-                        */
-                       if (PrimaryConnInfo)
-                       {
-                           RequestXLogStreaming(
-                                     fetching_ckpt ? RedoStartLSN : *RecPtr,
-                                                PrimaryConnInfo);
-                           continue;
-                       }
-                   }
-                   /* Don't try to read from a source that just failed */
-                   sources &= ~failedSources;
-                   readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2,
-                                                 sources);
-                   switched_segment = true;
-                   if (readFile >= 0)
-                       break;
-
-                   /*
-                    * Nope, not found in archive and/or pg_xlog.
-                    */
-                   failedSources |= sources;
-
-                   /*
-                    * Check to see if the trigger file exists. Note that we
-                    * do this only after failure, so when you create the
-                    * trigger file, we still finish replaying as much as we
-                    * can from archive and pg_xlog before failover.
-                    */
-                   if (CheckForStandbyTrigger())
-                       goto triggered;
-               }
-
-               /*
-                * This possibly-long loop needs to handle interrupts of
-                * startup process.
-                */
-               HandleStartupProcInterrupts();
-           }
+           if (!WaitForWALToBecomeAvailable(*RecPtr, randAccess,
+                                            fetching_ckpt))
+               goto triggered;
        }
        else
        {
@@ -9502,7 +9309,6 @@ retry:
                    sources |= XLOG_FROM_ARCHIVE;
 
                readFile = XLogFileReadAnyTLI(readSegNo, emode, sources);
-               switched_segment = true;
                if (readFile < 0)
                    return false;
            }
@@ -9533,7 +9339,7 @@ retry:
    else
        readLen = XLOG_BLCKSZ;
 
-   if (switched_segment && targetPageOff != 0)
+   if (!readFileHeaderValidated && targetPageOff != 0)
    {
        /*
         * Whenever switching to a new WAL segment, we read the first page of
@@ -9582,6 +9388,8 @@ retry:
    if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode))
        goto next_record_is_invalid;
 
+   readFileHeaderValidated = true;
+
    Assert(targetSegNo == readSegNo);
    Assert(targetPageOff == readOff);
    Assert(targetRecOff < readLen);
@@ -9613,6 +9421,213 @@ triggered:
    return false;
 }
 
+/*
+ * In standby mode, wait for the requested record to become available, either
+ * via restore_command succeeding to restore the segment, or via walreceiver
+ * having streamed the record (or via someone copying the segment directly to
+ * pg_xlog, but that is not documented or recommended).
+ *
+ * When the requested record becomes available, the function opens the file
+ * containing it (if not open already), and returns true. When end of standby
+ * mode is triggered by the user, and there is no more WAL available, returns
+ * false.
+ */
+static bool
+WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
+                           bool fetching_ckpt)
+{
+   static pg_time_t last_fail_time = 0;
+
+   for (;;)
+   {
+       if (WalRcvInProgress())
+       {
+           bool        havedata;
+
+           /*
+            * If we find an invalid record in the WAL streamed from master,
+            * something is seriously wrong. There's little chance that the
+            * problem will just go away, but PANIC is not good for
+            * availability either, especially in hot standby mode.
+            * Disconnect, and retry from archive/pg_xlog again. The WAL in
+            * the archive should be identical to what was streamed, so it's
+            * unlikely that it helps, but one can hope...
+            */
+           if (failedSources & XLOG_FROM_STREAM)
+           {
+               ShutdownWalRcv();
+               continue;
+           }
+
+           /*
+            * Walreceiver is active, so see if new data has arrived.
+            *
+            * We only advance XLogReceiptTime when we obtain fresh WAL from
+            * walreceiver and observe that we had already processed
+            * everything before the most recent "chunk" that it flushed to
+            * disk.  In steady state where we are keeping up with the
+            * incoming data, XLogReceiptTime will be updated on each cycle.
+            * When we are behind, XLogReceiptTime will not advance, so the
+            * grace time allotted to conflicting queries will decrease.
+            */
+           if (XLByteLT(RecPtr, receivedUpto))
+               havedata = true;
+           else
+           {
+               XLogRecPtr  latestChunkStart;
+
+               receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
+               if (XLByteLT(RecPtr, receivedUpto))
+               {
+                   havedata = true;
+                   if (!XLByteLT(RecPtr, latestChunkStart))
+                   {
+                       XLogReceiptTime = GetCurrentTimestamp();
+                       SetCurrentChunkStartTime(XLogReceiptTime);
+                   }
+               }
+               else
+                   havedata = false;
+           }
+           if (havedata)
+           {
+               /*
+                * Great, streamed far enough.  Open the file if it's not open
+                * already.  Use XLOG_FROM_STREAM so that source info is set
+                * correctly and XLogReceiptTime isn't changed.
+                */
+               if (readFile < 0)
+               {
+                   readFile = XLogFileRead(readSegNo, PANIC,
+                                           curFileTLI,
+                                           XLOG_FROM_STREAM, false);
+                   Assert(readFile >= 0);
+               }
+               else
+               {
+                   /* just make sure source info is correct... */
+                   readSource = XLOG_FROM_STREAM;
+                   XLogReceiptSource = XLOG_FROM_STREAM;
+               }
+               break;
+           }
+
+           /*
+            * Data not here yet, so check for trigger then sleep for five
+            * seconds like in the WAL file polling case below.
+            */
+           if (CheckForStandbyTrigger())
+               return false;
+
+           /*
+            * Wait for more WAL to arrive, or timeout to be reached
+            */
+           WaitLatch(&XLogCtl->recoveryWakeupLatch,
+                     WL_LATCH_SET | WL_TIMEOUT,
+                     5000L);
+           ResetLatch(&XLogCtl->recoveryWakeupLatch);
+       }
+       else
+       {
+           /*
+            * WAL receiver is not active. Poll the archive.
+            */
+           int         sources;
+           pg_time_t   now;
+
+           if (readFile >= 0)
+           {
+               close(readFile);
+               readFile = -1;
+           }
+           /* Reset curFileTLI if random fetch. */
+           if (randAccess)
+               curFileTLI = 0;
+
+           /*
+            * Try to restore the file from archive, or read an existing file
+            * from pg_xlog.
+            */
+           sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
+           if (!(sources & ~failedSources))
+           {
+               /*
+                * We've exhausted all options for retrieving the file. Retry.
+                */
+               failedSources = 0;
+
+               /*
+                * Before we sleep, re-scan for possible new timelines if we
+                * were requested to recover to the latest timeline.
+                */
+               if (recoveryTargetIsLatest)
+               {
+                   if (rescanLatestTimeLine())
+                       continue;
+               }
+
+               /*
+                * If it hasn't been long since last attempt, sleep to avoid
+                * busy-waiting.
+                */
+               now = (pg_time_t) time(NULL);
+               if ((now - last_fail_time) < 5)
+               {
+                   pg_usleep(1000000L * (5 - (now - last_fail_time)));
+                   now = (pg_time_t) time(NULL);
+               }
+               last_fail_time = now;
+
+               /*
+                * If primary_conninfo is set, launch walreceiver to try to
+                * stream the missing WAL, before retrying to restore from
+                * archive/pg_xlog.
+                *
+                * If fetching_ckpt is TRUE, RecPtr points to the initial
+                * checkpoint location. In that case, we use RedoStartLSN as
+                * the streaming start position instead of RecPtr, so that
+                * when we later jump backwards to start redo at RedoStartLSN,
+                * we will have the logs streamed already.
+                */
+               if (PrimaryConnInfo)
+               {
+                   XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
+
+                   RequestXLogStreaming(ptr, PrimaryConnInfo);
+                   continue;
+               }
+           }
+           /* Don't try to read from a source that just failed */
+           sources &= ~failedSources;
+           readFile = XLogFileReadAnyTLI(readSegNo, DEBUG2, sources);
+           if (readFile >= 0)
+               break;
+
+           /*
+            * Nope, not found in archive and/or pg_xlog.
+            */
+           failedSources |= sources;
+
+           /*
+            * Check to see if the trigger file exists. Note that we do this
+            * only after failure, so when you create the trigger file, we
+            * still finish replaying as much as we can from archive and
+            * pg_xlog before failover.
+            */
+           if (CheckForStandbyTrigger())
+               return false;
+       }
+
+       /*
+        * This possibly-long loop needs to handle interrupts of startup
+        * process.
+        */
+       HandleStartupProcInterrupts();
+   }
+
+   return true;
+}
+
 /*
  * Determine what log level should be used to report a corrupt WAL record
  * in the current WAL page, previously read by XLogPageRead().