@@ -6,69 +6,366 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
6
6
init
7
7
(1 row)
8
8
9
- CREATE TABLE test_prepared1(id int);
10
- CREATE TABLE test_prepared2(id int);
9
+ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc', 'test_decoding');
10
+ ?column?
11
+ ----------
12
+ init
13
+ (1 row)
14
+
15
+ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_2pc_nofilter', 'test_decoding');
16
+ ?column?
17
+ ----------
18
+ init
19
+ (1 row)
20
+
21
+ CREATE TABLE test_prepared1(id integer primary key);
22
+ CREATE TABLE test_prepared2(id integer primary key);
23
+ -- Reused queries
24
+ \set get_no2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'');'
25
+ \set get_with2pc 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'');'
26
+ \set get_with2pc_nofilter 'SELECT data FROM pg_logical_slot_get_changes(''regression_slot_2pc_nofilter'', NULL, NULL, ''include-xids'', ''0'', ''skip-empty-xacts'', ''1'', ''twophase-decoding'', ''1'', ''twophase-decode-with-catalog-changes'', ''1'');'
11
27
-- test simple successful use of a prepared xact
12
28
BEGIN;
13
29
INSERT INTO test_prepared1 VALUES (1);
14
30
PREPARE TRANSACTION 'test_prepared#1';
31
+ :get_no2pc
32
+ data
33
+ ------
34
+ (0 rows)
35
+
36
+ :get_with2pc
37
+ data
38
+ ----------------------------------------------------
39
+ BEGIN
40
+ table public.test_prepared1: INSERT: id[integer]:1
41
+ PREPARE TRANSACTION 'test_prepared#1'
42
+ (3 rows)
43
+
44
+ :get_with2pc_nofilter
45
+ data
46
+ ----------------------------------------------------
47
+ BEGIN
48
+ table public.test_prepared1: INSERT: id[integer]:1
49
+ PREPARE TRANSACTION 'test_prepared#1'
50
+ (3 rows)
51
+
15
52
COMMIT PREPARED 'test_prepared#1';
53
+ :get_no2pc
54
+ data
55
+ ----------------------------------------------------
56
+ BEGIN
57
+ table public.test_prepared1: INSERT: id[integer]:1
58
+ COMMIT
59
+ (3 rows)
60
+
61
+ :get_with2pc
62
+ data
63
+ -----------------------------------
64
+ COMMIT PREPARED 'test_prepared#1'
65
+ (1 row)
66
+
67
+ :get_with2pc_nofilter
68
+ data
69
+ -----------------------------------
70
+ COMMIT PREPARED 'test_prepared#1'
71
+ (1 row)
72
+
16
73
INSERT INTO test_prepared1 VALUES (2);
17
74
-- test abort of a prepared xact
18
75
BEGIN;
19
76
INSERT INTO test_prepared1 VALUES (3);
20
77
PREPARE TRANSACTION 'test_prepared#2';
78
+ :get_no2pc
79
+ data
80
+ ----------------------------------------------------
81
+ BEGIN
82
+ table public.test_prepared1: INSERT: id[integer]:2
83
+ COMMIT
84
+ (3 rows)
85
+
86
+ :get_with2pc
87
+ data
88
+ ----------------------------------------------------
89
+ BEGIN
90
+ table public.test_prepared1: INSERT: id[integer]:2
91
+ COMMIT
92
+ BEGIN
93
+ table public.test_prepared1: INSERT: id[integer]:3
94
+ PREPARE TRANSACTION 'test_prepared#2'
95
+ (6 rows)
96
+
97
+ :get_with2pc_nofilter
98
+ data
99
+ ----------------------------------------------------
100
+ BEGIN
101
+ table public.test_prepared1: INSERT: id[integer]:2
102
+ COMMIT
103
+ BEGIN
104
+ table public.test_prepared1: INSERT: id[integer]:3
105
+ PREPARE TRANSACTION 'test_prepared#2'
106
+ (6 rows)
107
+
21
108
ROLLBACK PREPARED 'test_prepared#2';
109
+ :get_no2pc
110
+ data
111
+ ------
112
+ (0 rows)
113
+
114
+ :get_with2pc
115
+ data
116
+ -------------------------------------
117
+ ROLLBACK PREPARED 'test_prepared#2'
118
+ (1 row)
119
+
120
+ :get_with2pc_nofilter
121
+ data
122
+ -------------------------------------
123
+ ROLLBACK PREPARED 'test_prepared#2'
124
+ (1 row)
125
+
22
126
INSERT INTO test_prepared1 VALUES (4);
23
127
-- test prepared xact containing ddl
24
128
BEGIN;
25
129
INSERT INTO test_prepared1 VALUES (5);
26
130
ALTER TABLE test_prepared1 ADD COLUMN data text;
27
131
INSERT INTO test_prepared1 VALUES (6, 'frakbar');
28
132
PREPARE TRANSACTION 'test_prepared#3';
29
- -- test that we decode correctly while an uncommitted prepared xact
30
- -- with ddl exists.
31
- -- separate table because of the lock from the ALTER
32
- -- this will come before the '5' row above, as this commits before it.
33
- INSERT INTO test_prepared2 VALUES (7);
34
- COMMIT PREPARED 'test_prepared#3';
35
- -- make sure stuff still works
36
- INSERT INTO test_prepared1 VALUES (8);
37
- INSERT INTO test_prepared2 VALUES (9);
38
- -- cleanup
39
- DROP TABLE test_prepared1;
40
- DROP TABLE test_prepared2;
41
- -- show results
42
- SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
133
+ SELECT 'test_prepared_1' AS relation, locktype, mode
134
+ FROM pg_locks
135
+ WHERE locktype = 'relation'
136
+ AND relation = 'test_prepared1'::regclass;
137
+ relation | locktype | mode
138
+ -----------------+----------+---------------------
139
+ test_prepared_1 | relation | RowExclusiveLock
140
+ test_prepared_1 | relation | AccessExclusiveLock
141
+ (2 rows)
142
+
143
+ :get_no2pc
144
+ data
145
+ ----------------------------------------------------
146
+ BEGIN
147
+ table public.test_prepared1: INSERT: id[integer]:4
148
+ COMMIT
149
+ (3 rows)
150
+
151
+ :get_with2pc
152
+ data
153
+ ----------------------------------------------------
154
+ BEGIN
155
+ table public.test_prepared1: INSERT: id[integer]:4
156
+ COMMIT
157
+ (3 rows)
158
+
159
+ :get_with2pc_nofilter
43
160
data
44
161
-------------------------------------------------------------------------
45
162
BEGIN
46
- table public.test_prepared1: INSERT: id[integer]:1
163
+ table public.test_prepared1: INSERT: id[integer]:4
47
164
COMMIT
48
165
BEGIN
49
- table public.test_prepared1: INSERT: id[integer]:2
166
+ table public.test_prepared1: INSERT: id[integer]:5
167
+ table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
168
+ PREPARE TRANSACTION 'test_prepared#3'
169
+ (7 rows)
170
+
171
+ -- Test that we decode correctly while an uncommitted prepared xact
172
+ -- with ddl exists. Our 2pc filter callback will skip decoding of xacts
173
+ -- with catalog changes at PREPARE time, so we don't decode it now.
174
+ --
175
+ -- Use a separate table for the concurrent transaction because the lock from
176
+ -- the ALTER will stop us inserting into the other one.
177
+ --
178
+ -- We should see '7' before '5' in our results since it commits first.
179
+ --
180
+ INSERT INTO test_prepared2 VALUES (7);
181
+ :get_no2pc
182
+ data
183
+ ----------------------------------------------------
184
+ BEGIN
185
+ table public.test_prepared2: INSERT: id[integer]:7
50
186
COMMIT
187
+ (3 rows)
188
+
189
+ :get_with2pc
190
+ data
191
+ ----------------------------------------------------
51
192
BEGIN
52
- table public.test_prepared1 : INSERT: id[integer]:4
193
+ table public.test_prepared2 : INSERT: id[integer]:7
53
194
COMMIT
195
+ (3 rows)
196
+
197
+ :get_with2pc_nofilter
198
+ data
199
+ ----------------------------------------------------
54
200
BEGIN
55
201
table public.test_prepared2: INSERT: id[integer]:7
56
202
COMMIT
203
+ (3 rows)
204
+
205
+ COMMIT PREPARED 'test_prepared#3';
206
+ :get_no2pc
207
+ data
208
+ -------------------------------------------------------------------------
209
+ BEGIN
210
+ table public.test_prepared1: INSERT: id[integer]:5
211
+ table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
212
+ COMMIT
213
+ (4 rows)
214
+
215
+ :get_with2pc
216
+ data
217
+ -------------------------------------------------------------------------
57
218
BEGIN
58
219
table public.test_prepared1: INSERT: id[integer]:5
59
220
table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
60
221
COMMIT
222
+ (4 rows)
223
+
224
+ :get_with2pc_nofilter
225
+ data
226
+ -----------------------------------
227
+ COMMIT PREPARED 'test_prepared#3'
228
+ (1 row)
229
+
230
+ -- make sure stuff still works
231
+ INSERT INTO test_prepared1 VALUES (8);
232
+ INSERT INTO test_prepared2 VALUES (9);
233
+ :get_no2pc
234
+ data
235
+ --------------------------------------------------------------------
236
+ BEGIN
237
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
238
+ COMMIT
239
+ BEGIN
240
+ table public.test_prepared2: INSERT: id[integer]:9
241
+ COMMIT
242
+ (6 rows)
243
+
244
+ :get_with2pc
245
+ data
246
+ --------------------------------------------------------------------
247
+ BEGIN
248
+ table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
249
+ COMMIT
250
+ BEGIN
251
+ table public.test_prepared2: INSERT: id[integer]:9
252
+ COMMIT
253
+ (6 rows)
254
+
255
+ :get_with2pc_nofilter
256
+ data
257
+ --------------------------------------------------------------------
61
258
BEGIN
62
259
table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
63
260
COMMIT
64
261
BEGIN
65
262
table public.test_prepared2: INSERT: id[integer]:9
66
263
COMMIT
67
- (22 rows)
264
+ (6 rows)
265
+
266
+ -- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
267
+ -- logical decoding.
268
+ BEGIN;
269
+ INSERT INTO test_prepared1 VALUES (10, 'othercol');
270
+ CLUSTER test_prepared1 USING test_prepared1_pkey;
271
+ INSERT INTO test_prepared1 VALUES (11, 'othercol2');
272
+ PREPARE TRANSACTION 'test_prepared_lock';
273
+ BEGIN;
274
+ insert into test_prepared2 values (12);
275
+ PREPARE TRANSACTION 'test_prepared_lock2';
276
+ COMMIT PREPARED 'test_prepared_lock2';
277
+ SELECT 'pg_class' AS relation, locktype, mode
278
+ FROM pg_locks
279
+ WHERE locktype = 'relation'
280
+ AND relation = 'pg_class'::regclass;
281
+ relation | locktype | mode
282
+ ----------+----------+------
283
+ (0 rows)
284
+
285
+ -- Shouldn't see anything with 2pc decoding off
286
+ :get_no2pc
287
+ data
288
+ -----------------------------------------------------
289
+ BEGIN
290
+ table public.test_prepared2: INSERT: id[integer]:12
291
+ COMMIT
292
+ (3 rows)
293
+
294
+ -- Shouldn't timeout on 2pc decoding.
295
+ SET statement_timeout = '1s';
296
+ :get_with2pc
297
+ data
298
+ -----------------------------------------------------
299
+ BEGIN
300
+ table public.test_prepared2: INSERT: id[integer]:12
301
+ PREPARE TRANSACTION 'test_prepared_lock2'
302
+ COMMIT PREPARED 'test_prepared_lock2'
303
+ (4 rows)
304
+
305
+ :get_with2pc_nofilter
306
+ data
307
+ ----------------------------------------------------------------------------
308
+ BEGIN
309
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
310
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
311
+ PREPARE TRANSACTION 'test_prepared_lock'
312
+ BEGIN
313
+ table public.test_prepared2: INSERT: id[integer]:12
314
+ PREPARE TRANSACTION 'test_prepared_lock2'
315
+ COMMIT PREPARED 'test_prepared_lock2'
316
+ (8 rows)
317
+
318
+ RESET statement_timeout;
319
+ COMMIT PREPARED 'test_prepared_lock';
320
+ -- Both will work normally after we commit
321
+ :get_no2pc
322
+ data
323
+ ----------------------------------------------------------------------------
324
+ BEGIN
325
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
326
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
327
+ COMMIT
328
+ (4 rows)
329
+
330
+ :get_with2pc
331
+ data
332
+ ----------------------------------------------------------------------------
333
+ BEGIN
334
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
335
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
336
+ COMMIT
337
+ (4 rows)
338
+
339
+ :get_with2pc_nofilter
340
+ data
341
+ --------------------------------------
342
+ COMMIT PREPARED 'test_prepared_lock'
343
+ (1 row)
344
+
345
+ -- cleanup
346
+ DROP TABLE test_prepared1;
347
+ DROP TABLE test_prepared2;
348
+ -- show results
349
+ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
350
+ data
351
+ ------
352
+ (0 rows)
68
353
69
354
SELECT pg_drop_replication_slot('regression_slot');
70
355
pg_drop_replication_slot
71
356
--------------------------
72
357
73
358
(1 row)
74
359
360
+ SELECT pg_drop_replication_slot('regression_slot_2pc');
361
+ pg_drop_replication_slot
362
+ --------------------------
363
+
364
+ (1 row)
365
+
366
+ SELECT pg_drop_replication_slot('regression_slot_2pc_nofilter');
367
+ pg_drop_replication_slot
368
+ --------------------------
369
+
370
+ (1 row)
371
+
0 commit comments