Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 1ff4dde

Browse files
committed
fix parallel execution of pg_ptrack_get_block()
1 parent 7ee0803 commit 1ff4dde

File tree

5 files changed

+95
-22
lines changed

5 files changed

+95
-22
lines changed

src/backup.c

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,15 +69,6 @@ static bool backup_in_progress = false;
6969
/* Is pg_stop_backup() was sent */
7070
static bool pg_stop_backup_is_sent = false;
7171

72-
typedef struct
73-
{
74-
const char *from_root;
75-
const char *to_root;
76-
parray *backup_files_list;
77-
parray *prev_backup_filelist;
78-
XLogRecPtr prev_backup_start_lsn;
79-
} backup_files_args;
80-
8172
/*
8273
* Backup routines
8374
*/
@@ -656,6 +647,8 @@ do_backup_instance(void)
656647
arg->backup_files_list = backup_files_list;
657648
arg->prev_backup_filelist = prev_backup_filelist;
658649
arg->prev_backup_start_lsn = prev_backup_start_lsn;
650+
arg->thread_backup_conn = NULL;
651+
arg->thread_cancel_conn = NULL;
659652
backup_threads_args[i] = arg;
660653
}
661654

@@ -678,6 +671,8 @@ do_backup_instance(void)
678671
for (i = 0; i < num_threads; i++)
679672
{
680673
pthread_join(backup_threads[i], NULL);
674+
if (backup_threads_args[i]->thread_backup_conn != NULL)
675+
pgut_disconnect(backup_threads_args[i]->thread_backup_conn);
681676
pg_free(backup_threads_args[i]);
682677
}
683678

@@ -1956,7 +1951,8 @@ backup_files(void *arg)
19561951
if (file->is_datafile && !file->is_cfs)
19571952
{
19581953
/* backup block by block if datafile AND not compressed by cfs*/
1959-
if (!backup_data_file(arguments->from_root,
1954+
if (!backup_data_file(arguments,
1955+
arguments->from_root,
19601956
arguments->to_root, file,
19611957
arguments->prev_backup_start_lsn,
19621958
current.backup_mode))
@@ -2686,7 +2682,8 @@ get_last_ptrack_lsn(void)
26862682
}
26872683

26882684
char *
2689-
pg_ptrack_get_block(Oid dbOid,
2685+
pg_ptrack_get_block(backup_files_args *arguments,
2686+
Oid dbOid,
26902687
Oid tblsOid,
26912688
Oid relOid,
26922689
BlockNumber blknum,
@@ -2695,7 +2692,6 @@ pg_ptrack_get_block(Oid dbOid,
26952692
PGresult *res;
26962693
char *params[4];
26972694
char *result;
2698-
PGconn *tmp_conn = NULL;
26992695

27002696
params[0] = palloc(64);
27012697
params[1] = palloc(64);
@@ -2711,10 +2707,13 @@ pg_ptrack_get_block(Oid dbOid,
27112707
sprintf(params[2], "%i", relOid);
27122708
sprintf(params[3], "%u", blknum);
27132709

2714-
tmp_conn = pgut_connect(pgut_dbname);
2710+
if (arguments->thread_backup_conn == NULL)
2711+
arguments->thread_backup_conn = pgut_connect(pgut_dbname);
27152712

27162713
//elog(LOG, "db %i pg_ptrack_get_block(%i, %i, %u)",dbOid, tblsOid, relOid, blknum);
2717-
res = pgut_execute(tmp_conn, "SELECT pg_ptrack_get_block_2($1, $2, $3, $4)",
2714+
res = pgut_execute_parallel(arguments->thread_backup_conn,
2715+
arguments->thread_cancel_conn,
2716+
"SELECT pg_ptrack_get_block_2($1, $2, $3, $4)",
27182717
4, (const char **)params, true);
27192718

27202719
if (PQnfields(res) != 1)
@@ -2735,7 +2734,6 @@ pg_ptrack_get_block(Oid dbOid,
27352734
result_size);
27362735

27372736
PQclear(res);
2738-
pgut_disconnect(tmp_conn);
27392737

27402738
pfree(params[0]);
27412739
pfree(params[1]);

src/data.c

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,8 @@ read_page_from_file(pgFile *file, BlockNumber blknum,
222222
* to the backup file.
223223
*/
224224
static void
225-
backup_data_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
225+
backup_data_page(backup_files_args *arguments,
226+
pgFile *file, XLogRecPtr prev_backup_start_lsn,
226227
BlockNumber blknum, BlockNumber nblocks,
227228
FILE *in, FILE *out,
228229
pg_crc32 *crc, int *n_skipped,
@@ -274,7 +275,7 @@ backup_data_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
274275

275276
free(page);
276277
page = NULL;
277-
page = (Page) pg_ptrack_get_block(file->dbOid, file->tblspcOid,
278+
page = (Page) pg_ptrack_get_block(arguments, file->dbOid, file->tblspcOid,
278279
file->relOid, absolute_blknum, &page_size);
279280

280281
if (page == NULL)
@@ -371,7 +372,8 @@ backup_data_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
371372
* backup with special header.
372373
*/
373374
bool
374-
backup_data_file(const char *from_root, const char *to_root,
375+
backup_data_file(backup_files_args* arguments,
376+
const char *from_root, const char *to_root,
375377
pgFile *file, XLogRecPtr prev_backup_start_lsn,
376378
BackupMode backup_mode)
377379
{
@@ -453,7 +455,7 @@ backup_data_file(const char *from_root, const char *to_root,
453455
{
454456
for (blknum = 0; blknum < nblocks; blknum++)
455457
{
456-
backup_data_page(file, prev_backup_start_lsn, blknum,
458+
backup_data_page(arguments, file, prev_backup_start_lsn, blknum,
457459
nblocks, in, out, &(file->crc),
458460
&n_blocks_skipped, backup_mode);
459461
n_blocks_read++;
@@ -466,7 +468,7 @@ backup_data_file(const char *from_root, const char *to_root,
466468
iter = datapagemap_iterate(&file->pagemap);
467469
while (datapagemap_next(iter, &blknum))
468470
{
469-
backup_data_page(file, prev_backup_start_lsn, blknum,
471+
backup_data_page(arguments, file, prev_backup_start_lsn, blknum,
470472
nblocks, in, out, &(file->crc),
471473
&n_blocks_skipped, backup_mode);
472474
n_blocks_read++;

src/pg_probackup.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,16 @@ typedef union DataPage
242242
char data[BLCKSZ];
243243
} DataPage;
244244

245+
typedef struct
246+
{
247+
const char *from_root;
248+
const char *to_root;
249+
parray *backup_files_list;
250+
parray *prev_backup_filelist;
251+
XLogRecPtr prev_backup_start_lsn;
252+
PGconn *thread_backup_conn;
253+
PGconn *thread_cancel_conn;
254+
} backup_files_args;
245255

246256
/*
247257
* return pointer that exceeds the length of prefix from character string.
@@ -323,7 +333,9 @@ extern const char *deparse_backup_mode(BackupMode mode);
323333
extern void process_block_change(ForkNumber forknum, RelFileNode rnode,
324334
BlockNumber blkno);
325335

326-
extern char *pg_ptrack_get_block(Oid dbOid, Oid tblsOid, Oid relOid, BlockNumber blknum,
336+
extern char *pg_ptrack_get_block(backup_files_args *arguments,
337+
Oid dbOid, Oid tblsOid, Oid relOid,
338+
BlockNumber blknum,
327339
size_t *result_size);
328340
/* in restore.c */
329341
extern int do_restore_or_validate(time_t target_backup_id,
@@ -428,7 +440,8 @@ extern int pgFileCompareLinked(const void *f1, const void *f2);
428440
extern int pgFileCompareSize(const void *f1, const void *f2);
429441

430442
/* in data.c */
431-
extern bool backup_data_file(const char *from_root, const char *to_root,
443+
extern bool backup_data_file(backup_files_args* arguments,
444+
const char *from_root, const char *to_root,
432445
pgFile *file, XLogRecPtr prev_backup_start_lsn,
433446
BackupMode backup_mode);
434447
extern void restore_data_file(const char *from_root, const char *to_root,

src/utils/pgut.c

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1509,6 +1509,63 @@ pgut_set_port(const char *new_port)
15091509
port = new_port;
15101510
}
15111511

1512+
1513+
PGresult *
1514+
pgut_execute_parallel(PGconn* conn,
1515+
PGconn* cancel_conn, const char *query,
1516+
int nParams, const char **params,
1517+
bool text_result)
1518+
{
1519+
PGresult *res;
1520+
1521+
if (interrupted && !in_cleanup)
1522+
elog(ERROR, "interrupted");
1523+
1524+
/* write query to elog if verbose */
1525+
if (LOG_LEVEL_CONSOLE <= LOG || LOG_LEVEL_FILE <= LOG)
1526+
{
1527+
int i;
1528+
1529+
if (strchr(query, '\n'))
1530+
elog(LOG, "(query)\n%s", query);
1531+
else
1532+
elog(LOG, "(query) %s", query);
1533+
for (i = 0; i < nParams; i++)
1534+
elog(LOG, "\t(param:%d) = %s", i, params[i] ? params[i] : "(null)");
1535+
}
1536+
1537+
if (conn == NULL)
1538+
{
1539+
elog(ERROR, "not connected");
1540+
return NULL;
1541+
}
1542+
1543+
//on_before_exec(conn);
1544+
if (nParams == 0)
1545+
res = PQexec(conn, query);
1546+
else
1547+
res = PQexecParams(conn, query, nParams, NULL, params, NULL, NULL,
1548+
/*
1549+
* Specify zero to obtain results in text format,
1550+
* or one to obtain results in binary format.
1551+
*/
1552+
(text_result) ? 0 : 1);
1553+
//on_after_exec();
1554+
1555+
switch (PQresultStatus(res))
1556+
{
1557+
case PGRES_TUPLES_OK:
1558+
case PGRES_COMMAND_OK:
1559+
case PGRES_COPY_IN:
1560+
break;
1561+
default:
1562+
elog(ERROR, "query failed: %squery was: %s",
1563+
PQerrorMessage(conn), query);
1564+
break;
1565+
}
1566+
1567+
return res;
1568+
}
15121569
PGresult *
15131570
pgut_execute(PGconn* conn, const char *query, int nParams, const char **params,
15141571
bool text_result)

src/utils/pgut.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ extern PGconn *pgut_connect_replication_extended(const char *pghost, const char
127127
extern void pgut_disconnect(PGconn *conn);
128128
extern PGresult *pgut_execute(PGconn* conn, const char *query, int nParams,
129129
const char **params, bool text_result);
130+
extern PGresult *pgut_execute_parallel(PGconn* conn, PGconn* cancel_conn,
131+
const char *query, int nParams,
132+
const char **params, bool text_result);
130133
extern bool pgut_send(PGconn* conn, const char *query, int nParams, const char **params, int elevel);
131134
extern void pgut_cancel(PGconn* conn);
132135
extern int pgut_wait(int num, PGconn *connections[], struct timeval *timeout);

0 commit comments

Comments
 (0)