Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 8e90ec5

Browse files
author
Amit Kapila
committed
Track statistics for streaming of changes from ReorderBuffer.
This adds the statistics about transactions streamed to the decoding output plugin from ReorderBuffer. Users can query the pg_stat_replication_slots view to check these stats and call pg_stat_reset_replication_slot to reset the stats of a particular slot. Users can pass NULL in pg_stat_reset_replication_slot to reset stats of all the slots. Commit 9868167 has added the basic infrastructure to capture the stats of slot and this commit extends the statistics collector to track additional information about slots. Bump the catversion as we have added new columns in the catalog entry. Author: Ajin Cherian and Amit Kapila Reviewed-by: Sawada Masahiko and Dilip Kumar Discussion: https://postgr.es/m/CAA4eK1+chpEomLzgSoky-D31qev19AmECNiEAietPQUGEFhtVA@mail.gmail.com
1 parent 94bc27b commit 8e90ec5

File tree

12 files changed

+111
-17
lines changed

12 files changed

+111
-17
lines changed

doc/src/sgml/monitoring.sgml

+38
Original file line numberDiff line numberDiff line change
@@ -2632,6 +2632,44 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
26322632
</para></entry>
26332633
</row>
26342634

2635+
<row>
2636+
<entry role="catalog_table_entry"><para role="column_definition">
2637+
<structfield>stream_txns</structfield> <type>bigint</type>
2638+
</para>
2639+
<para>
2640+
Number of in-progress transactions streamed to the decoding output plugin
2641+
after the memory used by logical decoding of changes from WAL for this
2642+
slot exceeds <literal>logical_decoding_work_mem</literal>. Streaming only
2643+
works with toplevel transactions (subtransactions can't be streamed
2644+
independently), so the counter does not get incremented for subtransactions.
2645+
</para></entry>
2646+
</row>
2647+
2648+
<row>
2649+
<entry role="catalog_table_entry"><para role="column_definition">
2650+
<structfield>stream_count</structfield><type>bigint</type>
2651+
</para>
2652+
<para>
2653+
Number of times in-progress transactions were streamed to the decoding
2654+
output plugin while decoding changes from WAL for this slot. Transactions
2655+
may get streamed repeatedly, and this counter gets incremented on every
2656+
such invocation.
2657+
</para></entry>
2658+
</row>
2659+
2660+
<row>
2661+
<entry role="catalog_table_entry"><para role="column_definition">
2662+
<structfield>stream_bytes</structfield><type>bigint</type>
2663+
</para>
2664+
<para>
2665+
Amount of decoded in-progress transaction data streamed to the decoding
2666+
output plugin while decoding changes from WAL for this slot. This and other
2667+
streaming counters for this slot can be used to gauge the network I/O which
2668+
occurred during logical decoding and allow tuning <literal>logical_decoding_work_mem</literal>.
2669+
</para>
2670+
</entry>
2671+
</row>
2672+
26352673
<row>
26362674
<entry role="catalog_table_entry"><para role="column_definition">
26372675
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>

src/backend/catalog/system_views.sql

+3
Original file line numberDiff line numberDiff line change
@@ -802,6 +802,9 @@ CREATE VIEW pg_stat_replication_slots AS
802802
s.spill_txns,
803803
s.spill_count,
804804
s.spill_bytes,
805+
s.stream_txns,
806+
s.stream_count,
807+
s.stream_bytes,
805808
s.stats_reset
806809
FROM pg_stat_get_replication_slots() AS s;
807810

src/backend/postmaster/pgstat.c

+10-1
Original file line numberDiff line numberDiff line change
@@ -1708,7 +1708,7 @@ pgstat_report_tempfile(size_t filesize)
17081708
*/
17091709
void
17101710
pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
1711-
int spillbytes)
1711+
int spillbytes, int streamtxns, int streamcount, int streambytes)
17121712
{
17131713
PgStat_MsgReplSlot msg;
17141714

@@ -1721,6 +1721,9 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
17211721
msg.m_spill_txns = spilltxns;
17221722
msg.m_spill_count = spillcount;
17231723
msg.m_spill_bytes = spillbytes;
1724+
msg.m_stream_txns = streamtxns;
1725+
msg.m_stream_count = streamcount;
1726+
msg.m_stream_bytes = streambytes;
17241727
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
17251728
}
17261729

@@ -6892,6 +6895,9 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
68926895
replSlotStats[idx].spill_txns += msg->m_spill_txns;
68936896
replSlotStats[idx].spill_count += msg->m_spill_count;
68946897
replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
6898+
replSlotStats[idx].stream_txns += msg->m_stream_txns;
6899+
replSlotStats[idx].stream_count += msg->m_stream_count;
6900+
replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
68956901
}
68966902
}
68976903

@@ -7125,6 +7131,9 @@ pgstat_reset_replslot(int i, TimestampTz ts)
71257131
replSlotStats[i].spill_txns = 0;
71267132
replSlotStats[i].spill_count = 0;
71277133
replSlotStats[i].spill_bytes = 0;
7134+
replSlotStats[i].stream_txns = 0;
7135+
replSlotStats[i].stream_count = 0;
7136+
replSlotStats[i].stream_bytes = 0;
71287137
replSlotStats[i].stat_reset_timestamp = ts;
71297138
}
71307139

src/backend/replication/logical/logical.c

+13-6
Original file line numberDiff line numberDiff line change
@@ -1471,21 +1471,28 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
14711471
ReorderBuffer *rb = ctx->reorder;
14721472

14731473
/*
1474-
* Nothing to do if we haven't spilled anything since the last time the
1475-
* stats has been sent.
1474+
* Nothing to do if we haven't spilled or streamed anything since the last
1475+
* time the stats has been sent.
14761476
*/
1477-
if (rb->spillBytes <= 0)
1477+
if (rb->spillBytes <= 0 && rb->streamBytes <= 0)
14781478
return;
14791479

1480-
elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld",
1480+
elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld",
14811481
rb,
14821482
(long long) rb->spillTxns,
14831483
(long long) rb->spillCount,
1484-
(long long) rb->spillBytes);
1484+
(long long) rb->spillBytes,
1485+
(long long) rb->streamTxns,
1486+
(long long) rb->streamCount,
1487+
(long long) rb->streamBytes);
14851488

14861489
pgstat_report_replslot(NameStr(ctx->slot->data.name),
1487-
rb->spillTxns, rb->spillCount, rb->spillBytes);
1490+
rb->spillTxns, rb->spillCount, rb->spillBytes,
1491+
rb->streamTxns, rb->streamCount, rb->streamBytes);
14881492
rb->spillTxns = 0;
14891493
rb->spillCount = 0;
14901494
rb->spillBytes = 0;
1495+
rb->streamTxns = 0;
1496+
rb->streamCount = 0;
1497+
rb->streamBytes = 0;
14911498
}

src/backend/replication/logical/reorderbuffer.c

+20
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,9 @@ ReorderBufferAllocate(void)
346346
buffer->spillTxns = 0;
347347
buffer->spillCount = 0;
348348
buffer->spillBytes = 0;
349+
buffer->streamTxns = 0;
350+
buffer->streamCount = 0;
351+
buffer->streamBytes = 0;
349352

350353
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
351354

@@ -3482,6 +3485,8 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
34823485
{
34833486
Snapshot snapshot_now;
34843487
CommandId command_id;
3488+
Size stream_bytes;
3489+
bool txn_is_streamed;
34853490

34863491
/* We can never reach here for a subtransaction. */
34873492
Assert(txn->toptxn == NULL);
@@ -3562,10 +3567,25 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
35623567
txn->snapshot_now = NULL;
35633568
}
35643569

3570+
/*
3571+
* Remember this information to be used later to update stats. We can't
3572+
* update the stats here as an error while processing the changes would
3573+
* lead to the accumulation of stats even though we haven't streamed all
3574+
* the changes.
3575+
*/
3576+
txn_is_streamed = rbtxn_is_streamed(txn);
3577+
stream_bytes = txn->total_size;
3578+
35653579
/* Process and send the changes to output plugin. */
35663580
ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
35673581
command_id, true);
35683582

3583+
rb->streamCount += 1;
3584+
rb->streamBytes += stream_bytes;
3585+
3586+
/* Don't consider already streamed transaction. */
3587+
rb->streamTxns += (txn_is_streamed) ? 0 : 1;
3588+
35693589
Assert(dlist_is_empty(&txn->changes));
35703590
Assert(txn->nentries == 0);
35713591
Assert(txn->nentries_mem == 0);

src/backend/replication/slot.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
320320
* ReplicationSlotAllocationLock.
321321
*/
322322
if (SlotIsLogical(slot))
323-
pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0);
323+
pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
324324

325325
/*
326326
* Now that the slot has been marked as in_use and active, it's safe to

src/backend/utils/adt/pgstatfuncs.c

+6-3
Original file line numberDiff line numberDiff line change
@@ -2153,7 +2153,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
21532153
Datum
21542154
pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
21552155
{
2156-
#define PG_STAT_GET_REPLICATION_SLOT_COLS 5
2156+
#define PG_STAT_GET_REPLICATION_SLOT_COLS 8
21572157
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
21582158
TupleDesc tupdesc;
21592159
Tuplestorestate *tupstore;
@@ -2201,11 +2201,14 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
22012201
values[1] = Int64GetDatum(s->spill_txns);
22022202
values[2] = Int64GetDatum(s->spill_count);
22032203
values[3] = Int64GetDatum(s->spill_bytes);
2204+
values[4] = Int64GetDatum(s->stream_txns);
2205+
values[5] = Int64GetDatum(s->stream_count);
2206+
values[6] = Int64GetDatum(s->stream_bytes);
22042207

22052208
if (s->stat_reset_timestamp == 0)
2206-
nulls[4] = true;
2209+
nulls[7] = true;
22072210
else
2208-
values[4] = TimestampTzGetDatum(s->stat_reset_timestamp);
2211+
values[7] = TimestampTzGetDatum(s->stat_reset_timestamp);
22092212

22102213
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
22112214
}

src/include/catalog/catversion.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,6 @@
5353
*/
5454

5555
/* yyyymmddN */
56-
#define CATALOG_VERSION_NO 202010281
56+
#define CATALOG_VERSION_NO 202010291
5757

5858
#endif

src/include/catalog/pg_proc.dat

+3-3
Original file line numberDiff line numberDiff line change
@@ -5260,9 +5260,9 @@
52605260
proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
52615261
proretset => 't', provolatile => 's', proparallel => 'r',
52625262
prorettype => 'record', proargtypes => '',
5263-
proallargtypes => '{text,int8,int8,int8,timestamptz}',
5264-
proargmodes => '{o,o,o,o,o}',
5265-
proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stats_reset}',
5263+
proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}',
5264+
proargmodes => '{o,o,o,o,o,o,o,o}',
5265+
proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}',
52665266
prosrc => 'pg_stat_get_replication_slots' },
52675267
{ oid => '6118', descr => 'statistics: information about subscription',
52685268
proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',

src/include/pgstat.h

+7-1
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,9 @@ typedef struct PgStat_MsgReplSlot
492492
PgStat_Counter m_spill_txns;
493493
PgStat_Counter m_spill_count;
494494
PgStat_Counter m_spill_bytes;
495+
PgStat_Counter m_stream_txns;
496+
PgStat_Counter m_stream_count;
497+
PgStat_Counter m_stream_bytes;
495498
} PgStat_MsgReplSlot;
496499

497500

@@ -823,6 +826,9 @@ typedef struct PgStat_ReplSlotStats
823826
PgStat_Counter spill_txns;
824827
PgStat_Counter spill_count;
825828
PgStat_Counter spill_bytes;
829+
PgStat_Counter stream_txns;
830+
PgStat_Counter stream_count;
831+
PgStat_Counter stream_bytes;
826832
TimestampTz stat_reset_timestamp;
827833
} PgStat_ReplSlotStats;
828834

@@ -1387,7 +1393,7 @@ extern void pgstat_report_deadlock(void);
13871393
extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
13881394
extern void pgstat_report_checksum_failure(void);
13891395
extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
1390-
int spillbytes);
1396+
int spillbytes, int streamtxns, int streamcount, int streambytes);
13911397
extern void pgstat_report_replslot_drop(const char *slotname);
13921398

13931399
extern void pgstat_initialize(void);

src/include/replication/reorderbuffer.h

+5
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,11 @@ struct ReorderBuffer
551551
int64 spillTxns; /* number of transactions spilled to disk */
552552
int64 spillCount; /* spill-to-disk invocation counter */
553553
int64 spillBytes; /* amount of data spilled to disk */
554+
555+
/* Statistics about transactions streamed to the decoding output plugin */
556+
int64 streamTxns; /* number of transactions streamed */
557+
int64 streamCount; /* streaming invocation counter */
558+
int64 streamBytes; /* amount of data streamed */
554559
};
555560

556561

src/test/regress/expected/rules.out

+4-1
Original file line numberDiff line numberDiff line change
@@ -2022,8 +2022,11 @@ pg_stat_replication_slots| SELECT s.slot_name,
20222022
s.spill_txns,
20232023
s.spill_count,
20242024
s.spill_bytes,
2025+
s.stream_txns,
2026+
s.stream_count,
2027+
s.stream_bytes,
20252028
s.stats_reset
2026-
FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stats_reset);
2029+
FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset);
20272030
pg_stat_slru| SELECT s.name,
20282031
s.blks_zeroed,
20292032
s.blks_hit,

0 commit comments

Comments
 (0)