Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 094c29d

Browse files
committed
rework commit and resolve messaging wire format
1 parent 2328b01 commit 094c29d

File tree

7 files changed

+151
-219
lines changed

7 files changed

+151
-219
lines changed

src/commit.c

Lines changed: 50 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,20 @@
2121
#include "catalog/pg_subscription.h"
2222
#include "tcop/tcopprot.h"
2323
#include "postmaster/autovacuum.h"
24+
#include "libpq/pqformat.h"
2425

2526
#include "multimaster.h"
2627
#include "logger.h"
2728
#include "ddl.h"
2829
#include "state.h"
2930
#include "syncpoint.h"
3031

32+
typedef struct
33+
{
34+
StringInfo message;
35+
int node_id;
36+
} mtm_msg;
37+
3138
static bool force_in_bgworker;
3239

3340
static bool subchange_cb_registered;
@@ -38,10 +45,7 @@ static MtmConfig *mtm_cfg;
3845

3946
MtmCurrentTrans MtmTx;
4047

41-
static bool GatherPrepares(TransactionId xid, nodemask_t participantsMask,
42-
int *failed_at);
43-
static void GatherPrecommits(TransactionId xid, nodemask_t participantsMask,
44-
MtmMessageCode code);
48+
static void gather(uint64 participants, mtm_msg *messages, int *msg_count);
4549

4650
static void
4751
pubsub_change_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -170,12 +174,13 @@ MtmBeginTransaction()
170174
bool
171175
MtmTwoPhaseCommit()
172176
{
173-
nodemask_t participantsMask;
174-
bool ret;
175-
int failed_at = 0;
177+
uint64 participants;
178+
bool ret;
176179
TransactionId xid;
177-
char stream[DMQ_NAME_MAXLEN];
178-
pgid_t gid;
180+
char stream[DMQ_NAME_MAXLEN];
181+
char gid[GIDSIZE];
182+
mtm_msg messages[MTM_MAX_NODES];
183+
int n_messages;
179184

180185
if (!MtmTx.contains_persistent_ddl && !MtmTx.contains_dml)
181186
return false;
@@ -206,7 +211,7 @@ MtmTwoPhaseCommit()
206211
*
207212
* It is only used during startup of WalSender(node_id) in recovered mode
208213
* to create a barrier after which all transactions doing our 3PC are
209-
* guaranted to have seen participantsMask with node_id enabled, so the
214+
* guaranted to have seen participants with node_id enabled, so the
210215
* receiver can apply them in parallel and be sure that precommit will
211216
* not happens before node_id applies prepare.
212217
*
@@ -217,8 +222,8 @@ MtmTwoPhaseCommit()
217222

218223
LWLockAcquire(MtmCommitBarrier, LW_SHARED);
219224

220-
participantsMask = MtmGetEnabledNodeMask() &
221-
~((nodemask_t)1 << (mtm_cfg->my_node_id-1));
225+
participants = MtmGetEnabledNodeMask() &
226+
~((nodemask_t)1 << (mtm_cfg->my_node_id-1));
222227

223228
ret = PrepareTransactionBlock(gid);
224229
if (!ret)
@@ -230,119 +235,62 @@ MtmTwoPhaseCommit()
230235
mtm_log(MtmTxFinish, "TXFINISH: %s prepared", gid);
231236
CommitTransactionCommand();
232237

233-
ret = GatherPrepares(xid, participantsMask, &failed_at);
234-
if (!ret)
238+
gather(participants, messages, &n_messages);
239+
dmq_stream_unsubscribe(stream);
240+
241+
for (int i = 0; i < n_messages; i++)
235242
{
236-
dmq_stream_unsubscribe(stream);
237-
FinishPreparedTransaction(gid, false, false);
238-
mtm_log(MtmTxFinish, "TXFINISH: %s aborted", gid);
239-
mtm_log(ERROR, "Failed to prepare transaction %s at node %d",
240-
gid, failed_at);
243+
MtmMessageCode status = pq_getmsgbyte(messages[i].message);
244+
245+
Assert(status == MSG_PREPARED || status == MSG_ABORTED);
246+
if (status == MSG_ABORTED)
247+
{
248+
FinishPreparedTransaction(gid, false, false);
249+
mtm_log(MtmTxFinish, "TXFINISH: %s aborted", gid);
250+
mtm_log(ERROR, "Failed to prepare transaction %s at node %d",
251+
gid, messages[i].node_id);
252+
}
241253
}
242254

255+
dmq_stream_subscribe(gid);
256+
243257
SetPreparedTransactionState(gid, MULTIMASTER_PRECOMMITTED);
244258
mtm_log(MtmTxFinish, "TXFINISH: %s precommitted", gid);
245-
GatherPrecommits(xid, participantsMask, MSG_PRECOMMITTED);
259+
gather(participants, messages, &n_messages);
246260

247261
StartTransactionCommand();
248262
FinishPreparedTransaction(gid, true, false);
249263
mtm_log(MtmTxFinish, "TXFINISH: %s committed", gid);
250-
GatherPrecommits(xid, participantsMask, MSG_COMMITTED);
264+
// XXX: make this conditional
265+
gather(participants, messages, &n_messages);
251266

252267
LWLockRelease(MtmCommitBarrier);
253268

254-
dmq_stream_unsubscribe(stream);
255-
mtm_log(MtmTxTrace, "%s unsubscribed for %s", gid, stream);
269+
dmq_stream_unsubscribe(gid);
270+
mtm_log(MtmTxTrace, "%s unsubscribed for %s", gid, gid);
256271

257272
MaybeLogSyncpoint(false);
258273

259274
return true;
260275
}
261276

262-
static bool
263-
GatherPrepares(TransactionId xid, nodemask_t participantsMask, int *failed_at)
264-
{
265-
bool prepared = true;
266-
267-
while (participantsMask != 0)
268-
{
269-
bool ret;
270-
DmqSenderId sender_id;
271-
StringInfoData buffer;
272-
MtmArbiterMessage *msg;
273-
274-
ret = dmq_pop(&sender_id, &buffer, participantsMask);
275-
276-
if (ret)
277-
{
278-
msg = (MtmArbiterMessage *) buffer.data;
279-
280-
Assert(msg->node == sender_to_node[sender_id]);
281-
Assert(msg->code == MSG_PREPARED || msg->code == MSG_ABORTED);
282-
Assert(msg->dxid == xid);
283-
Assert(BIT_CHECK(participantsMask, sender_to_node[sender_id] - 1));
284-
285-
mtm_log(MtmTxTrace,
286-
"GatherPrepares: got '%s' for tx" XID_FMT " from node%d",
287-
msg->code == MSG_PREPARED ? "ok" : "failed",
288-
xid, sender_to_node[sender_id]);
289-
290-
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
291-
292-
if (msg->code == MSG_ABORTED)
293-
{
294-
prepared = false;
295-
*failed_at = msg->node;
296-
}
297-
}
298-
else
299-
{
300-
/*
301-
* If queue is detached then the neignbour node is probably
302-
* disconnected. Let's wait when it became disabled as we can
303-
* became offline by this time.
304-
*/
305-
if (!BIT_CHECK(MtmGetEnabledNodeMask(), sender_to_node[sender_id] - 1))
306-
{
307-
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
308-
mtm_log(MtmTxTrace,
309-
"GatherPrepares: dropping node%d from participants of tx" XID_FMT,
310-
sender_to_node[sender_id], xid);
311-
}
312-
}
313-
}
314-
315-
// XXX: assert that majority has responded
316-
317-
return prepared;
318-
}
319-
320277
static void
321-
GatherPrecommits(TransactionId xid, nodemask_t participantsMask, MtmMessageCode code)
278+
gather(uint64 participants, mtm_msg *messages, int *msg_count)
322279
{
323-
while (participantsMask != 0)
280+
*msg_count = 0;
281+
while (participants != 0)
324282
{
325283
bool ret;
326284
DmqSenderId sender_id;
327-
StringInfoData buffer;
328-
MtmArbiterMessage *msg;
329-
330-
ret = dmq_pop(&sender_id, &buffer, participantsMask);
285+
StringInfo msg = makeStringInfo();
331286

287+
ret = dmq_pop(&sender_id, msg, participants);
332288
if (ret)
333289
{
334-
msg = (MtmArbiterMessage *) buffer.data;
335-
336-
Assert(msg->node == sender_to_node[sender_id]);
337-
Assert(msg->code == code);
338-
Assert(msg->dxid == xid);
339-
Assert(BIT_CHECK(participantsMask, sender_to_node[sender_id] - 1));
340-
341-
mtm_log(MtmTxTrace,
342-
"GatherPrecommits: got 'ok' for tx" XID_FMT " from node%d",
343-
xid, sender_to_node[sender_id]);
344-
345-
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
290+
messages[*msg_count].message = msg;
291+
messages[*msg_count].node_id = sender_to_node[sender_id];
292+
(*msg_count)++;
293+
BIT_CLEAR(participants, sender_to_node[sender_id] - 1);
346294
}
347295
else
348296
{
@@ -353,15 +301,13 @@ GatherPrecommits(TransactionId xid, nodemask_t participantsMask, MtmMessageCode
353301
*/
354302
if (!BIT_CHECK(MtmGetEnabledNodeMask(), sender_to_node[sender_id] - 1))
355303
{
356-
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
304+
BIT_CLEAR(participants, sender_to_node[sender_id] - 1);
357305
mtm_log(MtmTxTrace,
358-
"GatherPrecommit: dropping node%d from participants of tx" XID_FMT,
359-
sender_to_node[sender_id], xid);
306+
"GatherPrecommit: dropping node%d from tx participants",
307+
sender_to_node[sender_id]);
360308
}
361309
}
362310
}
363-
364-
// XXX: assert that majority has responded
365311
}
366312

367313
/*

src/dmq.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1475,7 +1475,7 @@ dmq_pop(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
14751475
{
14761476
msg->data = data;
14771477
msg->len = len;
1478-
msg->maxlen = len;
1478+
msg->maxlen = -1;
14791479
msg->cursor = 0;
14801480
*sender_id = i;
14811481

@@ -1541,7 +1541,7 @@ dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg, uint64 mask)
15411541
{
15421542
msg->data = data;
15431543
msg->len = len;
1544-
msg->maxlen = len;
1544+
msg->maxlen = -1;
15451545
msg->cursor = 0;
15461546
*sender_id = i;
15471547

src/include/multimaster.h

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -123,31 +123,12 @@ typedef enum
123123
typedef enum
124124
{
125125
MSG_INVALID,
126-
MSG_HANDSHAKE,
127126
MSG_PREPARED,
128-
MSG_PRECOMMIT,
129127
MSG_PRECOMMITTED,
130128
MSG_COMMITTED,
131-
MSG_ABORTED,
132-
MSG_STATUS,
133-
MSG_HEARTBEAT,
134-
MSG_POLL_REQUEST,
135-
MSG_POLL_STATUS
129+
MSG_ABORTED
136130
} MtmMessageCode;
137131

138-
typedef struct
139-
{
140-
MtmMessageCode code; /* Message code: MSG_PREPARE, MSG_PRECOMMIT,
141-
* MSG_COMMIT, MSG_ABORT,... */
142-
MtmTxState state;
143-
int node; /* Sender node ID */
144-
TransactionId dxid; /* Transaction ID at destination node */
145-
TransactionId sxid; /* Transaction ID at sender node */
146-
nodemask_t connectivityMask; /* Connectivity bitmask at the sender of
147-
* message */
148-
pgid_t gid; /* Global transaction identifier */
149-
} MtmArbiterMessage;
150-
151132
typedef struct
152133
{
153134
Oid sourceTable;
@@ -242,8 +223,6 @@ extern MtmNode *MtmNodeById(MtmConfig *cfg, int node_id);
242223

243224
extern void MtmStateFill(MtmConfig *cfg);
244225

245-
extern void MtmInitMessage(MtmArbiterMessage *msg, MtmMessageCode code);
246-
247226
extern bool MtmIsEnabled(void);
248227

249228
extern void MtmToggleReplication(void);

src/multimaster.c

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -250,19 +250,6 @@ static void MtmResumeTransaction(void* ctx)
250250
free(ctx);
251251
}
252252

253-
254-
/*
255-
* Initialize message
256-
*/
257-
void MtmInitMessage(MtmArbiterMessage* msg, MtmMessageCode code)
258-
{
259-
memset(msg, '\0', sizeof(MtmArbiterMessage));
260-
261-
msg->code = code;
262-
msg->connectivityMask = MtmGetConnectedNodeMask();
263-
msg->node = Mtm->my_node_id;
264-
}
265-
266253
/*
267254
* Perform initialization of multimaster state.
268255
* This function is called from shared memory startup hook (after completion of initialization of shared memory)

0 commit comments

Comments
 (0)