Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 63d4492

Browse files
knizhnikkelvich
authored and committed
Check for heartbeat in raftable_set
1 parent d50a7ec commit 63d4492

File tree

4 files changed

+23
-20
lines changed

4 files changed

+23
-20
lines changed

arbiter.c

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -112,8 +112,6 @@ static int busy_socket;
112112
static void MtmTransSender(Datum arg);
113113
static void MtmTransReceiver(Datum arg);
114114
static void MtmSendHeartbeat(void);
115-
static void MtmCheckHeartbeat(void);
116-
117115

118116

119117
static char const* const messageText[] =
@@ -356,7 +354,7 @@ static void MtmSendHeartbeat()
356354

357355
}
358356

359-
static void MtmCheckHeartbeat()
357+
void MtmCheckHeartbeat()
360358
{
361359
if (send_heartbeat) {
362360
send_heartbeat = false;

multimaster.c

Lines changed: 10 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -795,8 +795,8 @@ void MtmWatchdog(void)
795795
if (Mtm->nodes[i].lastHeartbeat != 0
796796
&& now > Mtm->nodes[i].lastHeartbeat + MSEC_TO_USEC(MtmHeartbeatRecvTimeout))
797797
{
798-
elog(WARNING, "Disable node %d because last heartbeat was received %d msec ago (%ld)",
799-
i+1, (int)USEC_TO_MSEC(now - Mtm->nodes[i].lastHeartbeat), USEC_TO_MSEC(now));
798+
elog(WARNING, "Heartbeat was received from node %d during %d msec",
799+
i+1, (int)USEC_TO_MSEC(now - Mtm->nodes[i].lastHeartbeat));
800800
MtmOnNodeDisconnect(i+1);
801801
}
802802
}
@@ -839,24 +839,20 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
839839

840840
timestamp_t start = MtmGetSystemTime();
841841
/* wait votes from all nodes */
842-
while (!ts->votingCompleted && start + transTimeout >= MtmGetSystemTime())
842+
while (!ts->votingCompleted && ts->status != TRANSACTION_STATUS_ABORTED && start + transTimeout >= MtmGetSystemTime())
843843
{
844844
MtmUnlock();
845-
MtmWatchdog();
846-
if (ts->status == TRANSACTION_STATUS_ABORTED) {
847-
elog(WARNING, "Transaction %d(%s) is aborted by watchdog", x->xid, x->gid);
848-
x->status = TRANSACTION_STATUS_ABORTED;
849-
return;
850-
}
851845
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET|WL_TIMEOUT, MtmHeartbeatRecvTimeout);
852846
if (result & WL_LATCH_SET) {
853847
ResetLatch(&MyProc->procLatch);
854848
}
855849
MtmLock(LW_SHARED);
856850
}
857-
if (!ts->votingCompleted) {
858-
MtmAbortTransaction(ts);
859-
elog(WARNING, "Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)transTimeout, (int)USEC_TO_MSEC(ts->csn - x->snapshot));
851+
if (!ts->votingCompleted) {
852+
if (ts->status != TRANSACTION_STATUS_ABORTED) {
853+
MtmAbortTransaction(ts);
854+
elog(WARNING, "Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)transTimeout, (int)USEC_TO_MSEC(ts->csn - x->snapshot));
855+
}
860856
} else if (nConfigChanges != Mtm->nConfigChanges) {
861857
MtmAbortTransaction(ts);
862858
elog(WARNING, "Transaction is aborted because cluster configuration is changed during commit");
@@ -1435,7 +1431,7 @@ void MtmOnNodeDisconnect(int nodeId)
14351431
BIT_SET(Mtm->reconnectMask, nodeId-1);
14361432
MtmUnlock();
14371433

1438-
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, true); /* false); -- TODO: raftable is hanged with nowait=true */
1434+
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
14391435

14401436
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
14411437

@@ -1469,7 +1465,7 @@ void MtmOnNodeConnect(int nodeId)
14691465
MtmUnlock();
14701466

14711467
MTM_LOG1("Reconnect node %d", nodeId);
1472-
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, true); /* false); -- TODO: raftable is hanged with nowait=true */
1468+
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
14731469
}
14741470

14751471

multimaster.h

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@
55
#include "bgwpool.h"
66
#include "bkb.h"
77

8+
#include "access/clog.h"
89
#include "pglogical_output/hooks.h"
910

1011
#define DEBUG_LEVEL 0
@@ -268,6 +269,8 @@ extern void MtmHandleApplyError(void);
268269
extern void MtmUpdateLsnMapping(int nodeId, XLogRecPtr endLsn);
269270
extern XLogRecPtr MtmGetFlushPosition(int nodeId);
270271
extern void MtmWatchdog(void);
272+
extern void MtmCheckHeartbeat(void);
273+
271274

272275

273276
#endif

raftable.c

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,7 @@
22
#include "postgres.h"
33
#include "raftable.h"
44
#include "raftable_wrapper.h"
5-
5+
#include "multimaster.h"
66

77
/*
88
* Raftable function proxies
@@ -18,8 +18,14 @@ void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, bool now
1818

1919
void RaftableSet(char const* key, void const* value, size_t size, bool nowait)
2020
{
21-
if (MtmUseRaftable) {
22-
raftable_set(key, value, size, nowait ? 0 : -1);
21+
if (MtmUseRaftable) {
22+
if (nowait) {
23+
raftable_set(key, value, size, 0);
24+
} else {
25+
while (!raftable_set(key, value, size, MtmHeartbeatSendTimeout)) {
26+
MtmCheckHeartbeat();
27+
}
28+
}
2329
}
2430
}
2531

0 commit comments

Comments
 (0)