Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 74f28a2

Browse files
knizhnikkelvich
authored and committed
Perform drop node under lock
1 parent bbfb53b commit 74f28a2

File tree

2 files changed

+49
-37
lines changed

2 files changed

+49
-37
lines changed

multimaster.c

+16-5
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,11 @@ void MtmLockNode(int nodeId, LWLockMode mode)
371371
LWLockAcquire((LWLockId)&Mtm->locks[nodeId], mode);
372372
}
373373

374+
bool MtmTryLockNode(int nodeId, LWLockMode mode)
375+
{
376+
return LWLockConditionalAcquire((LWLockId)&Mtm->locks[nodeId], mode);
377+
}
378+
374379
void MtmUnlockNode(int nodeId)
375380
{
376381
Assert(nodeId > 0 && nodeId <= MtmMaxNodes*2);
@@ -1684,7 +1689,14 @@ static void MtmStartRecovery()
16841689

16851690
static void MtmDropSlot(int nodeId)
16861691
{
1687-
ReplicationSlotDrop(psprintf(MULTIMASTER_SLOT_PATTERN, nodeId));
1692+
if (MtmTryLockNode(nodeId, LW_EXCLUSIVE))
1693+
{
1694+
MTM_ELOG(INFO, "Drop replication slot for node %d", nodeId);
1695+
ReplicationSlotDrop(psprintf(MULTIMASTER_SLOT_PATTERN, nodeId));
1696+
MtmUnlockNode(nodeId);
1697+
} else {
1698+
MTM_ELOG(WARNING, "Failed to drop replication slot for node %d", nodeId);
1699+
}
16881700
MtmLock(LW_EXCLUSIVE);
16891701
BIT_SET(Mtm->stalledNodeMask, nodeId-1);
16901702
BIT_SET(Mtm->stoppedNodeMask, nodeId-1); /* stalled node can not be automatically recovered */
@@ -2766,7 +2778,7 @@ void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr)
27662778
} else {
27672779
conn->arbiterPort = MULTIMASTER_DEFAULT_ARBITER_PORT;
27682780
}
2769-
MTM_ELOG(WARNING, "Using arbiter port: %d", conn->arbiterPort);
2781+
MTM_ELOG(INFO, "Using arbiter port: %d", conn->arbiterPort);
27702782

27712783
port = strstr(connStr, " port=");
27722784
if (port == NULL && strncmp(connStr, "port=", 5) == 0) {
@@ -3422,7 +3434,7 @@ _PG_init(void)
34223434
PreviousProcessUtilityHook = ProcessUtility_hook;
34233435
ProcessUtility_hook = MtmProcessUtility;
34243436

3425-
PreviousSeqNextvalHook = SeqNextvalHook;
3437+
PreviousSeqNextvalHook = SeqNextvalHook;
34263438
SeqNextvalHook = MtmSeqNextvalHook;
34273439
}
34283440

@@ -5380,7 +5392,7 @@ static void MtmSeqNextvalHook(Oid seqid, int64 next)
53805392
MtmSeqPosition pos;
53815393
pos.seqid = seqid;
53825394
pos.next = next;
5383-
LogLogicalMessage("N", (char*)&pos, sizeof(pos), true);
5395+
LogLogicalMessage("N", (char*)&pos, sizeof(pos), true);
53845396
}
53855397
}
53865398

@@ -5567,4 +5579,3 @@ Datum mtm_referee_poll(PG_FUNCTION_ARGS)
55675579

55685580
PG_RETURN_INT64(recoveredNodeMask);
55695581
}
5570-

multimaster.h

+33-32
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,25 @@
2323
#define MTM_ERRMSG(fmt,...) errmsg(MTM_TAG fmt, ## __VA_ARGS__)
2424

2525
#if DEBUG_LEVEL == 0
26-
#define MTM_LOG1(fmt, ...) elog(LOG, "[MTM] " fmt, ## __VA_ARGS__)
27-
#define MTM_LOG2(fmt, ...)
28-
#define MTM_LOG3(fmt, ...)
29-
#define MTM_LOG4(fmt, ...)
26+
#define MTM_LOG1(fmt, ...) elog(LOG, "[MTM] " fmt, ## __VA_ARGS__)
27+
#define MTM_LOG2(fmt, ...)
28+
#define MTM_LOG3(fmt, ...)
29+
#define MTM_LOG4(fmt, ...)
3030
#elif DEBUG_LEVEL == 1
31-
#define MTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
32-
#define MTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
33-
#define MTM_LOG3(fmt, ...)
34-
#define MTM_LOG4(fmt, ...)
31+
#define MTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
32+
#define MTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
33+
#define MTM_LOG3(fmt, ...)
34+
#define MTM_LOG4(fmt, ...)
3535
#elif DEBUG_LEVEL == 2
36-
#define MTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
37-
#define MTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
38-
#define MTM_LOG3(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
39-
#define MTM_LOG4(fmt, ...)
36+
#define MTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
37+
#define MTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
38+
#define MTM_LOG3(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
39+
#define MTM_LOG4(fmt, ...)
4040
#elif DEBUG_LEVEL >= 3
41-
#define MTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
42-
#define MTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
43-
#define MTM_LOG3(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
44-
#define MTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
41+
#define MTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
42+
#define MTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
43+
#define MTM_LOG3(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
44+
#define MTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
4545
#endif
4646

4747
#if MTM_TRACE == 0
@@ -98,7 +98,7 @@ typedef char pgid_t[MULTIMASTER_MAX_GID_SIZE];
9898
#define SELF_CONNECTIVITY_MASK (Mtm->nodes[MtmNodeId-1].connectivityMask)
9999

100100
typedef enum
101-
{
101+
{
102102
PGLOGICAL_COMMIT,
103103
PGLOGICAL_PREPARE,
104104
PGLOGICAL_COMMIT_PREPARED,
@@ -107,7 +107,7 @@ typedef enum
107107
} PGLOGICAL_EVENT;
108108

109109
/* Identifier of global transaction */
110-
typedef struct
110+
typedef struct
111111
{
112112
int node; /* Zero based index of node initiating transaction */
113113
TransactionId xid; /* Transaction ID at this node */
@@ -116,7 +116,7 @@ typedef struct
116116
#define EQUAL_GTID(x,y) ((x).node == (y).node && (x).xid == (y).xid)
117117

118118
typedef enum
119-
{
119+
{
120120
MSG_INVALID,
121121
MSG_HANDSHAKE,
122122
MSG_PREPARED,
@@ -153,12 +153,12 @@ typedef enum
153153
typedef struct
154154
{
155155
MtmMessageCode code; /* Message code: MSG_PREPARE, MSG_PRECOMMIT, MSG_COMMIT, MSG_ABORT,... */
156-
int node; /* Sender node ID */
156+
int node; /* Sender node ID */
157157
bool lockReq;/* Whether sender node needs to lock cluster to let wal-sender caught-up and complete recovery */
158158
bool locked; /* Whether sender node is locked */
159159
TransactionId dxid; /* Transaction ID at destination node */
160-
TransactionId sxid; /* Transaction ID at sender node */
161-
XidStatus status; /* Transaction status */
160+
TransactionId sxid; /* Transaction ID at sender node */
161+
XidStatus status; /* Transaction status */
162162
csn_t csn; /* Local CSN in case of sending data from replica to master, global CSN master->replica */
163163
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
164164
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes at the sender of message */
@@ -185,13 +185,13 @@ typedef struct MtmMessageQueue
185185
struct MtmMessageQueue* next;
186186
} MtmMessageQueue;
187187

188-
typedef struct
188+
typedef struct
189189
{
190190
MtmArbiterMessage hdr;
191191
char connStr[MULTIMASTER_MAX_CONN_STR_SIZE];
192192
} MtmHandshakeMessage;
193193

194-
typedef struct
194+
typedef struct
195195
{
196196
int used;
197197
int size;
@@ -227,7 +227,7 @@ typedef struct
227227
int senderPid;
228228
int receiverPid;
229229
lsn_t flushPos;
230-
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
230+
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
231231
lsn_t restartLSN;
232232
RepOriginId originId;
233233
int timeline;
@@ -246,12 +246,12 @@ typedef struct MtmL2List
246246
typedef struct MtmTransState
247247
{
248248
TransactionId xid;
249-
XidStatus status;
249+
XidStatus status;
250250
pgid_t gid; /* Global transaction ID (used for 2PC) */
251251
GlobalTransactionId gtid; /* Transaction id at coordinator */
252252
csn_t csn; /* commit serial number */
253253
csn_t snapshot; /* transaction snapshot, or INVALID_CSN for local transactions */
254-
int procno; /* pgprocno of transaction coordinator waiting for responses from replicas,
254+
int procno; /* pgprocno of transaction coordinator waiting for responses from replicas,
255255
used to notify coordinator by arbiter */
256256
int nSubxids; /* Number of subtransanctions */
257257
struct MtmTransState* next; /* Next element in L1 list of all finished transaction present in xid2state hash */
@@ -293,7 +293,7 @@ typedef struct
293293
nodemask_t pglogicalSenderMask; /* bitmask of started pglogic senders */
294294
nodemask_t currentLockNodeMask; /* Mask of nodes IDs which are locking the cluster */
295295
nodemask_t inducedLockNodeMask; /* Mask of node IDs which requested cluster-wide lock */
296-
nodemask_t originLockNodeMask; /* Mask of node IDs which WAL-senders are locking the cluster.
296+
nodemask_t originLockNodeMask; /* Mask of node IDs which WAL-senders are locking the cluster.
297297
* MtmNodeId bit is used by recovered node to complete recovery and by MtmLockCluster method */
298298
nodemask_t reconnectMask; /* Mask of nodes connection to which has to be reestablished by sender */
299299
int lastLockHolder; /* PID of process last obtaining the node lock */
@@ -319,13 +319,13 @@ typedef struct
319319
MtmTransState** transListTail; /* Tail of L1 list of all finished transactions, used to append new elements.
320320
This list is expected to be in CSN ascending order, by strict order may be violated */
321321
MtmL2List activeTransList; /* List of active transactions */
322-
ulong64 transCount; /* Counter of transactions performed by this node */
322+
ulong64 transCount; /* Counter of transactions performed by this node */
323323
ulong64 gcCount; /* Number of global transactions performed since last GC */
324324
MtmMessageQueue* sendQueue; /* Messages to be sent by arbiter sender */
325325
MtmMessageQueue* freeQueue; /* Free messages */
326326
lsn_t recoveredLSN; /* LSN at the moment of recovery completion */
327327
BgwPool pool; /* Pool of background workers for applying logical replication patches */
328-
MtmNodeInfo nodes[1]; /* [Mtm->nAllNodes]: per-node data */
328+
MtmNodeInfo nodes[1]; /* [Mtm->nAllNodes]: per-node data */
329329
} MtmState;
330330

331331
typedef struct MtmFlushPosition
@@ -342,7 +342,7 @@ typedef struct MtmSeqPosition
342342
Oid seqid;
343343
int64 next;
344344
} MtmSeqPosition;
345-
345+
346346
#define MtmIsCoordinator(ts) (ts->gtid.node == MtmNodeId)
347347

348348
extern char const* const MtmNodeStatusMnem[];
@@ -394,14 +394,15 @@ extern void MtmAdjustSubtransactions(MtmTransState* ts);
394394
extern void MtmLock(LWLockMode mode);
395395
extern void MtmUnlock(void);
396396
extern void MtmLockNode(int nodeId, LWLockMode mode);
397+
extern bool MtmTryLockNode(int nodeId, LWLockMode mode);
397398
extern void MtmUnlockNode(int nodeId);
398399
extern void MtmStopNode(int nodeId, bool dropSlot);
399400
extern void MtmReconnectNode(int nodeId);
400401
extern void MtmRecoverNode(int nodeId);
401402
extern void MtmOnNodeDisconnect(int nodeId);
402403
extern void MtmOnNodeConnect(int nodeId);
403404
extern void MtmWakeUpBackend(MtmTransState* ts);
404-
extern void MtmSleep(timestamp_t interval);
405+
extern void MtmSleep(timestamp_t interval);
405406
extern void MtmAbortTransaction(MtmTransState* ts);
406407
extern void MtmSetCurrentTransactionGID(char const* gid);
407408
extern csn_t MtmGetTransactionCSN(TransactionId xid);

0 commit comments

Comments
 (0)