Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit fd54999

Browse files
committed
Explicit twophase transactions
1 parent 094c29d commit fd54999

File tree

9 files changed

+238
-49
lines changed

9 files changed

+238
-49
lines changed

expected/multimaster.out

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,3 +271,35 @@ SELECT name FROM pg_cursors ORDER BY 1;
271271
(0 rows)
272272

273273
COMMIT;
274+
-- explicit 2pc
275+
begin;
276+
create table twopc_test(i int primary key);
277+
insert into twopc_test values (1);
278+
prepare transaction 'x';
279+
begin;
280+
create table twopc_test2(i int primary key);
281+
insert into twopc_test2 values (2);
282+
prepare transaction 'y';
283+
rollback prepared 'y';
284+
commit prepared 'x';
285+
begin;
286+
create table twopc_test2(i int primary key);
287+
insert into twopc_test2 values (2);
288+
prepare transaction 'y';
289+
begin;
290+
commit prepared 'y';
291+
ERROR: COMMIT PREPARED cannot run inside a transaction block
292+
rollback;
293+
commit prepared 'y';
294+
table twopc_test;
295+
i
296+
---
297+
1
298+
(1 row)
299+
300+
table twopc_test2;
301+
i
302+
---
303+
2
304+
(1 row)
305+

sql/multimaster.sql

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,3 +200,33 @@ SELECT name FROM pg_cursors ORDER BY 1;
200200
COMMIT;
201201

202202

203+
-- explicit 2pc
204+
205+
begin;
206+
create table twopc_test(i int primary key);
207+
insert into twopc_test values (1);
208+
prepare transaction 'x';
209+
210+
begin;
211+
create table twopc_test2(i int primary key);
212+
insert into twopc_test2 values (2);
213+
prepare transaction 'y';
214+
215+
rollback prepared 'y';
216+
commit prepared 'x';
217+
218+
begin;
219+
create table twopc_test2(i int primary key);
220+
insert into twopc_test2 values (2);
221+
prepare transaction 'y';
222+
223+
begin;
224+
commit prepared 'y';
225+
rollback;
226+
227+
commit prepared 'y';
228+
229+
table twopc_test;
230+
table twopc_test2;
231+
232+

src/commit.c

Lines changed: 121 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "ddl.h"
2929
#include "state.h"
3030
#include "syncpoint.h"
31+
#include "commit.h"
3132

3233
typedef struct
3334
{
@@ -69,7 +70,7 @@ detach_node(int node_id, MtmConfig *new_cfg, Datum arg)
6970
}
7071

7172
void
72-
MtmXactCallback2(XactEvent event, void *arg)
73+
MtmXactCallback(XactEvent event, void *arg)
7374
{
7475
/*
7576
* Perform distributed commit only for transactions in ordinary
@@ -137,11 +138,9 @@ MtmBeginTransaction()
137138
}
138139

139140
/* Reset MtmTx */
140-
MtmTx.explicit_twophase = false;
141141
MtmTx.contains_temp_ddl = false;
142142
MtmTx.contains_persistent_ddl = false;
143143
MtmTx.contains_dml = false;
144-
MtmTx.gid[0] = '\0';
145144
MtmTx.distributed = true;
146145

147146
MtmDDLResetStatement();
@@ -171,6 +170,33 @@ MtmBeginTransaction()
171170
}
172171
}
173172

173+
/*
174+
* Genenerate global transaction identifier for two-pahse commit.
175+
* It should be unique for all nodes
176+
*/
177+
void
178+
MtmGenerateGid(char *gid, TransactionId xid, int node_id)
179+
{
180+
sprintf(gid, "MTM-%d-" XID_FMT, node_id, xid);
181+
return;
182+
}
183+
184+
int
185+
MtmGidParseNodeId(const char* gid)
186+
{
187+
int node_id = -1;
188+
sscanf(gid, "MTM-%d-%*d", &node_id);
189+
return node_id;
190+
}
191+
192+
TransactionId
193+
MtmGidParseXid(const char* gid)
194+
{
195+
TransactionId xid = InvalidTransactionId;
196+
sscanf(gid, "MTM-%*d-" XID_FMT, &xid);
197+
return xid;
198+
}
199+
174200
bool
175201
MtmTwoPhaseCommit()
176202
{
@@ -291,6 +317,10 @@ gather(uint64 participants, mtm_msg *messages, int *msg_count)
291317
messages[*msg_count].node_id = sender_to_node[sender_id];
292318
(*msg_count)++;
293319
BIT_CLEAR(participants, sender_to_node[sender_id] - 1);
320+
321+
mtm_log(MtmTxTrace,
322+
"gather: got message from node%d",
323+
sender_to_node[sender_id]);
294324
}
295325
else
296326
{
@@ -310,6 +340,94 @@ gather(uint64 participants, mtm_msg *messages, int *msg_count)
310340
}
311341
}
312342

343+
bool
344+
MtmExplicitPrepare(char *gid)
345+
{
346+
nodemask_t participants;
347+
bool ret;
348+
TransactionId xid;
349+
char stream[DMQ_NAME_MAXLEN];
350+
mtm_msg messages[MTM_MAX_NODES];
351+
int n_messages;
352+
353+
xid = GetTopTransactionId();
354+
sprintf(stream, "xid" XID_FMT, xid);
355+
dmq_stream_subscribe(stream);
356+
mtm_log(MtmTxTrace, "%s subscribed for %s", gid, stream);
357+
358+
participants = MtmGetEnabledNodeMask() &
359+
~((nodemask_t)1 << (mtm_cfg->my_node_id-1));
360+
361+
ret = PrepareTransactionBlock(gid);
362+
if (!ret)
363+
return false;
364+
365+
CommitTransactionCommand();
366+
367+
mtm_log(MtmTxFinish, "TXFINISH: %s prepared", gid);
368+
369+
gather(participants, messages, &n_messages);
370+
dmq_stream_unsubscribe(stream);
371+
372+
for (int i = 0; i < n_messages; i++)
373+
{
374+
MtmMessageCode status = pq_getmsgbyte(messages[i].message);
375+
376+
Assert(status == MSG_PREPARED || status == MSG_ABORTED);
377+
if (status == MSG_ABORTED)
378+
{
379+
380+
StartTransactionCommand();
381+
FinishPreparedTransaction(gid, false, false);
382+
mtm_log(MtmTxFinish, "TXFINISH: %s aborted", gid);
383+
mtm_log(ERROR, "Failed to prepare transaction %s at node %d",
384+
gid, messages[i].node_id);
385+
}
386+
}
387+
388+
elog(LOG, "lololo");
389+
390+
StartTransactionCommand();
391+
392+
return true;
393+
}
394+
395+
void
396+
MtmExplicitFinishPrepared(bool isTopLevel, char *gid, bool isCommit)
397+
{
398+
nodemask_t participants;
399+
mtm_msg messages[MTM_MAX_NODES];
400+
int n_messages;
401+
402+
PreventInTransactionBlock(isTopLevel,
403+
isCommit ? "COMMIT PREPARED" : "ROLLBACK PREPARED");
404+
405+
if (isCommit)
406+
{
407+
dmq_stream_subscribe(gid);
408+
409+
participants = MtmGetEnabledNodeMask() &
410+
~((nodemask_t)1 << (mtm_cfg->my_node_id-1));
411+
412+
SetPreparedTransactionState(gid, MULTIMASTER_PRECOMMITTED);
413+
mtm_log(MtmTxFinish, "TXFINISH: %s precommitted", gid);
414+
gather(participants, messages, &n_messages);
415+
416+
FinishPreparedTransaction(gid, true, false);
417+
418+
// XXX: make this conditional
419+
mtm_log(MtmTxFinish, "TXFINISH: %s committed", gid);
420+
gather(participants, messages, &n_messages);
421+
422+
dmq_stream_unsubscribe(gid);
423+
}
424+
else
425+
{
426+
FinishPreparedTransaction(gid, false, false);
427+
mtm_log(MtmTxFinish, "TXFINISH: %s abort", gid);
428+
}
429+
}
430+
313431
/*
314432
* Allow replication in bgworker.
315433
* Needed for scheduler.

src/ddl.c

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include "mm.h"
4343
#include "ddl.h"
4444
#include "logger.h"
45+
#include "commit.h"
4546

4647
#include "multimaster.h"
4748

@@ -661,6 +662,7 @@ MtmProcessUtilitySender(PlannedStmt *pstmt, const char *queryString,
661662
int stmt_start = pstmt->stmt_location > 0 ? pstmt->stmt_location : 0;
662663
int stmt_len = pstmt->stmt_len > 0 ? pstmt->stmt_len : strlen(queryString + stmt_start);
663664
char *stmt_string = palloc(stmt_len + 1);
665+
bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL);
664666

665667
strncpy(stmt_string, queryString + stmt_start, stmt_len);
666668
stmt_string[stmt_len] = 0;
@@ -680,12 +682,24 @@ MtmProcessUtilitySender(PlannedStmt *pstmt, const char *queryString,
680682
if (MtmTwoPhaseCommit())
681683
return;
682684
break;
685+
683686
case TRANS_STMT_PREPARE:
687+
if (!MtmExplicitPrepare(stmt->gid))
688+
{
689+
/* report unsuccessful commit in completionTag */
690+
if (completionTag)
691+
strcpy(completionTag, "ROLLBACK");
692+
}
693+
return;
694+
684695
case TRANS_STMT_COMMIT_PREPARED:
696+
MtmExplicitFinishPrepared(isTopLevel, stmt->gid, true);
697+
return;
698+
685699
case TRANS_STMT_ROLLBACK_PREPARED:
686-
MtmTx.explicit_twophase = true;
687-
strncpy(MtmTx.gid, stmt->gid, GIDSIZE);
688-
break;
700+
MtmExplicitFinishPrepared(isTopLevel, stmt->gid, false);
701+
return;
702+
689703
default:
690704
break;
691705
}

src/include/commit.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*----------------------------------------------------------------------------
2+
*
3+
* ddl.h
4+
* Statement based replication of DDL commands.
5+
*
6+
* Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7+
* Portions Copyright (c) 1994, Regents of the University of California
8+
*
9+
*----------------------------------------------------------------------------
10+
*/
11+
12+
#ifndef COMMIT_H
13+
#define COMMIT_H
14+
15+
#include "postgres.h"
16+
#include "access/xact.h"
17+
18+
extern void MtmGenerateGid(char *gid, TransactionId xid, int node_id);
19+
extern int MtmGidParseNodeId(const char *gid);
20+
extern TransactionId MtmGidParseXid(const char *gid);
21+
22+
extern bool MtmTwoPhaseCommit(void);
23+
extern void MtmBeginTransaction(void);
24+
extern void MtmXactCallback(XactEvent event, void *arg);
25+
26+
extern bool MtmExplicitPrepare(char *gid);
27+
extern void MtmExplicitFinishPrepared(bool isTopLevel, char *gid, bool isCommit);
28+
29+
#endif

src/include/multimaster.h

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,8 @@ typedef struct
9797
{
9898
bool contains_temp_ddl;
9999
bool contains_persistent_ddl;
100-
bool contains_dml; /* transaction contains DML statements */
101-
// bool accessed_temp;
102-
bool explicit_twophase; /* user level 2PC */
100+
bool contains_dml;
103101
bool distributed;
104-
pgid_t gid; /* global transaction identifier (only in case
105-
* of explicit_twophase) */
106102
} MtmCurrentTrans;
107103

108104
typedef struct MtmSeqPosition
@@ -201,13 +197,6 @@ extern int MtmMaxWorkers;
201197
extern int MtmMaxNodes;
202198
extern bool MtmBreakConnection;
203199

204-
extern bool MtmTwoPhaseCommit(void);
205-
extern void MtmBeginTransaction(void);
206-
207-
extern void MtmXactCallback2(XactEvent event, void *arg);
208-
extern void MtmGenerateGid(char *gid, TransactionId xid, int node_id);
209-
extern int MtmGidParseNodeId(const char *gid);
210-
extern TransactionId MtmGidParseXid(const char *gid);
211200
extern void MtmSleep(timestamp_t interval);
212201
extern TimestampTz MtmGetIncreasingTimestamp(void);
213202
extern bool MtmAllApplyWorkersFinished(void);

src/multimaster.c

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
#include "resolver.h"
6161
#include "logger.h"
6262
#include "syncpoint.h"
63+
#include "commit.h"
6364

6465
#include "compat.h"
6566

@@ -288,7 +289,7 @@ MtmSharedShmemStartup()
288289
}
289290
}
290291

291-
RegisterXactCallback(MtmXactCallback2, NULL);
292+
RegisterXactCallback(MtmXactCallback, NULL);
292293

293294
MtmLock = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[0].lock);
294295
MtmCommitBarrier = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[1].lock);
@@ -536,34 +537,6 @@ _PG_fini(void)
536537
shmem_startup_hook = PreviousShmemStartupHook;
537538
}
538539

539-
/*
540-
* Genenerate global transaction identifier for two-pahse commit.
541-
* It should be unique for all nodes
542-
*/
543-
void
544-
MtmGenerateGid(char *gid, TransactionId xid, int node_id)
545-
{
546-
sprintf(gid, "MTM-%d-" XID_FMT, node_id, xid);
547-
return;
548-
}
549-
550-
int
551-
MtmGidParseNodeId(const char* gid)
552-
{
553-
int node_id = -1;
554-
sscanf(gid, "MTM-%d-%*d", &node_id);
555-
return node_id;
556-
}
557-
558-
TransactionId
559-
MtmGidParseXid(const char* gid)
560-
{
561-
TransactionId xid = InvalidTransactionId;
562-
sscanf(gid, "MTM-%*d-" XID_FMT, &xid);
563-
Assert(TransactionIdIsValid(xid));
564-
return xid;
565-
}
566-
567540
/*
568541
* Publication named 'multimaster' acts as a flag that that multimaster
569542
* extension was created and configured, so we can hijack transactions.

0 commit comments

Comments
 (0)