Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/replication/walsender.c36
1 files changed, 34 insertions, 2 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1028919aecb..216baeda5cd 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -91,10 +91,14 @@
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
+#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
+/* Minimum interval used by walsender for stats flushes, in ms */
+#define WALSENDER_STATS_FLUSH_INTERVAL 1000
+
/*
* Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
*
@@ -1797,6 +1801,7 @@ WalSndWaitForWal(XLogRecPtr loc)
int wakeEvents;
uint32 wait_event = 0;
static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+ TimestampTz last_flush = 0;
/*
* Fast path to avoid acquiring the spinlock in case we already know we
@@ -1817,6 +1822,7 @@ WalSndWaitForWal(XLogRecPtr loc)
{
bool wait_for_standby_at_stop = false;
long sleeptime;
+ TimestampTz now;
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
@@ -1927,7 +1933,8 @@ WalSndWaitForWal(XLogRecPtr loc)
* new WAL to be generated. (But if we have nothing to send, we don't
* want to wake on socket-writable.)
*/
- sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+ now = GetCurrentTimestamp();
+ sleeptime = WalSndComputeSleeptime(now);
wakeEvents = WL_SOCKET_READABLE;
@@ -1936,6 +1943,15 @@ WalSndWaitForWal(XLogRecPtr loc)
Assert(wait_event != 0);
+ /* Report IO statistics, if needed */
+ if (TimestampDifferenceExceeds(last_flush, now,
+ WALSENDER_STATS_FLUSH_INTERVAL))
+ {
+ pgstat_flush_io(false);
+ (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+ last_flush = now;
+ }
+
WalSndWait(wakeEvents, sleeptime, wait_event);
}
@@ -2742,6 +2758,8 @@ WalSndCheckTimeOut(void)
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
+ TimestampTz last_flush = 0;
+
/*
* Initialize the last reply timestamp. That enables timeout processing
* from hereon.
@@ -2836,6 +2854,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
* WalSndWaitForWal() handle any other blocking; idle receivers need
* its additional actions. For physical replication, also block if
* caught up; its send_data does not block.
+ *
+ * The IO statistics are reported in WalSndWaitForWal() for the
+ * logical WAL senders.
*/
if ((WalSndCaughtUp && send_data != XLogSendLogical &&
!streamingDoneSending) ||
@@ -2843,6 +2864,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
{
long sleeptime;
int wakeEvents;
+ TimestampTz now;
if (!streamingDoneReceiving)
wakeEvents = WL_SOCKET_READABLE;
@@ -2853,11 +2875,21 @@ WalSndLoop(WalSndSendDataCallback send_data)
* Use fresh timestamp, not last_processing, to reduce the chance
* of reaching wal_sender_timeout before sending a keepalive.
*/
- sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+ now = GetCurrentTimestamp();
+ sleeptime = WalSndComputeSleeptime(now);
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
+ /* Report IO statistics, if needed */
+ if (TimestampDifferenceExceeds(last_flush, now,
+ WALSENDER_STATS_FLUSH_INTERVAL))
+ {
+ pgstat_flush_io(false);
+ (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+ last_flush = now;
+ }
+
/* Sleep until something happens or we time out */
WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
}