Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit a7d4ef9

Browse files
Daniil DavidovCommitfest Bot
Daniil Davidov
authored and
Commitfest Bot
committed
Arbitrary xid and mxid for resetwal
1 parent caa76b9 commit a7d4ef9

File tree

3 files changed

+476
-3
lines changed

3 files changed

+476
-3
lines changed

src/bin/pg_resetwal/Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ include $(top_builddir)/src/Makefile.global
1717

1818
LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils
1919

20+
# required for 03_advance.pl
21+
REGRESS_SHLIB=$(top_builddir)/src/test/regress/regress$(DLSUFFIX)
22+
export REGRESS_SHLIB
23+
2024
OBJS = \
2125
$(WIN32RES) \
2226
pg_resetwal.o

src/bin/pg_resetwal/pg_resetwal.c

Lines changed: 335 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ static bool guessed = false; /* T if we had to guess at any values */
6565
static const char *progname;
6666
static uint32 set_xid_epoch = (uint32) -1;
6767
static TransactionId set_oldest_xid = 0;
68-
static TransactionId set_xid = 0;
68+
static uint64 set_xid = 0;
6969
static TransactionId set_oldest_commit_ts_xid = 0;
7070
static TransactionId set_newest_commit_ts_xid = 0;
7171
static Oid set_oid = 0;
@@ -89,7 +89,41 @@ static void KillExistingArchiveStatus(void);
8989
static void KillExistingWALSummaries(void);
9090
static void WriteEmptyXLOG(void);
9191
static void usage(void);
92+
static void AdvanceNextXid(TransactionId oldval, TransactionId newval);
93+
static void AdvanceNextMultiXid(MultiXactId oldval, MultiXactId newval);
9294

95+
/*
96+
* Note: this structure is copied from commit_ts.c and should be kept in sync.
97+
*/
98+
typedef struct CommitTimestampEntry
99+
{
100+
TimestampTz time;
101+
RepOriginId nodeid;
102+
} CommitTimestampEntry;
103+
104+
/*
105+
* Note: these macros are copied from clog.c, commit_ts.c and subtrans.c and
106+
* should be kept in sync.
107+
*/
108+
#define CLOG_BITS_PER_XACT 2
109+
#define CLOG_XACTS_PER_BYTE 4
110+
#define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
111+
112+
#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE)
113+
#define TransactionIdToByte(xid) (TransactionIdToPgIndex(xid) / CLOG_XACTS_PER_BYTE)
114+
#define TransactionIdToBIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_BYTE)
115+
116+
#define SUBTRANS_XACTS_PER_PAGE (BLCKSZ / sizeof(TransactionId))
117+
118+
#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
119+
sizeof(RepOriginId))
120+
121+
#define COMMIT_TS_XACTS_PER_PAGE \
122+
(BLCKSZ / SizeOfCommitTimestampEntry)
123+
124+
#define MULTIXACT_OFFSETS_PER_PAGE (BLCKSZ / sizeof(MultiXactOffset))
125+
126+
#define SLRU_PAGES_PER_SEGMENT 32
93127

94128
int
95129
main(int argc, char *argv[])
@@ -441,9 +475,47 @@ main(int argc, char *argv[])
441475
}
442476

443477
if (set_xid != 0)
478+
{
479+
FullTransactionId current_fxid = ControlFile.checkPointCopy.nextXid;
480+
FullTransactionId full_datfrozenxid;
481+
uint32 current_epoch;
482+
483+
full_datfrozenxid =
484+
FullTransactionIdFromEpochAndXid(EpochFromFullTransactionId(current_fxid),
485+
ControlFile.checkPointCopy.oldestXid);
486+
487+
if (set_xid > full_datfrozenxid.value &&
488+
(set_xid - full_datfrozenxid.value) > INT32_MAX)
489+
{
490+
/*
491+
* Cannot advance transaction ID in this case, because all unfrozen
492+
* transactions in cluster will be considered as 'future' for given
493+
* and all subsequent transaction IDs.
494+
*/
495+
pg_fatal("transaction ID (-x) cannot be ahead of datfrozenxid by %u", INT32_MAX);
496+
}
497+
else if (set_xid >= MaxTransactionId)
498+
{
499+
/*
500+
* Given transaction ID might exeed current epoch, so advance epoch
501+
* if needed.
502+
*/
503+
current_epoch = set_xid / MaxTransactionId;
504+
set_xid = set_xid % MaxTransactionId;
505+
}
506+
else
507+
current_epoch = EpochFromFullTransactionId(current_fxid);
508+
444509
ControlFile.checkPointCopy.nextXid =
445-
FullTransactionIdFromEpochAndXid(EpochFromFullTransactionId(ControlFile.checkPointCopy.nextXid),
446-
set_xid);
510+
FullTransactionIdFromEpochAndXid(current_epoch, set_xid);
511+
512+
if (FullTransactionIdPrecedes(current_fxid, ControlFile.checkPointCopy.nextXid) &&
513+
!noupdate)
514+
{
515+
AdvanceNextXid(XidFromFullTransactionId(current_fxid),
516+
XidFromFullTransactionId(ControlFile.checkPointCopy.nextXid));
517+
}
518+
}
447519

448520
if (set_oldest_commit_ts_xid != 0)
449521
ControlFile.checkPointCopy.oldestCommitTsXid = set_oldest_commit_ts_xid;
@@ -455,12 +527,19 @@ main(int argc, char *argv[])
455527

456528
if (set_mxid != 0)
457529
{
530+
MultiXactId current_mxid = ControlFile.checkPointCopy.nextMulti;
458531
ControlFile.checkPointCopy.nextMulti = set_mxid;
459532

460533
ControlFile.checkPointCopy.oldestMulti = set_oldestmxid;
461534
if (ControlFile.checkPointCopy.oldestMulti < FirstMultiXactId)
462535
ControlFile.checkPointCopy.oldestMulti += FirstMultiXactId;
463536
ControlFile.checkPointCopy.oldestMultiDB = InvalidOid;
537+
538+
/*
539+
* If current_mxid precedes set_mxid.
540+
*/
541+
if (((int32) (current_mxid - set_mxid) < 0) && !noupdate)
542+
AdvanceNextMultiXid(current_mxid, set_mxid);
464543
}
465544

466545
if (set_mxoff != -1)
@@ -1218,3 +1297,256 @@ usage(void)
12181297
printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
12191298
printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
12201299
}
1300+
1301+
/*
1302+
* Calculate how many xacts can fit one page of given SLRU type.
1303+
*/
1304+
static int64
1305+
calculate_xacts_per_page(char *slru_type)
1306+
{
1307+
int64 result = -1;
1308+
1309+
if (strcmp(slru_type, "pg_xact") == 0)
1310+
result = CLOG_XACTS_PER_PAGE;
1311+
else if (strcmp(slru_type, "pg_commit_ts") == 0)
1312+
result = COMMIT_TS_XACTS_PER_PAGE;
1313+
else if (strcmp(slru_type, "pg_subtrans") == 0)
1314+
result = SUBTRANS_XACTS_PER_PAGE;
1315+
else if (strcmp(slru_type, "pg_multixact/offsets") == 0)
1316+
result = MULTIXACT_OFFSETS_PER_PAGE;
1317+
else
1318+
pg_fatal("unknown SLRU type : %s", slru_type);
1319+
1320+
return result;
1321+
}
1322+
1323+
/*
1324+
* Fill given SLRU segment with zeroes.
1325+
*/
1326+
static void
1327+
zero_segment(int fd, char *path)
1328+
{
1329+
char zeroes[BLCKSZ] = {0};
1330+
1331+
for (int i = 0; i < SLRU_PAGES_PER_SEGMENT; i++)
1332+
{
1333+
errno = 0;
1334+
if (write(fd, zeroes, BLCKSZ) != BLCKSZ)
1335+
{
1336+
if (errno == 0)
1337+
errno = ENOSPC;
1338+
pg_fatal("could not write file \"%s\": %m", path);
1339+
}
1340+
}
1341+
}
1342+
1343+
/*
1344+
* Fill entry for given transaction ID with zeroes in clog.
1345+
*/
1346+
static void
1347+
zero_clog_xact_info(int fd, char *path, char *slru_type, TransactionId xid)
1348+
{
1349+
int64 pageno;
1350+
int byteno = TransactionIdToByte(xid);
1351+
int bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
1352+
char *byteptr;
1353+
char byteval;
1354+
int status = 0x00;
1355+
char buff[BLCKSZ];
1356+
1357+
pageno = (xid / CLOG_XACTS_PER_PAGE) % SLRU_PAGES_PER_SEGMENT;
1358+
1359+
if (lseek(fd, pageno * BLCKSZ, SEEK_SET) != pageno * BLCKSZ)
1360+
pg_fatal("could not iterate through file \"%s\": %m", path);
1361+
1362+
if (read(fd, buff, BLCKSZ) != BLCKSZ)
1363+
pg_fatal("could not read file \"%s\": %m", path);
1364+
1365+
byteptr = buff + byteno;
1366+
1367+
byteval = *byteptr;
1368+
byteval &= ~(((1 << CLOG_BITS_PER_XACT) - 1) << bshift);
1369+
byteval |= (status << bshift);
1370+
*byteptr = byteval;
1371+
1372+
if (write(fd, buff, BLCKSZ) != BLCKSZ)
1373+
{
1374+
if (errno == 0)
1375+
errno = ENOSPC;
1376+
pg_fatal("could not write file \"%s\": %m", path);
1377+
}
1378+
}
1379+
1380+
/*
1381+
* Fill entry for given transaction ID with zeroes in specified SLRU type.
1382+
*/
1383+
static void
1384+
zero_xact_info(int fd, char *path, char *slru_type, TransactionId xid)
1385+
{
1386+
int offset = 0,
1387+
entry_size = 0;
1388+
int64 pageno;
1389+
char buff[BLCKSZ];
1390+
1391+
if (strcmp(slru_type, "pg_xact") == 0)
1392+
{
1393+
zero_clog_xact_info(fd, path, slru_type, xid);
1394+
return;
1395+
}
1396+
else if (strcmp(slru_type, "pg_commit_ts") == 0)
1397+
{
1398+
entry_size = SizeOfCommitTimestampEntry;
1399+
offset = (xid % COMMIT_TS_XACTS_PER_PAGE) * entry_size;
1400+
pageno = xid / COMMIT_TS_XACTS_PER_PAGE;
1401+
}
1402+
else if (strcmp(slru_type, "pg_subtrans") == 0)
1403+
{
1404+
entry_size = sizeof(TransactionId);
1405+
offset = (xid % SUBTRANS_XACTS_PER_PAGE) * entry_size;
1406+
pageno = xid / SUBTRANS_XACTS_PER_PAGE;
1407+
}
1408+
else if (strcmp(slru_type, "pg_multixact/offsets") == 0)
1409+
{
1410+
entry_size = sizeof(MultiXactOffset);
1411+
offset = (xid % MULTIXACT_OFFSETS_PER_PAGE) * entry_size;
1412+
pageno = xid / MULTIXACT_OFFSETS_PER_PAGE;
1413+
}
1414+
else
1415+
pg_fatal("unknown SLRU type : %s", slru_type);
1416+
1417+
if (lseek(fd, pageno * BLCKSZ, SEEK_SET) != pageno * BLCKSZ)
1418+
pg_fatal("could not iterate through file \"%s\": %m", path);
1419+
1420+
if (read(fd, buff, BLCKSZ) != BLCKSZ)
1421+
pg_fatal("could not read file \"%s\": %m", path);
1422+
1423+
memset(buff + offset, 0, entry_size);
1424+
1425+
if (write(fd, buff, BLCKSZ) != BLCKSZ)
1426+
{
1427+
if (errno == 0)
1428+
errno = ENOSPC;
1429+
pg_fatal("could not write file \"%s\": %m", path);
1430+
}
1431+
}
1432+
1433+
/*
1434+
* Make sure that given xid has entry in specified SLRU type.
1435+
*/
1436+
static void
1437+
enlarge_slru(TransactionId xid, char *dir)
1438+
{
1439+
char path[MAXPGPATH];
1440+
int fd,
1441+
flags = O_RDWR | O_APPEND | O_EXCL | PG_BINARY;
1442+
int64 segno,
1443+
pageno,
1444+
xacts_per_page;
1445+
1446+
xacts_per_page = calculate_xacts_per_page(dir);
1447+
pageno = xid / xacts_per_page;
1448+
segno = pageno / SLRU_PAGES_PER_SEGMENT;
1449+
1450+
snprintf(path, MAXPGPATH, "%s/%04X", dir, (unsigned int) segno);
1451+
1452+
errno = 0;
1453+
if (access(path, F_OK) != 0)
1454+
{
1455+
if (errno != ENOENT)
1456+
pg_fatal("cannot access file \"%s\" : %m", path);
1457+
1458+
flags |= O_CREAT;
1459+
}
1460+
1461+
/*
1462+
* Create or open segment file
1463+
*/
1464+
fd = open(path, flags, pg_file_create_mode);
1465+
if (fd < 0)
1466+
pg_fatal("could not create/open file \"%s\": %m", path);
1467+
1468+
/*
1469+
* If segment doen't exist - create segment and fill all it's pages
1470+
* with zeroes.
1471+
*/
1472+
if (flags & O_CREAT)
1473+
zero_segment(fd, path);
1474+
/*
1475+
* If segment already exists - fill with zeroes given transaction's
1476+
* entry in it.
1477+
*/
1478+
else
1479+
zero_xact_info(fd, path, dir, xid);
1480+
1481+
if (fsync(fd) != 0)
1482+
pg_fatal("fsync error: %m");
1483+
1484+
close(fd);
1485+
}
1486+
1487+
/*
1488+
* Extend clog so that is can accomodate statuses of all transactions from
1489+
* oldval to newval.
1490+
*/
1491+
static void
1492+
AdvanceNextXid(TransactionId oldval, TransactionId newval)
1493+
{
1494+
int64 current_segno = -1, /* last existing slru segment */
1495+
pageno,
1496+
segno;
1497+
1498+
if (newval < oldval) /* handle wraparound */
1499+
oldval = FirstNormalTransactionId;
1500+
else /* oldval already has entry in clog */
1501+
oldval += 1;
1502+
1503+
for (TransactionId xid = oldval; xid <= newval; xid++)
1504+
{
1505+
pageno = xid / CLOG_XACTS_PER_PAGE;
1506+
segno = pageno / SLRU_PAGES_PER_SEGMENT;
1507+
1508+
/*
1509+
* We already zeroed all necessary pages in this segment during
1510+
* previous xid processing.
1511+
*/
1512+
if (segno == current_segno)
1513+
continue;
1514+
1515+
enlarge_slru(xid, "pg_xact");
1516+
1517+
current_segno = segno;
1518+
}
1519+
1520+
pageno = newval / COMMIT_TS_XACTS_PER_PAGE;
1521+
if (pageno > (oldval / COMMIT_TS_XACTS_PER_PAGE))
1522+
{
1523+
enlarge_slru(newval, "pg_commit_ts");
1524+
}
1525+
1526+
pageno = (newval / SUBTRANS_XACTS_PER_PAGE);
1527+
if (pageno > (oldval / SUBTRANS_XACTS_PER_PAGE))
1528+
{
1529+
enlarge_slru(newval, "pg_subtrans");
1530+
}
1531+
}
1532+
1533+
static void
1534+
AdvanceNextMultiXid(MultiXactId oldval, MultiXactId newval)
1535+
{
1536+
int64 current_segno = -1,
1537+
pageno,
1538+
segno;
1539+
1540+
for (MultiXactId mxid = oldval + 1; mxid <= newval; mxid++)
1541+
{
1542+
pageno = mxid / MULTIXACT_OFFSETS_PER_PAGE;
1543+
segno = pageno / SLRU_PAGES_PER_SEGMENT;
1544+
1545+
if (segno == current_segno)
1546+
continue;
1547+
1548+
enlarge_slru(mxid, "pg_multixact/offsets");
1549+
1550+
current_segno = segno;
1551+
}
1552+
}

0 commit comments

Comments
 (0)