Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7b2a465

Browse files
knizhnikkelvich
authored andcommitted
Fix prepare/abort_prepared race condition
1 parent 811e5ce commit 7b2a465

File tree

5 files changed

+61
-41
lines changed

5 files changed

+61
-41
lines changed

arbiter.c

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -915,17 +915,15 @@ static void MtmTransReceiver(Datum arg)
915915
} else {
916916
switch (msg->code) {
917917
case MSG_PREPARE:
918-
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
919-
ts->status = TRANSACTION_STATUS_UNKNOWN;
920-
ts->csn = MtmAssignCSN();
921-
MtmAdjustSubtransactions(ts);
922-
MtmSendNotificationMessage(ts, MSG_PREPARED);
923-
#if 0
918+
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
919+
ts->status = TRANSACTION_STATUS_UNKNOWN;
920+
ts->csn = MtmAssignCSN();
921+
MtmAdjustSubtransactions(ts);
922+
MtmSendNotificationMessage(ts, MSG_PREPARED);
924923
} else {
925924
Assert(ts->status == TRANSACTION_STATUS_ABORTED);
926925
MtmSendNotificationMessage(ts, MSG_ABORTED);
927926
}
928-
#endif
929927
break;
930928
default:
931929
Assert(false);

multimaster.c

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ typedef struct {
7777

7878
typedef struct {
7979
char gid[MULTIMASTER_MAX_GID_SIZE];
80+
bool abort;
81+
XidStatus status;
8082
MtmTransState* state;
8183
} MtmTransMap;
8284

@@ -274,7 +276,7 @@ timestamp_t MtmGetSystemTime(void)
274276
{
275277
struct timeval tv;
276278
gettimeofday(&tv, NULL);
277-
return (timestamp_t)tv.tv_sec*USECS_PER_SEC + tv.tv_usec + Mtm->timeShift;
279+
return (timestamp_t)tv.tv_sec*USECS_PER_SEC + tv.tv_usec;
278280
}
279281

280282
timestamp_t MtmGetCurrentTime(void)
@@ -721,11 +723,12 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
721723
{
722724
MtmTransState* ts;
723725
TransactionId* subxids;
724-
726+
725727
if (!x->isDistributed) {
726728
return;
727729
}
728730

731+
729732
if (Mtm->inject2PCError == 1) {
730733
Mtm->inject2PCError = 0;
731734
elog(ERROR, "ERROR INJECTION for transaction %d (%s)", x->xid, x->gid);
@@ -754,7 +757,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
754757
if (!x->isReplicated) {
755758
MtmCheckClusterLock();
756759
}
757-
758760
ts = MtmCreateTransState(x);
759761
/*
760762
* Invalid CSN prevent replication of transaction by logical replication
@@ -779,8 +781,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
779781
MtmAddSubtransactions(ts, subxids, ts->nSubxids);
780782
MTM_LOG3("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)",
781783
MyProcPid, x->xid, ts->gtid.xid, ts->gtid.node, ts->csn);
782-
MtmUnlock();
783-
784+
MtmUnlock();
784785
}
785786

786787
/*
@@ -818,7 +819,8 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
818819
Assert(ts != NULL);
819820

820821
if (!MtmIsCoordinator(ts) || Mtm->status == MTM_RECOVERY) {
821-
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, NULL);
822+
bool found;
823+
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, &found);
822824
Assert(x->gid[0]);
823825
tm->state = ts;
824826
ts->votingCompleted = true;
@@ -876,8 +878,8 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
876878
if (x->status != TRANSACTION_STATUS_ABORTED) {
877879
MtmLock(LW_EXCLUSIVE);
878880
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_REMOVE, NULL);
879-
Assert(tm != NULL);
880-
MTM_LOG1("Abort prepared transaction %d with gid='%s' is already aborted", x->xid, x->gid);
881+
Assert(tm != NULL && tm->state != NULL);
882+
MTM_LOG1("%ld: Abort prepared transaction %d with gid='%s'", MtmGetSystemTime(), x->xid, x->gid);
881883
MtmAbortTransaction(tm->state);
882884
MtmUnlock();
883885
x->status = TRANSACTION_STATUS_ABORTED;
@@ -1015,21 +1017,26 @@ XidStatus MtmGetCurrentTransactionStatus(void)
10151017
return MtmTx.status;
10161018
}
10171019

1018-
XidStatus MtmGetGlobalTransactionStatus(char const* gid)
1020+
XidStatus MtmExchangeGlobalTransactionStatus(char const* gid, XidStatus new_status)
10191021
{
1020-
XidStatus status;
10211022
MtmTransMap* tm;
1023+
bool found;
1024+
XidStatus old_status = TRANSACTION_STATUS_IN_PROGRESS;
10221025

10231026
Assert(gid[0]);
1024-
MtmLock(LW_SHARED);
1025-
tm = (MtmTransMap*)hash_search(MtmGid2State, gid, HASH_FIND, NULL);
1026-
if (tm != NULL) {
1027-
status = tm->state->status;
1027+
MtmLock(LW_EXCLUSIVE);
1028+
tm = (MtmTransMap*)hash_search(MtmGid2State, gid, HASH_ENTER, &found);
1029+
if (found) {
1030+
old_status = tm->status;
1031+
if (old_status != TRANSACTION_STATUS_ABORTED) {
1032+
tm->status = new_status;
1033+
}
10281034
} else {
1029-
status = TRANSACTION_STATUS_ABORTED;
1035+
tm->state = NULL;
1036+
tm->status = new_status;
10301037
}
10311038
MtmUnlock();
1032-
return status;
1039+
return old_status;
10331040
}
10341041

10351042
void MtmSetCurrentTransactionCSN(csn_t csn)

multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ extern csn_t MtmGetTransactionCSN(TransactionId xid);
256256
extern void MtmSetCurrentTransactionCSN(csn_t csn);
257257
extern TransactionId MtmGetCurrentTransactionId(void);
258258
extern XidStatus MtmGetCurrentTransactionStatus(void);
259-
extern XidStatus MtmGetGlobalTransactionStatus(char const* gid);
259+
extern XidStatus MtmExchangeGlobalTransactionStatus(char const* gid, XidStatus status);
260260
extern bool MtmIsRecoveredNode(int nodeId);
261261
extern bool MtmRefreshClusterStatus(bool nowait);
262262
extern void MtmSwitchClusterMode(MtmNodeStatus mode);

pglogical_apply.c

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,8 @@ process_remote_commit(StringInfo in)
508508
uint8 flags;
509509
csn_t csn;
510510
const char *gid = NULL;
511-
XLogRecPtr end_lsn;
511+
XLogRecPtr end_lsn;
512+
512513
/* read flags */
513514
flags = pq_getmsgbyte(in);
514515
MtmReplicationNodeId = pq_getmsgbyte(in);
@@ -536,25 +537,38 @@ process_remote_commit(StringInfo in)
536537
{
537538
Assert(IsTransactionState() && TransactionIdIsValid(MtmGetCurrentTransactionId()));
538539
gid = pq_getmsgstring(in);
539-
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
540-
MTM_LOG1("%d: PGLOGICAL_PREPARE commit: gid=%s", MyProcPid, gid);
541-
BeginTransactionBlock();
542-
CommitTransactionCommand();
543-
StartTransactionCommand();
544-
545-
MtmBeginSession();
546-
/* PREPARE itself */
547-
MtmSetCurrentTransactionGID(gid);
548-
PrepareTransactionBlock(gid);
549-
CommitTransactionCommand();
540+
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_IN_PROGRESS) == TRANSACTION_STATUS_ABORTED) {
541+
MTM_LOG1("%ld: avoid prepare of previously aborted global transaction %s", MtmGetSystemTime(), gid);
542+
AbortCurrentTransaction();
543+
} else {
544+
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
545+
MTM_LOG1("%ld: PGLOGICAL_PREPARE commit: gid=%s", MtmGetSystemTime(), gid);
546+
BeginTransactionBlock();
547+
CommitTransactionCommand();
548+
StartTransactionCommand();
549+
550+
MtmBeginSession();
551+
/* PREPARE itself */
552+
MtmSetCurrentTransactionGID(gid);
553+
PrepareTransactionBlock(gid);
554+
CommitTransactionCommand();
555+
556+
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_UNKNOWN) == TRANSACTION_STATUS_ABORTED) {
557+
MTM_LOG1("%ld: perform delayed rollback of prepared global transaction %s", MtmGetSystemTime(), gid);
558+
StartTransactionCommand();
559+
MtmSetCurrentTransactionGID(gid);
560+
FinishPreparedTransaction(gid, false);
561+
CommitTransactionCommand();
562+
}
563+
}
550564
break;
551565
}
552566
case PGLOGICAL_COMMIT_PREPARED:
553567
{
554568
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
555569
csn = pq_getmsgint64(in);
556570
gid = pq_getmsgstring(in);
557-
MTM_LOG1("%d: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s", MyProcPid, csn, gid);
571+
MTM_LOG1("%ld: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s", MtmGetSystemTime(), csn, gid);
558572
StartTransactionCommand();
559573
MtmBeginSession();
560574
MtmSetCurrentTransactionCSN(csn);
@@ -567,9 +581,9 @@ process_remote_commit(StringInfo in)
567581
{
568582
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
569583
gid = pq_getmsgstring(in);
570-
MTM_LOG1("%d: PGLOGICAL_ABORT_PREPARED commit: gid=%s", MyProcPid, gid);
571-
if (MtmGetGlobalTransactionStatus(gid) != TRANSACTION_STATUS_ABORTED) {
572-
MTM_LOG2("%d: PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", MyProcPid, gid);
584+
MTM_LOG1("%ld: PGLOGICAL_ABORT_PREPARED commit: gid=%s", MtmGetSystemTime(), gid);
585+
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED) ==TRANSACTION_STATUS_UNKNOWN) {
586+
MTM_LOG1("%ld: PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", MtmGetSystemTime(), gid);
573587
StartTransactionCommand();
574588
MtmSetCurrentTransactionGID(gid);
575589
FinishPreparedTransaction(gid, false);

pglogical_proto.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
150150
* INVALID_CSN means replicated transaction (transaction initiated by some other nodes).
151151
* We do not need to send such transactions unless we perform recovery
152152
*/
153-
if (csn == INVALID_CSN && !isRecovery) {
153+
if (csn == INVALID_CSN && !isRecovery)
154+
{
154155
return;
155156
}
156157
if (MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn)) {

0 commit comments

Comments
 (0)