Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b69eae0

Browse files
knizhnikkelvich
authored and committed
Add logging of filtered transactions in logical decoder
1 parent bc15caf commit b69eae0

File tree

4 files changed

+94
-55
lines changed

4 files changed

+94
-55
lines changed

multimaster.c

Lines changed: 68 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ typedef struct {
8383
bool isReplicated; /* transaction on replica */
8484
bool isDistributed; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
8585
bool isPrepared; /* transaction is prepared at first stage of 2PC */
86+
bool isSuspended; /* prepared transaction is suspended because coordinator node is switched to offline */
87+
bool isTransactionBlock; /* is transaction block */
8688
bool containsDML; /* transaction contains DML statements */
8789
XidStatus status; /* transaction status */
8890
csn_t snapshot; /* transaction snapshot */
@@ -711,7 +713,7 @@ MtmXactCallback(XactEvent event, void *arg)
711713
}
712714

713715
/*
714-
* Check if this is "normal" user trnsaction which should be distributed to other nodes
716+
* Check if this is "normal" user transaction which should be distributed to other nodes
715717
*/
716718
static bool
717719
MtmIsUserTransaction()
@@ -733,6 +735,7 @@ MtmResetTransaction()
733735
x->gtid.xid = InvalidTransactionId;
734736
x->isDistributed = false;
735737
x->isPrepared = false;
738+
x->isSuspended = false;
736739
x->isTwoPhase = false;
737740
x->csn =
738741
x->status = TRANSACTION_STATUS_UNKNOWN;
@@ -762,6 +765,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
762765
x->isReplicated = MtmIsLogicalReceiver;
763766
x->isDistributed = MtmIsUserTransaction();
764767
x->isPrepared = false;
768+
x->isSuspended = false;
765769
x->isTwoPhase = false;
766770
x->isTransactionBlock = IsTransactionBlock();
767771
/* Application name can be changed using PGAPPNAME environment variable */
@@ -1003,14 +1007,18 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10031007
}
10041008
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
10051009
if (ts->isPrepared) {
1006-
elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1007-
}
1008-
if (Mtm->status != MTM_ONLINE) {
1009-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1010-
} else {
1011-
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1010+
// GetNewTransactionId(false); /* force increment of transaction counter */
1011+
// elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1012+
elog(WARNING, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1013+
x->isSuspended = true;
1014+
} else {
1015+
if (Mtm->status != MTM_ONLINE) {
1016+
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1017+
} else {
1018+
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1019+
}
1020+
MtmAbortTransaction(ts);
10121021
}
1013-
MtmAbortTransaction(ts);
10141022
}
10151023
x->status = ts->status;
10161024
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
@@ -1077,14 +1085,18 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10771085
}
10781086
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
10791087
if (ts->isPrepared) {
1080-
elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1081-
}
1082-
if (Mtm->status != MTM_ONLINE) {
1083-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1084-
} else {
1085-
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1088+
// GetNewTransactionId(false); /* force increment of transaction counter */
1089+
// elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1090+
elog(WARNING, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1091+
x->isSuspended = true;
1092+
} else {
1093+
if (Mtm->status != MTM_ONLINE) {
1094+
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1095+
} else {
1096+
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1097+
}
1098+
MtmAbortTransaction(ts);
10861099
}
1087-
MtmAbortTransaction(ts);
10881100
}
10891101
x->status = ts->status;
10901102
x->xid = ts->xid;
@@ -1292,6 +1304,7 @@ static void MtmStartRecovery()
12921304
MtmLock(LW_EXCLUSIVE);
12931305
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
12941306
MtmSwitchClusterMode(MTM_RECOVERY);
1307+
Mtm->recoveredLSN = InvalidXLogRecPtr;
12951308
MtmUnlock();
12961309
}
12971310

@@ -1603,6 +1616,7 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
16031616
MTM_LOG1("%d: node %d is caugth-up without locking cluster", MyProcPid, nodeId);
16041617
/* We are lucky: caught up without locking cluster! */
16051618
}
1619+
Mtm->recoveredLSN = walLSN;
16061620
MtmEnableNode(nodeId);
16071621
Mtm->nConfigChanges += 1;
16081622
caughtUp = true;
@@ -2074,6 +2088,7 @@ static void MtmInitialize()
20742088
Mtm->walSenderLockerMask = 0;
20752089
Mtm->nodeLockerMask = 0;
20762090
Mtm->reconnectMask = 0;
2091+
Mtm->recoveredLSN = InvalidXLogRecPtr;
20772092
Mtm->nLockers = 0;
20782093
Mtm->nActiveTransactions = 0;
20792094
Mtm->votingTransactions = NULL;
@@ -2101,13 +2116,14 @@ static void MtmInitialize()
21012116
Mtm->nodes[i].con = MtmConnections[i];
21022117
Mtm->nodes[i].flushPos = 0;
21032118
Mtm->nodes[i].lastHeartbeat = 0;
2104-
Mtm->nodes[i].restartLsn = 0;
2119+
Mtm->nodes[i].restartLSN = InvalidXLogRecPtr;
21052120
Mtm->nodes[i].originId = InvalidRepOriginId;
21062121
Mtm->nodes[i].timeline = 0;
2122+
Mtm->nodes[i].recoveredLSN = InvalidXLogRecPtr;
21072123
}
21082124
Mtm->nodes[MtmNodeId-1].originId = DoNotReplicateId;
21092125
/* All transaction originated from the current node should be ignored during recovery */
2110-
Mtm->nodes[MtmNodeId-1].restartLsn = (XLogRecPtr)PG_UINT64_MAX;
2126+
Mtm->nodes[MtmNodeId-1].restartLSN = (XLogRecPtr)PG_UINT64_MAX;
21112127
PGSemaphoreCreate(&Mtm->sendSemaphore);
21122128
PGSemaphoreReset(&Mtm->sendSemaphore);
21132129
SpinLockInit(&Mtm->spinlock);
@@ -2851,18 +2867,21 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28512867
*/
28522868
MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shutdown)
28532869
{
2854-
bool recovery = false;
2870+
MtmReplicationMode mode = REPLMODE_OPEN_EXISTED;
28552871

2856-
while (Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE)
2872+
while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE) || BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
28572873
{
28582874
if (*shutdown)
28592875
{
28602876
return REPLMODE_EXIT;
28612877
}
2862-
MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
28632878
MtmLock(LW_EXCLUSIVE);
2879+
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
2880+
mode = REPLMODE_CREATE_NEW;
2881+
}
2882+
MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
28642883
if (Mtm->status == MTM_RECOVERY) {
2865-
recovery = true;
2884+
mode = REPLMODE_RECOVERED;
28662885
if ((Mtm->recoverySlot == 0 && (Mtm->donorNodeId == MtmNodeId || Mtm->donorNodeId == nodeId))
28672886
|| Mtm->recoverySlot == nodeId)
28682887
{
@@ -2880,13 +2899,14 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28802899
/* delay opening of other slots until recovery is completed */
28812900
MtmSleep(STATUS_POLL_DELAY);
28822901
}
2883-
if (recovery) {
2902+
if (mode == REPLMODE_RECOVERED) {
28842903
MTM_LOG1("%d: Restart replication from node %d after end of recovery", MyProcPid, nodeId);
2904+
} else if (mode == REPLMODE_CREATE_NEW) {
2905+
MTM_LOG1("%d: Start replication from recovered node %d", MyProcPid, nodeId);
28852906
} else {
28862907
MTM_LOG1("%d: Continue replication from node %d", MyProcPid, nodeId);
28872908
}
2888-
/* After recovery completion we need to drop all other slots to avoid receive of redundant data */
2889-
return recovery ? REPLMODE_RECOVERED : REPLMODE_NORMAL;
2909+
return mode;
28902910
}
28912911

28922912
static bool MtmIsBroadcast()
@@ -2965,7 +2985,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
29652985
MtmIsRecoverySession = true;
29662986
} else if (strcmp(strVal(elem->arg), "recovered") == 0) {
29672987
recoveryCompleted = true;
2968-
} else if (strcmp(strVal(elem->arg), "normal") != 0) {
2988+
} else if (strcmp(strVal(elem->arg), "open_existed") != 0 && strcmp(strVal(elem->arg), "create_new") != 0) {
29692989
elog(ERROR, "Illegal recovery mode %s", strVal(elem->arg));
29702990
}
29712991
} else {
@@ -2977,14 +2997,20 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
29772997
} else {
29782998
elog(ERROR, "Restart position is not specified");
29792999
}
3000+
} else if (strcmp("mtm_recovered_pos", elem->defname) == 0) {
3001+
if (elem->arg != NULL && strVal(elem->arg) != NULL) {
3002+
sscanf(strVal(elem->arg), "%lx", &Mtm->nodes[MtmReplicationNodeId-1].recoveredLSN);
3003+
} else {
3004+
elog(ERROR, "Recovered position is not specified");
3005+
}
29803006
}
29813007
}
29823008
MtmLock(LW_EXCLUSIVE);
29833009
if (MtmIsRecoverySession) {
29843010
MTM_LOG1("%d: Node %d start recovery of node %d at position %lx", MyProcPid, MtmNodeId, MtmReplicationNodeId, recoveryStartPos);
29853011
Assert(MyReplicationSlot != NULL);
29863012
if (recoveryStartPos < MyReplicationSlot->data.restart_lsn) {
2987-
elog(ERROR, "Specified recovery start position %lx is beyond restart lsn %lx", recoveryStartPos, MyReplicationSlot->data.restart_lsn);
3013+
elog(WARNING, "Specified recovery start position %lx is beyond restart lsn %lx", recoveryStartPos, MyReplicationSlot->data.restart_lsn);
29883014
}
29893015
if (!BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
29903016
MtmDisableNode(MtmReplicationNodeId);
@@ -3133,17 +3159,17 @@ bool MtmFilterTransaction(char* record, int size)
31333159
default:
31343160
break;
31353161
}
3136-
duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLsn;
3162+
duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
31373163

31383164
MTM_LOG1("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx",
3139-
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLsn);
3165+
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
31403166
if (Mtm->status == MTM_RECOVERY) {
3141-
if (Mtm->nodes[origin_node-1].restartLsn < origin_lsn) {
3142-
Mtm->nodes[origin_node-1].restartLsn = origin_lsn;
3167+
if (Mtm->nodes[origin_node-1].restartLSN < origin_lsn) {
3168+
Mtm->nodes[origin_node-1].restartLSN = origin_lsn;
31433169
}
31443170
} else {
3145-
if (Mtm->nodes[replication_node-1].restartLsn < end_lsn) {
3146-
Mtm->nodes[replication_node-1].restartLsn = end_lsn;
3171+
if (Mtm->nodes[replication_node-1].restartLSN < end_lsn) {
3172+
Mtm->nodes[replication_node-1].restartLSN = end_lsn;
31473173
}
31483174
}
31493175
return duplicate;
@@ -3758,12 +3784,16 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
37583784
/* ??? Should we do explicit rollback */
37593785
} else {
37603786
CommitTransactionCommand();
3761-
StartTransactionCommand();
3762-
if (MtmGetCurrentTransactionStatus() == TRANSACTION_STATUS_ABORTED) {
3763-
FinishPreparedTransaction(x->gid, false);
3764-
elog(ERROR, "Transaction %s is aborted by DTM", x->gid);
3765-
} else {
3766-
FinishPreparedTransaction(x->gid, true);
3787+
if (x->isSuspended) {
3788+
elog(WARNING, "Transaction %s is left in prepared state because coordinator onde is not online", x->gid);
3789+
} else {
3790+
StartTransactionCommand();
3791+
if (x->status == TRANSACTION_STATUS_ABORTED) {
3792+
FinishPreparedTransaction(x->gid, false);
3793+
elog(ERROR, "Transaction %s is aborted by DTM", x->gid);
3794+
} else {
3795+
FinishPreparedTransaction(x->gid, true);
3796+
}
37673797
}
37683798
}
37693799
return true;

multimaster.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,11 @@ typedef enum
128128

129129
typedef enum
130130
{
131-
REPLMODE_EXIT, /* receiver should exit */
132-
REPLMODE_RECOVERED, /* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
133-
REPLMODE_RECOVERY, /* perform recorvery of the node by applying all data from the slot from specified point */
134-
REPLMODE_NORMAL /* normal mode: use existed slot or create new one and start receiving data from it from the specified position */
131+
REPLMODE_EXIT, /* receiver should exit */
132+
REPLMODE_RECOVERED, /* recovery of receiver node is completed so drop old slot and restart replication from the current position in WAL */
133+
REPLMODE_RECOVERY, /* perform recovery of the node by applying all data from the slot from specified point */
134+
REPLMODE_CREATE_NEW, /* destination node is recovered: drop old slot and restart from recoveredLSN position */
135+
REPLMODE_OPEN_EXISTED /* normal mode: use existing slot or create new one and start receiving data from it from the remembered position */
135136
} MtmReplicationMode;
136137

137138
typedef struct
@@ -190,12 +191,13 @@ typedef struct
190191
int receiverPid;
191192
XLogRecPtr flushPos;
192193
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
193-
XLogRecPtr restartLsn;
194+
XLogRecPtr restartLSN;
194195
RepOriginId originId;
195196
int timeline;
196197
void* lockGraphData;
197198
int lockGraphAllocated;
198199
int lockGraphUsed;
200+
XLogRecPtr recoveredLSN;
199201
} MtmNodeInfo;
200202

201203
typedef struct MtmTransState
@@ -266,6 +268,7 @@ typedef struct
266268
uint64 gcCount; /* Number of global transactions performed since last GC */
267269
MtmMessageQueue* sendQueue; /* Messages to be sent by arbiter sender */
268270
MtmMessageQueue* freeQueue; /* Free messages */
271+
XLogRecPtr recoveredLSN; /* LSN at the moment of recovery completion */
269272
BgwPool pool; /* Pool of background workers for applying logical replication patches */
270273
MtmNodeInfo nodes[1]; /* [Mtm->nAllNodes]: per-node data */
271274
} MtmState;

pglogical_proto.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
187187
Assert(false);
188188

189189
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE) {
190-
Assert(txn->xid < 1000 || MtmTransactionRecords >= 2);
190+
//Assert(txn->xid < 1000 || MtmTransactionRecords != 1);
191191
// if (MtmIsFilteredTxn) {
192192
// Assert(MtmTransactionRecords == 0);
193193
// return;

pglogical_receiver.c

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,10 @@ feTimestampDifference(int64 start_time, int64 stop_time,
196196
static char const* const MtmReplicationModeName[] =
197197
{
198198
"exit",
199-
"recovered", /* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
200-
"recovery", /* perform recorvery of the node by applying all data from theslot from specified point */
201-
"normal" /* normal mode: use existed slot or create new one and start receiving data from it from the specified position */
199+
"recovered", /* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
200+
"recovery", /* perform recovery of the node by applying all data from the slot from specified point */
201+
"create_new", /* destination node is recovered: drop old slot and restart from recoveredLSN position */
202+
"open_existed" /* normal mode: use existing slot or create new one and start receiving data from it from the remembered position */
202203
};
203204

204205
static void
@@ -275,7 +276,7 @@ pglogical_receiver_main(Datum main_arg)
275276
}
276277
timeline = Mtm->nodes[nodeId-1].timeline;
277278
count = Mtm->recoveryCount;
278-
279+
279280
/* Establish connection to remote server */
280281
conn = PQconnectdb_safe(connString);
281282
status = PQstatus(conn);
@@ -287,7 +288,9 @@ pglogical_receiver_main(Datum main_arg)
287288
}
288289

289290
query = createPQExpBuffer();
290-
if (mode == REPLMODE_NORMAL && timeline != Mtm->nodes[nodeId-1].timeline) { /* recreate slot */
291+
if ((mode == REPLMODE_OPEN_EXISTED && timeline != Mtm->nodes[nodeId-1].timeline)
292+
|| mode == REPLMODE_CREATE_NEW)
293+
{ /* recreate slot */
291294
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slotName);
292295
res = PQexec(conn, query->data);
293296
PQclear(res);
@@ -320,7 +323,7 @@ pglogical_receiver_main(Datum main_arg)
320323

321324
/* Start logical replication at specified position */
322325
if (mode == REPLMODE_RECOVERED) {
323-
originStartPos = Mtm->nodes[nodeId-1].restartLsn;
326+
originStartPos = Mtm->nodes[nodeId-1].restartLSN;
324327
MTM_LOG1("Restart replication from node %d from position %lx", nodeId, originStartPos);
325328
}
326329
if (originStartPos == InvalidXLogRecPtr && !newTimeline) {
@@ -339,23 +342,26 @@ pglogical_receiver_main(Datum main_arg)
339342
MTM_LOG1("Start logical receiver at position %lx from node %d", originStartPos, nodeId);
340343
} else {
341344
originStartPos = replorigin_get_progress(originId, false);
342-
if (Mtm->nodes[nodeId-1].restartLsn < originStartPos) {
343-
Mtm->nodes[nodeId-1].restartLsn = originStartPos;
345+
if (Mtm->nodes[nodeId-1].restartLSN < originStartPos) {
346+
Mtm->nodes[nodeId-1].restartLSN = originStartPos;
344347
}
345348
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);
346349
}
347350
Mtm->nodes[nodeId-1].originId = originId;
348351
CommitTransactionCommand();
352+
} else if (mode == REPLMODE_CREATE_NEW) {
353+
originStartPos = Mtm->nodes[nodeId-1].recoveredLSN;
349354
}
350355

351-
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s', \"mtm_restart_pos\" '%lx')",
356+
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s', \"mtm_restart_pos\" '%lx', \"mtm_recovered_pos\" '%lx')",
352357
slotName,
353358
(uint32) (originStartPos >> 32),
354359
(uint32) originStartPos,
355360
MULTIMASTER_MAX_PROTO_VERSION,
356361
MULTIMASTER_MIN_PROTO_VERSION,
357362
MtmReplicationModeName[mode],
358-
originStartPos
363+
originStartPos,
364+
Mtm->recoveredLSN
359365
);
360366
res = PQexec(conn, query->data);
361367
if (PQresultStatus(res) != PGRES_COPY_BOTH)
@@ -511,7 +517,7 @@ pglogical_receiver_main(Datum main_arg)
511517
output_written_lsn = Max(walEnd, output_written_lsn);
512518
continue;
513519
}
514-
mode = REPLMODE_NORMAL;
520+
mode = REPLMODE_OPEN_EXISTED;
515521
}
516522
MTM_LOG3("%ld: Receive message %c from node %d", MtmGetSystemTime(), stmt[0], nodeId);
517523
if (buf.used >= MtmTransSpillThreshold*MB) {

0 commit comments

Comments
 (0)