Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 6074c30

Browse files
anarazelCommitfest Bot
authored and
Commitfest Bot
committed
aio: Add IO queue helper
This is likely never going to anywhere - Thomas Munro is working on something more complete. But I needed a way to exercise aio for checkpointer / bgwriter.
1 parent c779e9b commit 6074c30

File tree

5 files changed

+241
-0
lines changed

5 files changed

+241
-0
lines changed

src/backend/storage/aio/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ OBJS = \
1515
aio_init.o \
1616
aio_io.o \
1717
aio_target.o \
18+
io_queue.o \
1819
method_io_uring.o \
1920
method_sync.o \
2021
method_worker.o \

src/backend/storage/aio/io_queue.c

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* io_queue.c
4+
* AIO - Mechanism for tracking many IOs
5+
*
6+
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7+
* Portions Copyright (c) 1994, Regents of the University of California
8+
*
9+
* IDENTIFICATION
10+
* src/backend/storage/aio/io_queue.c
11+
*
12+
*-------------------------------------------------------------------------
13+
*/
14+
15+
#include "postgres.h"
16+
17+
#include "lib/ilist.h"
18+
#include "storage/aio.h"
19+
#include "storage/io_queue.h"
20+
#include "utils/resowner.h"
21+
22+
23+
24+
typedef struct TrackedIO
25+
{
26+
PgAioWaitRef iow;
27+
dlist_node node;
28+
} TrackedIO;
29+
30+
struct IOQueue
31+
{
32+
int depth;
33+
int unsubmitted;
34+
35+
bool has_reserved;
36+
37+
dclist_head idle;
38+
dclist_head in_progress;
39+
40+
TrackedIO tracked_ios[FLEXIBLE_ARRAY_MEMBER];
41+
};
42+
43+
44+
IOQueue *
45+
io_queue_create(int depth, int flags)
46+
{
47+
size_t sz;
48+
IOQueue *ioq;
49+
50+
sz = offsetof(IOQueue, tracked_ios)
51+
+ sizeof(TrackedIO) * depth;
52+
53+
ioq = palloc0(sz);
54+
55+
ioq->depth = 0;
56+
57+
for (int i = 0; i < depth; i++)
58+
{
59+
TrackedIO *tio = &ioq->tracked_ios[i];
60+
61+
pgaio_wref_clear(&tio->iow);
62+
dclist_push_tail(&ioq->idle, &tio->node);
63+
}
64+
65+
return ioq;
66+
}
67+
68+
void
69+
io_queue_wait_one(IOQueue *ioq)
70+
{
71+
/* submit all pending IO before waiting */
72+
pgaio_submit_staged();
73+
74+
while (!dclist_is_empty(&ioq->in_progress))
75+
{
76+
/* FIXME: Should we really pop here already? */
77+
dlist_node *node = dclist_pop_head_node(&ioq->in_progress);
78+
TrackedIO *tio = dclist_container(TrackedIO, node, node);
79+
80+
pgaio_wref_wait(&tio->iow);
81+
dclist_push_head(&ioq->idle, &tio->node);
82+
}
83+
}
84+
85+
void
86+
io_queue_reserve(IOQueue *ioq)
87+
{
88+
if (ioq->has_reserved)
89+
return;
90+
91+
if (dclist_is_empty(&ioq->idle))
92+
io_queue_wait_one(ioq);
93+
94+
Assert(!dclist_is_empty(&ioq->idle));
95+
96+
ioq->has_reserved = true;
97+
}
98+
99+
PgAioHandle *
100+
io_queue_acquire_io(IOQueue *ioq)
101+
{
102+
PgAioHandle *ioh;
103+
104+
io_queue_reserve(ioq);
105+
106+
Assert(!dclist_is_empty(&ioq->idle));
107+
108+
if (!io_queue_is_empty(ioq))
109+
{
110+
ioh = pgaio_io_acquire_nb(CurrentResourceOwner, NULL);
111+
if (ioh == NULL)
112+
{
113+
/*
114+
* Need to wait for all IOs, blocking might not be legal in the
115+
* context.
116+
*
117+
* XXX: This doesn't make a whole lot of sense, we're also
118+
* blocking here. What was I smoking when I wrote the above?
119+
*/
120+
io_queue_wait_all(ioq);
121+
ioh = pgaio_io_acquire(CurrentResourceOwner, NULL);
122+
}
123+
}
124+
else
125+
{
126+
ioh = pgaio_io_acquire(CurrentResourceOwner, NULL);
127+
}
128+
129+
return ioh;
130+
}
131+
132+
void
133+
io_queue_track(IOQueue *ioq, const struct PgAioWaitRef *iow)
134+
{
135+
dlist_node *node;
136+
TrackedIO *tio;
137+
138+
Assert(ioq->has_reserved);
139+
ioq->has_reserved = false;
140+
141+
Assert(!dclist_is_empty(&ioq->idle));
142+
143+
node = dclist_pop_head_node(&ioq->idle);
144+
tio = dclist_container(TrackedIO, node, node);
145+
146+
tio->iow = *iow;
147+
148+
dclist_push_tail(&ioq->in_progress, &tio->node);
149+
150+
ioq->unsubmitted++;
151+
152+
/*
153+
* XXX: Should have some smarter logic here. We don't want to wait too
154+
* long to submit, that'll mean we're more likely to block. But we also
155+
* don't want to have the overhead of submitting every IO individually.
156+
*/
157+
if (ioq->unsubmitted >= 4)
158+
{
159+
pgaio_submit_staged();
160+
ioq->unsubmitted = 0;
161+
}
162+
}
163+
164+
void
165+
io_queue_wait_all(IOQueue *ioq)
166+
{
167+
/* submit all pending IO before waiting */
168+
pgaio_submit_staged();
169+
170+
while (!dclist_is_empty(&ioq->in_progress))
171+
{
172+
/* wait for the last IO to minimize unnecessary wakeups */
173+
dlist_node *node = dclist_tail_node(&ioq->in_progress);
174+
TrackedIO *tio = dclist_container(TrackedIO, node, node);
175+
176+
if (!pgaio_wref_check_done(&tio->iow))
177+
{
178+
ereport(DEBUG3,
179+
errmsg("io_queue_wait_all for io:%d",
180+
pgaio_wref_get_id(&tio->iow)),
181+
errhidestmt(true),
182+
errhidecontext(true));
183+
184+
pgaio_wref_wait(&tio->iow);
185+
}
186+
187+
dclist_delete_from(&ioq->in_progress, &tio->node);
188+
dclist_push_head(&ioq->idle, &tio->node);
189+
}
190+
}
191+
192+
bool
193+
io_queue_is_empty(IOQueue *ioq)
194+
{
195+
return dclist_is_empty(&ioq->in_progress);
196+
}
197+
198+
void
199+
io_queue_free(IOQueue *ioq)
200+
{
201+
io_queue_wait_all(ioq);
202+
203+
pfree(ioq);
204+
}

src/backend/storage/aio/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ backend_sources += files(
77
'aio_init.c',
88
'aio_io.c',
99
'aio_target.c',
10+
'io_queue.c',
1011
'method_io_uring.c',
1112
'method_sync.c',
1213
'method_worker.c',

src/include/storage/io_queue.h

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* io_queue.h
4+
* Mechanism for tracking many IOs
5+
*
6+
*
7+
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8+
* Portions Copyright (c) 1994, Regents of the University of California
9+
*
10+
* src/include/storage/io_queue.h
11+
*
12+
*-------------------------------------------------------------------------
13+
*/
14+
#ifndef IO_QUEUE_H
15+
#define IO_QUEUE_H
16+
17+
#include "storage/aio_types.h"
18+
19+
struct IOQueue;
20+
typedef struct IOQueue IOQueue;
21+
22+
struct PgAioWaitRef;
23+
24+
extern IOQueue *io_queue_create(int depth, int flags);
25+
extern void io_queue_track(IOQueue *ioq, const PgAioWaitRef *iow);
26+
extern void io_queue_wait_one(IOQueue *ioq);
27+
extern void io_queue_wait_all(IOQueue *ioq);
28+
extern bool io_queue_is_empty(IOQueue *ioq);
29+
extern void io_queue_reserve(IOQueue *ioq);
30+
extern PgAioHandle *io_queue_acquire_io(IOQueue *ioq);
31+
extern void io_queue_free(IOQueue *ioq);
32+
33+
#endif /* IO_QUEUE_H */

src/tools/pgindent/typedefs.list

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1196,6 +1196,7 @@ IOContext
11961196
IOFuncSelector
11971197
IOObject
11981198
IOOp
1199+
IOQueue
11991200
IO_STATUS_BLOCK
12001201
IPCompareMethod
12011202
ITEM
@@ -3022,6 +3023,7 @@ TocEntry
30223023
TokenAuxData
30233024
TokenizedAuthLine
30243025
TrackItem
3026+
TrackedIO
30253027
TransApplyAction
30263028
TransInvalidationInfo
30273029
TransState

0 commit comments

Comments
 (0)