Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 194c178

Browse files
knizhnik authored and kelvich committed
Filter applied transactions
1 parent f83bad4 commit 194c178

File tree

6 files changed

+146
-50
lines changed

6 files changed

+146
-50
lines changed

arbiter.c

Lines changed: 15 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -313,12 +313,14 @@ static void MtmSetSocketOptions(int sd)
313313

314314
static void MtmCheckResponse(MtmArbiterMessage* resp)
315315
{
316-
if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1) && !BIT_CHECK(Mtm->disabledNodeMask, resp->node-1)) {
316+
if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1)
317+
&& !BIT_CHECK(Mtm->disabledNodeMask, resp->node-1)
318+
&& Mtm->status != MTM_RECOVERY
319+
&& Mtm->nodes[MtmNodeId-1].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < MtmGetSystemTime())
320+
{
317321
elog(WARNING, "Node %d thinks that I was dead, while I am %s (message %s)", resp->node, MtmNodeStatusMnem[Mtm->status], messageKindText[resp->code]);
318-
if (Mtm->status != MTM_RECOVERY) {
319-
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
320-
MtmSwitchClusterMode(MTM_RECOVERY);
321-
}
322+
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
323+
MtmSwitchClusterMode(MTM_RECOVERY);
322324
}
323325
}
324326

@@ -561,7 +563,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
561563
static int MtmReadFromNode(int node, void* buf, int buf_size)
562564
{
563565
int rc = MtmReadSocket(sockets[node], buf, buf_size);
564-
if (rc < 0) {
566+
if (rc <= 0) {
565567
elog(WARNING, "Arbiter failed to read from node=%d, rc=%d, errno=%d", node+1, rc, errno);
566568
MtmDisconnect(node);
567569
}
@@ -957,6 +959,7 @@ static void MtmReceiver(Datum arg)
957959
elog(WARNING, "Ignore response for unexisted transaction %d from node %d", msg->dxid, node);
958960
continue;
959961
}
962+
Assert(msg->code == MSG_ABORTED || strcmp(msg->gid, ts->gid) == 0);
960963
if (BIT_CHECK(ts->votedMask, node-1)) {
961964
elog(WARNING, "Receive deteriorated %s response for transaction %d (%s) from node %d",
962965
messageKindText[msg->code], ts->xid, ts->gid, node);
@@ -990,6 +993,8 @@ static void MtmReceiver(Datum arg)
990993
MtmWakeUpBackend(ts);
991994
} else {
992995
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
996+
MTM_LOG1("Transaction %s is prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
997+
ts->gid, ts->status, ts->participantsMask, Mtm->disabledNodeMask, ts->votedMask);
993998
ts->isPrepared = true;
994999
if (ts->isTwoPhase) {
9951000
MtmWakeUpBackend(ts);
@@ -1048,9 +1053,11 @@ static void MtmReceiver(Datum arg)
10481053
ts->csn = MtmAssignCSN();
10491054
MtmAdjustSubtransactions(ts);
10501055
MtmSend2PCMessage(ts, MSG_PRECOMMITTED);
1051-
} else {
1052-
Assert(ts->status == TRANSACTION_STATUS_ABORTED);
1056+
} else if (ts->status == TRANSACTION_STATUS_ABORTED) {
10531057
MtmSend2PCMessage(ts, MSG_ABORTED);
1058+
} else {
1059+
elog(WARNING, "Transaction %s is already %s",
1060+
ts->gid, ts->status == TRANSACTION_STATUS_COMMITTED ? "committed" : "prepared");
10541061
}
10551062
break;
10561063
default:

multimaster.c

Lines changed: 97 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include "miscadmin.h"
1616

1717
#include "libpq-fe.h"
18+
#include "lib/stringinfo.h"
19+
#include "libpq/pqformat.h"
1820
#include "common/username.h"
1921

2022
#include "postmaster/postmaster.h"
@@ -925,7 +927,9 @@ MtmVotingCompleted(MtmTransState* ts)
925927
ts->votingCompleted = true;
926928
ts->status = TRANSACTION_STATUS_UNKNOWN;
927929
return true;
928-
} else {
930+
} else {
931+
MTM_LOG1("Transaction %s is considered as prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
932+
ts->gid, ts->status, ts->participantsMask, Mtm->disabledNodeMask, ts->votedMask);
929933
ts->isPrepared = true;
930934
if (ts->isTwoPhase) {
931935
ts->votingCompleted = true;
@@ -979,9 +983,10 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
979983
MtmResetTransaction();
980984
} else {
981985
int result = 0;
982-
986+
int nConfigChanges = Mtm->nConfigChanges;
983987
/* Wait votes from all nodes until: */
984-
while (!MtmVotingCompleted(ts))
988+
while (!MtmVotingCompleted(ts)
989+
&& (ts->isPrepared || nConfigChanges == Mtm->nConfigChanges))
985990
{
986991
MtmUnlock();
987992
MTM_TXTRACE(x, "PostPrepareTransaction WaitLatch Start");
@@ -997,8 +1002,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
9971002
MtmLock(LW_EXCLUSIVE);
9981003
}
9991004
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
1005+
if (ts->isPrepared) {
1006+
elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1007+
}
1008+
if (Mtm->status != MTM_ONLINE) {
1009+
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1010+
} else {
1011+
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1012+
}
10001013
MtmAbortTransaction(ts);
1001-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
10021014
}
10031015
x->status = ts->status;
10041016
MTM_LOG3("%d: Result of vote: %d", MyProcPid, ts->status);
@@ -1031,6 +1043,7 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10311043
elog(WARNING, "Global transaciton ID '%s' is not found", x->gid);
10321044
} else {
10331045
int result = 0;
1046+
int nConfigChanges = Mtm->nConfigChanges;
10341047

10351048
Assert(tm->state != NULL);
10361049
MTM_LOG3("Commit prepared transaction %d with gid='%s'", x->xid, x->gid);
@@ -1045,7 +1058,8 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10451058
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
10461059

10471060
/* Wait votes from all nodes until: */
1048-
while (!MtmVotingCompleted(ts))
1061+
while (!MtmVotingCompleted(ts)
1062+
&& (ts->isPrepared || nConfigChanges == Mtm->nConfigChanges))
10491063
{
10501064
MtmUnlock();
10511065
MTM_TXTRACE(x, "CommitPreparedTransaction WaitLatch Start");
@@ -1062,8 +1076,15 @@ MtmCommitPreparedTransaction(MtmCurrentTrans* x)
10621076
}
10631077
}
10641078
if (ts->status != TRANSACTION_STATUS_ABORTED && !ts->votingCompleted) {
1079+
if (ts->isPrepared) {
1080+
elog(ERROR, "Commit of distributed transaction %s is suspended because node is switched to %s mode", ts->gid, MtmNodeStatusMnem[Mtm->status]);
1081+
}
1082+
if (Mtm->status != MTM_ONLINE) {
1083+
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1084+
} else {
1085+
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1086+
}
10651087
MtmAbortTransaction(ts);
1066-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
10671088
}
10681089
x->status = ts->status;
10691090
x->xid = ts->xid;
@@ -1165,11 +1186,14 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
11651186
}
11661187
ts->status = TRANSACTION_STATUS_ABORTED;
11671188
ts->isLocal = true;
1189+
ts->isPrepared = false;
11681190
ts->snapshot = x->snapshot;
1191+
ts->isTwoPhase = x->isTwoPhase;
11691192
ts->csn = MtmAssignCSN();
11701193
ts->gtid = x->gtid;
11711194
ts->nSubxids = 0;
11721195
ts->votingCompleted = true;
1196+
strcpy(ts->gid, x->gid);
11731197
if (ts->isActive) {
11741198
ts->isActive = false;
11751199
Assert(Mtm->nActiveTransactions != 0);
@@ -1225,8 +1249,9 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12251249
int i;
12261250
for (i = 0; i < Mtm->nAllNodes; i++)
12271251
{
1228-
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask, i) && TransactionIdIsValid(ts->xids[i]))
1252+
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask, i))
12291253
{
1254+
Assert(TransactionIdIsValid(ts->xids[i]));
12301255
msg.node = i+1;
12311256
msg.dxid = ts->xids[i];
12321257
MtmSendMessage(&msg);
@@ -1654,7 +1679,7 @@ MtmCheckClusterLock()
16541679
continue;
16551680
} else {
16561681
/* All lockers are synchronized their logs */
1657-
/* Remove lock and mark them as rceovered */
1682+
/* Remove lock and mark them as recovered */
16581683
MTM_LOG1("Complete recovery of %d nodes (node mask %lx)", Mtm->nLockers, (long) Mtm->nodeLockerMask);
16591684
Assert(Mtm->walSenderLockerMask == 0);
16601685
Assert((Mtm->nodeLockerMask & Mtm->disabledNodeMask) == Mtm->nodeLockerMask);
@@ -2081,6 +2106,8 @@ static void MtmInitialize()
20812106
Mtm->nodes[i].timeline = 0;
20822107
}
20832108
Mtm->nodes[MtmNodeId-1].originId = DoNotReplicateId;
2109+
/* All transaction originated from the current node should be ignored during recovery */
2110+
Mtm->nodes[MtmNodeId-1].restartLsn = (XLogRecPtr)PG_UINT64_MAX;
20842111
PGSemaphoreCreate(&Mtm->sendSemaphore);
20852112
PGSemaphoreReset(&Mtm->sendSemaphore);
20862113
SpinLockInit(&Mtm->spinlock);
@@ -2807,12 +2834,7 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28072834
Assert(!IsTransactionState());
28082835
MtmResetTransaction();
28092836
StartTransactionCommand();
2810-
#if 0
2811-
if (Mtm->nodes[MtmNodeId-1].originId == InvalidRepOriginId) {
2812-
/* This dummy origin is used for local commits/aborts which should not be replicated */
2813-
Mtm->nodes[MtmNodeId-1].originId = replorigin_create(psprintf(MULTIMASTER_SLOT_PATTERN, MtmNodeId));
2814-
}
2815-
#endif
2837+
28162838
MtmBeginSession(MtmNodeId);
28172839
MtmSetCurrentTransactionCSN(ts->csn);
28182840
MtmSetCurrentTransactionGID(ts->gid);
@@ -2829,7 +2851,6 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
28292851
*/
28302852
MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shutdown)
28312853
{
2832-
int i;
28332854
bool recovery = false;
28342855

28352856
while (Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE)
@@ -2851,9 +2872,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28512872
Mtm->nReceivers = 0;
28522873
Mtm->recoveryCount += 1;
28532874
Mtm->pglogicalNodeMask = 0;
2854-
for (i = 0; i < Mtm->nAllNodes; i++) {
2855-
Mtm->nodes[i].restartLsn = InvalidXLogRecPtr;
2856-
}
28572875
MtmUnlock();
28582876
return REPLMODE_RECOVERY;
28592877
}
@@ -3070,6 +3088,67 @@ MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
30703088
return isDistributed;
30713089
}
30723090

3091+
bool MtmFilterTransaction(char* record, int size)
3092+
{
3093+
StringInfoData s;
3094+
uint8 flags;
3095+
XLogRecPtr origin_lsn;
3096+
XLogRecPtr end_lsn;
3097+
int replication_node;
3098+
int origin_node;
3099+
char const* gid = "";
3100+
bool duplicate;
3101+
3102+
s.data = record;
3103+
s.len = size;
3104+
s.maxlen = -1;
3105+
s.cursor = 0;
3106+
3107+
Assert(pq_getmsgbyte(&s) == 'C');
3108+
flags = pq_getmsgbyte(&s); /* flags */
3109+
replication_node = pq_getmsgbyte(&s);
3110+
3111+
/* read fields */
3112+
pq_getmsgint64(&s); /* commit_lsn */
3113+
end_lsn = pq_getmsgint64(&s); /* end_lsn */
3114+
pq_getmsgint64(&s); /* commit_time */
3115+
3116+
origin_node = pq_getmsgbyte(&s);
3117+
origin_lsn = pq_getmsgint64(&s);
3118+
3119+
Assert(replication_node == MtmReplicationNodeId &&
3120+
origin_node != 0 &&
3121+
(Mtm->status == MTM_RECOVERY || origin_node == replication_node));
3122+
3123+
switch(PGLOGICAL_XACT_EVENT(flags))
3124+
{
3125+
case PGLOGICAL_PREPARE:
3126+
case PGLOGICAL_ABORT_PREPARED:
3127+
gid = pq_getmsgstring(&s);
3128+
break;
3129+
case PGLOGICAL_COMMIT_PREPARED:
3130+
pq_getmsgint64(&s); /* CSN */
3131+
gid = pq_getmsgstring(&s);
3132+
break;
3133+
default:
3134+
break;
3135+
}
3136+
duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLsn;
3137+
3138+
MTM_LOG1("%s transaction %s from node %d lsn %lx, origin node %d, original lsn=%lx, current lsn=%lx",
3139+
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLsn);
3140+
if (Mtm->status == MTM_RECOVERY) {
3141+
if (Mtm->nodes[origin_node-1].restartLsn < origin_lsn) {
3142+
Mtm->nodes[origin_node-1].restartLsn = origin_lsn;
3143+
}
3144+
} else {
3145+
if (Mtm->nodes[replication_node-1].restartLsn < end_lsn) {
3146+
Mtm->nodes[replication_node-1].restartLsn = end_lsn;
3147+
}
3148+
}
3149+
return duplicate;
3150+
}
3151+
30733152
void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)
30743153
{
30753154
hooks->startup_hook = MtmReplicationStartupHook;

multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,4 +362,6 @@ extern void MtmBeginSession(int nodeId);
362362
extern void MtmEndSession(int nodeId, bool unlock);
363363
extern void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit);
364364
extern void MtmRollbackPreparedTransaction(char const* gid);
365+
extern bool MtmFilterTransaction(char* record, int size);
366+
365367
#endif

pglogical_apply.c

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,10 @@ process_remote_begin(StringInfo s)
344344
Assert(gtid.node > 0);
345345

346346
MTM_LOG2("REMOTE begin node=%d xid=%d snapshot=%ld", gtid.node, gtid.xid, snapshot);
347+
MtmResetTransaction();
347348
#if 1
348349
if (BIT_CHECK(Mtm->disabledNodeMask, gtid.node-1)) {
349350
elog(WARNING, "Ignore transaction %d from disabled node %d", gtid.xid, gtid.node);
350-
MtmResetTransaction();
351351
return false;
352352
}
353353
#endif
@@ -603,9 +603,6 @@ process_remote_commit(StringInfo in)
603603
origin_node = pq_getmsgbyte(in);
604604
origin_lsn = pq_getmsgint64(in);
605605

606-
if (Mtm->nodes[origin_node-1].restartLsn < origin_lsn) {
607-
Mtm->nodes[origin_node-1].restartLsn = origin_lsn;
608-
}
609606
if (origin_node != MtmReplicationNodeId) {
610607
replorigin_advance(Mtm->nodes[origin_node-1].originId, origin_lsn, GetXLogInsertRecPtr(),
611608
false /* backward */ , false /* WAL */ );

pglogical_receiver.c

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,9 @@ pglogical_receiver_main(Datum main_arg)
339339
MTM_LOG1("Start logical receiver at position %lx from node %d", originStartPos, nodeId);
340340
} else {
341341
originStartPos = replorigin_get_progress(originId, false);
342+
if (Mtm->nodes[nodeId-1].restartLsn < originStartPos) {
343+
Mtm->nodes[nodeId-1].restartLsn = originStartPos;
344+
}
342345
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);
343346
}
344347
Mtm->nodes[nodeId-1].originId = originId;
@@ -535,27 +538,32 @@ pglogical_receiver_main(Datum main_arg)
535538
ByteBufferAppend(&buf, stmt, rc - hdr_len);
536539
if (stmt[0] == 'C') /* commit */
537540
{
538-
if (spill_file >= 0) {
539-
ByteBufferAppend(&buf, ")", 1);
540-
pq_sendbyte(&spill_info, '(');
541-
pq_sendint(&spill_info, buf.used, 4);
542-
MtmSpillToFile(spill_file, buf.data, buf.used);
543-
MtmCloseSpillFile(spill_file);
544-
MtmExecute(spill_info.data, spill_info.len);
545-
spill_file = -1;
546-
resetStringInfo(&spill_info);
547-
} else {
548-
if (MtmPreserveCommitOrder && buf.used == rc - hdr_len) {
549-
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
550-
timestamp_t stop, start = MtmGetSystemTime();
551-
MtmExecutor(buf.data, buf.used);
552-
stop = MtmGetSystemTime();
553-
if (stop - start > USECS_PER_SEC) {
554-
elog(WARNING, "Commit of prepared transaction takes %ld usec, flags=%x", stop - start, stmt[1]);
541+
if (!MtmFilterTransaction(stmt, rc - hdr_len)) {
542+
if (spill_file >= 0) {
543+
ByteBufferAppend(&buf, ")", 1);
544+
pq_sendbyte(&spill_info, '(');
545+
pq_sendint(&spill_info, buf.used, 4);
546+
MtmSpillToFile(spill_file, buf.data, buf.used);
547+
MtmCloseSpillFile(spill_file);
548+
MtmExecute(spill_info.data, spill_info.len);
549+
spill_file = -1;
550+
resetStringInfo(&spill_info);
551+
} else {
552+
if (MtmPreserveCommitOrder && buf.used == rc - hdr_len) {
553+
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
554+
timestamp_t stop, start = MtmGetSystemTime();
555+
MtmExecutor(buf.data, buf.used);
556+
stop = MtmGetSystemTime();
557+
if (stop - start > USECS_PER_SEC) {
558+
elog(WARNING, "Commit of prepared transaction takes %ld usec, flags=%x", stop - start, stmt[1]);
559+
}
560+
} else {
561+
MtmExecute(buf.data, buf.used);
555562
}
556-
} else {
557-
MtmExecute(buf.data, buf.used);
558563
}
564+
} else if (spill_file >= 0) {
565+
MtmCloseSpillFile(spill_file);
566+
spill_file = -1;
559567
}
560568
ByteBufferReset(&buf);
561569
}

tests2/docker-entrypoint.sh

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ if [ "$1" = 'postgres' ]; then
4545
# dbname=$POSTGRES_DB user=$POSTGRES_USER host=node2, \
4646
# dbname=$POSTGRES_DB user=$POSTGRES_USER host=node3"
4747

48-
# log_line_prefix = '%t: '
4948

5049
cat <<-EOF >> $PGDATA/postgresql.conf
5150
listen_addresses='*'
@@ -58,6 +57,10 @@ if [ "$1" = 'postgres' ]; then
5857
max_wal_senders = 10
5958
shared_preload_libraries = 'raftable,multimaster'
6059
default_transaction_isolation = 'repeatable read'
60+
log_checkpoints = on
61+
checkpoint_timeout = 30
62+
log_autovacuum_min_duration = 0
63+
log_line_prefix = '%t: '
6164
6265
multimaster.workers = 4
6366
multimaster.max_nodes = 3

0 commit comments

Comments
 (0)