Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f9c842e

Browse files
committed
make concurrent part worker more reliable, new tests
1 parent fda1b36 commit f9c842e

File tree

4 files changed

+197
-51
lines changed

4 files changed

+197
-51
lines changed

expected/pathman_bgw.out

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ SELECT set_init_callback('test_bgw.test_5', 'test_bgw.abort_xact(jsonb)');
132132
(1 row)
133133

134134
INSERT INTO test_bgw.test_5 VALUES (-100);
135-
ERROR: Attempt to spawn new partitions of relation "test_5" failed
135+
ERROR: attempt to spawn new partitions of relation "test_5" failed
136136
SELECT * FROM pathman_partition_list ORDER BY partition; /* should contain 3 partitions */
137137
parent | partition | parttype | expr | range_min | range_max
138138
-----------------+-------------------+----------+------+-----------+-----------
@@ -143,5 +143,80 @@ SELECT * FROM pathman_partition_list ORDER BY partition; /* should contain 3 par
143143
DROP FUNCTION test_bgw.abort_xact(args JSONB);
144144
DROP TABLE test_bgw.test_5 CASCADE;
145145
NOTICE: drop cascades to 3 other objects
146+
/*
147+
* Tests for ConcurrentPartWorker
148+
*/
149+
CREATE TABLE test_bgw.conc_part(id INT4 NOT NULL);
150+
INSERT INTO test_bgw.conc_part SELECT generate_series(1, 500);
151+
SELECT create_hash_partitions('test_bgw.conc_part', 'id', 5, false);
152+
create_hash_partitions
153+
------------------------
154+
5
155+
(1 row)
156+
157+
BEGIN;
158+
/* Also test FOR SHARE/UPDATE conflicts in BGW */
159+
SELECT * FROM test_bgw.conc_part ORDER BY id LIMIT 1 FOR SHARE;
160+
id
161+
----
162+
1
163+
(1 row)
164+
165+
/* Run partitioning bgworker */
166+
SELECT partition_table_concurrently('test_bgw.conc_part', 10, 1);
167+
NOTICE: worker started, you can stop it with the following command: select public.stop_concurrent_part_task('conc_part');
168+
partition_table_concurrently
169+
------------------------------
170+
171+
(1 row)
172+
173+
/* Wait until bgworker starts */
174+
SELECT pg_sleep(1);
175+
pg_sleep
176+
----------
177+
178+
(1 row)
179+
180+
ROLLBACK;
181+
/* Wait until it finises */
182+
DO $$
183+
DECLARE
184+
ops int8;
185+
BEGIN
186+
LOOP
187+
SELECT count(*)
188+
FROM pathman_concurrent_part_tasks
189+
WHERE processed < 500 -- protect from endless loops
190+
INTO ops;
191+
192+
IF ops > 0 THEN
193+
PERFORM pg_sleep(0.2);
194+
ELSE
195+
EXIT;
196+
END IF;
197+
END LOOP;
198+
END
199+
$$ LANGUAGE plpgsql;
200+
/* Check amount of tasks and rows in parent and partitions */
201+
SELECT count(*) FROM pathman_concurrent_part_tasks;
202+
count
203+
-------
204+
0
205+
(1 row)
206+
207+
SELECT count(*) FROM ONLY test_bgw.conc_part;
208+
count
209+
-------
210+
0
211+
(1 row)
212+
213+
SELECT count(*) FROM test_bgw.conc_part;
214+
count
215+
-------
216+
500
217+
(1 row)
218+
219+
DROP TABLE test_bgw.conc_part CASCADE;
220+
NOTICE: drop cascades to 5 other objects
146221
DROP SCHEMA test_bgw CASCADE;
147222
DROP EXTENSION pg_pathman;

sql/pathman_bgw.sql

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CREATE EXTENSION pg_pathman;
55
CREATE SCHEMA test_bgw;
66

77

8+
89
/*
910
* Tests for SpawnPartitionsWorker
1011
*/
@@ -74,5 +75,51 @@ DROP TABLE test_bgw.test_5 CASCADE;
7475

7576

7677

78+
/*
79+
* Tests for ConcurrentPartWorker
80+
*/
81+
82+
CREATE TABLE test_bgw.conc_part(id INT4 NOT NULL);
83+
INSERT INTO test_bgw.conc_part SELECT generate_series(1, 500);
84+
SELECT create_hash_partitions('test_bgw.conc_part', 'id', 5, false);
85+
86+
BEGIN;
87+
/* Also test FOR SHARE/UPDATE conflicts in BGW */
88+
SELECT * FROM test_bgw.conc_part ORDER BY id LIMIT 1 FOR SHARE;
89+
/* Run partitioning bgworker */
90+
SELECT partition_table_concurrently('test_bgw.conc_part', 10, 1);
91+
/* Wait until bgworker starts */
92+
SELECT pg_sleep(1);
93+
ROLLBACK;
94+
95+
/* Wait until it finises */
96+
DO $$
97+
DECLARE
98+
ops int8;
99+
BEGIN
100+
LOOP
101+
SELECT count(*)
102+
FROM pathman_concurrent_part_tasks
103+
WHERE processed < 500 -- protect from endless loops
104+
INTO ops;
105+
106+
IF ops > 0 THEN
107+
PERFORM pg_sleep(0.2);
108+
ELSE
109+
EXIT;
110+
END IF;
111+
END LOOP;
112+
END
113+
$$ LANGUAGE plpgsql;
114+
115+
/* Check amount of tasks and rows in parent and partitions */
116+
SELECT count(*) FROM pathman_concurrent_part_tasks;
117+
SELECT count(*) FROM ONLY test_bgw.conc_part;
118+
SELECT count(*) FROM test_bgw.conc_part;
119+
120+
DROP TABLE test_bgw.conc_part CASCADE;
121+
122+
123+
77124
DROP SCHEMA test_bgw CASCADE;
78125
DROP EXTENSION pg_pathman;

src/include/pathman_workers.h

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,29 @@ cps_set_status(ConcurrentPartSlot *slot, ConcurrentPartSlotStatus status)
112112
SpinLockRelease(&slot->mutex);
113113
}
114114

115+
static inline const char *
116+
cps_print_status(ConcurrentPartSlotStatus status)
117+
{
118+
switch(status)
119+
{
120+
case CPS_FREE:
121+
return "free";
122+
123+
case CPS_WORKING:
124+
return "working";
125+
126+
case CPS_STOPPING:
127+
return "stopping";
128+
129+
default:
130+
return "[unknown]";
131+
}
132+
}
133+
115134

116135

117136
/* Number of worker slots for concurrent partitioning */
118-
#define PART_WORKER_SLOTS 10
137+
#define PART_WORKER_SLOTS max_worker_processes
119138

120139
/* Max number of attempts per batch */
121140
#define PART_WORKER_MAX_ATTEMPTS 60

0 commit comments

Comments
 (0)