Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 0770988

Browse files
author
Daniel Shelepanov
committed
[PBCKP-278] ptrack adapted to hadling cfs relations
tags: cfs, ptrack
1 parent 936db26 commit 0770988

File tree

5 files changed

+290
-12
lines changed

5 files changed

+290
-12
lines changed

engine.c

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919

2020
#include "postgres.h"
21-
2221
#include <unistd.h>
2322
#include <sys/stat.h>
2423

@@ -36,6 +35,10 @@
3635
#include "catalog/pg_tablespace.h"
3736
#include "miscadmin.h"
3837
#include "port/pg_crc32c.h"
38+
#ifdef PGPRO_EE
39+
/* For file_is_in_cfs_tablespace() only. */
40+
#include "common/cfs_common.h"
41+
#endif
3942
#include "storage/copydir.h"
4043
#if PG_VERSION_NUM >= 120000
4144
#include "storage/md.h"
@@ -91,6 +94,68 @@ ptrack_write_chunk(int fd, pg_crc32c *crc, char *chunk, size_t size)
9194
}
9295
}
9396

97+
/*
98+
* Determines a file type which is one of the following:
99+
* * regular relation file
100+
* * regular relation file which is a part of a multifile relation
101+
* * everythong else
102+
*/
103+
FileType
104+
file_type_from_path(const char *filename) {
105+
ssize_t len = strlen(filename);
106+
bool met_dot;
107+
met_dot = false;
108+
109+
// For this length checks we assume that the filename is at least
110+
// 1 character longer than the corresponding extension ".cfm":
111+
// strlen(".cfm") == 4 therefore we assume that the filename can't be
112+
// shorter than 5 bytes, for example: "5.cfm".
113+
if(len >= 5 && strcmp(&filename[len-4], ".cfm") == 0)
114+
return FileTypeCFM;
115+
116+
// we expect filename to be ****/12345 or ****/12345.12
117+
for(ssize_t i = len - 1; i >= 0 && filename[i] != '/'; i--) {
118+
if(isdigit(filename[i]) != 0)
119+
continue;
120+
121+
// we expect the dot to appear only once
122+
if(filename[i] == '.') {
123+
if(met_dot)
124+
return FileTypeUnknown;
125+
126+
met_dot = true;
127+
} else
128+
// met anything except digit and dot => unknown file type
129+
return FileTypeUnknown;
130+
}
131+
132+
return met_dot ? FileTypeMainMulti : FileTypeMain;
133+
}
134+
135+
#ifdef PGPRO_EE
136+
/*
137+
* Determines the relation file size specified by fullpath as if it
138+
* was not compressed.
139+
*/
140+
off_t
141+
get_cfs_relation_file_decompressed_size(RelFileNodeBackend rnode, const char *fullpath, ForkNumber forknum) {
142+
File fd;
143+
int compressor;
144+
off_t size;
145+
146+
compressor = md_get_compressor_internal(rnode.node, rnode.backend, forknum);
147+
fd = PathNameOpenFile(fullpath, O_RDWR | PG_BINARY, compressor);
148+
149+
if(fd < 0)
150+
return (off_t)-1;
151+
152+
size = FileSize(fd);
153+
FileClose(fd);
154+
155+
return size;
156+
}
157+
#endif
158+
94159
/*
95160
* Delete ptrack files when ptrack is disabled.
96161
*
@@ -498,8 +563,13 @@ assign_ptrack_map_size(int newval, void *extra)
498563
* For use in functions that copy directories bypassing buffer manager.
499564
*/
500565
static void
566+
#ifdef PGPRO_EE
567+
ptrack_mark_file(Oid dbOid, Oid tablespaceOid,
568+
const char *filepath, const char *filename, bool is_cfs)
569+
#else
501570
ptrack_mark_file(Oid dbOid, Oid tablespaceOid,
502571
const char *filepath, const char *filename)
572+
#endif
503573
{
504574
RelFileNodeBackend rnode;
505575
ForkNumber forknum;
@@ -508,6 +578,9 @@ ptrack_mark_file(Oid dbOid, Oid tablespaceOid,
508578
struct stat stat_buf;
509579
int oidchars;
510580
char oidbuf[OIDCHARS + 1];
581+
#ifdef PGPRO_EE
582+
off_t rel_size;
583+
#endif
511584

512585
/* Do not track temporary relations */
513586
if (looks_like_temp_rel_name(filename))
@@ -526,6 +599,21 @@ ptrack_mark_file(Oid dbOid, Oid tablespaceOid,
526599
oidbuf[oidchars] = '\0';
527600
nodeRel(nodeOf(rnode)) = atooid(oidbuf);
528601

602+
#ifdef PGPRO_EE
603+
// if current tablespace is cfs-compressed and md_get_compressor_internal
604+
// returns the type of the compressing algorithm for filepath, then it
605+
// needs to be de-compressed to obtain its size
606+
if(is_cfs && md_get_compressor_internal(rnode.node, rnode.backend, forknum) != 0) {
607+
rel_size = get_cfs_relation_file_decompressed_size(rnode, filepath, forknum);
608+
609+
if(rel_size == (off_t)-1) {
610+
elog(WARNING, "ptrack: could not open cfs-compressed relation file: %s", filepath);
611+
return;
612+
}
613+
614+
nblocks = rel_size / BLCKSZ;
615+
} else
616+
#endif
529617
/* Compute number of blocks based on file size */
530618
if (stat(filepath, &stat_buf) == 0)
531619
nblocks = stat_buf.st_size / BLCKSZ;
@@ -546,6 +634,9 @@ ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid)
546634
{
547635
DIR *dir;
548636
struct dirent *de;
637+
#ifdef PGPRO_EE
638+
bool is_cfs;
639+
#endif
549640

550641
/* Do not walk during bootstrap and if ptrack is disabled */
551642
if (ptrack_map_size == 0
@@ -554,6 +645,10 @@ ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid)
554645
|| InitializingParallelWorker)
555646
return;
556647

648+
#ifdef PGPRO_EE
649+
is_cfs = file_is_in_cfs_tablespace(path);
650+
#endif
651+
557652
dir = AllocateDir(path);
558653

559654
while ((de = ReadDirExtended(dir, path, LOG)) != NULL)
@@ -581,7 +676,11 @@ ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid)
581676
}
582677

583678
if (S_ISREG(fst.st_mode))
679+
#ifdef PGPRO_EE
680+
ptrack_mark_file(dbOid, tablespaceOid, subpath, de->d_name, is_cfs);
681+
#else
584682
ptrack_mark_file(dbOid, tablespaceOid, subpath, de->d_name);
683+
#endif
585684
}
586685

587686
FreeDir(dir); /* we ignore any error here */

engine.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ typedef struct PtrackMapHdr
7373

7474
typedef PtrackMapHdr * PtrackMap;
7575

76+
typedef enum {
77+
FileTypeUnknown,
78+
FileTypeMain,
79+
FileTypeMainMulti,
80+
FileTypeCFM
81+
} FileType;
82+
7683
/* Number of elements in ptrack map (LSN array) */
7784
#define PtrackContentNblocks \
7885
((ptrack_map_size - offsetof(PtrackMapHdr, entries) - sizeof(pg_crc32c)) / sizeof(pg_atomic_uint64))
@@ -111,4 +118,10 @@ extern void ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid);
111118
extern void ptrack_mark_block(RelFileNodeBackend smgr_rnode,
112119
ForkNumber forkno, BlockNumber blkno);
113120

121+
extern FileType file_type_from_path(const char *path);
122+
#ifdef PGPRO_EE
123+
extern off_t get_cfs_relation_file_decompressed_size(RelFileNodeBackend rnode,
124+
const char *fullpath, ForkNumber forknum);
125+
#endif
126+
114127
#endif /* PTRACK_ENGINE_H */

ptrack.c

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -251,14 +251,6 @@ ptrack_copydir_hook(const char *path)
251251

252252
elog(DEBUG1, "ptrack_copydir_hook: spcOid %u, dbOid %u", spcOid, dbOid);
253253

254-
#ifdef PGPRO_EE
255-
/*
256-
* Currently, we do not track files from compressed tablespaces in ptrack.
257-
*/
258-
if (file_is_in_cfs_tablespace(path))
259-
elog(DEBUG1, "ptrack_copydir_hook: skipping changes tracking in the CFS tablespace %u", spcOid);
260-
else
261-
#endif
262254
ptrack_walkdir(path, spcOid, dbOid);
263255

264256
if (prev_copydir_hook)
@@ -302,6 +294,10 @@ ptrack_gather_filelist(List **filelist, char *path, Oid spcOid, Oid dbOid)
302294
{
303295
DIR *dir;
304296
struct dirent *de;
297+
#ifdef PGPRO_EE
298+
bool is_cfs;
299+
is_cfs = file_is_in_cfs_tablespace(path);
300+
#endif
305301

306302
dir = AllocateDir(path);
307303

@@ -315,7 +311,8 @@ ptrack_gather_filelist(List **filelist, char *path, Oid spcOid, Oid dbOid)
315311

316312
if (strcmp(de->d_name, ".") == 0 ||
317313
strcmp(de->d_name, "..") == 0 ||
318-
looks_like_temp_rel_name(de->d_name))
314+
looks_like_temp_rel_name(de->d_name) ||
315+
file_type_from_path(de->d_name) == FileTypeCFM)
319316
continue;
320317

321318
snprintf(subpath, sizeof(subpath), "%s/%s", path, de->d_name);
@@ -362,6 +359,10 @@ ptrack_gather_filelist(List **filelist, char *path, Oid spcOid, Oid dbOid)
362359
nodeSpc(pfl->relnode) = spcOid == InvalidOid ? DEFAULTTABLESPACE_OID : spcOid;
363360
pfl->path = GetRelationPath(dbOid, nodeSpc(pfl->relnode),
364361
nodeRel(pfl->relnode), InvalidBackendId, pfl->forknum);
362+
#ifdef PGPRO_EE
363+
pfl->is_cfs_compressed = is_cfs
364+
&& md_get_compressor_internal(pfl->relnode, InvalidBackendId, pfl->forknum) != 0;
365+
#endif
365366

366367
*filelist = lappend(*filelist, pfl);
367368

@@ -403,6 +404,10 @@ ptrack_filelist_getnext(PtScanCtx * ctx)
403404
ListCell *cell;
404405
char *fullpath;
405406
struct stat fst;
407+
off_t rel_st_size = 0;
408+
#ifdef PGPRO_EE
409+
RelFileNodeBackend rnodebackend;
410+
#endif
406411

407412
/* No more file in the list */
408413
if (list_length(ctx->filelist) == 0)
@@ -449,14 +454,24 @@ ptrack_filelist_getnext(PtScanCtx * ctx)
449454
return ptrack_filelist_getnext(ctx);
450455
}
451456

457+
#ifdef PGPRO_EE
458+
rnodebackend.node = ctx->bid.relnode;
459+
rnodebackend.backend = InvalidBackendId;
460+
461+
if(pfl->is_cfs_compressed)
462+
rel_st_size = get_cfs_relation_file_decompressed_size(rnodebackend, fullpath, pfl->forknum);
463+
else
464+
#endif
465+
rel_st_size = fst.st_size;
466+
452467
if (pfl->segno > 0)
453468
{
454-
ctx->relsize = pfl->segno * RELSEG_SIZE + fst.st_size / BLCKSZ;
469+
ctx->relsize = pfl->segno * RELSEG_SIZE + rel_st_size / BLCKSZ;
455470
ctx->bid.blocknum = pfl->segno * RELSEG_SIZE;
456471
}
457472
else
458473
/* Estimate relsize as size of first segment in blocks */
459-
ctx->relsize = fst.st_size / BLCKSZ;
474+
ctx->relsize = rel_st_size / BLCKSZ;
460475

461476
elog(DEBUG3, "ptrack: got file %s with size %u from the file list", pfl->path, ctx->relsize);
462477

ptrack.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ typedef struct PtrackFileList_i
7878
ForkNumber forknum;
7979
int segno;
8080
char *path;
81+
#ifdef PGPRO_EE
82+
bool is_cfs_compressed;
83+
#endif
8184
} PtrackFileList_i;
8285

8386
#endif /* PTRACK_H */

0 commit comments

Comments
 (0)