Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 38d5a0c

Browse files
committed
Merge branch 'master' into more_tests
2 parents b06753a + 1ae54f5 commit 38d5a0c

File tree

6 files changed

+157
-90
lines changed

6 files changed

+157
-90
lines changed

contrib/mmts/multimaster.c

+88-52
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ typedef struct {
8383
bool isReplicated; /* transaction on replica */
8484
bool isDistributed; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
8585
bool isPrepared; /* transaction is perpared at first stage of 2PC */
86+
bool isSuspended; /* prepared transaction is suspended because coordinator node is switch to offline */
8687
bool isTransactionBlock; /* is transaction block */
8788
bool containsDML; /* transaction contains DML statements */
8889
XidStatus status; /* transaction status */
@@ -712,7 +713,7 @@ MtmXactCallback(XactEvent event, void *arg)
712713
}
713714

714715
/*
715-
* Check if this is "normal" user trnsaction which should be distributed to other nodes
716+
* Check if this is "normal" user transaction which should be distributed to other nodes
716717
*/
717718
static bool
718719
MtmIsUserTransaction()
@@ -734,6 +735,7 @@ MtmResetTransaction()
734735
x->gtid.xid = InvalidTransactionId;
735736
x->isDistributed = false;
736737
x->isPrepared = false;
738+
x->isSuspended = false;
737739
x->isTwoPhase = false;
738740
x->csn =
739741
x->status = TRANSACTION_STATUS_UNKNOWN;
@@ -763,6 +765,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
763765
x->isReplicated = MtmIsLogicalReceiver;
764766
x->isDistributed = MtmIsUserTransaction();
765767
x->isPrepared = false;
768+
x->isSuspended = false;
766769
x->isTwoPhase = false;
767770
x->isTransactionBlock = IsTransactionBlock();
768771
/* Application name can be changed usnig PGAPPNAME environment variable */
@@ -1004,14 +1007,18 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10041007
}
10051008
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
10061009
if (ts->isPrepared) {
1007-
elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1008-
}
1009-
if (Mtm->status != MTM_ONLINE) {
1010-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1011-
} else {
1012-
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1010+
// GetNewTransactionId(false); /* force increment of transaction counter */
1011+
// elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1012+
elog(WARNING, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1013+
x->isSuspended = true;
1014+
} else {
1015+
if (Mtm->status != MTM_ONLINE) {
1016+
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1017+
} else {
1018+
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1019+
}
1020+
MtmAbortTransaction(ts);
10131021
}
1014-
MtmAbortTransaction(ts);
10151022
}
10161023
x->status = ts->status;
10171024
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
@@ -1078,14 +1085,18 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10781085
}
10791086
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
10801087
if (ts->isPrepared) {
1081-
elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1082-
}
1083-
if (Mtm->status != MTM_ONLINE) {
1084-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1085-
} else {
1086-
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1088+
// GetNewTransactionId(false); /* force increment of transaction counter */
1089+
// elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1090+
elog(WARNING, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1091+
x->isSuspended = true;
1092+
} else {
1093+
if (Mtm->status != MTM_ONLINE) {
1094+
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1095+
} else {
1096+
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1097+
}
1098+
MtmAbortTransaction(ts);
10871099
}
1088-
MtmAbortTransaction(ts);
10891100
}
10901101
x->status = ts->status;
10911102
x->xid = ts->xid;
@@ -1122,6 +1133,16 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
11221133
}
11231134
}
11241135

1136+
static void
1137+
MtmLogAbortLogicalMessage(int nodeId, char const* gid)
1138+
{
1139+
MtmAbortLogicalMessage msg;
1140+
strcpy(msg.gid, gid);
1141+
msg.origin_node = nodeId;
1142+
msg.origin_lsn = replorigin_session_origin_lsn;
1143+
XLogFlush(LogLogicalMessage("A", (char*)&msg, sizeof msg, false));
1144+
}
1145+
11251146
static void
11261147
MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11271148
{
@@ -1143,7 +1164,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11431164
}
11441165
if (ts != NULL) {
11451166
if (*ts->gid)
1146-
MTM_LOG2("TRANSLOG: %s transaction %s status %d", (commit ? "commit" : "rollback"), ts->gid, ts->status);
1167+
MTM_LOG1("TRANSLOG: %s transaction git=%s xid=%d node=%d dxid=%d status %d",
1168+
(commit ? "commit" : "rollback"), ts->gid, ts->xid, ts->gtid.node, ts->gtid.xid, ts->status);
11471169
if (commit) {
11481170
if (!(ts->status == TRANSACTION_STATUS_UNKNOWN
11491171
|| (ts->status == TRANSACTION_STATUS_IN_PROGRESS && Mtm->status == MTM_RECOVERY)))
@@ -1202,7 +1224,8 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12021224
}
12031225
MtmTransactionListAppend(ts);
12041226
if (*x->gid) {
1205-
LogLogicalMessage("A", x->gid, strlen(x->gid) + 1, false);
1227+
replorigin_session_origin_lsn = InvalidXLogRecPtr;
1228+
MtmLogAbortLogicalMessage(MtmNodeId, x->gid);
12061229
}
12071230
}
12081231
MtmSend2PCMessage(ts, MSG_ABORTED); /* send notification to coordinator */
@@ -1293,6 +1316,7 @@ static void MtmStartRecovery()
12931316
MtmLock(LW_EXCLUSIVE);
12941317
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
12951318
MtmSwitchClusterMode(MTM_RECOVERY);
1319+
Mtm->recoveredLSN = InvalidXLogRecPtr;
12961320
MtmUnlock();
12971321
}
12981322

@@ -1604,6 +1628,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
16041628
MTM_LOG1("%d: node %d is caugth-up without locking cluster", MyProcPid, nodeId);
16051629
/* We are lucky: caugth-up without locking cluster! */
16061630
}
1631+
Mtm->recoveredLSN = walLSN;
16071632
MtmEnableNode(nodeId);
16081633
Mtm->nConfigChanges += 1;
16091634
caughtUp = true;
@@ -2075,6 +2100,7 @@ static void MtmInitialize()
20752100
Mtm->walSenderLockerMask = 0;
20762101
Mtm->nodeLockerMask = 0;
20772102
Mtm->reconnectMask = 0;
2103+
Mtm->recoveredLSN = InvalidXLogRecPtr;
20782104
Mtm->nLockers = 0;
20792105
Mtm->nActiveTransactions = 0;
20802106
Mtm->votingTransactions = NULL;
@@ -2102,13 +2128,14 @@ static void MtmInitialize()
21022128
Mtm->nodes[i].con = MtmConnections[i];
21032129
Mtm->nodes[i].flushPos = 0;
21042130
Mtm->nodes[i].lastHeartbeat = 0;
2105-
Mtm->nodes[i].restartLsn = 0;
2131+
Mtm->nodes[i].restartLSN = InvalidXLogRecPtr;
21062132
Mtm->nodes[i].originId = InvalidRepOriginId;
21072133
Mtm->nodes[i].timeline = 0;
2134+
Mtm->nodes[i].recoveredLSN = InvalidXLogRecPtr;
21082135
}
21092136
Mtm->nodes[MtmNodeId-1].originId = DoNotReplicateId;
21102137
/* All transaction originated from the current node should be ignored during recovery */
2111-
Mtm->nodes[MtmNodeId-1].restartLsn = (XLogRecPtr)PG_UINT64_MAX;
2138+
Mtm->nodes[MtmNodeId-1].restartLSN = (XLogRecPtr)PG_UINT64_MAX;
21122139
PGSemaphoreCreate(&Mtm->sendSemaphore);
21132140
PGSemaphoreReset(&Mtm->sendSemaphore);
21142141
SpinLockInit(&Mtm->spinlock);
@@ -2811,20 +2838,23 @@ void MtmReleaseRecoverySlot(int nodeId)
28112838
if (Mtm->recoverySlot == nodeId) {
28122839
Mtm->recoverySlot = 0;
28132840
}
2814-
}
2841+
}
28152842

2816-
void MtmRollbackPreparedTransaction(char const* gid)
2843+
void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
28172844
{
2818-
MTM_LOG1("Abort prepared transaction %s", gid);
2819-
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED) == TRANSACTION_STATUS_UNKNOWN) {
2845+
XidStatus status = MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED);
2846+
MTM_LOG1("Abort prepared transaction %s status %d", gid, status);
2847+
if (status == TRANSACTION_STATUS_UNKNOWN) {
28202848
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
28212849
MtmResetTransaction();
28222850
StartTransactionCommand();
2823-
MtmBeginSession(MtmReplicationNodeId);
2851+
MtmBeginSession(nodeId);
28242852
MtmSetCurrentTransactionGID(gid);
28252853
FinishPreparedTransaction(gid, false);
28262854
CommitTransactionCommand();
2827-
MtmEndSession(MtmReplicationNodeId, true);
2855+
MtmEndSession(nodeId, true);
2856+
} else if (status == TRANSACTION_STATUS_IN_PROGRESS) {
2857+
MtmLogAbortLogicalMessage(nodeId, gid);
28282858
}
28292859
}
28302860

@@ -2852,18 +2882,21 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28522882
*/
28532883
MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shutdown)
28542884
{
2855-
bool recovery = false;
2885+
MtmReplicationMode mode = REPLMODE_OPEN_EXISTED;
28562886

2857-
while (Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE)
2887+
while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE) || BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
28582888
{
28592889
if (*shutdown)
28602890
{
28612891
return REPLMODE_EXIT;
28622892
}
2863-
MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
28642893
MtmLock(LW_EXCLUSIVE);
2894+
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
2895+
mode = REPLMODE_CREATE_NEW;
2896+
}
2897+
MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
28652898
if (Mtm->status == MTM_RECOVERY) {
2866-
recovery = true;
2899+
mode = REPLMODE_RECOVERED;
28672900
if ((Mtm->recoverySlot == 0 && (Mtm->donorNodeId == MtmNodeId || Mtm->donorNodeId == nodeId))
28682901
|| Mtm->recoverySlot == nodeId)
28692902
{
@@ -2881,13 +2914,14 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28812914
/* delay opening of other slots until recovery is completed */
28822915
MtmSleep(STATUS_POLL_DELAY);
28832916
}
2884-
if (recovery) {
2917+
if (mode == REPLMODE_RECOVERED) {
28852918
MTM_LOG1("%d: Restart replication from node %d after end of recovery", MyProcPid, nodeId);
2919+
} else if (mode == REPLMODE_CREATE_NEW) {
2920+
MTM_LOG1("%d: Start replication from recovered node %d", MyProcPid, nodeId);
28862921
} else {
28872922
MTM_LOG1("%d: Continue replication from node %d", MyProcPid, nodeId);
28882923
}
2889-
/* After recovery completion we need to drop all other slots to avoid receive of redundant data */
2890-
return recovery ? REPLMODE_RECOVERED : REPLMODE_NORMAL;
2924+
return mode;
28912925
}
28922926

28932927
static bool MtmIsBroadcast()
@@ -2966,7 +3000,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
29663000
MtmIsRecoverySession = true;
29673001
} else if (strcmp(strVal(elem->arg), "recovered") == 0) {
29683002
recoveryCompleted = true;
2969-
} else if (strcmp(strVal(elem->arg), "normal") != 0) {
3003+
} else if (strcmp(strVal(elem->arg), "open_existed") != 0 && strcmp(strVal(elem->arg), "create_new") != 0) {
29703004
elog(ERROR, "Illegal recovery mode %s", strVal(elem->arg));
29713005
}
29723006
} else {
@@ -2978,14 +3012,20 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
29783012
} else {
29793013
elog(ERROR, "Restart position is not specified");
29803014
}
3015+
} else if (strcmp("mtm_recovered_pos", elem->defname) == 0) {
3016+
if (elem->arg != NULL && strVal(elem->arg) != NULL) {
3017+
sscanf(strVal(elem->arg), "%lx", &Mtm->nodes[MtmReplicationNodeId-1].recoveredLSN);
3018+
} else {
3019+
elog(ERROR, "Recovered position is not specified");
3020+
}
29813021
}
29823022
}
29833023
MtmLock(LW_EXCLUSIVE);
29843024
if (MtmIsRecoverySession) {
29853025
MTM_LOG1("%d: Node %d start recovery of node %d at position %lx", MyProcPid, MtmNodeId, MtmReplicationNodeId, recoveryStartPos);
29863026
Assert(MyReplicationSlot != NULL);
29873027
if (recoveryStartPos < MyReplicationSlot->data.restart_lsn) {
2988-
elog(ERROR, "Specified recovery start position %lx is beyond restart lsn %lx", recoveryStartPos, MyReplicationSlot->data.restart_lsn);
3028+
elog(WARNING, "Specified recovery start position %lx is beyond restart lsn %lx", recoveryStartPos, MyReplicationSlot->data.restart_lsn);
29893029
}
29903030
if (!BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
29913031
MtmDisableNode(MtmReplicationNodeId);
@@ -3134,19 +3174,11 @@ bool MtmFilterTransaction(char* record, int size)
31343174
default:
31353175
break;
31363176
}
3137-
duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLsn;
3177+
//duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
3178+
duplicate = origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
31383179

3139-
MTM_LOG1("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx",
3140-
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLsn);
3141-
if (Mtm->status == MTM_RECOVERY) {
3142-
if (Mtm->nodes[origin_node-1].restartLsn < origin_lsn) {
3143-
Mtm->nodes[origin_node-1].restartLsn = origin_lsn;
3144-
}
3145-
} else {
3146-
if (Mtm->nodes[replication_node-1].restartLsn < end_lsn) {
3147-
Mtm->nodes[replication_node-1].restartLsn = end_lsn;
3148-
}
3149-
}
3180+
MTM_LOG1("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3181+
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, flags, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
31503182
return duplicate;
31513183
}
31523184

@@ -3759,12 +3791,16 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
37593791
/* ??? Should we do explicit rollback */
37603792
} else {
37613793
CommitTransactionCommand();
3762-
StartTransactionCommand();
3763-
if (MtmGetCurrentTransactionStatus() == TRANSACTION_STATUS_ABORTED) {
3764-
FinishPreparedTransaction(x->gid, false);
3765-
elog(ERROR, "Transaction %s is aborted by DTM", x->gid);
3766-
} else {
3767-
FinishPreparedTransaction(x->gid, true);
3794+
if (x->isSuspended) {
3795+
elog(WARNING, "Transaction %s is left in prepared state because coordinator onde is not online", x->gid);
3796+
} else {
3797+
StartTransactionCommand();
3798+
if (x->status == TRANSACTION_STATUS_ABORTED) {
3799+
FinishPreparedTransaction(x->gid, false);
3800+
elog(ERROR, "Transaction %s is aborted by DTM", x->gid);
3801+
} else {
3802+
FinishPreparedTransaction(x->gid, true);
3803+
}
37683804
}
37693805
}
37703806
return true;

contrib/mmts/multimaster.h

+16-6
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,11 @@ typedef enum
128128

129129
typedef enum
130130
{
131-
REPLMODE_EXIT, /* receiver should exit */
132-
REPLMODE_RECOVERED, /* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
133-
REPLMODE_RECOVERY, /* perform recorvery of the node by applying all data from the slot from specified point */
134-
REPLMODE_NORMAL /* normal mode: use existed slot or create new one and start receiving data from it from the specified position */
131+
REPLMODE_EXIT, /* receiver should exit */
132+
REPLMODE_RECOVERED, /* recovery of receiver node is completed so drop old slot and restart replication from the current position in WAL */
133+
REPLMODE_RECOVERY, /* perform recorvery of the node by applying all data from the slot from specified point */
134+
REPLMODE_CREATE_NEW, /* destination node is recovered: drop old slot and restart from roveredLsn position */
135+
REPLMODE_OPEN_EXISTED /* normal mode: use existed slot or create new one and start receiving data from it from the rememered position */
135136
} MtmReplicationMode;
136137

137138
typedef struct
@@ -148,6 +149,13 @@ typedef struct
148149
pgid_t gid; /* Global transaction identifier */
149150
} MtmArbiterMessage;
150151

152+
typedef struct MtmAbortLogicalMessage
153+
{
154+
pgid_t gid;
155+
int origin_node;
156+
XLogRecPtr origin_lsn;
157+
} MtmAbortLogicalMessage;
158+
151159
typedef struct MtmMessageQueue
152160
{
153161
MtmArbiterMessage msg;
@@ -190,12 +198,13 @@ typedef struct
190198
int receiverPid;
191199
XLogRecPtr flushPos;
192200
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
193-
XLogRecPtr restartLsn;
201+
XLogRecPtr restartLSN;
194202
RepOriginId originId;
195203
int timeline;
196204
void* lockGraphData;
197205
int lockGraphAllocated;
198206
int lockGraphUsed;
207+
XLogRecPtr recoveredLSN;
199208
} MtmNodeInfo;
200209

201210
typedef struct MtmTransState
@@ -266,6 +275,7 @@ typedef struct
266275
uint64 gcCount; /* Number of global transactions performed since last GC */
267276
MtmMessageQueue* sendQueue; /* Messages to be sent by arbiter sender */
268277
MtmMessageQueue* freeQueue; /* Free messages */
278+
XLogRecPtr recoveredLSN; /* LSN at the moment of recovery completion */
269279
BgwPool pool; /* Pool of background workers for applying logical replication patches */
270280
MtmNodeInfo nodes[1]; /* [Mtm->nAllNodes]: per-node data */
271281
} MtmState;
@@ -361,7 +371,7 @@ extern PGconn *PQconnectdb_safe(const char *conninfo);
361371
extern void MtmBeginSession(int nodeId);
362372
extern void MtmEndSession(int nodeId, bool unlock);
363373
extern void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit);
364-
extern void MtmRollbackPreparedTransaction(char const* gid);
374+
extern void MtmRollbackPreparedTransaction(int nodeId, char const* gid);
365375
extern bool MtmFilterTransaction(char* record, int size);
366376

367377
#endif

0 commit comments

Comments
 (0)