Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b4c56f5

Browse files
committed
Add functions for explicit controling merge operations
1 parent 76734a7 commit b4c56f5

File tree

5 files changed

+189
-7
lines changed

5 files changed

+189
-7
lines changed

expected/test.out

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,110 @@
11
create extension lsm3;
22
create table t(k bigint, val bigint);
33
create index lsm3_index on t using lsm3(k);
4+
set enable_seqscan=off;
5+
insert into t values (1,10);
6+
select lsm3_start_merge('lsm3_index');
7+
lsm3_start_merge
8+
------------------
9+
10+
(1 row)
11+
12+
select lsm3_wait_merge_completion('lsm3_index');
13+
lsm3_wait_merge_completion
14+
----------------------------
15+
16+
(1 row)
17+
18+
insert into t values (2,20);
19+
select lsm3_start_merge('lsm3_index');
20+
lsm3_start_merge
21+
------------------
22+
23+
(1 row)
24+
25+
select lsm3_wait_merge_completion('lsm3_index');
26+
lsm3_wait_merge_completion
27+
----------------------------
28+
29+
(1 row)
30+
31+
insert into t values (3,30);
32+
select lsm3_start_merge('lsm3_index');
33+
lsm3_start_merge
34+
------------------
35+
36+
(1 row)
37+
38+
select lsm3_wait_merge_completion('lsm3_index');
39+
lsm3_wait_merge_completion
40+
----------------------------
41+
42+
(1 row)
43+
44+
insert into t values (4,40);
45+
select lsm3_start_merge('lsm3_index');
46+
lsm3_start_merge
47+
------------------
48+
49+
(1 row)
50+
51+
select lsm3_wait_merge_completion('lsm3_index');
52+
lsm3_wait_merge_completion
53+
----------------------------
54+
55+
(1 row)
56+
57+
insert into t values (5,50);
58+
select lsm3_start_merge('lsm3_index');
59+
lsm3_start_merge
60+
------------------
61+
62+
(1 row)
63+
64+
select lsm3_wait_merge_completion('lsm3_index');
65+
lsm3_wait_merge_completion
66+
----------------------------
67+
68+
(1 row)
69+
70+
select lsm3_get_merge_count('lsm3_index');
71+
lsm3_get_merge_count
72+
----------------------
73+
5
74+
(1 row)
75+
76+
select * from t where k = 1;
77+
k | val
78+
---+-----
79+
1 | 10
80+
(1 row)
81+
82+
select * from t order by k;
83+
k | val
84+
---+-----
85+
1 | 10
86+
2 | 20
87+
3 | 30
88+
4 | 40
89+
5 | 50
90+
(5 rows)
91+
92+
select * from t order by k desc;
93+
k | val
94+
---+-----
95+
5 | 50
96+
4 | 40
97+
3 | 30
98+
2 | 20
99+
1 | 10
100+
(5 rows)
101+
102+
explain (COSTS OFF, TIMING OFF, SUMMARY OFF) select * from t order by k;
103+
QUERY PLAN
104+
----------------------------------
105+
Index Scan using lsm3_index on t
106+
(1 row)
107+
4108
insert into t values (generate_series(1,100000), 1);
5109
insert into t values (generate_series(1000001,200000), 2);
6110
insert into t values (generate_series(2000001,300000), 3);
@@ -10,9 +114,10 @@ insert into t values (generate_series(2000001,300000), 3);
10114
select * from t where k = 1;
11115
k | val
12116
---+-----
117+
1 | 10
13118
1 | 1
14119
1 | 1
15-
(2 rows)
120+
(3 rows)
16121

17122
select * from t where k = 1000000;
18123
k | val
@@ -36,7 +141,7 @@ explain (COSTS OFF, TIMING OFF, SUMMARY OFF) select * from t where k = 1;
36141
Index Cond: (k = 1)
37142
(2 rows)
38143

39-
select lsm3_get_merge_count('lsm3_index') > 0;
144+
select lsm3_get_merge_count('lsm3_index') > 5;
40145
?column?
41146
----------
42147
t

lsm3--1.0.sql

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,3 +789,11 @@ CREATE OPERATOR CLASS uuid_ops DEFAULT
789789
-- Number of index merges since server start
790790
CREATE FUNCTION lsm3_get_merge_count(index regclass) returns bigint
791791
AS 'MODULE_PATHNAME' LANGUAGE C STRICT PARALLEL RESTRICTED;
792+
793+
-- Force merge of top index.
794+
CREATE FUNCTION lsm3_start_merge(index regclass) returns void
795+
AS 'MODULE_PATHNAME' LANGUAGE C STRICT PARALLEL RESTRICTED;
796+
797+
-- Wait merge completion
798+
CREATE FUNCTION lsm3_wait_merge_completion(index regclass) returns void
799+
AS 'MODULE_PATHNAME' LANGUAGE C STRICT PARALLEL RESTRICTED;

lsm3.c

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ PG_MODULE_MAGIC;
3737
PG_FUNCTION_INFO_V1(lsm3_handler);
3838
PG_FUNCTION_INFO_V1(lsm3_btree_wrapper);
3939
PG_FUNCTION_INFO_V1(lsm3_get_merge_count);
40+
PG_FUNCTION_INFO_V1(lsm3_start_merge);
41+
PG_FUNCTION_INFO_V1(lsm3_wait_merge_completion);
4042

4143
extern void _PG_init(void);
4244
extern void _PG_fini(void);
@@ -174,9 +176,9 @@ lsm3_launch_bgworker(Lsm3DictEntry* entry)
174176
elog(ERROR, "Lsm3: startup of background worker is failed");
175177
}
176178
entry->merger = BackendPidGetProc(bgw_pid);
177-
for (int n_attempts = 0; entry->merger == NULL || n_attempts < 10; n_attempts++)
179+
for (int n_attempts = 0; entry->merger == NULL || n_attempts < 100; n_attempts++)
178180
{
179-
pg_usleep(1000); /* wait background worker to be registered in procarray */
181+
pg_usleep(10000); /* wait background worker to be registered in procarray */
180182
entry->merger = BackendPidGetProc(bgw_pid);
181183
}
182184
if (entry->merger == NULL)
@@ -962,3 +964,46 @@ lsm3_get_merge_count(PG_FUNCTION_ARGS)
962964
PG_RETURN_INT64(entry->n_merges);
963965
}
964966

967+
968+
Datum
969+
lsm3_start_merge(PG_FUNCTION_ARGS)
970+
{
971+
Oid relid = PG_GETARG_OID(0);
972+
Relation index = index_open(relid, AccessShareLock);
973+
Lsm3DictEntry* entry = lsm3_get_entry(index);
974+
index_close(index, AccessShareLock);
975+
976+
SpinLockAcquire(&entry->spinlock);
977+
if (!entry->merge_in_progress)
978+
{
979+
entry->merge_in_progress = true;
980+
entry->active_index ^= 1;
981+
entry->n_merges += 1;
982+
if (entry->access_count[1-entry->active_index] == 0)
983+
{
984+
entry->start_merge = true;
985+
if (entry->merger == NULL) /* lazy start of bgworker */
986+
{
987+
lsm3_launch_bgworker(entry);
988+
}
989+
SetLatch(&entry->merger->procLatch);
990+
}
991+
}
992+
SpinLockRelease(&entry->spinlock);
993+
PG_RETURN_NULL();
994+
}
995+
996+
Datum
997+
lsm3_wait_merge_completion(PG_FUNCTION_ARGS)
998+
{
999+
Oid relid = PG_GETARG_OID(0);
1000+
Relation index = index_open(relid, AccessShareLock);
1001+
Lsm3DictEntry* entry = lsm3_get_entry(index);
1002+
index_close(index, AccessShareLock);
1003+
1004+
while (entry->merge_in_progress)
1005+
{
1006+
pg_usleep(1000000); /* one second */
1007+
}
1008+
PG_RETURN_NULL();
1009+
}

lsm3.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ typedef struct
1717
int active_index; /* Index used for insert */
1818
uint64 n_merges; /* Number of performed merges since database open */
1919
uint64 n_inserts; /* Number of performed inserts since database open */
20-
bool start_merge; /* Start merging of top index with base index */
21-
bool merge_in_progress; /* Overflow of top index intiate merge process */
20+
volatile bool start_merge; /* Start merging of top index with base index */
21+
volatile bool merge_in_progress; /* Overflow of top index intiate merge process */
2222
PGPROC* merger; /* Merger background worker */
2323
Oid db_id; /* user ID (for background worker) */
2424
Oid user_id; /* database Id (for background worker) */

sql/test.sql

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,30 @@ create extension lsm3;
22

33
create table t(k bigint, val bigint);
44
create index lsm3_index on t using lsm3(k);
5+
6+
set enable_seqscan=off;
7+
8+
insert into t values (1,10);
9+
select lsm3_start_merge('lsm3_index');
10+
select lsm3_wait_merge_completion('lsm3_index');
11+
insert into t values (2,20);
12+
select lsm3_start_merge('lsm3_index');
13+
select lsm3_wait_merge_completion('lsm3_index');
14+
insert into t values (3,30);
15+
select lsm3_start_merge('lsm3_index');
16+
select lsm3_wait_merge_completion('lsm3_index');
17+
insert into t values (4,40);
18+
select lsm3_start_merge('lsm3_index');
19+
select lsm3_wait_merge_completion('lsm3_index');
20+
insert into t values (5,50);
21+
select lsm3_start_merge('lsm3_index');
22+
select lsm3_wait_merge_completion('lsm3_index');
23+
select lsm3_get_merge_count('lsm3_index');
24+
select * from t where k = 1;
25+
select * from t order by k;
26+
select * from t order by k desc;
27+
explain (COSTS OFF, TIMING OFF, SUMMARY OFF) select * from t order by k;
28+
529
insert into t values (generate_series(1,100000), 1);
630
insert into t values (generate_series(1000001,200000), 2);
731
insert into t values (generate_series(2000001,300000), 3);
@@ -13,5 +37,5 @@ select * from t where k = 1000000;
1337
select * from t where k = 2000000;
1438
select * from t where k = 3000000;
1539
explain (COSTS OFF, TIMING OFF, SUMMARY OFF) select * from t where k = 1;
16-
select lsm3_get_merge_count('lsm3_index') > 0;
40+
select lsm3_get_merge_count('lsm3_index') > 5;
1741
drop table t;

0 commit comments

Comments
 (0)