Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 30c4916

Browse files
committed
fix prepare error propagation to publisher
1 parent c989e6a commit 30c4916

File tree

2 files changed

+13
-14
lines changed

2 files changed

+13
-14
lines changed

src/include/multimaster.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,16 @@ typedef uint64 nodemask_t;
8989
typedef struct
9090
{
9191
int node; /* One based id of node initiating transaction */
92-
TransactionId xid; /* Transaction ID at this node */
92+
TransactionId xid; /* Transaction ID at origin node */
93+
TransactionId my_xid; /* Transaction ID at our node */
9394
} GlobalTransactionId;
9495

9596
typedef struct
9697
{
97-
bool contains_ddl;
98+
bool contains_temp_ddl;
99+
bool contains_persistent_ddl;
98100
bool contains_dml; /* transaction contains DML statements */
99-
bool accessed_temp;
101+
// bool accessed_temp;
100102
bool explicit_twophase; /* user level 2PC */
101103
bool distributed;
102104
pgid_t gid; /* global transaction identifier (only in case

src/pglogical_apply.c

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -479,8 +479,6 @@ create_rel_estate(Relation rel)
479479
static bool
480480
process_remote_begin(StringInfo s, GlobalTransactionId *gtid)
481481
{
482-
TransactionId xid;
483-
484482
gtid->node = pq_getmsgint(s, 4);
485483
gtid->xid = pq_getmsgint64(s);
486484

@@ -496,8 +494,8 @@ process_remote_begin(StringInfo s, GlobalTransactionId *gtid)
496494
StartTransactionCommand();
497495

498496
/* ddd */
499-
xid = GetCurrentTransactionId();
500-
MtmDeadlockDetectorAddXact(xid, gtid);
497+
gtid->my_xid = GetCurrentTransactionId();
498+
MtmDeadlockDetectorAddXact(gtid->my_xid, gtid);
501499

502500
// AcceptInvalidationMessages();
503501
// if (!receiver_mtm_cfg_valid)
@@ -955,18 +953,17 @@ process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid, MtmRecei
955953
res = PrepareTransactionBlock(gid);
956954
mtm_log(MtmTxFinish, "TXFINISH: %s prepared", gid);
957955

956+
CommitTransactionCommand();
957+
958958
/*
959-
* Clean this before CommitTransactionCommand(). Otherwise we'll
960-
* have a time window where exception handler will try to call
961-
* MtmDeadlockDetectorRemoveXact() with InvalidTransactionId,
962-
* because there is already no active transaction.
959+
* Clean this after CommitTransactionCommand(), as it may throw an
960+
* error that we should propagate to the originating node.
963961
*/
964962
current_gtid->node = 0;
965963
current_gtid->xid = InvalidTransactionId;
964+
current_gtid->my_xid = InvalidTransactionId;
966965
MtmDeadlockDetectorRemoveXact(xid);
967966

968-
CommitTransactionCommand();
969-
970967
if (receiver_ctx->parallel_allowed)
971968
{
972969
TransactionId origin_xid = MtmGidParseXid(gid);
@@ -1614,7 +1611,7 @@ MtmExecutor(void* work, size_t size, MtmReceiverContext *receiver_ctx)
16141611
/* handle only prepare errors here */
16151612
if (TransactionIdIsValid(current_gtid.xid))
16161613
{
1617-
MtmDeadlockDetectorRemoveXact(GetCurrentTransactionIdIfAny());
1614+
MtmDeadlockDetectorRemoveXact(current_gtid.my_xid);
16181615
if (receiver_ctx->parallel_allowed)
16191616
mtm_send_reply(current_gtid.xid, current_gtid.node, MSG_ABORTED);
16201617
}

0 commit comments

Comments
 (0)