Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7e174fa

Browse files
committed
Only kill sync workers at commit time in subscription DDL
This allows a transaction abort to avoid killing those workers. Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
1 parent ff98a5e commit 7e174fa

File tree

5 files changed

+117
-6
lines changed

5 files changed

+117
-6
lines changed

src/backend/access/transam/xact.c

+9
Original file line numberDiff line numberDiff line change
@@ -2277,6 +2277,15 @@ PrepareTransaction(void)
22772277
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
22782278
errmsg("cannot PREPARE a transaction that has exported snapshots")));
22792279

2280+
/*
2281+
* Don't allow PREPARE but for transaction that has/might kill logical
2282+
* replication workers.
2283+
*/
2284+
if (XactManipulatesLogicalReplicationWorkers())
2285+
ereport(ERROR,
2286+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2287+
errmsg("cannot PREPARE a transaction that has manipulated logical replication workers")));
2288+
22802289
/* Prevent cancel/die interrupt while cleaning up */
22812290
HOLD_INTERRUPTS();
22822291

src/backend/commands/subscriptioncmds.c

+24-4
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
597597

598598
RemoveSubscriptionRel(sub->oid, relid);
599599

600-
logicalrep_worker_stop(sub->oid, relid);
600+
logicalrep_worker_stop_at_commit(sub->oid, relid);
601601

602602
namespace = get_namespace_name(get_rel_namespace(relid));
603603
ereport(NOTICE,
@@ -819,6 +819,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
819819
char *subname;
820820
char *conninfo;
821821
char *slotname;
822+
List *subworkers;
823+
ListCell *lc;
822824
char originname[NAMEDATALEN];
823825
char *err = NULL;
824826
RepOriginId originid;
@@ -909,15 +911,33 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
909911

910912
ReleaseSysCache(tup);
911913

914+
/*
915+
* If we are dropping the replication slot, stop all the subscription
916+
* workers immediately, so that the slot becomes accessible. Otherwise
917+
* just schedule the stopping for the end of the transaction.
918+
*
919+
* New workers won't be started because we hold an exclusive lock on the
920+
* subscription till the end of the transaction.
921+
*/
922+
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
923+
subworkers = logicalrep_workers_find(subid, false);
924+
LWLockRelease(LogicalRepWorkerLock);
925+
foreach (lc, subworkers)
926+
{
927+
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
928+
if (slotname)
929+
logicalrep_worker_stop(w->subid, w->relid);
930+
else
931+
logicalrep_worker_stop_at_commit(w->subid, w->relid);
932+
}
933+
list_free(subworkers);
934+
912935
/* Clean up dependencies */
913936
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
914937

915938
/* Remove any associated relation synchronization states. */
916939
RemoveSubscriptionRel(subid, InvalidOid);
917940

918-
/* Kill the apply worker so that the slot becomes accessible. */
919-
logicalrep_worker_stop(subid, InvalidOid);
920-
921941
/* Remove the origin tracking if exists. */
922942
snprintf(originname, sizeof(originname), "pg_%u", subid);
923943
originid = replorigin_by_name(originname, true);

src/backend/replication/logical/launcher.c

+81-2
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,14 @@ typedef struct LogicalRepCtxStruct
7373

7474
LogicalRepCtxStruct *LogicalRepCtx;
7575

76+
typedef struct LogicalRepWorkerId
77+
{
78+
Oid subid;
79+
Oid relid;
80+
} LogicalRepWorkerId;
81+
82+
static List *on_commit_stop_workers = NIL;
83+
7684
static void ApplyLauncherWakeup(void);
7785
static void logicalrep_launcher_onexit(int code, Datum arg);
7886
static void logicalrep_worker_onexit(int code, Datum arg);
@@ -249,6 +257,30 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
249257
return res;
250258
}
251259

260+
/*
261+
* Similar to logicalrep_worker_find(), but returns list of all workers for
262+
* the subscription, instead just one.
263+
*/
264+
List *
265+
logicalrep_workers_find(Oid subid, bool only_running)
266+
{
267+
int i;
268+
List *res = NIL;
269+
270+
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
271+
272+
/* Search for attached worker for a given subscription id. */
273+
for (i = 0; i < max_logical_replication_workers; i++)
274+
{
275+
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
276+
277+
if (w->in_use && w->subid == subid && (!only_running || w->proc))
278+
res = lappend(res, w);
279+
}
280+
281+
return res;
282+
}
283+
252284
/*
253285
* Start new apply background worker.
254286
*/
@@ -513,6 +545,27 @@ logicalrep_worker_stop(Oid subid, Oid relid)
513545
LWLockRelease(LogicalRepWorkerLock);
514546
}
515547

548+
/*
549+
* Request worker for specified sub/rel to be stopped on commit.
550+
*/
551+
void
552+
logicalrep_worker_stop_at_commit(Oid subid, Oid relid)
553+
{
554+
LogicalRepWorkerId *wid;
555+
MemoryContext oldctx;
556+
557+
/* Make sure we store the info in context that survives until commit. */
558+
oldctx = MemoryContextSwitchTo(TopTransactionContext);
559+
560+
wid = palloc(sizeof(LogicalRepWorkerId));
561+
wid->subid = subid;
562+
wid->relid = relid;
563+
564+
on_commit_stop_workers = lappend(on_commit_stop_workers, wid);
565+
566+
MemoryContextSwitchTo(oldctx);
567+
}
568+
516569
/*
517570
* Wake up (using latch) any logical replication worker for specified sub/rel.
518571
*/
@@ -753,15 +806,41 @@ ApplyLauncherShmemInit(void)
753806
}
754807
}
755808

809+
/*
810+
* Check whether current transaction has manipulated logical replication
811+
* workers.
812+
*/
813+
bool
814+
XactManipulatesLogicalReplicationWorkers(void)
815+
{
816+
return (on_commit_stop_workers != NIL);
817+
}
818+
756819
/*
757820
* Wakeup the launcher on commit if requested.
758821
*/
759822
void
760823
AtEOXact_ApplyLauncher(bool isCommit)
761824
{
762-
if (isCommit && on_commit_launcher_wakeup)
763-
ApplyLauncherWakeup();
825+
if (isCommit)
826+
{
827+
ListCell *lc;
764828

829+
foreach (lc, on_commit_stop_workers)
830+
{
831+
LogicalRepWorkerId *wid = lfirst(lc);
832+
logicalrep_worker_stop(wid->subid, wid->relid);
833+
}
834+
835+
if (on_commit_launcher_wakeup)
836+
ApplyLauncherWakeup();
837+
}
838+
839+
/*
840+
* No need to pfree on_commit_stop_workers. It was allocated in
841+
* transaction memory context, which is going to be cleaned soon.
842+
*/
843+
on_commit_stop_workers = NIL;
765844
on_commit_launcher_wakeup = false;
766845
}
767846

src/include/replication/logicallauncher.h

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ extern Size ApplyLauncherShmemSize(void);
2222
extern void ApplyLauncherShmemInit(void);
2323

2424
extern void ApplyLauncherWakeupAtCommit(void);
25+
extern bool XactManipulatesLogicalReplicationWorkers(void);
2526
extern void AtEOXact_ApplyLauncher(bool isCommit);
2627

2728
extern bool IsLogicalLauncher(void);

src/include/replication/worker_internal.h

+2
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,11 @@ extern bool in_remote_transaction;
7171
extern void logicalrep_worker_attach(int slot);
7272
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
7373
bool only_running);
74+
extern List *logicalrep_workers_find(Oid subid, bool only_running);
7475
extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
7576
Oid userid, Oid relid);
7677
extern void logicalrep_worker_stop(Oid subid, Oid relid);
78+
extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
7779
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
7880
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
7981

0 commit comments

Comments
 (0)