Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 2522bcc

Browse files
committed
Move xact static metadata (node id, generation, etc) from gid to state_3pc.
Prepares the ground for explicit 2PC: changing on the fly user provided gid would be ugly and inefficient, so let's move everything out.
1 parent 1d38260 commit 2522bcc

10 files changed

+242
-195
lines changed

src/commit.c

+25-41
Original file line numberDiff line numberDiff line change
@@ -277,25 +277,17 @@ MtmBeginTransaction()
277277
}
278278

279279
/*
280-
* Genenerate global transaction identifier for two-phase commit.
281-
* It should be unique for all nodes.
282-
* MTM-node_id-xid is part ensuring uniqueness; the rest is necessary payload.
283-
* gen_num identifies the number of generation xact belongs to.
284-
* configured is mask of configured nodes of this generation; it is required
285-
* by resolver.
286-
* (in theory we could get rid of it if we remembered generation history for
287-
* some time, but we don't currently)
288-
*
289-
* Beware that GlobalTxGCInTableProposals parses gid from SQL.
290-
*
291-
* TODO: add version and kinda backwards compatibility.
280+
* Generate global transaction identifier for two-phase commit.
281+
* It should be unique for all nodes. This is basically the only requirement;
282+
* we have some important metadata associated with xact, but we put
283+
* everything into state_3pc for seamless integration with explicit 2PC.
284+
* (we still keep gen_num here for logging)
292285
*/
293286
void
294-
MtmGenerateGid(char *gid, int node_id, TransactionId xid, uint64 gen_num,
295-
nodemask_t configured)
287+
MtmGenerateGid(char *gid, int node_id, TransactionId xid, uint64 gen_num)
296288
{
297-
sprintf(gid, "MTM-%d-" XID_FMT "-%" INT64_MODIFIER "X-%" INT64_MODIFIER "X",
298-
node_id, xid, gen_num, configured);
289+
sprintf(gid, "MTM-%d-" XID_FMT "-" UINT64_FORMAT,
290+
node_id, xid, gen_num);
299291
return;
300292
}
301293

@@ -329,20 +321,6 @@ MtmGidParseXid(const char *gid)
329321
return xid;
330322
}
331323

332-
/* parse generation configured mask from gid */
333-
nodemask_t
334-
MtmGidParseConfigured(const char *gid)
335-
{
336-
TransactionId xid;
337-
uint64 gen_num;
338-
nodemask_t configured = 0;
339-
340-
sscanf(gid, "MTM-%*d-" XID_FMT "-%" INT64_MODIFIER "X-%" INT64_MODIFIER "X",
341-
&xid, &gen_num, &configured);
342-
Assert(configured != 0);
343-
return configured;
344-
}
345-
346324
/* ensure we get the right PREPARE ack */
347325
static bool
348326
PrepareGatherHook(MtmMessage *anymsg, Datum arg)
@@ -385,6 +363,7 @@ MtmTwoPhaseCommit(void)
385363
nodemask_t pc_success_cohort;
386364
MtmGeneration xact_gen;
387365
char dmq_stream_name[DMQ_STREAM_NAME_MAXLEN];
366+
GTxState gtx_state;
388367

389368
if (MtmNo3PC)
390369
{
@@ -449,28 +428,33 @@ MtmTwoPhaseCommit(void)
449428
xact_gen = MtmGetCurrentGen(true);
450429
xid = GetTopTransactionId();
451430
MtmGenerateGid(mtm_commit_state.gid, mtm_cfg->my_node_id, xid,
452-
xact_gen.num, xact_gen.configured);
431+
xact_gen.num);
453432
sprintf(dmq_stream_name, "xid" XID_FMT, xid);
454433
dmq_stream_subscribe(dmq_stream_name);
455434
mtm_log(MtmTxTrace, "%s subscribed for %s", mtm_commit_state.gid,
456435
dmq_stream_name);
457436

458437
/* prepare transaction on our node */
459-
460438
mtm_commit_state.gtx = GlobalTxAcquire(mtm_commit_state.gid, true,
461-
false, NULL);
439+
false, NULL, 0);
462440
/*
463441
* it is simpler to mark gtx originated here as orphaned from the
464442
* beginning rather than in error handler; resolver won't touch gtx
465443
* while it is locked on us anyway
466444
*/
467445
mtm_commit_state.gtx->orphaned = true;
446+
mtm_commit_state.gtx->xinfo.coordinator = mtm_cfg->my_node_id;
447+
mtm_commit_state.gtx->xinfo.xid = xid;
448+
mtm_commit_state.gtx->xinfo.gen_num = xact_gen.num;
449+
mtm_commit_state.gtx->xinfo.configured = xact_gen.configured;
468450
Assert(mtm_commit_state.gtx->state.status == GTXInvalid);
469451
/*
470452
* PREPARE doesn't happen here; ret 0 just means we were already in
471453
* aborted transaction block and we expect the callee to handle this.
472454
*/
473-
ret = PrepareTransactionBlock(mtm_commit_state.gid);
455+
ret = PrepareTransactionBlockWithState3PC(
456+
mtm_commit_state.gid,
457+
serialize_xstate(&mtm_commit_state.gtx->xinfo, &mtm_commit_state.gtx->state));
474458
if (!ret)
475459
{
476460
Assert(false);
@@ -559,18 +543,18 @@ MtmTwoPhaseCommit(void)
559543
}
560544

561545
/* ok, we have all prepare responses, precommit */
562-
SetPreparedTransactionState(mtm_commit_state.gid,
563-
serialize_gtx_state(GTXPreCommitted,
564-
InitialGTxTerm,
565-
InitialGTxTerm),
546+
gtx_state.status = GTXPreCommitted;
547+
gtx_state.proposal = InitialGTxTerm;
548+
gtx_state.accepted = InitialGTxTerm;
549+
SetPreparedTransactionState(
550+
mtm_commit_state.gid,
551+
serialize_xstate(&mtm_commit_state.gtx->xinfo, &gtx_state),
566552
false);
567553
/*
568554
* since this moment direct aborting is not allowed; others can
569555
* receive our precommit and resolve xact to commit without us
570556
*/
571-
mtm_commit_state.gtx->state.status = GTXPreCommitted;
572-
mtm_commit_state.gtx->state.proposal = InitialGTxTerm;
573-
mtm_commit_state.gtx->state.accepted = InitialGTxTerm;
557+
mtm_commit_state.gtx->state = gtx_state;
574558
mtm_log(MtmTxFinish, "TXFINISH: %s precommitted", mtm_commit_state.gid);
575559

576560
/*

src/global_tx.c

+60-52
Original file line numberDiff line numberDiff line change
@@ -67,78 +67,81 @@ term_cmp(GlobalTxTerm t1, GlobalTxTerm t2)
6767
return 1;
6868
}
6969

70+
#define XStateVersion 1
7071

7172
char *
72-
serialize_gtx_state(GlobalTxStatus status, GlobalTxTerm term_prop, GlobalTxTerm term_acc)
73+
serialize_xstate(XactInfo *xinfo, GTxState *gtx_state)
7374
{
7475
char *state;
7576
char *status_abbr;
7677

77-
if (status == GTXInvalid)
78+
if (gtx_state->status == GTXInvalid)
7879
status_abbr = "in";
79-
else if (status == GTXPreCommitted)
80+
else if (gtx_state->status == GTXPreCommitted)
8081
status_abbr = "pc";
81-
else if (status == GTXPreAborted)
82+
else if (gtx_state->status == GTXPreAborted)
8283
status_abbr = "pa";
83-
else if (status == GTXCommitted)
84+
else if (gtx_state->status == GTXCommitted)
8485
status_abbr = "cm";
8586
else
8687
{
87-
Assert(status == GTXAborted);
88+
Assert(gtx_state->status == GTXAborted);
8889
status_abbr = "ab";
8990
}
9091

91-
state = psprintf("%s-%d:%d-%d:%d",
92+
state = psprintf("%d-%d-" XID_FMT "-%" INT64_MODIFIER "X-%" INT64_MODIFIER "X-%s-%d:%d-%d:%d",
93+
XStateVersion,
94+
xinfo->coordinator,
95+
xinfo->xid,
96+
xinfo->gen_num,
97+
xinfo->configured,
9298
status_abbr,
93-
term_prop.ballot, term_prop.node_id,
94-
term_acc.ballot, term_acc.node_id);
99+
gtx_state->proposal.ballot, gtx_state->proposal.node_id,
100+
gtx_state->accepted.ballot, gtx_state->accepted.node_id);
95101
return state;
96102
}
97103

98-
void
99-
parse_gtx_state(const char *state, GlobalTxStatus *status,
100-
GlobalTxTerm *term_prop, GlobalTxTerm *term_acc)
104+
/* returns 0 on success */
105+
int
106+
deserialize_xstate(const char *state, XactInfo *xinfo, GTxState *gtx_state,
107+
int elevel)
101108
{
102109
int n_parsed = 0;
110+
char status_abbr[3]; /* must be big enough for '\0' */
103111

104112
Assert(state);
105113

106-
/*
107-
* Might be immediately after PrepareTransaction. It would be better to
108-
* also pass state_3pc to it, but...
109-
*/
110-
if (state[0] == '\0')
114+
n_parsed = sscanf(state, "%*d-%d-" XID_FMT "-%" INT64_MODIFIER "X-%" INT64_MODIFIER "X-%2s-%d:%d-%d:%d",
115+
&xinfo->coordinator,
116+
&xinfo->xid,
117+
&xinfo->gen_num,
118+
&xinfo->configured,
119+
status_abbr,
120+
&gtx_state->proposal.ballot,
121+
&gtx_state->proposal.node_id,
122+
&gtx_state->accepted.ballot,
123+
&gtx_state->accepted.node_id);
124+
if (n_parsed != 9)
111125
{
112-
*status = GTXInvalid;
113-
*term_prop = InitialGTxTerm;
114-
*term_acc = InvalidGTxTerm;
126+
mtm_log(elevel, "GlobalTxLoadAll: failed to deparse state_3pc %s, ignoring it (res=%d)",
127+
state, n_parsed);
128+
return n_parsed;
115129
}
130+
131+
if (strncmp(status_abbr, "in", 2) == 0)
132+
gtx_state->status = GTXInvalid;
133+
else if (strncmp(status_abbr, "pc", 2) == 0)
134+
gtx_state->status = GTXPreCommitted;
135+
else if (strncmp(status_abbr, "pa", 2) == 0)
136+
gtx_state->status = GTXPreAborted;
137+
else if (strncmp(status_abbr, "cm", 3) == 0)
138+
gtx_state->status = GTXCommitted;
116139
else
117140
{
118-
if (strncmp(state, "in-", 3) == 0)
119-
*status = GTXInvalid;
120-
else if (strncmp(state, "pc-", 3) == 0)
121-
*status = GTXPreCommitted;
122-
else if (strncmp(state, "pa-", 3) == 0)
123-
*status = GTXPreAborted;
124-
else if (strncmp(state, "cm-", 3) == 0)
125-
*status = GTXCommitted;
126-
else
127-
{
128-
Assert((strncmp(state, "ab-", 3) == 0));
129-
*status = GTXAborted;
130-
}
131-
132-
n_parsed = sscanf(state + 3, "%d:%d-%d:%d",
133-
&term_prop->ballot, &term_prop->node_id,
134-
&term_acc->ballot, &term_acc->node_id);
135-
136-
if (n_parsed != 4)
137-
{
138-
Assert(false);
139-
mtm_log(PANIC, "wrong state_3pc format: %s", state);
140-
}
141+
Assert((strncmp(status_abbr, "ab", 2) == 0));
142+
gtx_state->status = GTXAborted;
141143
}
144+
return 0;
142145
}
143146

144147
void
@@ -219,10 +222,11 @@ GlobalTxEnsureBeforeShmemExitHook(void)
219222
* If nowait_own_live is true, gtx is already locked, I am the coordinator and
220223
* gtx is not orphaned, don't wait for release -- backend is still working on
221224
* xact, which may be very long. *busy (if provided) is set to true in this
222-
* case.
225+
* case. coordinator must be passed for this to work.
223226
*/
224227
GlobalTx *
225-
GlobalTxAcquire(const char *gid, bool create, bool nowait_own_live, bool *busy)
228+
GlobalTxAcquire(const char *gid, bool create, bool nowait_own_live, bool *busy,
229+
int coordinator)
226230
{
227231
GlobalTx *gtx = NULL;
228232
bool found;
@@ -274,9 +278,7 @@ GlobalTxAcquire(const char *gid, bool create, bool nowait_own_live, bool *busy)
274278

275279
if (nowait_own_live)
276280
{
277-
int tx_node_id = MtmGidParseNodeId(gid);
278-
279-
if (tx_node_id == Mtm->my_node_id && !gtx->orphaned)
281+
if (coordinator == Mtm->my_node_id && !gtx->orphaned)
280282
{
281283
if (busy)
282284
*busy = true;
@@ -374,8 +376,15 @@ GlobalTxLoadAll()
374376
Assert(!found);
375377

376378
gtx->acquired_by = InvalidBackendId;
377-
parse_gtx_state(pxacts[i].state_3pc, &gtx->state.status,
378-
&gtx->state.proposal, &gtx->state.accepted);
379+
/*
380+
* Allow instance to start even if we have problems parsing xstate...
381+
*/
382+
if (deserialize_xstate(pxacts[i].state_3pc, &gtx->xinfo, &gtx->state,
383+
WARNING) != 0)
384+
{
385+
hash_search(gtx_shared->gid2gtx, pxacts[i].gid, HASH_REMOVE, &found);
386+
continue;
387+
}
379388
gtx->prepared = true;
380389
gtx->orphaned = true;
381390
gtx->resolver_stage = GTRS_AwaitStatus;
@@ -420,8 +429,7 @@ GlobalTxMarkOrphaned(int node_id)
420429
hash_seq_init(&hash_seq, gtx_shared->gid2gtx);
421430
while ((gtx = hash_seq_search(&hash_seq)) != NULL)
422431
{
423-
int tx_node_id = MtmGidParseNodeId(gtx->gid);
424-
if (tx_node_id == node_id)
432+
if (gtx->xinfo.coordinator == node_id)
425433
{
426434
gtx->orphaned = true;
427435
mtm_log(MtmTxTrace, "%s is orphaned", gtx->gid);

src/include/commit.h

+2-3
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@
1717

1818
#include "messaging.h"
1919

20-
extern void MtmGenerateGid(char *gid, int node_id, TransactionId xid, uint64 gen_num,
21-
nodemask_t configured);
20+
extern void MtmGenerateGid(char *gid, int node_id, TransactionId xid,
21+
uint64 gen_num);
2222
extern uint64 MtmGidParseGenNum(const char *gid);
2323
extern int MtmGidParseNodeId(const char *gid);
2424
extern TransactionId MtmGidParseXid(const char *gid);
25-
extern nodemask_t MtmGidParseConfigured(const char *gid);
2625

2726
extern bool MtmTwoPhaseCommit(void);
2827
extern void MtmBeginTransaction(void);

src/include/global_tx.h

+31-10
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,38 @@ typedef enum
4545

4646
typedef struct
4747
{
48-
GlobalTxTerm proposal; /* nextBal in terms of The Part-Time Paliament */
49-
GlobalTxTerm accepted; /* prevBal in terms of The Part-Time Paliament */
50-
GlobalTxStatus status; /* prevDec in terms of The Part-Time Paliament */
48+
GlobalTxTerm proposal; /* nextBal in terms of The Part-Time Parliament */
49+
GlobalTxTerm accepted; /* prevBal in terms of The Part-Time Parliament */
50+
GlobalTxStatus status; /*
51+
* prevDec in terms of The Part-Time Parliament
52+
* (or special never voted | commit | abort)
53+
*/
5154
} GTxState;
5255

56+
/*
57+
* Constant xact metadata which we encode into state_3pc. We could (and
58+
* previously did) carry that directly in gid, but this intervenes with
59+
* explicit 2PC usage: applier must know generation of the xact, and
60+
* scribbling over user-provided gid is ugly and/or inefficient.
61+
*/
62+
typedef struct
63+
{
64+
int coordinator; /* node id who initiated the transaction */
65+
TransactionId xid; /* xid at coordinator */
66+
uint64 gen_num; /* the number of generation xact belongs to */
67+
nodemask_t configured; /* mask of configured nodes of this generation;
68+
* the idea was to use this by resolver, but it
69+
* wasn't finished. We shouldn't have any problems
70+
* with this anyway if all xacts created before
71+
* first node add-rm are resolved before the
72+
* second one is started
73+
*/
74+
} XactInfo;
75+
5376
typedef struct GlobalTx
5477
{
5578
char gid[GIDSIZE];
79+
XactInfo xinfo;
5680
XLogRecPtr coordinator_end_lsn;
5781
BackendId acquired_by;
5882
/* paxos voting state for this xact */
@@ -86,21 +110,18 @@ void MtmGlobalTxInit(void);
86110
void MtmGlobalTxShmemStartup(void);
87111
void GlobalTxEnsureBeforeShmemExitHook(void);
88112
GlobalTx *GlobalTxAcquire(const char *gid, bool create, bool nowait_own_live,
89-
bool *busy);
113+
bool *busy, int coordinator);
90114
void GlobalTxRelease(GlobalTx *gtx);
91115
void GlobalTxAtExit(int code, Datum arg);
92116
void GlobalTxLoadAll(void);
93-
char *serialize_gtx_state(GlobalTxStatus status, GlobalTxTerm term_prop,
94-
GlobalTxTerm term_acc);
117+
char *serialize_xstate(XactInfo *xinfo, GTxState *gtx_state);
95118
int term_cmp(GlobalTxTerm t1, GlobalTxTerm t2);
96-
void parse_gtx_state(const char *state, GlobalTxStatus *status,
97-
GlobalTxTerm *term_prop, GlobalTxTerm *term_acc);
119+
int deserialize_xstate(const char *state, XactInfo *xinfo, GTxState *gtx_state,
120+
int elevel);
98121
GlobalTxTerm GlobalTxGetMaxProposal(void);
99122
void GlobalTxSaveInTable(const char *gid, XLogRecPtr coordinator_end_lsn,
100123
GlobalTxStatus status,
101124
GlobalTxTerm term_prop, GlobalTxTerm term_acc);
102-
void GlobalTxDeleteFromTable(const char *gid);
103-
void GlobalTxGCInTableProposals(void);
104125
void GlobalTxMarkOrphaned(int node_id);
105126

106127
char *GlobalTxToString(GlobalTx *gtx);

src/include/messaging.h

+2
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ typedef struct
9797
MtmTxRequestValue type;
9898
GlobalTxTerm term;
9999
const char *gid;
100+
int coordinator;
101+
uint64 gen_num;
100102
XLogRecPtr coordinator_end_lsn; /* matters for 1a */
101103
} MtmTxRequest;
102104

0 commit comments

Comments
 (0)