@@ -221,13 +221,13 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
221
221
static void RemoveGXact (GlobalTransaction gxact );
222
222
223
223
static void XlogReadTwoPhaseData (XLogRecPtr lsn , char * * buf , int * len );
224
- static char * ProcessTwoPhaseBuffer (TransactionId xid ,
224
+ static char * ProcessTwoPhaseBuffer (FullTransactionId xid ,
225
225
XLogRecPtr prepare_start_lsn ,
226
226
bool fromdisk , bool setParent , bool setNextXid );
227
227
static void MarkAsPreparingGuts (GlobalTransaction gxact , TransactionId xid ,
228
228
const char * gid , TimestampTz prepared_at , Oid owner ,
229
229
Oid databaseid );
230
- static void RemoveTwoPhaseFile (TransactionId xid , bool giveWarning );
230
+ static void RemoveTwoPhaseFile (FullTransactionId fxid , bool giveWarning );
231
231
static void RecreateTwoPhaseFile (TransactionId xid , void * content , int len );
232
232
233
233
/*
@@ -927,41 +927,26 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
927
927
/************************************************************************/
928
928
929
929
/*
930
- * Compute the FullTransactionId for the given TransactionId.
931
- *
932
- * The wrap logic is safe here because the span of active xids cannot exceed one
933
- * epoch at any given time.
930
+ * Compute FullTransactionId for the given TransactionId, using the current
931
+ * epoch.
934
932
*/
935
933
static inline FullTransactionId
936
- AdjustToFullTransactionId (TransactionId xid )
934
+ FullTransactionIdFromCurrentEpoch (TransactionId xid )
937
935
{
936
+ FullTransactionId fxid ;
938
937
FullTransactionId nextFullXid ;
939
- TransactionId nextXid ;
940
938
uint32 epoch ;
941
939
942
- Assert (TransactionIdIsValid (xid ));
943
-
944
- LWLockAcquire (XidGenLock , LW_SHARED );
945
- nextFullXid = TransamVariables -> nextXid ;
946
- LWLockRelease (XidGenLock );
947
-
948
- nextXid = XidFromFullTransactionId (nextFullXid );
940
+ nextFullXid = ReadNextFullTransactionId ();
949
941
epoch = EpochFromFullTransactionId (nextFullXid );
950
- if (unlikely (xid > nextXid ))
951
- {
952
- /* Wraparound occurred, must be from a prev epoch. */
953
- Assert (epoch > 0 );
954
- epoch -- ;
955
- }
956
942
957
- return FullTransactionIdFromEpochAndXid (epoch , xid );
943
+ fxid = FullTransactionIdFromEpochAndXid (epoch , xid );
944
+ return fxid ;
958
945
}
959
946
960
947
static inline int
961
- TwoPhaseFilePath (char * path , TransactionId xid )
948
+ TwoPhaseFilePath (char * path , FullTransactionId fxid )
962
949
{
963
- FullTransactionId fxid = AdjustToFullTransactionId (xid );
964
-
965
950
return snprintf (path , MAXPGPATH , TWOPHASE_DIR "/%08X%08X" ,
966
951
EpochFromFullTransactionId (fxid ),
967
952
XidFromFullTransactionId (fxid ));
@@ -1297,7 +1282,8 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1297
1282
* If it looks OK (has a valid magic number and CRC), return the palloc'd
1298
1283
* contents of the file, issuing an error when finding corrupted data. If
1299
1284
* missing_ok is true, which indicates that missing files can be safely
1300
- * ignored, then return NULL. This state can be reached when doing recovery.
1285
+ * ignored, then return NULL. This state can be reached when doing recovery
1286
+ * after discarding two-phase files from other epochs.
1301
1287
*/
1302
1288
static char *
1303
1289
ReadTwoPhaseFile (TransactionId xid , bool missing_ok )
@@ -1311,8 +1297,10 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
1311
1297
pg_crc32c calc_crc ,
1312
1298
file_crc ;
1313
1299
int r ;
1300
+ FullTransactionId fxid ;
1314
1301
1315
- TwoPhaseFilePath (path , xid );
1302
+ fxid = FullTransactionIdFromCurrentEpoch (xid );
1303
+ TwoPhaseFilePath (path , fxid );
1316
1304
1317
1305
fd = OpenTransientFile (path , O_RDONLY | PG_BINARY );
1318
1306
if (fd < 0 )
@@ -1677,10 +1665,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1677
1665
AtEOXact_PgStat (isCommit , false);
1678
1666
1679
1667
/*
1680
- * And now we can clean up any files we may have left.
1668
+ * And now we can clean up any files we may have left. These should be
1669
+ * from the current epoch.
1681
1670
*/
1682
1671
if (ondisk )
1683
- RemoveTwoPhaseFile (xid , true);
1672
+ {
1673
+ FullTransactionId fxid ;
1674
+
1675
+ fxid = FullTransactionIdFromCurrentEpoch (xid );
1676
+ RemoveTwoPhaseFile (fxid , true);
1677
+ }
1684
1678
1685
1679
MyLockedGxact = NULL ;
1686
1680
@@ -1719,13 +1713,17 @@ ProcessRecords(char *bufptr, TransactionId xid,
1719
1713
*
1720
1714
* If giveWarning is false, do not complain about file-not-present;
1721
1715
* this is an expected case during WAL replay.
1716
+ *
1717
+ * This routine is used at early stages at recovery where future and
1718
+ * past orphaned files are checked, hence the FullTransactionId to build
1719
+ * a complete file name fit for the removal.
1722
1720
*/
1723
1721
static void
1724
- RemoveTwoPhaseFile (TransactionId xid , bool giveWarning )
1722
+ RemoveTwoPhaseFile (FullTransactionId fxid , bool giveWarning )
1725
1723
{
1726
1724
char path [MAXPGPATH ];
1727
1725
1728
- TwoPhaseFilePath (path , xid );
1726
+ TwoPhaseFilePath (path , fxid );
1729
1727
if (unlink (path ))
1730
1728
if (errno != ENOENT || giveWarning )
1731
1729
ereport (WARNING ,
@@ -1745,13 +1743,16 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1745
1743
char path [MAXPGPATH ];
1746
1744
pg_crc32c statefile_crc ;
1747
1745
int fd ;
1746
+ FullTransactionId fxid ;
1748
1747
1749
1748
/* Recompute CRC */
1750
1749
INIT_CRC32C (statefile_crc );
1751
1750
COMP_CRC32C (statefile_crc , content , len );
1752
1751
FIN_CRC32C (statefile_crc );
1753
1752
1754
- TwoPhaseFilePath (path , xid );
1753
+ /* Use current epoch */
1754
+ fxid = FullTransactionIdFromCurrentEpoch (xid );
1755
+ TwoPhaseFilePath (path , fxid );
1755
1756
1756
1757
fd = OpenTransientFile (path ,
1757
1758
O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY );
@@ -1899,7 +1900,9 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
1899
1900
* Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1900
1901
* This is called once at the beginning of recovery, saving any extra
1901
1902
* lookups in the future. Two-phase files that are newer than the
1902
- * minimum XID horizon are discarded on the way.
1903
+ * minimum XID horizon are discarded on the way. Two-phase files with
1904
+ * an epoch older or newer than the current checkpoint's record epoch
1905
+ * are also discarded.
1903
1906
*/
1904
1907
void
1905
1908
restoreTwoPhaseData (void )
@@ -1914,14 +1917,11 @@ restoreTwoPhaseData(void)
1914
1917
if (strlen (clde -> d_name ) == 16 &&
1915
1918
strspn (clde -> d_name , "0123456789ABCDEF" ) == 16 )
1916
1919
{
1917
- TransactionId xid ;
1918
1920
FullTransactionId fxid ;
1919
1921
char * buf ;
1920
1922
1921
1923
fxid = FullTransactionIdFromU64 (strtou64 (clde -> d_name , NULL , 16 ));
1922
- xid = XidFromFullTransactionId (fxid );
1923
-
1924
- buf = ProcessTwoPhaseBuffer (xid , InvalidXLogRecPtr ,
1924
+ buf = ProcessTwoPhaseBuffer (fxid , InvalidXLogRecPtr ,
1925
1925
true, false, false);
1926
1926
if (buf == NULL )
1927
1927
continue ;
@@ -1972,6 +1972,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1972
1972
TransactionId origNextXid = XidFromFullTransactionId (nextXid );
1973
1973
TransactionId result = origNextXid ;
1974
1974
TransactionId * xids = NULL ;
1975
+ uint32 epoch = EpochFromFullTransactionId (nextXid );
1975
1976
int nxids = 0 ;
1976
1977
int allocsize = 0 ;
1977
1978
int i ;
@@ -1980,14 +1981,20 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1980
1981
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
1981
1982
{
1982
1983
TransactionId xid ;
1984
+ FullTransactionId fxid ;
1983
1985
char * buf ;
1984
1986
GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
1985
1987
1986
1988
Assert (gxact -> inredo );
1987
1989
1988
1990
xid = gxact -> xid ;
1989
1991
1990
- buf = ProcessTwoPhaseBuffer (xid ,
1992
+ /*
1993
+ * All two-phase files with past and future epoch in pg_twophase are
1994
+ * gone at this point, so we're OK to rely on only the current epoch.
1995
+ */
1996
+ fxid = FullTransactionIdFromEpochAndXid (epoch , xid );
1997
+ buf = ProcessTwoPhaseBuffer (fxid ,
1991
1998
gxact -> prepare_start_lsn ,
1992
1999
gxact -> ondisk , false, true);
1993
2000
@@ -2049,19 +2056,31 @@ void
2049
2056
StandbyRecoverPreparedTransactions (void )
2050
2057
{
2051
2058
int i ;
2059
+ uint32 epoch ;
2060
+ FullTransactionId nextFullXid ;
2061
+
2062
+ /* get current epoch */
2063
+ nextFullXid = ReadNextFullTransactionId ();
2064
+ epoch = EpochFromFullTransactionId (nextFullXid );
2052
2065
2053
2066
LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
2054
2067
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
2055
2068
{
2056
2069
TransactionId xid ;
2070
+ FullTransactionId fxid ;
2057
2071
char * buf ;
2058
2072
GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
2059
2073
2060
2074
Assert (gxact -> inredo );
2061
2075
2062
2076
xid = gxact -> xid ;
2063
2077
2064
- buf = ProcessTwoPhaseBuffer (xid ,
2078
+ /*
2079
+ * At this stage, we're OK to work with the current epoch as all past
2080
+ * and future files have been already discarded.
2081
+ */
2082
+ fxid = FullTransactionIdFromEpochAndXid (epoch , xid );
2083
+ buf = ProcessTwoPhaseBuffer (fxid ,
2065
2084
gxact -> prepare_start_lsn ,
2066
2085
gxact -> ondisk , true, false);
2067
2086
if (buf != NULL )
@@ -2090,18 +2109,29 @@ void
2090
2109
RecoverPreparedTransactions (void )
2091
2110
{
2092
2111
int i ;
2112
+ uint32 epoch ;
2113
+ FullTransactionId nextFullXid ;
2114
+
2115
+ /* get current epoch */
2116
+ nextFullXid = ReadNextFullTransactionId ();
2117
+ epoch = EpochFromFullTransactionId (nextFullXid );
2093
2118
2094
2119
LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
2095
2120
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
2096
2121
{
2097
2122
TransactionId xid ;
2123
+ FullTransactionId fxid ;
2098
2124
char * buf ;
2099
2125
GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
2100
2126
char * bufptr ;
2101
2127
TwoPhaseFileHeader * hdr ;
2102
2128
TransactionId * subxids ;
2103
2129
const char * gid ;
2104
2130
2131
+ /*
2132
+ * At this stage, we're OK to work with the current epoch as all past
2133
+ * and future files have been already discarded.
2134
+ */
2105
2135
xid = gxact -> xid ;
2106
2136
2107
2137
/*
@@ -2113,7 +2143,8 @@ RecoverPreparedTransactions(void)
2113
2143
* SubTransSetParent has been set before, if the prepared transaction
2114
2144
* generated xid assignment records.
2115
2145
*/
2116
- buf = ProcessTwoPhaseBuffer (xid ,
2146
+ fxid = FullTransactionIdFromEpochAndXid (epoch , xid );
2147
+ buf = ProcessTwoPhaseBuffer (fxid ,
2117
2148
gxact -> prepare_start_lsn ,
2118
2149
gxact -> ondisk , true, false);
2119
2150
if (buf == NULL )
@@ -2181,7 +2212,7 @@ RecoverPreparedTransactions(void)
2181
2212
/*
2182
2213
* ProcessTwoPhaseBuffer
2183
2214
*
2184
- * Given a transaction id , read it either from disk or read it directly
2215
+ * Given a FullTransactionId , read it either from disk or read it directly
2185
2216
* via shmem xlog record pointer using the provided "prepare_start_lsn".
2186
2217
*
2187
2218
* If setParent is true, set up subtransaction parent linkages.
@@ -2190,32 +2221,35 @@ RecoverPreparedTransactions(void)
2190
2221
* value scanned.
2191
2222
*/
2192
2223
static char *
2193
- ProcessTwoPhaseBuffer (TransactionId xid ,
2224
+ ProcessTwoPhaseBuffer (FullTransactionId fxid ,
2194
2225
XLogRecPtr prepare_start_lsn ,
2195
2226
bool fromdisk ,
2196
2227
bool setParent , bool setNextXid )
2197
2228
{
2198
2229
FullTransactionId nextXid = TransamVariables -> nextXid ;
2199
- TransactionId origNextXid = XidFromFullTransactionId (nextXid );
2200
2230
TransactionId * subxids ;
2201
2231
char * buf ;
2202
2232
TwoPhaseFileHeader * hdr ;
2203
2233
int i ;
2234
+ TransactionId xid = XidFromFullTransactionId (fxid );
2204
2235
2205
2236
Assert (LWLockHeldByMeInMode (TwoPhaseStateLock , LW_EXCLUSIVE ));
2206
2237
2207
2238
if (!fromdisk )
2208
2239
Assert (prepare_start_lsn != InvalidXLogRecPtr );
2209
2240
2210
- /* Reject XID if too new */
2211
- if (TransactionIdFollowsOrEquals (xid , origNextXid ))
2241
+ /*
2242
+ * Reject full XID if too new. Note that this discards files from future
2243
+ * epochs.
2244
+ */
2245
+ if (FullTransactionIdFollowsOrEquals (fxid , nextXid ))
2212
2246
{
2213
2247
if (fromdisk )
2214
2248
{
2215
2249
ereport (WARNING ,
2216
- (errmsg ("removing future two-phase state file for transaction %u" ,
2217
- xid )));
2218
- RemoveTwoPhaseFile (xid , true);
2250
+ (errmsg ("removing future two-phase state file of epoch %u for transaction %u" ,
2251
+ EpochFromFullTransactionId ( fxid ), xid )));
2252
+ RemoveTwoPhaseFile (fxid , true);
2219
2253
}
2220
2254
else
2221
2255
{
@@ -2227,6 +2261,26 @@ ProcessTwoPhaseBuffer(TransactionId xid,
2227
2261
return NULL ;
2228
2262
}
2229
2263
2264
+ /* Discard files from past epochs */
2265
+ if (EpochFromFullTransactionId (fxid ) < EpochFromFullTransactionId (nextXid ))
2266
+ {
2267
+ if (fromdisk )
2268
+ {
2269
+ ereport (WARNING ,
2270
+ (errmsg ("removing past two-phase state file of epoch %u for transaction %u" ,
2271
+ EpochFromFullTransactionId (fxid ), xid )));
2272
+ RemoveTwoPhaseFile (fxid , true);
2273
+ }
2274
+ else
2275
+ {
2276
+ ereport (WARNING ,
2277
+ (errmsg ("removing past two-phase state from memory for transaction %u" ,
2278
+ xid )));
2279
+ PrepareRedoRemove (xid , true);
2280
+ }
2281
+ return NULL ;
2282
+ }
2283
+
2230
2284
/* Already processed? */
2231
2285
if (TransactionIdDidCommit (xid ) || TransactionIdDidAbort (xid ))
2232
2286
{
@@ -2235,7 +2289,7 @@ ProcessTwoPhaseBuffer(TransactionId xid,
2235
2289
ereport (WARNING ,
2236
2290
(errmsg ("removing stale two-phase state file for transaction %u" ,
2237
2291
xid )));
2238
- RemoveTwoPhaseFile (xid , true);
2292
+ RemoveTwoPhaseFile (fxid , true);
2239
2293
}
2240
2294
else
2241
2295
{
@@ -2521,8 +2575,11 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
2521
2575
if (!XLogRecPtrIsInvalid (start_lsn ))
2522
2576
{
2523
2577
char path [MAXPGPATH ];
2578
+ FullTransactionId fxid ;
2524
2579
2525
- TwoPhaseFilePath (path , hdr -> xid );
2580
+ /* Use current epoch */
2581
+ fxid = FullTransactionIdFromCurrentEpoch (hdr -> xid );
2582
+ TwoPhaseFilePath (path , fxid );
2526
2583
2527
2584
if (access (path , F_OK ) == 0 )
2528
2585
{
@@ -2617,7 +2674,15 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
2617
2674
*/
2618
2675
elog (DEBUG2 , "removing 2PC data for transaction %u" , xid );
2619
2676
if (gxact -> ondisk )
2620
- RemoveTwoPhaseFile (xid , giveWarning );
2677
+ {
2678
+ FullTransactionId fxid ;
2679
+
2680
+ /*
2681
+ * We should deal with a file at the current epoch here.
2682
+ */
2683
+ fxid = FullTransactionIdFromCurrentEpoch (xid );
2684
+ RemoveTwoPhaseFile (fxid , giveWarning );
2685
+ }
2621
2686
RemoveGXact (gxact );
2622
2687
}
2623
2688
0 commit comments