Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Use the regular main processing loop also in walsenders.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 5 Oct 2012 14:13:07 +0000 (17:13 +0300)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 5 Oct 2012 14:21:12 +0000 (17:21 +0300)
The regular backend's main loop handles signal handling and error recovery
better than the current WAL sender command loop does. For example, if the
client hangs and a SIGTERM is received before starting streaming, the
walsender will now terminate immediately, rather than hang until the
connection times out.

src/backend/replication/basebackup.c
src/backend/replication/walsender.c
src/backend/tcop/postgres.c
src/include/replication/walsender.h

index 4636e8d1c6fc74b93e5c48a282fc9f444bc91270..04681f4196299ce74739cc4cccf5ca305cffae97 100644 (file)
@@ -22,6 +22,7 @@
 #include "lib/stringinfo.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
+#include "miscadmin.h"
 #include "nodes/pg_list.h"
 #include "replication/basebackup.h"
 #include "replication/walsender.h"
@@ -30,7 +31,6 @@
 #include "storage/ipc.h"
 #include "utils/builtins.h"
 #include "utils/elog.h"
-#include "utils/memutils.h"
 #include "utils/ps_status.h"
 
 typedef struct
@@ -370,19 +370,10 @@ void
 SendBaseBackup(BaseBackupCmd *cmd)
 {
    DIR        *dir;
-   MemoryContext backup_context;
-   MemoryContext old_context;
    basebackup_options opt;
 
    parse_basebackup_options(cmd->options, &opt);
 
-   backup_context = AllocSetContextCreate(CurrentMemoryContext,
-                                          "Streaming base backup context",
-                                          ALLOCSET_DEFAULT_MINSIZE,
-                                          ALLOCSET_DEFAULT_INITSIZE,
-                                          ALLOCSET_DEFAULT_MAXSIZE);
-   old_context = MemoryContextSwitchTo(backup_context);
-
    WalSndSetState(WALSNDSTATE_BACKUP);
 
    if (update_process_title)
@@ -403,9 +394,6 @@ SendBaseBackup(BaseBackupCmd *cmd)
    perform_base_backup(&opt, dir);
 
    FreeDir(dir);
-
-   MemoryContextSwitchTo(old_context);
-   MemoryContextDelete(backup_context);
 }
 
 static void
@@ -606,7 +594,7 @@ sendDir(char *path, int basepathlen, bool sizeonly)
         * error in that case. The error handler further up will call
         * do_pg_abort_backup() for us.
         */
-       if (walsender_shutdown_requested || walsender_ready_to_stop)
+       if (ProcDiePending || walsender_ready_to_stop)
            ereport(ERROR,
                (errmsg("shutdown requested, aborting active base backup")));
 
index cc27848318bc223e931b842bcd94003a9fa556d7..0ba2ad4414062ead6c20c50cd6d5c84ba97b6086 100644 (file)
@@ -78,6 +78,8 @@ bool      am_walsender = false;       /* Am I a walsender process ? */
 bool       am_cascading_walsender = false;     /* Am I cascading WAL to
                                                 * another standby ? */
 
+static bool    replication_started = false; /* Started streaming yet? */
+
 /* User-settable parameters for walsender */
 int            max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
 int            replication_timeout = 60 * 1000;    /* maximum time to send one
@@ -113,21 +115,16 @@ static TimestampTz last_reply_timestamp;
 
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
-volatile sig_atomic_t walsender_shutdown_requested = false;
 volatile sig_atomic_t walsender_ready_to_stop = false;
 
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
-static void WalSndShutdownHandler(SIGNAL_ARGS);
-static void WalSndQuickDieHandler(SIGNAL_ARGS);
 static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
-static bool HandleReplicationCommand(const char *cmd_string);
 static void WalSndLoop(void) __attribute__((noreturn));
-static void InitWalSnd(void);
-static void WalSndHandshake(void);
+static void InitWalSenderSlot(void);
 static void WalSndKill(int code, Datum arg);
 static void XLogSend(char *msgbuf, bool *caughtup);
 static void IdentifySystem(void);
@@ -139,147 +136,48 @@ static void ProcessRepliesIfAny(void);
 static void WalSndKeepalive(char *msgbuf);
 
 
-/* Main entry point for walsender process */
+/* Initialize walsender process before entering the main command loop */
 void
-WalSenderMain(void)
+InitWalSender(void)
 {
-   MemoryContext walsnd_context;
-
    am_cascading_walsender = RecoveryInProgress();
 
    /* Create a per-walsender data structure in shared memory */
-   InitWalSnd();
-
-   /*
-    * Create a memory context that we will do all our work in.  We do this so
-    * that we can reset the context during error recovery and thereby avoid
-    * possible memory leaks.  Formerly this code just ran in
-    * TopMemoryContext, but resetting that would be a really bad idea.
-    *
-    * XXX: we don't actually attempt error recovery in walsender, we just
-    * close the connection and exit.
-    */
-   walsnd_context = AllocSetContextCreate(TopMemoryContext,
-                                          "Wal Sender",
-                                          ALLOCSET_DEFAULT_MINSIZE,
-                                          ALLOCSET_DEFAULT_INITSIZE,
-                                          ALLOCSET_DEFAULT_MAXSIZE);
-   MemoryContextSwitchTo(walsnd_context);
+   InitWalSenderSlot();
 
    /* Set up resource owner */
    CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
 
-   /* Unblock signals (they were blocked when the postmaster forked us) */
-   PG_SETMASK(&UnBlockSig);
-
    /*
     * Use the recovery target timeline ID during recovery
     */
    if (am_cascading_walsender)
        ThisTimeLineID = GetRecoveryTargetTLI();
-
-   /* Tell the standby that walsender is ready for receiving commands */
-   ReadyForQuery(DestRemote);
-
-   /* Handle handshake messages before streaming */
-   WalSndHandshake();
-
-   /* Initialize shared memory status */
-   {
-       /* use volatile pointer to prevent code rearrangement */
-       volatile WalSnd *walsnd = MyWalSnd;
-
-       SpinLockAcquire(&walsnd->mutex);
-       walsnd->sentPtr = sentPtr;
-       SpinLockRelease(&walsnd->mutex);
-   }
-
-   SyncRepInitConfig();
-
-   /* Main loop of walsender */
-   WalSndLoop();
 }
 
 /*
- * Execute commands from walreceiver, until we enter streaming mode.
+ * Clean up after an error.
+ *
+ * WAL sender processes don't use transactions like regular backends do.
+ * This function does any cleanup requited after an error in a WAL sender
+ * process, similar to what transaction abort does in a regular backend.
  */
-static void
-WalSndHandshake(void)
+void
+WalSndErrorCleanup()
 {
-   StringInfoData input_message;
-   bool        replication_started = false;
-
-   initStringInfo(&input_message);
-
-   while (!replication_started)
+   if (sendFile >= 0)
    {
-       int         firstchar;
-
-       WalSndSetState(WALSNDSTATE_STARTUP);
-       set_ps_display("idle", false);
-
-       /* Wait for a command to arrive */
-       firstchar = pq_getbyte();
-
-       /*
-        * Emergency bailout if postmaster has died.  This is to avoid the
-        * necessity for manual cleanup of all postmaster children.
-        */
-       if (!PostmasterIsAlive())
-           exit(1);
-
-       /*
-        * Check for any other interesting events that happened while we
-        * slept.
-        */
-       if (got_SIGHUP)
-       {
-           got_SIGHUP = false;
-           ProcessConfigFile(PGC_SIGHUP);
-       }
-
-       if (firstchar != EOF)
-       {
-           /*
-            * Read the message contents. This is expected to be done without
-            * blocking because we've been able to get message type code.
-            */
-           if (pq_getmessage(&input_message, 0))
-               firstchar = EOF;    /* suitable message already logged */
-       }
-
-       /* Handle the very limited subset of commands expected in this phase */
-       switch (firstchar)
-       {
-           case 'Q':           /* Query message */
-               {
-                   const char *query_string;
-
-                   query_string = pq_getmsgstring(&input_message);
-                   pq_getmsgend(&input_message);
-
-                   if (HandleReplicationCommand(query_string))
-                       replication_started = true;
-               }
-               break;
-
-           case 'X':
-               /* standby is closing the connection */
-               proc_exit(0);
-
-           case EOF:
-               /* standby disconnected unexpectedly */
-               ereport(COMMERROR,
-                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                        errmsg("unexpected EOF on standby connection")));
-               proc_exit(0);
-
-           default:
-               ereport(FATAL,
-                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                        errmsg("invalid standby handshake message type %d", firstchar)));
-       }
+       close(sendFile);
+       sendFile = -1;
    }
+
+   /*
+    * Don't return back to the command loop after we've started replicating.
+    * We've already marked us as an actively streaming WAL sender in the
+    * PMSignal slot, and there's currently no way to undo that.
+    */
+   if (replication_started)
+       proc_exit(0);
 }
 
 /*
@@ -350,15 +248,13 @@ IdentifySystem(void)
    pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
 
    pq_endmessage(&buf);
-
-   /* Send CommandComplete and ReadyForQuery messages */
-   EndCommand("SELECT", DestRemote);
-   ReadyForQuery(DestRemote);
-   /* ReadyForQuery did pq_flush for us */
 }
 
 /*
- * START_REPLICATION
+ * Handle START_REPLICATION command.
+ *
+ * At the moment, this never returns, but an ereport(ERROR) will take us back
+ * to the main loop.
  */
 static void
 StartReplication(StartReplicationCmd *cmd)
@@ -374,6 +270,7 @@ StartReplication(StartReplicationCmd *cmd)
     */
    MarkPostmasterChildWalSender();
    SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
+   replication_started = true;
 
    /*
     * When promoting a cascading standby, postmaster sends SIGUSR2 to any
@@ -435,15 +332,29 @@ StartReplication(StartReplicationCmd *cmd)
     * be shipped from that position
     */
    sentPtr = cmd->startpoint;
+
+   /* Also update the start position status in shared memory */
+   {
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalSnd *walsnd = MyWalSnd;
+
+       SpinLockAcquire(&walsnd->mutex);
+       walsnd->sentPtr = sentPtr;
+       SpinLockRelease(&walsnd->mutex);
+   }
+
+   SyncRepInitConfig();
+
+   /* Main loop of walsender */
+   WalSndLoop();
 }
 
 /*
  * Execute an incoming replication command.
  */
-static bool
-HandleReplicationCommand(const char *cmd_string)
+void
+exec_replication_command(const char *cmd_string)
 {
-   bool        replication_started = false;
    int         parse_rc;
    Node       *cmd_node;
    MemoryContext cmd_context;
@@ -451,6 +362,8 @@ HandleReplicationCommand(const char *cmd_string)
 
    elog(DEBUG1, "received replication command: %s", cmd_string);
 
+   CHECK_FOR_INTERRUPTS();
+
    cmd_context = AllocSetContextCreate(CurrentMemoryContext,
                                        "Replication command context",
                                        ALLOCSET_DEFAULT_MINSIZE,
@@ -476,18 +389,10 @@ HandleReplicationCommand(const char *cmd_string)
 
        case T_StartReplicationCmd:
            StartReplication((StartReplicationCmd *) cmd_node);
-
-           /* break out of the loop */
-           replication_started = true;
            break;
 
        case T_BaseBackupCmd:
            SendBaseBackup((BaseBackupCmd *) cmd_node);
-
-           /* Send CommandComplete and ReadyForQuery messages */
-           EndCommand("SELECT", DestRemote);
-           ReadyForQuery(DestRemote);
-           /* ReadyForQuery did pq_flush for us */
            break;
 
        default:
@@ -500,7 +405,8 @@ HandleReplicationCommand(const char *cmd_string)
    MemoryContextSwitchTo(old_context);
    MemoryContextDelete(cmd_context);
 
-   return replication_started;
+   /* Send CommandComplete message */
+   EndCommand("SELECT", DestRemote);
 }
 
 /*
@@ -710,7 +616,7 @@ ProcessStandbyHSFeedbackMessage(void)
    MyPgXact->xmin = reply.xmin;
 }
 
-/* Main loop of walsender process */
+/* Main loop of walsender process that streams the WAL over Copy messages. */
 static void
 WalSndLoop(void)
 {
@@ -754,15 +660,7 @@ WalSndLoop(void)
            SyncRepInitConfig();
        }
 
-       /* Normal exit from the walsender is here */
-       if (walsender_shutdown_requested)
-       {
-           /* Inform the standby that XLOG streaming is done */
-           pq_puttextmessage('C', "COPY 0");
-           pq_flush();
-
-           proc_exit(0);
-       }
+       CHECK_FOR_INTERRUPTS();
 
        /* Check for input from the client */
        ProcessRepliesIfAny();
@@ -813,7 +711,7 @@ WalSndLoop(void)
                XLogSend(output_message, &caughtup);
                if (caughtup && !pq_is_send_pending())
                {
-                   walsender_shutdown_requested = true;
+                   ProcDiePending = true;
                    continue;   /* don't want to wait more */
                }
            }
@@ -854,8 +752,11 @@ WalSndLoop(void)
            }
 
            /* Sleep until something happens or replication timeout */
+           ImmediateInterruptOK = true;
+           CHECK_FOR_INTERRUPTS();
            WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
                              MyProcPort->sock, sleeptime);
+           ImmediateInterruptOK = false;
 
            /*
             * Check for replication timeout.  Note we ignore the corner case
@@ -892,7 +793,7 @@ WalSndLoop(void)
 
 /* Initialize a per-walsender data structure for this walsender process */
 static void
-InitWalSnd(void)
+InitWalSenderSlot(void)
 {
    int         i;
 
@@ -1284,58 +1185,6 @@ WalSndSigHupHandler(SIGNAL_ARGS)
    errno = save_errno;
 }
 
-/* SIGTERM: set flag to shut down */
-static void
-WalSndShutdownHandler(SIGNAL_ARGS)
-{
-   int         save_errno = errno;
-
-   walsender_shutdown_requested = true;
-   if (MyWalSnd)
-       SetLatch(&MyWalSnd->latch);
-
-   /*
-    * Set the standard (non-walsender) state as well, so that we can abort
-    * things like do_pg_stop_backup().
-    */
-   InterruptPending = true;
-   ProcDiePending = true;
-
-   errno = save_errno;
-}
-
-/*
- * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
- *
- * Some backend has bought the farm,
- * so we need to stop what we're doing and exit.
- */
-static void
-WalSndQuickDieHandler(SIGNAL_ARGS)
-{
-   PG_SETMASK(&BlockSig);
-
-   /*
-    * We DO NOT want to run proc_exit() callbacks -- we're here because
-    * shared memory may be corrupted, so we don't want to try to clean up our
-    * transaction.  Just nail the windows shut and get out of town.  Now that
-    * there's an atexit callback to prevent third-party code from breaking
-    * things by calling exit() directly, we have to reset the callbacks
-    * explicitly to make this work as intended.
-    */
-   on_exit_reset();
-
-   /*
-    * Note we do exit(2) not exit(0).  This is to force the postmaster into a
-    * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
-    * backend.  This is necessary precisely because we don't clean up our
-    * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
-    * should ensure the postmaster sees this as a crash, too, but no harm in
-    * being doubly sure.)
-    */
-   exit(2);
-}
-
 /* SIGUSR1: set flag to send WAL records */
 static void
 WalSndXLogSendHandler(SIGNAL_ARGS)
@@ -1368,8 +1217,8 @@ WalSndSignals(void)
    pqsignal(SIGHUP, WalSndSigHupHandler);      /* set flag to read config
                                                 * file */
    pqsignal(SIGINT, SIG_IGN);  /* not used */
-   pqsignal(SIGTERM, WalSndShutdownHandler);   /* request shutdown */
-   pqsignal(SIGQUIT, WalSndQuickDieHandler);   /* hard crash time */
+   pqsignal(SIGTERM, die);                     /* request shutdown */
+   pqsignal(SIGQUIT, quickdie);                /* hard crash time */
    InitializeTimeouts();       /* establishes SIGALRM handler */
    pqsignal(SIGPIPE, SIG_IGN);
    pqsignal(SIGUSR1, WalSndXLogSendHandler);   /* request WAL sending */
index f1248a851bf90188da8d3a7e8b61ac99bf78ebbd..585db1af89cd37ba7e8f5327bbd992c45beaf288 100644 (file)
@@ -192,6 +192,7 @@ static int  InteractiveBackend(StringInfo inBuf);
 static int interactive_getc(void);
 static int SocketBackend(StringInfo inBuf);
 static int ReadCommand(StringInfo inBuf);
+static void forbidden_in_wal_sender(char firstchar);
 static List *pg_rewrite_query(Query *query);
 static bool check_log_statement(List *stmt_list);
 static int errdetail_execute(List *raw_parsetree_list);
@@ -3720,12 +3721,9 @@ PostgresMain(int argc, char *argv[], const char *username)
    if (IsUnderPostmaster && Log_disconnections)
        on_proc_exit(log_disconnections, 0);
 
-   /* If this is a WAL sender process, we're done with initialization. */
+   /* Perform initialization specific to a WAL sender process. */
    if (am_walsender)
-   {
-       WalSenderMain();        /* does not return */
-       abort();
-   }
+       InitWalSender();
 
    /*
     * process any libraries that should be preloaded at backend start (this
@@ -3835,6 +3833,9 @@ PostgresMain(int argc, char *argv[], const char *username)
         */
        AbortCurrentTransaction();
 
+       if (am_walsender)
+           WalSndErrorCleanup();
+
        /*
         * Now return to normal top-level context and clear ErrorContext for
         * next time.
@@ -3969,7 +3970,10 @@ PostgresMain(int argc, char *argv[], const char *username)
                    query_string = pq_getmsgstring(&input_message);
                    pq_getmsgend(&input_message);
 
-                   exec_simple_query(query_string);
+                   if (am_walsender)
+                       exec_replication_command(query_string);
+                   else
+                       exec_simple_query(query_string);
 
                    send_ready_for_query = true;
                }
@@ -3982,6 +3986,8 @@ PostgresMain(int argc, char *argv[], const char *username)
                    int         numParams;
                    Oid        *paramTypes = NULL;
 
+                   forbidden_in_wal_sender(firstchar);
+
                    /* Set statement_timestamp() */
                    SetCurrentStatementStartTimestamp();
 
@@ -4004,6 +4010,8 @@ PostgresMain(int argc, char *argv[], const char *username)
                break;
 
            case 'B':           /* bind */
+               forbidden_in_wal_sender(firstchar);
+
                /* Set statement_timestamp() */
                SetCurrentStatementStartTimestamp();
 
@@ -4019,6 +4027,8 @@ PostgresMain(int argc, char *argv[], const char *username)
                    const char *portal_name;
                    int         max_rows;
 
+                   forbidden_in_wal_sender(firstchar);
+
                    /* Set statement_timestamp() */
                    SetCurrentStatementStartTimestamp();
 
@@ -4031,6 +4041,8 @@ PostgresMain(int argc, char *argv[], const char *username)
                break;
 
            case 'F':           /* fastpath function call */
+               forbidden_in_wal_sender(firstchar);
+
                /* Set statement_timestamp() */
                SetCurrentStatementStartTimestamp();
 
@@ -4078,6 +4090,8 @@ PostgresMain(int argc, char *argv[], const char *username)
                    int         close_type;
                    const char *close_target;
 
+                   forbidden_in_wal_sender(firstchar);
+
                    close_type = pq_getmsgbyte(&input_message);
                    close_target = pq_getmsgstring(&input_message);
                    pq_getmsgend(&input_message);
@@ -4120,6 +4134,8 @@ PostgresMain(int argc, char *argv[], const char *username)
                    int         describe_type;
                    const char *describe_target;
 
+                   forbidden_in_wal_sender(firstchar);
+
                    /* Set statement_timestamp() (needed for xact) */
                    SetCurrentStatementStartTimestamp();
 
@@ -4201,6 +4217,29 @@ PostgresMain(int argc, char *argv[], const char *username)
    }                           /* end of input-reading loop */
 }
 
+/*
+ * Throw an error if we're a WAL sender process.
+ *
+ * This is used to forbid anything else than simple query protocol messages
+ * in a WAL sender process.  'firstchar' specifies what kind of a forbidden
+ * message was received, and is used to construct the error message.
+ */
+static void
+forbidden_in_wal_sender(char firstchar)
+{
+   if (am_walsender)
+   {
+       if (firstchar == 'F')
+           ereport(ERROR,
+                   (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                    errmsg("fastpath function calls not supported in a replication connection")));
+       else
+           ereport(ERROR,
+                   (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                    errmsg("extended query protocol not supported in a replication connection")));
+   }
+}
+
 
 /*
  * Obtain platform stack depth limit (in bytes)
index bb85ccf7b22cea2a927a82be0ba6219c6fb5efe8..78e8558299ce844d8a7bd31ebeafc5a8f507f5a2 100644 (file)
@@ -19,7 +19,6 @@
 /* global state */
 extern bool am_walsender;
 extern bool am_cascading_walsender;
-extern volatile sig_atomic_t walsender_shutdown_requested;
 extern volatile sig_atomic_t walsender_ready_to_stop;
 extern bool wake_wal_senders;
 
@@ -27,7 +26,9 @@ extern bool wake_wal_senders;
 extern int max_wal_senders;
 extern int replication_timeout;
 
-extern void WalSenderMain(void) __attribute__((noreturn));
+extern void InitWalSender(void);
+extern void exec_replication_command(const char *query_string);
+extern void WalSndErrorCleanup(void);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
 extern void WalSndShmemInit(void);