Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 8705f90

Browse files
committed
dmq integration: add resolver worker
1 parent 629cdfd commit 8705f90

File tree

10 files changed

+672
-63
lines changed

10 files changed

+672
-63
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ EXTENSION = multimaster
33
DATA = multimaster--1.0.sql
44
OBJS = multimaster.o dmq.o commit.o bytebuf.o bgwpool.o pglogical_output.o \
55
pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o \
6-
pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o state.o
6+
pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o state.o resolver.o
77
MODULE_big = multimaster
88

99
PG_CPPFLAGS = -I$(libpq_srcdir)

commit.c

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
static Oid MtmDatabaseId;
99
static bool DmqSubscribed;
10-
static int sender_to_node[64];
10+
static int sender_to_node[MTM_MAX_NODES];
1111

1212
MtmCurrentTrans MtmTx;
1313

@@ -97,17 +97,16 @@ MtmTwoPhaseCommit(MtmCurrentTrans* x)
9797

9898
if (!DmqSubscribed)
9999
{
100-
int i, j;
100+
int i, sender_id = 0;
101101

102-
for (j = 0, i = 0; i < Mtm->nAllNodes; i++)
102+
for (i = 0; i < Mtm->nAllNodes; i++)
103103
{
104-
if (i + 1 == MtmNodeId)
105-
continue;
106-
107-
dmq_stream_subscribe(psprintf("node%d", i + 1),
104+
if (i + 1 != MtmNodeId)
105+
{
106+
dmq_stream_subscribe(psprintf("node%d", i + 1),
108107
psprintf("be%d", MyProc->pgprocno));
109-
110-
sender_to_node[j++] = i + 1;
108+
sender_to_node[sender_id++] = i + 1;
109+
}
111110
}
112111
DmqSubscribed = true;
113112
}

dmq.c

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
#include "tcop/tcopprot.h"
4040
#include "utils/dynahash.h"
4141

42-
#define DMQ_MQ_SIZE ((Size)1024)
42+
#define DMQ_MQ_SIZE ((Size) 65536)
4343
#define DMQ_MQ_MAGIC 0x646d71
4444

4545
/*
@@ -110,7 +110,8 @@ static DmqBackendState dmq_local;
110110

111111
static shmem_startup_hook_type PreviousShmemStartupHook;
112112

113-
dmq_receiver_start_hook_type dmq_receiver_start_hook;
113+
dmq_receiver_hook_type dmq_receiver_start_hook;
114+
dmq_receiver_hook_type dmq_receiver_stop_hook;
114115

115116
void dmq_sender_main(Datum main_arg);
116117

@@ -231,11 +232,11 @@ dmq_init(void)
231232
worker.bgw_main_arg = 0;
232233
sprintf(worker.bgw_library_name, "multimaster");
233234
sprintf(worker.bgw_function_name, "dmq_sender_main");
234-
snprintf(worker.bgw_name, BGW_MAXLEN, "dmq_sender");
235-
snprintf(worker.bgw_type, BGW_MAXLEN, "dmq_sender");
235+
snprintf(worker.bgw_name, BGW_MAXLEN, "mtm-dmq-sender");
236+
snprintf(worker.bgw_type, BGW_MAXLEN, "mtm-dmq-sender");
236237
RegisterBackgroundWorker(&worker);
237238

238-
/* Register shmem hook for all backends */
239+
/* Register shmem hooks */
239240
PreviousShmemStartupHook = shmem_startup_hook;
240241
shmem_startup_hook = dmq_shmem_startup_hook;
241242

@@ -838,12 +839,15 @@ _pq_getbyte_if_available(unsigned char *c)
838839
static void
839840
dmq_receiver_at_exit(int status, Datum sender)
840841
{
841-
int sender_id = DatumGetInt32(sender);
842+
int sender_id = DatumGetInt32(sender);
843+
char sender_name[DMQ_NAME_MAXLEN];
842844

843845
LWLockAcquire(dmq_state->lock, LW_EXCLUSIVE);
844-
dmq_state->receiver[DatumGetInt32(sender_id)][0] = '\0';
846+
strncmp(sender_name, dmq_state->receiver[sender_id], DMQ_NAME_MAXLEN);
847+
dmq_state->receiver[sender_id][0] = '\0';
845848
LWLockRelease(dmq_state->lock);
846849

850+
dmq_receiver_stop_hook(sender_name);
847851
}
848852

849853

@@ -1162,6 +1166,36 @@ dmq_pop(DmqSenderId *sender_id, StringInfo msg)
11621166
}
11631167
}
11641168

1169+
bool
1170+
dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg)
1171+
{
1172+
shm_mq_result res;
1173+
int i;
1174+
1175+
for (i = 0; i < dmq_local.n_inhandles; i++)
1176+
{
1177+
Size len;
1178+
void *data;
1179+
1180+
res = shm_mq_receive(dmq_local.mq_inh[i], &len, &data, true);
1181+
if (res == SHM_MQ_SUCCESS)
1182+
{
1183+
msg->data = data;
1184+
msg->len = len;
1185+
msg->maxlen = len;
1186+
msg->cursor = 0;
1187+
*sender_id = i;
1188+
// elog(LOG, "pop message %s, from %d", (char *) *dataptr, i);
1189+
return true;
1190+
}
1191+
else if (res == SHM_MQ_DETACHED)
1192+
{
1193+
elog(ERROR, "dmq_pop: queue detached");
1194+
}
1195+
}
1196+
1197+
return false;
1198+
}
11651199

11661200
DmqDestinationId
11671201
dmq_destination_add(char *connstr, char *sender_name, int ping_period)

dmq.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@ extern DmqDestinationId dmq_destination_add(char *connstr, char *sender_name, in
1414
extern void dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg);
1515
extern void dmq_stream_subscribe(char *sender_name, char *stream_name);
1616
extern void dmq_pop(DmqSenderId *sender_id, StringInfo msg);
17+
extern bool dmq_pop_nb(DmqSenderId *sender_id, StringInfo msg);
1718
extern void dmq_push_buffer(DmqDestinationId dest_id, char *stream_name, const void *buffer, size_t len);
1819

19-
typedef void (*dmq_receiver_start_hook_type) (char *);
20-
extern dmq_receiver_start_hook_type dmq_receiver_start_hook;
20+
typedef void (*dmq_receiver_hook_type) (char *);
21+
extern dmq_receiver_hook_type dmq_receiver_start_hook;
22+
extern dmq_receiver_hook_type dmq_receiver_stop_hook;
2123

2224
#endif

multimaster.c

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg);
161161
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError, int forceOnNode);
162162
static void MtmProcessDDLCommand(char const* queryString, bool transactional);
163163

164-
static int MtmGidParseNodeId(const char* gid);
164+
// static int MtmGidParseNodeId(const char* gid);
165165

166166
// static void MtmLockCluster(void);
167167
// static void MtmUnlockCluster(void);
@@ -214,6 +214,18 @@ char const* const MtmTxnStatusMnem[] =
214214
"Unknown"
215215
};
216216

217+
char const* const MtmTxStateMnem[] =
218+
{
219+
"MtmTxUnknown",
220+
"MtmTxNotFound",
221+
"MtmTxInProgress",
222+
"MtmTxPrepared",
223+
"MtmTxPreCommited",
224+
"MtmTxPreAborted",
225+
"MtmTxCommited",
226+
"MtmTxAborted"
227+
};
228+
217229
bool MtmDoReplication;
218230
char* MtmDatabaseName;
219231
char* MtmDatabaseUser;
@@ -1013,7 +1025,7 @@ bool MtmWatchdog(timestamp_t now)
10131025
{
10141026
MTM_LOG1("[STATE] Node %i: Disconnect due to heartbeat timeout (%d msec)",
10151027
i+1, (int)USEC_TO_MSEC(now - Mtm->nodes[i].lastHeartbeat));
1016-
MtmOnNodeDisconnect(i+1);
1028+
// MtmOnNodeDisconnect(i+1);
10171029
allAlive = false;
10181030
}
10191031
}
@@ -2400,7 +2412,7 @@ static void MtmInitialize()
24002412
Mtm->nLiveNodes = 0; //MtmNodes;
24012413
Mtm->nAllNodes = MtmNodes;
24022414
Mtm->disabledNodeMask = (((nodemask_t)1 << MtmNodes) - 1);
2403-
Mtm->clique = 0;
2415+
Mtm->clique = (((nodemask_t)1 << Mtm->nAllNodes) - 1); //0;
24042416
Mtm->refereeGrant = false;
24052417
Mtm->refereeWinnerId = 0;
24062418
Mtm->stalledNodeMask = 0;
@@ -2802,14 +2814,6 @@ static bool ConfigIsSane(void)
28022814
return ok;
28032815
}
28042816

2805-
static void
2806-
MtmDmqStartup(char *sender)
2807-
{
2808-
int node_id;
2809-
sscanf(sender, "node%d", &node_id);
2810-
MtmOnNodeConnect(node_id);
2811-
}
2812-
28132817
void
28142818
_PG_init(void)
28152819
{
@@ -3273,7 +3277,10 @@ _PG_init(void)
32733277
MtmMonitorInitialize();
32743278

32753279
dmq_init();
3276-
dmq_receiver_start_hook = MtmDmqStartup;
3280+
dmq_receiver_start_hook = MtmOnNodeConnect;
3281+
dmq_receiver_stop_hook = MtmOnNodeDisconnect;
3282+
3283+
ResolverInit();
32773284

32783285
/*
32793286
* Install hooks.
@@ -3422,7 +3429,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
34223429
{
34233430
/* Lock on us */
34243431
Mtm->recoverySlot = nodeId;
3425-
MtmPollStatusOfPreparedTransactions(false);
3432+
// MtmPollStatusOfPreparedTransactions(false);
3433+
ResolveAllTransactions();
34263434
MtmUnlock();
34273435
return REPLMODE_RECOVERY;
34283436
}
@@ -4651,7 +4659,7 @@ MtmGenerateGid(char* gid)
46514659
// MTM_LOG1("MtmGenerateGid: %s", gid);
46524660
}
46534661

4654-
static int
4662+
int
46554663
MtmGidParseNodeId(const char* gid)
46564664
{
46574665
int MtmNodeId;

multimaster.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
8080
#define MULTIMASTER_ADMIN "mtm_admin"
8181
#define MULTIMASTER_PRECOMMITTED "precommitted"
82+
#define MULTIMASTER_PREABORTED "preaborted"
8283

8384
#define MULTIMASTER_DEFAULT_ARBITER_PORT 5433
8485

@@ -107,9 +108,28 @@ typedef ulong64 lsn_t;
107108

108109
typedef char pgid_t[GIDSIZE];
109110

111+
// XXX! need rename: that's actually a disconnectivity mask
110112
#define SELF_CONNECTIVITY_MASK (Mtm->nodes[MtmNodeId-1].connectivityMask)
111113
#define EFFECTIVE_CONNECTIVITY_MASK ( SELF_CONNECTIVITY_MASK | Mtm->stoppedNodeMask | ~Mtm->clique )
112114

115+
#define MTM_MAX_NODES 16
116+
117+
typedef enum
118+
{
119+
MtmTxUnknown = (0<<0),
120+
MtmTxNotFound = (0<<1),
121+
MtmTxInProgress = (0<<2),
122+
MtmTxPrepared = (0<<3),
123+
MtmTxPreCommited = (0<<4),
124+
MtmTxPreAborted = (0<<5),
125+
MtmTxCommited = (0<<6),
126+
MtmTxAborted = (0<<7)
127+
} MtmTxState;
128+
129+
130+
131+
typedef int MtmTxStateMask;
132+
113133
typedef enum
114134
{
115135
PGLOGICAL_COMMIT,
@@ -164,6 +184,7 @@ typedef enum
164184
typedef struct
165185
{
166186
MtmMessageCode code; /* Message code: MSG_PREPARE, MSG_PRECOMMIT, MSG_COMMIT, MSG_ABORT,... */
187+
MtmTxState state;
167188
int node; /* Sender node ID */
168189
bool lockReq;/* Whether sender node needs to lock cluster to let wal-sender caught-up and complete recovery */
169190
bool locked; /* Whether sender node is locked */
@@ -387,6 +408,7 @@ typedef struct {
387408

388409
extern char const* const MtmNodeStatusMnem[];
389410
extern char const* const MtmTxnStatusMnem[];
411+
extern char const* const MtmTxStateMnem[];
390412
extern char const* const MtmMessageKindMnem[];
391413

392414
extern MtmState* Mtm;
@@ -429,6 +451,12 @@ extern void MtmMonitorInitialize(void);
429451
extern bool MtmTwoPhaseCommit(MtmCurrentTrans* x);
430452
extern bool MtmIsUserTransaction(void);
431453
extern void MtmGenerateGid(char* gid);
454+
extern int MtmGidParseNodeId(const char* gid);
455+
456+
extern void ResolverMain(void);
457+
extern void ResolverInit(void);
458+
extern void ResolveTransactionsForNode(int node_id);
459+
extern void ResolveAllTransactions(void);
432460

433461
extern void MtmStartReceivers(void);
434462
extern void MtmStartReceiver(int nodeId, bool dynamic);

0 commit comments

Comments
 (0)