Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 06828c5

Browse files
Separate messages for standby replies and hot standby feedback.
Allow messages to be sent at different times, and greatly reduce the frequency of hot standby feedback. Refactor to allow additional message types.
1 parent 45a6d79 commit 06828c5

File tree

3 files changed

+168
-75
lines changed

3 files changed

+168
-75
lines changed

src/backend/replication/walreceiver.c

Lines changed: 75 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ static struct
9595
} LogstreamResult;
9696

9797
static StandbyReplyMessage reply_message;
98+
static StandbyHSFeedbackMessage feedback_message;
9899

99100
/*
100101
* About SIGTERM handling:
@@ -123,6 +124,7 @@ static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
123124
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
124125
static void XLogWalRcvFlush(bool dying);
125126
static void XLogWalRcvSendReply(void);
127+
static void XLogWalRcvSendHSFeedback(void);
126128

127129
/* Signal handlers */
128130
static void WalRcvSigHupHandler(SIGNAL_ARGS);
@@ -317,6 +319,7 @@ WalReceiverMain(void)
317319

318320
/* Let the master know that we received some data. */
319321
XLogWalRcvSendReply();
322+
XLogWalRcvSendHSFeedback();
320323

321324
/*
322325
* If we've written some records, flush them to disk and let the
@@ -331,6 +334,7 @@ WalReceiverMain(void)
331334
* the master anyway, to report any progress in applying WAL.
332335
*/
333336
XLogWalRcvSendReply();
337+
XLogWalRcvSendHSFeedback();
334338
}
335339
}
336340
}
@@ -619,40 +623,82 @@ XLogWalRcvSendReply(void)
619623
reply_message.apply = GetXLogReplayRecPtr();
620624
reply_message.sendTime = now;
621625

622-
/*
623-
* Get the OldestXmin and its associated epoch
624-
*/
625-
if (hot_standby_feedback && HotStandbyActive())
626-
{
627-
TransactionId nextXid;
628-
uint32 nextEpoch;
629-
630-
reply_message.xmin = GetOldestXmin(true, false);
631-
632-
/*
633-
* Get epoch and adjust if nextXid and oldestXmin are different
634-
* sides of the epoch boundary.
635-
*/
636-
GetNextXidAndEpoch(&nextXid, &nextEpoch);
637-
if (nextXid < reply_message.xmin)
638-
nextEpoch--;
639-
reply_message.epoch = nextEpoch;
640-
}
641-
else
642-
{
643-
reply_message.xmin = InvalidTransactionId;
644-
reply_message.epoch = 0;
645-
}
646-
647-
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
626+
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
648627
reply_message.write.xlogid, reply_message.write.xrecoff,
649628
reply_message.flush.xlogid, reply_message.flush.xrecoff,
650-
reply_message.apply.xlogid, reply_message.apply.xrecoff,
651-
reply_message.xmin,
652-
reply_message.epoch);
629+
reply_message.apply.xlogid, reply_message.apply.xrecoff);
653630

654631
/* Prepend with the message type and send it. */
655632
buf[0] = 'r';
656633
memcpy(&buf[1], &reply_message, sizeof(StandbyReplyMessage));
657634
walrcv_send(buf, sizeof(StandbyReplyMessage) + 1);
658635
}
636+
637+
/*
638+
* Send hot standby feedback message to primary, plus the current time,
639+
* in case they don't have a watch.
640+
*/
641+
static void
642+
XLogWalRcvSendHSFeedback(void)
643+
{
644+
char buf[sizeof(StandbyHSFeedbackMessage) + 1];
645+
TimestampTz now;
646+
TransactionId nextXid;
647+
uint32 nextEpoch;
648+
TransactionId xmin;
649+
650+
/*
651+
* If the user doesn't want status to be reported to the master, be sure
652+
* to exit before doing anything at all.
653+
*/
654+
if (!hot_standby_feedback)
655+
return;
656+
657+
/* Get current timestamp. */
658+
now = GetCurrentTimestamp();
659+
660+
/*
661+
* Send feedback at most once per wal_receiver_status_interval.
662+
*/
663+
if (!TimestampDifferenceExceeds(feedback_message.sendTime, now,
664+
wal_receiver_status_interval * 1000))
665+
return;
666+
667+
/*
668+
* If Hot Standby is not yet active there is nothing to send.
669+
* Check this after the interval has expired to reduce number of
670+
* calls.
671+
*/
672+
if (!HotStandbyActive())
673+
return;
674+
675+
/*
676+
* Make the expensive call to get the oldest xmin once we are
677+
* certain everything else has been checked.
678+
*/
679+
xmin = GetOldestXmin(true, false);
680+
681+
/*
682+
* Get epoch and adjust if nextXid and oldestXmin are different
683+
* sides of the epoch boundary.
684+
*/
685+
GetNextXidAndEpoch(&nextXid, &nextEpoch);
686+
if (nextXid < xmin)
687+
nextEpoch--;
688+
689+
/*
690+
* Always send feedback message.
691+
*/
692+
feedback_message.sendTime = now;
693+
feedback_message.xmin = xmin;
694+
feedback_message.epoch = nextEpoch;
695+
696+
elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u",
697+
feedback_message.xmin,
698+
feedback_message.epoch);
699+
700+
/* Prepend with the message type and send it. */
701+
buf[0] = 'h';
702+
memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
703+
walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
704+
}

src/backend/replication/walsender.c

Lines changed: 80 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,9 @@ static void WalSndKill(int code, Datum arg);
116116
static bool XLogSend(char *msgbuf, bool *caughtup);
117117
static void IdentifySystem(void);
118118
static void StartReplication(StartReplicationCmd * cmd);
119+
static void ProcessStandbyMessage(void);
119120
static void ProcessStandbyReplyMessage(void);
121+
static void ProcessStandbyHSFeedbackMessage(void);
120122
static void ProcessRepliesIfAny(void);
121123

122124

@@ -456,54 +458,55 @@ ProcessRepliesIfAny(void)
456458
unsigned char firstchar;
457459
int r;
458460

459-
r = pq_getbyte_if_available(&firstchar);
460-
if (r < 0)
461-
{
462-
/* unexpected error or EOF */
463-
ereport(COMMERROR,
464-
(errcode(ERRCODE_PROTOCOL_VIOLATION),
465-
errmsg("unexpected EOF on standby connection")));
466-
proc_exit(0);
467-
}
468-
if (r == 0)
461+
for (;;)
469462
{
470-
/* no data available without blocking */
471-
return;
472-
}
463+
r = pq_getbyte_if_available(&firstchar);
464+
if (r < 0)
465+
{
466+
/* unexpected error or EOF */
467+
ereport(COMMERROR,
468+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
469+
errmsg("unexpected EOF on standby connection")));
470+
proc_exit(0);
471+
}
472+
if (r == 0)
473+
{
474+
/* no data available without blocking */
475+
return;
476+
}
473477

474-
/* Handle the very limited subset of commands expected in this phase */
475-
switch (firstchar)
476-
{
477-
/*
478-
* 'd' means a standby reply wrapped in a CopyData packet.
479-
*/
480-
case 'd':
481-
ProcessStandbyReplyMessage();
482-
break;
478+
/* Handle the very limited subset of commands expected in this phase */
479+
switch (firstchar)
480+
{
481+
/*
482+
* 'd' means a standby reply wrapped in a CopyData packet.
483+
*/
484+
case 'd':
485+
ProcessStandbyMessage();
486+
break;
483487

484-
/*
485-
* 'X' means that the standby is closing down the socket.
486-
*/
487-
case 'X':
488-
proc_exit(0);
488+
/*
489+
* 'X' means that the standby is closing down the socket.
490+
*/
491+
case 'X':
492+
proc_exit(0);
489493

490-
default:
491-
ereport(FATAL,
492-
(errcode(ERRCODE_PROTOCOL_VIOLATION),
493-
errmsg("invalid standby closing message type %d",
494-
firstchar)));
494+
default:
495+
ereport(FATAL,
496+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
497+
errmsg("invalid standby closing message type %d",
498+
firstchar)));
499+
}
495500
}
496501
}
497502

498503
/*
499504
* Process a status update message received from standby.
500505
*/
501506
static void
502-
ProcessStandbyReplyMessage(void)
507+
ProcessStandbyMessage(void)
503508
{
504-
StandbyReplyMessage reply;
505509
char msgtype;
506-
TransactionId newxmin = InvalidTransactionId;
507510

508511
resetStringInfo(&reply_message);
509512

@@ -523,22 +526,39 @@ ProcessStandbyReplyMessage(void)
523526
* one type.
524527
*/
525528
msgtype = pq_getmsgbyte(&reply_message);
526-
if (msgtype != 'r')
529+
530+
switch (msgtype)
527531
{
528-
ereport(COMMERROR,
529-
(errcode(ERRCODE_PROTOCOL_VIOLATION),
530-
errmsg("unexpected message type %c", msgtype)));
531-
proc_exit(0);
532+
case 'r':
533+
ProcessStandbyReplyMessage();
534+
break;
535+
536+
case 'h':
537+
ProcessStandbyHSFeedbackMessage();
538+
break;
539+
540+
default:
541+
ereport(COMMERROR,
542+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
543+
errmsg("unexpected message type %c", msgtype)));
544+
proc_exit(0);
532545
}
546+
}
547+
548+
/*
549+
* Regular reply from standby advising of WAL positions on standby server.
550+
*/
551+
static void
552+
ProcessStandbyReplyMessage(void)
553+
{
554+
StandbyReplyMessage reply;
533555

534556
pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyReplyMessage));
535557

536-
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X xmin %u epoch %u",
558+
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X",
537559
reply.write.xlogid, reply.write.xrecoff,
538560
reply.flush.xlogid, reply.flush.xrecoff,
539-
reply.apply.xlogid, reply.apply.xrecoff,
540-
reply.xmin,
541-
reply.epoch);
561+
reply.apply.xlogid, reply.apply.xrecoff);
542562

543563
/*
544564
* Update shared state for this WalSender process
@@ -554,6 +574,22 @@ ProcessStandbyReplyMessage(void)
554574
walsnd->apply = reply.apply;
555575
SpinLockRelease(&walsnd->mutex);
556576
}
577+
}
578+
579+
/*
580+
* Hot Standby feedback
581+
*/
582+
static void
583+
ProcessStandbyHSFeedbackMessage(void)
584+
{
585+
StandbyHSFeedbackMessage reply;
586+
TransactionId newxmin = InvalidTransactionId;
587+
588+
pq_copymsgbytes(&reply_message, (char *) &reply, sizeof(StandbyHSFeedbackMessage));
589+
590+
elog(DEBUG2, "hot standby feedback xmin %u epoch %u",
591+
reply.xmin,
592+
reply.epoch);
557593

558594
/*
559595
* Update the WalSender's proc xmin to allow it to be visible

src/include/replication/walprotocol.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,18 @@ typedef struct
5656
XLogRecPtr flush;
5757
XLogRecPtr apply;
5858

59+
/* Sender's system clock at the time of transmission */
60+
TimestampTz sendTime;
61+
} StandbyReplyMessage;
62+
63+
/*
64+
* Hot Standby feedback from standby (message type 'h'). This is wrapped within
65+
* a CopyData message at the FE/BE protocol level.
66+
*
67+
* Note that the data length is not specified here.
68+
*/
69+
typedef struct
70+
{
5971
/*
6072
* The current xmin and epoch from the standby, for Hot Standby feedback.
6173
* This may be invalid if the standby-side does not support feedback,
@@ -64,10 +76,9 @@ typedef struct
6476
TransactionId xmin;
6577
uint32 epoch;
6678

67-
6879
/* Sender's system clock at the time of transmission */
6980
TimestampTz sendTime;
70-
} StandbyReplyMessage;
81+
} StandbyHSFeedbackMessage;
7182

7283
/*
7384
* Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.

0 commit comments

Comments
 (0)