Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7280715

Browse files
committed
[PGPRO-2648] pg_replication_slot_advance non-donor slots during recovery.
To trim WAL on non-donors during recovery as it may be very long.
1 parent d19af4f commit 7280715

File tree

4 files changed

+68
-14
lines changed

4 files changed

+68
-14
lines changed

src/include/syncpoint.h

+2
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,6 @@ extern Syncpoint *SyncpointGetAllLatest(int sender_node_id);
4141
extern XLogRecPtr GetRecoveryHorizon(int sender_node_id);
4242
extern HTAB *RecoveryFilterLoad(int filter_node_id, Syncpoint *spvector, MtmConfig *mtm_cfg);
4343

44+
extern char* pg_lsn_out_c(XLogRecPtr lsn);
45+
4446
#endif /* SYNCPOINT_H */

src/pglogical_apply.c

+7
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,14 @@ process_syncpoint(MtmReceiverWorkerContext *rwctx, const char *msg, XLogRecPtr r
302302
MtmEndSession(42, false);
303303
SyncpointRegister(origin_node, origin_lsn, receiver_lsn);
304304
if (rwctx->mode == REPLMODE_RECOVERY)
305+
{
305306
ReplicationSlotRelease();
307+
/*
308+
* if we are in recovery, ping non-donor receivers that they might
309+
* succeed in advancing the slot.
310+
*/
311+
ConditionVariableBroadcast(&Mtm->receivers_cv);
312+
}
306313
}
307314

308315
/* TODO: make messaging layer for logical messages like existing dmq one */

src/pglogical_receiver.c

+58-13
Original file line numberDiff line numberDiff line change
@@ -67,18 +67,11 @@ bool MtmIsReceiver;
6767

6868
typedef struct
6969
{
70-
MtmReceiverWorkerContext w;
71-
PGconn *conn;
70+
MtmReceiverWorkerContext w;
71+
XLogRecPtr last_reported_flush;
72+
PGconn *conn;
7273
} MtmReceiverContext;
7374

74-
typedef struct MtmFlushPosition
75-
{
76-
dlist_node node;
77-
int node_id;
78-
XLogRecPtr local_end;
79-
XLogRecPtr remote_end;
80-
} MtmFlushPosition;
81-
8275
char const *const MtmReplicationModeMnem[] =
8376
{
8477
"disabled",
@@ -96,6 +89,9 @@ static volatile sig_atomic_t got_sighup = false;
9689
static void fe_sendint64(int64 i, char *buf);
9790
static int64 fe_recvint64(char *buf);
9891

92+
static void MtmMaybeAdvanceSlot(MtmReceiverContext *rctx, char *conninfo);
93+
static PGconn *receiver_connect(char *conninfo);
94+
9995
void pglogical_receiver_main(Datum main_arg);
10096

10197
static void
@@ -156,6 +152,47 @@ sendFeedback(PGconn *conn, int64 now, int node_id)
156152
return true;
157153
}
158154

155+
/*
156+
* pg_replication_slot_advance sender slot if we can do that further last
157+
* advancement. Note that decoding session startup is quite heavy operation as
158+
* we must read all unacked WAL + earlier chunk up to suitable snapshot
159+
* serialization point (which is created mostly each
160+
* LOG_SNAPSHOT_INTERVAL_MS).
161+
*/
162+
static void
163+
MtmMaybeAdvanceSlot(MtmReceiverContext *rctx, char *conninfo)
164+
{
165+
XLogRecPtr upto = GetRecoveryHorizon(rctx->w.sender_node_id);
166+
char *upto_text;
167+
char *sql;
168+
PGresult *res;
169+
170+
/* already acked this */
171+
if (upto <= rctx->last_reported_flush)
172+
return;
173+
174+
rctx->conn = receiver_connect(conninfo);
175+
176+
upto_text = pg_lsn_out_c(upto);
177+
sql = psprintf("select pg_replication_slot_advance('" MULTIMASTER_SLOT_PATTERN "', '%s');",
178+
Mtm->my_node_id,
179+
upto_text);
180+
181+
res = PQexec(rctx->conn, sql);
182+
if (PQresultStatus(res) != PGRES_TUPLES_OK)
183+
{
184+
mtm_log(ERROR, "%s at node %d failed: %s",
185+
sql, rctx->w.sender_node_id, PQresultErrorMessage(res));
186+
}
187+
rctx->last_reported_flush = upto;
188+
mtm_log(MtmReceiverFeedback, "advanced slot to %s", upto_text);
189+
190+
pfree(upto_text);
191+
pfree(sql);
192+
PQfinish(rctx->conn);
193+
rctx->conn = NULL;
194+
}
195+
159196
/*
160197
* Converts an int64 to network byte order.
161198
*/
@@ -549,6 +586,7 @@ void
549586
pglogical_receiver_main(Datum main_arg)
550587
{
551588
/* Variables for replication connection */
589+
char *conninfo;
552590
PQExpBuffer query;
553591
PGresult *res;
554592
MtmReceiverContext *rctx;
@@ -607,6 +645,7 @@ pglogical_receiver_main(Datum main_arg)
607645
ActivePortal->sourceText = "";
608646

609647
receiver_mtm_cfg = MtmLoadConfig();
648+
conninfo = MtmNodeById(receiver_mtm_cfg, sender)->conninfo;
610649

611650
/* Keep us informed about subscription changes. */
612651
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
@@ -621,7 +660,6 @@ pglogical_receiver_main(Datum main_arg)
621660
XLogRecPtr remote_start;
622661
Syncpoint *spvector = NULL;
623662
HTAB *filter_map = NULL;
624-
char *conninfo;
625663
nodemask_t connected_mask;
626664

627665
/*
@@ -651,6 +689,14 @@ pglogical_receiver_main(Datum main_arg)
651689
if (rctx->w.mode != REPLMODE_DISABLED)
652690
break; /* success */
653691

692+
/*
693+
* So this receiver can't work which usually means we are in
694+
* recovery and donor is not our sender. Attempt to advance our
695+
* sender slot then -- this allows to trim WAL on non-donors
696+
* during recovery which may be very long.
697+
*/
698+
MtmMaybeAdvanceSlot(rctx, conninfo);
699+
654700
ConditionVariableSleep(&Mtm->receivers_cv, PG_WAIT_EXTENSION);
655701
}
656702
ConditionVariableCancelSleep();
@@ -678,7 +724,6 @@ pglogical_receiver_main(Datum main_arg)
678724
}
679725

680726
/* Establish connection to the remote server */
681-
conninfo = MtmNodeById(receiver_mtm_cfg, sender)->conninfo;
682727
rctx->conn = receiver_connect(conninfo);
683728

684729
/* Create new slot if needed */
@@ -819,8 +864,8 @@ pglogical_receiver_main(Datum main_arg)
819864
if (got_sighup)
820865
{
821866
/* Process config file */
822-
ProcessConfigFile(PGC_SIGHUP);
823867
got_sighup = false;
868+
ProcessConfigFile(PGC_SIGHUP);
824869
ereport(LOG, (MTM_ERRMSG("%s: processed SIGHUP",
825870
MyBgworkerEntry->bgw_name)));
826871
}

src/syncpoint.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ MaybeLogSyncpoint(void)
106106
}
107107

108108
/* copied from pg_lsn_out */
109-
static char*
109+
char*
110110
pg_lsn_out_c(XLogRecPtr lsn)
111111
{
112112
char buf[32];

0 commit comments

Comments
 (0)