Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5b4619f

Browse files
committed
dmq integration: be more mindful about gathering votes in presence of failures
1 parent 5485d9f commit 5b4619f

File tree

4 files changed

+99
-29
lines changed

4 files changed

+99
-29
lines changed

commit.c

Lines changed: 88 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -180,32 +180,65 @@ GatherPrepares(MtmCurrentTrans* x, nodemask_t participantsMask, int *failed_at)
180180

181181
while (participantsMask != 0)
182182
{
183+
bool ret;
183184
DmqSenderId sender_id;
184185
StringInfoData buffer;
185186
MtmArbiterMessage *msg;
186187

187-
dmq_pop(&sender_id, &buffer, participantsMask);
188-
msg = (MtmArbiterMessage *) buffer.data;
188+
ret = dmq_pop(&sender_id, &buffer, participantsMask);
189189

190-
Assert(msg->node == sender_to_node[sender_id]);
191-
Assert(msg->code == MSG_PREPARED || msg->code == MSG_ABORTED);
192-
Assert(msg->dxid == x->xid);
193-
Assert(BIT_CHECK(participantsMask, sender_to_node[sender_id] - 1));
190+
if (ret)
191+
{
192+
msg = (MtmArbiterMessage *) buffer.data;
193+
194+
Assert(msg->node == sender_to_node[sender_id]);
195+
Assert(msg->code == MSG_PREPARED || msg->code == MSG_ABORTED);
196+
Assert(msg->dxid == x->xid);
197+
Assert(BIT_CHECK(participantsMask, sender_to_node[sender_id] - 1));
194198

195-
mtm_log(MtmTxTrace,
196-
"GatherPrepares: got '%s' for %s from node%d",
197-
msg->code == MSG_PREPARED ? "ok" : "failed",
198-
msg->gid, sender_to_node[sender_id]);
199+
mtm_log(MtmTxTrace,
200+
"GatherPrepares: got '%s' for tx" XID_FMT " from node%d",
201+
msg->code == MSG_PREPARED ? "ok" : "failed",
202+
x->xid, sender_to_node[sender_id]);
199203

200-
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
204+
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
201205

202-
if (msg->code == MSG_ABORTED)
206+
if (msg->code == MSG_ABORTED)
207+
{
208+
prepared = false;
209+
*failed_at = msg->node;
210+
}
211+
}
212+
else
203213
{
204-
prepared = false;
205-
*failed_at = msg->node;
214+
/*
215+
* If the queue is detached then the neighbour node is probably
216+
* disconnected. Let's wait until it becomes disabled, as we
217+
* may have gone offline ourselves by this time.
218+
*/
219+
MtmLock(LW_SHARED);
220+
if (BIT_CHECK(Mtm->disabledNodeMask, sender_to_node[sender_id] - 1))
221+
{
222+
if (Mtm->status != MTM_ONLINE)
223+
{
224+
elog(ERROR, "our node was disabled during transaction commit");
225+
}
226+
else
227+
{
228+
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
229+
mtm_log(MtmTxTrace,
230+
"GatherPrepares: dropping node%d from participants of tx" XID_FMT,
231+
sender_to_node[sender_id], x->xid);
232+
prepared = false;
233+
*failed_at = sender_to_node[sender_id];
234+
}
235+
}
236+
MtmUnlock();
206237
}
207238
}
208239

240+
// XXX: assert that majority has responded
241+
209242
return prepared;
210243
}
211244

@@ -216,22 +249,53 @@ GatherPrecommits(MtmCurrentTrans* x, nodemask_t participantsMask)
216249

217250
while (participantsMask != 0)
218251
{
252+
bool ret;
219253
DmqSenderId sender_id;
220254
StringInfoData buffer;
221255
MtmArbiterMessage *msg;
222256

223-
dmq_pop(&sender_id, &buffer, participantsMask);
224-
msg = (MtmArbiterMessage *) buffer.data;
257+
ret = dmq_pop(&sender_id, &buffer, participantsMask);
225258

226-
Assert(msg->node == sender_to_node[sender_id]);
227-
Assert(msg->code == MSG_PRECOMMITTED);
228-
Assert(msg->dxid == x->xid);
229-
Assert(BIT_CHECK(participantsMask, sender_to_node[sender_id] - 1));
259+
if (ret)
260+
{
261+
msg = (MtmArbiterMessage *) buffer.data;
262+
263+
Assert(msg->node == sender_to_node[sender_id]);
264+
Assert(msg->code == MSG_PRECOMMITTED);
265+
Assert(msg->dxid == x->xid);
266+
Assert(BIT_CHECK(participantsMask, sender_to_node[sender_id] - 1));
230267

231-
mtm_log(MtmTxTrace,
232-
"GatherPrecommits: got 'ok' for %s from node%d",
233-
msg->gid, sender_to_node[sender_id]);
268+
mtm_log(MtmTxTrace,
269+
"GatherPrecommits: got 'ok' for tx" XID_FMT " from node%d",
270+
x->xid, sender_to_node[sender_id]);
234271

235-
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
272+
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
273+
}
274+
else
275+
{
276+
/*
277+
* If the queue is detached then the neighbour node is probably
278+
* disconnected. Let's wait until it becomes disabled, as we
279+
* may have gone offline ourselves by this time.
280+
*/
281+
MtmLock(LW_SHARED);
282+
if (BIT_CHECK(Mtm->disabledNodeMask, sender_to_node[sender_id] - 1))
283+
{
284+
if (Mtm->status != MTM_ONLINE)
285+
{
286+
elog(ERROR, "our node was disabled during transaction commit");
287+
}
288+
else
289+
{
290+
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
291+
mtm_log(MtmTxTrace,
292+
"GatherPrecommit: dropping node%d from participants of tx" XID_FMT,
293+
sender_to_node[sender_id], x->xid);
294+
}
295+
}
296+
MtmUnlock();
297+
}
236298
}
299+
300+
// XXX: assert that majority has responded
237301
}

dmq.c

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,7 +1256,7 @@ dmq_stream_unsubscribe(char *stream_name)
12561256
Assert(found);
12571257
}
12581258

1259-
void
1259+
bool
12601260
dmq_pop(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
12611261
{
12621262
shm_mq_result res;
@@ -1292,7 +1292,7 @@ dmq_pop(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
12921292
mtm_log(DmqTraceIncoming,
12931293
"[DMQ] dmq_pop: got message %s from %s",
12941294
(char *) data, dmq_local.inhandles[i].name);
1295-
return;
1295+
return true;
12961296
}
12971297
else if (res == SHM_MQ_DETACHED)
12981298
{
@@ -1301,9 +1301,15 @@ dmq_pop(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
13011301
dmq_local.inhandles[i].name);
13021302

13031303
if (dmq_reattach_shm_mq(i))
1304+
{
13041305
nowait = true;
1306+
}
13051307
else
1306-
mtm_log(ERROR, "[DMQ] dmq_pop: failed to reattach");
1308+
{
1309+
*sender_id = i;
1310+
return false;
1311+
}
1312+
// mtm_log(ERROR, "[DMQ] dmq_pop: failed to reattach");
13071313
}
13081314
}
13091315

dmq.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ extern void dmq_attach_receiver(char *sender_name, int mask_pos);
1818
extern void dmq_stream_subscribe(char *stream_name);
1919
extern void dmq_stream_unsubscribe(char *stream_name);
2020

21-
extern void dmq_pop(DmqSenderId *sender_id, StringInfo msg, uint64 mask);
21+
extern bool dmq_pop(DmqSenderId *sender_id, StringInfo msg, uint64 mask);
2222
extern bool dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask);
2323

2424
extern void dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg);

pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -786,7 +786,7 @@ mtm_send_reply(TransactionId xid, int node_id, MtmMessageCode msg_code)
786786
dmq_push_buffer(dest_id, psprintf("xid" XID_FMT, msg.dxid),
787787
&msg, sizeof(MtmArbiterMessage));
788788

789-
elog(LOG, "MtmFollowerSendReply: %s to node%d (dest %d)", msg.gid, node_id, dest_id);
789+
// elog(LOG, "MtmFollowerSendReply: %s to node%d (dest %d)", msg.gid, node_id, dest_id);
790790
}
791791

792792
static void

0 commit comments

Comments
 (0)