Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit af275a1

Browse files
committed
Follow TLI of last replayed record, not recovery target TLI, in walsenders.
Most of the time, the last replayed record comes from the recovery target timeline, but there is a corner case where it makes a difference. When the startup process scans for a new timeline, and decides to change recovery target timeline, there is a window where the recovery target TLI has already been bumped, but there are no WAL segments from the new timeline in pg_xlog yet. For example, if we have just replayed up to point 0/30002D8, on timeline 1, there is a WAL file called 000000010000000000000003 in pg_xlog that contains the WAL up to that point. When recovery switches recovery target timeline to 2, a walsender can immediately try to read WAL from 0/30002D8, from timeline 2, so it will try to open WAL file 000000020000000000000003. However, that doesn't exist yet - the startup process hasn't copied that file from the archive yet nor has the walreceiver streamed it yet, so walsender fails with error "requested WAL segment 000000020000000000000003 has already been removed". That's harmless, in that the standby will try to reconnect later and by that time the segment is already created, but error messages that should be ignored are not good. To fix that, have walsender track the TLI of the last replayed record, instead of the recovery target timeline. That way walsender will not try to read anything from timeline 2, until the WAL segment has been created and at least one record has been replayed from it. The recovery target timeline is now xlog.c's internal affair; it doesn't need to be exposed in shared memory anymore. This fixes the error reported by Thom Brown. depesz reported the same error message, but I'm not sure if this fixes his scenario.
1 parent 1a11d46 commit af275a1

File tree

6 files changed

+92
-82
lines changed

6 files changed

+92
-82
lines changed

src/backend/access/transam/xlog.c

+27-54
Original file line numberDiff line numberDiff line change
@@ -453,6 +453,7 @@ typedef struct XLogCtlData
453453
* replayed, otherwise it's equal to lastReplayedEndRecPtr.
454454
*/
455455
XLogRecPtr lastReplayedEndRecPtr;
456+
TimeLineID lastReplayedTLI;
456457
XLogRecPtr replayEndRecPtr;
457458
TimeLineID replayEndTLI;
458459
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
@@ -3829,7 +3830,6 @@ rescanLatestTimeLine(void)
38293830
TimeLineID newtarget;
38303831
TimeLineHistoryEntry *currentTle = NULL;
38313832
/* use volatile pointer to prevent code rearrangement */
3832-
volatile XLogCtlData *xlogctl = XLogCtl;
38333833

38343834
newtarget = findNewestTimeLine(recoveryTargetTLI);
38353835
if (newtarget == recoveryTargetTLI)
@@ -3888,20 +3888,10 @@ rescanLatestTimeLine(void)
38883888
list_free_deep(expectedTLEs);
38893889
expectedTLEs = newExpectedTLEs;
38903890

3891-
SpinLockAcquire(&xlogctl->info_lck);
3892-
xlogctl->RecoveryTargetTLI = recoveryTargetTLI;
3893-
SpinLockRelease(&xlogctl->info_lck);
3894-
38953891
ereport(LOG,
38963892
(errmsg("new target timeline is %u",
38973893
recoveryTargetTLI)));
38983894

3899-
/*
3900-
* Wake up any walsenders to notice that we have a new target timeline.
3901-
*/
3902-
if (AllowCascadeReplication())
3903-
WalSndWakeup();
3904-
39053895
return true;
39063896
}
39073897

@@ -5389,11 +5379,9 @@ StartupXLOG(void)
53895379
ControlFile->minRecoveryPointTLI)));
53905380

53915381
/*
5392-
* Save the selected recovery target timeline ID and
5393-
* archive_cleanup_command in shared memory so that other processes can
5394-
* see them
5382+
* Save archive_cleanup_command in shared memory so that other processes
5383+
* can see it.
53955384
*/
5396-
XLogCtl->RecoveryTargetTLI = recoveryTargetTLI;
53975385
strncpy(XLogCtl->archiveCleanupCommand,
53985386
archiveCleanupCommand ? archiveCleanupCommand : "",
53995387
sizeof(XLogCtl->archiveCleanupCommand));
@@ -5770,6 +5758,7 @@ StartupXLOG(void)
57705758
xlogctl->replayEndRecPtr = ReadRecPtr;
57715759
xlogctl->replayEndTLI = ThisTimeLineID;
57725760
xlogctl->lastReplayedEndRecPtr = EndRecPtr;
5761+
xlogctl->lastReplayedTLI = ThisTimeLineID;	/* initial TLI reported by GetXLogReplayRecPtr() */
57735762
xlogctl->recoveryLastXTime = 0;
57745763
xlogctl->currentChunkStartTime = 0;
57755764
xlogctl->recoveryPause = false;
@@ -5837,6 +5826,7 @@ StartupXLOG(void)
58375826
*/
58385827
do
58395828
{
5829+
bool switchedTLI = false;
58405830
#ifdef WAL_DEBUG
58415831
if (XLOG_DEBUG ||
58425832
(rmid == RM_XACT_ID && trace_recovery_messages <= DEBUG2) ||
@@ -5942,6 +5932,7 @@ StartupXLOG(void)
59425932

59435933
/* Following WAL records should be run with new TLI */
59445934
ThisTimeLineID = newTLI;
5935+
switchedTLI = true;
59455936
}
59465937
}
59475938

@@ -5974,6 +5965,7 @@ StartupXLOG(void)
59745965
*/
59755966
SpinLockAcquire(&xlogctl->info_lck);
59765967
xlogctl->lastReplayedEndRecPtr = EndRecPtr;
5968+
xlogctl->lastReplayedTLI = ThisTimeLineID;
59775969
SpinLockRelease(&xlogctl->info_lck);
59785970

59795971
/* Remember this record as the last-applied one */
@@ -5982,6 +5974,13 @@ StartupXLOG(void)
59825974
/* Allow read-only connections if we're consistent now */
59835975
CheckRecoveryConsistency();
59845976

5977+
/*
5978+
* If this record was a timeline switch, wake up any
5979+
* walsenders to notice that we are on a new timeline.
5980+
*/
5981+
if (switchedTLI && AllowCascadeReplication())
5982+
WalSndWakeup();
5983+
59855984
/* Exit loop if we reached inclusive recovery target */
59865985
if (!recoveryContinue)
59875986
break;
@@ -6822,23 +6821,6 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
68226821
*epoch = ckptXidEpoch;
68236822
}
68246823

6825-
/*
6826-
* GetRecoveryTargetTLI - get the current recovery target timeline ID
6827-
*/
6828-
TimeLineID
6829-
GetRecoveryTargetTLI(void)
6830-
{
6831-
/* use volatile pointer to prevent code rearrangement */
6832-
volatile XLogCtlData *xlogctl = XLogCtl;
6833-
TimeLineID result;
6834-
6835-
SpinLockAcquire(&xlogctl->info_lck);
6836-
result = xlogctl->RecoveryTargetTLI;
6837-
SpinLockRelease(&xlogctl->info_lck);
6838-
6839-
return result;
6840-
}
6841-
68426824
/*
68436825
* This must be called ONCE during postmaster or standalone-backend shutdown
68446826
*/
@@ -7642,10 +7624,16 @@ CreateRestartPoint(int flags)
76427624
*/
76437625
if (_logSegNo)
76447626
{
7627+
XLogRecPtr receivePtr;
7628+
XLogRecPtr replayPtr;
76457629
XLogRecPtr endptr;
76467630

7647-
/* Get the current (or recent) end of xlog */
7648-
endptr = GetStandbyFlushRecPtr();
7631+
/*
7632+
* Get the current end of xlog replayed or received, whichever is later.
7633+
*/
7634+
receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
7635+
replayPtr = GetXLogReplayRecPtr(NULL);
7636+
endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
76497637

76507638
KeepLogSeg(endptr, &_logSegNo);
76517639
_logSegNo--;
@@ -9109,38 +9097,23 @@ do_pg_abort_backup(void)
91099097
* Exported to allow WALReceiver to read the pointer directly.
91109098
*/
91119099
XLogRecPtr
9112-
GetXLogReplayRecPtr(void)
9100+
GetXLogReplayRecPtr(TimeLineID *replayTLI)
91139101
{
91149102
/* use volatile pointer to prevent code rearrangement */
91159103
volatile XLogCtlData *xlogctl = XLogCtl;
91169104
XLogRecPtr recptr;
9105+
TimeLineID tli;
91179106

91189107
SpinLockAcquire(&xlogctl->info_lck);
91199108
recptr = xlogctl->lastReplayedEndRecPtr;
9109+
tli = xlogctl->lastReplayedTLI;
91209110
SpinLockRelease(&xlogctl->info_lck);
91219111

9112+
if (replayTLI)
9113+
*replayTLI = tli;
91229114
return recptr;
91239115
}
91249116

9125-
/*
9126-
* Get current standby flush position, ie, the last WAL position
9127-
* known to be fsync'd to disk in standby.
9128-
*/
9129-
XLogRecPtr
9130-
GetStandbyFlushRecPtr(void)
9131-
{
9132-
XLogRecPtr receivePtr;
9133-
XLogRecPtr replayPtr;
9134-
9135-
receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
9136-
replayPtr = GetXLogReplayRecPtr();
9137-
9138-
if (XLByteLT(receivePtr, replayPtr))
9139-
return replayPtr;
9140-
else
9141-
return receivePtr;
9142-
}
9143-
91449117
/*
91459118
* Get latest WAL insert pointer
91469119
*/

src/backend/access/transam/xlogfuncs.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
248248
XLogRecPtr recptr;
249249
char location[MAXFNAMELEN];
250250

251-
recptr = GetXLogReplayRecPtr();
251+
recptr = GetXLogReplayRecPtr(NULL);
252252

253253
if (recptr == 0)
254254
PG_RETURN_NULL();

src/backend/replication/walreceiver.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ WalReceiverMain(void)
370370
first_stream = false;
371371

372372
/* Initialize LogstreamResult and buffers for processing messages */
373-
LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr();
373+
LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
374374
initStringInfo(&reply_message);
375375
initStringInfo(&incoming_message);
376376

@@ -1026,7 +1026,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
10261026
/* Construct a new message */
10271027
writePtr = LogstreamResult.Write;
10281028
flushPtr = LogstreamResult.Flush;
1029-
applyPtr = GetXLogReplayRecPtr();
1029+
applyPtr = GetXLogReplayRecPtr(NULL);
10301030

10311031
resetStringInfo(&reply_message);
10321032
pq_sendbyte(&reply_message, 'r');

src/backend/replication/walreceiverfuncs.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ GetReplicationApplyDelay(void)
324324
receivePtr = walrcv->receivedUpto;
325325
SpinLockRelease(&walrcv->mutex);
326326

327-
replayPtr = GetXLogReplayRecPtr();
327+
replayPtr = GetXLogReplayRecPtr(NULL);
328328

329329
if (XLByteEQ(receivePtr, replayPtr))
330330
return 0;

src/backend/replication/walsender.c

+60-22
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ static void WalSndLoop(void);
169169
static void InitWalSenderSlot(void);
170170
static void WalSndKill(int code, Datum arg);
171171
static void XLogSend(bool *caughtup);
172+
static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID currentTLI);
172173
static void IdentifySystem(void);
173174
static void StartReplication(StartReplicationCmd *cmd);
174175
static void ProcessStandbyMessage(void);
@@ -190,12 +191,6 @@ InitWalSender(void)
190191
/* Set up resource owner */
191192
CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
192193

193-
/*
194-
* Use the recovery target timeline ID during recovery
195-
*/
196-
if (am_cascading_walsender)
197-
ThisTimeLineID = GetRecoveryTargetTLI();
198-
199194
/*
200195
* Let postmaster know that we're a WAL sender. Once we've declared us as
201196
* a WAL sender process, postmaster will let us outlive the bgwriter and
@@ -254,8 +249,8 @@ IdentifySystem(void)
254249
am_cascading_walsender = RecoveryInProgress();
255250
if (am_cascading_walsender)
256251
{
257-
logptr = GetStandbyFlushRecPtr();
258-
ThisTimeLineID = GetRecoveryTargetTLI();
252+
/* this also updates ThisTimeLineID */
253+
logptr = GetStandbyFlushRecPtr(0);
259254
}
260255
else
261256
logptr = GetInsertRecPtr();
@@ -409,6 +404,7 @@ static void
409404
StartReplication(StartReplicationCmd *cmd)
410405
{
411406
StringInfoData buf;
407+
XLogRecPtr FlushPtr;
412408

413409
/*
414410
* We assume here that we're logging enough information in the WAL for
@@ -421,8 +417,17 @@ StartReplication(StartReplicationCmd *cmd)
421417

422418
/*
423419
* Select the timeline. If it was given explicitly by the client, use
424-
* that. Otherwise use the current ThisTimeLineID.
420+
* that. Otherwise use the timeline of the last replayed record, which
421+
* is kept in ThisTimeLineID.
425422
*/
423+
if (am_cascading_walsender)
424+
{
425+
/* this also updates ThisTimeLineID */
426+
FlushPtr = GetStandbyFlushRecPtr(0);
427+
}
428+
else
429+
FlushPtr = GetFlushRecPtr();
430+
426431
if (cmd->timeline != 0)
427432
{
428433
XLogRecPtr switchpoint;
@@ -494,7 +499,6 @@ StartReplication(StartReplicationCmd *cmd)
494499
if (!sendTimeLineIsHistoric ||
495500
XLByteLT(cmd->startpoint, sendTimeLineValidUpto))
496501
{
497-
XLogRecPtr FlushPtr;
498502
/*
499503
* When we first start replication the standby will be behind the primary.
500504
* For some applications, for example, synchronous replication, it is
@@ -516,10 +520,6 @@ StartReplication(StartReplicationCmd *cmd)
516520
* Don't allow a request to stream from a future point in WAL that
517521
* hasn't been flushed to disk in this server yet.
518522
*/
519-
if (am_cascading_walsender)
520-
FlushPtr = GetStandbyFlushRecPtr();
521-
else
522-
FlushPtr = GetFlushRecPtr();
523523
if (XLByteLT(FlushPtr, cmd->startpoint))
524524
{
525525
ereport(ERROR,
@@ -1330,7 +1330,7 @@ XLogSend(bool *caughtup)
13301330
* that gets lost on the master.
13311331
*/
13321332
if (am_cascading_walsender)
1333-
FlushPtr = GetStandbyFlushRecPtr();
1333+
FlushPtr = GetStandbyFlushRecPtr(sendTimeLine);
13341334
else
13351335
FlushPtr = GetFlushRecPtr();
13361336

@@ -1347,27 +1347,23 @@ XLogSend(bool *caughtup)
13471347
if (!sendTimeLineIsHistoric && am_cascading_walsender)
13481348
{
13491349
bool becameHistoric = false;
1350-
TimeLineID targetTLI;
13511350

13521351
if (!RecoveryInProgress())
13531352
{
13541353
/*
13551354
* We have been promoted. RecoveryInProgress() updated
13561355
* ThisTimeLineID to the new current timeline.
13571356
*/
1358-
targetTLI = ThisTimeLineID;
13591357
am_cascading_walsender = false;
13601358
becameHistoric = true;
13611359
}
13621360
else
13631361
{
13641362
/*
13651363
* Still a cascading standby. But is the timeline we're sending
1366-
* still the recovery target timeline?
1364+
* still the one recovery is recovering from?
13671365
*/
1368-
targetTLI = GetRecoveryTargetTLI();
1369-
1370-
if (targetTLI != sendTimeLine)
1366+
if (sendTimeLine != ThisTimeLineID)
13711367
becameHistoric = true;
13721368
}
13731369

@@ -1380,7 +1376,7 @@ XLogSend(bool *caughtup)
13801376
*/
13811377
List *history;
13821378

1383-
history = readTimeLineHistory(targetTLI);
1379+
history = readTimeLineHistory(ThisTimeLineID);
13841380
sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history);
13851381
Assert(XLByteLE(sentPtr, sendTimeLineValidUpto));
13861382
list_free_deep(history);
@@ -1521,6 +1517,48 @@ XLogSend(bool *caughtup)
15211517
return;
15221518
}
15231519

1520+
/*
1521+
* Returns the latest point in WAL that has been safely flushed to disk, and
1522+
* can be sent to the standby. This should only be called when in recovery,
1523+
* ie. we're streaming to a cascaded standby.
1524+
*
1525+
* If currentTLI is non-zero, the function returns the point that the WAL on
1526+
* the given timeline has been flushed up to. If recovery has already switched
1527+
* to a different timeline, InvalidXLogRecPtr is returned.
1528+
*
1529+
* As a side-effect, ThisTimeLineID is updated to the TLI of the last
1530+
* replayed WAL record.
1531+
*/
1532+
static XLogRecPtr
1533+
GetStandbyFlushRecPtr(TimeLineID currentTLI)
1534+
{
1535+
XLogRecPtr replayPtr;
1536+
TimeLineID replayTLI;
1537+
XLogRecPtr receivePtr;
1538+
TimeLineID receiveTLI;
1539+
XLogRecPtr result;
1540+
1541+
/*
1542+
* We can safely send what's already been replayed. Also, if walreceiver
1543+
* is streaming WAL from the same timeline, we can send anything that
1544+
* it has streamed, but hasn't been replayed yet.
1545+
*/
1546+
1547+
receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
1548+
replayPtr = GetXLogReplayRecPtr(&replayTLI);
1549+
1550+
ThisTimeLineID = replayTLI;
1551+
1552+
if (currentTLI != replayTLI && currentTLI != 0)
1553+
return InvalidXLogRecPtr;
1554+
1555+
result = replayPtr;
1556+
if (receiveTLI == currentTLI && receivePtr > replayPtr)
1557+
result = receivePtr;
1558+
1559+
return result;
1560+
}
1561+
15241562
/*
15251563
* Request walsenders to reload the currently-open WAL file
15261564
*/

0 commit comments

Comments
 (0)