Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 28afff3

Browse files
committed
Preserve required !catalog tuples while computing initial decoding snapshot.
The logical decoding machinery already preserved all the required catalog tuples, which is sufficient in the course of normal logical decoding, but did not guarantee that non-catalog tuples were preserved during computation of the initial snapshot when creating a slot over the replication protocol. This could cause a corrupted initial snapshot being exported. The time window for issues is usually not terribly large, but on a busy server it's perfectly possible to it hit it. Ongoing decoding is not affected by this bug. To avoid increased overhead for the SQL API, only retain additional tuples when a logical slot is being created over the replication protocol. To do so this commit changes the signature of CreateInitDecodingContext(), but it seems unlikely that it's being used in an extension, so that's probably ok. In a drive-by fix, fix handling of ReplicationSlotsComputeRequiredXmin's already_locked argument, which should only apply to ProcArrayLock, not ReplicationSlotControlLock. Reported-By: Erik Rijkers Analyzed-By: Petr Jelinek Author: Petr Jelinek, heavily editorialized by Andres Freund Reviewed-By: Andres Freund Discussion: https://postgr.es/m/9a897b86-46e1-9915-ee4c-da02e4ff6a95@2ndquadrant.com Backport: 9.4, where logical decoding was introduced.
1 parent 866452c commit 28afff3

File tree

8 files changed

+66
-18
lines changed

8 files changed

+66
-18
lines changed

src/backend/replication/logical/logical.c

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ StartupDecodingContext(List *output_plugin_options,
210210
LogicalDecodingContext *
211211
CreateInitDecodingContext(char *plugin,
212212
List *output_plugin_options,
213+
bool need_full_snapshot,
213214
XLogPageReadCB read_page,
214215
LogicalOutputPluginWriterPrepareWrite prepare_write,
215216
LogicalOutputPluginWriterWrite do_write)
@@ -267,23 +268,31 @@ CreateInitDecodingContext(char *plugin,
267268
* the slot machinery about the new limit. Once that's done the
268269
* ProcArrayLock can be released as the slot machinery now is
269270
* protecting against vacuum.
271+
*
272+
* Note that, temporarily, the data, not just the catalog, xmin has to be
273+
* reserved if a data snapshot is to be exported. Otherwise the initial
274+
* data snapshot created here is not guaranteed to be valid. After that
275+
* the data xmin doesn't need to be managed anymore and the global xmin
276+
* should be recomputed. As we are fine with losing the pegged data xmin
277+
* after crash - no chance a snapshot would get exported anymore - we can
278+
* get away with just setting the slot's
279+
* effective_xmin. ReplicationSlotRelease will reset it again.
280+
*
270281
* ----
271282
*/
272283
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
273284

274-
slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
275-
slot->data.catalog_xmin = slot->effective_catalog_xmin;
285+
xmin_horizon = GetOldestSafeDecodingTransactionId(need_full_snapshot);
286+
287+
slot->effective_catalog_xmin = xmin_horizon;
288+
slot->data.catalog_xmin = xmin_horizon;
289+
if (need_full_snapshot)
290+
slot->effective_xmin = xmin_horizon;
276291

277292
ReplicationSlotsComputeRequiredXmin(true);
278293

279294
LWLockRelease(ProcArrayLock);
280295

281-
/*
282-
* tell the snapshot builder to only assemble snapshot once reaching the
283-
* running_xact's record with the respective xmin.
284-
*/
285-
xmin_horizon = slot->data.catalog_xmin;
286-
287296
ReplicationSlotMarkDirty();
288297
ReplicationSlotSave();
289298

src/backend/replication/logical/snapbuild.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,18 @@ SnapBuildExportSnapshot(SnapBuild *builder)
551551
* mechanism. Due to that we can do this without locks, we're only
552552
* changing our own value.
553553
*/
554+
#ifdef USE_ASSERT_CHECKING
555+
{
556+
TransactionId safeXid;
557+
558+
LWLockAcquire(ProcArrayLock, LW_SHARED);
559+
safeXid = GetOldestSafeDecodingTransactionId(true);
560+
LWLockRelease(ProcArrayLock);
561+
562+
Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin));
563+
}
564+
#endif
565+
554566
MyPgXact->xmin = snap->xmin;
555567

556568
/* allocate in transaction context */

src/backend/replication/slot.c

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,22 @@ ReplicationSlotRelease(void)
397397
SpinLockRelease(&slot->mutex);
398398
}
399399

400+
401+
/*
402+
* If slot needed to temporarily restrain both data and catalog xmin to
403+
* create the catalog snapshot, remove that temporary constraint.
404+
* Snapshots can only be exported while the initial snapshot is still
405+
* acquired.
406+
*/
407+
if (!TransactionIdIsValid(slot->data.xmin) &&
408+
TransactionIdIsValid(slot->effective_xmin))
409+
{
410+
SpinLockAcquire(&slot->mutex);
411+
slot->effective_xmin = InvalidTransactionId;
412+
SpinLockRelease(&slot->mutex);
413+
ReplicationSlotsComputeRequiredXmin(false);
414+
}
415+
400416
MyReplicationSlot = NULL;
401417

402418
/* might not have been set when we've been a plain slot */
@@ -574,6 +590,9 @@ ReplicationSlotPersist(void)
574590

575591
/*
576592
* Compute the oldest xmin across all slots and store it in the ProcArray.
593+
*
594+
* If already_locked is true, ProcArrayLock has already been acquired
595+
* exclusively.
577596
*/
578597
void
579598
ReplicationSlotsComputeRequiredXmin(bool already_locked)
@@ -584,8 +603,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
584603

585604
Assert(ReplicationSlotCtl != NULL);
586605

587-
if (!already_locked)
588-
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
606+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
589607

590608
for (i = 0; i < max_replication_slots; i++)
591609
{
@@ -614,8 +632,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
614632
agg_catalog_xmin = effective_catalog_xmin;
615633
}
616634

617-
if (!already_locked)
618-
LWLockRelease(ReplicationSlotControlLock);
635+
LWLockRelease(ReplicationSlotControlLock);
619636

620637
ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
621638
}

src/backend/replication/slotfuncs.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
125125
/*
126126
* Create logical decoding context, to build the initial snapshot.
127127
*/
128-
ctx = CreateInitDecodingContext(
129-
NameStr(*plugin), NIL,
128+
ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
129+
false, /* do not build snapshot */
130130
logical_read_local_xlog_page, NULL, NULL);
131131

132132
/* build initial snapshot, might take a while */

src/backend/replication/walsender.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -813,6 +813,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
813813
LogicalDecodingContext *ctx;
814814

815815
ctx = CreateInitDecodingContext(cmd->plugin, NIL,
816+
true, /* build snapshot */
816817
logical_read_xlog_page,
817818
WalSndPrepareWrite, WalSndWriteData);
818819

src/backend/storage/ipc/procarray.c

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2153,7 +2153,7 @@ GetOldestActiveTransactionId(void)
21532153
* that the caller will immediately use the xid to peg the xmin horizon.
21542154
*/
21552155
TransactionId
2156-
GetOldestSafeDecodingTransactionId(void)
2156+
GetOldestSafeDecodingTransactionId(bool catalogOnly)
21572157
{
21582158
ProcArrayStruct *arrayP = procArray;
21592159
TransactionId oldestSafeXid;
@@ -2176,9 +2176,17 @@ GetOldestSafeDecodingTransactionId(void)
21762176
/*
21772177
* If there's already a slot pegging the xmin horizon, we can start with
21782178
* that value, it's guaranteed to be safe since it's computed by this
2179-
* routine initially and has been enforced since.
2179+
* routine initially and has been enforced since. We can always use the
2180+
* slot's general xmin horizon, but the catalog horizon is only usable
2181+
* when we only catalog data is going to be looked at.
21802182
*/
2181-
if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
2183+
if (TransactionIdIsValid(procArray->replication_slot_xmin) &&
2184+
TransactionIdPrecedes(procArray->replication_slot_xmin,
2185+
oldestSafeXid))
2186+
oldestSafeXid = procArray->replication_slot_xmin;
2187+
2188+
if (catalogOnly &&
2189+
TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
21822190
TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
21832191
oldestSafeXid))
21842192
oldestSafeXid = procArray->replication_slot_catalog_xmin;

src/include/replication/logical.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ extern void CheckLogicalDecodingRequirements(void);
7979

8080
extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
8181
List *output_plugin_options,
82+
bool need_full_snapshot,
8283
XLogPageReadCB read_page,
8384
LogicalOutputPluginWriterPrepareWrite prepare_write,
8485
LogicalOutputPluginWriterWrite do_write);

src/include/storage/procarray.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ extern bool TransactionIdIsInProgress(TransactionId xid);
5555
extern bool TransactionIdIsActive(TransactionId xid);
5656
extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
5757
extern TransactionId GetOldestActiveTransactionId(void);
58-
extern TransactionId GetOldestSafeDecodingTransactionId(void);
58+
extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly);
5959

6060
extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids);
6161
extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids);

0 commit comments

Comments
 (0)