CommandId combocid; /* just for debugging */
} ReorderBufferTupleCidEnt;
+/*
+ * TXNEntryFile
+ *		Handle for a serialized-changes file that is being restored from
+ *		disk: a virtual file descriptor together with the position at which
+ *		the next read or write should happen.
+ *
+ * vfd is -1 whenever the file is closed.  curOffset is reset to 0 each time
+ * vfd is (re)opened.
+ */
+typedef struct TXNEntryFile
+{
+ File vfd;
+ off_t curOffset;
+} TXNEntryFile;
+
/* k-way in-order change iteration support structures */
typedef struct ReorderBufferIterTXNEntry
{
XLogRecPtr lsn;
ReorderBufferChange *change;
ReorderBufferTXN *txn;
- int fd;
+ /* file restored changes are read from; file.vfd is -1 when none is open */
+ TXNEntryFile file;
XLogSegNo segno;
} ReorderBufferIterTXNEntry;
* subtransactions
* ---------------------------------------
*/
-static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
+/* Hands the iterator back via *iter_state so PG_CATCH cleanup can reach it. */
+static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ ReorderBufferIterTXNState *volatile *iter_state);
static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
ReorderBufferIterTXNState *state);
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
int fd, ReorderBufferChange *change);
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- int *fd, XLogSegNo *segno);
+ TXNEntryFile *file, XLogSegNo *segno);
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
char *change);
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
/*
* Allocate & initialize an iterator which iterates in lsn order over a
* transaction and all its subtransactions.
+ *
+ * Note: The iterator state is returned through the iter_state parameter
+ * rather than as the function's return value.  The caller cleans the state
+ * up in a PG_CATCH block, so it must get the state back even if this
+ * function throws an error.  *iter_state stays NULL until the per-entry
+ * fields have been initialized, so cleanup never sees a half-built state.
*/
-static ReorderBufferIterTXNState *
-ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
+static void
+ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ ReorderBufferIterTXNState *volatile *iter_state)
{
Size nr_txns = 0;
ReorderBufferIterTXNState *state;
dlist_iter cur_txn_i;
int32 off;
+ *iter_state = NULL;
+
/*
* Calculate the size of our heap: one element for every transaction that
* contains changes. (Besides the transactions already in the reorder
for (off = 0; off < state->nr_txns; off++)
{
- state->entries[off].fd = -1;
+ state->entries[off].file.vfd = -1;
state->entries[off].segno = 0;
}
ReorderBufferIterCompare,
state);
+ /* Now that the state fields are initialized, it is safe to return it. */
+ *iter_state = state;
+
/*
* Now insert items into the binary heap, in an unordered fashion. (We
* will run a heap assembly step at the end; this is more efficient.)
{
/* serialize remaining changes */
ReorderBufferSerializeTXN(rb, txn);
- ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
+ ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
&state->entries[off].segno);
}
/* serialize remaining changes */
ReorderBufferSerializeTXN(rb, cur_txn);
ReorderBufferRestoreChanges(rb, cur_txn,
- &state->entries[off].fd,
+ &state->entries[off].file,
&state->entries[off].segno);
}
cur_change = dlist_head_element(ReorderBufferChange, node,
/* assemble a valid binary heap */
binaryheap_build(state->heap);
-
- return state;
}
/*
dlist_delete(&change->node);
dlist_push_tail(&state->old_change, &change->node);
- if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
+ if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
&state->entries[off].segno))
{
/* successfully restored changes from disk */
for (off = 0; off < state->nr_txns; off++)
{
- if (state->entries[off].fd != -1)
- CloseTransientFile(state->entries[off].fd);
+ if (state->entries[off].file.vfd != -1)
+ FileClose(state->entries[off].file.vfd);
}
/* free memory we might have "leaked" in the last *Next call */
rb->begin(rb, txn);
- iterstate = ReorderBufferIterTXNInit(rb, txn);
+ ReorderBufferIterTXNInit(rb, txn, &iterstate);
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
{
Relation relation = NULL;
*/
static Size
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
- int *fd, XLogSegNo *segno)
+ TXNEntryFile *file, XLogSegNo *segno)
{
Size restored = 0;
XLogSegNo last_segno;
dlist_mutable_iter cleanup_iter;
+ File *fd = &file->vfd;
Assert(txn->first_lsn != InvalidXLogRecPtr);
Assert(txn->final_lsn != InvalidXLogRecPtr);
ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
*segno);
- *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
+ *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);
+
+ /* No harm in resetting the offset even in case of failure */
+ file->curOffset = 0;
+
if (*fd < 0 && errno == ENOENT)
{
*fd = -1;
* end of this file.
*/
ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
- pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
- readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
- pgstat_report_wait_end();
+ readBytes = FileRead(file->vfd, rb->outbuf,
+ sizeof(ReorderBufferDiskChange),
+ file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
/* eof */
if (readBytes == 0)
{
- CloseTransientFile(*fd);
+ FileClose(*fd);
*fd = -1;
(*segno)++;
continue;
readBytes,
(uint32) sizeof(ReorderBufferDiskChange))));
+ file->curOffset += readBytes;
+
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
ReorderBufferSerializeReserve(rb,
sizeof(ReorderBufferDiskChange) + ondisk->size);
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
- pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
- readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
- ondisk->size - sizeof(ReorderBufferDiskChange));
- pgstat_report_wait_end();
+ readBytes = FileRead(file->vfd,
+ rb->outbuf + sizeof(ReorderBufferDiskChange),
+ ondisk->size - sizeof(ReorderBufferDiskChange),
+ file->curOffset,
+ WAIT_EVENT_REORDER_BUFFER_READ);
if (readBytes < 0)
ereport(ERROR,
readBytes,
(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
+ file->curOffset += readBytes;
+
/*
* ok, read a full change from disk, now restore it into proper
* in-memory format