Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 629f58b

Browse files
knizhnikkelvich
authored andcommitted
Make it possible to add nodes to the cluster
1 parent b8f9bb7 commit 629f58b

File tree

6 files changed

+179
-120
lines changed

6 files changed

+179
-120
lines changed

arbiter.c

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -369,13 +369,13 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
369369

370370
static void MtmOpenConnections()
371371
{
372-
int nNodes = MtmNodes;
372+
int nNodes = MtmMaxNodes;
373373
int i;
374374

375375
sockets = (int*)palloc(sizeof(int)*nNodes);
376376

377377
for (i = 0; i < nNodes; i++) {
378-
if (i+1 != MtmNodeId) {
378+
if (i+1 != MtmNodeId && i < Mtm->nAllNodes) {
379379
sockets[i] = MtmConnectSocket(Mtm->nodes[i].con.hostName, MtmArbiterPort + i + 1, MtmConnectAttempts);
380380
if (sockets[i] < 0) {
381381
MtmOnNodeDisconnect(i+1);
@@ -384,8 +384,8 @@ static void MtmOpenConnections()
384384
sockets[i] = -1;
385385
}
386386
}
387-
if (Mtm->nNodes < MtmNodes/2+1) { /* no quorum */
388-
elog(WARNING, "Node is out of quorum: only %d nodes from %d are accssible", Mtm->nNodes, MtmNodes);
387+
if (Mtm->nLiveNodes < Mtm->nAllNodes/2+1) { /* no quorum */
388+
elog(WARNING, "Node is out of quorum: only %d nodes of %d are accessible", Mtm->nLiveNodes, Mtm->nAllNodes);
389389
MtmSwitchClusterMode(MTM_IN_MINORITY);
390390
} else if (Mtm->status == MTM_INITIALIZATION) {
391391
MtmSwitchClusterMode(MTM_CONNECTED);
@@ -444,7 +444,7 @@ static void MtmAcceptOneConnection()
444444
elog(WARNING, "Arbiter get unexpected handshake message %d", req.hdr.code);
445445
close(fd);
446446
} else{
447-
Assert(req.hdr.node > 0 && req.hdr.node <= MtmNodes && req.hdr.node != MtmNodeId);
447+
Assert(req.hdr.node > 0 && req.hdr.node <= Mtm->nAllNodes && req.hdr.node != MtmNodeId);
448448
resp.code = MSG_STATUS;
449449
resp.disabledNodeMask = Mtm->disabledNodeMask;
450450
resp.dxid = HANDSHAKE_MAGIC;
@@ -472,9 +472,10 @@ static void MtmAcceptIncomingConnections()
472472
struct sockaddr_in sock_inet;
473473
int on = 1;
474474
int i;
475+
int nNodes = MtmMaxNodes;
475476

476-
sockets = (int*)palloc(sizeof(int)*MtmNodes);
477-
for (i = 0; i < MtmNodes; i++) {
477+
sockets = (int*)palloc(sizeof(int)*nNodes);
478+
for (i = 0; i < nNodes; i++) {
478479
sockets[i] = -1;
479480
}
480481
sock_inet.sin_family = AF_INET;
@@ -490,7 +491,7 @@ static void MtmAcceptIncomingConnections()
490491
if (bind(gateway, (struct sockaddr*)&sock_inet, sizeof(sock_inet)) < 0) {
491492
elog(ERROR, "Arbiter failed to bind socket: %d", errno);
492493
}
493-
if (listen(gateway, MtmNodes) < 0) {
494+
if (listen(gateway, nNodes) < 0) {
494495
elog(ERROR, "Arbiter failed to listen socket: %d", errno);
495496
}
496497

@@ -527,22 +528,22 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
527528
{
528529
int i;
529530
int n = 1;
530-
for (i = 0; i < MtmNodes; i++)
531+
for (i = 0; i < Mtm->nAllNodes; i++)
531532
{
532533
if (!BIT_CHECK(Mtm->disabledNodeMask, i) && TransactionIdIsValid(ts->xids[i])) {
533534
Assert(i+1 != MtmNodeId);
534535
MtmAppendBuffer(txBuffer, ts->xids[i], i, ts);
535536
n += 1;
536537
}
537538
}
538-
Assert(n == Mtm->nNodes);
539+
Assert(n == Mtm->nLiveNodes);
539540
}
540541

541542

542543
static void MtmTransSender(Datum arg)
543544
{
544545
sigset_t sset;
545-
int nNodes = MtmNodes;
546+
int nNodes = MtmMaxNodes;
546547
int i;
547548
MtmBuffer* txBuffer = (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
548549

@@ -580,7 +581,7 @@ static void MtmTransSender(Datum arg)
580581

581582
MtmUnlock();
582583

583-
for (i = 0; i < nNodes; i++) {
584+
for (i = 0; i < Mtm->nAllNodes; i++) {
584585
if (txBuffer[i].used != 0) {
585586
MtmSendToNode(i, txBuffer[i].data, txBuffer[i].used*sizeof(MtmArbiterMessage));
586587
txBuffer[i].used = 0;
@@ -593,7 +594,7 @@ static void MtmTransSender(Datum arg)
593594
#if !USE_EPOLL
594595
static bool MtmRecovery()
595596
{
596-
int nNodes = MtmNodes;
597+
int nNodes = Mtm->nAllNodes;
597598
bool recovered = false;
598599
int i;
599600

@@ -618,7 +619,7 @@ static bool MtmRecovery()
618619
static void MtmTransReceiver(Datum arg)
619620
{
620621
sigset_t sset;
621-
int nNodes = MtmNodes;
622+
int nNodes = MtmMaxNodes;
622623
int nResponses;
623624
int i, j, n, rc;
624625
MtmBuffer* rxBuffer = (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
@@ -708,7 +709,7 @@ static void MtmTransReceiver(Datum arg)
708709
if (MtmIsCoordinator(ts)) {
709710
switch (msg->code) {
710711
case MSG_READY:
711-
Assert(ts->nVotes < Mtm->nNodes);
712+
Assert(ts->nVotes < Mtm->nLiveNodes);
712713
Mtm->nodes[msg->node-1].transDelay += MtmGetCurrentTime() - ts->csn;
713714
ts->xids[msg->node-1] = msg->sxid;
714715

@@ -720,7 +721,7 @@ static void MtmTransReceiver(Datum arg)
720721
MtmAbortTransaction(ts);
721722
}
722723

723-
if (++ts->nVotes == Mtm->nNodes) {
724+
if (++ts->nVotes == Mtm->nLiveNodes) {
724725
/* All nodes are finished their transactions */
725726
if (ts->status == TRANSACTION_STATUS_ABORTED) {
726727
MtmWakeUpBackend(ts);
@@ -736,23 +737,23 @@ static void MtmTransReceiver(Datum arg)
736737
}
737738
break;
738739
case MSG_ABORTED:
739-
Assert(ts->nVotes < Mtm->nNodes);
740+
Assert(ts->nVotes < Mtm->nLiveNodes);
740741
if (ts->status != TRANSACTION_STATUS_ABORTED) {
741742
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
742743
MtmAbortTransaction(ts);
743744
}
744-
if (++ts->nVotes == Mtm->nNodes) {
745+
if (++ts->nVotes == Mtm->nLiveNodes) {
745746
MtmWakeUpBackend(ts);
746747
}
747748
break;
748749
case MSG_PREPARED:
749750
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
750-
Assert(ts->nVotes < Mtm->nNodes);
751+
Assert(ts->nVotes < Mtm->nLiveNodes);
751752
if (msg->csn > ts->csn) {
752753
ts->csn = msg->csn;
753754
MtmSyncClock(ts->csn);
754755
}
755-
if (++ts->nVotes == Mtm->nNodes) {
756+
if (++ts->nVotes == Mtm->nLiveNodes) {
756757
ts->csn = MtmAssignCSN();
757758
ts->status = TRANSACTION_STATUS_UNKNOWN;
758759
MtmWakeUpBackend(ts);

bgwpool.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ typedef uint64 timestamp_t;
1212
#define MAX_DBNAME_LEN 30
1313
#define MULTIMASTER_BGW_RESTART_TIMEOUT 1 /* seconds */
1414

15+
extern timestamp_t MtmGetSystemTime(void); /* non-adjusted current system time */
16+
extern timestamp_t MtmGetCurrentTime(void); /* adjusted current system time */
17+
1518
typedef struct
1619
{
1720
BgwPoolExecutor executor;

multimaster--1.0.sql

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ CREATE FUNCTION mtm.drop_node(node integer, drop_slot bool default false) RETURN
1313
AS 'MODULE_PATHNAME','mtm_drop_node'
1414
LANGUAGE C;
1515

16+
CREATE FUNCTION mtm.add_node(conn_str cstring) RETURNS void
17+
AS 'MODULE_PATHNAME','mtm_add_node'
18+
LANGUAGE C;
19+
1620
-- Create replication slot for the node which was previously dropped together with it's slot
1721
CREATE FUNCTION mtm.recover_node(node integer) RETURNS void
1822
AS 'MODULE_PATHNAME','mtm_recover_node'
@@ -30,7 +34,7 @@ CREATE FUNCTION mtm.get_nodes_state() RETURNS SETOF mtm.node_state
3034
AS 'MODULE_PATHNAME','mtm_get_nodes_state'
3135
LANGUAGE C;
3236

33-
CREATE TYPE mtm.cluster_state AS ("status" text, "disabledNodeMask" bigint, "disconnectedNodeMask" bigint, "catchUpNodeMask" bigint, "nNodes" integer, "nActiveQueries" integer, "nPendingQueries" integer, "queueSize" bigint, "transCount" bigint, "timeShift" bigint, "recoverySlot" integer);
37+
CREATE TYPE mtm.cluster_state AS ("status" text, "disabledNodeMask" bigint, "disconnectedNodeMask" bigint, "catchUpNodeMask" bigint, "liveNodes" integer, "allNodes" integer, "nActiveQueries" integer, "nPendingQueries" integer, "queueSize" bigint, "transCount" bigint, "timeShift" bigint, "recoverySlot" integer);
3438

3539
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
3640
AS 'MODULE_PATHNAME','mtm_get_cluster_state'

0 commit comments

Comments
 (0)