Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4421f3c

Browse files
committed
more protection from QueryCancel in apply worker
1 parent 8930610 commit 4421f3c

File tree

3 files changed

+36
-7
lines changed

3 files changed

+36
-7
lines changed

src/bgwpool.c

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
4949
receiver_mtm_cfg_valid = false;
5050
}
5151

52-
5352
static void
5453
BgwPoolMainLoop(BgwPool* pool)
5554
{
@@ -68,10 +67,10 @@ BgwPoolMainLoop(BgwPool* pool)
6867
MtmIsLogicalReceiver = true;
6968
MtmPool = pool;
7069

71-
pqsignal(SIGINT, StatementCancelHandler);
70+
pqsignal(SIGINT, ApplyCancelHandler);
7271
pqsignal(SIGQUIT, BgwShutdownHandler);
7372
pqsignal(SIGTERM, BgwShutdownHandler);
74-
pqsignal(SIGHUP, PostgresSigHupHandler);
73+
pqsignal(SIGHUP, ApplyCancelHandler);
7574

7675
// XXX: probably we should add static variable that signalizes that
7776
// we are between pool->active += 1 and pool->active -= 1, so if
@@ -105,6 +104,7 @@ BgwPoolMainLoop(BgwPool* pool)
105104
if (pool->shutdown)
106105
{
107106
PGSemaphoreUnlock(pool->available);
107+
SpinLockRelease(&pool->lock);
108108
break;
109109
}
110110
size = * (int *) &pool->queue[pool->head];
@@ -113,6 +113,7 @@ BgwPoolMainLoop(BgwPool* pool)
113113
work = palloc(size);
114114
pool->pending -= 1;
115115
pool->active += 1;
116+
116117
if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0)
117118
pool->lastPeakTime = MtmGetSystemTime();
118119

@@ -152,9 +153,6 @@ BgwPoolMainLoop(BgwPool* pool)
152153

153154
SpinLockRelease(&pool->lock);
154155

155-
/* Ignore cancel that arrived before we started current command */
156-
QueryCancelPending = false;
157-
158156
MtmExecutor(work, size, &ctx);
159157
pfree(work);
160158

@@ -166,7 +164,6 @@ BgwPoolMainLoop(BgwPool* pool)
166164
ConditionVariableBroadcast(&pool->syncpoint_cv);
167165
}
168166

169-
SpinLockRelease(&pool->lock);
170167
mtm_log(BgwPoolEvent, "Shutdown background worker %d", MyProcPid);
171168
}
172169

src/include/receiver.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ extern char const *const MtmReplicationModeName[];
2121
extern BackgroundWorkerHandle *MtmStartReceiver(int nodeId, Oid db_id, Oid user_id, pid_t monitor_pid);
2222

2323
extern void MtmExecutor(void* work, size_t size, MtmReceiverContext *rctx);
24+
extern void ApplyCancelHandler(SIGNAL_ARGS);
2425
extern void MtmUpdateLsnMapping(int node_id, XLogRecPtr end_lsn);
2526

2627
extern void MtmBeginSession(int nodeId);

src/pglogical_apply.c

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ typedef struct TupleData
7676
bool changed[MaxTupleAttributeNumber];
7777
} TupleData;
7878

79+
static bool query_cancel_allowed;
7980

8081
static Relation read_rel(StringInfo s, LOCKMODE mode);
8182
static void read_tuple_parts(StringInfo s, Relation rel, TupleData *tup);
@@ -121,6 +122,25 @@ MtmHandleApplyError(void)
121122
FreeErrorData(edata);
122123
}
123124

125+
void
126+
ApplyCancelHandler(SIGNAL_ARGS)
127+
{
128+
int save_errno = errno;
129+
130+
/*
131+
* Don't joggle the elbow of proc_exit
132+
*/
133+
if (!proc_exit_inprogress && query_cancel_allowed)
134+
{
135+
InterruptPending = true;
136+
QueryCancelPending = true;
137+
}
138+
139+
/* If we're still here, waken anything waiting on the process latch */
140+
SetLatch(MyLatch);
141+
142+
errno = save_errno;
143+
}
124144

125145
bool find_heap_tuple(TupleData *tup, Relation rel, TupleTableSlot *slot, bool lock)
126146
{
@@ -483,6 +503,10 @@ process_remote_begin(StringInfo s, GlobalTransactionId *gtid)
483503
gtid->node = pq_getmsgint(s, 4);
484504
gtid->xid = pq_getmsgint64(s);
485505

506+
InterruptPending = false;
507+
QueryCancelPending = false;
508+
query_cancel_allowed = true;
509+
486510
// XXX: get rid of MtmReplicationNodeId
487511
MtmReplicationNodeId = gtid->node;
488512

@@ -929,6 +953,7 @@ process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid, MtmRecei
929953
{
930954
case PGLOGICAL_PRECOMMIT_PREPARED:
931955
{
956+
Assert(!query_cancel_allowed);
932957
strncpy(gid, pq_getmsgstring(in), sizeof gid);
933958
MtmBeginSession(origin_node);
934959

@@ -979,6 +1004,10 @@ process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid, MtmRecei
9791004

9801005
CommitTransactionCommand();
9811006

1007+
InterruptPending = false;
1008+
QueryCancelPending = false;
1009+
query_cancel_allowed = false;
1010+
9821011
if (receiver_ctx->parallel_allowed)
9831012
{
9841013
mtm_send_xid_reply(current_gtid->xid, origin_node,
@@ -1005,6 +1034,7 @@ process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid, MtmRecei
10051034
}
10061035
case PGLOGICAL_COMMIT_PREPARED:
10071036
{
1037+
Assert(!query_cancel_allowed);
10081038
pq_getmsgint64(in); /* csn */
10091039
strncpy(gid, pq_getmsgstring(in), sizeof gid);
10101040
StartTransactionCommand();
@@ -1621,6 +1651,7 @@ MtmExecutor(void* work, size_t size, MtmReceiverContext *receiver_ctx)
16211651
}
16221652
PG_CATCH();
16231653
{
1654+
query_cancel_allowed = false;
16241655

16251656
old_context = MemoryContextSwitchTo(MtmApplyContext);
16261657
MtmHandleApplyError();

0 commit comments

Comments
 (0)