Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit a0c7fcd

Browse files
author
Arthur Zakirov
committed
Added wait_replica_wal_lsn() function
1 parent 0084441 commit a0c7fcd

File tree

1 file changed

+67
-10
lines changed

1 file changed

+67
-10
lines changed

src/backup.c

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ static int checkpoint_timeout(void);
8787
static void add_pgdata_files(parray *files, const char *root);
8888
static void write_backup_file_list(parray *files, const char *root);
8989
static void wait_wal_lsn(XLogRecPtr lsn);
90+
static void wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup);
9091
static void make_pagemap_from_ptrack(parray *files);
9192
static void StreamLog(void *arg);
9293

@@ -610,18 +611,22 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup)
610611
const char *params[2];
611612
uint32 xlogid;
612613
uint32 xrecoff;
614+
PGconn *conn;
613615

614616
params[0] = label;
615617

618+
/* For replica we call pg_start_backup() on master */
619+
conn = (from_replica) ? master_conn : backup_conn;
620+
616621
/* 2nd argument is 'fast'*/
617622
params[1] = smooth ? "false" : "true";
618623
if (!exclusive_backup)
619-
res = pgut_execute(backup_conn,
624+
res = pgut_execute(conn,
620625
"SELECT pg_start_backup($1, $2, false)",
621626
2,
622627
params);
623628
else
624-
res = pgut_execute(backup_conn,
629+
res = pgut_execute(conn,
625630
"SELECT pg_start_backup($1, $2)",
626631
2,
627632
params);
@@ -635,14 +640,6 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup)
635640

636641
PQclear(res);
637642

638-
/*
639-
* Switch to a new WAL segment. It is necessary to get archived WAL
640-
* segment, which includes start LSN of current backup.
641-
*
642-
* Do not switch for standby node and if backup is stream.
643-
*/
644-
if (!from_replica && !stream_wal)
645-
pg_switch_wal(backup_conn);
646643
if (!stream_wal)
647644
/*
648645
* Do not wait start_lsn for stream backup.
@@ -910,6 +907,66 @@ wait_wal_lsn(XLogRecPtr lsn)
910907
}
911908
}
912909

910+
/*
911+
* Wait for target 'lsn' on replica instance.
912+
*/
913+
static void
914+
wait_replica_wal_lsn(XLogRecPtr lsn, bool is_start_backup)
915+
{
916+
uint32 try_count = 0;
917+
918+
Assert(from_replica);
919+
920+
while (true)
921+
{
922+
PGresult *res;
923+
uint32 xlogid;
924+
uint32 xrecoff;
925+
XLogRecPtr replica_lsn;
926+
927+
/*
928+
* For lsn from pg_start_backup() we need it to be replayed on replica's
929+
* data.
930+
*/
931+
if (is_start_backup)
932+
res = pgut_execute(backup_conn, "SELECT pg_last_xlog_replay_location()",
933+
0, NULL);
934+
/*
935+
* For lsn from pg_stop_backup() we need it only to be received by
936+
* replica and fsync()'ed on WAL segment.
937+
*/
938+
else
939+
res = pgut_execute(backup_conn, "SELECT pg_last_xlog_receive_location()",
940+
0, NULL);
941+
942+
/* Extract timeline and LSN from result */
943+
XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff);
944+
/* Calculate LSN */
945+
replica_lsn = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff;
946+
PQclear(res);
947+
948+
/* target lsn was replicated */
949+
if (replica_lsn >= lsn)
950+
break;
951+
952+
sleep(1);
953+
if (interrupted)
954+
elog(ERROR, "Interrupted during waiting for target LSN");
955+
try_count++;
956+
957+
/* Inform user if target lsn is absent in first attempt */
958+
if (try_count == 1)
959+
elog(INFO, "Wait for target LSN %X/%X to be received by replica",
960+
(uint32) (lsn >> 32), (uint32) lsn);
961+
962+
if (replica_timeout > 0 && try_count > replica_timeout)
963+
elog(ERROR, "Target LSN %X/%X could not be recevied by replica "
964+
"in %d seconds",
965+
(uint32) (lsn >> 32), (uint32) lsn,
966+
replica_timeout);
967+
}
968+
}
969+
913970
/*
914971
* Notify end of backup to PostgreSQL server.
915972
*/

0 commit comments

Comments
 (0)