Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 17a908a

Browse files
knizhnikkelvich
authored andcommitted
2PC fixes
1 parent 75e4922 commit 17a908a

File tree

3 files changed

+26
-19
lines changed

3 files changed

+26
-19
lines changed

multimaster.c

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -643,33 +643,36 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
643643
{
644644
MTM_TRACE("%d: End transaction %d, prepared=%d, distributed=%d -> %s\n", MyProcPid, x->xid, x->isPrepared, x->isDistributed, commit ? "commit" : "abort");
645645
if (x->isDistributed && (x->isPrepared || x->isReplicated)) {
646-
MtmTransState* ts;
646+
MtmTransState* ts = NULL;
647647
MtmLock(LW_EXCLUSIVE);
648648
if (x->isPrepared) {
649649
ts = hash_search(xid2state, &x->xid, HASH_FIND, NULL);
650650
Assert(ts != NULL);
651651
} else {
652652
MtmTransMap* hm = (MtmTransMap*)hash_search(gid2xid, x->gid, HASH_REMOVE, NULL);
653-
Assert(hm != NULL);
654-
ts = hm->state;
653+
if (hm != NULL) {
654+
ts = hm->state;
655+
}
655656
}
656-
if (commit) {
657-
ts->status = TRANSACTION_STATUS_COMMITTED;
658-
if (x->csn > ts->csn) {
659-
ts->csn = x->csn;
660-
MtmSyncClock(ts->csn);
657+
if (ts != NULL) {
658+
if (commit) {
659+
ts->status = TRANSACTION_STATUS_COMMITTED;
660+
if (x->csn > ts->csn) {
661+
ts->csn = x->csn;
662+
MtmSyncClock(ts->csn);
663+
}
664+
} else {
665+
ts->status = TRANSACTION_STATUS_ABORTED;
666+
if (x->isReplicated && TransactionIdIsValid(x->gtid.xid)) {
667+
/*
668+
* Send notification only of ABORT happens during transaction processing at replicas,
669+
* do not send notification if ABORT is receiver from master
670+
*/
671+
MtmSendNotificationMessage(ts); /* send notification to coordinator */
672+
}
661673
}
662-
} else {
663-
ts->status = TRANSACTION_STATUS_ABORTED;
664-
if (x->isReplicated && TransactionIdIsValid(x->gtid.xid)) {
665-
/*
666-
* Send notification only of ABORT happens during transaction processing at replicas,
667-
* do not send notification if ABORT is receiver from master
668-
*/
669-
MtmSendNotificationMessage(ts); /* send notification to coordinator */
670-
}
674+
MtmAdjustSubtransactions(ts);
671675
}
672-
MtmAdjustSubtransactions(ts);
673676
MtmUnlock();
674677
}
675678
x->snapshot = INVALID_CSN;
@@ -1691,6 +1694,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
16911694
DestReceiver *dest, char *completionTag)
16921695
{
16931696
bool skipCommand;
1697+
MTM_TRACE("%d: Process utility statement %s\n", MyProcPid, queryString);
16941698
switch (nodeTag(parsetree))
16951699
{
16961700
case T_TransactionStmt:
@@ -1702,8 +1706,10 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
17021706
if (dtmTx.isDistributed && dtmTx.containsDML) {
17031707
char gid[MULTIMASTER_MAX_GID_SIZE];
17041708
MtmGenerateGid(gid);
1709+
MTM_TRACE("%d: Start 2PC with GID=%s for %s\n", MyProcPid, gid, queryString);
17051710
if (!IsTransactionBlock()) {
17061711
elog(WARNING, "Start transaction block for %d", dtmTx.xid);
1712+
BeginTransactionBlock();
17071713
CommitTransactionCommand();
17081714
StartTransactionCommand();
17091715
}

pglogical_apply.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -623,6 +623,7 @@ process_remote_insert(StringInfo s, Relation rel)
623623
char* ddl = TextDatumGetCString(new_tuple.values[Anum_mtm_ddl_log_query-1]);
624624
int rc;
625625
SPI_connect();
626+
MTM_TRACE("%d: Execute utility statement %s\n", MyProcPid, ddl);
626627
rc = SPI_execute(ddl, false, 0);
627628
SPI_finish();
628629
if (rc != SPI_OK_UTILITY) {

pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ pglogical_receiver_main(Datum main_arg)
274274
if (originId != InvalidRepOriginId) {
275275
originStartPos = replorigin_get_progress(originId, false);
276276
elog(WARNING, "Restart logical receiver at position %lx from node %d", originStartPos, args->remote_node);
277-
} else {
277+
} else {
278278
elog(WARNING, "Start logical receiver from node %d", args->remote_node);
279279
}
280280
CommitTransactionCommand();

0 commit comments

Comments
 (0)