Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 0fb5c6b

Browse files
knizhnikkelvich
authored andcommitted
Perform TRUNCATE command in exclusive mode
1 parent abaab48 commit 0fb5c6b

File tree

3 files changed

+66
-10
lines changed

3 files changed

+66
-10
lines changed

multimaster.c

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,9 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg);
159159
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError);
160160
static void MtmProcessDDLCommand(char const* queryString, bool transactional);
161161

162+
static void MtmSuspendNode(void);
163+
static void MtmResumeNode(void);
164+
162165
MtmState* Mtm;
163166

164167
VacuumStmt* MtmVacuumStmt;
@@ -251,6 +254,7 @@ static bool MtmIgnoreTablesWithoutPk;
251254
static int MtmLockCount;
252255
static bool MtmMajorNode;
253256
static bool MtmBreakConnection;
257+
static bool MtmSuspended;
254258

255259
static ExecutorStart_hook_type PreviousExecutorStartHook;
256260
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -272,8 +276,11 @@ static bool MtmAtExitHookRegistered = false;
272276
* This function is called when backend is terminated because of critical error or when error is catched
273277
* by FINALLY block
274278
*/
275-
void MtmReleaseLock(void)
279+
void MtmReleaseLocks(void)
276280
{
281+
if (MtmSuspended) {
282+
MtmResumeNode();
283+
}
277284
if (MtmLockCount != 0) {
278285
Assert(Mtm->lastLockHolder == MyProcPid);
279286
MtmLockCount = 0;
@@ -296,7 +303,7 @@ void MtmLock(LWLockMode mode)
296303
{
297304
timestamp_t start, stop;
298305
if (!MtmAtExitHookRegistered) {
299-
atexit(MtmReleaseLock);
306+
atexit(MtmReleaseLocks);
300307
MtmAtExitHookRegistered = true;
301308
}
302309
if (mode == LW_EXCLUSIVE || MtmLockCount != 0) {
@@ -848,7 +855,6 @@ MtmBeginTransaction(MtmCurrentTrans* x)
848855
MTM_ELOG(MtmBreakConnection ? FATAL : ERROR, "Multimaster node is not online: current status %s", MtmNodeStatusMnem[Mtm->status]);
849856
}
850857
x->containsDML = false;
851-
x->snapshot = MtmAssignCSN();
852858
x->gtid.xid = InvalidTransactionId;
853859
x->gid[0] = '\0';
854860
x->status = TRANSACTION_STATUS_IN_PROGRESS;
@@ -858,9 +864,14 @@ MtmBeginTransaction(MtmCurrentTrans* x)
858864
* Allow applying of replicated transactions to avoid deadlock (to caught-up we need active transaction counter to become zero).
859865
* Also allow user to complete explicit 2PC transactions.
860866
*/
861-
if (x->isDistributed && !x->isReplicated && !x->isTwoPhase && strcmp(application_name, MULTIMASTER_ADMIN) != 0) {
867+
if (x->isDistributed
868+
&& (Mtm->exclusiveLock || (!x->isReplicated && !x->isTwoPhase))
869+
&& !MtmSuspended
870+
&& strcmp(application_name, MULTIMASTER_ADMIN) != 0)
871+
{
862872
MtmCheckClusterLock();
863873
}
874+
x->snapshot = MtmAssignCSN();
864875

865876
MtmUnlock();
866877

@@ -1319,6 +1330,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
13191330
{
13201331
MTM_LOG2("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, 2pc=%d, gid=%s -> %s",
13211332
MyProcPid, x->xid, x->isPrepared, x->isReplicated, x->isDistributed, x->isTwoPhase, x->gid, commit ? "commit" : "abort");
1333+
if (MtmSuspended) {
1334+
MtmResumeNode();
1335+
}
13221336
commit &= (x->status != TRANSACTION_STATUS_ABORTED);
13231337
if (x->isDistributed && (x->isPrepared || x->isReplicated) && !x->isTwoPhase) {
13241338
MtmTransState* ts = NULL;
@@ -2038,7 +2052,44 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
20382052
/* ??? Something else to do here? */
20392053
}
20402054

2055+
/*
2056+
* Prevent start of any new transactions at this node
2057+
*/
2058+
static void
2059+
MtmSuspendNode(void)
2060+
{
2061+
timestamp_t delay = MIN_WAIT_TIMEOUT;
2062+
bool insideTransaction = MtmTx.isActive;
2063+
Assert(!MtmSuspended);
2064+
MtmLock(LW_EXCLUSIVE);
2065+
if (Mtm->exclusiveLock) {
2066+
elog(ERROR, "There is already pending exclusive lock");
2067+
}
2068+
Mtm->exclusiveLock = true;
2069+
MtmSuspended = true;
2070+
while (Mtm->nActiveTransactions != insideTransaction) {
2071+
MtmUnlock();
2072+
MtmSleep(delay);
2073+
if (delay*2 <= MAX_WAIT_TIMEOUT) {
2074+
delay *= 2;
2075+
}
2076+
MtmLock(LW_EXCLUSIVE);
2077+
}
2078+
MtmUnlock();
2079+
}
20412080

2081+
/*
2082+
* Resume transaction processing at node (blocked by MtmSuspendNode)
2083+
*/
2084+
static void
2085+
MtmResumeNode(void)
2086+
{
2087+
MtmLock(LW_EXCLUSIVE);
2088+
Mtm->exclusiveLock = false;
2089+
MtmSuspended = false;
2090+
MtmUnlock();
2091+
}
2092+
20422093
/*
20432094
* If there are recovering nodes which are catching-up WAL, check the status and prevent new transaction from commit to give
20442095
* WAL-sender a chance to catch-up WAL, completely synchronize replica and switch it to normal mode.
@@ -2050,7 +2101,7 @@ MtmCheckClusterLock()
20502101
timestamp_t delay = MIN_WAIT_TIMEOUT;
20512102
while (true)
20522103
{
2053-
if (Mtm->globalLockerMask | Mtm->walSenderLockerMask) {
2104+
if (Mtm->exclusiveLock || (Mtm->globalLockerMask | Mtm->walSenderLockerMask)) {
20542105
/* some "almost cautch-up" wal-senders are still working. */
20552106
/* Do not start new transactions until them are completed. */
20562107
MtmUnlock();
@@ -2474,6 +2525,7 @@ static void MtmInitialize()
24742525
Mtm->reconnectMask = 0;
24752526
Mtm->recoveredLSN = INVALID_LSN;
24762527
Mtm->nLockers = 0;
2528+
Mtm->exclusiveLock = false;
24772529
Mtm->nActiveTransactions = 0;
24782530
Mtm->votingTransactions = NULL;
24792531
Mtm->transListHead = NULL;
@@ -4944,8 +4996,12 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
49444996
}
49454997
break;
49464998

4947-
case T_DropStmt:
49484999
case T_TruncateStmt:
5000+
skipCommand = false;
5001+
MtmSuspendNode();
5002+
break;
5003+
5004+
case T_DropStmt:
49495005
{
49505006
DropStmt *stmt = (DropStmt *) parsetree;
49515007
if (stmt->removeType == OBJECT_INDEX && stmt->concurrent)

multimaster.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,10 +297,11 @@ typedef struct
297297
int nActiveTransactions; /* Number of active 2PC transactions */
298298
int nConfigChanges; /* Number of cluster configuration changes */
299299
int recoveryCount; /* Number of completed recoveries */
300-
int donorNodeId; /* Cluster node from which this node was populated */
300+
int donorNodeId; /* Cluster node from which this node was populated */
301301
int64 timeShift; /* Local time correction */
302302
csn_t csn; /* Last obtained timestamp: used to provide unique ascending CSNs based on system time */
303303
csn_t lastCsn; /* CSN of last committed transaction */
304+
sig_atomic_t exclusiveLock; /* Exclusive node lock, preventing start of new transactions */
304305
MtmTransState* votingTransactions; /* L1-list of replicated transactions notifications to coordinator.
305306
This list is used to pass information to mtm-sender BGW */
306307
MtmTransState* transListHead; /* L1 list of all finished transactions present in xid2state hash.
@@ -420,6 +421,5 @@ extern void MtmPrecommitTransaction(char const* gid);
420421
extern char* MtmGucSerialize(void);
421422
extern bool MtmTransIsActive(void);
422423
extern MtmTransState* MtmGetActiveTransaction(MtmL2List* list);
423-
extern void MtmReleaseLock(void);
424-
424+
extern void MtmReleaseLocks(void);
425425
#endif

pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1165,7 +1165,6 @@ void MtmExecutor(void* work, size_t size)
11651165
}
11661166
PG_CATCH();
11671167
{
1168-
MtmReleaseLock();
11691168
old_context = MemoryContextSwitchTo(MtmApplyContext);
11701169
MtmHandleApplyError();
11711170
MemoryContextSwitchTo(old_context);
@@ -1188,5 +1187,6 @@ void MtmExecutor(void* work, size_t size)
11881187
#endif
11891188
MemoryContextSwitchTo(top_context);
11901189
MemoryContextResetAndDeleteChildren(MtmApplyContext);
1190+
MtmReleaseLocks();
11911191
}
11921192

0 commit comments

Comments
 (0)