Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit fc8a364

Browse files
knizhnik authored and kelvich committed
Fix wait socket function
1 parent 7d8a4d6 commit fc8a364

File tree

5 files changed

+37
-20
lines changed

5 files changed

+37
-20
lines changed

arbiter.c

Lines changed: 13 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -220,18 +220,20 @@ static int MtmWaitSocket(int sd, bool forWrite, timestamp_t timeoutMsec)
220220
fd_set set;
221221
int rc;
222222
timestamp_t deadline = MtmGetSystemTime() + MSEC_TO_USEC(timeoutMsec);
223-
FD_ZERO(&set);
224-
FD_SET(sd, &set);
223+
225224
do {
226225
timestamp_t now;
227226
MtmCheckHeartbeat();
228-
now = MtmGetSystemTime();
229-
if (now > deadline) {
227+
now = MtmGetSystemTime();
228+
if (now > deadline) {
230229
now = deadline;
231230
}
231+
FD_ZERO(&set);
232+
FD_SET(sd, &set);
232233
tv.tv_sec = (deadline - now)/USECS_PER_SEC;
233234
tv.tv_usec = (deadline - now)%USECS_PER_SEC;
234235
} while ((rc = select(sd+1, forWrite ? NULL : &set, forWrite ? &set : NULL, NULL, &tv)) < 0 && errno == EINTR);
236+
235237
return rc;
236238
}
237239

@@ -384,7 +386,7 @@ static void MtmSendHeartbeat()
384386
|| !BIT_CHECK(Mtm->disabledNodeMask, i)
385387
|| BIT_CHECK(Mtm->reconnectMask, i)))
386388
{
387-
if (!MtmSendToNode(i, &msg, sizeof(msg), MtmHeartbeatSendTimeout)) {
389+
if (!MtmSendToNode(i, &msg, sizeof(msg), MtmHeartbeatRecvTimeout)) {
388390
elog(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
389391
} else {
390392
if (last_heartbeat_to_node[i] + MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2 < now) {
@@ -402,7 +404,7 @@ static void MtmSendHeartbeat()
402404
MTM_LOG4("Send heartbeat to node %d with timestamp %lld", i+1, now);
403405
}
404406
} else {
405-
MTM_LOG2("Do not send heartbeat to node %d, busy mask %lld, status %s", i+1, (long long) busy_mask, MtmNodeStatusMnem[Mtm->status]);
407+
MTM_LOG2("Do not send heartbeat to node %d, busy mask %lld, status %s", i+1, busy_mask, MtmNodeStatusMnem[Mtm->status]);
406408
}
407409
}
408410
}
@@ -828,7 +830,7 @@ static void MtmMonitor(Datum arg)
828830
BackgroundWorkerInitializeConnection(MtmDatabaseName, NULL);
829831

830832
while (!stop) {
831-
int rc = WaitLatch(&MyProc->procLatch, WL_TIMEOUT | WL_POSTMASTER_DEATH, MtmHeartbeatSendTimeout);
833+
int rc = WaitLatch(&MyProc->procLatch, WL_TIMEOUT | WL_POSTMASTER_DEATH, MtmHeartbeatRecvTimeout);
832834
if (rc & WL_POSTMASTER_DEATH) {
833835
break;
834836
}
@@ -938,7 +940,7 @@ static void MtmReceiver(Datum arg)
938940
Assert(node > 0 && node <= nNodes && node != MtmNodeId);
939941

940942
if (Mtm->nodes[node-1].connectivityMask != msg->connectivityMask) {
941-
elog(LOG, "Node %d changes it connectivity mask from %llx to %llx", node, (long long)Mtm->nodes[node-1].connectivityMask, (long long)msg->connectivityMask);
943+
elog(LOG, "Node %d changes it connectivity mask from %llx to %llx", node, Mtm->nodes[node-1].connectivityMask, msg->connectivityMask);
942944
}
943945

944946
Mtm->nodes[node-1].oldestSnapshot = msg->oldestSnapshot;
@@ -1002,11 +1004,11 @@ static void MtmReceiver(Datum arg)
10021004
replorigin_session_origin = InvalidRepOriginId;
10031005
} else {
10041006
MTM_LOG1("Receive response for transaction %s -> %s, participants=%llx, voted=%llx",
1005-
msg->gid, MtmTxnStatusMnem[msg->status], (long long)ts->participantsMask, (long long)ts->votedMask);
1007+
msg->gid, MtmTxnStatusMnem[msg->status], ts->participantsMask, ts->votedMask);
10061008
}
10071009
} else {
10081010
elog(LOG, "Receive response %s for transaction %s for node %d, votedMask %llx, participantsMask %llx",
1009-
MtmTxnStatusMnem[msg->status], msg->gid, node, (long long)ts->votedMask, (long long)(ts->participantsMask & ~Mtm->disabledNodeMask));
1011+
MtmTxnStatusMnem[msg->status], msg->gid, node, ts->votedMask, ts->participantsMask & ~Mtm->disabledNodeMask);
10101012
continue;
10111013
}
10121014
} else if (ts->status == TRANSACTION_STATUS_ABORTED && msg->status == TRANSACTION_STATUS_COMMITTED) {
@@ -1015,7 +1017,7 @@ static void MtmReceiver(Datum arg)
10151017
elog(WARNING, "Transaction %s is committed at node %d but aborted at node %d", msg->gid, MtmNodeId, node);
10161018
} else {
10171019
elog(LOG, "Receive response %s for transaction %s status %s for node %d, votedMask %llx, participantsMask %llx",
1018-
MtmTxnStatusMnem[msg->status], msg->gid, MtmTxnStatusMnem[ts->status], node, (long long)ts->votedMask, (long long)(ts->participantsMask & ~Mtm->disabledNodeMask) );
1020+
MtmTxnStatusMnem[msg->status], msg->gid, MtmTxnStatusMnem[ts->status], node, ts->votedMask, ts->participantsMask & ~Mtm->disabledNodeMask);
10191021
}
10201022
}
10211023
continue;

multimaster--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ AS 'MODULE_PATHNAME','mtm_get_nodes_state'
4343
LANGUAGE C;
4444

4545
CREATE TYPE mtm.cluster_state AS ("status" text, "disabledNodeMask" bigint, "disconnectedNodeMask" bigint, "catchUpNodeMask" bigint, "liveNodes" integer, "allNodes" integer, "nActiveQueries" integer, "nPendingQueries" integer, "queueSize" bigint, "transCount" bigint, "timeShift" bigint, "recoverySlot" integer,
46-
"xidHashSize" bigint, "gidHashSize" bigint, "oldestXid" bigint, "configChanges" integer, "stalledNodeMask" bigint, "stoppedNodeMask" bigint);
46+
"xidHashSize" bigint, "gidHashSize" bigint, "oldestXid" bigint, "configChanges" integer, "stalledNodeMask" bigint, "stoppedNodeMask" bigint, "lastStatusChange" timestamp);
4747

4848
CREATE TYPE mtm.trans_state AS ("status" text, "gid" text, "xid" bigint, "coordinator" integer, "gxid" bigint, "csn" timestamp, "snapshot" timestamp, "local" boolean, "prepared" boolean, "active" boolean, "twophase" boolean, "votingCompleted" boolean, "participants" bigint, "voted" bigint, "configChanges" integer);
4949

multimaster.c

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2128,7 +2128,7 @@ void MtmRefreshClusterStatus()
21282128
* connectivity graph is stabilized.
21292129
*/
21302130
oldClique = newClique;
2131-
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout)*2); /* double timeout to condider worst case when heartbeat send interval is added with refresh cluster status interval */
2131+
MtmSleep(MSEC_TO_USEC(MtmHeartbeatRecvTimeout)*2); /* double timeout to consider the worst case when heartbeat receive interval is added with refresh cluster status interval */
21322132
MtmBuildConnectivityMatrix(matrix);
21332133
newClique = MtmFindMaxClique(matrix, Mtm->nAllNodes, &cliqueSize);
21342134
} while (newClique != oldClique);
@@ -2232,7 +2232,7 @@ void MtmOnNodeDisconnect(int nodeId)
22322232
BIT_SET(SELF_CONNECTIVITY_MASK, nodeId-1);
22332233
BIT_SET(Mtm->reconnectMask, nodeId-1);
22342234
elog(LOG, "Disconnect node %d connectivity mask %llx",
2235-
nodeId, (long long)SELF_CONNECTIVITY_MASK);
2235+
nodeId, SELF_CONNECTIVITY_MASK);
22362236
MtmUnlock();
22372237
}
22382238

@@ -2242,7 +2242,7 @@ void MtmOnNodeDisconnect(int nodeId)
22422242
void MtmOnNodeConnect(int nodeId)
22432243
{
22442244
MtmLock(LW_EXCLUSIVE);
2245-
elog(LOG, "Connect node %d connectivity mask %llx", nodeId, (long long)SELF_CONNECTIVITY_MASK);
2245+
elog(LOG, "Connect node %d connectivity mask %llx", nodeId, SELF_CONNECTIVITY_MASK);
22462246
BIT_CLEAR(SELF_CONNECTIVITY_MASK, nodeId-1);
22472247
BIT_SET(Mtm->reconnectMask, nodeId-1); /* force sender to reestablish connection and send heartbeat */
22482248
MtmUnlock();
@@ -2254,6 +2254,7 @@ void MtmOnNodeConnect(int nodeId)
22542254
void MtmReconnectNode(int nodeId)
22552255
{
22562256
MtmLock(LW_EXCLUSIVE);
2257+
elog(LOG, "Reconnect node %d connectivity mask %llx", nodeId, SELF_CONNECTIVITY_MASK);
22572258
BIT_SET(Mtm->reconnectMask, nodeId-1);
22582259
MtmUnlock();
22592260
}
@@ -3289,7 +3290,9 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
32893290
MtmSetCurrentTransactionGID(ts->gid);
32903291
MtmTx.isActive = true;
32913292
FinishPreparedTransaction(ts->gid, commit);
3292-
3293+
if (commit) {
3294+
MTM_LOG1("Distributed transaction %s is committed", ts->gid);
3295+
}
32933296
if (!insideTransaction) {
32943297
CommitTransactionCommand();
32953298
Assert(!MtmTx.isActive);
@@ -3326,15 +3329,21 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33263329
MtmUnlock();
33273330
return REPLMODE_EXIT;
33283331
}
3329-
/* We are not interested in receiving any deteriorated logical messages from recovered node, do recreate slot */
3332+
/* We are not interested in receiving any deteriorated logical messages from recovered node, so recreate slot */
33303333
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
33313334
mode = REPLMODE_CREATE_NEW;
33323335
}
33333336
MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
33343337
if (Mtm->status == MTM_RECOVERY) {
33353338
mode = REPLMODE_RECOVERED;
3336-
if ((Mtm->recoverySlot == 0 && (Mtm->donorNodeId == MtmNodeId || Mtm->donorNodeId == nodeId))
3337-
|| Mtm->recoverySlot == nodeId)
3339+
/* Choose node for recovery if
3340+
* 1. It is not chosen yet or the same node was chosen before
3341+
* 2. It is donor node or there is no donor node
3342+
* 3. Connections with all other live nodes were established
3343+
*/
3344+
if ((Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId)
3345+
&& (Mtm->donorNodeId == MtmNodeId || Mtm->donorNodeId == nodeId)
3346+
&& (SELF_CONNECTIVITY_MASK & ~Mtm->disabledNodeMask) == 0)
33383347
{
33393348
/* Choose for recovery first available slot or slot of donor node (if any) */
33403349
if (Mtm->nAllNodes >= 3) {
@@ -3354,6 +3363,8 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33543363
return REPLMODE_RECOVERY;
33553364
}
33563365
}
3366+
MTM_LOG1("Replication to node %d is pending: recovery node=%d, donor node=%d, connectivity mask=%llx, disabled mask=%llx",
3367+
nodeId, Mtm->recoverySlot, Mtm->donorNodeId, SELF_CONNECTIVITY_MASK, Mtm->disabledNodeMask);
33573368
MtmUnlock();
33583369
/* delay opening of other slots until recovery is completed */
33593370
MtmSleep(STATUS_POLL_DELAY);
@@ -3492,6 +3503,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
34923503
}
34933504
}
34943505
}
3506+
MTM_LOG1("Startup of logical replication to node %d", MtmReplicationNodeId);
34953507
MtmLock(LW_EXCLUSIVE);
34963508
if (BIT_CHECK(Mtm->stoppedNodeMask, MtmReplicationNodeId-1)) {
34973509
elog(WARNING, "Stopped node %d tries to initiate recovery", MtmReplicationNodeId);
@@ -4074,6 +4086,7 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
40744086
values[15] = Int32GetDatum(Mtm->nConfigChanges);
40754087
values[16] = Int64GetDatum(Mtm->stalledNodeMask);
40764088
values[17] = Int64GetDatum(Mtm->stoppedNodeMask);
4089+
values[18] = TimestampTzGetDatum(time_t_to_timestamptz(Mtm->nodes[MtmNodeId-1].lastStatusChangeTime/USECS_PER_SEC));
40774090

40784091
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc, values, nulls)));
40794092
}
@@ -4456,6 +4469,7 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
44564469
elog(ERROR, "Transaction %s (%llu) is aborted by DTM", x->gid, (long64)x->xid);
44574470
} else {
44584471
FinishPreparedTransaction(x->gid, true);
4472+
MTM_LOG1("Distributed transaction %s is committed", x->gid);
44594473
}
44604474
}
44614475
}

multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282

8383
#define Natts_mtm_trans_state 15
8484
#define Natts_mtm_nodes_state 16
85-
#define Natts_mtm_cluster_state 18
85+
#define Natts_mtm_cluster_state 19
8686

8787
typedef ulong64 csn_t; /* commit serial number */
8888
#define INVALID_CSN ((csn_t)-1)

pglogical_apply.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,7 @@ process_remote_commit(StringInfo in)
703703
MtmSetCurrentTransactionCSN(csn);
704704
MtmSetCurrentTransactionGID(gid);
705705
FinishPreparedTransaction(gid, true);
706+
MTM_LOG1("Distributed transaction %s is committed", gid);
706707
CommitTransactionCommand();
707708
Assert(!MtmTransIsActive());
708709
MtmEndSession(origin_node, true);

0 commit comments

Comments
 (0)