Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 6fd8584

Browse files
knizhnikkelvich
authored andcommitted
Fix configuration problem
1 parent 5fd6066 commit 6fd8584

File tree

4 files changed

+58
-34
lines changed

4 files changed

+58
-34
lines changed

arbiter.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
282282
req.hdr.sxid = ShmemVariableCache->nextXid;
283283
req.hdr.csn = MtmGetCurrentTime();
284284
req.hdr.disabledNodeMask = Mtm->disabledNodeMask;
285-
strcpy(req.connStr, Mtm->nodes[MtmNodeId-1].connStr);
285+
strcpy(req.connStr, Mtm->nodes[MtmNodeId-1].con.connStr);
286286
if (!MtmWriteSocket(sd, &req, sizeof req)) {
287287
elog(WARNING, "Arbiter failed to send handshake message to %s:%d: %d", host, port, errno);
288288
close(sd);
@@ -321,7 +321,7 @@ static void MtmOpenConnections()
321321

322322
for (i = 0; i < nNodes; i++) {
323323
if (i+1 != MtmNodeId) {
324-
sockets[i] = MtmConnectSocket(Mtm->nodes[i].hostName, MtmArbiterPort + i + 1, MtmConnectAttempts);
324+
sockets[i] = MtmConnectSocket(Mtm->nodes[i].con.hostName, MtmArbiterPort + i + 1, MtmConnectAttempts);
325325
if (sockets[i] < 0) {
326326
MtmOnNodeDisconnect(i+1);
327327
}
@@ -345,7 +345,7 @@ static bool MtmSendToNode(int node, void const* buf, int size)
345345
if (sockets[node] >= 0) {
346346
close(sockets[node]);
347347
}
348-
sockets[node] = MtmConnectSocket(Mtm->nodes[node].hostName, MtmArbiterPort + node + 1, MtmReconnectAttempts);
348+
sockets[node] = MtmConnectSocket(Mtm->nodes[node].con.hostName, MtmArbiterPort + node + 1, MtmReconnectAttempts);
349349
if (sockets[node] < 0) {
350350
MtmOnNodeDisconnect(node+1);
351351
return false;
@@ -385,7 +385,7 @@ static void MtmAcceptOneConnection()
385385
resp.dxid = HANDSHAKE_MAGIC;
386386
resp.sxid = ShmemVariableCache->nextXid;
387387
resp.csn = MtmGetCurrentTime();
388-
MtmUpdateNodeConnStr(req.hdr.node, req.connStr);
388+
MtmUpdateNodeConnectionInfo(&Mtm->nodes[req.hdr.node-1].con, req.connStr);
389389
if (!MtmWriteSocket(fd, &resp, sizeof resp)) {
390390
elog(WARNING, "Arbiter failed to write response for handshake message to node %d", resp.node);
391391
close(fd);

multimaster.c

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ int MtmConnectAttempts;
166166
int MtmConnectTimeout;
167167
int MtmKeepaliveTimeout;
168168
int MtmReconnectAttempts;
169+
MtmConnectionInfo* MtmConnections;
169170

170171
static char* MtmConnStrs;
171172
static int MtmQueueSize;
@@ -420,8 +421,13 @@ MtmAdjustOldestXid(TransactionId xid)
420421
oldestSnapshot = Mtm->nodes[i].oldestSnapshot;
421422
}
422423
}
423-
for (ts = Mtm->transListHead; ts != NULL && ts->csn < oldestSnapshot; prev = ts, ts = ts->next) {
424-
Assert(ts->status == TRANSACTION_STATUS_COMMITTED || ts->status == TRANSACTION_STATUS_ABORTED || ts->status == TRANSACTION_STATUS_IN_PROGRESS);
424+
for (ts = Mtm->transListHead;
425+
ts != NULL
426+
&& ts->csn < oldestSnapshot
427+
&& (ts->status == TRANSACTION_STATUS_COMMITTED || ts->status == TRANSACTION_STATUS_ABORTED)
428+
&& TransactionIdPrecedes(ts->xid, xid);
429+
prev = ts, ts = ts->next)
430+
{
425431
if (prev != NULL) {
426432
/* Remove information about too old transactions */
427433
hash_search(MtmXid2State, &prev->xid, HASH_REMOVE, NULL);
@@ -989,7 +995,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
989995
for (i = 0; i < n; i++) {
990996
if (i+1 != MtmNodeId) {
991997
void* data = PaxosGet(psprintf("node-mask-%d", i+1), NULL, NULL, nowait);
992-
matrix[i] = *(nodemask_t*)data;
998+
matrix[i] = data ? *(nodemask_t*)data : 0;
993999
} else {
9941000
matrix[i] = Mtm->connectivityMask;
9951001
}
@@ -1153,6 +1159,7 @@ static void MtmInitialize()
11531159
for (i = 0; i < MtmNodes; i++) {
11541160
Mtm->nodes[i].oldestSnapshot = 0;
11551161
Mtm->nodes[i].transDelay = 0;
1162+
Mtm->nodes[i].con = MtmConnections[i];
11561163
}
11571164
PGSemaphoreCreate(&Mtm->votingSemaphore);
11581165
PGSemaphoreReset(&Mtm->votingSemaphore);
@@ -1178,17 +1185,17 @@ MtmShmemStartup(void)
11781185
MtmInitialize();
11791186
}
11801187

1181-
void MtmUpdateNodeConnStr(int nodeId, char const* connStr)
1188+
void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr)
11821189
{
11831190
char const* host;
11841191
char const* end;
11851192
int hostLen;
11861193

11871194
if (strlen(connStr) >= MULTIMASTER_MAX_CONN_STR_SIZE) {
1188-
elog(ERROR, "Too long (%d) connection string '%s' for node %d, limit is %d",
1189-
(int)strlen(connStr), connStr, nodeId, MULTIMASTER_MAX_CONN_STR_SIZE-1);
1195+
elog(ERROR, "Too long (%d) connection string '%s': limit is %d",
1196+
(int)strlen(connStr), connStr, MULTIMASTER_MAX_CONN_STR_SIZE-1);
11901197
}
1191-
strcpy(Mtm->nodes[nodeId-1].connStr, connStr);
1198+
strcpy(conn->connStr, connStr);
11921199

11931200
host = strstr(connStr, "host=");
11941201
if (host == NULL) {
@@ -1198,30 +1205,46 @@ void MtmUpdateNodeConnStr(int nodeId, char const* connStr)
11981205
for (end = host; *end != ' ' && *end != '\0'; end++);
11991206
hostLen = end - host;
12001207
if (hostLen >= MULTIMASTER_MAX_HOST_NAME_SIZE) {
1201-
elog(ERROR, "Too long (%d) host name '%.*s' for node %d, limit is %d",
1202-
hostLen, hostLen, host, nodeId, MULTIMASTER_MAX_HOST_NAME_SIZE-1);
1208+
elog(ERROR, "Too long (%d) host name '%.*s': limit is %d",
1209+
hostLen, hostLen, host, MULTIMASTER_MAX_HOST_NAME_SIZE-1);
12031210
}
1204-
memcpy(Mtm->nodes[nodeId-1].hostName, host, hostLen);
1205-
Mtm->nodes[nodeId-1].hostName[hostLen] = '\0';
1211+
memcpy(conn->hostName, host, hostLen);
1212+
conn->hostName[hostLen] = '\0';
12061213
}
12071214

12081215
static void MtmSplitConnStrs(void)
12091216
{
12101217
int i;
1211-
char* copy = strdup(MtmConnStrs);
1218+
char* copy = pstrdup(MtmConnStrs);
12121219
char* connStr = copy;
12131220
char* connStrEnd = connStr + strlen(connStr);
12141221

1222+
for (i = 0; connStr < connStrEnd; i++) {
1223+
char* p = strchr(connStr, ',');
1224+
if (p == NULL) {
1225+
p = connStrEnd;
1226+
}
1227+
connStr = p + 1;
1228+
}
1229+
if (i > MAX_NODES) {
1230+
elog(ERROR, "Multimaster with more than %d nodes is not currently supported", MAX_NODES);
1231+
}
1232+
if (i < 2) {
1233+
elog(ERROR, "Multimaster should have at least two nodes");
1234+
}
1235+
MtmNodes = i;
1236+
MtmConnections = (MtmConnectionInfo*)palloc(i*sizeof(MtmConnectionInfo));
1237+
connStr = copy;
1238+
12151239
for (i = 0; connStr < connStrEnd; i++) {
12161240
char* p = strchr(connStr, ',');
12171241
if (p == NULL) {
12181242
p = connStrEnd;
12191243
}
1220-
if (i == MAX_NODES) {
1221-
elog(ERROR, "Multimaster with more than %d nodes is not currently supported", MAX_NODES);
1222-
}
12231244
*p = '\0';
1224-
MtmUpdateNodeConnStr(i+1, connStr);
1245+
1246+
MtmUpdateNodeConnectionInfo(&MtmConnections[i], connStr);
1247+
12251248
if (i+1 == MtmNodeId) {
12261249
char* dbName = strstr(connStr, "dbname=");
12271250
char* end;
@@ -1232,20 +1255,13 @@ static void MtmSplitConnStrs(void)
12321255
dbName += 7;
12331256
for (end = dbName; *end != ' ' && *end != '\0'; end++);
12341257
len = end - dbName;
1235-
MtmDatabaseName = (char*)malloc(len + 1);
1258+
MtmDatabaseName = (char*)palloc(len + 1);
12361259
memcpy(MtmDatabaseName, dbName, len);
12371260
MtmDatabaseName[len] = '\0';
12381261
}
12391262
connStr = p + 1;
12401263
}
1241-
free(copy);
1242-
if (i < 2) {
1243-
elog(ERROR, "Multimaster should have at least two nodes");
1244-
}
1245-
MtmNodes = i;
1246-
if (MtmNodeId > MtmNodes) {
1247-
elog(ERROR, "Invalid node id %d for specified nubmer of nodes %d", MtmNodeId, MtmNodes);
1248-
}
1264+
pfree(copy);
12491265
}
12501266

12511267
void

multimaster.h

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,16 @@ typedef enum
8181

8282
typedef struct
8383
{
84+
char hostName[MULTIMASTER_MAX_HOST_NAME_SIZE];
85+
char connStr[MULTIMASTER_MAX_CONN_STR_SIZE];
86+
} MtmConnectionInfo;
87+
88+
89+
typedef struct
90+
{
91+
MtmConnectionInfo con;
8492
time_t transDelay;
8593
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
86-
char hostName[MULTIMASTER_MAX_HOST_NAME_SIZE];
87-
char connStr[MULTIMASTER_MAX_CONN_STR_SIZE];
8894
} MtmNodeInfo;
8995

9096
typedef struct MtmTransState
@@ -152,6 +158,8 @@ extern int MtmReconnectAttempts;
152158
extern int MtmKeepaliveTimeout;
153159
extern HTAB* MtmXid2State;
154160

161+
extern MtmConnectionInfo* MtmConnections;
162+
155163
extern void MtmArbiterInitialize(void);
156164
extern void MtmStartReceivers(void);
157165
extern csn_t MtmTransactionSnapshot(TransactionId xid);
@@ -183,6 +191,6 @@ extern XidStatus MtmGetGlobalTransactionStatus(char const* gid);
183191
extern bool MtmIsRecoveredNode(int nodeId);
184192
extern void MtmRefreshClusterStatus(bool nowait);
185193
extern void MtmSwitchClusterMode(MtmNodeStatus mode);
186-
extern void MtmUpdateNodeConnStr(int nodeId, char const* connStr);
194+
extern void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr);
187195

188196
#endif

pglogical_receiver.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -578,8 +578,8 @@ void MtmStartReceivers(void)
578578

579579
for (i = 0; i < MtmNodes; i++) {
580580
if (i+1 != MtmNodeId) {
581-
ReceiverArgs* ctx = (ReceiverArgs*)malloc(sizeof(ReceiverArgs));
582-
ctx->receiver_conn_string = psprintf("replication=database %s", Mtm->nodes[i].connStr);
581+
ReceiverArgs* ctx = (ReceiverArgs*)palloc(sizeof(ReceiverArgs));
582+
ctx->receiver_conn_string = psprintf("replication=database %s", MtmConnections[i].connStr);
583583
sprintf(ctx->receiver_slot, MULTIMASTER_SLOT_PATTERN, MtmNodeId);
584584
ctx->local_node = MtmNodeId;
585585
ctx->remote_node = i+1;

0 commit comments

Comments
 (0)