Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit cee1dd1

Browse files
committed
Refrain from duplicating data in reorderbuffers
If a walsender exits leaving data in reorderbuffers, the next walsender that tries to decode the same transaction would append its decoded data in the same spill files without truncating it first, which effectively duplicate the data. Avoid that by removing any leftover reorderbuffer spill files when a walsender starts. Backpatch to 9.4; this bug has been there from the very beginning of logical decoding. Author: Craig Ringer, revised by me Reviewed by: Álvaro Herrera, Petr Jelínek, Masahiko Sawada
1 parent e20dd6a commit cee1dd1

File tree

1 file changed

+82
-55
lines changed

1 file changed

+82
-55
lines changed

src/backend/replication/logical/reorderbuffer.c

Lines changed: 82 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,9 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
199199
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
200200
char *change);
201201
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
202+
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
203+
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
204+
TransactionId xid, XLogSegNo segno);
202205

203206
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
204207
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
@@ -217,7 +220,8 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
217220

218221

219222
/*
220-
* Allocate a new ReorderBuffer
223+
* Allocate a new ReorderBuffer and clean out any old serialized state from
224+
* prior ReorderBuffer instances for the same slot.
221225
*/
222226
ReorderBuffer *
223227
ReorderBufferAllocate(void)
@@ -226,6 +230,8 @@ ReorderBufferAllocate(void)
226230
HASHCTL hash_ctl;
227231
MemoryContext new_ctx;
228232

233+
Assert(MyReplicationSlot != NULL);
234+
229235
/* allocate memory in own context, to have better accountability */
230236
new_ctx = AllocSetContextCreate(CurrentMemoryContext,
231237
"ReorderBuffer",
@@ -268,6 +274,13 @@ ReorderBufferAllocate(void)
268274
dlist_init(&buffer->toplevel_by_lsn);
269275
slist_init(&buffer->cached_tuplebufs);
270276

277+
/*
278+
* Ensure there's no stale data from prior uses of this slot, in case some
279+
* prior exit avoided calling ReorderBufferFree. Failure to do this can
280+
* produce duplicated txns, and it's very cheap if there's nothing there.
281+
*/
282+
ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
283+
271284
return buffer;
272285
}
273286

@@ -284,6 +297,9 @@ ReorderBufferFree(ReorderBuffer *rb)
284297
* memory context.
285298
*/
286299
MemoryContextDelete(context);
300+
301+
/* Free disk space used by unconsumed reorder buffers */
302+
ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
287303
}
288304

289305
/*
@@ -2073,7 +2089,6 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
20732089
int fd = -1;
20742090
XLogSegNo curOpenSegNo = 0;
20752091
Size spilled = 0;
2076-
char path[MAXPGPATH];
20772092

20782093
elog(DEBUG2, "spill %u changes in XID %u to disk",
20792094
(uint32) txn->nentries_mem, txn->xid);
@@ -2100,21 +2115,19 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
21002115
*/
21012116
if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
21022117
{
2103-
XLogRecPtr recptr;
2118+
char path[MAXPGPATH];
21042119

21052120
if (fd != -1)
21062121
CloseTransientFile(fd);
21072122

21082123
XLByteToSeg(change->lsn, curOpenSegNo);
2109-
XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
21102124

21112125
/*
21122126
* No need to care about TLIs here, only used during a single run,
21132127
* so each LSN only maps to a specific WAL record.
21142128
*/
2115-
sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2116-
NameStr(MyReplicationSlot->data.name), txn->xid,
2117-
(uint32) (recptr >> 32), (uint32) recptr);
2129+
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2130+
curOpenSegNo);
21182131

21192132
/* open segment, create it if necessary */
21202133
fd = OpenTransientFile(path,
@@ -2124,8 +2137,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
21242137
if (fd < 0)
21252138
ereport(ERROR,
21262139
(errcode_for_file_access(),
2127-
errmsg("could not open file \"%s\": %m",
2128-
path)));
2140+
errmsg("could not open file \"%s\": %m", path)));
21292141
}
21302142

21312143
ReorderBufferSerializeChange(rb, txn, fd, change);
@@ -2343,25 +2355,20 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
23432355

23442356
if (*fd == -1)
23452357
{
2346-
XLogRecPtr recptr;
23472358
char path[MAXPGPATH];
23482359

23492360
/* first time in */
23502361
if (*segno == 0)
2351-
{
23522362
XLByteToSeg(txn->first_lsn, *segno);
2353-
}
23542363

23552364
Assert(*segno != 0 || dlist_is_empty(&txn->changes));
2356-
XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
23572365

23582366
/*
23592367
* No need to care about TLIs here, only used during a single run,
23602368
* so each LSN only maps to a specific WAL record.
23612369
*/
2362-
sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2363-
NameStr(MyReplicationSlot->data.name), txn->xid,
2364-
(uint32) (recptr >> 32), (uint32) recptr);
2370+
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
2371+
*segno);
23652372

23662373
*fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
23672374
if (*fd < 0 && errno == ENOENT)
@@ -2597,20 +2604,72 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
25972604
for (cur = first; cur <= last; cur++)
25982605
{
25992606
char path[MAXPGPATH];
2600-
XLogRecPtr recptr;
2601-
2602-
XLogSegNoOffsetToRecPtr(cur, 0, recptr);
26032607

2604-
sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2605-
NameStr(MyReplicationSlot->data.name), txn->xid,
2606-
(uint32) (recptr >> 32), (uint32) recptr);
2608+
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
26072609
if (unlink(path) != 0 && errno != ENOENT)
26082610
ereport(ERROR,
26092611
(errcode_for_file_access(),
26102612
errmsg("could not remove file \"%s\": %m", path)));
26112613
}
26122614
}
26132615

2616+
/*
2617+
* Remove any leftover serialized reorder buffers from a slot directory after a
2618+
* prior crash or decoding session exit.
2619+
*/
2620+
static void
2621+
ReorderBufferCleanupSerializedTXNs(const char *slotname)
2622+
{
2623+
DIR *spill_dir;
2624+
struct dirent *spill_de;
2625+
struct stat statbuf;
2626+
char path[MAXPGPATH * 2 + 12];
2627+
2628+
sprintf(path, "pg_replslot/%s", slotname);
2629+
2630+
/* we're only handling directories here, skip if it's not ours */
2631+
if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2632+
return;
2633+
2634+
spill_dir = AllocateDir(path);
2635+
while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
2636+
{
2637+
/* only look at names that can be ours */
2638+
if (strncmp(spill_de->d_name, "xid", 3) == 0)
2639+
{
2640+
snprintf(path, sizeof(path),
2641+
"pg_replslot/%s/%s", slotname,
2642+
spill_de->d_name);
2643+
2644+
if (unlink(path) != 0)
2645+
ereport(ERROR,
2646+
(errcode_for_file_access(),
2647+
errmsg("could not remove file \"%s\" during removal of pg_replslot/%s/*.xid: %m",
2648+
path, slotname)));
2649+
}
2650+
}
2651+
FreeDir(spill_dir);
2652+
}
2653+
2654+
/*
2655+
* Given a replication slot, transaction ID and segment number, fill in the
2656+
* corresponding spill file into 'path', which is a caller-owned buffer of size
2657+
* at least MAXPGPATH.
2658+
*/
2659+
static void
2660+
ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
2661+
XLogSegNo segno)
2662+
{
2663+
XLogRecPtr recptr;
2664+
2665+
XLogSegNoOffsetToRecPtr(segno, 0, recptr);
2666+
2667+
snprintf(path, MAXPGPATH, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
2668+
NameStr(MyReplicationSlot->data.name),
2669+
xid,
2670+
(uint32) (recptr >> 32), (uint32) recptr);
2671+
}
2672+
26142673
/*
26152674
* Delete all data spilled to disk after we've restarted/crashed. It will be
26162675
* recreated when the respective slots are reused.
@@ -2621,15 +2680,9 @@ StartupReorderBuffer(void)
26212680
DIR *logical_dir;
26222681
struct dirent *logical_de;
26232682

2624-
DIR *spill_dir;
2625-
struct dirent *spill_de;
2626-
26272683
logical_dir = AllocateDir("pg_replslot");
26282684
while ((logical_de = ReadDir(logical_dir, "pg_replslot")) != NULL)
26292685
{
2630-
struct stat statbuf;
2631-
char path[MAXPGPATH * 2 + 12];
2632-
26332686
if (strcmp(logical_de->d_name, ".") == 0 ||
26342687
strcmp(logical_de->d_name, "..") == 0)
26352688
continue;
@@ -2642,33 +2695,7 @@ StartupReorderBuffer(void)
26422695
* ok, has to be a surviving logical slot, iterate and delete
26432696
* everything starting with xid-*
26442697
*/
2645-
sprintf(path, "pg_replslot/%s", logical_de->d_name);
2646-
2647-
/* we're only creating directories here, skip if it's not our's */
2648-
if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
2649-
continue;
2650-
2651-
spill_dir = AllocateDir(path);
2652-
while ((spill_de = ReadDir(spill_dir, path)) != NULL)
2653-
{
2654-
if (strcmp(spill_de->d_name, ".") == 0 ||
2655-
strcmp(spill_de->d_name, "..") == 0)
2656-
continue;
2657-
2658-
/* only look at names that can be ours */
2659-
if (strncmp(spill_de->d_name, "xid", 3) == 0)
2660-
{
2661-
sprintf(path, "pg_replslot/%s/%s", logical_de->d_name,
2662-
spill_de->d_name);
2663-
2664-
if (unlink(path) != 0)
2665-
ereport(PANIC,
2666-
(errcode_for_file_access(),
2667-
errmsg("could not remove file \"%s\": %m",
2668-
path)));
2669-
}
2670-
}
2671-
FreeDir(spill_dir);
2698+
ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
26722699
}
26732700
FreeDir(logical_dir);
26742701
}

0 commit comments

Comments
 (0)