@@ -116,14 +116,15 @@ static TransactionId MtmAdjustOldestXid(TransactionId xid);
116
116
static bool MtmDetectGlobalDeadLock (PGPROC * proc );
117
117
static void MtmAddSubtransactions (MtmTransState * ts , TransactionId * subxids , int nSubxids );
118
118
static char const * MtmGetName (void );
119
- static void MtmCheckClusterLock ()
119
+ static void MtmCheckClusterLock (void );
120
+ static void MtmCheckSlots (void );
121
+ static void MtmAddSubtransactions (MtmTransState * ts , TransactionId * subxids , int nSubxids );
120
122
121
123
static void MtmShmemStartup (void );
122
124
123
125
static BgwPool * MtmPoolConstructor (void );
124
126
static bool MtmRunUtilityStmt (PGconn * conn , char const * sql );
125
127
static void MtmBroadcastUtilityStmt (char const * sql , bool ignoreError );
126
- static void MtmVoteForTransaction (MtmTransState * ts );
127
128
128
129
static HTAB * xid2state ;
129
130
static HTAB * gid2xid ;
@@ -543,10 +544,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
543
544
* Prepare transaction for two-phase commit.
544
545
* This code is executed by PRE_PREPARE hook before PREPARE message is sent to replicas by logical replication
545
546
*/
547
+ static void
546
548
MtmPrePrepareTransaction (MtmCurrentTrans * x )
547
549
{
548
550
MtmTransState * ts ;
549
- int i ;
551
+ TransactionId * subxids ;
550
552
551
553
if (!x -> isDistributed ) {
552
554
return ;
@@ -575,9 +577,9 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
575
577
ts -> gtid = x -> gtid ;
576
578
ts -> procno = MyProc -> pgprocno ;
577
579
ts -> nVotes = 0 ;
578
-
580
+ ts -> nSubxids = xactGetCommittedChildren ( & subxids );
579
581
x -> isPrepared = true;
580
- x -> csn = csn ;
582
+ x -> csn = ts -> csn ;
581
583
582
584
dtm -> transCount += 1 ;
583
585
@@ -588,34 +590,36 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
588
590
ts -> gtid .node = MtmNodeId ;
589
591
}
590
592
MtmTransactionListAppend (ts );
593
+ MtmAddSubtransactions (ts , subxids , ts -> nSubxids );
591
594
592
595
MtmUnlock ();
593
596
594
597
MTM_TRACE ("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n" , MyProcPid , x -> xid , ts -> csn );
595
598
}
596
599
600
+ static void
597
601
MtmPrepareTransaction (MtmCurrentTrans * x )
598
602
{
599
603
MtmTransState * ts ;
600
604
601
605
MtmLock (LW_EXCLUSIVE );
602
606
ts = hash_search (xid2state , & x -> xid , HASH_ENTER , NULL );
603
607
Assert (ts != NULL );
604
- if (ts -> status = TRANSACTION_STATUS_IN_PROGRESS ) {
608
+ if (ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
605
609
ts -> status = TRANSACTION_STATUS_UNKNOWN ;
606
610
MtmAdjustSubtransactions (ts );
607
611
}
608
612
609
613
if (!MtmIsCoordinator (ts )) {
610
- MtmHashMap * hm = (MtmHashMap * )hash_search (gid2xid , x -> gid , HASH_ENTER , NULL );
614
+ MtmTransMap * hm = (MtmTransMap * )hash_search (gid2xid , x -> gid , HASH_ENTER , NULL );
611
615
Assert (x -> gid [0 ]);
612
616
hm -> state = ts ;
613
617
MtmSendNotificationMessage (ts ); /* send notification to coordinator */
614
618
MtmUnlock ();
615
619
} else {
616
620
/* wait N commits or just one ABORT */
617
- ts -> nVotes += 1 ;
618
- while (ts -> nVotes != dtm -> nNodes && ts -> status == TRANSACTION_STATUS_PROGRESS ) {
621
+ ts -> nVotes += 1 ; /* I vote myself */
622
+ while (ts -> nVotes != dtm -> nNodes && ts -> status == TRANSACTION_STATUS_IN_PROGRESS ) {
619
623
MtmUnlock ();
620
624
WaitLatch (& MyProc -> procLatch , WL_LATCH_SET , -1 );
621
625
ResetLatch (& MyProc -> procLatch );
@@ -633,14 +637,14 @@ static void
633
637
MtmEndTransaction (MtmCurrentTrans * x , bool commit )
634
638
{
635
639
MTM_TRACE ("%d: End transaction %d, prepared=%d, distributed=%d -> %s\n" , MyProcPid , x -> xid , x -> isPrepared , x -> isDistributed , commit ? "commit" : "abort" );
636
- if (x -> isDistributed ) {
640
+ if (x -> isDistributed && ( TransactionIdIsValid ( x -> xid ) || x -> isReplicated ) ) {
637
641
MtmTransState * ts ;
638
642
MtmLock (LW_EXCLUSIVE );
639
643
if (x -> isPrepared ) {
640
644
ts = hash_search (xid2state , & x -> xid , HASH_FIND , NULL );
641
645
Assert (ts != NULL );
642
646
} else {
643
- MtmHashMap * hm = (MtmHashMap * )hash_search (gid2xid , x -> gid , HASH_REMOVE , NULL );
647
+ MtmTransMap * hm = (MtmTransMap * )hash_search (gid2xid , x -> gid , HASH_REMOVE , NULL );
644
648
Assert (hm != NULL );
645
649
ts = hm -> state ;
646
650
}
@@ -712,12 +716,18 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
712
716
713
717
void MtmSetCurrentTransactionGID (char const * gid )
714
718
{
719
+ MTM_TRACE ("Set current transaction GID %s\n" , gid );
715
720
strcpy (dtmTx .gid , gid );
721
+ dtmTx .isDistributed = true;
722
+ dtmTx .isReplicated = true;
716
723
}
717
724
718
725
void MtmSetCurrentTransactionCSN (csn_t csn )
719
726
{
727
+ MTM_TRACE ("Set current transaction CSN %ld\n" , csn );
720
728
dtmTx .csn = csn ;
729
+ dtmTx .isDistributed = true;
730
+ dtmTx .isReplicated = true;
721
731
}
722
732
723
733
/*
@@ -731,7 +741,8 @@ void MtmSetCurrentTransactionCSN(csn_t csn)
731
741
* Check state of replication slots. If some of them are too much lag behind wal, then drop this slots to avoid
732
742
* WAL overflow
733
743
*/
734
- static void MtmCheckSlots ()
744
+ static void
745
+ MtmCheckSlots ()
735
746
{
736
747
if (MtmMaxRecoveryLag != 0 && dtm -> disabledNodeMask != 0 )
737
748
{
@@ -1682,14 +1693,14 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
1682
1693
{
1683
1694
case TRANS_STMT_COMMIT :
1684
1695
if (dtmTx .isDistributed && dtmTx .containsDML ) {
1685
- char gid { MUTLIMASTER_MAX_GID_SIZE ];
1686
- MtmGenerateGid (& gid );
1696
+ char gid [ MULTIMASTER_MAX_GID_SIZE ];
1697
+ MtmGenerateGid (gid );
1687
1698
if (!IsTransactionBlock ()) {
1688
1699
elog (WARNING , "Start transaction block for %d" , dtmTx .xid );
1689
1700
CommitTransactionCommand ();
1690
1701
StartTransactionCommand ();
1691
1702
}
1692
- if (!PrepareTransactionBlock (& gid ))
1703
+ if (!PrepareTransactionBlock (gid ))
1693
1704
{
1694
1705
elog (WARNING , "Failed to prepare transaction %s" , gid );
1695
1706
/* report unsuccessful commit in completionTag */
0 commit comments