Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 67b733f

Browse files
committed
Add ATX fixes for multimaster
1 parent c70ab55 commit 67b733f

File tree

7 files changed

+101
-28
lines changed

7 files changed

+101
-28
lines changed

contrib/mmts/multimaster.c

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ static void* MtmCreateSavepointContext(void);
166166
static void MtmRestoreSavepointContext(void* ctx);
167167
static void MtmReleaseSavepointContext(void* ctx);
168168
static void MtmSetRemoteFunction(char const* list, void* extra);
169+
static void* MtmSuspendTransaction(void);
170+
static void MtmResumeTransaction(void* ctx);
169171

170172
// static void MtmCheckClusterLock(void);
171173
static void MtmCheckSlots(void);
@@ -218,7 +220,9 @@ static TransactionManager MtmTM =
218220
MtmInitializeSequence,
219221
MtmCreateSavepointContext,
220222
MtmRestoreSavepointContext,
221-
MtmReleaseSavepointContext
223+
MtmReleaseSavepointContext,
224+
MtmSuspendTransaction,
225+
MtmResumeTransaction
222226
};
223227

224228
char const* const MtmNodeStatusMnem[] =
@@ -557,6 +561,22 @@ static void MtmReleaseSavepointContext(void* ctx)
557561
{
558562
}
559563

564+
static void* MtmSuspendTransaction(void)
565+
{
566+
MtmCurrentTrans* ctx = malloc(sizeof(MtmCurrentTrans));
567+
*ctx = MtmTx;
568+
MtmResetTransaction();
569+
MtmBeginTransaction(&MtmTx);
570+
return ctx;
571+
}
572+
573+
static void MtmResumeTransaction(void* ctx)
574+
{
575+
MtmTx = *(MtmCurrentTrans*)ctx;
576+
MtmInsideTransaction = true;
577+
free(ctx);
578+
}
579+
560580

561581
/*
562582
* -------------------------------------------
@@ -964,6 +984,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
964984
x->isSuspended = false;
965985
x->isTwoPhase = false;
966986
x->isTransactionBlock = IsTransactionBlock();
987+
967988
/* Application name can be changed using PGAPPNAME environment variable */
968989
if (x->isDistributed && Mtm->status != MTM_ONLINE && strcmp(application_name, MULTIMASTER_ADMIN) != 0
969990
&& strcmp(application_name, MULTIMASTER_BROADCAST_SERVICE) != 0
@@ -4715,6 +4736,7 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
47154736

47164737
if (!x->isReplicated && x->isDistributed && x->containsDML) {
47174738
MtmGenerateGid(x->gid);
4739+
47184740
if (!x->isTransactionBlock) {
47194741
BeginTransactionBlock(false);
47204742
x->isTransactionBlock = true;

src/backend/access/transam/xact.c

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1860,6 +1860,7 @@ typedef struct {
18601860
void *PgStatState;
18611861
void *TriggerState;
18621862
void *SPIState;
1863+
void *XTMState;
18631864
void *SnapshotState;
18641865
void *PredicateState;
18651866
void *StorageState;
@@ -1945,26 +1946,29 @@ StartTransaction(void)
19451946
AtStart_Memory();
19461947
AtStart_ResourceOwner();
19471948

1948-
/*
1949-
* Assign a new LocalTransactionId, and combine it with the backendId to
1950-
* form a virtual transaction id.
1951-
*/
1952-
vxid.backendId = MyBackendId;
1953-
vxid.localTransactionId = GetNextLocalTransactionId();
1949+
if (getNestLevelATX() == 0)
1950+
{
1951+
/*
1952+
* Assign a new LocalTransactionId, and combine it with the backendId to
1953+
* form a virtual transaction id.
1954+
*/
1955+
vxid.backendId = MyBackendId;
1956+
vxid.localTransactionId = GetNextLocalTransactionId();
19541957

1955-
/*
1956-
* Lock the virtual transaction id before we announce it in the proc array
1957-
*/
1958-
VirtualXactLockTableInsert(vxid);
1958+
/*
1959+
* Lock the virtual transaction id before we announce it in the proc array
1960+
*/
1961+
VirtualXactLockTableInsert(vxid);
19591962

1960-
/*
1961-
* Advertise it in the proc array. We assume assignment of
1962-
* LocalTransactionID is atomic, and the backendId should be set already.
1963-
*/
1964-
Assert(MyProc->backendId == vxid.backendId);
1965-
MyProc->lxid = vxid.localTransactionId;
1963+
/*
1964+
* Advertise it in the proc array. We assume assignment of
1965+
* LocalTransactionID is atomic, and the backendId should be set already.
1966+
*/
1967+
Assert(MyProc->backendId == vxid.backendId);
1968+
MyProc->lxid = vxid.localTransactionId;
19661969

1967-
TRACE_POSTGRESQL_TRANSACTION_START(vxid.localTransactionId);
1970+
TRACE_POSTGRESQL_TRANSACTION_START(vxid.localTransactionId);
1971+
}
19681972

19691973
/*
19701974
* set transaction_timestamp() (a/k/a now()). We want this to be the same
@@ -1982,7 +1986,7 @@ StartTransaction(void)
19821986
* note: prevXactReadOnly is not used at the outermost level
19831987
*/
19841988
s->nestingLevel = 1;
1985-
s->gucNestLevel = 1;
1989+
s->gucNestLevel = getNestLevelATX() != 0 ? NewGUCNestLevel() : 1;
19861990
s->childXids = NULL;
19871991
s->nChildXids = 0;
19881992
s->maxChildXids = 0;
@@ -2505,7 +2509,7 @@ PrepareTransaction(void)
25052509
PostPrepare_Twophase();
25062510

25072511
/* PREPARE acts the same as COMMIT as far as GUC is concerned */
2508-
AtEOXact_GUC(true, 1);
2512+
AtEOXact_GUC(true, CurrentTransactionState->gucNestLevel);
25092513
AtEOXact_SPI(true);
25102514
AtEOXact_on_commit_actions(true);
25112515
AtEOXact_Namespace(true, false);
@@ -2959,7 +2963,6 @@ CommitTransactionCommand(void)
29592963
case TBLOCK_PREPARE:
29602964
PrepareTransaction();
29612965
s->blockState = TBLOCK_DEFAULT;
2962-
ResumeTransaction();
29632966
break;
29642967

29652968
/*
@@ -3225,7 +3228,6 @@ AbortCurrentTransaction(void)
32253228
AbortTransaction();
32263229
CleanupTransaction();
32273230
s->blockState = TBLOCK_DEFAULT;
3228-
ResumeTransaction();
32293231
break;
32303232

32313233
/*
@@ -3545,6 +3547,9 @@ void SuspendTransaction(void)
35453547
//Assert(XactTopTransactionId != InvalidTransactionId);
35463548
Assert(nParallelCurrentXids == 0);
35473549

3550+
if (suspendedXactNum >= MAX_SUSPENDED_XACTS)
3551+
elog(ERROR, "Too many recursive autonomouse transactions");
3552+
35483553
SuspendPgXact(MyPgXact);
35493554
{
35503555
TransactionState tts;
@@ -3616,6 +3621,7 @@ void SuspendTransaction(void)
36163621
MOVELEFT(sus->vxid.localTransactionId, MyProc->lxid, GetNextLocalTransactionId());
36173622
MOVELEFT(sus->on_commit_actions, pg_on_commit_actions, NULL);
36183623

3624+
sus->XTMState = TM->SuspendTransaction();
36193625
sus->PgStatState = PgStatSuspend();
36203626
sus->TriggerState = TriggerSuspend();
36213627
sus->SPIState = SuspendSPI();
@@ -3638,6 +3644,7 @@ bool ResumeTransaction(void)
36383644
ResumeSPI(sus->SPIState);
36393645
TriggerResume(sus->TriggerState);
36403646
PgStatResume(sus->PgStatState);
3647+
TM->ResumeTransaction(sus->XTMState);
36413648

36423649
TopTransactionStateData = sus->TopTransactionStateData;
36433650
CurrentTransactionState = sus->CurrentTransactionState;
@@ -3671,6 +3678,8 @@ bool ResumeTransaction(void)
36713678
CurTransactionContext = sus->CurTransactionContext;
36723679
TransactionAbortContext = sus->TransactionAbortContext;
36733680

3681+
Assert(TopTransactionContext && CurTransactionContext && TransactionAbortContext);
3682+
36743683
CurrentResourceOwner = sus->CurrentResourceOwner;
36753684
CurTransactionResourceOwner = sus->CurTransactionResourceOwner;
36763685
TopTransactionResourceOwner = sus->TopTransactionResourceOwner;

src/backend/access/transam/xtm.c

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,15 @@ void PgReleaseSavepointContext(void* ctx)
7272
}
7373

7474

75+
void* PgSuspendTransaction(void)
76+
{
77+
return NULL;
78+
}
79+
80+
void PgResumeTransaction(void* ctx)
81+
{
82+
}
83+
7584
TransactionManager PgTM = {
7685
PgTransactionIdGetStatus,
7786
PgTransactionIdSetTreeStatus,
@@ -89,7 +98,9 @@ TransactionManager PgTM = {
8998
PgInitializeSequence,
9099
PgCreateSavepointContext,
91100
PgRestoreSavepointContext,
92-
PgReleaseSavepointContext
101+
PgReleaseSavepointContext,
102+
PgSuspendTransaction,
103+
PgResumeTransaction
93104
};
94105

95106
TransactionManager *TM = &PgTM;

src/backend/executor/spi.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2801,7 +2801,7 @@ SPI_register_trigger_data(TriggerData *tdata)
28012801
return SPI_OK_TD_REGISTER;
28022802
}
28032803

2804-
#define MOVELEFT(A, B, C) do { (A) = (B); } while (0)
2804+
#define MOVELEFT(A, B, C) do { (A) = (B); (B) = (C); } while (0)
28052805
void *
28062806
SuspendSPI(void)
28072807
{

src/include/access/xtm.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,16 @@ typedef struct
110110
*/
111111
void (*ReleaseSavepointContext)(void* ctx);
112112

113+
/*
114+
* Suspend transaction (ATX support)
115+
*/
116+
void* (*SuspendTransaction)(void);
117+
118+
/*
119+
* Resume transaction state saved by SuspendTransaction (ATX support)
120+
*/
121+
void (*ResumeTransaction)(void* state);
122+
113123
} TransactionManager;
114124

115125
/* Get pointer to transaction manager: actually returns content of TM variable */
@@ -147,6 +157,7 @@ extern void PgInitializeSequence(int64* start, int64* step);
147157
extern void* PgCreateSavepointContext(void);
148158
extern void PgRestoreSavepointContext(void*);
149159
extern void PgReleaseSavepointContext(void*);
150-
160+
extern void* PgSuspendTransaction(void);
161+
extern void PgResumeTransaction(void* ctx);
151162

152163
#endif

src/pl/plpgsql/src/pl_exec.c

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1181,7 +1181,8 @@ exec_stmt_block(PLpgSQL_execstate *estate, PLpgSQL_stmt_block *block)
11811181
volatile int rc = -1;
11821182
int i;
11831183
int n;
1184-
1184+
int spi_rc;
1185+
11851186
/*
11861187
* First initialize all variables declared in this block
11871188
*/
@@ -1275,6 +1276,9 @@ exec_stmt_block(PLpgSQL_execstate *estate, PLpgSQL_stmt_block *block)
12751276

12761277
SuspendTransaction();
12771278

1279+
if ((spi_rc = SPI_connect()) != SPI_OK_CONNECT)
1280+
elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(spi_rc));
1281+
12781282
PG_TRY();
12791283
{
12801284
EState *old_shared_estate;
@@ -1304,11 +1308,15 @@ exec_stmt_block(PLpgSQL_execstate *estate, PLpgSQL_stmt_block *block)
13041308
resTypByVal, resTypLen);
13051309
}
13061310

1311+
if ((spi_rc = SPI_finish()) != SPI_OK_FINISH)
1312+
elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(spi_rc));
1313+
13071314
plpgsql_destroy_econtext(estate);
13081315

13091316
old_shared_estate = shared_simple_eval_estate;
13101317
shared_simple_eval_estate = NULL;
13111318
error_inside_commit = true;
1319+
13121320
CommitTransactionCommand();
13131321
shared_simple_eval_estate = old_shared_estate;
13141322

@@ -1332,6 +1340,9 @@ exec_stmt_block(PLpgSQL_execstate *estate, PLpgSQL_stmt_block *block)
13321340

13331341
if (!error_inside_commit)
13341342
{
1343+
if ((spi_rc = SPI_finish()) != SPI_OK_FINISH)
1344+
elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(spi_rc));
1345+
13351346
plpgsql_destroy_econtext(estate);
13361347
old_shared_estate = shared_simple_eval_estate;
13371348
shared_simple_eval_estate = NULL;
@@ -1396,9 +1407,9 @@ exec_stmt_block(PLpgSQL_execstate *estate, PLpgSQL_stmt_block *block)
13961407

13971408
/* If no match found, re-throw the error */
13981409
if (e == NULL)
1399-
{
14001410
ReThrowError(edata);
1401-
}
1411+
else
1412+
FreeErrorData(edata);
14021413
}
14031414
PG_END_TRY();
14041415
}

src/pl/plpython/plpy_atxobject.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,13 @@ PLy_atx_dealloc(PyObject *atx)
105105
static PyObject *
106106
PLy_atx_enter(PyObject *self, PyObject *unused)
107107
{
108+
int spi_rc;
109+
108110
SuspendTransaction();
109111

112+
if ((spi_rc = SPI_connect()) != SPI_OK_CONNECT)
113+
elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(spi_rc));
114+
110115
Py_INCREF(self);
111116
return self;
112117
}
@@ -128,10 +133,14 @@ PLy_atx_exit(PyObject *self, PyObject *args)
128133
PyObject *type;
129134
PyObject *value;
130135
PyObject *traceback;
136+
int spi_rc;
131137

132138
if (!PyArg_ParseTuple(args, "OOO", &type, &value, &traceback))
133139
return NULL;
134140

141+
if ((spi_rc = SPI_finish()) != SPI_OK_FINISH)
142+
elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(spi_rc));
143+
135144
if (type != Py_None)
136145
{
137146
AbortCurrentTransaction();

0 commit comments

Comments
 (0)