Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 97b31fb

Browse files
committed
multidb refactor: move receiver-related functions to logical_receiver.c, adopot new logger everywhere
1 parent 7f99d61 commit 97b31fb

17 files changed

+631
-863
lines changed

src/commit.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
*/
1111

1212
#include "postgres.h"
13+
#include "access/twophase.h"
14+
#include "access/transam.h"
1315
#include "storage/proc.h"
1416
#include "utils/guc.h"
1517
#include "miscadmin.h"
@@ -19,6 +21,7 @@
1921
#include "multimaster.h"
2022
#include "logger.h"
2123
#include "ddl.h"
24+
#include "state.h"
2225

2326
static Oid MtmDatabaseId;
2427
static bool DmqSubscribed;
@@ -78,7 +81,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
7881
/* Reject all user's transactions at offline cluster.
7982
* Allow execution of transaction by bg-workers to makeit possible to perform recovery.
8083
*/
81-
MTM_ELOG(ERROR,
84+
mtm_log(ERROR,
8285
"Multimaster node is not online: current status %s",
8386
MtmNodeStatusMnem[Mtm->status]);
8487
}

src/include/logger.h

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,20 +36,46 @@ typedef enum MtmLogTag
3636
/* status worker */
3737
StatusRequest = LOG,
3838

39+
/* ddd */
3940
BgwPoolEvent = LOG,
4041

42+
/* ddd */
4143
DeadlockCheck = LOG,
4244
DeadlockUpdate = LOG,
4345
DeadlockSerialize = DEBUG3,
4446

47+
/* ddl XXX! */
4548
DMLStmtOutgoing = DEBUG1,
4649
DMLStmtIncoming = DEBUG1,
4750
DMLProcessingTrace = DEBUG1,
4851

52+
/* broadcast service */
53+
BroadcastNotice = DEBUG1,
54+
55+
/* walsender's proto */
56+
ProtoTraceFilter = DEBUG1,
57+
ProtoTraceTx = DEBUG1,
58+
ProtoTraceMode = LOG,
59+
ProtoTraceMessage = LOG,
60+
61+
/* receiver */
62+
MtmReceiverMode = LOG,
63+
MtmReceiverFilter = LOG,
64+
MtmApplyMessage = LOG,
65+
MtmApplyTrace = DEBUG2,
66+
MtmApplyError = LOG,
67+
68+
/* state */
69+
MtmStateSwitch = LOG,
70+
MtmStateMessage = LOG
4971
} MtmLogTag;
5072

5173
// XXX: also meaningful process name would be cool
5274

75+
#define MTM_TAG "[MTM] "
76+
77+
#define MTM_ERRMSG(fmt,...) errmsg(MTM_TAG fmt, ## __VA_ARGS__)
78+
5379
#define mtm_log(tag, fmt, ...) ereport(tag, \
54-
(errmsg("[MTM] " fmt, ## __VA_ARGS__), \
80+
(errmsg(MTM_TAG fmt, ## __VA_ARGS__), \
5581
errhidestmt(true), errhidecontext(true)))

src/include/multimaster.h

Lines changed: 5 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,17 @@
11
#ifndef __MULTIMASTER_H__
22
#define __MULTIMASTER_H__
33

4+
#include "postgres.h"
5+
46
#include "bytebuf.h"
57
#include "bgwpool.h"
68
#include "bkb.h"
79

8-
#include "access/clog.h"
9-
#include "pglogical_output/hooks.h"
10-
#include "commands/vacuum.h"
11-
#include "libpq-fe.h"
10+
#include "storage/lwlock.h"
1211

1312
#include "dmq.h"
1413
#include "mm.h"
15-
16-
#ifndef DEBUG_LEVEL
17-
#define DEBUG_LEVEL 0
18-
#endif
19-
20-
21-
#define MTM_TAG "[MTM] "
22-
#define MTM_ELOG(level,fmt,...) elog(level, MTM_TAG fmt, ## __VA_ARGS__)
23-
#define MTM_ERRMSG(fmt,...) errmsg(MTM_TAG fmt, ## __VA_ARGS__)
24-
25-
#if DEBUG_LEVEL == 0
26-
#define MTM_LOG1(fmt, ...) ereport(LOG, \
27-
(errmsg("[MTM] " fmt, ## __VA_ARGS__), \
28-
errhidestmt(true), errhidecontext(true)))
29-
30-
#define MTM_LOG2(fmt, ...)
31-
#define MTM_LOG3(fmt, ...)
32-
#define MTM_LOG4(fmt, ...)
33-
#elif DEBUG_LEVEL == 1
34-
#define MTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
35-
#define MTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
36-
#define MTM_LOG3(fmt, ...)
37-
#define MTM_LOG4(fmt, ...)
38-
#elif DEBUG_LEVEL == 2
39-
#define MTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
40-
#define MTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
41-
#define MTM_LOG3(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
42-
#define MTM_LOG4(fmt, ...)
43-
#elif DEBUG_LEVEL >= 3
44-
#define MTM_LOG1(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
45-
#define MTM_LOG2(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
46-
#define MTM_LOG3(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
47-
#define MTM_LOG4(fmt, ...) fprintf(stderr, fmt "\n", ## __VA_ARGS__)
48-
#endif
49-
50-
// #define MTM_TXFINISH 1
51-
52-
#ifndef MTM_TXFINISH
53-
#define TXFINISH(fmt, ...)
54-
#else
55-
#define TXFINISH(fmt, ...) elog(LOG, MTM_TAG "[TXFINISH] " fmt, ## __VA_ARGS__)
56-
#endif
57-
58-
// #define MTM_TRACE 1
59-
60-
#ifndef MTM_TRACE
61-
#define MTM_TXTRACE(tx, event, ...)
62-
#else
63-
#define MTM_TXTRACE(tx, event, ...) \
64-
elog(LOG, MTM_TAG "%s, %lld, %u " event "\n", tx->gid, (long long)MtmGetSystemTime(), MyProcPid, ## __VA_ARGS__)
65-
#endif
14+
#include "resolver.h"
6615

6716
#define MULTIMASTER_NAME "multimaster"
6817
#define MULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
@@ -86,13 +35,6 @@
8635
#define USEC_TO_MSEC(t) ((t)/1000)
8736
#define MSEC_TO_USEC(t) ((timestamp_t)(t)*1000)
8837

89-
#define Natts_mtm_ddl_log 2
90-
#define Anum_mtm_ddl_log_issued 1
91-
#define Anum_mtm_ddl_log_query 2
92-
93-
#define Natts_mtm_trans_state 15
94-
#define Natts_mtm_nodes_state 17
95-
#define Natts_mtm_cluster_state 20
9638

9739
typedef ulong64 lsn_t;
9840
#define INVALID_LSN InvalidXLogRecPtr
@@ -103,20 +45,6 @@ typedef ulong64 lsn_t;
10345

10446
#define MTM_MAX_NODES 16
10547

106-
typedef enum
107-
{
108-
MtmTxUnknown = (1<<0),
109-
MtmTxNotFound = (1<<1),
110-
MtmTxInProgress = (1<<2),
111-
MtmTxPrepared = (1<<3),
112-
MtmTxPreCommited = (1<<4),
113-
MtmTxPreAborted = (1<<5),
114-
MtmTxCommited = (1<<6),
115-
MtmTxAborted = (1<<7)
116-
} MtmTxState;
117-
118-
typedef int MtmTxStateMask;
119-
12048
typedef enum
12149
{
12250
PGLOGICAL_COMMIT,
@@ -153,23 +81,13 @@ typedef enum
15381
MTM_ONLINE /* Ready to receive client's queries */
15482
} MtmNodeStatus;
15583

156-
typedef enum
157-
{
158-
REPLMODE_EXIT, /* receiver should exit */
159-
REPLMODE_RECOVERED, /* recovery of receiver node is completed so drop old slot and restart replication from the current position in WAL */
160-
REPLMODE_RECOVERY, /* perform recovery of the node by applying all data from the slot from specified point */
161-
REPLMODE_CREATE_NEW, /* destination node is recovered: drop old slot and restart from roveredLsn position */
162-
REPLMODE_OPEN_EXISTED /* normal mode: use existed slot or create new one and start receiving data from it from the remembered position */
163-
} MtmReplicationMode;
164-
16584
typedef struct
16685
{
16786
MtmMessageCode code; /* Message code: MSG_PREPARE, MSG_PRECOMMIT, MSG_COMMIT, MSG_ABORT,... */
16887
MtmTxState state;
16988
int node; /* Sender node ID */
17089
TransactionId dxid; /* Transaction ID at destination node */
17190
TransactionId sxid; /* Transaction ID at sender node */
172-
XidStatus status; /* Transaction status */
17391
nodemask_t connectivityMask; /* Connectivity bitmask at the sender of message */
17492
pgid_t gid; /* Global transaction identifier */
17593
} MtmArbiterMessage;
@@ -223,28 +141,9 @@ typedef struct
223141
int nAllNodes; /* Total number of nodes */
224142
int recoveryCount; /* Number of completed recoveries */
225143
int donorNodeId; /* Cluster node from which this node was populated */
226-
lsn_t recoveredLSN; /* LSN at the moment of recovery completion */
227144
MtmNodeInfo nodes[1]; /* [Mtm->nAllNodes]: per-node data */
228145
} MtmState;
229146

230-
typedef struct MtmFlushPosition
231-
{
232-
dlist_node node;
233-
int node_id;
234-
lsn_t local_end;
235-
lsn_t remote_end;
236-
} MtmFlushPosition;
237-
238-
typedef struct
239-
{
240-
int magic;
241-
bool is_recovery;
242-
} MtmDecoderPrivate;
243-
244-
extern char const* const MtmNodeStatusMnem[];
245-
extern char const* const MtmTxnStatusMnem[];
246-
extern char const* const MtmMessageKindMnem[];
247-
248147
extern MtmState* Mtm;
249148

250149
extern int MtmReplicationNodeId;
@@ -260,8 +159,7 @@ extern bool MtmMajorNode;
260159
extern bool MtmBackgroundWorker;
261160
extern char* MtmRefereeConnStr;
262161

263-
extern void MtmXactCallback2(XactEvent event, void *arg);
264-
extern void MtmMonitorInitialize(void);
162+
extern void MtmXactCallback2(XactEvent event, void *arg);
265163
extern bool MtmIsUserTransaction(void);
266164
extern void MtmGenerateGid(char *gid, TransactionId xid);
267165
extern int MtmGidParseNodeId(const char* gid);
@@ -270,55 +168,23 @@ extern TransactionId MtmGidParseXid(const char* gid);
270168
extern void MtmWaitForExtensionCreation(void);
271169
extern void erase_option_from_connstr(const char *option, char *connstr);
272170

273-
extern void ResolverMain(void);
274-
extern void ResolverInit(void);
275-
extern void ResolveTransactionsForNode(int node_id);
276-
extern void ResolveAllTransactions(void);
277-
extern char *MtmTxStateMnem(MtmTxState state);
278-
279-
extern void MtmStartReceivers(void);
280-
extern void MtmStartReceiver(int nodeId, bool dynamic);
281-
282-
extern MtmReplicationMode MtmGetReplicationMode(int nodeId);
283-
extern void MtmExecutor(void* work, size_t size);
284-
285-
286-
287171
extern void MtmLock(LWLockMode mode);
288172
extern void MtmUnlock(void);
289173
extern void MtmLockNode(int nodeId, LWLockMode mode);
290174
extern bool MtmTryLockNode(int nodeId, LWLockMode mode);
291175
extern void MtmUnlockNode(int nodeId);
292176

293-
extern void MtmStopNode(int nodeId, bool dropSlot);
294-
extern void MtmRecoverNode(int nodeId);
295-
extern void MtmResumeNode(int nodeId);
296-
297177
extern void MtmSleep(timestamp_t interval);
298178

299179
extern void MtmSetCurrentTransactionGID(char const* gid, int node_id);
300180
extern void MtmSetCurrentTransactionCSN(void);
301181
extern TransactionId MtmGetCurrentTransactionId(void);
302182
extern void MtmResetTransaction(void);
303183

304-
extern void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr);
305-
306-
extern void MtmHandleApplyError(void);
307-
308-
extern void MtmUpdateLsnMapping(int nodeId, lsn_t endLsn);
309-
extern lsn_t MtmGetFlushPosition(int nodeId);
310-
311-
312-
extern void MtmReleaseRecoverySlot(int nodeId);
313184
extern PGconn *PQconnectdb_safe(const char *conninfo, int timeout);
314-
extern void MtmBeginSession(int nodeId);
315-
extern void MtmEndSession(int nodeId, bool unlock);
316-
317-
extern bool MtmFilterTransaction(char* record, int size);
318185

319186
extern void MtmInitMessage(MtmArbiterMessage* msg, MtmMessageCode code);
320187

321-
extern void MtmRefereeInitialize(void);
322188
extern void MtmUpdateControlFile(void);
323189

324190
extern void MtmCheckSlots(void);

src/include/pglogical_output.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ typedef struct HookFuncName
4141
char schema[NAMEDATALEN];
4242
} HookFuncName;
4343

44+
typedef struct MtmDecoderPrivate
45+
{
46+
int magic;
47+
bool is_recovery;
48+
} MtmDecoderPrivate;
49+
4450
typedef struct PGLogicalOutputData
4551
{
4652
MemoryContext context;

src/include/receiver.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#include "multimaster.h"
2+
#include "mm.h"
3+
4+
extern void MtmStartReceivers(void);
5+
extern void MtmStartReceiver(int nodeId, bool dynamic);
6+
7+
extern void MtmExecutor(void* work, size_t size);
8+
extern void MtmUpdateLsnMapping(int node_id, lsn_t end_lsn);
9+
10+
extern void MtmBeginSession(int nodeId);
11+
extern void MtmEndSession(int nodeId, bool unlock);

src/include/resolver.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#ifndef RESOLVER_H
2+
#define RESOLVER_H
3+
4+
typedef enum
5+
{
6+
MtmTxUnknown = (1<<0),
7+
MtmTxNotFound = (1<<1),
8+
MtmTxInProgress = (1<<2),
9+
MtmTxPrepared = (1<<3),
10+
MtmTxPreCommited = (1<<4),
11+
MtmTxPreAborted = (1<<5),
12+
MtmTxCommited = (1<<6),
13+
MtmTxAborted = (1<<7)
14+
} MtmTxState;
15+
16+
typedef int MtmTxStateMask;
17+
18+
extern void ResolverMain(void);
19+
extern void ResolverInit(void);
20+
extern void ResolveTransactionsForNode(int node_id);
21+
extern void ResolveAllTransactions(void);
22+
extern char *MtmTxStateMnem(MtmTxState state);
23+
24+
#endif /* RESOLVER_H */

src/include/state.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ typedef enum
2121
MTM_NONRECOVERABLE_ERROR
2222
} MtmEvent;
2323

24+
extern char const* const MtmNodeStatusMnem[];
25+
26+
extern void MtmMonitorInitialize(void);
27+
2428
extern void MtmStateProcessNeighborEvent(int node_id, MtmNeighborEvent ev, bool locked);
2529
extern void MtmStateProcessEvent(MtmEvent ev, bool locked);
2630
extern void MtmDisableNode(int nodeId);

0 commit comments

Comments
 (0)