Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit dfb1703

Browse files
committed
don't wait dmq_receivers to exit during smart shotdown
1 parent f88abfb commit dfb1703

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed

src/dmq.c

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ struct DmqSharedState
110110
dsm_handle dsm_h;
111111
int procno;
112112
bool active;
113+
pid_t pid;
113114
} receivers[DMQ_MAX_RECEIVERS];
114115

115116
} *dmq_state;
@@ -317,6 +318,23 @@ fe_send(PGconn *conn, char *msg, size_t len)
317318
return 0;
318319
}
319320

321+
static void
322+
dmq_sender_at_exit(int status, Datum arg)
323+
{
324+
int i;
325+
326+
LWLockAcquire(dmq_state->lock, LW_SHARED);
327+
for (i = 0; i < dmq_state->n_receivers; i++)
328+
{
329+
if (dmq_state->receivers[i].active &&
330+
dmq_state->receivers[i].pid > 0)
331+
{
332+
kill(dmq_state->receivers[i].pid, SIGTERM);
333+
}
334+
}
335+
LWLockRelease(dmq_state->lock);
336+
}
337+
320338
void
321339
dmq_sender_main(Datum main_arg)
322340
{
@@ -329,6 +347,8 @@ dmq_sender_main(Datum main_arg)
329347

330348
double prev_timer_at = dmq_now();
331349

350+
on_shmem_exit(dmq_sender_at_exit, (Datum) 0);
351+
332352
/* init this worker */
333353
pqsignal(SIGHUP, dmq_sighup_handler);
334354
pqsignal(SIGTERM, die);
@@ -662,12 +682,10 @@ dmq_sender_main(Datum main_arg)
662682
}
663683
else if (nevents > 0 && event.events & WL_POSTMASTER_DEATH)
664684
{
665-
exit(1);
685+
proc_exit(1);
666686
}
667687

668688
CHECK_FOR_INTERRUPTS();
669-
670-
// XXX: handle WL_POSTMASTER_DEATH ?
671689
}
672690
FreeWaitEventSet(set);
673691

@@ -997,6 +1015,7 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
9971015
dmq_state->receivers[receiver_id].dsm_h = dsm_segment_handle(seg);
9981016
dmq_state->receivers[receiver_id].procno = MyProc->pgprocno;
9991017
dmq_state->receivers[receiver_id].active = true;
1018+
dmq_state->receivers[receiver_id].pid = MyProcPid;
10001019
LWLockRelease(dmq_state->lock);
10011020

10021021
on_shmem_exit(dmq_receiver_at_exit, Int32GetDatum(receiver_id));
@@ -1075,6 +1094,13 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
10751094
ResetLatch(MyLatch);
10761095
}
10771096

1097+
if (nevents > 0 && event.events & WL_POSTMASTER_DEATH)
1098+
{
1099+
ereport(FATAL,
1100+
(errcode(ERRCODE_ADMIN_SHUTDOWN),
1101+
errmsg("[DMQ] exit receiver due to unexpected postmaster exit")));
1102+
}
1103+
10781104
// XXX: is it enough?
10791105
CHECK_FOR_INTERRUPTS();
10801106

src/resolver.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,18 @@ majority_in(resolver_tx *tx, MtmTxStateMask mask)
269269
return (hits > Mtm->nAllNodes/2);
270270
}
271271

272+
273+
/*
274+
* resolve_tx
275+
*
276+
* This handles respenses with tx status and makes decision based on
277+
* 3PC resolving protocol.
278+
*
279+
* Here we can get an error when trying to commit transaction that
280+
* is already during commit by receiver (see gxact->locking_backend).
281+
* We can catch those, but better just let it happend, restart bgworker
282+
* and continue resolving.
283+
*/
272284
static void
273285
resolve_tx(const char *gid, int node_id, MtmTxState state)
274286
{

0 commit comments

Comments
 (0)