Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 06a9513

Browse files
knizhnikkelvich
authored and committed
Introduce COMMITTED state
1 parent ed3caee commit 06a9513

File tree

4 files changed

+29
-36
lines changed

4 files changed

+29
-36
lines changed

arbiter.c

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -243,6 +243,9 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
243243
if (rc == 1) {
244244
while ((rc = send(sd, src, size, 0)) < 0 && errno == EINTR);
245245
if (rc < 0) {
246+
if (errno == EINPROGRESS) {
247+
continue;
248+
}
246249
return false;
247250
}
248251
size -= rc;
@@ -258,7 +261,7 @@ static int MtmReadSocket(int sd, void* buf, int buf_size)
258261
{
259262
int rc;
260263
while ((rc = recv(sd, buf, buf_size, 0)) < 0 && errno == EINTR);
261-
if (rc <= 0 && errno == EAGAIN) {
264+
if (rc <= 0 && (errno == EAGAIN || errno == EINPROGRESS)) {
262265
rc = MtmWaitSocket(sd, false, MtmHeartbeatSendTimeout);
263266
if (rc == 1) {
264267
while ((rc = recv(sd, buf, buf_size, 0)) < 0 && errno == EINTR);
@@ -328,6 +331,7 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
328331
if (BIT_CHECK(resp->disabledNodeMask, MtmNodeId-1)
329332
&& !BIT_CHECK(Mtm->disabledNodeMask, resp->node-1)
330333
&& Mtm->status != MTM_RECOVERY
334+
&& Mtm->status != MTM_RECOVERED
331335
&& Mtm->nodes[MtmNodeId-1].lastStatusChangeTime + MSEC_TO_USEC(MtmNodeDisableDelay) < MtmGetSystemTime())
332336
{
333337
elog(WARNING, "Node %d thinks that I am dead, while I am %s (message %s)", resp->node, MtmNodeStatusMnem[Mtm->status], MtmMessageKindMnem[resp->code]);

multimaster.c

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ char const* const MtmNodeStatusMnem[] =
201201
"Connected",
202202
"Online",
203203
"Recovery",
204+
"Recovered",
204205
"InMinor",
205206
"OutOfService"
206207
};
@@ -831,7 +832,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
831832
* Allow applying of replicated transactions to avoid deadlock (to caught-up we need active transaction counter to become zero).
832833
* Also allow user to complete explicit 2PC transactions.
833834
*/
834-
if (x->isDistributed && !x->isReplicated && x->isTwoPhase && strcmp(application_name, MULTIMASTER_ADMIN) != 0) {
835+
if (x->isDistributed && !x->isReplicated && !x->isTwoPhase && strcmp(application_name, MULTIMASTER_ADMIN) != 0) {
835836
MtmCheckClusterLock();
836837
}
837838

@@ -1872,7 +1873,7 @@ void MtmRecoveryCompleted(void)
18721873
BIT_SET(Mtm->nodeLockerMask, MtmNodeId-1); /* it is trik: this mask was originally use by WAL senders performing recovery, but here we are in opposite (recovered) side:
18731874
* if this mask is not zero loadReq will be broadcasted to all other nodes by heartbeat, suspending their activity
18741875
*/
1875-
MtmSwitchClusterMode(MTM_CONNECTED);
1876+
MtmSwitchClusterMode(MTM_RECOVERED);
18761877
}
18771878
MtmUnlock();
18781879
}
@@ -2030,35 +2031,17 @@ MtmCheckClusterLock()
20302031
timestamp_t delay = MIN_WAIT_TIMEOUT;
20312032
while (true)
20322033
{
2033-
nodemask_t mask = Mtm->walSenderLockerMask;
2034-
if (Mtm->globalLockerMask | mask) {
2035-
if (Mtm->nActiveTransactions == 0) {
2036-
lsn_t currLogPos = GetXLogInsertRecPtr();
2037-
int i;
2038-
for (i = 0; mask != 0; i++, mask >>= 1) {
2039-
if (mask & 1) {
2040-
if (WalSndCtl->walsnds[i].sentPtr != currLogPos) {
2041-
/* recovery is in progress */
2042-
break;
2043-
} else {
2044-
/* recovered replica caught up with master */
2045-
MTM_LOG1("WAL-sender %d complete recovery", i);
2046-
BIT_CLEAR(Mtm->walSenderLockerMask, i);
2047-
}
2048-
}
2049-
}
2034+
if (Mtm->globalLockerMask | Mtm->walSenderLockerMask) {
2035+
/* some "almost cautch-up" wal-senders are still working. */
2036+
/* Do not start new transactions until them are completed. */
2037+
MtmUnlock();
2038+
MtmSleep(delay);
2039+
if (delay*2 <= MAX_WAIT_TIMEOUT) {
2040+
delay *= 2;
20502041
}
2051-
if (Mtm->globalLockerMask | mask) {
2052-
/* some "almost cautch-up" wal-senders are still working. */
2053-
/* Do not start new transactions until them are completed. */
2054-
MtmUnlock();
2055-
MtmSleep(delay);
2056-
if (delay*2 <= MAX_WAIT_TIMEOUT) {
2057-
delay *= 2;
2058-
}
2059-
MtmLock(LW_EXCLUSIVE);
2060-
continue;
2061-
} else {
2042+
MtmLock(LW_EXCLUSIVE);
2043+
} else {
2044+
if (Mtm->nodeLockerMask != 0) {
20622045
/* All lockers have synchronized their logs */
20632046
/* Remove lock and mark them as recovered */
20642047
MTM_LOG1("Complete recovery of %d nodes (node mask %llx)", Mtm->nLockers, Mtm->nodeLockerMask);
@@ -2071,8 +2054,8 @@ MtmCheckClusterLock()
20712054
Mtm->nodeLockerMask = 0;
20722055
MtmCheckQuorum();
20732056
}
2057+
break;
20742058
}
2075-
break;
20762059
}
20772060
}
20782061

@@ -3230,7 +3213,9 @@ void MtmReceiverStarted(int nodeId)
32303213
MtmCheckQuorum();
32313214
}
32323215
elog(LOG, "Start %d receivers and %d senders from %d cluster status %s", Mtm->nReceivers+1, Mtm->nSenders, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
3233-
if (++Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
3216+
if (++Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1
3217+
&& (Mtm->status == MTM_RECOVERED || Mtm->status == MTM_CONNECTED))
3218+
{
32343219
BIT_CLEAR(Mtm->nodeLockerMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
32353220
MtmSwitchClusterMode(MTM_ONLINE);
32363221
}
@@ -3332,7 +3317,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33323317
Mtm->preparedTransactionsLoaded = true;
33333318
}
33343319

3335-
while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE) || BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
3320+
while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_RECOVERED && Mtm->status != MTM_ONLINE)
3321+
|| BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
33363322
{
33373323
if (*shutdown)
33383324
{
@@ -3541,7 +3527,9 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
35413527
if (!BIT_CHECK(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1)) {
35423528
elog(LOG, "Start %d senders and %d receivers from %d cluster status %s", Mtm->nSenders+1, Mtm->nReceivers, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
35433529
BIT_SET(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1);
3544-
if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
3530+
if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1
3531+
&& (Mtm->status == MTM_RECOVERED || Mtm->status == MTM_CONNECTED))
3532+
{
35453533
/* All logical replication connections from and to this node are established, so we can switch cluster to online mode */
35463534
BIT_CLEAR(Mtm->nodeLockerMask, MtmNodeId-1); /* recovery is completed: release cluster lock */
35473535
MtmSwitchClusterMode(MTM_ONLINE);

multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ typedef enum
133133
MTM_CONNECTED, /* Arbiter is established connections with other nodes */
134134
MTM_ONLINE, /* Ready to receive client's queries */
135135
MTM_RECOVERY, /* Node is in recovery process */
136+
MTM_RECOVERED, /* Node is recovered by is not yet switched to ONLINE because not all sender/receivers are restarted */
136137
MTM_IN_MINORITY, /* Node is out of quorum */
137138
MTM_OUT_OF_SERVICE /* Node is not avaiable to to critical, non-recoverable error */
138139
} MtmNodeStatus;

pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ pglogical_receiver_main(Datum main_arg)
305305
timestamp_t start = MtmGetSystemTime();
306306
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slotName);
307307
res = PQexec(conn, query->data);
308-
elog(LOG, "Recreate replication slot %s: %ld millisconds", slotName, (long)USEC_TO_MSEC(MtmGetSystemTime() - start));
308+
elog(LOG, "Drop replication slot %s: %ld milliseconds", slotName, (long)USEC_TO_MSEC(MtmGetSystemTime() - start));
309309
PQclear(res);
310310
resetPQExpBuffer(query);
311311
timeline = Mtm->nodes[nodeId-1].timeline;

0 commit comments

Comments
 (0)