Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b06753a

Browse files
committed
merge
2 parents 94b6265 + b958316 commit b06753a

File tree

6 files changed

+143
-49
lines changed

6 files changed

+143
-49
lines changed

contrib/mmts/arbiter.c

+15-8
Original file line numberDiff line numberDiff line change
@@ -313,12 +313,14 @@ static void MtmSetSocketOptions(int sd)
313313

314314
static void MtmCheckResponse(MtmArbiterMessage* resp)
315315
{
316-
if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1) && !BIT_CHECK(Mtm->disabledNodeMask, resp->node-1)) {
316+
if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1)
317+
&& !BIT_CHECK(Mtm->disabledNodeMask, resp->node-1)
318+
&& Mtm->status != MTM_RECOVERY
319+
&& Mtm->nodes[MtmNodeId-1].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < MtmGetSystemTime())
320+
{
317321
elog(WARNING, "Node %d thinks that I was dead, while I am %s (message %s)", resp->node, MtmNodeStatusMnem[Mtm->status], messageKindText[resp->code]);
318-
if (Mtm->status != MTM_RECOVERY) {
319-
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
320-
MtmSwitchClusterMode(MTM_RECOVERY);
321-
}
322+
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
323+
MtmSwitchClusterMode(MTM_RECOVERY);
322324
}
323325
}
324326

@@ -561,7 +563,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
561563
static int MtmReadFromNode(int node, void* buf, int buf_size)
562564
{
563565
int rc = MtmReadSocket(sockets[node], buf, buf_size);
564-
if (rc < 0) {
566+
if (rc <= 0) {
565567
elog(WARNING, "Arbiter failed to read from node=%d, rc=%d, errno=%d", node+1, rc, errno);
566568
MtmDisconnect(node);
567569
}
@@ -957,6 +959,7 @@ static void MtmReceiver(Datum arg)
957959
elog(WARNING, "Ignore response for unexisted transaction %d from node %d", msg->dxid, node);
958960
continue;
959961
}
962+
Assert(msg->code == MSG_ABORTED || strcmp(msg->gid, ts->gid) == 0);
960963
if (BIT_CHECK(ts->votedMask, node-1)) {
961964
elog(WARNING, "Receive deteriorated %s response for transaction %d (%s) from node %d",
962965
messageKindText[msg->code], ts->xid, ts->gid, node);
@@ -990,6 +993,8 @@ static void MtmReceiver(Datum arg)
990993
MtmWakeUpBackend(ts);
991994
} else {
992995
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
996+
MTM_LOG1("Transaction %s is prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
997+
ts->gid, ts->status, ts->participantsMask, Mtm->disabledNodeMask, ts->votedMask);
993998
ts->isPrepared = true;
994999
if (ts->isTwoPhase) {
9951000
MtmWakeUpBackend(ts);
@@ -1048,9 +1053,11 @@ static void MtmReceiver(Datum arg)
10481053
ts->csn = MtmAssignCSN();
10491054
MtmAdjustSubtransactions(ts);
10501055
MtmSend2PCMessage(ts, MSG_PRECOMMITTED);
1051-
} else {
1052-
Assert(ts->status == TRANSACTION_STATUS_ABORTED);
1056+
} else if (ts->status == TRANSACTION_STATUS_ABORTED) {
10531057
MtmSend2PCMessage(ts, MSG_ABORTED);
1058+
} else {
1059+
elog(WARNING, "Transaction %s is already %s",
1060+
ts->gid, ts->status == TRANSACTION_STATUS_COMMITTED ? "committed" : "prepared");
10541061
}
10551062
break;
10561063
default:

contrib/mmts/multimaster.c

+97-18
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include "miscadmin.h"
1616

1717
#include "libpq-fe.h"
18+
#include "lib/stringinfo.h"
19+
#include "libpq/pqformat.h"
1820
#include "common/username.h"
1921

2022
#include "postmaster/postmaster.h"
@@ -926,7 +928,9 @@ MtmVotingCompleted(MtmTransState* ts)
926928
ts->votingCompleted = true;
927929
ts->status = TRANSACTION_STATUS_UNKNOWN;
928930
return true;
929-
} else {
931+
} else {
932+
MTM_LOG1("Transaction %s is considered as prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
933+
ts->gid, ts->status, ts->participantsMask, Mtm->disabledNodeMask, ts->votedMask);
930934
ts->isPrepared = true;
931935
if (ts->isTwoPhase) {
932936
ts->votingCompleted = true;
@@ -980,9 +984,10 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
980984
MtmResetTransaction();
981985
} else {
982986
int result = 0;
983-
987+
int nConfigChanges = Mtm->nConfigChanges;
984988
/* Wait votes from all nodes until: */
985-
while (!MtmVotingCompleted(ts))
989+
while (!MtmVotingCompleted(ts)
990+
&& (ts->isPrepared || nConfigChanges == Mtm->nConfigChanges))
986991
{
987992
MtmUnlock();
988993
MTM_TXTRACE(x, "PostPrepareTransaction WaitLatch Start");
@@ -998,8 +1003,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
9981003
MtmLock(LW_EXCLUSIVE);
9991004
}
10001005
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
1006+
if (ts->isPrepared) {
1007+
elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1008+
}
1009+
if (Mtm->status != MTM_ONLINE) {
1010+
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1011+
} else {
1012+
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1013+
}
10011014
MtmAbortTransaction(ts);
1002-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
10031015
}
10041016
x->status = ts->status;
10051017
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
@@ -1032,6 +1044,7 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10321044
elog(WARNING, "Global transaciton ID '%s' is not found", x->gid);
10331045
} else {
10341046
int result = 0;
1047+
int nConfigChanges = Mtm->nConfigChanges;
10351048

10361049
Assert(tm->state != NULL);
10371050
MTM_LOG3("Commit prepared transaction %d with gid='%s'", x->xid, x->gid);
@@ -1046,7 +1059,8 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10461059
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
10471060

10481061
/* Wait votes from all nodes until: */
1049-
while (!MtmVotingCompleted(ts))
1062+
while (!MtmVotingCompleted(ts)
1063+
&& (ts->isPrepared || nConfigChanges == Mtm->nConfigChanges))
10501064
{
10511065
MtmUnlock();
10521066
MTM_TXTRACE(x, "CommitPreparedTransaction WaitLatch Start");
@@ -1063,8 +1077,15 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10631077
}
10641078
}
10651079
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
1080+
if (ts->isPrepared) {
1081+
elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1082+
}
1083+
if (Mtm->status != MTM_ONLINE) {
1084+
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1085+
} else {
1086+
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1087+
}
10661088
MtmAbortTransaction(ts);
1067-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
10681089
}
10691090
x->status = ts->status;
10701091
x->xid = ts->xid;
@@ -1166,11 +1187,14 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11661187
}
11671188
ts->status = TRANSACTION_STATUS_ABORTED;
11681189
ts->isLocal = true;
1190+
ts->isPrepared = false;
11691191
ts->snapshot = x->snapshot;
1192+
ts->isTwoPhase = x->isTwoPhase;
11701193
ts->csn = MtmAssignCSN();
11711194
ts->gtid = x->gtid;
11721195
ts->nSubxids = 0;
11731196
ts->votingCompleted = true;
1197+
strcpy(ts->gid, x->gid);
11741198
if (ts->isActive) {
11751199
ts->isActive = false;
11761200
Assert(Mtm->nActiveTransactions != 0);
@@ -1226,8 +1250,9 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12261250
int i;
12271251
for (i = 0; i < Mtm->nAllNodes; i++)
12281252
{
1229-
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask, i) && TransactionIdIsValid(ts->xids[i]))
1253+
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask, i))
12301254
{
1255+
Assert(TransactionIdIsValid(ts->xids[i]));
12311256
msg.node = i+1;
12321257
msg.dxid = ts->xids[i];
12331258
MtmSendMessage(&msg);
@@ -1655,7 +1680,7 @@ MtmCheckClusterLock()
16551680
continue;
16561681
} else {
16571682
/* All lockers are synchronized their logs */
1658-
/* Remove lock and mark them as rceovered */
1683+
/* Remove lock and mark them as recovered */
16591684
MTM_LOG1("Complete recovery of %d nodes (node mask %lx)", Mtm->nLockers, (long) Mtm->nodeLockerMask);
16601685
Assert(Mtm->walSenderLockerMask == 0);
16611686
Assert((Mtm->nodeLockerMask & Mtm->disabledNodeMask) == Mtm->nodeLockerMask);
@@ -2082,6 +2107,8 @@ static void MtmInitialize()
20822107
Mtm->nodes[i].timeline = 0;
20832108
}
20842109
Mtm->nodes[MtmNodeId-1].originId = DoNotReplicateId;
2110+
/* All transaction originated from the current node should be ignored during recovery */
2111+
Mtm->nodes[MtmNodeId-1].restartLsn = (XLogRecPtr)PG_UINT64_MAX;
20852112
PGSemaphoreCreate(&Mtm->sendSemaphore);
20862113
PGSemaphoreReset(&Mtm->sendSemaphore);
20872114
SpinLockInit(&Mtm->spinlock);
@@ -2808,12 +2835,7 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28082835
Assert(!IsTransactionState());
28092836
MtmResetTransaction();
28102837
StartTransactionCommand();
2811-
#if 0
2812-
if (Mtm->nodes[MtmNodeId-1].originId == InvalidRepOriginId) {
2813-
/* This dummy origin is used for local commits/aborts which should not be replicated */
2814-
Mtm->nodes[MtmNodeId-1].originId = replorigin_create(psprintf(MULTIMASTER_SLOT_PATTERN, MtmNodeId));
2815-
}
2816-
#endif
2838+
28172839
MtmBeginSession(MtmNodeId);
28182840
MtmSetCurrentTransactionCSN(ts->csn);
28192841
MtmSetCurrentTransactionGID(ts->gid);
@@ -2830,7 +2852,6 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28302852
*/
28312853
MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shutdown)
28322854
{
2833-
int i;
28342855
bool recovery = false;
28352856

28362857
while (Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE)
@@ -2852,9 +2873,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28522873
Mtm->nReceivers = 0;
28532874
Mtm->recoveryCount += 1;
28542875
Mtm->pglogicalNodeMask = 0;
2855-
for (i = 0; i < Mtm->nAllNodes; i++) {
2856-
Mtm->nodes[i].restartLsn = InvalidXLogRecPtr;
2857-
}
28582876
MtmUnlock();
28592877
return REPLMODE_RECOVERY;
28602878
}
@@ -3071,6 +3089,67 @@ MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
30713089
return isDistributed;
30723090
}
30733091

3092+
bool MtmFilterTransaction(char* record, int size)
3093+
{
3094+
StringInfoData s;
3095+
uint8 flags;
3096+
XLogRecPtr origin_lsn;
3097+
XLogRecPtr end_lsn;
3098+
int replication_node;
3099+
int origin_node;
3100+
char const* gid = "";
3101+
bool duplicate;
3102+
3103+
s.data = record;
3104+
s.len = size;
3105+
s.maxlen = -1;
3106+
s.cursor = 0;
3107+
3108+
Assert(pq_getmsgbyte(&s) == 'C');
3109+
flags = pq_getmsgbyte(&s); /* flags */
3110+
replication_node = pq_getmsgbyte(&s);
3111+
3112+
/* read fields */
3113+
pq_getmsgint64(&s); /* commit_lsn */
3114+
end_lsn = pq_getmsgint64(&s); /* end_lsn */
3115+
pq_getmsgint64(&s); /* commit_time */
3116+
3117+
origin_node = pq_getmsgbyte(&s);
3118+
origin_lsn = pq_getmsgint64(&s);
3119+
3120+
Assert(replication_node == MtmReplicationNodeId &&
3121+
origin_node != 0 &&
3122+
(Mtm->status == MTM_RECOVERY || origin_node == replication_node));
3123+
3124+
switch(PGLOGICAL_XACT_EVENT(flags))
3125+
{
3126+
case PGLOGICAL_PREPARE:
3127+
case PGLOGICAL_ABORT_PREPARED:
3128+
gid = pq_getmsgstring(&s);
3129+
break;
3130+
case PGLOGICAL_COMMIT_PREPARED:
3131+
pq_getmsgint64(&s); /* CSN */
3132+
gid = pq_getmsgstring(&s);
3133+
break;
3134+
default:
3135+
break;
3136+
}
3137+
duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLsn;
3138+
3139+
MTM_LOG1("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx",
3140+
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLsn);
3141+
if (Mtm->status == MTM_RECOVERY) {
3142+
if (Mtm->nodes[origin_node-1].restartLsn < origin_lsn) {
3143+
Mtm->nodes[origin_node-1].restartLsn = origin_lsn;
3144+
}
3145+
} else {
3146+
if (Mtm->nodes[replication_node-1].restartLsn < end_lsn) {
3147+
Mtm->nodes[replication_node-1].restartLsn = end_lsn;
3148+
}
3149+
}
3150+
return duplicate;
3151+
}
3152+
30743153
void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)
30753154
{
30763155
hooks->startup_hook = MtmReplicationStartupHook;

contrib/mmts/multimaster.h

+2
Original file line numberDiff line numberDiff line change
@@ -362,4 +362,6 @@ extern void MtmBeginSession(int nodeId);
362362
extern void MtmEndSession(int nodeId, bool unlock);
363363
extern void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit);
364364
extern void MtmRollbackPreparedTransaction(char const* gid);
365+
extern bool MtmFilterTransaction(char* record, int size);
366+
365367
#endif

contrib/mmts/pglogical_apply.c

+1-4
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,10 @@ process_remote_begin(StringInfo s)
344344
Assert(gtid.node > 0);
345345

346346
MTM_LOG2("REMOTE begin node=%d xid=%d snapshot=%ld", gtid.node, gtid.xid, snapshot);
347+
MtmResetTransaction();
347348
#if 1
348349
if (BIT_CHECK(Mtm->disabledNodeMask, gtid.node-1)) {
349350
elog(WARNING, "Ignore transaction %d from disabled node %d", gtid.xid, gtid.node);
350-
MtmResetTransaction();
351351
return false;
352352
}
353353
#endif
@@ -603,9 +603,6 @@ process_remote_commit(StringInfo in)
603603
origin_node = pq_getmsgbyte(in);
604604
origin_lsn = pq_getmsgint64(in);
605605

606-
if (Mtm->nodes[origin_node-1].restartLsn < origin_lsn) {
607-
Mtm->nodes[origin_node-1].restartLsn = origin_lsn;
608-
}
609606
if (origin_node != MtmReplicationNodeId) {
610607
replorigin_advance(Mtm->nodes[origin_node-1].originId, origin_lsn, GetXLogInsertRecPtr(),
611608
false /* backward */ , false /* WAL */ );

contrib/mmts/pglogical_proto.c

+1
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
187187
Assert(false);
188188

189189
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE) {
190+
Assert(txn->xid < 1000 || MtmTransactionRecords >= 2);
190191
// if (MtmIsFilteredTxn) {
191192
// Assert(MtmTransactionRecords == 0);
192193
// return;

contrib/mmts/pglogical_receiver.c

+27-19
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,9 @@ pglogical_receiver_main(Datum main_arg)
339339
MTM_LOG1("Start logical receiver at position %lx from node %d", originStartPos, nodeId);
340340
} else {
341341
originStartPos = replorigin_get_progress(originId, false);
342+
if (Mtm->nodes[nodeId-1].restartLsn < originStartPos) {
343+
Mtm->nodes[nodeId-1].restartLsn = originStartPos;
344+
}
342345
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);
343346
}
344347
Mtm->nodes[nodeId-1].originId = originId;
@@ -535,27 +538,32 @@ pglogical_receiver_main(Datum main_arg)
535538
ByteBufferAppend(&buf, stmt, rc - hdr_len);
536539
if (stmt[0] == 'C') /* commit */
537540
{
538-
if (spill_file >= 0) {
539-
ByteBufferAppend(&buf, ")", 1);
540-
pq_sendbyte(&spill_info, '(');
541-
pq_sendint(&spill_info, buf.used, 4);
542-
MtmSpillToFile(spill_file, buf.data, buf.used);
543-
MtmCloseSpillFile(spill_file);
544-
MtmExecute(spill_info.data, spill_info.len);
545-
spill_file = -1;
546-
resetStringInfo(&spill_info);
547-
} else {
548-
if (MtmPreserveCommitOrder && buf.used == rc - hdr_len) {
549-
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
550-
timestamp_t stop, start = MtmGetSystemTime();
551-
MtmExecutor(buf.data, buf.used);
552-
stop = MtmGetSystemTime();
553-
if (stop - start > USECS_PER_SEC) {
554-
elog(WARNING, "Commit of prepared transaction takes %ld usec, flags=%x", stop - start, stmt[1]);
541+
if (!MtmFilterTransaction(stmt, rc - hdr_len)) {
542+
if (spill_file >= 0) {
543+
ByteBufferAppend(&buf, ")", 1);
544+
pq_sendbyte(&spill_info, '(');
545+
pq_sendint(&spill_info, buf.used, 4);
546+
MtmSpillToFile(spill_file, buf.data, buf.used);
547+
MtmCloseSpillFile(spill_file);
548+
MtmExecute(spill_info.data, spill_info.len);
549+
spill_file = -1;
550+
resetStringInfo(&spill_info);
551+
} else {
552+
if (MtmPreserveCommitOrder && buf.used == rc - hdr_len) {
553+
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
554+
timestamp_t stop, start = MtmGetSystemTime();
555+
MtmExecutor(buf.data, buf.used);
556+
stop = MtmGetSystemTime();
557+
if (stop - start > USECS_PER_SEC) {
558+
elog(WARNING, "Commit of prepared transaction takes %ld usec, flags=%x", stop - start, stmt[1]);
559+
}
560+
} else {
561+
MtmExecute(buf.data, buf.used);
555562
}
556-
} else {
557-
MtmExecute(buf.data, buf.used);
558563
}
564+
} else if (spill_file >= 0) {
565+
MtmCloseSpillFile(spill_file);
566+
spill_file = -1;
559567
}
560568
ByteBufferReset(&buf);
561569
}

0 commit comments

Comments
 (0)