Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 0fe2cf8

Browse files
committed
more node checks during gather
1 parent f44f61b commit 0fe2cf8

File tree

5 files changed

+41
-14
lines changed

5 files changed

+41
-14
lines changed

src/commit.c

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include "tcop/tcopprot.h"
2323
#include "postmaster/autovacuum.h"
2424
#include "libpq/pqformat.h"
25+
#include "pgstat.h"
26+
#include "storage/ipc.h"
2527

2628
#include "multimaster.h"
2729
#include "logger.h"
@@ -32,7 +34,7 @@
3234

3335
typedef struct
3436
{
35-
StringInfo message;
37+
StringInfoData message;
3638
int node_id;
3739
} mtm_msg;
3840

@@ -267,7 +269,7 @@ MtmTwoPhaseCommit()
267269

268270
for (i = 0; i < n_messages; i++)
269271
{
270-
MtmMessageCode status = pq_getmsgbyte(messages[i].message);
272+
MtmMessageCode status = pq_getmsgbyte(&messages[i].message);
271273

272274
Assert(status == MSG_PREPARED || status == MSG_ABORTED);
273275
if (status == MSG_ABORTED)
@@ -307,11 +309,13 @@ gather(uint64 participants, mtm_msg *messages, int *msg_count)
307309
*msg_count = 0;
308310
while (participants != 0)
309311
{
310-
bool ret;
311-
DmqSenderId sender_id;
312-
StringInfo msg = makeStringInfo();
312+
bool ret;
313+
DmqSenderId sender_id;
314+
StringInfoData msg;
315+
int rc;
316+
bool wait;
313317

314-
ret = dmq_pop(&sender_id, msg, participants);
318+
ret = dmq_pop_nb(&sender_id, &msg, participants, &wait);
315319
if (ret)
316320
{
317321
messages[*msg_count].message = msg;
@@ -323,7 +327,7 @@ gather(uint64 participants, mtm_msg *messages, int *msg_count)
323327
"gather: got message from node%d",
324328
sender_to_node[sender_id]);
325329
}
326-
else
330+
else if (sender_id >= 0)
327331
{
328332
/*
329333
* If queue is detached then the neighbour node is probably
@@ -338,6 +342,19 @@ gather(uint64 participants, mtm_msg *messages, int *msg_count)
338342
sender_to_node[sender_id]);
339343
}
340344
}
345+
346+
if (wait)
347+
{
348+
// XXX cache that
349+
rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT, 100.0,
350+
PG_WAIT_EXTENSION);
351+
352+
if (rc & WL_POSTMASTER_DEATH)
353+
proc_exit(1);
354+
355+
if (rc & WL_LATCH_SET)
356+
ResetLatch(MyLatch);
357+
}
341358
}
342359
}
343360

@@ -373,7 +390,7 @@ MtmExplicitPrepare(char *gid)
373390

374391
for (i = 0; i < n_messages; i++)
375392
{
376-
MtmMessageCode status = pq_getmsgbyte(messages[i].message);
393+
MtmMessageCode status = pq_getmsgbyte(&messages[i].message);
377394

378395
Assert(status == MSG_PREPARED || status == MSG_ABORTED);
379396
if (status == MSG_ABORTED)

src/dmq.c

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1529,11 +1529,14 @@ dmq_pop(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
15291529
}
15301530

15311531
bool
1532-
dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
1532+
dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask, bool *wait)
15331533
{
15341534
shm_mq_result res;
15351535
int i;
15361536

1537+
*wait = true;
1538+
*sender_id = -1;
1539+
15371540
for (i = 0; i < dmq_local.n_inhandles; i++)
15381541
{
15391542
Size len;
@@ -1553,7 +1556,9 @@ dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
15531556
msg->len = len;
15541557
msg->maxlen = -1;
15551558
msg->cursor = 0;
1559+
15561560
*sender_id = i;
1561+
*wait = false;
15571562

15581563
mtm_log(DmqTraceIncoming,
15591564
"[DMQ] dmq_pop_nb: got message %s (len=%zu) from %s",
@@ -1562,8 +1567,11 @@ dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
15621567
}
15631568
else if (res == SHM_MQ_DETACHED)
15641569
{
1565-
if (!dmq_reattach_shm_mq(i))
1566-
mtm_log(WARNING, "[DMQ] dmq_pop_nb: failed to reattach");
1570+
*sender_id = i;
1571+
1572+
if (dmq_reattach_shm_mq(i))
1573+
*wait = false;
1574+
15671575
return false;
15681576
}
15691577
}

src/include/dmq.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ extern void dmq_stream_subscribe(char *stream_name);
2424
extern void dmq_stream_unsubscribe(char *stream_name);
2525

2626
extern bool dmq_pop(DmqSenderId *sender_id, StringInfo msg, uint64 mask);
27-
extern bool dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask);
27+
extern bool dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask, bool *wait);
2828

2929
extern void dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg);
3030
extern void dmq_push_buffer(DmqDestinationId dest_id, char *stream_name, const void *buffer, size_t len);

src/resolver.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,8 +467,9 @@ handle_responses(void)
467467
{
468468
DmqSenderId sender_id;
469469
StringInfoData msg;
470+
bool wait;
470471

471-
while(dmq_pop_nb(&sender_id, &msg, MtmGetConnectedNodeMask()))
472+
while(dmq_pop_nb(&sender_id, &msg, MtmGetConnectedNodeMask(), &wait))
472473
{
473474
int node_id;
474475
const char *gid;

src/state.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1196,8 +1196,9 @@ check_status_requests(MtmConfig *mtm_cfg)
11961196
{
11971197
DmqSenderId sender_id;
11981198
StringInfoData msg;
1199+
bool wait;
11991200

1200-
while(dmq_pop_nb(&sender_id, &msg, MtmGetConnectedNodeMask()))
1201+
while(dmq_pop_nb(&sender_id, &msg, MtmGetConnectedNodeMask(), &wait))
12011202
{
12021203
DmqDestinationId dest_id;
12031204
StringInfoData response_msg;

0 commit comments

Comments
 (0)