Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 3d144c6

Browse files
author
Amit Kapila
committed
Fix invalid memory access during the shutdown of the parallel apply worker.
The callback function pa_shutdown() accesses MyLogicalRepWorker which may not be initialized if there is an error during the initialization of the parallel apply worker. The other problem is that by the time it is invoked even after the initialization of the worker, the MyLogicalRepWorker will be reset by another callback logicalrep_worker_onexit. So, it won't have the required information.

To fix this, register the shutdown callback after we are attached to the worker slot.

After this fix, we observed another issue which is that sometimes the leader apply worker tries to receive the message from the error queue that might already be detached by the parallel apply worker leading to an error. To prevent such an error, we ensure that the leader apply worker detaches from the parallel apply worker's error queue before stopping it.

Reported-by: Sawada Masahiko
Author: Hou Zhijie
Reviewed-by: Sawada Masahiko, Amit Kapila
Discussion: https://postgr.es/m/CAD21AoDo+yUwNq6nTrvE2h9bB2vZfcag=jxWc7QxuWCmkDAqcA@mail.gmail.com
1 parent 455f948 commit 3d144c6

File tree

3 files changed

+38
-18
lines changed

3 files changed

+38
-18
lines changed

src/backend/replication/logical/applyparallelworker.c

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -577,16 +577,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
577577
list_length(ParallelApplyWorkerPool) >
578578
(max_parallel_apply_workers_per_subscription / 2))
579579
{
580-
int slot_no;
581-
uint16 generation;
582-
583-
SpinLockAcquire(&winfo->shared->mutex);
584-
generation = winfo->shared->logicalrep_worker_generation;
585-
slot_no = winfo->shared->logicalrep_worker_slot_no;
586-
SpinLockRelease(&winfo->shared->mutex);
587-
588-
logicalrep_pa_worker_stop(slot_no, generation);
589-
580+
logicalrep_pa_worker_stop(winfo);
590581
pa_free_worker_info(winfo);
591582

592583
return;
@@ -636,8 +627,11 @@ pa_detach_all_error_mq(void)
636627
{
637628
ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
638629

639-
shm_mq_detach(winfo->error_mq_handle);
640-
winfo->error_mq_handle = NULL;
630+
if (winfo->error_mq_handle)
631+
{
632+
shm_mq_detach(winfo->error_mq_handle);
633+
winfo->error_mq_handle = NULL;
634+
}
641635
}
642636
}
643637

@@ -845,6 +839,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
845839
* Make sure the leader apply worker tries to read from our error queue one more
846840
* time. This guards against the case where we exit uncleanly without sending
847841
* an ErrorResponse, for example because some code calls proc_exit directly.
842+
*
843+
* Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
844+
* if any. See ParallelWorkerShutdown for details.
848845
*/
849846
static void
850847
pa_shutdown(int code, Datum arg)
@@ -901,8 +898,6 @@ ParallelApplyWorkerMain(Datum main_arg)
901898
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
902899
errmsg("bad magic number in dynamic shared memory segment")));
903900

904-
before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
905-
906901
/* Look up the shared information. */
907902
shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
908903
MyParallelShared = shared;
@@ -921,6 +916,13 @@ ParallelApplyWorkerMain(Datum main_arg)
921916
*/
922917
logicalrep_worker_attach(worker_slot);
923918

919+
/*
920+
* Register the shutdown callback after we are attached to the worker
921+
* slot. This is to ensure that MyLogicalRepWorker remains valid when this
922+
* callback is invoked.
923+
*/
924+
before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
925+
924926
SpinLockAcquire(&MyParallelShared->mutex);
925927
MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
926928
MyParallelShared->logicalrep_worker_slot_no = worker_slot;

src/backend/replication/logical/launcher.c

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -609,19 +609,37 @@ logicalrep_worker_stop(Oid subid, Oid relid)
609609
}
610610

611611
/*
612-
* Stop the logical replication parallel apply worker corresponding to the
613-
* input slot number.
612+
* Stop the given logical replication parallel apply worker.
614613
*
615614
* Note that the function sends SIGINT instead of SIGTERM to the parallel apply
616615
* worker so that the worker exits cleanly.
617616
*/
618617
void
619-
logicalrep_pa_worker_stop(int slot_no, uint16 generation)
618+
logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
620619
{
620+
int slot_no;
621+
uint16 generation;
621622
LogicalRepWorker *worker;
622623

624+
SpinLockAcquire(&winfo->shared->mutex);
625+
generation = winfo->shared->logicalrep_worker_generation;
626+
slot_no = winfo->shared->logicalrep_worker_slot_no;
627+
SpinLockRelease(&winfo->shared->mutex);
628+
623629
Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
624630

631+
/*
632+
* Detach from the error_mq_handle for the parallel apply worker before
633+
* stopping it. This prevents the leader apply worker from trying to
634+
* receive the message from the error queue that might already be detached
635+
* by the parallel apply worker.
636+
*/
637+
if (winfo->error_mq_handle)
638+
{
639+
shm_mq_detach(winfo->error_mq_handle);
640+
winfo->error_mq_handle = NULL;
641+
}
642+
625643
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
626644

627645
worker = &LogicalRepCtx->workers[slot_no];

src/include/replication/worker_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
235235
Oid userid, Oid relid,
236236
dsm_handle subworker_dsm);
237237
extern void logicalrep_worker_stop(Oid subid, Oid relid);
238-
extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation);
238+
extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
239239
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
240240
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
241241

0 commit comments

Comments
 (0)