Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 14941de

Browse files
knizhnikkelvich
authored andcommitted
Add check that hearbeats are sent
1 parent 308dc9b commit 14941de

File tree

1 file changed

+10
-6
lines changed

1 file changed

+10
-6
lines changed

arbiter.c

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,12 @@ typedef struct
102102
MtmArbiterMessage* data;
103103
} MtmBuffer;
104104

105-
static int* sockets;
106-
static int gateway;
107-
static bool send_heartbeat;
108-
static TimeoutId heartbeat_timer;
109-
static int busy_socket;
105+
static int* sockets;
106+
static int gateway;
107+
static bool send_heartbeat;
108+
static timestamp_t last_sent_hearbeat;
109+
static TimeoutId heartbeat_timer;
110+
static int busy_socket;
110111

111112
static void MtmTransSender(Datum arg);
112113
static void MtmTransReceiver(Datum arg);
@@ -326,6 +327,8 @@ static void MtmSetSocketOptions(int sd)
326327

327328
static void MtmScheduleHeartbeat()
328329
{
330+
Assert(!last_sent_hearbeat || last_sent_hearbeat + MSEC_TO_USEC(MtmHeartbeatRecvTimeout) >= MtmGetSystemTime());
331+
enable_timeout_after(heartbeat_timer, MtmHeartbeatSendTimeout);
329332
send_heartbeat = true;
330333
PGSemaphoreUnlock(&Mtm->votingSemaphore);
331334
}
@@ -338,7 +341,8 @@ static void MtmSendHeartbeat()
338341
msg.disabledNodeMask = Mtm->disabledNodeMask;
339342
msg.oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
340343
msg.node = MtmNodeId;
341-
344+
last_sent_hearbeat = MtmGetSystemTime();
345+
342346
for (i = 0; i < Mtm->nAllNodes; i++)
343347
{
344348
if (sockets[i] >= 0 && sockets[i] != busy_socket && !BIT_CHECK(Mtm->disabledNodeMask|Mtm->reconnectMask, i))

0 commit comments

Comments
 (0)