@@ -221,13 +221,13 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
221
221
static void RemoveGXact (GlobalTransaction gxact );
222
222
223
223
static void XlogReadTwoPhaseData (XLogRecPtr lsn , char * * buf , int * len );
224
- static char * ProcessTwoPhaseBuffer (TransactionId xid ,
224
+ static char * ProcessTwoPhaseBuffer (FullTransactionId xid ,
225
225
XLogRecPtr prepare_start_lsn ,
226
226
bool fromdisk , bool setParent , bool setNextXid );
227
227
static void MarkAsPreparingGuts (GlobalTransaction gxact , TransactionId xid ,
228
228
const char * gid , TimestampTz prepared_at , Oid owner ,
229
229
Oid databaseid );
230
- static void RemoveTwoPhaseFile (TransactionId xid , bool giveWarning );
230
+ static void RemoveTwoPhaseFile (FullTransactionId fxid , bool giveWarning );
231
231
static void RecreateTwoPhaseFile (TransactionId xid , void * content , int len );
232
232
233
233
/*
@@ -927,41 +927,26 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
927
927
/************************************************************************/
928
928
929
929
/*
930
- * Compute the FullTransactionId for the given TransactionId.
931
- *
932
- * The wrap logic is safe here because the span of active xids cannot exceed one
933
- * epoch at any given time.
930
+ * Compute FullTransactionId for the given TransactionId, using the current
931
+ * epoch.
934
932
*/
935
933
static inline FullTransactionId
936
- AdjustToFullTransactionId (TransactionId xid )
934
+ FullTransactionIdFromCurrentEpoch (TransactionId xid )
937
935
{
936
+ FullTransactionId fxid ;
938
937
FullTransactionId nextFullXid ;
939
- TransactionId nextXid ;
940
938
uint32 epoch ;
941
939
942
- Assert (TransactionIdIsValid (xid ));
943
-
944
- LWLockAcquire (XidGenLock , LW_SHARED );
945
- nextFullXid = TransamVariables -> nextXid ;
946
- LWLockRelease (XidGenLock );
947
-
948
- nextXid = XidFromFullTransactionId (nextFullXid );
940
+ nextFullXid = ReadNextFullTransactionId ();
949
941
epoch = EpochFromFullTransactionId (nextFullXid );
950
- if (unlikely (xid > nextXid ))
951
- {
952
- /* Wraparound occurred, must be from a prev epoch. */
953
- Assert (epoch > 0 );
954
- epoch -- ;
955
- }
956
942
957
- return FullTransactionIdFromEpochAndXid (epoch , xid );
943
+ fxid = FullTransactionIdFromEpochAndXid (epoch , xid );
944
+ return fxid ;
958
945
}
959
946
960
947
static inline int
961
- TwoPhaseFilePath (char * path , TransactionId xid )
948
+ TwoPhaseFilePath (char * path , FullTransactionId fxid )
962
949
{
963
- FullTransactionId fxid = AdjustToFullTransactionId (xid );
964
-
965
950
return snprintf (path , MAXPGPATH , TWOPHASE_DIR "/%08X%08X" ,
966
951
EpochFromFullTransactionId (fxid ),
967
952
XidFromFullTransactionId (fxid ));
@@ -1297,7 +1282,8 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1297
1282
* If it looks OK (has a valid magic number and CRC), return the palloc'd
1298
1283
* contents of the file, issuing an error when finding corrupted data. If
1299
1284
* missing_ok is true, which indicates that missing files can be safely
1300
- * ignored, then return NULL. This state can be reached when doing recovery.
1285
+ * ignored, then return NULL. This state can be reached when doing recovery
1286
+ * after discarding two-phase files from other epochs.
1301
1287
*/
1302
1288
static char *
1303
1289
ReadTwoPhaseFile (TransactionId xid , bool missing_ok )
@@ -1311,8 +1297,10 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
1311
1297
pg_crc32c calc_crc ,
1312
1298
file_crc ;
1313
1299
int r ;
1300
+ FullTransactionId fxid ;
1314
1301
1315
- TwoPhaseFilePath (path , xid );
1302
+ fxid = FullTransactionIdFromCurrentEpoch (xid );
1303
+ TwoPhaseFilePath (path , fxid );
1316
1304
1317
1305
fd = OpenTransientFile (path , O_RDONLY | PG_BINARY );
1318
1306
if (fd < 0 )
@@ -1677,10 +1665,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
1677
1665
AtEOXact_PgStat (isCommit , false);
1678
1666
1679
1667
/*
1680
- * And now we can clean up any files we may have left.
1668
+ * And now we can clean up any files we may have left. These should be
1669
+ * from the current epoch.
1681
1670
*/
1682
1671
if (ondisk )
1683
- RemoveTwoPhaseFile (xid , true);
1672
+ {
1673
+ FullTransactionId fxid ;
1674
+
1675
+ fxid = FullTransactionIdFromCurrentEpoch (xid );
1676
+ RemoveTwoPhaseFile (fxid , true);
1677
+ }
1684
1678
1685
1679
MyLockedGxact = NULL ;
1686
1680
@@ -1718,13 +1712,17 @@ ProcessRecords(char *bufptr, TransactionId xid,
1718
1712
*
1719
1713
* If giveWarning is false, do not complain about file-not-present;
1720
1714
* this is an expected case during WAL replay.
1715
+ *
1716
+ * This routine is used at early stages at recovery where future and
1717
+ * past orphaned files are checked, hence the FullTransactionId to build
1718
+ * a complete file name fit for the removal.
1721
1719
*/
1722
1720
static void
1723
- RemoveTwoPhaseFile (TransactionId xid , bool giveWarning )
1721
+ RemoveTwoPhaseFile (FullTransactionId fxid , bool giveWarning )
1724
1722
{
1725
1723
char path [MAXPGPATH ];
1726
1724
1727
- TwoPhaseFilePath (path , xid );
1725
+ TwoPhaseFilePath (path , fxid );
1728
1726
if (unlink (path ))
1729
1727
if (errno != ENOENT || giveWarning )
1730
1728
ereport (WARNING ,
@@ -1744,13 +1742,16 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1744
1742
char path [MAXPGPATH ];
1745
1743
pg_crc32c statefile_crc ;
1746
1744
int fd ;
1745
+ FullTransactionId fxid ;
1747
1746
1748
1747
/* Recompute CRC */
1749
1748
INIT_CRC32C (statefile_crc );
1750
1749
COMP_CRC32C (statefile_crc , content , len );
1751
1750
FIN_CRC32C (statefile_crc );
1752
1751
1753
- TwoPhaseFilePath (path , xid );
1752
+ /* Use current epoch */
1753
+ fxid = FullTransactionIdFromCurrentEpoch (xid );
1754
+ TwoPhaseFilePath (path , fxid );
1754
1755
1755
1756
fd = OpenTransientFile (path ,
1756
1757
O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY );
@@ -1898,7 +1899,9 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
1898
1899
* Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1899
1900
* This is called once at the beginning of recovery, saving any extra
1900
1901
* lookups in the future. Two-phase files that are newer than the
1901
- * minimum XID horizon are discarded on the way.
1902
+ * minimum XID horizon are discarded on the way. Two-phase files with
1903
+ * an epoch older or newer than the current checkpoint's record epoch
1904
+ * are also discarded.
1902
1905
*/
1903
1906
void
1904
1907
restoreTwoPhaseData (void )
@@ -1913,14 +1916,11 @@ restoreTwoPhaseData(void)
1913
1916
if (strlen (clde -> d_name ) == 16 &&
1914
1917
strspn (clde -> d_name , "0123456789ABCDEF" ) == 16 )
1915
1918
{
1916
- TransactionId xid ;
1917
1919
FullTransactionId fxid ;
1918
1920
char * buf ;
1919
1921
1920
1922
fxid = FullTransactionIdFromU64 (strtou64 (clde -> d_name , NULL , 16 ));
1921
- xid = XidFromFullTransactionId (fxid );
1922
-
1923
- buf = ProcessTwoPhaseBuffer (xid , InvalidXLogRecPtr ,
1923
+ buf = ProcessTwoPhaseBuffer (fxid , InvalidXLogRecPtr ,
1924
1924
true, false, false);
1925
1925
if (buf == NULL )
1926
1926
continue ;
@@ -1971,6 +1971,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1971
1971
TransactionId origNextXid = XidFromFullTransactionId (nextXid );
1972
1972
TransactionId result = origNextXid ;
1973
1973
TransactionId * xids = NULL ;
1974
+ uint32 epoch = EpochFromFullTransactionId (nextXid );
1974
1975
int nxids = 0 ;
1975
1976
int allocsize = 0 ;
1976
1977
int i ;
@@ -1979,14 +1980,20 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1979
1980
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
1980
1981
{
1981
1982
TransactionId xid ;
1983
+ FullTransactionId fxid ;
1982
1984
char * buf ;
1983
1985
GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
1984
1986
1985
1987
Assert (gxact -> inredo );
1986
1988
1987
1989
xid = gxact -> xid ;
1988
1990
1989
- buf = ProcessTwoPhaseBuffer (xid ,
1991
+ /*
1992
+ * All two-phase files with past and future epoch in pg_twophase are
1993
+ * gone at this point, so we're OK to rely on only the current epoch.
1994
+ */
1995
+ fxid = FullTransactionIdFromEpochAndXid (epoch , xid );
1996
+ buf = ProcessTwoPhaseBuffer (fxid ,
1990
1997
gxact -> prepare_start_lsn ,
1991
1998
gxact -> ondisk , false, true);
1992
1999
@@ -2048,19 +2055,31 @@ void
2048
2055
StandbyRecoverPreparedTransactions (void )
2049
2056
{
2050
2057
int i ;
2058
+ uint32 epoch ;
2059
+ FullTransactionId nextFullXid ;
2060
+
2061
+ /* get current epoch */
2062
+ nextFullXid = ReadNextFullTransactionId ();
2063
+ epoch = EpochFromFullTransactionId (nextFullXid );
2051
2064
2052
2065
LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
2053
2066
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
2054
2067
{
2055
2068
TransactionId xid ;
2069
+ FullTransactionId fxid ;
2056
2070
char * buf ;
2057
2071
GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
2058
2072
2059
2073
Assert (gxact -> inredo );
2060
2074
2061
2075
xid = gxact -> xid ;
2062
2076
2063
- buf = ProcessTwoPhaseBuffer (xid ,
2077
+ /*
2078
+ * At this stage, we're OK to work with the current epoch as all past
2079
+ * and future files have been already discarded.
2080
+ */
2081
+ fxid = FullTransactionIdFromEpochAndXid (epoch , xid );
2082
+ buf = ProcessTwoPhaseBuffer (fxid ,
2064
2083
gxact -> prepare_start_lsn ,
2065
2084
gxact -> ondisk , true, false);
2066
2085
if (buf != NULL )
@@ -2089,18 +2108,29 @@ void
2089
2108
RecoverPreparedTransactions (void )
2090
2109
{
2091
2110
int i ;
2111
+ uint32 epoch ;
2112
+ FullTransactionId nextFullXid ;
2113
+
2114
+ /* get current epoch */
2115
+ nextFullXid = ReadNextFullTransactionId ();
2116
+ epoch = EpochFromFullTransactionId (nextFullXid );
2092
2117
2093
2118
LWLockAcquire (TwoPhaseStateLock , LW_EXCLUSIVE );
2094
2119
for (i = 0 ; i < TwoPhaseState -> numPrepXacts ; i ++ )
2095
2120
{
2096
2121
TransactionId xid ;
2122
+ FullTransactionId fxid ;
2097
2123
char * buf ;
2098
2124
GlobalTransaction gxact = TwoPhaseState -> prepXacts [i ];
2099
2125
char * bufptr ;
2100
2126
TwoPhaseFileHeader * hdr ;
2101
2127
TransactionId * subxids ;
2102
2128
const char * gid ;
2103
2129
2130
+ /*
2131
+ * At this stage, we're OK to work with the current epoch as all past
2132
+ * and future files have been already discarded.
2133
+ */
2104
2134
xid = gxact -> xid ;
2105
2135
2106
2136
/*
@@ -2112,7 +2142,8 @@ RecoverPreparedTransactions(void)
2112
2142
* SubTransSetParent has been set before, if the prepared transaction
2113
2143
* generated xid assignment records.
2114
2144
*/
2115
- buf = ProcessTwoPhaseBuffer (xid ,
2145
+ fxid = FullTransactionIdFromEpochAndXid (epoch , xid );
2146
+ buf = ProcessTwoPhaseBuffer (fxid ,
2116
2147
gxact -> prepare_start_lsn ,
2117
2148
gxact -> ondisk , true, false);
2118
2149
if (buf == NULL )
@@ -2180,7 +2211,7 @@ RecoverPreparedTransactions(void)
2180
2211
/*
2181
2212
* ProcessTwoPhaseBuffer
2182
2213
*
2183
- * Given a transaction id , read it either from disk or read it directly
2214
+ * Given a FullTransactionId , read it either from disk or read it directly
2184
2215
* via shmem xlog record pointer using the provided "prepare_start_lsn".
2185
2216
*
2186
2217
* If setParent is true, set up subtransaction parent linkages.
@@ -2189,32 +2220,35 @@ RecoverPreparedTransactions(void)
2189
2220
* value scanned.
2190
2221
*/
2191
2222
static char *
2192
- ProcessTwoPhaseBuffer (TransactionId xid ,
2223
+ ProcessTwoPhaseBuffer (FullTransactionId fxid ,
2193
2224
XLogRecPtr prepare_start_lsn ,
2194
2225
bool fromdisk ,
2195
2226
bool setParent , bool setNextXid )
2196
2227
{
2197
2228
FullTransactionId nextXid = TransamVariables -> nextXid ;
2198
- TransactionId origNextXid = XidFromFullTransactionId (nextXid );
2199
2229
TransactionId * subxids ;
2200
2230
char * buf ;
2201
2231
TwoPhaseFileHeader * hdr ;
2202
2232
int i ;
2233
+ TransactionId xid = XidFromFullTransactionId (fxid );
2203
2234
2204
2235
Assert (LWLockHeldByMeInMode (TwoPhaseStateLock , LW_EXCLUSIVE ));
2205
2236
2206
2237
if (!fromdisk )
2207
2238
Assert (prepare_start_lsn != InvalidXLogRecPtr );
2208
2239
2209
- /* Reject XID if too new */
2210
- if (TransactionIdFollowsOrEquals (xid , origNextXid ))
2240
+ /*
2241
+ * Reject full XID if too new. Note that this discards files from future
2242
+ * epochs.
2243
+ */
2244
+ if (FullTransactionIdFollowsOrEquals (fxid , nextXid ))
2211
2245
{
2212
2246
if (fromdisk )
2213
2247
{
2214
2248
ereport (WARNING ,
2215
- (errmsg ("removing future two-phase state file for transaction %u" ,
2216
- xid )));
2217
- RemoveTwoPhaseFile (xid , true);
2249
+ (errmsg ("removing future two-phase state file of epoch %u for transaction %u" ,
2250
+ EpochFromFullTransactionId ( fxid ), xid )));
2251
+ RemoveTwoPhaseFile (fxid , true);
2218
2252
}
2219
2253
else
2220
2254
{
@@ -2226,6 +2260,26 @@ ProcessTwoPhaseBuffer(TransactionId xid,
2226
2260
return NULL ;
2227
2261
}
2228
2262
2263
+ /* Discard files from past epochs */
2264
+ if (EpochFromFullTransactionId (fxid ) < EpochFromFullTransactionId (nextXid ))
2265
+ {
2266
+ if (fromdisk )
2267
+ {
2268
+ ereport (WARNING ,
2269
+ (errmsg ("removing past two-phase state file of epoch %u for transaction %u" ,
2270
+ EpochFromFullTransactionId (fxid ), xid )));
2271
+ RemoveTwoPhaseFile (fxid , true);
2272
+ }
2273
+ else
2274
+ {
2275
+ ereport (WARNING ,
2276
+ (errmsg ("removing past two-phase state from memory for transaction %u" ,
2277
+ xid )));
2278
+ PrepareRedoRemove (xid , true);
2279
+ }
2280
+ return NULL ;
2281
+ }
2282
+
2229
2283
/* Already processed? */
2230
2284
if (TransactionIdDidCommit (xid ) || TransactionIdDidAbort (xid ))
2231
2285
{
@@ -2234,7 +2288,7 @@ ProcessTwoPhaseBuffer(TransactionId xid,
2234
2288
ereport (WARNING ,
2235
2289
(errmsg ("removing stale two-phase state file for transaction %u" ,
2236
2290
xid )));
2237
- RemoveTwoPhaseFile (xid , true);
2291
+ RemoveTwoPhaseFile (fxid , true);
2238
2292
}
2239
2293
else
2240
2294
{
@@ -2520,8 +2574,11 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
2520
2574
if (!XLogRecPtrIsInvalid (start_lsn ))
2521
2575
{
2522
2576
char path [MAXPGPATH ];
2577
+ FullTransactionId fxid ;
2523
2578
2524
- TwoPhaseFilePath (path , hdr -> xid );
2579
+ /* Use current epoch */
2580
+ fxid = FullTransactionIdFromCurrentEpoch (hdr -> xid );
2581
+ TwoPhaseFilePath (path , fxid );
2525
2582
2526
2583
if (access (path , F_OK ) == 0 )
2527
2584
{
@@ -2616,7 +2673,15 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
2616
2673
*/
2617
2674
elog (DEBUG2 , "removing 2PC data for transaction %u" , xid );
2618
2675
if (gxact -> ondisk )
2619
- RemoveTwoPhaseFile (xid , giveWarning );
2676
+ {
2677
+ FullTransactionId fxid ;
2678
+
2679
+ /*
2680
+ * We should deal with a file at the current epoch here.
2681
+ */
2682
+ fxid = FullTransactionIdFromCurrentEpoch (xid );
2683
+ RemoveTwoPhaseFile (fxid , giveWarning );
2684
+ }
2620
2685
RemoveGXact (gxact );
2621
2686
}
2622
2687
0 commit comments