|
23 | 23 | #include "access/xlog_internal.h"
|
24 | 24 | #include "replication/walprotocol.h"
|
25 | 25 | #include "utils/datetime.h"
|
| 26 | +#include "utils/timestamp.h" |
26 | 27 |
|
27 | 28 | #include "receivelog.h"
|
28 | 29 | #include "streamutil.h"
|
@@ -195,6 +196,51 @@ localGetCurrentTimestamp(void)
|
195 | 196 | return result;
|
196 | 197 | }
|
197 | 198 |
|
| 199 | +/* |
| 200 | + * Local version of TimestampDifference(), since we are not |
| 201 | + * linked with backend code. |
| 202 | + */ |
| 203 | +static void |
| 204 | +localTimestampDifference(TimestampTz start_time, TimestampTz stop_time, |
| 205 | + long *secs, int *microsecs) |
| 206 | +{ |
| 207 | + TimestampTz diff = stop_time - start_time; |
| 208 | + |
| 209 | + if (diff <= 0) |
| 210 | + { |
| 211 | + *secs = 0; |
| 212 | + *microsecs = 0; |
| 213 | + } |
| 214 | + else |
| 215 | + { |
| 216 | +#ifdef HAVE_INT64_TIMESTAMP |
| 217 | + *secs = (long) (diff / USECS_PER_SEC); |
| 218 | + *microsecs = (int) (diff % USECS_PER_SEC); |
| 219 | +#else |
| 220 | + *secs = (long) diff; |
| 221 | + *microsecs = (int) ((diff - *secs) * 1000000.0); |
| 222 | +#endif |
| 223 | + } |
| 224 | +} |
| 225 | + |
| 226 | +/* |
| 227 | + * Local version of TimestampDifferenceExceeds(), since we are not |
| 228 | + * linked with backend code. |
| 229 | + */ |
| 230 | +static bool |
| 231 | +localTimestampDifferenceExceeds(TimestampTz start_time, |
| 232 | + TimestampTz stop_time, |
| 233 | + int msec) |
| 234 | +{ |
| 235 | + TimestampTz diff = stop_time - start_time; |
| 236 | + |
| 237 | +#ifdef HAVE_INT64_TIMESTAMP |
| 238 | + return (diff >= msec * INT64CONST(1000)); |
| 239 | +#else |
| 240 | + return (diff * 1000.0 >= msec); |
| 241 | +#endif |
| 242 | +} |
| 243 | + |
198 | 244 | /*
|
199 | 245 | * Receive a log stream starting at the specified position.
|
200 | 246 | *
|
@@ -306,7 +352,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
|
306 | 352 | */
|
307 | 353 | now = localGetCurrentTimestamp();
|
308 | 354 | if (standby_message_timeout > 0 &&
|
309 |
| - last_status < now - standby_message_timeout * 1000000) |
| 355 | + localTimestampDifferenceExceeds(last_status, now, |
| 356 | + standby_message_timeout)) |
310 | 357 | {
|
311 | 358 | /* Time to send feedback! */
|
312 | 359 | char replybuf[sizeof(StandbyReplyMessage) + 1];
|
@@ -345,10 +392,16 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
|
345 | 392 | FD_SET(PQsocket(conn), &input_mask);
|
346 | 393 | if (standby_message_timeout)
|
347 | 394 | {
|
348 |
| - timeout.tv_sec = last_status + standby_message_timeout - now - 1; |
| 395 | + TimestampTz targettime; |
| 396 | + |
| 397 | + targettime = TimestampTzPlusMilliseconds(last_status, |
| 398 | + standby_message_timeout - 1); |
| 399 | + localTimestampDifference(now, |
| 400 | + targettime, |
| 401 | + &timeout.tv_sec, |
| 402 | + (int *)&timeout.tv_usec); |
349 | 403 | if (timeout.tv_sec <= 0)
|
350 | 404 | timeout.tv_sec = 1; /* Always sleep at least 1 sec */
|
351 |
| - timeout.tv_usec = 0; |
352 | 405 | timeoutptr = &timeout;
|
353 | 406 | }
|
354 | 407 | else
|
|
0 commit comments