Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 18e8789

Browse files
Andrey Kazarinovdanolivo
Andrey Kazarinov
authored andcommitted
[PGPRO-7038] bgworker for aqo_cleanup
1 parent 3cf1112 commit 18e8789

11 files changed

+334
-3
lines changed

aqo.c

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
#include "preprocessing.h"
2727
#include "learn_cache.h"
2828
#include "storage.h"
29+
#include "postmaster/bgworker.h"
30+
#include "catalog/objectaccess.h"
2931

3032

3133
PG_MODULE_MAGIC;
@@ -77,6 +79,8 @@ int auto_tuning_infinite_loop = 8;
7779
int aqo_k = 3;
7880
double log_selectivity_lower_bound = -30;
7981

82+
bool cleanup_bgworker = false;
83+
8084
/*
8185
* Currently we use it only to store query_text string which is initialized
8286
* after a query parsing and is used during the query planning.
@@ -112,6 +116,7 @@ get_parameterized_joinrel_size_hook_type prev_get_parameterized_joinrel_size_hoo
112116
ExplainOnePlan_hook_type prev_ExplainOnePlan_hook;
113117
ExplainOneNode_hook_type prev_ExplainOneNode_hook;
114118
static shmem_request_hook_type prev_shmem_request_hook = NULL;
119+
object_access_hook_type prev_object_access_hook;
115120

116121
/*****************************************************************************
117122
*
@@ -147,6 +152,93 @@ aqo_shmem_request(void)
147152
RequestAddinShmemSpace(aqo_memsize());
148153
}
149154

155+
/*
156+
* Entry point for CleanupWorker's process.
157+
*/
158+
void
159+
aqo_bgworker_cleanup(void)
160+
{
161+
int fs_num;
162+
int fss_num;
163+
164+
cleanup_aqo_database(true, &fs_num, &fss_num);
165+
}
166+
167+
/*
168+
* Object access hook
169+
*/
170+
void
171+
aqo_drop_access_hook(ObjectAccessType access,
172+
Oid classId,
173+
Oid objectId,
174+
int subId,
175+
void *arg)
176+
{
177+
if (prev_object_access_hook)
178+
(*prev_object_access_hook) (access, classId, objectId, subId, arg);
179+
180+
if (access == OAT_DROP && cleanup_bgworker)
181+
{
182+
MemoryContext old_ctx;
183+
int status = BGWH_STOPPED;
184+
pid_t pid;
185+
186+
old_ctx = MemoryContextSwitchTo(AQOTopMemCtx);
187+
LWLockAcquire(&aqo_state->lock, LW_EXCLUSIVE);
188+
if (aqo_state->bgw_handle != NULL)
189+
{
190+
status = GetBackgroundWorkerPid(aqo_state->bgw_handle, &pid);
191+
}
192+
LWLockRelease(&aqo_state->lock);
193+
if (status != BGWH_STARTED)
194+
{
195+
aqo_bgworker_startup();
196+
}
197+
MemoryContextSwitchTo(old_ctx);
198+
}
199+
}
200+
201+
void
202+
aqo_bgworker_startup(void)
203+
{
204+
BackgroundWorker worker;
205+
BackgroundWorkerHandle *handle;
206+
BgwHandleStatus status;
207+
pid_t pid;
208+
209+
MemSet(&worker, 0, sizeof(worker));
210+
211+
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
212+
BGWORKER_BACKEND_DATABASE_CONNECTION;
213+
worker.bgw_start_time = BgWorkerStart_ConsistentState;
214+
worker.bgw_restart_time = BGW_NEVER_RESTART;
215+
worker.bgw_main_arg = Int32GetDatum(0);
216+
worker.bgw_extra[0] = 0;
217+
memcpy(worker.bgw_function_name, "aqo_bgworker_cleanup", 21);
218+
memcpy(worker.bgw_library_name, "aqo", 4);
219+
memcpy(worker.bgw_name, "aqo cleanup", 12);
220+
221+
/* must set notify PID to wait for startup */
222+
worker.bgw_notify_pid = MyProcPid;
223+
224+
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
225+
ereport(NOTICE,
226+
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
227+
errmsg("could not register background process"),
228+
errhint("You might need to increase max_worker_processes.")));
229+
230+
status = WaitForBackgroundWorkerStartup(handle, &pid);
231+
if (status != BGWH_STARTED)
232+
ereport(NOTICE,
233+
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
234+
errmsg("could not start background process"),
235+
errhint("More details may be available in the server log.")));
236+
237+
LWLockAcquire(&aqo_state->lock, LW_EXCLUSIVE);
238+
aqo_state->bgw_handle = handle;
239+
LWLockRelease(&aqo_state->lock);
240+
}
241+
150242
void
151243
_PG_init(void)
152244
{
@@ -309,6 +401,18 @@ _PG_init(void)
309401
NULL
310402
);
311403

404+
DefineCustomBoolVariable("aqo.cleanup_bgworker",
405+
"Enable bgworker which responsible for doing cleanup after drop",
406+
NULL,
407+
&cleanup_bgworker,
408+
false,
409+
PGC_USERSET,
410+
0,
411+
NULL,
412+
NULL,
413+
NULL
414+
);
415+
312416
prev_shmem_startup_hook = shmem_startup_hook;
313417
shmem_startup_hook = aqo_init_shmem;
314418
prev_planner_hook = planner_hook;
@@ -349,6 +453,9 @@ _PG_init(void)
349453
prev_shmem_request_hook = shmem_request_hook;
350454
shmem_request_hook = aqo_shmem_request;
351455

456+
prev_object_access_hook = object_access_hook;
457+
object_access_hook = aqo_drop_access_hook;
458+
352459
init_deactivated_queries_storage();
353460

354461
/*

aqo.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
#include "utils/rel.h"
143143
#include "utils/fmgroids.h"
144144
#include "utils/snapmgr.h"
145+
#include "catalog/objectaccess.h"
145146

146147
#include "machine_learning.h"
147148
//#include "storage.h"
@@ -299,5 +300,10 @@ extern void selectivity_cache_clear(void);
299300

300301
extern bool IsQueryDisabled(void);
301302

303+
extern void aqo_drop_access_hook(ObjectAccessType access, Oid classId,
304+
Oid objectId, int subId, void *arg);
305+
extern void aqo_bgworker_startup(void);
306+
extern PGDLLEXPORT void aqo_bgworker_cleanup(void);
307+
302308
extern List *cur_classes;
303309
#endif

aqo_shared.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ aqo_init_shmem(void)
199199
aqo_state->stat_changed = false;
200200
aqo_state->data_changed = false;
201201
aqo_state->queries_changed = false;
202+
aqo_state->bgw_handle = NULL;
202203

203204
LWLockInitialize(&aqo_state->lock, LWLockNewTrancheId());
204205
LWLockInitialize(&aqo_state->stat_lock, LWLockNewTrancheId());

aqo_shared.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include "storage/lwlock.h"
88
#include "utils/dsa.h"
99
#include "lib/dshash.h"
10+
#include "postmaster/bgworker.h"
1011

1112
#define AQO_SHARED_MAGIC 0x053163
1213

@@ -43,6 +44,8 @@ typedef struct AQOSharedState
4344

4445
LWLock queries_lock; /* lock for access to queries storage */
4546
bool queries_changed;
47+
48+
BackgroundWorkerHandle *bgw_handle;
4649
} AQOSharedState;
4750

4851

expected/cleanup_bgworker.out

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
CREATE TABLE aqo_test0(a int, b int, c int, d int);
2+
WITH RECURSIVE t(a, b, c, d)
3+
AS (
4+
VALUES (0, 0, 0, 0)
5+
UNION ALL
6+
SELECT t.a + 1, t.b + 1, t.c + 1, t.d + 1 FROM t WHERE t.a < 2000
7+
) INSERT INTO aqo_test0 (SELECT * FROM t);
8+
CREATE INDEX aqo_test0_idx_a ON aqo_test0 (a);
9+
ANALYZE aqo_test0;
10+
CREATE EXTENSION aqo;
11+
SET aqo.join_threshold = 0;
12+
SET aqo.mode = 'learn';
13+
SET aqo.cleanup_bgworker = 'true';
14+
EXPLAIN SELECT t1.a, t2.b FROM aqo_test0 AS t1, aqo_test0 AS t2
15+
WHERE t1.a < 1 AND t1.b < 1 AND t2.c < 1 AND t2.d < 1;
16+
QUERY PLAN
17+
------------------------------------------------------------------------------------------
18+
Nested Loop (cost=0.28..49.32 rows=1 width=8)
19+
-> Index Scan using aqo_test0_idx_a on aqo_test0 t1 (cost=0.28..8.30 rows=1 width=4)
20+
Index Cond: (a < 1)
21+
Filter: (b < 1)
22+
-> Seq Scan on aqo_test0 t2 (cost=0.00..41.02 rows=1 width=4)
23+
Filter: ((c < 1) AND (d < 1))
24+
(6 rows)
25+
26+
-- Test 1 : delete 1 out of 1 entry
27+
SELECT count(*) FROM aqo_queries;
28+
count
29+
-------
30+
2
31+
(1 row)
32+
33+
DROP TABLE aqo_test0;
34+
CREATE TABLE IF NOT EXISTS aqo_test0(a int, b int, c int, d int);
35+
WITH RECURSIVE t(a, b, c, d)
36+
AS (
37+
VALUES (0, 0, 0, 0)
38+
UNION ALL
39+
SELECT t.a + 1, t.b + 1, t.c + 1, t.d + 1 FROM t WHERE t.a < 2000
40+
) INSERT INTO aqo_test0 (SELECT * FROM t);
41+
CREATE INDEX aqo_test0_idx_a ON aqo_test0 (a);
42+
ANALYZE aqo_test0;
43+
CREATE TABLE aqo_test1(a int, b int);
44+
WITH RECURSIVE t(a, b)
45+
AS (
46+
VALUES (1, 2)
47+
UNION ALL
48+
SELECT t.a + 1, t.b + 1 FROM t WHERE t.a < 20
49+
) INSERT INTO aqo_test1 (SELECT * FROM t);
50+
CREATE INDEX aqo_test1_idx_a ON aqo_test1 (a);
51+
ANALYZE aqo_test1;
52+
SELECT count(*) FROM aqo_queries;
53+
count
54+
-------
55+
1
56+
(1 row)
57+
58+
EXPLAIN SELECT * FROM aqo_test0
59+
WHERE a < 3 AND b < 3 AND c < 3 AND d < 3;
60+
QUERY PLAN
61+
----------------------------------------------------------------------------------
62+
Index Scan using aqo_test0_idx_a on aqo_test0 (cost=0.28..8.35 rows=1 width=16)
63+
Index Cond: (a < 3)
64+
Filter: ((b < 3) AND (c < 3) AND (d < 3))
65+
(3 rows)
66+
67+
EXPLAIN SELECT * FROM aqo_test1
68+
WHERE a < 4 AND b < 4;
69+
QUERY PLAN
70+
---------------------------------------------------------
71+
Seq Scan on aqo_test1 (cost=0.00..1.30 rows=1 width=8)
72+
Filter: ((a < 4) AND (b < 4))
73+
(2 rows)
74+
75+
-- Test2: Must delete 1 out of 2 entries
76+
SELECT count(*) FROM aqo_queries;
77+
count
78+
-------
79+
3
80+
(1 row)
81+
82+
DROP TABLE aqo_test1;
83+
CREATE TABLE IF NOT EXISTS aqo_test1(a int, b int);
84+
WITH RECURSIVE t(a, b)
85+
AS (
86+
VALUES (1, 2)
87+
UNION ALL
88+
SELECT t.a + 1, t.b + 1 FROM t WHERE t.a < 20
89+
) INSERT INTO aqo_test1 (SELECT * FROM t);
90+
CREATE INDEX aqo_test1_idx_a ON aqo_test1 (a);
91+
ANALYZE aqo_test1;
92+
SELECT count(*) FROM aqo_queries;
93+
count
94+
-------
95+
1
96+
(1 row)
97+
98+
EXPLAIN SELECT * FROM aqo_test0
99+
WHERE a < 3 AND b < 3 AND c < 3 AND d < 3;
100+
QUERY PLAN
101+
----------------------------------------------------------------------------------
102+
Index Scan using aqo_test0_idx_a on aqo_test0 (cost=0.28..8.35 rows=1 width=16)
103+
Index Cond: (a < 3)
104+
Filter: ((b < 3) AND (c < 3) AND (d < 3))
105+
(3 rows)
106+
107+
EXPLAIN SELECT * FROM aqo_test1
108+
WHERE a < 4 AND b < 4;
109+
QUERY PLAN
110+
---------------------------------------------------------
111+
Seq Scan on aqo_test1 (cost=0.00..1.30 rows=1 width=8)
112+
Filter: ((a < 4) AND (b < 4))
113+
(2 rows)
114+
115+
-- Test3: delete 2 out of 2 entries
116+
SELECT count(*) FROM aqo_queries;
117+
count
118+
-------
119+
3
120+
(1 row)
121+
122+
DROP TABLE aqo_test0;
123+
DROP TABLE aqo_test1;
124+
SELECT count(*) FROM aqo_queries;
125+
count
126+
-------
127+
1
128+
(1 row)
129+
130+
SET aqo.cleanup_bgworker = 'false';
131+
DROP EXTENSION aqo;

expected/relocatable.out

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,9 @@ ORDER BY (learn_aqo, use_aqo, auto_tuning);
114114
t | t | f
115115
(3 rows)
116116

117-
RESET search_path;
118117
DROP TABLE test CASCADE;
119118
DROP SCHEMA IF EXISTS test CASCADE;
120119
NOTICE: drop cascades to extension aqo
121120
DROP EXTENSION IF EXISTS aqo CASCADE;
122121
NOTICE: extension "aqo" does not exist, skipping
122+
RESET search_path;

regress_schedule

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ test: top_queries
2020
test: relocatable
2121
test: look_a_like
2222
test: feature_subspace
23+
test: cleanup_bgworker

0 commit comments

Comments
 (0)