Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 29e8c88

Browse files
committed
Don't use on-disk snapshots for exported logical decoding snapshot.
Logical decoding stores historical snapshots on disk, so that logical decoding can restart without having to reconstruct a snapshot from scratch (for which the resources are not guaranteed to be present anymore). These serialized snapshots were also used when creating a new slot via the walsender interface, which can export a "full" snapshot (i.e. one that can read all tables, not just catalog ones). The problem is that the serialized snapshots are only useful for catalogs and not for normal user tables. Thus the use of such a serialized snapshot could result in an inconsistent snapshot being exported, which could lead to queries returning wrong data. This would only happen if logical slots are created while another logical slot already exists. Author: Petr Jelinek Reviewed-By: Andres Freund Discussion: https://postgr.es/m/f37e975c-908f-858e-707f-058d3b1eb214@2ndquadrant.com Backport: 9.4, where logical decoding was introduced.
1 parent 53b1578 commit 29e8c88

File tree

3 files changed

+22
-9
lines changed

3 files changed

+22
-9
lines changed

src/backend/replication/logical/logical.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ static LogicalDecodingContext *
114114
StartupDecodingContext(List *output_plugin_options,
115115
XLogRecPtr start_lsn,
116116
TransactionId xmin_horizon,
117+
bool need_full_snapshot,
117118
XLogPageReadCB read_page,
118119
LogicalOutputPluginWriterPrepareWrite prepare_write,
119120
LogicalOutputPluginWriterWrite do_write)
@@ -171,7 +172,8 @@ StartupDecodingContext(List *output_plugin_options,
171172

172173
ctx->reorder = ReorderBufferAllocate();
173174
ctx->snapshot_builder =
174-
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn);
175+
AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
176+
need_full_snapshot);
175177

176178
ctx->reorder->private_data = ctx;
177179

@@ -297,7 +299,8 @@ CreateInitDecodingContext(char *plugin,
297299
ReplicationSlotSave();
298300

299301
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
300-
read_page, prepare_write, do_write);
302+
need_full_snapshot, read_page, prepare_write,
303+
do_write);
301304

302305
/* call output plugin initialization callback */
303306
old_context = MemoryContextSwitchTo(ctx->context);
@@ -386,7 +389,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
386389
}
387390

388391
ctx = StartupDecodingContext(output_plugin_options,
389-
start_lsn, InvalidTransactionId,
392+
start_lsn, InvalidTransactionId, false,
390393
read_page, prepare_write, do_write);
391394

392395
/* call output plugin initialization callback */

src/backend/replication/logical/snapbuild.c

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,9 @@ struct SnapBuild
164164
*/
165165
TransactionId initial_xmin_horizon;
166166

167+
/* Indicates if we are building full snapshot or just catalog one .*/
168+
bool building_full_snapshot;
169+
167170
/*
168171
* Snapshot that's valid to see the catalog state seen at this moment.
169172
*/
@@ -280,7 +283,8 @@ static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
280283
SnapBuild *
281284
AllocateSnapshotBuilder(ReorderBuffer *reorder,
282285
TransactionId xmin_horizon,
283-
XLogRecPtr start_lsn)
286+
XLogRecPtr start_lsn,
287+
bool need_full_snapshot)
284288
{
285289
MemoryContext context;
286290
MemoryContext oldcontext;
@@ -307,6 +311,7 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
307311

308312
builder->initial_xmin_horizon = xmin_horizon;
309313
builder->start_decoding_at = start_lsn;
314+
builder->building_full_snapshot = need_full_snapshot;
310315

311316
MemoryContextSwitchTo(oldcontext);
312317

@@ -1227,7 +1232,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
12271232
*
12281233
* a) There were no running transactions when the xl_running_xacts record
12291234
* was inserted, jump to CONSISTENT immediately. We might find such a
1230-
* state we were waiting for b) and c).
1235+
* state we were waiting for b) or c).
12311236
*
12321237
* b) Wait for all toplevel transactions that were running to end. We
12331238
* simply track the number of in-progress toplevel transactions and
@@ -1242,7 +1247,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
12421247
* at all.
12431248
*
12441249
* c) This (in a previous run) or another decoding slot serialized a
1245-
* snapshot to disk that we can use.
1250+
* snapshot to disk that we can use. Can't use this method for the
1251+
* initial snapshot when slot is being created and needs full snapshot
1252+
* for export or direct use, as that snapshot will only contain catalog
1253+
* modifying transactions.
12461254
* ---
12471255
*/
12481256

@@ -1297,8 +1305,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
12971305

12981306
return false;
12991307
}
1300-
/* c) valid on disk state */
1301-
else if (SnapBuildRestore(builder, lsn))
1308+
/* c) valid on disk state and not building full snapshot */
1309+
else if (!builder->building_full_snapshot &&
1310+
SnapBuildRestore(builder, lsn))
13021311
{
13031312
/* there won't be any state to cleanup */
13041313
return false;

src/include/replication/snapbuild.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ struct xl_running_xacts;
5454
extern void CheckPointSnapBuild(void);
5555

5656
extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *cache,
57-
TransactionId xmin_horizon, XLogRecPtr start_lsn);
57+
TransactionId xmin_horizon, XLogRecPtr start_lsn,
58+
bool need_full_snapshot);
5859
extern void FreeSnapshotBuilder(SnapBuild *cache);
5960

6061
extern void SnapBuildSnapDecRefcount(Snapshot snap);

0 commit comments

Comments
 (0)