Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit de3d207

Browse files
committed
Cancel bgwpool workers on receiver shutdown
1 parent dfb1703 commit de3d207

File tree

3 files changed

+89
-54
lines changed

3 files changed

+89
-54
lines changed

src/bgwpool.c

Lines changed: 67 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "utils/portal.h"
1313
#include "tcop/pquery.h"
1414
#include "utils/guc.h"
15+
#include "tcop/tcopprot.h"
1516

1617
#include "bgwpool.h"
1718
#include "mm.h"
@@ -25,11 +26,17 @@ static BgwPool* MtmPool;
2526
void BgwPoolStaticWorkerMainLoop(Datum arg);
2627
void BgwPoolDynamicWorkerMainLoop(Datum arg);
2728

28-
static void BgwShutdownWorker(int sig)
29+
static void
30+
BgwShutdownHandler(int sig)
2931
{
30-
if (MtmPool) {
31-
BgwPoolStop(MtmPool);
32-
}
32+
Assert(MtmPool != NULL);
33+
BgwPoolStop(MtmPool);
34+
35+
/*
36+
* set ProcDiePending for cases when we are waiting on latch somewhere
37+
* deep inside our execute() function.
38+
*/
39+
die(sig);
3340
}
3441

3542
static void BgwPoolMainLoop(BgwPool* pool)
@@ -44,10 +51,9 @@ static void BgwPoolMainLoop(BgwPool* pool)
4451
MtmIsLogicalReceiver = true;
4552
MtmPool = pool;
4653

47-
// XXX: fix that
48-
pqsignal(SIGINT, BgwShutdownWorker);
49-
pqsignal(SIGQUIT, BgwShutdownWorker);
50-
pqsignal(SIGTERM, BgwShutdownWorker);
54+
pqsignal(SIGINT, StatementCancelHandler);
55+
pqsignal(SIGQUIT, BgwShutdownHandler);
56+
pqsignal(SIGTERM, BgwShutdownHandler);
5157
pqsignal(SIGHUP, PostgresSigHupHandler);
5258

5359
BackgroundWorkerUnblockSignals();
@@ -93,8 +99,13 @@ static void BgwPoolMainLoop(BgwPool* pool)
9399
pool->lastPeakTime = 0;
94100
}
95101
SpinLockRelease(&pool->lock);
102+
103+
/* Ignore cancel that arrived before we started current command */
104+
QueryCancelPending = false;
105+
96106
pool->executor(work, size, NULL);
97-
pfree(work);
107+
pfree(work);
108+
98109
SpinLockAcquire(&pool->lock);
99110
pool->active -= 1;
100111
pool->lastPeakTime = 0;
@@ -107,6 +118,8 @@ static void BgwPoolMainLoop(BgwPool* pool)
107118
void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, char const* dbuser, size_t queueSize, size_t nWorkers)
108119
{
109120
MtmPool = pool;
121+
122+
pool->bgwhandles = (BackgroundWorkerHandle **) ShmemAlloc(MtmMaxWorkers * sizeof(BackgroundWorkerHandle *));
110123
pool->queue = (char*)ShmemAlloc(queueSize);
111124
if (pool->queue == NULL) {
112125
elog(PANIC, "Failed to allocate memory for background workers pool: %lld bytes requested", (long64)queueSize);
@@ -148,7 +161,6 @@ void BgwPoolDynamicWorkerMainLoop(Datum arg)
148161

149162
void BgwPoolStart(BgwPool* pool, char *poolName)
150163
{
151-
int i;
152164
BackgroundWorker worker;
153165

154166
MemSet(&worker, 0, sizeof(BackgroundWorker));
@@ -159,13 +171,6 @@ void BgwPoolStart(BgwPool* pool, char *poolName)
159171
worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
160172

161173
strncpy(pool->poolName, poolName, MAX_NAME_LEN);
162-
163-
for (i = 0; i < pool->nWorkers; i++)
164-
{
165-
snprintf(worker.bgw_name, BGW_MAXLEN, "%s_worker_%d", pool->poolName, i+1);
166-
worker.bgw_main_arg = PointerGetDatum(pool);
167-
RegisterBackgroundWorker(&worker);
168-
}
169174
}
170175

171176
size_t BgwPoolGetQueueSize(BgwPool* pool)
@@ -180,25 +185,29 @@ size_t BgwPoolGetQueueSize(BgwPool* pool)
180185

181186
static void BgwStartExtraWorker(BgwPool* pool)
182187
{
183-
if (pool->nWorkers < MtmMaxWorkers) {
184-
timestamp_t now = MtmGetSystemTime();
185-
/*if (pool->lastDynamicWorkerStartTime + MULTIMASTER_BGW_RESTART_TIMEOUT*USECS_PER_SEC < now)*/
186-
{
187-
BackgroundWorker worker;
188-
BackgroundWorkerHandle* handle;
189-
MemSet(&worker, 0, sizeof(BackgroundWorker));
190-
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
191-
worker.bgw_start_time = BgWorkerStart_ConsistentState;
192-
sprintf(worker.bgw_library_name, "multimaster");
193-
sprintf(worker.bgw_function_name, "BgwPoolDynamicWorkerMainLoop");
194-
worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
195-
snprintf(worker.bgw_name, BGW_MAXLEN, "%s-dynworker-%d", pool->poolName, (int)++pool->nWorkers);
196-
worker.bgw_main_arg = PointerGetDatum(pool);
197-
pool->lastDynamicWorkerStartTime = now;
198-
if (!RegisterDynamicBackgroundWorker(&worker, &handle)) {
199-
elog(WARNING, "Failed to start dynamic background worker");
200-
}
201-
}
188+
BackgroundWorker worker;
189+
BackgroundWorkerHandle* handle;
190+
191+
if (pool->nWorkers >= MtmMaxWorkers)
192+
return;
193+
194+
MemSet(&worker, 0, sizeof(BackgroundWorker));
195+
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
196+
worker.bgw_start_time = BgWorkerStart_ConsistentState;
197+
worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
198+
worker.bgw_main_arg = PointerGetDatum(pool);
199+
sprintf(worker.bgw_library_name, "multimaster");
200+
sprintf(worker.bgw_function_name, "BgwPoolDynamicWorkerMainLoop");
201+
snprintf(worker.bgw_name, BGW_MAXLEN, "%s-dynworker-%d", pool->poolName, (int) pool->nWorkers + 1);
202+
203+
pool->lastDynamicWorkerStartTime = MtmGetSystemTime();
204+
if (RegisterDynamicBackgroundWorker(&worker, &handle))
205+
{
206+
pool->bgwhandles[pool->nWorkers++] = handle;
207+
}
208+
else
209+
{
210+
elog(WARNING, "Failed to start dynamic background worker");
202211
}
203212
}
204213

@@ -253,9 +262,29 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
253262

254263
void BgwPoolStop(BgwPool* pool)
255264
{
256-
// SpinLockAcquire(&pool->lock);
257265
pool->shutdown = true;
258-
// SpinLockRelease(&pool->lock);
259266
PGSemaphoreUnlock(pool->available);
260267
PGSemaphoreUnlock(pool->overflow);
261268
}
269+
270+
/*
271+
* Tell our lads to cancel currently active transactions.
272+
*/
273+
void
274+
BgwPoolCancel(BgwPool* pool)
275+
{
276+
int i;
277+
278+
for (i = 0; i < pool->nWorkers; i++)
279+
{
280+
BgwHandleStatus status;
281+
pid_t pid;
282+
283+
status = GetBackgroundWorkerPid(pool->bgwhandles[i], &pid);
284+
if (status == BGWH_STARTED)
285+
{
286+
Assert(pid > 0);
287+
kill(pid, SIGINT);
288+
}
289+
}
290+
}

src/include/bgwpool.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "storage/s_lock.h"
55
#include "storage/spin.h"
66
#include "storage/pg_sema.h"
7+
#include "postmaster/bgworker.h"
78
#include "bkb.h" // XXX
89

910
#include "mm.h"
@@ -43,6 +44,8 @@ typedef struct
4344
char dbname[MAX_DBNAME_LEN];
4445
char dbuser[MAX_DBUSER_LEN];
4546
char* queue;
47+
48+
BackgroundWorkerHandle **bgwhandles;
4649
} BgwPool;
4750

4851
typedef BgwPool*(*BgwPoolConstructor)(void);
@@ -58,4 +61,7 @@ extern size_t BgwPoolGetQueueSize(BgwPool* pool);
5861
extern timestamp_t BgwGetLastPeekTime(BgwPool* pool);
5962

6063
extern void BgwPoolStop(BgwPool* pool);
64+
65+
extern void BgwPoolCancel(BgwPool* pool);
66+
6167
#endif

src/pglogical_receiver.c

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@ pglogical_receiver_main(Datum main_arg)
470470
/* Register functions for SIGTERM/SIGHUP management */
471471
pqsignal(SIGHUP, receiver_raw_sighup);
472472
// pqsignal(SIGTERM, receiver_raw_sigterm);
473+
// XXX: also need to unlock overflow semaphore (or rewrite it on latches)
473474
pqsignal(SIGTERM, die);
474475

475476
MtmCreateSpillDirectory(nodeId);
@@ -534,16 +535,7 @@ pglogical_receiver_main(Datum main_arg)
534535
mtm_log(MtmReceiverMode, "WalReceiver to %d starts in %s mode",
535536
nodeId, MtmReplicationModeName[mode]);
536537

537-
// if (mode == REPLMODE_RECOVERY)
538-
// synchronous_twophase = false;
539-
// else
540-
// synchronous_twophase = true;
541-
// synchronous_twophase = true;
542-
543-
if (mode == REPLMODE_EXIT)
544-
{
545-
break;
546-
}
538+
Assert(mode == REPLMODE_RECOVERY || mode == REPLMODE_RECOVERED);
547539
timeline = Mtm->nodes[nodeId-1].timeline;
548540
count = Mtm->recoveryCount;
549541

@@ -648,16 +640,17 @@ pglogical_receiver_main(Datum main_arg)
648640
if (Mtm->status == MTM_DISABLED || (Mtm->status == MTM_RECOVERY && Mtm->recoverySlot != nodeId))
649641
{
650642
ereport(LOG, (MTM_ERRMSG("%s: restart WAL receiver because node was switched to %s mode", worker_proc, MtmNodeStatusMnem[Mtm->status])));
651-
break;
643+
goto OnError;
652644
}
645+
653646
if (count != Mtm->recoveryCount) {
654647
ereport(LOG, (MTM_ERRMSG("%s: restart WAL receiver because node was recovered", worker_proc)));
655-
break;
648+
goto OnError;
656649
}
657650

658651
if (timeline != Mtm->nodes[nodeId-1].timeline) {
659652
ereport(LOG, (MTM_ERRMSG("%s: restart WAL receiver because node %d timeline is changed", worker_proc, nodeId)));
660-
break;
653+
goto OnError;
661654
}
662655

663656
/*
@@ -911,14 +904,21 @@ pglogical_receiver_main(Datum main_arg)
911904
goto OnError;
912905
}
913906
}
914-
PQfinish(conn);
915-
continue;
907+
Assert(false);
908+
916909

917-
OnError:
910+
OnError:
918911
ByteBufferReset(&buf);
919912
PQfinish(conn);
920913
if (Mtm->recoverySlot == nodeId)
921914
Mtm->recoverySlot = 0;
915+
916+
/*
917+
* Some of the workers may stuck on lock and survive until node will come
918+
* back and prepare stuck transaction when it was aborted long time ago.
919+
* Force all workers to cancel stmt to ensure this will not happen.
920+
*/
921+
BgwPoolCancel(&Mtm->nodes[nodeId - 1].pool);
922922
MtmSleep(RECEIVER_SUSPEND_TIMEOUT);
923923
}
924924
ByteBufferFree(&buf);

0 commit comments

Comments
 (0)