Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 9545eed

Browse files
knizhnikkelvich
authored and committed
Add multimaster.break_connection option
1 parent f0ed19f commit 9545eed

7 files changed

+82
-24
lines changed

arbiter.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,9 +327,10 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
327327
{
328328
elog(WARNING, "Node %d thinks that I am dead, while I am %s (message %s)", resp->node, MtmNodeStatusMnem[Mtm->status], MtmMessageKindMnem[resp->code]);
329329
BIT_SET(Mtm->disabledNodeMask, MtmNodeId-1);
330+
Mtm->nConfigChanges += 1;
330331
MtmSwitchClusterMode(MTM_RECOVERY);
331332
} else if (BIT_CHECK(Mtm->disabledNodeMask, resp->node-1) && sockets[resp->node-1] < 0) {
332-
/* We receive heartbeat from dsiable node with
333+
/* We receive heartbeat from disabled node.
333334
* Looks like it is restarted.
334335
* Try to reconnect to it.
335336
*/
@@ -1040,13 +1041,16 @@ static void MtmReceiver(Datum arg)
10401041
Mtm->nodes[node-1].transDelay += MtmGetCurrentTime() - ts->csn;
10411042
ts->xids[node-1] = msg->sxid;
10421043

1044+
#if 0
1045+
/* This code appears to be obsolete: the check that a distributed transaction involves all live nodes is now done at the replica while applying PREPARE */
10431046
if ((~msg->disabledNodeMask & Mtm->disabledNodeMask) != 0) {
10441047
/* Coordinator's disabled mask is wider than that of this node, so reject such a transaction to avoid
10451048
commit on smaller subset of nodes */
10461049
elog(WARNING, "Coordinator of distributed transaction %s (%llu) see less nodes than node %d: %llx instead of %llx",
10471050
ts->gid, (long64)ts->xid, node, Mtm->disabledNodeMask, msg->disabledNodeMask);
10481051
MtmAbortTransaction(ts);
10491052
}
1053+
#endif
10501054
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
10511055
/* All nodes have finished their transactions */
10521056
if (ts->status == TRANSACTION_STATUS_ABORTED) {

multimaster.c

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ static int MtmGcPeriod;
246246
static bool MtmIgnoreTablesWithoutPk;
247247
static int MtmLockCount;
248248
static bool MtmMajorNode;
249+
static bool MtmBreakConnection;
249250

250251
static ExecutorStart_hook_type PreviousExecutorStartHook;
251252
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -418,16 +419,27 @@ MtmInitializeSequence(int64* start, int64* step)
418419
* -------------------------------------------
419420
*/
420421

421-
csn_t MtmTransactionSnapshot(TransactionId xid)
422+
/*
423+
* Get snapshot of a transaction processed by the WAL sender pglogical plugin.
424+
* If it is a local transaction or the replication node is not in the participants mask, then return INVALID_CSN.
425+
* Transaction should be skipped by the WAL sender in the following cases:
426+
* 1. Transaction was replicated from some other node and it is not a recovery process.
427+
* 2. State of transaction is unknown
428+
* 3. Replication node does not participate in the transaction
429+
*/
430+
csn_t MtmDistributedTransactionSnapshot(TransactionId xid, int nodeId, nodemask_t* participantsMask)
422431
{
423432
csn_t snapshot = INVALID_CSN;
424-
433+
*participantsMask = 0;
425434
MtmLock(LW_SHARED);
426435
if (Mtm->status == MTM_ONLINE) {
427436
MtmTransState* ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
428-
if (ts != NULL && !ts->isLocal) {
429-
snapshot = ts->snapshot;
430-
Assert(ts->gtid.node == MtmNodeId || MtmIsRecoverySession);
437+
if (ts != NULL) {
438+
*participantsMask = ts->participantsMask;
439+
if (!ts->isLocal && BIT_CHECK(ts->participantsMask, nodeId-1)) {
440+
snapshot = ts->snapshot;
441+
Assert(ts->gtid.node == MtmNodeId || MtmIsRecoverySession);
442+
}
431443
}
432444
}
433445
MtmUnlock();
@@ -621,6 +633,9 @@ MtmAdjustOldestXid(TransactionId xid)
621633
return xid;
622634
}
623635

636+
637+
638+
624639
/*
625640
* -------------------------------------------
626641
* Transaction list manipulation.
@@ -789,7 +804,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
789804
* Allow execution of transaction by bg-workers to make it possible to perform recovery.
790805
*/
791806
MtmUnlock();
792-
elog(ERROR, "Multimaster node is not online: current status %s", MtmNodeStatusMnem[Mtm->status]);
807+
elog(MtmBreakConnection ? FATAL : ERROR, "Multimaster node is not online: current status %s", MtmNodeStatusMnem[Mtm->status]);
793808
}
794809
x->containsDML = false;
795810
x->snapshot = MtmAssignCSN();
@@ -799,10 +814,8 @@ MtmBeginTransaction(MtmCurrentTrans* x)
799814

800815
/*
801816
* Check if there is a global multimaster lock preventing new transactions from committing, to give WAL senders a chance to catch up.
802-
* Only "own" transactions are blocked. Transactions replicated from other nodes (including recovered transaction) should be proceeded
803-
* and should not cause cluster status change.
804817
*/
805-
if (x->isDistributed/* && x->isReplicated*/) {
818+
if (x->isDistributed) {
806819
MtmCheckClusterLock();
807820
}
808821

@@ -1141,6 +1154,9 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
11411154
}
11421155
MtmUnlock();
11431156
if (x->isTwoPhase) {
1157+
if (x->status == TRANSACTION_STATUS_ABORTED) {
1158+
elog(ERROR, "Prepare of user's 2PC transaction %s (%llu) is aborted by DTM", x->gid, (long64)x->xid);
1159+
}
11441160
MtmResetTransaction();
11451161
}
11461162
}
@@ -2198,7 +2214,29 @@ void MtmReconnectNode(int nodeId)
21982214
MtmUnlock();
21992215
}
22002216

2201-
2217+
/*
2218+
* Check participants of a replicated transaction. This function is called by the receiver at the start of a replicated transaction to
2219+
* check that all live nodes participate in it.
2220+
*
* gtid: global id of the transaction; gtid->node is the 1-based id of the originating node.
* participantsMask: node mask of the participants. It is passed by value, so the bit set below
*                   for the originating node does not affect the caller's copy.
* Returns false (after logging a WARNING) if the originating node is marked in Mtm->disabledNodeMask,
* or if some node that is not disabled is missing from participantsMask; returns true otherwise.
*/
2221+
bool MtmCheckParticipants(GlobalTransactionId* gtid, nodemask_t participantsMask)
2222+
{
2223+
bool result = true;
2224+
/* Mtm->disabledNodeMask and Mtm->nAllNodes are read while holding the Mtm lock */
MtmLock(LW_EXCLUSIVE);
2225+
if (BIT_CHECK(Mtm->disabledNodeMask, gtid->node-1)) {
2226+
elog(WARNING, "Ignore transaction %llu from disabled node %d", (long64)gtid->xid, gtid->node);
2227+
result = false;
2228+
} else {
2229+
/* liveMask: the low Mtm->nAllNodes bits with the bits of disabled nodes cleared */
nodemask_t liveMask = (((nodemask_t)1 << Mtm->nAllNodes) - 1) & ~Mtm->disabledNodeMask;
2230+
/* Count the originating node as a participant (only the local copy of the mask is changed) */
BIT_SET(participantsMask, gtid->node-1);
2231+
/* Non-zero when at least one live node is not among the participants */
if (liveMask & ~participantsMask) {
2232+
elog(WARNING, "Ignore transaction %llu from node %d because some of live nodes (%llx) are not participated in it (%llx)",
2233+
(long64)gtid->xid, gtid->node, liveMask, participantsMask);
2234+
result = false;
2235+
}
2236+
}
2237+
MtmUnlock();
2238+
return result;
2239+
}
22022240

22032241
/*
22042242
* -------------------------------------------
@@ -2836,6 +2874,18 @@ _PG_init(void)
28362874
NULL
28372875
);
28382876

2877+
DefineCustomBoolVariable(
2878+
"multimaster.break_connection",
2879+
"Break connection with client when node is no online",
2880+
NULL,
2881+
&MtmBreakConnection,
2882+
false,
2883+
PGC_BACKEND,
2884+
0,
2885+
NULL,
2886+
NULL,
2887+
NULL
2888+
);
28392889
DefineCustomBoolVariable(
28402890
"multimaster.major_node",
28412891
"Node which forms a majority in case of partitioning in cliques with equal number of nodes",
@@ -4358,8 +4408,8 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
43584408
}
43594409
if (!PrepareTransactionBlock(x->gid))
43604410
{
4361-
if (!MtmVolksWagenMode)
4362-
elog(WARNING, "Failed to prepare transaction %s", x->gid);
4411+
//if (!MtmVolksWagenMode)
4412+
elog(WARNING, "Failed to prepare transaction %s (%llu)", x->gid, (long64)x->xid);
43634413
/* ??? Should we do explicit rollback */
43644414
} else {
43654415
CommitTransactionCommand();

multimaster.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ extern timestamp_t MtmRefreshClusterStatusSchedule;
347347
extern void MtmArbiterInitialize(void);
348348
extern void MtmStartReceivers(void);
349349
extern void MtmStartReceiver(int nodeId, bool dynamic);
350-
extern csn_t MtmTransactionSnapshot(TransactionId xid);
350+
extern csn_t MtmDistributedTransactionSnapshot(TransactionId xid, int nodeId, nodemask_t* participantsMask);
351351
extern csn_t MtmAssignCSN(void);
352352
extern csn_t MtmSyncClock(csn_t csn);
353353
extern void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t snapshot);
@@ -402,4 +402,6 @@ extern void MtmRollbackPreparedTransaction(int nodeId, char const* gid);
402402
extern bool MtmFilterTransaction(char* record, int size);
403403
extern void MtmPrecommitTransaction(char const* gid);
404404
extern char* MtmGucSerialize(void);
405+
extern bool MtmCheckParticipants(GlobalTransactionId* gtid, nodemask_t participants);
406+
405407
#endif

pglogical_apply.c

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -337,22 +337,21 @@ process_remote_begin(StringInfo s)
337337
{
338338
GlobalTransactionId gtid;
339339
csn_t snapshot;
340+
nodemask_t participantsMask;
340341
int rc;
341342

342343
gtid.node = pq_getmsgint(s, 4);
343344
gtid.xid = pq_getmsgint(s, 4);
344345
snapshot = pq_getmsgint64(s);
345-
346+
participantsMask = pq_getmsgint64(s);
346347
Assert(gtid.node > 0);
347348

348-
MTM_LOG2("REMOTE begin node=%d xid=%d snapshot=%lld", gtid.node, gtid.xid, snapshot);
349+
MTM_LOG2("REMOTE begin node=%d xid=%lu snapshot=%lld participantsMask=%llx", gtid.node, (long64)gtid.xid, snapshot, participantsMask);
349350
MtmResetTransaction();
350-
#if 1
351-
if (BIT_CHECK(Mtm->disabledNodeMask, gtid.node-1)) {
352-
elog(WARNING, "Ignore transaction %llu from disabled node %d", (long64)gtid.xid, gtid.node);
351+
352+
if (!MtmCheckParticipants(&gtid, participantsMask)) {
353353
return false;
354354
}
355-
#endif
356355
SetCurrentStatementStartTimestamp();
357356
StartTransactionCommand();
358357
MtmJoinTransaction(&gtid, snapshot);

pglogical_proto.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
119119
ReorderBufferTXN *txn)
120120
{
121121
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
122-
csn_t csn = MtmTransactionSnapshot(txn->xid);
122+
nodemask_t participantsMask;
123+
csn_t csn = MtmDistributedTransactionSnapshot(txn->xid, MtmReplicationNodeId, &participantsMask);
123124

124125
if (!isRecovery && csn == INVALID_CSN) {
125126
MtmIsFilteredTxn = true;
@@ -136,6 +137,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
136137
pq_sendint(out, MtmNodeId, 4);
137138
pq_sendint(out, isRecovery ? InvalidTransactionId : txn->xid, 4);
138139
pq_sendint64(out, csn);
140+
pq_sendint64(out, participantsMask);
139141

140142
MtmTransactionRecords = 0;
141143
}
@@ -205,7 +207,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
205207
return;
206208
}
207209
} else {
208-
csn_t csn = MtmTransactionSnapshot(txn->xid);
210+
nodemask_t partisipantsMask;
211+
csn_t csn = MtmDistributedTransactionSnapshot(txn->xid, MtmReplicationNodeId, &partisipantsMask);
209212
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
210213

211214
if (!isRecovery && csn == INVALID_CSN && (event != PGLOGICAL_ABORT_PREPARED || txn->origin_id != InvalidRepOriginId))

tests2/docker-entrypoint.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ if [ "$1" = 'postgres' ]; then
6969
multimaster.conn_strings = '$CONNSTRS'
7070
multimaster.heartbeat_recv_timeout = 1100
7171
multimaster.heartbeat_send_timeout = 250
72-
multimaster.min_2pc_timeout = 100000
72+
multimaster.min_2pc_timeout = 1000000
7373
EOF
7474

7575
cat $PGDATA/postgresql.conf

tests2/lib/bank_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,8 @@ def exec_tx(self, tx_block, aggname_prefix, conn_i):
185185
# back to event loop and block it
186186
yield from asyncio.sleep(0.01)
187187
except BaseException as e:
188+
print('Catch exception: ', e)
188189
agg.finish_tx(str(e).strip())
189-
print('Catch exception ', str(e).strip())
190190
# Give evloop some free time.
191191
# In case of continuous exceptions we can loop here without returning
192192
# back to event loop and block it

0 commit comments

Comments
 (0)