Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 8e072df

Browse files
committed
Fix prepare/abort_prepared race condition
1 parent ac0afb1 commit 8e072df

File tree

7 files changed

+63
-43
lines changed

7 files changed

+63
-43
lines changed

contrib/mmts/arbiter.c

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -915,17 +915,15 @@ static void MtmTransReceiver(Datum arg)
915915
} else {
916916
switch (msg->code) {
917917
case MSG_PREPARE:
918-
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
919-
ts->status = TRANSACTION_STATUS_UNKNOWN;
920-
ts->csn = MtmAssignCSN();
921-
MtmAdjustSubtransactions(ts);
922-
MtmSendNotificationMessage(ts, MSG_PREPARED);
923-
#if 0
918+
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
919+
ts->status = TRANSACTION_STATUS_UNKNOWN;
920+
ts->csn = MtmAssignCSN();
921+
MtmAdjustSubtransactions(ts);
922+
MtmSendNotificationMessage(ts, MSG_PREPARED);
924923
} else {
925924
Assert(ts->status == TRANSACTION_STATUS_ABORTED);
926925
MtmSendNotificationMessage(ts, MSG_ABORTED);
927926
}
928-
#endif
929927
break;
930928
default:
931929
Assert(false);

contrib/mmts/multimaster.c

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ typedef struct {
7878

7979
typedef struct {
8080
char gid[MULTIMASTER_MAX_GID_SIZE];
81+
bool abort;
82+
XidStatus status;
8183
MtmTransState* state;
8284
} MtmTransMap;
8385

@@ -275,7 +277,7 @@ timestamp_t MtmGetSystemTime(void)
275277
{
276278
struct timeval tv;
277279
gettimeofday(&tv, NULL);
278-
return (timestamp_t)tv.tv_sec*USECS_PER_SEC + tv.tv_usec + Mtm->timeShift;
280+
return (timestamp_t)tv.tv_sec*USECS_PER_SEC + tv.tv_usec;
279281
}
280282

281283
timestamp_t MtmGetCurrentTime(void)
@@ -722,11 +724,12 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
722724
{
723725
MtmTransState* ts;
724726
TransactionId* subxids;
725-
727+
726728
if (!x->isDistributed) {
727729
return;
728730
}
729731

732+
730733
if (Mtm->inject2PCError == 1) {
731734
Mtm->inject2PCError = 0;
732735
elog(ERROR, "ERROR INJECTION for transaction %d (%s)", x->xid, x->gid);
@@ -755,7 +758,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
755758
if (!x->isReplicated) {
756759
MtmCheckClusterLock();
757760
}
758-
759761
ts = MtmCreateTransState(x);
760762
/*
761763
* Invalid CSN prevent replication of transaction by logical replication
@@ -780,8 +782,7 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
780782
MtmAddSubtransactions(ts, subxids, ts->nSubxids);
781783
MTM_LOG3("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)",
782784
MyProcPid, x->xid, ts->gtid.xid, ts->gtid.node, ts->csn);
783-
MtmUnlock();
784-
785+
MtmUnlock();
785786
}
786787

787788
/*
@@ -819,7 +820,8 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
819820
Assert(ts != NULL);
820821

821822
if (!MtmIsCoordinator(ts) || Mtm->status == MTM_RECOVERY) {
822-
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, NULL);
823+
bool found;
824+
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, &found);
823825
Assert(x->gid[0]);
824826
tm->state = ts;
825827
ts->votingCompleted = true;
@@ -877,8 +879,8 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
877879
if (x->status != TRANSACTION_STATUS_ABORTED) {
878880
MtmLock(LW_EXCLUSIVE);
879881
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_REMOVE, NULL);
880-
Assert(tm != NULL);
881-
MTM_LOG1("Abort prepared transaction %d with gid='%s' is already aborted", x->xid, x->gid);
882+
Assert(tm != NULL && tm->state != NULL);
883+
MTM_LOG1("%ld: Abort prepared transaction %d with gid='%s'", MtmGetSystemTime(), x->xid, x->gid);
882884
MtmAbortTransaction(tm->state);
883885
MtmUnlock();
884886
x->status = TRANSACTION_STATUS_ABORTED;
@@ -1016,21 +1018,26 @@ XidStatus MtmGetCurrentTransactionStatus(void)
10161018
return MtmTx.status;
10171019
}
10181020

1019-
XidStatus MtmGetGlobalTransactionStatus(char const* gid)
1021+
XidStatus MtmExchangeGlobalTransactionStatus(char const* gid, XidStatus new_status)
10201022
{
1021-
XidStatus status;
10221023
MtmTransMap* tm;
1024+
bool found;
1025+
XidStatus old_status = TRANSACTION_STATUS_IN_PROGRESS;
10231026

10241027
Assert(gid[0]);
1025-
MtmLock(LW_SHARED);
1026-
tm = (MtmTransMap*)hash_search(MtmGid2State, gid, HASH_FIND, NULL);
1027-
if (tm != NULL) {
1028-
status = tm->state->status;
1028+
MtmLock(LW_EXCLUSIVE);
1029+
tm = (MtmTransMap*)hash_search(MtmGid2State, gid, HASH_ENTER, &found);
1030+
if (found) {
1031+
old_status = tm->status;
1032+
if (old_status != TRANSACTION_STATUS_ABORTED) {
1033+
tm->status = new_status;
1034+
}
10291035
} else {
1030-
status = TRANSACTION_STATUS_ABORTED;
1036+
tm->state = NULL;
1037+
tm->status = new_status;
10311038
}
10321039
MtmUnlock();
1033-
return status;
1040+
return old_status;
10341041
}
10351042

10361043
void MtmSetCurrentTransactionCSN(csn_t csn)

contrib/mmts/multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ extern csn_t MtmGetTransactionCSN(TransactionId xid);
256256
extern void MtmSetCurrentTransactionCSN(csn_t csn);
257257
extern TransactionId MtmGetCurrentTransactionId(void);
258258
extern XidStatus MtmGetCurrentTransactionStatus(void);
259-
extern XidStatus MtmGetGlobalTransactionStatus(char const* gid);
259+
extern XidStatus MtmExchangeGlobalTransactionStatus(char const* gid, XidStatus status);
260260
extern bool MtmIsRecoveredNode(int nodeId);
261261
extern bool MtmRefreshClusterStatus(bool nowait);
262262
extern void MtmSwitchClusterMode(MtmNodeStatus mode);

contrib/mmts/pglogical_apply.c

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,8 @@ process_remote_commit(StringInfo in)
508508
uint8 flags;
509509
csn_t csn;
510510
const char *gid = NULL;
511-
XLogRecPtr end_lsn;
511+
XLogRecPtr end_lsn;
512+
512513
/* read flags */
513514
flags = pq_getmsgbyte(in);
514515
MtmReplicationNodeId = pq_getmsgbyte(in);
@@ -536,25 +537,38 @@ process_remote_commit(StringInfo in)
536537
{
537538
Assert(IsTransactionState() && TransactionIdIsValid(MtmGetCurrentTransactionId()));
538539
gid = pq_getmsgstring(in);
539-
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
540-
MTM_LOG1("%d: PGLOGICAL_PREPARE commit: gid=%s", MyProcPid, gid);
541-
BeginTransactionBlock();
542-
CommitTransactionCommand();
543-
StartTransactionCommand();
544-
545-
MtmBeginSession();
546-
/* PREPARE itself */
547-
MtmSetCurrentTransactionGID(gid);
548-
PrepareTransactionBlock(gid);
549-
CommitTransactionCommand();
540+
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_IN_PROGRESS) == TRANSACTION_STATUS_ABORTED) {
541+
MTM_LOG1("%ld: avoid prepare of previously aborted global transaction %s", MtmGetSystemTime(), gid);
542+
AbortCurrentTransaction();
543+
} else {
544+
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
545+
MTM_LOG1("%ld: PGLOGICAL_PREPARE commit: gid=%s", MtmGetSystemTime(), gid);
546+
BeginTransactionBlock();
547+
CommitTransactionCommand();
548+
StartTransactionCommand();
549+
550+
MtmBeginSession();
551+
/* PREPARE itself */
552+
MtmSetCurrentTransactionGID(gid);
553+
PrepareTransactionBlock(gid);
554+
CommitTransactionCommand();
555+
556+
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_UNKNOWN) == TRANSACTION_STATUS_ABORTED) {
557+
MTM_LOG1("%ld: perform delayed rollback of prepared global transaction %s", MtmGetSystemTime(), gid);
558+
StartTransactionCommand();
559+
MtmSetCurrentTransactionGID(gid);
560+
FinishPreparedTransaction(gid, false);
561+
CommitTransactionCommand();
562+
}
563+
}
550564
break;
551565
}
552566
case PGLOGICAL_COMMIT_PREPARED:
553567
{
554568
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
555569
csn = pq_getmsgint64(in);
556570
gid = pq_getmsgstring(in);
557-
MTM_LOG1("%d: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s", MyProcPid, csn, gid);
571+
MTM_LOG1("%ld: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s", MtmGetSystemTime(), csn, gid);
558572
StartTransactionCommand();
559573
MtmBeginSession();
560574
MtmSetCurrentTransactionCSN(csn);
@@ -567,9 +581,9 @@ process_remote_commit(StringInfo in)
567581
{
568582
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
569583
gid = pq_getmsgstring(in);
570-
MTM_LOG1("%d: PGLOGICAL_ABORT_PREPARED commit: gid=%s", MyProcPid, gid);
571-
if (MtmGetGlobalTransactionStatus(gid) != TRANSACTION_STATUS_ABORTED) {
572-
MTM_LOG2("%d: PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", MyProcPid, gid);
584+
MTM_LOG1("%ld: PGLOGICAL_ABORT_PREPARED commit: gid=%s", MtmGetSystemTime(), gid);
585+
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_ABORTED) ==TRANSACTION_STATUS_UNKNOWN) {
586+
MTM_LOG1("%ld: PGLOGICAL_ABORT_PREPARED commit: gid=%s #2", MtmGetSystemTime(), gid);
573587
StartTransactionCommand();
574588
MtmSetCurrentTransactionGID(gid);
575589
FinishPreparedTransaction(gid, false);

contrib/mmts/pglogical_proto.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
150150
* INVALID_CSN means replicated transaction (transaction initiated by some other nodes).
151151
* We do not need to send such transactions unless we perform recovery
152152
*/
153-
if (csn == INVALID_CSN && !isRecovery) {
153+
if (csn == INVALID_CSN && !isRecovery)
154+
{
154155
return;
155156
}
156157
if (MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn)) {

contrib/raftable/raft/src/raft.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1116,7 +1116,7 @@ raft_msg_t raft_recv_message(raft_t r) {
11161116
(struct sockaddr*)&addr, &addrlen
11171117
);
11181118

1119-
if (recved == -1) {
1119+
if (recved <= 0) {
11201120
if (
11211121
(errno == EAGAIN) ||
11221122
(errno == EWOULDBLOCK) ||

contrib/raftable/raftable.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ static bool timed_read(int sock, void *data, size_t len, timeout_t *timeout)
144144
}
145145

146146
newbytes = read(sock, (char *)data + recved, len - recved);
147-
if (newbytes == -1)
147+
if (newbytes <= 0)
148148
{
149149
if (errno == EAGAIN) {
150150
if (poll_until_readable(sock, timeout)) {

0 commit comments

Comments
 (0)