Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 04648d8

Browse files
committed
Fix logical abort stuff.
1 parent 86afc8d commit 04648d8

File tree

3 files changed

+23
-14
lines changed

3 files changed

+23
-14
lines changed

multimaster.c

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,6 +1146,7 @@ MtmLogAbortLogicalMessage(int nodeId, char const* gid)
11461146
strcpy(msg.gid, gid);
11471147
msg.origin_node = nodeId;
11481148
msg.origin_lsn = replorigin_session_origin_lsn;
1149+
MTM_LOG2("[TRACE] MtmLogAbortLogicalMessage(%d, %s)", nodeId, gid);
11491150
XLogFlush(LogLogicalMessage("A", (char*)&msg, sizeof msg, false));
11501151
}
11511152

@@ -1234,6 +1235,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12341235
MtmTransactionListAppend(ts);
12351236
if (*x->gid) {
12361237
replorigin_session_origin_lsn = InvalidXLogRecPtr;
1238+
MTM_TXTRACE(x, "MtmEndTransaction/MtmLogAbortLogicalMessage");
12371239
MtmLogAbortLogicalMessage(MtmNodeId, x->gid);
12381240
}
12391241
}
@@ -2888,7 +2890,9 @@ void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
28882890
CommitTransactionCommand();
28892891
MtmEndSession(nodeId, true);
28902892
} else if (status == TRANSACTION_STATUS_IN_PROGRESS) {
2893+
MtmBeginSession(nodeId);
28912894
MtmLogAbortLogicalMessage(nodeId, gid);
2895+
MtmEndSession(nodeId, true);
28922896
}
28932897
}
28942898

@@ -3055,6 +3059,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
30553059
sscanf(strVal(elem->arg), "%lx", &recoveredLSN);
30563060
MTM_LOG1("Recovered position of node %d is %lx", MtmReplicationNodeId, recoveredLSN);
30573061
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN < recoveredLSN) {
3062+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmReplicationStartupHook)", MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, recoveredLSN);
30583063
Mtm->nodes[MtmReplicationNodeId-1].restartLSN = recoveredLSN;
30593064
}
30603065
} else {
@@ -3220,18 +3225,20 @@ bool MtmFilterTransaction(char* record, int size)
32203225
}
32213226
restart_lsn = origin_node == MtmReplicationNodeId ? end_lsn : origin_lsn;
32223227
if (Mtm->nodes[origin_node-1].restartLSN < restart_lsn) {
3228+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)", MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, restart_lsn);
32233229
Mtm->nodes[origin_node-1].restartLSN = restart_lsn;
32243230
} else {
32253231
duplicate = true;
32263232
}
32273233

32283234
if (duplicate) {
3229-
MTM_LOG1("Ignore transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3230-
gid, replication_node, end_lsn, flags, origin_node, origin_lsn, restart_lsn);
3235+
MTM_LOG1("Ignore transaction %s from node %d flags=%x, our restartLSN for node: %lx,restart_lsn = (origin node %d == MtmReplicationNodeId %d) ? end_lsn=%lx, origin_lsn=%lx",
3236+
gid, replication_node, flags, Mtm->nodes[origin_node-1].restartLSN, origin_node, MtmReplicationNodeId, end_lsn, origin_lsn);
32313237
} else {
32323238
MTM_LOG2("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
32333239
gid, replication_node, end_lsn, flags, origin_node, origin_lsn, restart_lsn);
32343240
}
3241+
32353242
return duplicate;
32363243
}
32373244

@@ -4137,16 +4144,16 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
41374144

41384145
case T_VacuumStmt:
41394146
skipCommand = true;
4140-
if (context == PROCESS_UTILITY_TOPLEVEL) {
4141-
MtmProcessDDLCommand(queryString, false, true);
4142-
MtmTx.isDistributed = false;
4143-
} else if (MtmApplyContext != NULL) {
4144-
MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
4145-
Assert(oldContext != MtmApplyContext);
4146-
MtmVacuumStmt = (VacuumStmt*)copyObject(parsetree);
4147-
MemoryContextSwitchTo(oldContext);
4148-
return;
4149-
}
4147+
// if (context == PROCESS_UTILITY_TOPLEVEL) {
4148+
// MtmProcessDDLCommand(queryString, false, true);
4149+
// MtmTx.isDistributed = false;
4150+
// } else if (MtmApplyContext != NULL) {
4151+
// MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
4152+
// Assert(oldContext != MtmApplyContext);
4153+
// MtmVacuumStmt = (VacuumStmt*)copyObject(parsetree);
4154+
// MemoryContextSwitchTo(oldContext);
4155+
// return;
4156+
// }
41504157
break;
41514158

41524159
case T_CreateDomainStmt:
@@ -4241,7 +4248,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
42414248
if (indexStmt->concurrent)
42424249
{
42434250
if (context == PROCESS_UTILITY_TOPLEVEL) {
4244-
MtmProcessDDLCommand(queryString, false, true);
4251+
// MtmProcessDDLCommand(queryString, false, true);
42454252
MtmTx.isDistributed = false;
42464253
skipCommand = true;
42474254
/*
@@ -4268,7 +4275,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
42684275
if (stmt->removeType == OBJECT_INDEX && stmt->concurrent)
42694276
{
42704277
if (context == PROCESS_UTILITY_TOPLEVEL) {
4271-
MtmProcessDDLCommand(queryString, false, true);
4278+
// MtmProcessDDLCommand(queryString, false, true);
42724279
MtmTx.isDistributed = false;
42734280
skipCommand = true;
42744281
} else if (MtmApplyContext != NULL) {

pglogical_apply.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ process_remote_message(StringInfo s)
434434
* restartLSN without locks
435435
*/
436436
if (Mtm->nodes[origin_node-1].restartLSN < msg->origin_lsn) {
437+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)", origin_node, Mtm->nodes[origin_node-1].restartLSN, msg->origin_lsn);
437438
Mtm->nodes[origin_node-1].restartLSN = msg->origin_lsn;
438439
replorigin_session_origin_lsn = msg->origin_lsn;
439440
MtmRollbackPreparedTransaction(origin_node, msg->gid);

pglogical_receiver.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,7 @@ pglogical_receiver_main(Datum main_arg)
338338
} else {
339339
originStartPos = replorigin_get_progress(originId, false);
340340
if (Mtm->nodes[nodeId-1].restartLSN < originStartPos) {
341+
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (pglogical_receiver_mains)", nodeId, Mtm->nodes[nodeId-1].restartLSN, originStartPos);
341342
Mtm->nodes[nodeId-1].restartLSN = originStartPos;
342343
}
343344
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);

0 commit comments

Comments
 (0)