Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 076234f

Browse files
committed
3pc support fixes
1 parent 4dda4ec commit 076234f

File tree

11 files changed

+279
-182
lines changed

11 files changed

+279
-182
lines changed

contrib/mmts/arbiter.c

+20-14
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ static void MtmMonitor(Datum arg);
9191
static void MtmSendHeartbeat(void);
9292
static bool MtmSendToNode(int node, void const* buf, int size);
9393

94-
static char const* const messageKindText[] =
94+
char const* const MtmMessageKindMnem[] =
9595
{
9696
"INVALID",
9797
"HANDSHAKE",
@@ -318,7 +318,7 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
318318
&& Mtm->status != MTM_RECOVERY
319319
&& Mtm->nodes[MtmNodeId-1].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < MtmGetSystemTime())
320320
{
321-
elog(WARNING, "Node %d thinks that I am dead, while I am %s (message %s)", resp->node, MtmNodeStatusMnem[Mtm->status], messageKindText[resp->code]);
321+
elog(WARNING, "Node %d thinks that I am dead, while I am %s (message %s)", resp->node, MtmNodeStatusMnem[Mtm->status], MtmMessageKindMnem[resp->code]);
322322
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
323323
MtmSwitchClusterMode(MTM_RECOVERY);
324324
} else if (BIT_CHECK(Mtm->disabledNodeMask, resp->node-1) && sockets[resp->node-1] < 0) {
@@ -372,7 +372,7 @@ static void MtmSendHeartbeat()
372372
MTM_LOG4("Send heartbeat to node %d with timestamp %ld", i+1, now);
373373
}
374374
} else {
375-
MTM_LOG2("Do not send heartbeat to node %d, busy mask %lld, status %d", i+1, (long long) busy_mask, Mtm->status);
375+
MTM_LOG2("Do not send heartbeat to node %d, busy mask %lld, status %s", i+1, (long long) busy_mask, MtmNodeStatusMnem[Mtm->status]);
376376
}
377377
}
378378
}
@@ -882,6 +882,7 @@ static void MtmReceiver(Datum arg)
882882
rxBuffer[i].used += rc;
883883
nResponses = rxBuffer[i].used/sizeof(MtmArbiterMessage);
884884

885+
885886
MtmLock(LW_EXCLUSIVE);
886887

887888
for (j = 0; j < nResponses; j++) {
@@ -897,6 +898,7 @@ static void MtmReceiver(Datum arg)
897898
Mtm->nodes[node-1].lastHeartbeat = MtmGetSystemTime();
898899

899900
MtmCheckResponse(msg);
901+
MTM_LOG2("Receive response %s for transaction %s from node %d", MtmMessageKindMnem[msg->code], msg->gid, msg->node);
900902

901903
switch (msg->code) {
902904
case MSG_HEARTBEAT:
@@ -912,7 +914,7 @@ static void MtmReceiver(Datum arg)
912914
} else {
913915
msg->status = tm->state->status;
914916
msg->csn = tm->state->csn;
915-
MTM_LOG1("Send response %d for transaction %s to node %d", msg->status, msg->gid, msg->node);
917+
MTM_LOG1("Send response %s for transaction %s to node %d", MtmTxnStatusMnem[msg->status], msg->gid, msg->node);
916918
}
917919
msg->disabledNodeMask = Mtm->disabledNodeMask;
918920
msg->connectivityMask = Mtm->connectivityMask;
@@ -931,7 +933,7 @@ static void MtmReceiver(Datum arg)
931933
if (ts->status == TRANSACTION_STATUS_UNKNOWN) {
932934
if (msg->status == TRANSACTION_STATUS_IN_PROGRESS || msg->status == TRANSACTION_STATUS_ABORTED) {
933935
elog(LOG, "Abort prepared transaction %s because it is in state %s at node %d",
934-
msg->gid, MtmNodeStatusMnem[msg->status], node);
936+
msg->gid, MtmTxnStatusMnem[msg->status], node);
935937
MtmFinishPreparedTransaction(ts, false);
936938
}
937939
else if (msg->status == TRANSACTION_STATUS_COMMITTED || msg->status == TRANSACTION_STATUS_UNKNOWN)
@@ -944,11 +946,12 @@ static void MtmReceiver(Datum arg)
944946
elog(LOG, "Commit transaction %s because it is prepared at all live nodes", msg->gid);
945947
MtmFinishPreparedTransaction(ts, true);
946948
} else {
947-
MTM_LOG1("Receive response for transaction %s -> %d, participants=%llx, voted=%llx", msg->gid, msg->status, (long long)ts->participantsMask, (long long)ts->votedMask);
949+
MTM_LOG1("Receive response for transaction %s -> %s, participants=%llx, voted=%llx",
950+
msg->gid, MtmTxnStatusMnem[msg->status], (long long)ts->participantsMask, (long long)ts->votedMask);
948951
}
949952
} else {
950953
elog(LOG, "Receive response %s for transaction %s for node %d, votedMask %llx, participantsMask %llx",
951-
MtmNodeStatusMnem[msg->status], msg->gid, node, (long long)ts->votedMask, (long long)(ts->participantsMask & ~Mtm->disabledNodeMask));
954+
MtmTxnStatusMnem[msg->status], msg->gid, node, (long long)ts->votedMask, (long long)(ts->participantsMask & ~Mtm->disabledNodeMask));
952955
continue;
953956
}
954957
} else if (ts->status == TRANSACTION_STATUS_ABORTED && msg->status == TRANSACTION_STATUS_COMMITTED) {
@@ -957,7 +960,7 @@ static void MtmReceiver(Datum arg)
957960
elog(WARNING, "Transaction %s is committed at node %d but aborted at node %d", msg->gid, MtmNodeId, node);
958961
} else {
959962
elog(LOG, "Receive response %s for transaction %s status %s for node %d, votedMask %llx, participantsMask %llx",
960-
MtmNodeStatusMnem[msg->status], msg->gid, MtmNodeStatusMnem[ts->status], node, (long long)ts->votedMask, (long long)(ts->participantsMask & ~Mtm->disabledNodeMask) );
963+
MtmTxnStatusMnem[msg->status], msg->gid, MtmTxnStatusMnem[ts->status], node, (long long)ts->votedMask, (long long)(ts->participantsMask & ~Mtm->disabledNodeMask) );
961964
}
962965
}
963966
continue;
@@ -976,7 +979,7 @@ static void MtmReceiver(Datum arg)
976979
Assert(msg->code == MSG_ABORTED || strcmp(msg->gid, ts->gid) == 0);
977980
if (BIT_CHECK(ts->votedMask, node-1)) {
978981
elog(WARNING, "Receive deteriorated %s response for transaction %d (%s) from node %d",
979-
messageKindText[msg->code], ts->xid, ts->gid, node);
982+
MtmMessageKindMnem[msg->code], ts->xid, ts->gid, node);
980983
continue;
981984
}
982985
BIT_SET(ts->votedMask, node-1);
@@ -1006,16 +1009,20 @@ static void MtmReceiver(Datum arg)
10061009
MtmWakeUpBackend(ts);
10071010
} else {
10081011
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1009-
MTM_LOG2("Transaction %s is prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
1010-
ts->gid, ts->status, ts->participantsMask, Mtm->disabledNodeMask, ts->votedMask);
1012+
MTM_LOG2("Transaction %s is prepared (status=%s participants=%lx disabled=%lx, voted=%lx)",
1013+
ts->gid, MtmTxnStatusMnem[ts->status], ts->participantsMask, Mtm->disabledNodeMask, ts->votedMask);
10111014
ts->isPrepared = true;
10121015
if (ts->isTwoPhase) {
10131016
MtmWakeUpBackend(ts);
10141017
} else if (MtmUseDtm) {
10151018
ts->votedMask = 0;
10161019
MTM_TXTRACE(ts, "MtmTransReceiver send MSG_PRECOMMIT");
10171020
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1018-
SetPrepareTransactionState(ts->gid, "precommitted");
1021+
Assert(replorigin_session_origin == InvalidRepOriginId);
1022+
MTM_LOG2("SetPreparedTransactionState for %s", ts->gid);
1023+
MtmUnlock();
1024+
SetPreparedTransactionState(ts->gid, MULTIMASTER_PRECOMMITTED);
1025+
MtmLock(LW_EXCLUSIVE);
10191026
} else {
10201027
ts->status = TRANSACTION_STATUS_UNKNOWN;
10211028
MtmWakeUpBackend(ts);
@@ -1072,8 +1079,7 @@ static void MtmReceiver(Datum arg)
10721079
} else if (ts->status == TRANSACTION_STATUS_ABORTED) {
10731080
MtmSend2PCMessage(ts, MSG_ABORTED);
10741081
} else {
1075-
elog(WARNING, "Transaction %s is already %s",
1076-
ts->gid, ts->status == TRANSACTION_STATUS_COMMITTED ? "committed" : "prepared");
1082+
elog(WARNING, "Transaction %s is already %s", ts->gid, MtmTxnStatusMnem[ts->status]);
10771083
}
10781084
break;
10791085
default:

contrib/mmts/multimaster.c

+46-28
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,14 @@ char const* const MtmNodeStatusMnem[] =
204204
"OutOfService"
205205
};
206206

207+
char const* const MtmTxnStatusMnem[] =
208+
{
209+
"InProgress",
210+
"Committed",
211+
"Aborted",
212+
"Unknown"
213+
};
214+
207215
bool MtmDoReplication;
208216
char* MtmDatabaseName;
209217
char* MtmDatabaseUser;
@@ -595,17 +603,17 @@ MtmAdjustOldestXid(TransactionId xid)
595603
if (MtmUseDtm && !MtmVolksWagenMode)
596604
{
597605
if (prev != NULL) {
598-
MTM_LOG2("%d: MtmAdjustOldestXid: oldestXid=%d, prev->xid=%d, prev->status=%d, prev->snapshot=%ld, ts->xid=%d, ts->status=%d, ts->snapshot=%ld, oldestSnapshot=%ld",
599-
MyProcPid, xid, prev->xid, prev->status, prev->snapshot, (ts ? ts->xid : 0), (ts ? ts->status : -1), (ts ? ts->snapshot : -1), oldestSnapshot);
606+
MTM_LOG2("%d: MtmAdjustOldestXid: oldestXid=%d, prev->xid=%d, prev->status=%s, prev->snapshot=%ld, ts->xid=%d, ts->status=%d, ts->snapshot=%ld, oldestSnapshot=%ld",
607+
MyProcPid, xid, prev->xid, MtmTxnStatusMnem[prev->status], prev->snapshot, (ts ? ts->xid : 0), (ts ? ts->status : -1), (ts ? ts->snapshot : -1), oldestSnapshot);
600608
Mtm->transListHead = prev;
601609
Mtm->oldestXid = xid = prev->xid;
602610
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
603611
xid = Mtm->oldestXid;
604612
}
605613
} else {
606614
if (prev != NULL) {
607-
MTM_LOG2("%d: MtmAdjustOldestXid: oldestXid=%d, prev->xid=%d, prev->status=%d, prev->snapshot=%ld, ts->xid=%d, ts->status=%d, ts->snapshot=%ld, oldestSnapshot=%ld",
608-
MyProcPid, xid, prev->xid, prev->status, prev->snapshot, (ts ? ts->xid : 0), (ts ? ts->status : -1), (ts ? ts->snapshot : -1), oldestSnapshot);
615+
MTM_LOG2("%d: MtmAdjustOldestXid: oldestXid=%d, prev->xid=%d, prev->status=%s, prev->snapshot=%ld, ts->xid=%d, ts->status=%d, ts->snapshot=%ld, oldestSnapshot=%ld",
616+
MyProcPid, xid, prev->xid, MtmTxnStatusMnem[prev->status], prev->snapshot, (ts ? ts->xid : 0), (ts ? ts->status : -1), (ts ? ts->snapshot : -1), oldestSnapshot);
609617
Mtm->transListHead = prev;
610618
}
611619
}
@@ -933,7 +941,8 @@ void MtmPrecommitTransaction(char const* gid)
933941
{
934942
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, gid, HASH_FIND, NULL);
935943
if (tm == NULL) {
936-
elog(WARNING, "MtmPrecommitTransaction: transaciton '%s' is not found", gid);
944+
MtmUnlock();
945+
elog(WARNING, "MtmPrecommitTransaction: transaction '%s' is not found", gid);
937946
} else {
938947
MtmTransState* ts = tm->state;
939948
Assert(ts != NULL);
@@ -942,10 +951,11 @@ void MtmPrecommitTransaction(char const* gid)
942951
ts->csn = MtmAssignCSN();
943952
MtmAdjustSubtransactions(ts);
944953
MtmSend2PCMessage(ts, MSG_PRECOMMITTED);
945-
SetPrepareTransactionState(ts->gid, "precommitted");
954+
Assert(replorigin_session_origin != InvalidRepOriginId);
955+
MtmUnlock();
956+
SetPreparedTransactionState(ts->gid, MULTIMASTER_PRECOMMITTED);
946957
}
947958
}
948-
MtmUnlock();
949959
}
950960

951961

@@ -967,15 +977,18 @@ MtmVotingCompleted(MtmTransState* ts)
967977
ts->status = TRANSACTION_STATUS_UNKNOWN;
968978
return true;
969979
} else {
970-
MTM_LOG1("Transaction %s is considered as prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
971-
ts->gid, ts->status, ts->participantsMask, Mtm->disabledNodeMask, ts->votedMask);
980+
MTM_LOG1("Transaction %s is considered as prepared (status=%s participants=%lx disabled=%lx, voted=%lx)",
981+
ts->gid, MtmTxnStatusMnem[ts->status], ts->participantsMask, Mtm->disabledNodeMask, ts->votedMask);
972982
ts->isPrepared = true;
973983
if (ts->isTwoPhase) {
974984
ts->votingCompleted = true;
975985
return true;
976986
} else if (MtmUseDtm) {
977987
ts->votedMask = 0;
978-
SetPrepareTransactionState(ts->gid, MULTIMASTER_PRECOMMITTED);
988+
Assert(replorigin_session_origin == InvalidRepOriginId);
989+
MtmUnlock();
990+
SetPreparedTransactionState(ts->gid, MULTIMASTER_PRECOMMITTED);
991+
MtmLock(LW_EXCLUSIVE);
979992
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
980993
return false;
981994
} else {
@@ -1023,7 +1036,7 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
10231036
if (ts->isPrepared) {
10241037
/* resend precommit message */
10251038
// MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1026-
elog(LOG, "Distributes transaction is not committed in %ld msec", USEC_TO_MSEC(now - start));
1039+
elog(LOG, "Distributed transaction is not committed in %ld msec", USEC_TO_MSEC(now - start));
10271040
} else {
10281041
elog(WARNING, "Commit of distributed transaction is canceled because of %ld msec timeout expiration", USEC_TO_MSEC(timeout));
10291042
MtmAbortTransaction(ts);
@@ -1047,7 +1060,7 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
10471060
}
10481061
}
10491062
x->status = ts->status;
1050-
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
1063+
MTM_LOG3("%d: Result of vote: %d", MyProcPid, MtmTxnStatusMnem[ts->status]);
10511064
}
10521065

10531066
static void
@@ -1071,6 +1084,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
10711084
//if (x->gid[0]) MTM_LOG1("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
10721085
if (!MtmIsCoordinator(ts) || Mtm->status == MTM_RECOVERY) {
10731086
MTM_TXTRACE(x, "recovery?");
1087+
MTM_LOG3("Preparing transaction %d (%s) at %ld", x->xid, x->gid, MtmGetCurrentTime());
10741088
Assert(x->gid[0]);
10751089
ts->votingCompleted = true;
10761090
MTM_TXTRACE(x, "recovery? 1");
@@ -1130,9 +1144,12 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
11301144
ts->votingCompleted = false;
11311145
ts->votedMask = 0;
11321146
ts->procno = MyProc->pgprocno;
1133-
MTM_TXTRACE(ts, "Coordinator sends MSG_PRECOMMIT");
1134-
SetPrepareTransactionState(ts->gid, MULTIMASTER_PRECOMMITTED);
1147+
MTM_LOG2("Coordinator of transaction %s sends MSG_PRECOMMIT", ts->gid);
1148+
Assert(replorigin_session_origin == InvalidRepOriginId);
1149+
MtmUnlock();
1150+
SetPreparedTransactionState(ts->gid, MULTIMASTER_PRECOMMITTED);
11351151
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
1152+
MtmLock(LW_EXCLUSIVE);
11361153

11371154
Mtm2PCVoting(x, ts);
11381155

@@ -1202,17 +1219,14 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12021219
}
12031220
if (ts != NULL) {
12041221
if (*ts->gid)
1205-
MTM_LOG2("TRANSLOG: %s transaction gid=%s xid=%d node=%d dxid=%d status %d",
1206-
(commit ? "commit" : "rollback"), ts->gid, ts->xid, ts->gtid.node, ts->gtid.xid, ts->status);
1222+
MTM_LOG2("TRANSLOG: %s transaction gid=%s xid=%d node=%d dxid=%d status %s",
1223+
(commit ? "commit" : "rollback"), ts->gid, ts->xid, ts->gtid.node, ts->gtid.xid, MtmTxnStatusMnem[ts->status]);
12071224
if (commit) {
12081225
if (!(ts->status == TRANSACTION_STATUS_UNKNOWN
12091226
|| (ts->status == TRANSACTION_STATUS_IN_PROGRESS && Mtm->status == MTM_RECOVERY)))
1210-
{
1211-
Assert(false);
1227+
{ Assert(false);
12121228
elog(ERROR, "Attempt to commit %s transaction %d (%s)",
1213-
ts->status == TRANSACTION_STATUS_ABORTED ? "aborted"
1214-
: ts->status == TRANSACTION_STATUS_COMMITTED ? "committed" : "not prepared",
1215-
ts->xid, ts->gid);
1229+
MtmTxnStatusMnem[ts->status], ts->xid, ts->gid);
12161230
}
12171231
if (x->csn > ts->csn || Mtm->status == MTM_RECOVERY) {
12181232
Assert(x->csn != INVALID_CSN);
@@ -1324,6 +1338,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
13241338
}
13251339
}
13261340
} else if (!BIT_CHECK(Mtm->disabledNodeMask, ts->gtid.node-1)) {
1341+
MTM_LOG2("Send %s message to node %d xid=%d gid=%s", MtmMessageKindMnem[cmd], ts->gtid.node, ts->gtid.xid, ts->gid);
13271342
msg.node = ts->gtid.node;
13281343
msg.dxid = ts->gtid.xid;
13291344
MtmSendMessage(&msg);
@@ -1522,7 +1537,7 @@ void MtmAbortTransaction(MtmTransState* ts)
15221537
if (ts->status == TRANSACTION_STATUS_COMMITTED) {
15231538
elog(LOG, "Attempt to rollback already committed transaction %d (%s)", ts->xid, ts->gid);
15241539
} else {
1525-
MTM_LOG1("Rollback active transaction %d:%d (local xid %d) status %d", ts->gtid.node, ts->gtid.xid, ts->xid, ts->status);
1540+
MTM_LOG1("Rollback active transaction %d:%d (local xid %d) status %s", ts->gtid.node, ts->gtid.xid, ts->xid, MtmTxnStatusMnem[ts->status]);
15261541
ts->status = TRANSACTION_STATUS_ABORTED;
15271542
MtmAdjustSubtransactions(ts);
15281543
if (ts->isActive) {
@@ -1590,8 +1605,8 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
15901605
MtmBroadcastPollMessage(ts);
15911606
}
15921607
} else {
1593-
MTM_LOG1("Skip transaction %d (%s) with status %d gtid.node=%d gtid.xid=%d votedMask=%lx",
1594-
ts->xid, ts->gid, ts->status, ts->gtid.node, ts->gtid.xid, ts->votedMask);
1608+
MTM_LOG1("Skip transaction %d (%s) with status %s gtid.node=%d gtid.xid=%d votedMask=%lx",
1609+
ts->xid, ts->gid, MtmTxnStatusMnem[ts->status], ts->gtid.node, ts->gtid.xid, ts->votedMask);
15951610
}
15961611
}
15971612
}
@@ -1926,8 +1941,8 @@ bool MtmRefreshClusterStatus(bool nowait)
19261941
MtmTransState *ts;
19271942
/* Interrupt voting for active transaction and abort them */
19281943
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
1929-
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
1930-
ts->gid, ts->gtid.node, ts->xid, ts->status, ts->gtid.xid);
1944+
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%s, gtid.xid=%d",
1945+
ts->gid, ts->gtid.node, ts->xid, MtmTxnStatusMnen[ts->status], ts->gtid.xid);
19311946
if (MtmIsCoordinator(ts) && !ts->votingCompleted && ts->status != TRANSACTION_STATUS_ABORTED) {
19321947
MtmAbortTransaction(ts);
19331948
MtmWakeUpBackend(ts);
@@ -2898,7 +2913,7 @@ void MtmReleaseRecoverySlot(int nodeId)
28982913
void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
28992914
{
29002915
XidStatus status = MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED);
2901-
MTM_LOG1("Abort prepared transaction %s status %d", gid, status);
2916+
MTM_LOG1("Abort prepared transaction %s status %s", gid, MtmTxnStatusMnem[status]);
29022917
if (status == TRANSACTION_STATUS_UNKNOWN) {
29032918
MTM_LOG2("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
29042919
MtmResetTransaction();
@@ -3176,6 +3191,9 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
31763191
bool res = Mtm->status != MTM_RECOVERY
31773192
&& (args->origin_id == InvalidRepOriginId
31783193
|| MtmIsRecoveredNode(MtmReplicationNodeId));
3194+
if (!res) {
3195+
MTM_LOG2("Filter transaction with origin_id=%d", args->origin_id);
3196+
}
31793197
return res;
31803198
}
31813199

@@ -3211,7 +3229,7 @@ bool MtmFilterTransaction(char* record, int size)
32113229
int replication_node;
32123230
int origin_node;
32133231
char const* gid = "";
3214-
char msgtype;
3232+
char msgtype PG_USED_FOR_ASSERTS_ONLY;
32153233
bool duplicate = false;
32163234

32173235
s.data = record;

contrib/mmts/multimaster.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
#define MTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
4141
#endif
4242

43-
#if MTM_TRACE
43+
#if MTM_TRACE == 0
4444
#define MTM_TXTRACE(tx, event)
4545
#else
4646
#define MTM_TXTRACE(tx, event) \
@@ -309,6 +309,8 @@ typedef struct MtmFlushPosition
309309
#define MtmIsCoordinator(ts) (ts->gtid.node == MtmNodeId)
310310

311311
extern char const* const MtmNodeStatusMnem[];
312+
extern char const* const MtmTxnStatusMnem[];
313+
extern char const* const MtmMessageKindMnem[];
312314

313315
extern MtmState* Mtm;
314316

contrib/mmts/pglogical_apply.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -1028,7 +1028,7 @@ void MtmExecutor(void* work, size_t size)
10281028
{
10291029
while (true) {
10301030
char action = pq_getmsgbyte(&s);
1031-
MTM_LOG3("%d: REMOTE process action %c", MyProcPid, action);
1031+
MTM_LOG2("%d: REMOTE process action %c", MyProcPid, action);
10321032
#if 0
10331033
if (Mtm->status == MTM_RECOVERY) {
10341034
MTM_LOG1("Replay action %c[%x]", action, s.data[s.cursor]);

contrib/mmts/pglogical_proto.c

+5-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
184184
if (txn->xact_action == XLOG_XACT_COMMIT)
185185
flags = PGLOGICAL_COMMIT;
186186
else if (txn->xact_action == XLOG_XACT_PREPARE)
187-
flags = strcmp(txn->state_3pc, "precommitted") == 0 ? PGLOGICAL_PRECOMMIT_PREPARED : PGLOGICAL_PREPARE;
187+
flags = *txn->state_3pc ? PGLOGICAL_PRECOMMIT_PREPARED : PGLOGICAL_PREPARE;
188188
else if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED)
189189
flags = PGLOGICAL_COMMIT_PREPARED;
190190
else if (txn->xact_action == XLOG_XACT_ABORT_PREPARED)
@@ -214,6 +214,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
214214
MTM_LOG1("Send ABORT_PREPARED for transaction %d (%s) end_lsn=%lx to node %d, isRecovery=%d, txn->origin_id=%d, csn=%ld",
215215
txn->xid, txn->gid, txn->end_lsn, MtmReplicationNodeId, isRecovery, txn->origin_id, csn);
216216
}
217+
if (flags == PGLOGICAL_PRECOMMIT_PREPARED) {
218+
MTM_LOG2("Send PGLOGICAL_PRECOMMIT_PREPARED for transaction %d (%s) end_lsn=%lx to node %d, isRecovery=%d, txn->origin_id=%d, csn=%ld",
219+
txn->xid, txn->gid, txn->end_lsn, MtmReplicationNodeId, isRecovery, txn->origin_id, csn);
220+
}
217221
if (MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn)) {
218222
MTM_LOG1("wal-sender complete recovery of node %d at LSN(commit %lx, end %lx, log %lx) in transaction %s event %d", MtmReplicationNodeId, commit_lsn, txn->end_lsn, GetXLogInsertRecPtr(), txn->gid, flags);
219223
flags |= PGLOGICAL_CAUGHT_UP;

0 commit comments

Comments
 (0)