Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4dda4ec

Browse files
committed
2 parents 4586a37 + 7228f87 commit 4dda4ec

File tree

8 files changed

+43
-19
lines changed

8 files changed

+43
-19
lines changed

contrib/mmts/arbiter.c

+1
Original file line numberDiff line numberDiff line change
@@ -1051,6 +1051,7 @@ static void MtmReceiver(Datum arg)
10511051
MtmWakeUpBackend(ts);
10521052
}
10531053
} else {
1054+
elog(WARNING, "Receive PRECOMMITTED response for aborted transaction"); // How it can happen? SHould we use assert here?
10541055
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
10551056
MtmWakeUpBackend(ts);
10561057
}

contrib/mmts/multimaster.c

+13-4
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,7 @@ void MtmPrecommitTransaction(char const* gid)
942942
ts->csn = MtmAssignCSN();
943943
MtmAdjustSubtransactions(ts);
944944
MtmSend2PCMessage(ts, MSG_PRECOMMITTED);
945+
SetPrepareTransactionState(ts->gid, "precommitted");
945946
}
946947
}
947948
MtmUnlock();
@@ -1311,6 +1312,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
13111312

13121313
if (MtmIsCoordinator(ts)) {
13131314
int i;
1315+
Assert(false); // All broadcasts are now done through logical decoding
13141316
for (i = 0; i < Mtm->nAllNodes; i++)
13151317
{
13161318
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask, i))
@@ -1560,7 +1562,10 @@ void MtmHandleApplyError(void)
15601562
}
15611563

15621564
/**
1563-
* Check status of all prepared transactions with coordinator at disabled node
1565+
* Check status of all prepared transactions with coordinator at disabled node.
1566+
* Actually, if node is precommitted (state == UNKNOWN) at any of nodes, then is is prepared at all nodes and so can be committed.
1567+
* But if coordinator of transaction is crashed, we made a decision about transaction commit only if transaction is precommitted at ALL live nodes.
1568+
* The reason is that we want to avoid extra polling to obtain maximum CSN from all nodes to assign it to committed transaction.
15641569
* Called only from MtmDisableNode in critical section.
15651570
*/
15661571
static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
@@ -1601,9 +1606,12 @@ static void MtmDisableNode(int nodeId)
16011606
Mtm->nodes[nodeId-1].lastStatusChangeTime = now;
16021607
Mtm->nodes[nodeId-1].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
16031608
if (nodeId != MtmNodeId) {
1604-
Mtm->nLiveNodes -= 1;
1609+
Mtm->nLiveNodes -= 1;
1610+
}
1611+
if (Mtm->nLiveNodes >= Mtm->nAllNodes/2+1) {
1612+
/* Make decision about prepared transaction status only in quorum */
1613+
MtmPollStatusOfPreparedTransactions(nodeId);
16051614
}
1606-
MtmPollStatusOfPreparedTransactions(nodeId);
16071615
}
16081616

16091617
static void MtmEnableNode(int nodeId)
@@ -3228,9 +3236,10 @@ bool MtmFilterTransaction(char* record, int size)
32283236
origin_node != 0 &&
32293237
(Mtm->status == MTM_RECOVERY || origin_node == replication_node));
32303238

3231-
switch(PGLOGICAL_XACT_EVENT(flags))
3239+
switch (PGLOGICAL_XACT_EVENT(flags))
32323240
{
32333241
case PGLOGICAL_PREPARE:
3242+
case PGLOGICAL_PRECOMMIT_PREPARED:
32343243
case PGLOGICAL_ABORT_PREPARED:
32353244
gid = pq_getmsgstring(&s);
32363245
break;

contrib/mmts/multimaster.h

+7-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
#define MTM_TXTRACE(tx, event)
4545
#else
4646
#define MTM_TXTRACE(tx, event) \
47-
fprintf(stderr, "[MTM_TXTRACE], %s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event, getpid())
47+
fprintf(stderr, "[MTM_TXTRACE], %s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event, MyProcPid)
4848
#endif
4949

5050
#define MULTIMASTER_NAME "multimaster"
@@ -160,6 +160,12 @@ typedef struct
160160
pgid_t gid; /* Global transaction identifier */
161161
} MtmArbiterMessage;
162162

163+
/*
164+
* Abort logical message is send by replica when error is happen while applying prepared transaction.
165+
* In this case we do not have prepared transaction and can not do abort-prepared.
166+
* But we have to record the fact of abort to be able to replay it in case of crash of coordinator of this transaction.
167+
* We are using logical abort message with code 'A' for it
168+
*/
163169
typedef struct MtmAbortLogicalMessage
164170
{
165171
pgid_t gid;

contrib/mmts/pglogical_apply.c

+5
Original file line numberDiff line numberDiff line change
@@ -625,8 +625,12 @@ process_remote_commit(StringInfo in)
625625
{
626626
case PGLOGICAL_PRECOMMIT_PREPARED:
627627
{
628+
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
628629
gid = pq_getmsgstring(in);
630+
MTM_LOG2("%d: PGLOGICAL_PRECOMMIT_PREPARED %s", MyProcPid, gid);
631+
MtmBeginSession(origin_node);
629632
MtmPrecommitTransaction(gid);
633+
MtmEndSession(origin_node, true);
630634
return;
631635
}
632636
case PGLOGICAL_COMMIT:
@@ -691,6 +695,7 @@ process_remote_commit(StringInfo in)
691695
{
692696
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
693697
gid = pq_getmsgstring(in);
698+
/* MtmRollbackPreparedTransaction will set origin session itself */
694699
MtmRollbackPreparedTransaction(origin_node, gid);
695700
break;
696701
}

contrib/mmts/pglogical_proto.c

+2-1
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
192192
else
193193
Assert(false);
194194

195-
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE || flags == PGLOGICAL_PRECOMMIT_PREPARED) {
195+
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE) {
196+
/* COMMIT and PREPARE are preceded by BEGIN, which set MtmIsFilteredTxn flag */
196197
if (MtmIsFilteredTxn) {
197198
Assert(MtmTransactionRecords == 0);
198199
return;

src/backend/access/transam/twophase.c

+12-10
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ static TwoPhaseStateData *TwoPhaseState;
183183
static GlobalTransaction MyLockedGxact = NULL;
184184

185185
static bool twophaseExitRegistered = false;
186+
static TransactionId cached_xid = InvalidTransactionId;
187+
static GlobalTransaction cached_gxact = NULL;
188+
186189

187190
static void RecordTransactionCommitPrepared(TransactionId xid,
188191
int nchildren,
@@ -216,10 +219,10 @@ TwoPhaseShmemSize(void)
216219
/* Need the fixed struct, the array of pointers, and the GTD structs */
217220
size = offsetof(TwoPhaseStateData, prepXacts);
218221
size = add_size(size, mul_size(max_prepared_xacts,
219-
sizeof(GlobalTransaction)));
222+
sizeof(GlobalTransaction)*2));
220223
size = MAXALIGN(size);
221224
size = add_size(size, mul_size(max_prepared_xacts,
222-
sizeof(GlobalTransactionData)*2));
225+
sizeof(GlobalTransactionData)));
223226

224227
return size;
225228
}
@@ -247,9 +250,9 @@ TwoPhaseShmemInit(void)
247250
gxacts = (GlobalTransaction)
248251
((char *) TwoPhaseState +
249252
MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
250-
sizeof(GlobalTransaction) * max_prepared_xacts));
253+
sizeof(GlobalTransaction) * 2 * max_prepared_xacts));
251254

252-
TwoPhaseState->hashTable = (GlobalTransaction*)&gxacts[max_prepared_xacts];
255+
TwoPhaseState->hashTable = &TwoPhaseState->prepXacts[max_prepared_xacts];
253256

254257
for (i = 0; i < max_prepared_xacts; i++)
255258
{
@@ -438,6 +441,10 @@ MarkAsPreparing(TransactionId xid, const char *gid,
438441
proc->lwWaitMode = 0;
439442
proc->waitLock = NULL;
440443
proc->waitProcLock = NULL;
444+
445+
cached_xid = xid;
446+
cached_gxact = gxact;
447+
441448
for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
442449
SHMQueueInit(&(proc->myProcLocks[i]));
443450
/* subxid data must be filled later by GXactLoadSubxactData */
@@ -696,9 +703,7 @@ void SetPrepareTransactionState(char const* gid, char const* state)
696703
strcpy(gxact->state_3pc, state);
697704
EndPrepare(gxact);
698705

699-
/* Unlock GXact */
700-
gxact->locking_backend = InvalidBackendId;
701-
MyLockedGxact = NULL;
706+
PostPrepare_Twophase();
702707
}
703708

704709
/* Working status for pg_prepared_xact */
@@ -827,9 +832,6 @@ TwoPhaseGetGXact(TransactionId xid)
827832
GlobalTransaction result = NULL;
828833
int i;
829834

830-
static TransactionId cached_xid = InvalidTransactionId;
831-
static GlobalTransaction cached_gxact = NULL;
832-
833835
/*
834836
* During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
835837
* repeatedly for the same XID. We can save work with a simple cache.

src/backend/replication/logical/reorderbuffer.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -1346,7 +1346,7 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
13461346
*
13471347
* We currently can only decode a transaction's contents in when their commit
13481348
* record is read because that's currently the only place where we know about
1349-
* cache invalidations. Thus, once a toplevel commit is read, we iterate over
1349+
* cache invalidati ons. Thus, once a toplevel commit is read, we iterate over
13501350
* the top and subtransactions (using a k-way merge) and replay the changes in
13511351
* lsn order.
13521352
*/
@@ -1734,7 +1734,7 @@ ReorderBufferCommitBareXact(ReorderBuffer *rb, TransactionId xid,
17341734
txn->origin_lsn = origin_lsn;
17351735
txn->xact_action = rb->xact_action;
17361736
strcpy(txn->gid, rb->gid);
1737-
*txn->gid = '\0';
1737+
*txn->state_3pc = '\0';
17381738

17391739
rb->commit(rb, txn, commit_lsn);
17401740
}

src/include/catalog/pg_proc.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -3057,7 +3057,7 @@ DATA(insert OID = 1371 ( pg_lock_status PGNSP PGUID 12 1 1000 0 0 f f f f t t
30573057
DESCR("view system lock information");
30583058
DATA(insert OID = 2561 ( pg_blocking_pids PGNSP PGUID 12 1 0 0 0 f f f f t f v s 1 0 1007 "23" _null_ _null_ _null_ _null_ _null_ pg_blocking_pids _null_ _null_ _null_ ));
30593059
DESCR("get array of PIDs of sessions blocking specified backend PID");
3060-
DATA(insert OID = 1065 ( pg_prepared_xact PGNSP PGUID 12 1 1000 0 0 f f f f t t v s 0 0 2249 "" "{28,25,1184,26,26,25}" "{o,o,o,o,o,0}" "{transaction,gid,prepared,ownerid,dbid,state_3pc}" _null_ _null_ pg_prepared_xact _null_ _null_ _null_ ));
3060+
DATA(insert OID = 1065 ( pg_prepared_xact PGNSP PGUID 12 1 1000 0 0 f f f f t t v s 0 0 2249 "" "{28,25,1184,26,26,25}" "{o,o,o,o,o,o}" "{transaction,gid,prepared,ownerid,dbid,state3pc}" _null_ _null_ pg_prepared_xact _null_ _null_ _null_ ));
30613061
DESCR("view two-phase transactions");
30623062
DATA(insert OID = 3445 ( pg_precommit_prepared PGNSP PGUID 12 1 0 0 0 f f f f t f v s 2 0 2278 "2275,2275" _null_ _null_ _null_ _null_ _null_ pg_precommit_prepared _null_ _null_ _null_ ));
30633063
DESCR("alter state of prepared transaction (used for 3pc)");

0 commit comments

Comments
 (0)