Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit e0105f6

Browse files
knizhnikkelvich
authored andcommitted
Update Paxos interface
1 parent 9eb2b3d commit e0105f6

File tree

5 files changed

+53
-26
lines changed

5 files changed

+53
-26
lines changed

arbiter.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
297297

298298
/* Some node considered that I am dead, so switch to recovery mode */
299299
if (BIT_CHECK(msg.disabledNodeMask, MtmNodeId-1)) {
300-
MtmClusterSwitchMode(MTM_RECOVERY);
300+
MtmSwitchClusterMode(MTM_RECOVERY);
301301
}
302302
/* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */
303303
ds->disabledNodeMask |= msg.disabledNodeMask;
@@ -344,7 +344,7 @@ static void MtmOpenConnections()
344344
elog(WARNING, "Node is out of quorum: only %d nodes from %d are accssible", ds->nNodes, MtmNodes);
345345
ds->status = MTM_OFFLINE;
346346
} else if (ds->status == MTM_INITIALIZATION) {
347-
MtmClusterSwitchMode(MTM_CONNECTED);
347+
MtmSwitchClusterMode(MTM_CONNECTED);
348348
}
349349
}
350350

@@ -745,7 +745,7 @@ static void MtmTransReceiver(Datum arg)
745745
}
746746
if (n == 0 && ds->disabledNodeMask != 0) {
747747
/* If timeout is expired and there are didabled nodes, then recheck cluster's state */
748-
MtmUpdateClusterStatus();
748+
MtmRefreshClusterStatus(false);
749749
}
750750
}
751751
}

multimaster.c

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ static TransactionManager MtmTM = {
131131
MtmGetName
132132
};
133133

134-
static char const* const MtmNodeStatusMnem[] =
134+
char const* const MtmNodeStatusMnem[] =
135135
{
136136
"Intialization",
137137
"Offline",
@@ -602,7 +602,7 @@ static void MtmPrecommitTransaction(MtmCurrentTrans* x)
602602
x->xid = GetCurrentTransactionId();
603603

604604
if (dtm->disabledNodeMask != 0) {
605-
MtmUpdateClusterStatus();
605+
MtmRefreshClusterStatus(true);
606606
if (dtm->status != MTM_ONLINE) {
607607
elog(ERROR, "Abort current transaction because this cluster node is not online");
608608
}
@@ -1096,7 +1096,7 @@ _PG_fini(void)
10961096
*/
10971097

10981098

1099-
void MtmClusterSwitchMode(MtmNodeStatus mode)
1099+
void MtmSwitchClusterMode(MtmNodeStatus mode)
11001100
{
11011101
dtm->status = mode;
11021102
elog(WARNING, "Switch to %s mode", MtmNodeStatusMnem[mode]);
@@ -1121,7 +1121,8 @@ void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
11211121
Assert(dtm->status == MTM_RECOVERY);
11221122
} else if (dtm->status == MTM_RECOVERY) {
11231123
/* When recovery is completed we get normal transaction ID and switch to normal mode */
1124-
MtmClusterSwitchMode(MTM_ONLINE);
1124+
dtm->recoverySlot = 0;
1125+
MtmSwitchClusterMode(MTM_ONLINE);
11251126
}
11261127
dtmTx.gtid = *gtid;
11271128
dtmTx.xid = GetCurrentTransactionId();
@@ -1137,9 +1138,8 @@ void MtmReceiverStarted(int nodeId)
11371138
if (!BIT_CHECK(dtm->pglogicalNodeMask, nodeId-1)) {
11381139
BIT_SET(dtm->pglogicalNodeMask, nodeId-1);
11391140
if (++dtm->nReceivers == dtm->nNodes-1) {
1140-
elog(WARNING, "All receivers are started, switch to normal mode");
11411141
Assert(dtm->status == MTM_CONNECTED);
1142-
dtm->status = MTM_ONLINE;
1142+
MtmSwitchClusterMode(MTM_OFFLINE);
11431143
}
11441144
}
11451145
SpinLockRelease(&dtm->spinlock);
@@ -1632,14 +1632,14 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
16321632

16331633
ByteBufferAlloc(&buf);
16341634
EnumerateLocks(MtmSerializeLock, &buf);
1635-
PaxosSet(psprintf("lock-graph-%d", MtmNodeId), buf.data, buf.used);
1635+
PaxosSet(psprintf("lock-graph-%d", MtmNodeId), buf.data, buf.used, true);
16361636
MtmGraphInit(&graph);
16371637
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data, buf.used/sizeof(GlobalTransactionId));
16381638
ByteBufferFree(&buf);
16391639
for (i = 0; i < MtmNodes; i++) {
16401640
if (i+1 != MtmNodeId && !BIT_CHECK(dtm->disabledNodeMask, i)) {
16411641
int size;
1642-
void* data = PaxosGet(psprintf("lock-graph-%d", i+1), &size, NULL);
1642+
void* data = PaxosGet(psprintf("lock-graph-%d", i+1), &size, NULL, true);
16431643
if (data == NULL) {
16441644
hasDeadlock = true; /* Just temporary hack until no Paxos */
16451645
} else {
@@ -1655,12 +1655,12 @@ MtmDetectGlobalDeadLock(PGPROC* proc)
16551655
}
16561656

16571657
static void
1658-
MtmBuildConnectivityMatrix(nodemask_t* matrix)
1658+
MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
16591659
{
16601660
int i, j, n = MtmNodes;
16611661
for (i = 0; i < n; i++) {
16621662
if (i+1 != MtmNodeId) {
1663-
void* data = PaxosGet(psprintf("node-mask-%d", i+1), NULL, NULL);
1663+
void* data = PaxosGet(psprintf("node-mask-%d", i+1), NULL, NULL, nowait);
16641664
matrix[i] = *(nodemask_t*)data;
16651665
} else {
16661666
matrix[i] = dtm->connectivityMask;
@@ -1678,14 +1678,14 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix)
16781678
* Build connectivity graph, find clique in it and extend disabledNodeMask by nodes not included in clique.
16791679
* This function returns false if current node is excluded from cluster, true otherwise
16801680
*/
1681-
void MtmUpdateClusterStatus(void)
1681+
void MtmRefreshClusterStatus(bool nowait)
16821682
{
16831683
nodemask_t mask, clique;
16841684
nodemask_t matrix[MAX_NODES];
16851685
int clique_size;
16861686
int i;
16871687

1688-
MtmBuildConnectivityMatrix(matrix);
1688+
MtmBuildConnectivityMatrix(matrix, nowait);
16891689

16901690
clique = MtmFindMaxClique(matrix, MtmNodes, &clique_size);
16911691
if (clique_size >= MtmNodes/2+1) { /* have quorum */
@@ -1708,11 +1708,11 @@ void MtmUpdateClusterStatus(void)
17081708
if (BIT_CHECK(dtm->disabledNodeMask, MtmNodeId-1)) {
17091709
if (dtm->status == MTM_ONLINE) {
17101710
/* I was excluded from cluster:( */
1711-
MtmClusterSwitchMode(MTM_OFFLINE);
1711+
MtmSwitchClusterMode(MTM_OFFLINE);
17121712
}
17131713
} else if (dtm->status == MTM_OFFLINE) {
17141714
/* Should we somehow restart logical receivers? */
1715-
MtmClusterSwitchMode(MTM_RECOVERY);
1715+
MtmSwitchClusterMode(MTM_RECOVERY);
17161716
}
17171717
} else {
17181718
elog(WARNING, "Clique %lx has no quorum", clique);
@@ -1722,30 +1722,30 @@ void MtmUpdateClusterStatus(void)
17221722
void MtmOnNodeDisconnect(int nodeId)
17231723
{
17241724
BIT_SET(dtm->connectivityMask, nodeId-1);
1725-
PaxosSet(psprintf("node-mask-%d", MtmNodeId), &dtm->connectivityMask, sizeof dtm->connectivityMask);
1725+
PaxosSet(psprintf("node-mask-%d", MtmNodeId), &dtm->connectivityMask, sizeof dtm->connectivityMask, false);
17261726

17271727
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
17281728
MtmSleep(MtmKeepaliveTimeout);
17291729

1730-
MtmUpdateClusterStatus();
1730+
MtmRefreshClusterStatus(false);
17311731
}
17321732

17331733
void MtmOnNodeConnect(int nodeId)
17341734
{
17351735
BIT_CLEAR(dtm->connectivityMask, nodeId-1);
1736-
PaxosSet(psprintf("node-mask-%d", MtmNodeId), &dtm->connectivityMask, sizeof dtm->connectivityMask);
1736+
PaxosSet(psprintf("node-mask-%d", MtmNodeId), &dtm->connectivityMask, sizeof dtm->connectivityMask, false);
17371737
}
17381738

17391739
/*
17401740
* Paxos function stubs (until them are miplemented)
17411741
*/
1742-
void* PaxosGet(char const* key, int* size, PaxosTimestamp* ts)
1742+
void* PaxosGet(char const* key, int* size, PaxosTimestamp* ts, bool nowait)
17431743
{
17441744
if (size != NULL) {
17451745
*size = 0;
17461746
}
17471747
return NULL;
17481748
}
17491749

1750-
void PaxosSet(char const* key, void const* value, int size)
1750+
void PaxosSet(char const* key, void const* value, int size, bool nowait)
17511751
{}

multimaster.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ typedef struct
118118

119119
#define MtmIsCoordinator(ts) (ts->gtid.node == MtmNodeId)
120120

121+
extern char const* const MtmNodeStatusMnem[];
122+
121123
extern char* MtmConnStrs;
122124
extern int MtmNodeId;
123125
extern int MtmNodes;
@@ -150,6 +152,6 @@ extern MtmState* MtmGetState(void);
150152
extern timestamp_t MtmGetCurrentTime(void);
151153
extern void MtmSleep(timestamp_t interval);
152154
extern bool MtmIsRecoveredNode(int nodeId);
153-
extern void MtmUpdateClusterStatus(void);
154-
extern void MtmClusterSwitchMode(MtmNodeStatus mode);
155+
extern void MtmRefreshClusterStatus(bool nowait);
156+
extern void MtmSwitchClusterMode(MtmNodeStatus mode);
155157
#endif

paxos.h

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,26 @@ typedef struct PaxosTimestamp {
77
uint32 psn; /* PAXOS serial number */
88
} PaxosTimestamp;
99

10-
extern void* PaxosGet(char const* key, int* size, PaxosTimestamp* ts);
11-
extern void PaxosSet(char const* key, void const* value, int size);
10+
/*
11+
* Get key value.
12+
* Returns NULL if key doesn't exist.
13+
* Value should be copied into caller's private memory using palloc.
14+
* If "size" is not NULL, then it is assigned size of value.
15+
* If "ts" is not NULL, then it is assigned timestamp of last update of this value
16+
* If RAFT master is not accessible, then depending non value of "nowait" parameter, this funciton should either block until RAFT quorum is reached, either report error.
17+
*/
18+
extern void* PaxosGet(char const* key, int* size, PaxosTimestamp* ts, bool nowait);
1219

20+
/*
21+
* Set new value for the specified key. IF value is NULL, then key should be deleted.
22+
* If RAFT master is not accessible, then depending non value of "nowait" parameter, this funciton should either block until RAFT quorum is reached, either report error.
23+
*/
24+
extern void PaxosSet(char const* key, void const* value, int size, bool nowait);
25+
26+
/*
27+
* If key doesn't exists or its value is not equal to the specified value then store this value and return true.
28+
* Otherwise do nothing and return false.
29+
* If RAFT master is not accessible, then depending non value of "nowait" parameter, this funciton should either block until RAFT quorum is reached, either report error.
30+
*/
31+
extern bool PaxosCAS(char const* key, char const* value, bool nowait);
1332
#endif

pglogical_receiver.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,12 @@ pglogical_receiver_main(Datum main_arg)
332332
if (rc & WL_POSTMASTER_DEATH)
333333
proc_exit(1);
334334

335+
if (ds->status != MTM_ONLINE && (ds->status != MTM_RECOVERY || ds->recoverySlot != args->remote_node)) {
336+
ereport(LOG, (errmsg("%s: terminating WAL receiver because node is switched to %s mode", worker_proc, MtmNodeStatusMnem[ds->status])));
337+
proc_exit(0);
338+
}
339+
340+
335341
/* Some cleanup */
336342
if (copybuf != NULL)
337343
{

0 commit comments

Comments
 (0)