Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 8fefcf6

Browse files
knizhnikkelvich
authored andcommitted
Support 2PC in multimaster
1 parent d3a8cdc commit 8fefcf6

File tree

2 files changed

+85
-6
lines changed

2 files changed

+85
-6
lines changed

multimaster.c

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ typedef struct {
6161
bool isReplicated; /* transaction on replica */
6262
bool isDistributed; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6363
bool containsDML; /* transaction contains DML statements */
64-
csn_t snapshot; /* transaction snaphsot */
64+
bool isPrepared; /* transaction is prepared as part of 2PC */
65+
csn_t snapshot; /* transaction snaphsot */
6566
} MtmCurrentTrans;
6667

6768
/* #define USE_SPINLOCK 1 */
@@ -94,6 +95,8 @@ static void MtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
9495
static void MtmInitialize(void);
9596
static void MtmXactCallback(XactEvent event, void *arg);
9697
static void MtmBeginTransaction(MtmCurrentTrans* x);
98+
static void MtmPrecommitTransaction(MtmCurrentTrans* x);
99+
static bool MtmCommitTransaction(TransactionId xid, int nsubxids, TransactionId *subxids);
97100
static void MtmPrepareTransaction(MtmCurrentTrans* x);
98101
static void MtmEndTransaction(MtmCurrentTrans* x, bool commit);
99102
static TransactionId MtmGetOldestXmin(Relation rel, bool ignoreVacuum);
@@ -143,6 +146,7 @@ static int MtmWorkers;
143146
static int MtmVacuumDelay;
144147
static int MtmMinRecoveryLag;
145148
static int MtmMaxRecoveryLag;
149+
static bool MtmUse2PC;
146150

147151
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
148152
static ProcessUtility_hook_type PreviousProcessUtilityHook;
@@ -467,6 +471,9 @@ MtmXactCallback(XactEvent event, void *arg)
467471
MtmBeginTransaction(&dtmTx);
468472
break;
469473
case XACT_EVENT_PRE_COMMIT:
474+
MtmPrecommitTransaction(&dtmTx);
475+
break;
476+
case XACT_EVENT_PREPARE:
470477
MtmPrepareTransaction(&dtmTx);
471478
break;
472479
case XACT_EVENT_COMMIT:
@@ -498,6 +505,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
498505
x->isReplicated = false;
499506
x->isDistributed = MtmIsUserTransaction();
500507
x->containsDML = false;
508+
x->isPrepared = false;
501509
x->snapshot = MtmAssignCSN();
502510
x->gtid.xid = InvalidTransactionId;
503511
MtmUnlock();
@@ -561,10 +569,11 @@ MtmCheckClusterLock()
561569
}
562570

563571
/*
572+
* This functions is called as pre-commit callback.
564573
* We need to pass snapshot to WAL-sender, so create record in transaction status hash table
565574
* before commit
566575
*/
567-
static void MtmPrepareTransaction(MtmCurrentTrans* x)
576+
static void MtmPrecommitTransaction(MtmCurrentTrans* x)
568577
{
569578
MtmTransState* ts;
570579
int i;
@@ -608,6 +617,20 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
608617
MTM_TRACE("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n", getpid(), x->xid, ts->csn);
609618
}
610619

620+
static void
621+
MtmPrepareTransaction(MtmCurrentTrans* x)
622+
{
623+
TransactionId *subxids;
624+
int nSubxids;
625+
MtmPrecommitTransaction(x);
626+
x->isPrepared = true;
627+
nSubxids = xactGetCommittedChildren(&subxids);
628+
if (!MtmCommitTransaction(x->xid, nSubxids, subxids))
629+
{
630+
elog(ERROR, "Commit of transaction %d is rejected by DTM", x->xid);
631+
}
632+
}
633+
611634
/**
612635
* Check state of replication slots. If some of them are too much lag behind wal, then drop this slots to avoid
613636
* WAL overflow
@@ -755,7 +778,7 @@ static void
755778
MtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
756779
{
757780
MTM_TRACE("%d: MtmSetTransactionStatus %u(%u) = %u, isDistributed=%d\n", getpid(), xid, dtmTx.xid, status, dtmTx.isDistributed);
758-
if (xid == dtmTx.xid && dtmTx.isDistributed)
781+
if (xid == dtmTx.xid && dtmTx.isDistributed && !dtmTx.isPrepared)
759782
{
760783
if (status == TRANSACTION_STATUS_ABORTED || !dtmTx.containsDML || dtm->status == MTM_RECOVERY)
761784
{
@@ -812,6 +835,18 @@ _PG_init(void)
812835
if (!process_shared_preload_libraries_in_progress)
813836
return;
814837

838+
DefineCustomBoolVariable(
839+
"multimaster.use_2pc",
840+
"Use two phase commit",
841+
"Replace normal commit with two phase commit",
842+
&MtmUse2PC,
843+
false,
844+
PGC_BACKEND,
845+
0,
846+
NULL,
847+
NULL,
848+
NULL
849+
);
815850
DefineCustomIntVariable(
816851
"multimaster.min_recovery_lag",
817852
"Minamal lag of WAL-sender performing recovery after which cluster is locked until recovery is completed",
@@ -1313,13 +1348,54 @@ static bool MtmProcessDDLCommand(char const* queryString)
13131348
return false;
13141349
}
13151350

1351+
/*
1352+
* Genenerate global transaction identifier for two-pahse commit.
1353+
* It should be unique for all nodes
1354+
*/
1355+
static char*
1356+
MtmGenerateGid()
1357+
{
1358+
static int localCount;
1359+
return psprintf("GID-%d-%d-%d", MtmNodeId, MyProcPid, ++localCount);
1360+
}
1361+
13161362
static void MtmProcessUtility(Node *parsetree, const char *queryString,
13171363
ProcessUtilityContext context, ParamListInfo params,
13181364
DestReceiver *dest, char *completionTag)
13191365
{
13201366
bool skipCommand;
13211367
switch (nodeTag(parsetree))
13221368
{
1369+
case T_TransactionStmt:
1370+
{
1371+
TransactionStmt *stmt = (TransactionStmt *) parsetree;
1372+
switch (stmt->kind)
1373+
{
1374+
case TRANS_STMT_COMMIT:
1375+
if (MtmUse2PC) {
1376+
char* gid = MtmGenerateGid();
1377+
if (!PrepareTransactionBlock(gid))
1378+
{
1379+
/* report unsuccessful commit in completionTag */
1380+
if (completionTag) {
1381+
strcpy(completionTag, "ROLLBACK");
1382+
}
1383+
/* ??? Should we do explicit rollback */
1384+
} else {
1385+
FinishPreparedTransaction(gid, true);
1386+
}
1387+
return;
1388+
}
1389+
break;
1390+
case TRANS_STMT_PREPARE:
1391+
case TRANS_STMT_COMMIT_PREPARED:
1392+
case TRANS_STMT_ROLLBACK_PREPARED:
1393+
elog(ERROR, "Two phase commit is not supported by multimaster");
1394+
default:
1395+
break;
1396+
}
1397+
}
1398+
/* no break */
13231399
case T_PlannedStmt:
13241400
case T_ClosePortalStmt:
13251401
case T_FetchStmt:
@@ -1333,7 +1409,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
13331409
case T_LoadStmt:
13341410
case T_VariableSetStmt:
13351411
case T_VariableShowStmt:
1336-
case T_TransactionStmt:
13371412
skipCommand = true;
13381413
break;
13391414
default:

pglogical_receiver.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,8 @@ pglogical_receiver_main(Datum main_arg)
209209
bool insideTrans = false;
210210
#endif
211211
ByteBuffer buf;
212-
XLogRecPtr originStartPos;
212+
XLogRecPtr originStartPos = 0;
213+
RepOriginId originId;
213214

214215
/* Register functions for SIGTERM/SIGHUP management */
215216
pqsignal(SIGHUP, receiver_raw_sighup);
@@ -266,7 +267,10 @@ pglogical_receiver_main(Datum main_arg)
266267
resetPQExpBuffer(query);
267268
}
268269
/* Start logical replication at specified position */
269-
originStartPos = replorigin_session_get_progress(false);
270+
originId = replorigin_by_name(args->receiver_slot, true);
271+
if (originId != InvalidRepOriginId) {
272+
originStartPos = replorigin_get_progress(originId, false);
273+
}
270274
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %u/%u (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d')",
271275
args->receiver_slot,
272276
(uint32) (originStartPos >> 32),

0 commit comments

Comments
 (0)