Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Prevent WAL files created by pg_basebackup -x/X from being archived again.
author: Andres Freund <andres@anarazel.de>
Sat, 3 Jan 2015 19:51:52 +0000 (20:51 +0100)
committer: Andres Freund <andres@anarazel.de>
Sat, 3 Jan 2015 19:54:13 +0000 (20:54 +0100)
WAL (and timeline history) files created by pg_basebackup did not
maintain the new base backup's archive status. That's currently not a
problem if the new node is used as a standby - but if that node is
promoted, all still-existing files can get archived again.  With a high
wal_keep_segments setting that can happen a significant time later -
which is quite confusing.

Change both the backend (for the -x/-X fetch case) and pg_basebackup
(for -X stream) itself to always mark WAL/timeline files included in
the base backup as .done. That's in line with walreceiver.c doing so.

The verbosity of the pg_basebackup changes shows pretty clearly that it
needs some refactoring, but that would result in changes that are not
backpatchable.

Backpatch to 9.1 where pg_basebackup was introduced.

Discussion: 20141205002854.GE21964@awork2.anarazel.de

src/backend/replication/basebackup.c
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_receivexlog.c
src/bin/pg_basebackup/receivelog.c
src/bin/pg_basebackup/receivelog.h

index a00bea6fc51984c5762ba0a5e26e73c546ec5601..896ab6917a6d925789ae0d61ac2ddfa65aca3f64 100644 (file)
@@ -400,6 +400,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
                         errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
            }
 
+           /* send the WAL file itself */
            _tarWriteHeader(pathbuf, NULL, &statbuf);
 
            while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0)
@@ -424,7 +425,17 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
            }
 
            /* XLogSegSize is a multiple of 512, so no need for padding */
+
            FreeFile(fp);
+
+           /*
+            * Mark file as archived, otherwise files can get archived again
+            * after promotion of a new node. This is in line with
+            * walreceiver.c always doing a XLogArchiveForceDone() after a
+            * complete segment.
+            */
+           StatusFilePath(pathbuf, walFiles[i], ".done");
+           sendFileWithContent(pathbuf, "");
        }
 
        /*
@@ -447,6 +458,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
                         errmsg("could not stat file \"%s\": %m", pathbuf)));
 
            sendFile(pathbuf, pathbuf, &statbuf, false);
+
+           /* unconditionally mark file as archived */
+           StatusFilePath(pathbuf, fname, ".done");
+           sendFileWithContent(pathbuf, "");
        }
 
        /* Send CopyDone message for the last tar file */
@@ -881,6 +896,15 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
                _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
            }
            size += 512;        /* Size of the header just added */
+
+           /*
+            * Also send archive_status directory (by hackishly reusing
+            * statbuf from above ...).
+            */
+           if (!sizeonly)
+               _tarWriteHeader("./pg_xlog/archive_status", NULL, &statbuf);
+           size += 512;        /* Size of the header just added */
+
            continue;           /* don't recurse into pg_xlog */
        }
 
index f13e376d3d3573cb5346f1970cf0f1b19c41e85e..61d2cb3a73b9661c3c7d6383d270e923da3877b4 100644 (file)
@@ -259,7 +259,7 @@ LogStreamerMain(logstreamer_param *param)
    if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
                           param->sysidentifier, param->xlogdir,
                           reached_end_position, standby_message_timeout,
-                          true))
+                          true, true))
 
        /*
         * Any errors will already have been reported in the function process,
@@ -281,6 +281,7 @@ static void
 StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 {
    logstreamer_param *param;
+   char        statusdir[MAXPGPATH];
 
    param = xmalloc0(sizeof(logstreamer_param));
    param->timeline = timeline;
@@ -314,13 +315,23 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
        /* Error message already written in GetConnection() */
        exit(1);
 
+   snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
+
    /*
-    * Always in plain format, so we can write to basedir/pg_xlog. But the
-    * directory entry in the tar file may arrive later, so make sure it's
-    * created before we start.
+    * Create pg_xlog/archive_status (and thus pg_xlog) so we can can write to
+    * basedir/pg_xlog as the directory entry in the tar file may arrive
+    * later.
     */
-   snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
-   verify_dir_is_empty_or_create(param->xlogdir);
+   snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
+            basedir);
+
+   if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+   {
+       fprintf(stderr,
+               _("%s: could not create directory \"%s\": %s\n"),
+               progname, statusdir, strerror(errno));
+       disconnect_and_exit(1);
+   }
 
    /*
     * Start a child process and tell it to start streaming. On Unix, this is
@@ -403,6 +414,23 @@ verify_dir_is_empty_or_create(char *dirname)
    }
 }
 
+/*
+ * Report whether `str' ends with the suffix `end'.  An empty `end' matches
+ * any string; an `end' longer than `str' can never match.
+ */
+static bool
+pg_str_endswith(const char *str, const char *end)
+{
+   size_t      str_len = strlen(str);
+   size_t      end_len = strlen(end);
+
+   /*
+    * Only when `end' fits inside `str' is it worth comparing the trailing
+    * end_len bytes of `str' against it (this also avoids size_t underflow).
+    */
+   return end_len <= str_len &&
+       strcmp(str + (str_len - end_len), end) == 0;
+}
 
 /*
  * Print a progress report based on the global variables. If verbose output
@@ -835,10 +863,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
                    {
                        /*
                         * When streaming WAL, pg_xlog will have been created
-                        * by the wal receiver process, so just ignore failure
-                        * on that.
+                        * by the wal receiver process. So just ignore creation
+                        * failures on related directories.
                         */
-                       if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
+                       if (!((pg_str_endswith(filename, "/pg_xlog") ||
+                              pg_str_endswith(filename, "/archive_status")) &&
+                             errno == EEXIST))
                        {
                            fprintf(stderr,
                            _("%s: could not create directory \"%s\": %s\n"),
index 6c998735d9155f2c33fcc11e517b1481cf978c94..8167aebe9d939befe235b60dee5449d59395c736 100644 (file)
@@ -321,7 +321,7 @@ StreamLog(void)
                progname, startpos.xlogid, startpos.xrecoff, timeline);
 
    ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
-                     stop_streaming, standby_message_timeout, false);
+                     stop_streaming, standby_message_timeout, false, true);
 
    PQfinish(conn);
 }
index 3a46767417eea2bc214396fab1b4da67e4c49b06..9cd0942de3af2f882af26ae8826514b0608deafa 100644 (file)
@@ -44,6 +44,35 @@ const XLogRecPtr InvalidXLogRecPtr = {0, 0};
 static int walfile = -1;
 
 
+/*
+ * Create and fsync basedir/archive_status/<fname>.done; false on failure.
+ */
+static bool
+mark_file_as_archived(const char *basedir, const char *fname)
+{
+   int         fd;
+   char        tmppath[MAXPGPATH];
+
+   snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
+            basedir, fname);
+   fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+   if (fd < 0)
+   {
+       fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
+               progname, tmppath, strerror(errno));
+       return false;
+   }
+   if (fsync(fd) != 0)
+   {
+       fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+               progname, tmppath, strerror(errno));
+       close(fd);              /* don't leak the descriptor on failure */
+       return false;
+   }
+   close(fd);
+   return true;
+}
+
 /*
  * Open a new WAL file in the specified directory. Store the name
  * (not including the full directory) in namebuf. Assumes there is
@@ -133,7 +162,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
  * completed writing the whole segment.
  */
 static bool
-close_walfile(char *basedir, char *walname, bool segment_complete)
+close_walfile(char *basedir, char *walname, bool segment_complete,
+             bool mark_done)
 {
    off_t       currpos = lseek(walfile, 0, SEEK_CUR);
 
@@ -184,6 +214,19 @@ close_walfile(char *basedir, char *walname, bool segment_complete)
                _("%s: not renaming \"%s\", segment is not complete\n"),
                progname, walname);
 
+   /*
+    * Mark file as archived if requested by the caller - pg_basebackup needs
+    * to do so as files can otherwise get archived again after promotion of a
+    * new node. This is in line with walreceiver.c always doing a
+    * XLogArchiveForceDone() after a complete segment.
+    */
+   if (currpos == XLOG_SEG_SIZE && mark_done)
+   {
+       /* writes error message if failed */
+       if (!mark_file_as_archived(basedir, walname))
+           return false;
+   }
+
    return true;
 }
 
@@ -284,7 +327,8 @@ bool
 ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                  char *sysidentifier, char *basedir,
                  stream_stop_callback stream_stop,
-                 int standby_message_timeout, bool rename_partial)
+                 int standby_message_timeout, bool rename_partial,
+                 bool mark_done)
 {
    char        query[128];
    char        current_walfile_name[MAXPGPATH];
@@ -343,7 +387,6 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
        return false;
    }
    PQclear(res);
-
    /*
     * Receive the actual xlog data
     */
@@ -367,7 +410,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
        if (stream_stop && stream_stop(blockpos, timeline, false))
        {
            if (walfile != -1 && !close_walfile(basedir, current_walfile_name,
-                                               rename_partial))
+                                               rename_partial, mark_done))
                /* Potential error message is written by close_walfile */
                goto error;
            return true;
@@ -579,7 +622,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
            /* Did we reach the end of a WAL segment? */
            if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
            {
-               if (!close_walfile(basedir, current_walfile_name, false))
+               if (!close_walfile(basedir, current_walfile_name, false,
+                                  mark_done))
                    /* Error message written in close_walfile() */
                    goto error;
 
index 7176a68beaab6d9f1d39d19a7e8cd35037e80b0c..5ebf31d7c244b528f3284e7f110ddf5ab8ff3a6f 100644 (file)
@@ -13,4 +13,5 @@ extern bool ReceiveXlogStream(PGconn *conn,
                  char *basedir,
                  stream_stop_callback stream_stop,
                  int standby_message_timeout,
-                 bool rename_partial);
+                 bool rename_partial,
+                 bool mark_done);