Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 3ac298d

Browse files
bdrouvotAWSCommitfest Bot
authored and
Commitfest Bot
committed
Flush the IO statistics of active walsenders
The walsender does not flush its IO statistics until it exits. The issue has existed since pg_stat_io was introduced in a9c70b4. This commit: 1. ensures the walsender does not wait until it exits to flush its IO statistics; 2. flushes the IO statistics periodically, so as not to overload the walsender; 3. adds a test for a physical walsender and a test for a logical walsender.
1 parent d5d85f1 commit 3ac298d

File tree

3 files changed

+74
-18
lines changed

3 files changed

+74
-18
lines changed

src/backend/replication/walsender.c

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,14 @@
9191
#include "utils/guc.h"
9292
#include "utils/memutils.h"
9393
#include "utils/pg_lsn.h"
94+
#include "utils/pgstat_internal.h"
9495
#include "utils/ps_status.h"
9596
#include "utils/timeout.h"
9697
#include "utils/timestamp.h"
9798

99+
/* Minimum interval between walsender IO stats flushes, in milliseconds */
100+
#define MIN_IOSTATS_FLUSH_INTERVAL 1000
101+
98102
/*
99103
* Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
100104
*
@@ -2742,6 +2746,8 @@ WalSndCheckTimeOut(void)
27422746
static void
27432747
WalSndLoop(WalSndSendDataCallback send_data)
27442748
{
2749+
TimestampTz last_flush = 0;
2750+
27452751
/*
27462752
* Initialize the last reply timestamp. That enables timeout processing
27472753
* from hereon.
@@ -2836,30 +2842,51 @@ WalSndLoop(WalSndSendDataCallback send_data)
28362842
* WalSndWaitForWal() handle any other blocking; idle receivers need
28372843
* its additional actions. For physical replication, also block if
28382844
* caught up; its send_data does not block.
2845+
*
2846+
* When the WAL sender is caught up or has pending data to send, we
2847+
* also periodically report I/O statistics. It's done periodically to
2848+
* not overload the WAL sender.
28392849
*/
2840-
if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2841-
!streamingDoneSending) ||
2842-
pq_is_send_pending())
2850+
if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending())
28432851
{
2844-
long sleeptime;
2845-
int wakeEvents;
2852+
TimestampTz now;
28462853

2847-
if (!streamingDoneReceiving)
2848-
wakeEvents = WL_SOCKET_READABLE;
2849-
else
2850-
wakeEvents = 0;
2854+
now = GetCurrentTimestamp();
28512855

2852-
/*
2853-
* Use fresh timestamp, not last_processing, to reduce the chance
2854-
* of reaching wal_sender_timeout before sending a keepalive.
2855-
*/
2856-
sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
2856+
if (TimestampDifferenceExceeds(last_flush, now, MIN_IOSTATS_FLUSH_INTERVAL))
2857+
{
2858+
/*
2859+
* Report IO statistics
2860+
*/
2861+
pgstat_flush_io(false);
2862+
(void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
2863+
last_flush = now;
2864+
}
28572865

2858-
if (pq_is_send_pending())
2859-
wakeEvents |= WL_SOCKET_WRITEABLE;
2866+
if (send_data != XLogSendLogical || pq_is_send_pending())
2867+
{
2868+
long sleeptime;
2869+
int wakeEvents;
2870+
2871+
if (!streamingDoneReceiving)
2872+
wakeEvents = WL_SOCKET_READABLE;
2873+
else
2874+
wakeEvents = 0;
2875+
2876+
/*
2877+
* Use fresh timestamp, not last_processing, to reduce the
2878+
* chance of reaching wal_sender_timeout before sending a
2879+
* keepalive.
2880+
*/
2881+
sleeptime = WalSndComputeSleeptime(now);
2882+
2883+
if (pq_is_send_pending())
2884+
wakeEvents |= WL_SOCKET_WRITEABLE;
2885+
2886+
/* Sleep until something happens or we time out */
2887+
WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2888+
}
28602889

2861-
/* Sleep until something happens or we time out */
2862-
WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
28632890
}
28642891
}
28652892
}

src/test/recovery/t/001_stream_rep.pl

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@
4242
has_streaming => 1);
4343
$node_standby_2->start;
4444

45+
# To check that an active walsender updates its IO statistics below.
46+
$node_primary->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
47+
4548
# Create some content on primary and check its presence in standby nodes
4649
$node_primary->safe_psql('postgres',
4750
"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
@@ -333,6 +336,19 @@ sub test_target_session_attrs
333336

334337
note "switching to physical replication slot";
335338

339+
# Wait for the walsender to update its IO statistics.
340+
# Has to be done before the next restart and far enough from the
341+
# pg_stat_reset_shared('io') to minimize the risk of polling for too long.
342+
$node_primary->poll_query_until(
343+
'postgres',
344+
qq[SELECT sum(reads) > 0
345+
FROM pg_catalog.pg_stat_io
346+
WHERE backend_type = 'walsender'
347+
AND object = 'wal']
348+
)
349+
or die
350+
"Timed out while waiting for the walsender to update its IO statistics";
351+
336352
# Switch to using a physical replication slot. We can do this without a new
337353
# backup since physical slots can go backwards if needed. Do so on both
338354
# standbys. Since we're going to be testing things that affect the slot state,

src/test/subscription/t/001_rep_changes.pl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,19 @@
184184
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_no_col");
185185
is($result, qq(2), 'check replicated changes for table having no columns');
186186

187+
# Wait for the walsender to update its IO statistics.
188+
# Has to be done far enough from the CREATE SUBSCRIPTION to minimize the risk
189+
# of polling for too long.
190+
$node_publisher->poll_query_until(
191+
'postgres',
192+
qq[SELECT sum(reads) > 0
193+
FROM pg_catalog.pg_stat_io
194+
WHERE backend_type = 'walsender'
195+
AND object = 'wal']
196+
)
197+
or die
198+
"Timed out while waiting for the walsender to update its IO statistics";
199+
187200
# insert some duplicate rows
188201
$node_publisher->safe_psql('postgres',
189202
"INSERT INTO tab_full SELECT generate_series(1,10)");

0 commit comments

Comments
 (0)