Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d065889

Browse files
knizhnikkelvich
authored andcommitted
Store origin in abort WAL record
1 parent 22139df commit d065889

9 files changed

+75
-56
lines changed

arbiter.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,8 +1043,8 @@ static void MtmReceiver(Datum arg)
10431043
if ((~msg->disabledNodeMask & Mtm->disabledNodeMask) != 0) {
10441044
/* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
10451045
commit on smaller subset of nodes */
1046-
elog(WARNING, "Coordinator of distributed transaction see less nodes than node %d: %llx instead of %llx",
1047-
node, (long long) Mtm->disabledNodeMask, (long long) msg->disabledNodeMask);
1046+
elog(WARNING, "Coordinator of distributed transaction %s (%d) see less nodes than node %d: %llx instead of %llx",
1047+
ts->gid, ts->xid, node, (long long) Mtm->disabledNodeMask, (long long) msg->disabledNodeMask);
10481048
MtmAbortTransaction(ts);
10491049
}
10501050
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
@@ -1084,6 +1084,7 @@ static void MtmReceiver(Datum arg)
10841084
continue;
10851085
}
10861086
if (ts->status != TRANSACTION_STATUS_ABORTED) {
1087+
MTM_LOG1("Arbiter receive abort message for transaction %s (%d)", ts->gid, ts->xid);
10871088
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
10881089
MtmAbortTransaction(ts);
10891090
}

multimaster--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ LANGUAGE C;
4545
CREATE TYPE mtm.cluster_state AS ("status" text, "disabledNodeMask" bigint, "disconnectedNodeMask" bigint, "catchUpNodeMask" bigint, "liveNodes" integer, "allNodes" integer, "nActiveQueries" integer, "nPendingQueries" integer, "queueSize" bigint, "transCount" bigint, "timeShift" bigint, "recoverySlot" integer,
4646
"xidHashSize" bigint, "gidHashSize" bigint, "oldestXid" integer, "configChanges" integer);
4747

48-
CREATE TYPE mtm.trans_state AS ("status" text, "gid" text, "xid" integer, "coordinator" integer, "gxid" integer, "csn" timestamp, "snapshot" timestamp, "local" boolean, "prepared" boolean, "active" boolean, "twophase" boolean, "votingCompleted" boolean, "participants" bigint, "voted" bigint);
48+
CREATE TYPE mtm.trans_state AS ("status" text, "gid" text, "xid" integer, "coordinator" integer, "gxid" integer, "csn" timestamp, "snapshot" timestamp, "local" boolean, "prepared" boolean, "active" boolean, "twophase" boolean, "votingCompleted" boolean, "participants" bigint, "voted" bigint, "configChanges" integer);
4949

5050
CREATE FUNCTION mtm.get_trans_by_gid(git text) RETURNS mtm.trans_state
5151
AS 'MODULE_PATHNAME','mtm_get_trans_by_gid'

multimaster.c

Lines changed: 36 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ static void MtmShmemStartup(void);
156156
static BgwPool* MtmPoolConstructor(void);
157157
static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg);
158158
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError);
159-
static bool MtmProcessDDLCommand(char const* queryString, bool transactional, bool contextFree);
159+
static bool MtmProcessDDLCommand(char const* queryString, bool transactional);
160160

161161
MtmState* Mtm;
162162

@@ -885,6 +885,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
885885
ts->procno = MyProc->pgprocno;
886886
ts->votingCompleted = false;
887887
ts->participantsMask = (((nodemask_t)1 << Mtm->nAllNodes) - 1) & ~Mtm->disabledNodeMask & ~((nodemask_t)1 << (MtmNodeId-1));
888+
ts->nConfigChanges = Mtm->nConfigChanges;
888889
ts->votedMask = 0;
889890
ts->nSubxids = xactGetCommittedChildren(&subxids);
890891
if (!ts->isActive) {
@@ -979,14 +980,20 @@ void MtmPrecommitTransaction(char const* gid)
979980
static bool
980981
MtmVotingCompleted(MtmTransState* ts)
981982
{
982-
nodemask_t liveNodesMask = (((nodemask_t)1 << Mtm->nAllNodes) - 1) & ~Mtm->disabledNodeMask & ~((nodemask_t)1 << (MtmNodeId-1));
983-
if (!ts->isPrepared && ts->participantsMask != liveNodesMask)
984-
{
985-
elog(WARNING, "Abort transaction %d (%s) because cluster configuration is changed from %lx to %lx since transaction start",
986-
ts->xid, ts->gid, ts->participantsMask, liveNodesMask);
987-
MtmAbortTransaction(ts);
988-
return true;
983+
nodemask_t liveNodesMask = (((nodemask_t)1 << Mtm->nAllNodes) - 1) & ~Mtm->disabledNodeMask & ~((nodemask_t)1 << (MtmNodeId-1));
984+
985+
if (!ts->isPrepared) { /* We can not just abort precommitted transactions */
986+
if (ts->nConfigChanges != Mtm->nConfigChanges)
987+
{
988+
elog(WARNING, "Abort transaction %s (%d) because cluster configuration is changed from %lx to %lx since transaction start",
989+
ts->gid, ts->xid, ts->participantsMask, liveNodesMask);
990+
MtmAbortTransaction(ts);
991+
return true;
992+
}
993+
/* If cluster configuration was not changed, then node mask should not changed as well */
994+
Assert(ts->participantsMask == liveNodesMask);
989995
}
996+
990997
if (ts->votingCompleted) {
991998
return true;
992999
}
@@ -1071,9 +1078,9 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
10711078
x->isSuspended = true;
10721079
} else {
10731080
if (Mtm->status != MTM_ONLINE) {
1074-
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1081+
elog(WARNING, "Commit of distributed transaction %s (%d) is canceled because node is switched to %s mode", ts->gid, ts->xid, MtmNodeStatusMnem[Mtm->status]);
10751082
} else {
1076-
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
1083+
elog(WARNING, "Commit of distributed transaction %s (%d) is canceled because cluster configuration was changed", ts->gid, ts->xid);
10771084
}
10781085
MtmAbortTransaction(ts);
10791086
}
@@ -1201,7 +1208,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
12011208
MtmUnlock();
12021209
x->status = TRANSACTION_STATUS_ABORTED;
12031210
} else {
1204-
MTM_LOG1("Transaction %d with gid='%s' is already aborted", x->xid, x->gid);
1211+
MTM_LOG1("Transaction %s (%d) is already aborted", x->gid, x->xid);
12051212
}
12061213
}
12071214

@@ -1262,7 +1269,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12621269
Mtm->nActiveTransactions -= 1;
12631270
MtmAdjustSubtransactions(ts);
12641271
} else {
1265-
MTM_LOG1("%d: abort transaction %d gid='%s' is called from MtmEndTransaction", MyProcPid, x->xid, x->gid);
1272+
MTM_LOG1("%d: abort transaction %s (%d) is called from MtmEndTransaction", MyProcPid, x->gid, x->xid);
12661273
MtmAbortTransaction(ts);
12671274
}
12681275
}
@@ -1272,7 +1279,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12721279
* Send notification only if ABORT happens during transaction processing at replicas,
12731280
* do not send notification if ABORT is received from master
12741281
*/
1275-
MTM_LOG1("%d: send ABORT notification for transaction %d (%s) to coordinator %d", MyProcPid, x->gtid.xid, x->gid, x->gtid.node);
1282+
MTM_LOG1("%d: send ABORT notification for transaction %d (%s) local xid=%d to coordinator %d", MyProcPid, x->gtid.xid, x->gid, x->xid, x->gtid.node);
12761283
if (ts == NULL) {
12771284
bool found;
12781285
Assert(TransactionIdIsValid(x->xid));
@@ -1441,6 +1448,7 @@ static void MtmLoadPreparedTransactions(void)
14411448
ts->nSubxids = 0;
14421449
ts->votingCompleted = true;
14431450
ts->participantsMask = (((nodemask_t)1 << Mtm->nAllNodes) - 1) & ~Mtm->disabledNodeMask & ~((nodemask_t)1 << (MtmNodeId-1));
1451+
ts->nConfigChanges = Mtm->nConfigChanges;
14441452
ts->votedMask = 0;
14451453
strcpy(ts->gid, gid);
14461454
MtmTransactionListAppend(ts);
@@ -1601,9 +1609,9 @@ void MtmAbortTransaction(MtmTransState* ts)
16011609
Assert(MtmLockCount != 0); /* should be invoked with exclsuive lock */
16021610
if (ts->status != TRANSACTION_STATUS_ABORTED) {
16031611
if (ts->status == TRANSACTION_STATUS_COMMITTED) {
1604-
elog(LOG, "Attempt to rollback already committed transaction %d (%s)", ts->xid, ts->gid);
1612+
elog(LOG, "Attempt to rollback already committed transaction %s (%d)", ts->gid, ts->xid);
16051613
} else {
1606-
MTM_LOG1("Rollback active transaction %d:%d (local xid %d) status %s", ts->gtid.node, ts->gtid.xid, ts->xid, MtmTxnStatusMnem[ts->status]);
1614+
MTM_LOG1("Rollback active transaction %s (%d) %d:%d status %s", ts->gid, ts->xid, ts->gtid.node, ts->gtid.xid, MtmTxnStatusMnem[ts->status]);
16071615
ts->status = TRANSACTION_STATUS_ABORTED;
16081616
MtmAdjustSubtransactions(ts);
16091617
if (ts->isActive) {
@@ -3094,9 +3102,9 @@ void MtmReleaseRecoverySlot(int nodeId)
30943102
void MtmRollbackPreparedTransaction(int nodeId, char const* gid)
30953103
{
30963104
XidStatus status = MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED);
3097-
MTM_LOG1("Abort prepared transaction %s status %s", gid, MtmTxnStatusMnem[status]);
3105+
MTM_LOG1("Abort prepared transaction %s status %s from node %d originId=%d", gid, MtmTxnStatusMnem[status], nodeId, Mtm->nodes[nodeId-1].originId);
30983106
if (status == TRANSACTION_STATUS_UNKNOWN) {
3099-
MTM_LOG2("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
3107+
MTM_LOG1("PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", gid);
31003108
MtmResetTransaction();
31013109
StartTransactionCommand();
31023110
MtmBeginSession(nodeId);
@@ -3806,6 +3814,7 @@ mtm_get_trans_by_gid(PG_FUNCTION_ARGS)
38063814
values[11] = BoolGetDatum(ts->votingCompleted);
38073815
values[12] = Int64GetDatum(ts->participantsMask);
38083816
values[13] = Int64GetDatum(ts->votedMask);
3817+
values[14] = Int32GetDatum(ts->nConfigChanges);
38093818
}
38103819
MtmUnlock();
38113820

@@ -4246,12 +4255,12 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
42464255
} else {
42474256
CommitTransactionCommand();
42484257
if (x->isSuspended) {
4249-
elog(WARNING, "Transaction %s is left in prepared state because coordinator node is not online", x->gid);
4258+
elog(WARNING, "Transaction %s (%d) is left in prepared state because coordinator node is not online", x->gid, x->xid);
42504259
} else {
42514260
StartTransactionCommand();
42524261
if (x->status == TRANSACTION_STATUS_ABORTED) {
42534262
FinishPreparedTransaction(x->gid, false);
4254-
elog(ERROR, "Transaction aborted by DTM");
4263+
elog(ERROR, "Transaction %s (%d) is aborted by DTM", x->gid, x->xid);
42554264
} else {
42564265
FinishPreparedTransaction(x->gid, true);
42574266
}
@@ -4382,14 +4391,13 @@ static void MtmGucSet(VariableSetStmt *stmt, const char *queryStr)
43824391
MemoryContextSwitchTo(oldcontext);
43834392
}
43844393

4385-
static char * MtmGucSerialize(void)
4394+
char* MtmGucSerialize(void)
43864395
{
43874396
StringInfo serialized_gucs;
43884397
dlist_iter iter;
43894398
int nvars = 0;
43904399

43914400
serialized_gucs = makeStringInfo();
4392-
appendStringInfoString(serialized_gucs, "RESET SESSION AUTHORIZATION; reset all; ");
43934401

43944402
dlist_foreach(iter, &MtmGucList)
43954403
{
@@ -4423,30 +4431,16 @@ static char * MtmGucSerialize(void)
44234431
* -------------------------------------------
44244432
*/
44254433

4426-
static bool MtmProcessDDLCommand(char const* queryString, bool transactional, bool contextFree)
4434+
static bool MtmProcessDDLCommand(char const* queryString, bool transactional)
44274435
{
4428-
char *queryWithContext = (char *) queryString;
4429-
char *gucContext;
4430-
4431-
if (!contextFree) {
4432-
/* Append global GUC to utility stmt. */
4433-
gucContext = MtmGucSerialize();
4434-
if (gucContext)
4435-
{
4436-
queryWithContext = palloc(strlen(gucContext) + strlen(queryString) + 1);
4437-
strcpy(queryWithContext, gucContext);
4438-
strcat(queryWithContext, queryString);
4439-
}
4440-
}
4441-
4442-
MTM_LOG3("Sending utility: %s", queryWithContext);
4436+
MTM_LOG3("Sending utility: %s", queryString);
44434437
if (transactional) {
44444438
/* DDL */
4445-
LogLogicalMessage("D", queryWithContext, strlen(queryWithContext) + 1, true);
4439+
LogLogicalMessage("D", queryString, strlen(queryString) + 1, true);
44464440
MtmTx.containsDML = true;
44474441
} else {
44484442
/* CONCURRENT DDL */
4449-
XLogFlush(LogLogicalMessage("C", queryWithContext, strlen(queryWithContext) + 1, false));
4443+
XLogFlush(LogLogicalMessage("C", queryString, strlen(queryString) + 1, false));
44504444
}
44514445
return false;
44524446
}
@@ -4718,9 +4712,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
47184712
MtmUtilityProcessedInXid = GetCurrentTransactionId();
47194713

47204714
if (context == PROCESS_UTILITY_TOPLEVEL)
4721-
MtmProcessDDLCommand(queryString, true, false);
4715+
MtmProcessDDLCommand(queryString, true);
47224716
else
4723-
MtmProcessDDLCommand(ActivePortal->sourceText, true, false);
4717+
MtmProcessDDLCommand(ActivePortal->sourceText, true);
47244718

47254719
executed = true;
47264720
}
@@ -4777,7 +4771,7 @@ MtmExecutorStart(QueryDesc *queryDesc, int eflags)
47774771
}
47784772

47794773
if (ddl_generating_call && !MtmTx.isReplicated)
4780-
MtmProcessDDLCommand(ActivePortal->sourceText, true, false);
4774+
MtmProcessDDLCommand(ActivePortal->sourceText, true);
47814775

47824776
if (PreviousExecutorStartHook != NULL)
47834777
PreviousExecutorStartHook(queryDesc, eflags);

multimaster.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@
8181
#define Anum_mtm_local_tables_rel_name 2
8282

8383
#define Natts_mtm_cluster_state 16
84+
#define Natts_mtm_trans_state 15
8485
#define Natts_mtm_nodes_state 14
85-
#define Natts_mtm_trans_state 14
8686

8787
typedef uint64 csn_t; /* commit serial number */
8888
#define INVALID_CSN ((csn_t)-1)
@@ -243,6 +243,7 @@ typedef struct MtmTransState
243243
bool isActive; /* Transaction is active */
244244
bool isTwoPhase; /* User level 2PC */
245245
bool isPinned; /* Transaction oid potected from GC */
246+
int nConfigChanges; /* Number of cluster configuration changes at moment of transaction start */
246247
nodemask_t participantsMask; /* Mask of nodes involved in transaction */
247248
nodemask_t votedMask; /* Mask of voted nodes */
248249
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
@@ -398,5 +399,5 @@ extern void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit);
398399
extern void MtmRollbackPreparedTransaction(int nodeId, char const* gid);
399400
extern bool MtmFilterTransaction(char* record, int size);
400401
extern void MtmPrecommitTransaction(char const* gid);
401-
402+
extern char* MtmGucSerialize(void);
402403
#endif

pglogical_apply.c

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ static void process_remote_update(StringInfo s, Relation rel);
8080
static void process_remote_delete(StringInfo s, Relation rel);
8181

8282
static MemoryContext TopContext;
83+
static bool GucAltered; /* transaction is setting some GUC variables */
8384

8485
/*
8586
* Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'.
@@ -336,10 +337,13 @@ process_remote_begin(StringInfo s)
336337
{
337338
GlobalTransactionId gtid;
338339
csn_t snapshot;
340+
char const* gucCtx;
341+
int rc;
339342

340343
gtid.node = pq_getmsgint(s, 4);
341344
gtid.xid = pq_getmsgint(s, 4);
342345
snapshot = pq_getmsgint64(s);
346+
gucCtx = pq_getmsgstring(s);
343347

344348
Assert(gtid.node > 0);
345349

@@ -355,6 +359,22 @@ process_remote_begin(StringInfo s)
355359
StartTransactionCommand();
356360
MtmJoinTransaction(&gtid, snapshot);
357361

362+
if (*gucCtx || GucAltered) {
363+
SPI_connect();
364+
if (GucAltered) {
365+
GucAltered = *gucCtx != '\0';
366+
gucCtx = psprintf("RESET SESSION AUTHORIZATION; reset all; %s", gucCtx);
367+
} else {
368+
GucAltered = true;
369+
}
370+
ActivePortal->sourceText = gucCtx;
371+
rc = SPI_execute(gucCtx, false, 0);
372+
SPI_finish();
373+
if (rc < 0) {
374+
elog(ERROR, "Failed to set GUC context %s: %d", gucCtx, rc);
375+
}
376+
}
377+
358378
return true;
359379
}
360380

@@ -438,9 +458,9 @@ process_remote_message(StringInfo s)
438458
msg->origin_lsn = MtmSenderWalEnd;
439459
}
440460
if (Mtm->nodes[origin_node-1].restartLSN < msg->origin_lsn) {
441-
MTM_LOG2("[restartlsn] node %d: %lx -> %lx (MtmFilterTransaction)", origin_node, Mtm->nodes[origin_node-1].restartLSN, msg->origin_lsn);
461+
MTM_LOG1("Receive logical abort message for transaction %s from node %d: %lx < %lx", msg->gid, origin_node, Mtm->nodes[origin_node-1].restartLSN, msg->origin_lsn);
442462
Mtm->nodes[origin_node-1].restartLSN = msg->origin_lsn;
443-
replorigin_session_origin_lsn = msg->origin_lsn;
463+
replorigin_session_origin_lsn = msg->origin_lsn;
444464
MtmRollbackPreparedTransaction(origin_node, msg->gid);
445465
} else {
446466
if (msg->origin_lsn != InvalidXLogRecPtr) {
@@ -702,6 +722,7 @@ process_remote_commit(StringInfo in)
702722
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
703723
gid = pq_getmsgstring(in);
704724
/* MtmRollbackPreparedTransaction will set origin session itself */
725+
MTM_LOG1("Receive ABORT_PREPARED logical message for transaction %s from node %d", gid, origin_node);
705726
MtmRollbackPreparedTransaction(origin_node, gid);
706727
break;
707728
}

pglogical_proto.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
131131
pq_sendint(out, MtmNodeId, 4);
132132
pq_sendint(out, isRecovery ? InvalidTransactionId : txn->xid, 4);
133133
pq_sendint64(out, csn);
134+
pq_sendstring(out, MtmGucSerialize());
135+
134136
MtmTransactionRecords = 0;
135137
}
136138
}

tests2/docker-entrypoint.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ if [ "$1" = 'postgres' ]; then
5757
max_wal_senders = 10
5858
shared_preload_libraries = 'multimaster'
5959
default_transaction_isolation = 'repeatable read'
60-
log_line_prefix = '%t: '
60+
log_line_prefix = '%m: '
6161
6262
multimaster.workers = 4
6363
multimaster.max_workers = 16
@@ -69,7 +69,7 @@ if [ "$1" = 'postgres' ]; then
6969
multimaster.conn_strings = '$CONNSTRS'
7070
multimaster.heartbeat_recv_timeout = 1100
7171
multimaster.heartbeat_send_timeout = 250
72-
multimaster.min_2pc_timeout = 40000
72+
multimaster.min_2pc_timeout = 100000
7373
EOF
7474

7575
cat $PGDATA/postgresql.conf

tests2/support/docker-regress.sh

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ SQL
1717

1818
STATUS=$?
1919

20-
#if [ -f "regression.diffs" ]
21-
#then
22-
# cat regression.diffs
23-
#fi
20+
if [ -f "regression.diffs" ]
21+
then
22+
cat regression.diffs
23+
fi
2424

2525
exit $STATUS

tests2/test_regression.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def setUpClass(self):
1515
@classmethod
1616
def tearDownClass(self):
1717
print('tearDown')
18-
subprocess.check_call(['docker-compose','down'])
18+
#subprocess.check_call(['docker-compose','down'])
1919

2020
def test_regression(self):
2121
# XXX: make smth clever here

0 commit comments

Comments
 (0)