Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5712cd8

Browse files
committed
Sparse nodemasks
1 parent 120e4ab commit 5712cd8

11 files changed

+493
-507
lines changed

src/commit.c

Lines changed: 18 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ MtmXactCallback2(XactEvent event, void *arg)
100100
void
101101
MtmBeginTransaction()
102102
{
103+
MtmNodeStatus node_status;
104+
103105
// XXX: clean MtmTx on commit and check on begin that it is clean.
104106
// That should unveil probable issues with subxacts.
105107

@@ -138,8 +140,10 @@ MtmBeginTransaction()
138140

139141
MtmDDLResetStatement();
140142

143+
node_status = MtmGetCurrentStatus();
144+
141145
/* Application name can be changed using PGAPPNAME environment variable */
142-
if (Mtm->status != MTM_ONLINE
146+
if (node_status != MTM_ONLINE
143147
&& strcmp(application_name, MULTIMASTER_ADMIN) != 0
144148
&& strcmp(application_name, MULTIMASTER_BROADCAST_SERVICE) != 0)
145149
{
@@ -150,13 +154,13 @@ MtmBeginTransaction()
150154
{
151155
mtm_log(ERROR,
152156
"Multimaster node is not online: current status %s",
153-
MtmNodeStatusMnem[Mtm->status]);
157+
MtmNodeStatusMnem[node_status]);
154158
}
155159
else
156160
{
157161
mtm_log(FATAL,
158162
"Multimaster node is not online: current status %s",
159-
MtmNodeStatusMnem[Mtm->status]);
163+
MtmNodeStatusMnem[node_status]);
160164
}
161165
}
162166
}
@@ -216,12 +220,8 @@ MtmTwoPhaseCommit()
216220

217221
LWLockAcquire(MtmCommitBarrier, LW_SHARED);
218222

219-
MtmLock(LW_SHARED);
220-
participantsMask = ~Mtm->disabledNodeMask &
223+
participantsMask = MtmGetEnabledNodeMask() &
221224
~((nodemask_t)1 << (mtm_cfg->my_node_id-1));
222-
if (Mtm->status != MTM_ONLINE)
223-
mtm_log(ERROR, "This node became offline during current transaction");
224-
MtmUnlock();
225225

226226
ret = PrepareTransactionBlock(gid);
227227
if (!ret)
@@ -305,24 +305,13 @@ GatherPrepares(TransactionId xid, nodemask_t participantsMask, int *failed_at)
305305
* disconnected. Let's wait when it became disabled as we can
306306
* became offline by this time.
307307
*/
308-
MtmLock(LW_SHARED);
309-
if (BIT_CHECK(Mtm->disabledNodeMask, sender_to_node[sender_id] - 1))
308+
if (!BIT_CHECK(MtmGetEnabledNodeMask(), sender_to_node[sender_id] - 1))
310309
{
311-
if (Mtm->status != MTM_ONLINE)
312-
{
313-
elog(ERROR, "our node was disabled during transaction commit");
314-
}
315-
else
316-
{
317-
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
318-
mtm_log(MtmTxTrace,
319-
"GatherPrepares: dropping node%d from participants of tx" XID_FMT,
320-
sender_to_node[sender_id], xid);
321-
prepared = false;
322-
*failed_at = sender_to_node[sender_id];
323-
}
310+
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
311+
mtm_log(MtmTxTrace,
312+
"GatherPrepares: dropping node%d from participants of tx" XID_FMT,
313+
sender_to_node[sender_id], xid);
324314
}
325-
MtmUnlock();
326315
}
327316
}
328317

@@ -365,22 +354,13 @@ GatherPrecommits(TransactionId xid, nodemask_t participantsMask, MtmMessageCode
365354
* disconnected. Let's wait when it became disabled as we can
366355
* became offline by this time.
367356
*/
368-
MtmLock(LW_SHARED);
369-
if (BIT_CHECK(Mtm->disabledNodeMask, sender_to_node[sender_id] - 1))
357+
if (!BIT_CHECK(MtmGetEnabledNodeMask(), sender_to_node[sender_id] - 1))
370358
{
371-
if (Mtm->status != MTM_ONLINE)
372-
{
373-
elog(ERROR, "our node was disabled during transaction commit");
374-
}
375-
else
376-
{
377-
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
378-
mtm_log(MtmTxTrace,
379-
"GatherPrecommit: dropping node%d from participants of tx" XID_FMT,
380-
sender_to_node[sender_id], xid);
381-
}
359+
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
360+
mtm_log(MtmTxTrace,
361+
"GatherPrecommit: dropping node%d from participants of tx" XID_FMT,
362+
sender_to_node[sender_id], xid);
382363
}
383-
MtmUnlock();
384364
}
385365
}
386366

src/ddd.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "ddd.h"
1818
#include "bytebuf.h"
1919
#include "mm.h"
20+
#include "state.h"
2021
#include "logger.h"
2122

2223
#define LOCK_BY_INDEX(i) ((LWLockId)&ddd_shared->locks[(i)])
@@ -383,7 +384,7 @@ MtmDetectGlobalDeadLockForXid(TransactionId xid)
383384
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data, buf.used/sizeof(GlobalTransactionId));
384385
ByteBufferFree(&buf);
385386
for (i = 0; i < ddd_shared->n_nodes; i++) {
386-
if (i+1 != Mtm->my_node_id && !BIT_CHECK(Mtm->disabledNodeMask, i)) {
387+
if (i+1 != Mtm->my_node_id && BIT_CHECK(MtmGetEnabledNodeMask(), i)) {
387388
size_t lockGraphSize;
388389
void* lockGraphData;
389390

@@ -439,7 +440,7 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
439440

440441
mtm_log(DeadlockCheck, "Detect global deadlock for " XID_FMT " by backend %d", pgxact->xid, MyProcPid);
441442

442-
if (Mtm->status != MTM_ONLINE || !TransactionIdIsValid(pgxact->xid))
443+
if (!TransactionIdIsValid(pgxact->xid))
443444
return false;
444445

445446
return MtmDetectGlobalDeadLockForXid(pgxact->xid);

src/include/multimaster.h

Lines changed: 3 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -133,16 +133,6 @@ typedef enum
133133
MSG_POLL_STATUS
134134
} MtmMessageCode;
135135

136-
typedef enum
137-
{
138-
MTM_DISABLED, /* Node disabled */
139-
MTM_RECOVERY, /* Node is in recovery process */
140-
MTM_RECOVERED, /* Node is recovered by is not yet switched to
141-
* ONLINE because not all sender/receivers are
142-
* restarted */
143-
MTM_ONLINE /* Ready to receive client's queries */
144-
} MtmNodeStatus;
145-
146136
typedef struct
147137
{
148138
MtmMessageCode code; /* Message code: MSG_PREPARE, MSG_PRECOMMIT,
@@ -184,54 +174,28 @@ typedef struct
184174
{
185175
int my_node_id;
186176
bool stop_new_commits;
187-
bool recovered;
188177
XLogRecPtr latestSyncpoint;
189-
MtmNodeStatus status; /* Status of this node */
190-
char *statusReason; /* A human-readable description of why the
191-
* current status was set */
192-
int recoverySlot; /* NodeId of recovery slot or 0 if none */
193-
LWLockPadded *locks; /* multimaster lock tranche */
194-
nodemask_t selfConnectivityMask;
195-
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes */
196-
nodemask_t clique; /* Bitmask of nodes that are connected and we
197-
* allowed to connect/send wal/receive wal
198-
* with them */
199-
bool refereeGrant; /* Referee allowed us to work with half of the
200-
* nodes */
201-
int refereeWinnerId; /* Node that won referee contest */
202-
nodemask_t stalledNodeMask; /* Bitmask of stalled nodes (node with
203-
* dropped replication slot which makes it
204-
* not possible automatic recovery of such
205-
* node) */
206-
nodemask_t stoppedNodeMask; /* Bitmask of stopped (permanently
207-
* disabled nodes) */
208-
nodemask_t pglogicalReceiverMask; /* bitmask of started pglogic
209-
* receivers */
210-
nodemask_t pglogicalSenderMask; /* bitmask of started pglogic senders */
211178
bool localTablesHashLoaded; /* Whether data from local_tables
212179
* table is loaded in shared memory
213180
* hash table */
214-
int nAllNodes; /* Total number of nodes */
215-
int recoveryCount; /* Number of completed recoveries */
216-
int donorNodeId; /* Cluster node from which this node was
217-
* populated */
218181
struct {
219182
MtmReplicationMode receiver_mode;
220183
pid_t sender_pid;
221184
pid_t receiver_pid;
222185
int dmq_dest_id;
223186
} peers[MTM_MAX_NODES];
224187
BgwPool pools[FLEXIBLE_ARRAY_MEMBER]; /* [Mtm->nAllNodes]: per-node data */
225-
} MtmState;
188+
} MtmShared;
226189

227-
extern MtmState *Mtm;
190+
extern MtmShared *Mtm;
228191

229192
/* XXX: to delete */
230193
extern int MtmReplicationNodeId;
231194
extern MtmCurrentTrans MtmTx;
232195
extern MemoryContext MtmApplyContext;
233196

234197
/* Locks */
198+
extern LWLock *MtmLock;
235199
extern LWLock *MtmCommitBarrier;
236200
extern LWLock *MtmReceiverBarrier;
237201
extern LWLock *MtmSyncpointLock;
@@ -252,10 +216,6 @@ extern int MtmMaxWorkers;
252216
extern int MtmMaxNodes;
253217
extern bool MtmBreakConnection;
254218

255-
/* XXX! need rename: that's actually a disconnectivity mask */
256-
#define SELF_CONNECTIVITY_MASK (Mtm->selfConnectivityMask)
257-
#define EFFECTIVE_CONNECTIVITY_MASK ( SELF_CONNECTIVITY_MASK | Mtm->stoppedNodeMask | ~Mtm->clique )
258-
259219
extern bool MtmTwoPhaseCommit(void);
260220
extern void MtmBeginTransaction(void);
261221

@@ -278,12 +238,6 @@ extern MtmNode *MtmNodeById(MtmConfig *cfg, int node_id);
278238

279239
extern void MtmStateFill(MtmConfig *cfg);
280240

281-
extern void MtmLock(LWLockMode mode);
282-
extern void MtmUnlock(void);
283-
extern void MtmLockNode(int nodeId, LWLockMode mode);
284-
extern bool MtmTryLockNode(int nodeId, LWLockMode mode);
285-
extern void MtmUnlockNode(int nodeId);
286-
287241
extern void MtmInitMessage(MtmArbiterMessage *msg, MtmMessageCode code);
288242

289243
extern bool MtmIsEnabled(void);

src/include/resolver.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ typedef int MtmTxStateMask;
2020
extern void ResolverMain(Datum main_arg);
2121
extern void ResolverInit(void);
2222
extern BackgroundWorkerHandle *ResolverStart(Oid db_id, Oid user_id);
23-
extern void ResolveTransactionsForNode(int node_id);
24-
extern void ResolveAllTransactions(void);
23+
extern void ResolveTransactionsForNode(int node_id, int n_all_nodes);
24+
extern void ResolveAllTransactions(int n_all_nodes);
2525
extern char *MtmTxStateMnem(MtmTxState state);
2626

2727
#endif /* RESOLVER_H */

src/include/state.h

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1+
#ifndef STATE_H
2+
#define STATE_H
13

24
typedef enum
35
{
46
MTM_NEIGHBOR_CLIQUE_DISABLE,
57
MTM_NEIGHBOR_WAL_RECEIVER_START,
8+
MTM_NEIGHBOR_WAL_RECEIVER_ERROR,
69
MTM_NEIGHBOR_WAL_SENDER_START_RECOVERY,
710
MTM_NEIGHBOR_WAL_SENDER_START_RECOVERED,
8-
MTM_NEIGHBOR_RECOVERY_CAUGHTUP
11+
MTM_NEIGHBOR_RECOVERY_CAUGHTUP,
12+
MTM_NEIGHBOR_WAL_SENDER_STOP
913
} MtmNeighborEvent;
1014

1115
typedef enum
@@ -21,24 +25,40 @@ typedef enum
2125
MTM_NONRECOVERABLE_ERROR
2226
} MtmEvent;
2327

24-
extern char const* const MtmNodeStatusMnem[];
28+
typedef enum
29+
{
30+
MTM_DISABLED, /* Node disabled */
31+
MTM_RECOVERY, /* Node is in recovery process */
32+
MTM_RECOVERED, /* Node is recovered by is not yet switched to
33+
* ONLINE because not all sender/receivers are
34+
* restarted */
35+
MTM_ONLINE /* Ready to receive client's queries */
36+
} MtmNodeStatus;
37+
38+
extern char const *const MtmNodeStatusMnem[];
2539

40+
extern void MtmStateInit(void);
41+
extern void MtmStateShmemStartup(void);
2642
extern void MtmStateLoad(MtmConfig *cfg);
2743

2844
extern void MtmMonitorStart(Oid db_id, Oid user_id);
2945

3046
extern void MtmStateProcessNeighborEvent(int node_id, MtmNeighborEvent ev, bool locked);
3147
extern void MtmStateProcessEvent(MtmEvent ev, bool locked);
32-
extern void MtmDisableNode(int nodeId);
33-
extern void MtmEnableNode(int nodeId);
3448

3549
extern void MtmOnNodeDisconnect(char *node_name);
3650
extern void MtmOnNodeConnect(char *node_name);
37-
extern void MtmReconnectNode(int nodeId);
3851

3952
extern void MtmRefreshClusterStatus(void);
4053

4154
extern int countZeroBits(nodemask_t mask, int nNodes);
4255

56+
extern MtmReplicationMode MtmGetReplicationMode(int nodeId);
57+
4358
extern MtmNodeStatus MtmGetCurrentStatus(void);
44-
extern nodemask_t MtmGetDisabledNodeMask(void);
59+
extern nodemask_t MtmGetDisabledNodeMask(void);
60+
extern nodemask_t MtmGetConnectedNodeMask(void);
61+
extern nodemask_t MtmGetEnabledNodeMask(void);
62+
extern int MtmGetRecoveryCount(void);
63+
64+
#endif

0 commit comments

Comments
 (0)