Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit c367ad6

Browse files
committed
multidb refactor: dead code elimination
1 parent c1b55de commit c367ad6

File tree

8 files changed

+136
-594
lines changed

8 files changed

+136
-594
lines changed

src/ddl.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
#include "ddl.h"
4141
#include "logger.h"
4242

43+
#include "multimaster.h"
44+
4345

4446
// XXX: is it defined somewhere?
4547
#define GUC_KEY_MAXLEN 255
@@ -74,7 +76,6 @@ static VacuumStmt* MtmVacuumStmt;
7476
static IndexStmt* MtmIndexStmt;
7577
static DropStmt* MtmDropStmt;
7678
static void* MtmTablespaceStmt; /* CREATE/DELETE tablespace */
77-
static bool localTablesHashLoaded;
7879

7980
static HTAB *MtmGucHash = NULL;
8081
static dlist_head MtmGucList = DLIST_STATIC_INIT(MtmGucList);
@@ -1115,14 +1116,14 @@ MtmIsRelationLocal(Relation rel)
11151116
bool found;
11161117

11171118
LWLockAcquire(MtmLocalTablesMapLock, LW_SHARED);
1118-
if (!localTablesHashLoaded)
1119+
if (!Mtm->localTablesHashLoaded)
11191120
{
11201121
LWLockRelease(MtmLocalTablesMapLock);
11211122
LWLockAcquire(MtmLocalTablesMapLock, LW_EXCLUSIVE);
1122-
if (!localTablesHashLoaded)
1123+
if (!Mtm->localTablesHashLoaded)
11231124
{
11241125
MtmLoadLocalTables();
1125-
localTablesHashLoaded = true;
1126+
Mtm->localTablesHashLoaded = true;
11261127
}
11271128
}
11281129

src/include/multimaster.h

Lines changed: 21 additions & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,6 @@
9494
#define Natts_mtm_nodes_state 17
9595
#define Natts_mtm_cluster_state 20
9696

97-
typedef ulong64 csn_t; /* commit serial number */
98-
#define INVALID_CSN ((csn_t)-1)
99-
10097
typedef ulong64 lsn_t;
10198
#define INVALID_LSN InvalidXLogRecPtr
10299

@@ -170,50 +167,13 @@ typedef struct
170167
MtmMessageCode code; /* Message code: MSG_PREPARE, MSG_PRECOMMIT, MSG_COMMIT, MSG_ABORT,... */
171168
MtmTxState state;
172169
int node; /* Sender node ID */
173-
bool lockReq;/* Whether sender node needs to lock cluster to let wal-sender caught-up and complete recovery */
174-
bool locked; /* Whether sender node is locked */
175170
TransactionId dxid; /* Transaction ID at destination node */
176171
TransactionId sxid; /* Transaction ID at sender node */
177172
XidStatus status; /* Transaction status */
178-
csn_t csn; /* Local CSN in case of sending data from replica to master, global CSN master->replica */
179-
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
180-
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes at the sender of message */
181173
nodemask_t connectivityMask; /* Connectivity bitmask at the sender of message */
182174
pgid_t gid; /* Global transaction identifier */
183175
} MtmArbiterMessage;
184176

185-
/*
186-
* Abort logical message is send by replica when error is happen while applying prepared transaction.
187-
* In this case we do not have prepared transaction and can not do abort-prepared.
188-
* But we have to record the fact of abort to be able to replay it in case of crash of coordinator of this transaction.
189-
* We are using logical abort message with code 'A' for it
190-
*/
191-
typedef struct MtmAbortLogicalMessage
192-
{
193-
pgid_t gid;
194-
int origin_node;
195-
lsn_t origin_lsn;
196-
} MtmAbortLogicalMessage;
197-
198-
typedef struct MtmMessageQueue
199-
{
200-
MtmArbiterMessage msg;
201-
struct MtmMessageQueue* next;
202-
} MtmMessageQueue;
203-
204-
typedef struct
205-
{
206-
MtmArbiterMessage hdr;
207-
char connStr[MULTIMASTER_MAX_CONN_STR_SIZE];
208-
} MtmHandshakeMessage;
209-
210-
typedef struct
211-
{
212-
int used;
213-
int size;
214-
MtmArbiterMessage* data;
215-
} MtmBuffer;
216-
217177
typedef struct
218178
{
219179
char hostName[MULTIMASTER_MAX_HOST_NAME_SIZE];
@@ -222,138 +182,48 @@ typedef struct
222182
int postmasterPort;
223183
} MtmConnectionInfo;
224184

225-
226185
typedef struct
227186
{
228187
Oid sourceTable;
229188
nodemask_t targetNodes;
230189
} MtmCopyRequest;
231190

232-
233191
typedef struct
234192
{
235193
MtmConnectionInfo con;
236194
/* Pool of background workers for applying logical replication */
237195
BgwPool pool;
238-
timestamp_t transDelay;
239-
timestamp_t lastStatusChangeTime;
240-
timestamp_t receiverStartTime;
241-
timestamp_t senderStartTime;
242-
timestamp_t lastHeartbeat;
243-
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes received from this node */
244196
nodemask_t connectivityMask; /* Connectivity mask at this node */
245197
int senderPid;
246198
int receiverPid;
247199
lsn_t flushPos;
248-
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
249200
lsn_t restartLSN;
250201
RepOriginId originId;
251202
int timeline;
252-
void* lockGraphData;
253-
int lockGraphAllocated;
254-
int lockGraphUsed;
255-
uint64 nHeartbeats;
256203
bool manualRecovery;
257204
DmqDestinationId destination_id;
258-
bool slotDeleted; /* Signalizes that node is already deleted our slot and
259-
* recovery from that node isn't possible.
260-
*/
261205
} MtmNodeInfo;
262206

263-
typedef struct MtmL2List
264-
{
265-
struct MtmL2List* next;
266-
struct MtmL2List* prev;
267-
} MtmL2List;
268-
269-
typedef struct MtmTransState
270-
{
271-
TransactionId xid;
272-
XidStatus status;
273-
pgid_t gid; /* Global transaction ID (used for 2PC) */
274-
GlobalTransactionId gtid; /* Transaction id at coordinator */
275-
csn_t csn; /* commit serial number */
276-
csn_t snapshot; /* transaction snapshot, or INVALID_CSN for local transactions */
277-
int procno; /* pgprocno of transaction coordinator waiting for responses from replicas,
278-
used to notify coordinator by arbiter */
279-
int nSubxids; /* Number of subtransanctions */
280-
struct MtmTransState* next; /* Next element in L1 list of all finished transaction present in xid2state hash */
281-
MtmL2List activeList; /* L2-list of active transactions */
282-
bool votingCompleted; /* 2PC voting is completed */
283-
bool isLocal; /* Transaction is either replicated, either doesn't contain DML statements, so it should be ignored by pglogical replication */
284-
bool isEnqueued; /* Transaction is inserted in queue */
285-
bool isPrepared; /* Transaction is prepared: now it is safe to commit transaction */
286-
bool isActive; /* Transaction is active */
287-
bool isTwoPhase; /* User level 2PC */
288-
bool isPinned; /* Transaction oid protected from GC */
289-
int nConfigChanges; /* Number of cluster configuration changes at moment of transaction start */
290-
nodemask_t participantsMask; /* Mask of nodes involved in transaction */
291-
nodemask_t votedMask; /* Mask of voted nodes */
292-
int abortedByNode; /* Store info about node on which this tx was aborted */
293-
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
294-
} MtmTransState;
295-
296-
typedef struct {
297-
pgid_t gid;
298-
bool abort;
299-
XidStatus status;
300-
MtmTransState* state;
301-
} MtmTransMap;
302-
303207
typedef struct
304208
{
305209
bool extension_created;
306210
MtmNodeStatus status; /* Status of this node */
307-
/* A human-readable description of why the current status was set */
308-
char *statusReason;
211+
char *statusReason; /* A human-readable description of why the current status was set */
309212
int recoverySlot; /* NodeId of recovery slot or 0 if none */
310-
volatile slock_t queueSpinlock; /* spinlock used to protect sender queue */
311-
PGSemaphore sendSemaphore; /* semaphore used to notify mtm-sender about new responses to coordinator */
312213
LWLockPadded *locks; /* multimaster lock tranche */
313-
TransactionId oldestXid; /* XID of oldest transaction visible by any active transaction (local or global) */
314214
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes */
315215
nodemask_t clique; /* Bitmask of nodes that are connected and we allowed to connect/send wal/receive wal with them */
316216
bool refereeGrant; /* Referee allowed us to work with half of the nodes */
317217
int refereeWinnerId; /* Node that won referee contest */
318-
nodemask_t deadNodeMask; /* Bitmask of nodes considered as dead by referee */
319-
nodemask_t recoveredNodeMask; /* Bitmask of nodes recoverd after been reported as dead by referee */
320218
nodemask_t stalledNodeMask; /* Bitmask of stalled nodes (node with dropped replication slot which makes it not possible automatic recovery of such node) */
321219
nodemask_t stoppedNodeMask; /* Bitmask of stopped (permanently disabled nodes) */
322220
nodemask_t pglogicalReceiverMask; /* bitmask of started pglogic receivers */
323221
nodemask_t pglogicalSenderMask; /* bitmask of started pglogic senders */
324-
nodemask_t currentLockNodeMask; /* Mask of nodes IDs which are locking the cluster */
325-
nodemask_t inducedLockNodeMask; /* Mask of node IDs which requested cluster-wide lock */
326-
nodemask_t originLockNodeMask; /* Mask of node IDs which WAL-senders are locking the cluster.
327-
* MtmNodeId bit is used by recovered node to complete recovery and by MtmLockCluster method */
328-
nodemask_t reconnectMask; /* Mask of nodes connection to which has to be reestablished by sender */
329-
int lastLockHolder; /* PID of process last obtaining the node lock */
330222
bool localTablesHashLoaded; /* Whether data from local_tables table is loaded in shared memory hash table */
331-
bool preparedTransactionsLoaded; /* GIDs of prepared transactions are loaded at startup */
332-
int inject2PCError; /* Simulate error during 2PC commit at this node */
333-
int nLiveNodes; /* Number of active nodes */
334-
int nAllNodes; /* Total number of nodes */
335-
int nReceivers; /* Number of initialized logical receivers (used to determine moment when initialization/recovery is completed) */
336-
int nSenders; /* Number of started WAL senders (used to determine moment when recovery) */
337-
int nActiveTransactions; /* Number of active 2PC transactions */
338-
int nRunningTransactions; /* Number of all running transactions */
339-
int nConfigChanges; /* Number of cluster configuration changes */
223+
int nAllNodes; /* Total number of nodes */
340224
int recoveryCount; /* Number of completed recoveries */
341225
int donorNodeId; /* Cluster node from which this node was populated */
342-
int64 timeShift; /* Local time correction */
343-
csn_t csn; /* Last obtained timestamp: used to provide unique ascending CSNs based on system time */
344-
csn_t lastCsn; /* CSN of last committed transaction */
345-
MtmTransState* votingTransactions; /* L1-list of replicated transactions notifications to coordinator.
346-
This list is used to pass information to mtm-sender BGW */
347-
MtmTransState* transListHead; /* L1 list of all finished transactions present in xid2state hash.
348-
It is cleanup by MtmGetOldestXmin */
349-
MtmTransState** transListTail; /* Tail of L1 list of all finished transactions, used to append new elements.
350-
This list is expected to be in CSN ascending order, by strict order may be violated */
351-
MtmL2List activeTransList; /* List of active transactions */
352-
ulong64 transCount; /* Counter of transactions performed by this node */
353-
ulong64 gcCount; /* Number of global transactions performed since last GC */
354-
MtmMessageQueue* sendQueue; /* Messages to be sent by arbiter sender */
355-
MtmMessageQueue* freeQueue; /* Free messages */
356-
lsn_t recoveredLSN; /* LSN at the moment of recovery completion */
226+
lsn_t recoveredLSN; /* LSN at the moment of recovery completion */
357227
MtmNodeInfo nodes[1]; /* [Mtm->nAllNodes]: per-node data */
358228
} MtmState;
359229

@@ -366,8 +236,6 @@ typedef struct MtmFlushPosition
366236
} MtmFlushPosition;
367237

368238

369-
#define MtmIsCoordinator(ts) (ts->gtid.node == MtmNodeId)
370-
371239
extern char const* const MtmNodeStatusMnem[];
372240
extern char const* const MtmTxnStatusMnem[];
373241
extern char const* const MtmMessageKindMnem[];
@@ -376,26 +244,17 @@ extern MtmState* Mtm;
376244

377245
extern int MtmReplicationNodeId;
378246
extern int MtmNodes;
379-
extern int MtmArbiterPort;
380247
extern char* MtmDatabaseName;
381-
extern int MtmNodeDisableDelay;
382248
extern int MtmTransSpillThreshold;
383249
extern int MtmHeartbeatSendTimeout;
384250
extern int MtmHeartbeatRecvTimeout;
385-
extern bool MtmUseRDMA;
386-
extern bool MtmUseDtm;
387251
extern bool MtmPreserveCommitOrder;
388-
extern HTAB* MtmXid2State;
389-
extern HTAB* MtmGid2State;
390252

391-
extern lsn_t MtmSenderWalEnd;
392-
extern timestamp_t MtmRefreshClusterStatusSchedule;
393253
extern MtmConnectionInfo* MtmConnections;
394254
extern bool MtmMajorNode;
395255
extern bool MtmBackgroundWorker;
396256
extern char* MtmRefereeConnStr;
397257
extern bool MtmIsRecoverySession;
398-
extern int MtmWorkers;
399258

400259
extern void MtmXactCallback2(XactEvent event, void *arg);
401260
extern void MtmMonitorInitialize(void);
@@ -413,67 +272,55 @@ extern void ResolveTransactionsForNode(int node_id);
413272
extern void ResolveAllTransactions(void);
414273
extern char *MtmTxStateMnem(MtmTxState state);
415274

416-
extern void MtmFollowerHandleAbort(void);
417-
418275
extern void MtmStartReceivers(void);
419276
extern void MtmStartReceiver(int nodeId, bool dynamic);
420-
extern csn_t MtmDistributedTransactionSnapshot(TransactionId xid, int nodeId, nodemask_t* participantsMask);
421-
extern csn_t MtmAssignCSN(void);
422-
extern csn_t MtmSyncClock(csn_t csn);
423-
extern void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t snapshot, nodemask_t participantsMask);
277+
424278
extern MtmReplicationMode MtmGetReplicationMode(int nodeId);
425279
extern void MtmExecutor(void* work, size_t size);
426-
extern void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd);
427-
extern void MtmSendMessage(MtmArbiterMessage* msg);
428-
extern void MtmAdjustSubtransactions(MtmTransState* ts);
280+
281+
282+
429283
extern void MtmLock(LWLockMode mode);
430284
extern void MtmUnlock(void);
431-
extern void MtmDeepUnlock(void);
432285
extern void MtmLockNode(int nodeId, LWLockMode mode);
433286
extern bool MtmTryLockNode(int nodeId, LWLockMode mode);
434287
extern void MtmUnlockNode(int nodeId);
288+
435289
extern void MtmStopNode(int nodeId, bool dropSlot);
436290
extern void MtmRecoverNode(int nodeId);
437291
extern void MtmResumeNode(int nodeId);
438-
extern void MtmWakeUpBackend(MtmTransState* ts);
292+
439293
extern void MtmSleep(timestamp_t interval);
440-
extern void MtmAbortTransaction(MtmTransState* ts);
441-
extern void MtmSetCurrentTransactionGID(char const* gid);
442-
extern csn_t MtmGetTransactionCSN(TransactionId xid);
443-
extern void MtmSetCurrentTransactionCSN(csn_t csn);
294+
295+
extern void MtmSetCurrentTransactionGID(char const* gid, int node_id);
296+
297+
extern void MtmSetCurrentTransactionCSN(void);
298+
444299
extern TransactionId MtmGetCurrentTransactionId(void);
445300
extern XidStatus MtmGetCurrentTransactionStatus(void);
446-
extern XidStatus MtmExchangeGlobalTransactionStatus(char const* gid, XidStatus status);
301+
447302
extern bool MtmIsRecoveredNode(int nodeId);
448303
extern void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr);
449304
extern void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks);
450305
extern bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr);
451306
extern void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN);
452-
extern void MtmMakeTableLocal(char const* schema, char const* name);
453-
extern void MtmMakeRelationLocal(Oid relid);
307+
454308
extern void MtmHandleApplyError(void);
309+
455310
extern void MtmUpdateLsnMapping(int nodeId, lsn_t endLsn);
456311
extern lsn_t MtmGetFlushPosition(int nodeId);
457-
extern bool MtmWatchdog(timestamp_t now);
458-
extern void MtmCheckHeartbeat(void);
312+
459313
extern void MtmResetTransaction(void);
460314
extern void MtmReleaseRecoverySlot(int nodeId);
461315
extern PGconn *PQconnectdb_safe(const char *conninfo, int timeout);
462316
extern void MtmBeginSession(int nodeId);
463317
extern void MtmEndSession(int nodeId, bool unlock);
464-
extern void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit);
465-
extern void MtmRollbackPreparedTransaction(int nodeId, char const* gid);
318+
466319
extern bool MtmFilterTransaction(char* record, int size);
467-
extern void MtmPrecommitTransaction(char const* gid);
468-
extern char* MtmGucSerialize(void);
469-
extern MtmTransState* MtmGetActiveTransaction(MtmL2List* list);
470-
extern void MtmReleaseLocks(void);
320+
471321
extern void MtmInitMessage(MtmArbiterMessage* msg, MtmMessageCode code);
472-
extern void MtmSetSnapshot(csn_t snapshot);
322+
473323
extern void MtmRefereeInitialize(void);
474-
extern void MtmPollStatusOfPreparedTransactionsForDisabledNode(int disabledNodeId, bool commitPrecommited);
475-
extern void MtmPollStatusOfPreparedTransactions(bool majorMode);
476-
extern int MtmGetNumberOfVotingNodes(void);
477324
extern void MtmUpdateControlFile(void);
478325

479326
extern void MtmCheckSlots(void);

src/include/state.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ typedef enum
2121
MTM_NONRECOVERABLE_ERROR
2222
} MtmEvent;
2323

24-
extern void MtmStateProcessNeighborEvent(int node_id, MtmNeighborEvent ev);
25-
extern void MtmStateProcessEvent(MtmEvent ev);
24+
extern void MtmStateProcessNeighborEvent(int node_id, MtmNeighborEvent ev, bool locked);
25+
extern void MtmStateProcessEvent(MtmEvent ev, bool locked);
2626
extern void MtmDisableNode(int nodeId);
2727
extern void MtmEnableNode(int nodeId);
2828

0 commit comments

Comments
 (0)