Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 99a9609

Browse files
committed
Mark node as online only when it has fully completed recovery. Also
pass ReceiverContext to bgworkers to avoid unnecessary replies during non-parallel-safe replication.
1 parent de3d207 commit 99a9609

File tree

9 files changed

+122
-93
lines changed

9 files changed

+122
-93
lines changed

src/bgwpool.c

Lines changed: 92 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,13 @@ BgwShutdownHandler(int sig)
3939
die(sig);
4040
}
4141

42-
static void BgwPoolMainLoop(BgwPool* pool)
42+
static void
43+
BgwPoolMainLoop(BgwPool* pool)
4344
{
44-
int size;
45-
void* work;
45+
int size;
46+
void* work;
47+
size_t payload = sizeof(MtmReceiverContext) + sizeof(size_t);
48+
MtmReceiverContext ctx;
4649
static PortalData fakePortal;
4750

4851
mtm_log(BgwPoolEvent, "Start background worker %d, shutdown=%d", MyProcPid, pool->shutdown);
@@ -56,61 +59,72 @@ static void BgwPoolMainLoop(BgwPool* pool)
5659
pqsignal(SIGTERM, BgwShutdownHandler);
5760
pqsignal(SIGHUP, PostgresSigHupHandler);
5861

59-
BackgroundWorkerUnblockSignals();
62+
BackgroundWorkerUnblockSignals();
6063
BackgroundWorkerInitializeConnection(pool->dbname, pool->dbuser, 0);
6164
ActivePortal = &fakePortal;
6265
ActivePortal->status = PORTAL_ACTIVE;
6366
ActivePortal->sourceText = "";
6467

65-
while (true) {
68+
while (true)
69+
{
6670
if (ConfigReloadPending)
6771
{
6872
ConfigReloadPending = false;
6973
ProcessConfigFile(PGC_SIGHUP);
7074
}
7175

7276
PGSemaphoreLock(pool->available);
73-
SpinLockAcquire(&pool->lock);
74-
if (pool->shutdown) {
77+
SpinLockAcquire(&pool->lock);
78+
if (pool->shutdown)
79+
{
7580
PGSemaphoreUnlock(pool->available);
7681
break;
7782
}
78-
size = *(int*)&pool->queue[pool->head];
79-
Assert(size < pool->size);
80-
work = palloc(size);
81-
pool->pending -= 1;
82-
pool->active += 1;
83-
if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0) {
83+
size = * (int *) &pool->queue[pool->head];
84+
ctx = * (MtmReceiverContext *) &pool->queue[pool->head + sizeof(size_t)];
85+
86+
Assert(size < pool->size);
87+
work = palloc(size);
88+
pool->pending -= 1;
89+
pool->active += 1;
90+
if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0)
8491
pool->lastPeakTime = MtmGetSystemTime();
92+
93+
if (pool->head + size + payload > pool->size)
94+
{
95+
memcpy(work, pool->queue, size);
96+
pool->head = INTALIGN(size);
8597
}
86-
if (pool->head + size + 4 > pool->size) {
87-
memcpy(work, pool->queue, size);
88-
pool->head = INTALIGN(size);
89-
} else {
90-
memcpy(work, &pool->queue[pool->head+4], size);
91-
pool->head += 4 + INTALIGN(size);
92-
}
93-
if (pool->size == pool->head) {
94-
pool->head = 0;
95-
}
96-
if (pool->producerBlocked) {
97-
pool->producerBlocked = false;
98+
else
99+
{
100+
memcpy(work, &pool->queue[pool->head + payload], size);
101+
pool->head += payload + INTALIGN(size);
102+
}
103+
104+
if (pool->size == pool->head)
105+
pool->head = 0;
106+
107+
if (pool->producerBlocked)
108+
{
109+
pool->producerBlocked = false;
98110
PGSemaphoreUnlock(pool->overflow);
99111
pool->lastPeakTime = 0;
100-
}
101-
SpinLockRelease(&pool->lock);
112+
}
113+
114+
SpinLockRelease(&pool->lock);
102115

103116
/* Ignore cancel that arrived before we started current command */
104117
QueryCancelPending = false;
105118

106-
pool->executor(work, size, NULL);
119+
pool->executor(work, size, &ctx);
107120
pfree(work);
108121

109-
SpinLockAcquire(&pool->lock);
110-
pool->active -= 1;
122+
SpinLockAcquire(&pool->lock);
123+
pool->active -= 1;
111124
pool->lastPeakTime = 0;
112-
SpinLockRelease(&pool->lock);
113-
}
125+
SpinLockRelease(&pool->lock);
126+
}
127+
114128
SpinLockRelease(&pool->lock);
115129
mtm_log(BgwPoolEvent, "Shutdown background worker %d", MyProcPid);
116130
}
@@ -211,9 +225,13 @@ static void BgwStartExtraWorker(BgwPool* pool)
211225
}
212226
}
213227

214-
void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
228+
void
229+
BgwPoolExecute(BgwPool* pool, void* work, size_t size, MtmReceiverContext *ctx)
215230
{
216-
if (size+4 > pool->size) {
231+
size_t payload = sizeof(MtmReceiverContext) + sizeof(size_t);
232+
233+
if (size + payload > pool->size)
234+
{
217235
/*
218236
* Size of work is larger than size of shared buffer:
219237
* run it immediately
@@ -222,42 +240,52 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
222240
return;
223241
}
224242

225-
SpinLockAcquire(&pool->lock);
226-
while (!pool->shutdown) {
227-
if ((pool->head <= pool->tail && pool->size - pool->tail < size + 4 && pool->head < size)
228-
|| (pool->head > pool->tail && pool->head - pool->tail < size + 4))
229-
{
230-
if (pool->lastPeakTime == 0) {
243+
SpinLockAcquire(&pool->lock);
244+
while (!pool->shutdown)
245+
{
246+
if ((pool->head <= pool->tail && pool->size - pool->tail < size + payload && pool->head < size)
247+
|| (pool->head > pool->tail && pool->head - pool->tail < size + payload))
248+
{
249+
if (pool->lastPeakTime == 0)
231250
pool->lastPeakTime = MtmGetSystemTime();
232-
}
251+
233252
pool->producerBlocked = true;
234-
SpinLockRelease(&pool->lock);
253+
SpinLockRelease(&pool->lock);
235254
PGSemaphoreLock(pool->overflow);
236-
SpinLockAcquire(&pool->lock);
237-
} else {
238-
pool->pending += 1;
239-
if (pool->active + pool->pending > pool->nWorkers) {
240-
BgwStartExtraWorker(pool);
241-
}
242-
if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0) {
255+
SpinLockAcquire(&pool->lock);
256+
}
257+
else
258+
{
259+
pool->pending += 1;
260+
261+
if (pool->active + pool->pending > pool->nWorkers)
262+
BgwStartExtraWorker(pool);
263+
264+
if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0)
243265
pool->lastPeakTime = MtmGetSystemTime();
266+
267+
*(int *)&pool->queue[pool->tail] = size;
268+
*(MtmReceiverContext *)&pool->queue[pool->tail + sizeof(size_t)] = *ctx;
269+
270+
if (pool->size - pool->tail >= size + payload)
271+
{
272+
memcpy(&pool->queue[pool->tail + payload], work, size);
273+
pool->tail += payload + INTALIGN(size);
274+
}
275+
else
276+
{
277+
memcpy(pool->queue, work, size);
278+
pool->tail = INTALIGN(size);
244279
}
245-
*(int*)&pool->queue[pool->tail] = size;
246-
if (pool->size - pool->tail >= size + 4) {
247-
memcpy(&pool->queue[pool->tail+4], work, size);
248-
pool->tail += 4 + INTALIGN(size);
249-
} else {
250-
memcpy(pool->queue, work, size);
251-
pool->tail = INTALIGN(size);
252-
}
253-
if (pool->tail == pool->size) {
254-
pool->tail = 0;
255-
}
280+
281+
if (pool->tail == pool->size)
282+
pool->tail = 0;
283+
256284
PGSemaphoreUnlock(pool->available);
257-
break;
258-
}
259-
}
260-
SpinLockRelease(&pool->lock);
285+
break;
286+
}
287+
}
288+
SpinLockRelease(&pool->lock);
261289
}
262290

263291
void BgwPoolStop(BgwPool* pool)

src/dmq.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1072,8 +1072,8 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
10721072

10731073
if (rc > 0)
10741074
{
1075-
last_message_at = dmq_now();
10761075
dmq_handle_message(&s, mq_handles, seg);
1076+
last_message_at = dmq_now();
10771077
reader_state = NeedByte;
10781078
}
10791079
}

src/include/bgwpool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ extern void BgwPoolStart(BgwPool* pool, char *poolName);
5454

5555
extern void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, char const* dbuser, size_t queueSize, size_t nWorkers);
5656

57-
extern void BgwPoolExecute(BgwPool* pool, void* work, size_t size);
57+
extern void BgwPoolExecute(BgwPool* pool, void* work, size_t size, MtmReceiverContext *ctx);
5858

5959
extern size_t BgwPoolGetQueueSize(BgwPool* pool);
6060

src/include/logger.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
typedef enum MtmLogTag
1919
{
2020
/* general */
21-
MtmTxTrace = DEBUG1,
22-
MtmTxFinish = DEBUG1,
21+
MtmTxTrace = LOG,
22+
MtmTxFinish = LOG,
2323

2424
/* dmq */
2525
DmqStateIntermediate = DEBUG1,

src/include/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ typedef struct
126126
typedef struct
127127
{
128128
bool extension_created;
129+
bool recovered;
129130
MtmNodeStatus status; /* Status of this node */
130131
char *statusReason; /* A human-readable description of why the current status was set */
131132
int recoverySlot; /* NodeId of recovery slot or 0 if none */

src/multimaster.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,7 @@ static void MtmInitialize()
529529
{
530530
MemSet(Mtm, 0, sizeof(MtmState) + sizeof(MtmNodeInfo)*(MtmMaxNodes-1));
531531
Mtm->extension_created = false;
532+
Mtm->recovered = false;
532533
Mtm->status = MTM_DISABLED; //MTM_INITIALIZATION;
533534
Mtm->recoverySlot = 0;
534535
Mtm->locks = GetNamedLWLockTranche(MULTIMASTER_NAME);

src/pglogical_apply.c

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ static void UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot);
8686

8787
static bool process_remote_begin(StringInfo s, GlobalTransactionId *gtid);
8888
static bool process_remote_message(StringInfo s, MtmReceiverContext *receiver_ctx);
89-
static void process_remote_commit(StringInfo s, GlobalTransactionId *current_gtid);
89+
static void process_remote_commit(StringInfo s, GlobalTransactionId *current_gtid, MtmReceiverContext *receiver_ctx);
9090
static void process_remote_insert(StringInfo s, Relation rel);
9191
static void process_remote_update(StringInfo s, Relation rel);
9292
static void process_remote_delete(StringInfo s, Relation rel);
@@ -535,13 +535,17 @@ process_remote_message(StringInfo s, MtmReceiverContext *receiver_ctx)
535535
sscanf(messageBody, "%d", &node_id);
536536

537537
Assert(node_id > 0);
538-
Assert(receiver_ctx != NULL);
539538
// XXX assert that it is receiver itself
540539

541540
if (!receiver_ctx->is_recovery && node_id == MtmNodeId)
542541
{
543542
Assert(!receiver_ctx->parallel_allowed);
543+
Assert(receiver_ctx->node_id > 0);
544+
Assert(receiver_ctx->node_id == MtmReplicationNodeId);
545+
Assert(receiver_ctx->node_id != node_id);
546+
544547
receiver_ctx->parallel_allowed = true;
548+
MtmStateProcessNeighborEvent(MtmReplicationNodeId, MTM_NEIGHBOR_WAL_RECEIVER_START, false);
545549
}
546550
standalone = true;
547551
break;
@@ -724,7 +728,7 @@ mtm_send_reply(TransactionId xid, int node_id, MtmMessageCode msg_code)
724728
}
725729

726730
static void
727-
process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid)
731+
process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid, MtmReceiverContext *receiver_ctx)
728732
{
729733
uint8 event;
730734
lsn_t end_lsn;
@@ -767,7 +771,7 @@ process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid)
767771
}
768772
mtm_log(MtmTxFinish, "TXFINISH: %s precommitted", gid);
769773

770-
if (Mtm->status != MTM_RECOVERY)
774+
if (receiver_ctx->parallel_allowed)
771775
{
772776
TransactionId origin_xid = MtmGidParseXid(gid);
773777
mtm_send_reply(origin_xid, origin_node, MSG_PRECOMMITTED);
@@ -813,8 +817,7 @@ process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid)
813817

814818
MtmDeadlockDetectorRemoveXact(xid);
815819

816-
/* XXX: we can move that to callbacks to keep apply clean */
817-
if (Mtm->status != MTM_RECOVERY)
820+
if (receiver_ctx->parallel_allowed)
818821
{
819822
TransactionId origin_xid = MtmGidParseXid(gid);
820823
mtm_send_reply(origin_xid, origin_node, res ? MSG_PREPARED : MSG_ABORTED);
@@ -842,7 +845,7 @@ process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid)
842845
mtm_log(MtmTxFinish, "TXFINISH: %s committed", gid);
843846
CommitTransactionCommand();
844847

845-
if (Mtm->status != MTM_RECOVERY)
848+
if (receiver_ctx->parallel_allowed)
846849
{
847850
TransactionId origin_xid = MtmGidParseXid(gid);
848851
mtm_send_reply(origin_xid, origin_node, MSG_COMMITTED);
@@ -868,7 +871,7 @@ process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid)
868871
Assert(false);
869872
}
870873

871-
if (Mtm->status == MTM_RECOVERY)
874+
if (!receiver_ctx->parallel_allowed)
872875
{
873876
// XXX
874877
elog(LOG, "Recover transaction %s event=%d", gid, event);
@@ -1295,7 +1298,7 @@ MtmExecutor(void* work, size_t size, MtmReceiverContext *receiver_ctx)
12951298
/* COMMIT */
12961299
case 'C':
12971300
close_rel(rel);
1298-
process_remote_commit(&s, &current_gtid);
1301+
process_remote_commit(&s, &current_gtid, receiver_ctx);
12991302
inside_transaction = false;
13001303
break;
13011304
/* INSERT */
@@ -1379,7 +1382,6 @@ MtmExecutor(void* work, size_t size, MtmReceiverContext *receiver_ctx)
13791382
}
13801383
PG_CATCH();
13811384
{
1382-
// XXX: change all this to re-throw
13831385

13841386
old_context = MemoryContextSwitchTo(MtmApplyContext);
13851387
MtmHandleApplyError();
@@ -1397,7 +1399,7 @@ MtmExecutor(void* work, size_t size, MtmReceiverContext *receiver_ctx)
13971399
if (TransactionIdIsValid(current_gtid.xid))
13981400
{
13991401
MtmDeadlockDetectorRemoveXact(GetCurrentTransactionIdIfAny());
1400-
if (Mtm->status != MTM_RECOVERY)
1402+
if (receiver_ctx->parallel_allowed)
14011403
mtm_send_reply(current_gtid.xid, current_gtid.node, MSG_ABORTED);
14021404
}
14031405

0 commit comments

Comments
 (0)