Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 191cedb

Browse files
committed
Check for heartbeat in raftable_set
1 parent e8d34a8 commit 191cedb

File tree

4 files changed

+23
-20
lines changed

4 files changed

+23
-20
lines changed

contrib/mmts/arbiter.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,6 @@ static int busy_socket;
112112
static void MtmTransSender(Datum arg);
113113
static void MtmTransReceiver(Datum arg);
114114
static void MtmSendHeartbeat(void);
115-
static void MtmCheckHeartbeat(void);
116-
117115

118116

119117
static char const* const messageText[] =
@@ -356,7 +354,7 @@ static void MtmSendHeartbeat()
356354

357355
}
358356

359-
static void MtmCheckHeartbeat()
357+
void MtmCheckHeartbeat()
360358
{
361359
if (send_heartbeat) {
362360
send_heartbeat = false;

contrib/mmts/multimaster.c

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -796,8 +796,8 @@ void MtmWatchdog(void)
796796
if (Mtm->nodes[i].lastHeartbeat != 0
797797
&& now > Mtm->nodes[i].lastHeartbeat + MSEC_TO_USEC(MtmHeartbeatRecvTimeout))
798798
{
799-
elog(WARNING, "Disable node %d because last heartbeat was received %d msec ago (%ld)",
800-
i+1, (int)USEC_TO_MSEC(now - Mtm->nodes[i].lastHeartbeat), USEC_TO_MSEC(now));
799+
elog(WARNING, "Heartbeat was received from node %d during %d msec",
800+
i+1, (int)USEC_TO_MSEC(now - Mtm->nodes[i].lastHeartbeat));
801801
MtmOnNodeDisconnect(i+1);
802802
}
803803
}
@@ -840,24 +840,20 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
840840

841841
timestamp_t start = MtmGetSystemTime();
842842
/* wait votes from all nodes */
843-
while (!ts->votingCompleted && start + transTimeout >= MtmGetSystemTime())
843+
while (!ts->votingCompleted && ts->status != TRANSACTION_STATUS_ABORTED && start + transTimeout >= MtmGetSystemTime())
844844
{
845845
MtmUnlock();
846-
MtmWatchdog();
847-
if (ts->status == TRANSACTION_STATUS_ABORTED) {
848-
elog(WARNING, "Transaction %d(%s) is aborted by watchdog", x->xid, x->gid);
849-
x->status = TRANSACTION_STATUS_ABORTED;
850-
return;
851-
}
852846
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET|WL_TIMEOUT, MtmHeartbeatRecvTimeout);
853847
if (result & WL_LATCH_SET) {
854848
ResetLatch(&MyProc->procLatch);
855849
}
856850
MtmLock(LW_SHARED);
857851
}
858-
if (!ts->votingCompleted) {
859-
MtmAbortTransaction(ts);
860-
elog(WARNING, "Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)transTimeout, (int)USEC_TO_MSEC(ts->csn - x->snapshot));
852+
if (!ts->votingCompleted) {
853+
if (ts->status != TRANSACTION_STATUS_ABORTED) {
854+
MtmAbortTransaction(ts);
855+
elog(WARNING, "Transaction is aborted because of %d msec timeout expiration, prepare time %d msec", (int)transTimeout, (int)USEC_TO_MSEC(ts->csn - x->snapshot));
856+
}
861857
} else if (nConfigChanges != Mtm->nConfigChanges) {
862858
MtmAbortTransaction(ts);
863859
elog(WARNING, "Transaction is aborted because cluster configuration is changed during commit");
@@ -1436,7 +1432,7 @@ void MtmOnNodeDisconnect(int nodeId)
14361432
BIT_SET(Mtm->reconnectMask, nodeId-1);
14371433
MtmUnlock();
14381434

1439-
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, true); /* false); -- TODO: raftable is hanged with nowait=true */
1435+
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
14401436

14411437
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
14421438

@@ -1470,7 +1466,7 @@ void MtmOnNodeConnect(int nodeId)
14701466
MtmUnlock();
14711467

14721468
MTM_LOG1("Reconnect node %d", nodeId);
1473-
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, true); /* false); -- TODO: raftable is hanged with nowait=true */
1469+
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
14741470
}
14751471

14761472

contrib/mmts/multimaster.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "bgwpool.h"
66
#include "bkb.h"
77

8+
#include "access/clog.h"
89
#include "pglogical_output/hooks.h"
910

1011
#define DEBUG_LEVEL 0
@@ -268,6 +269,8 @@ extern void MtmHandleApplyError(void);
268269
extern void MtmUpdateLsnMapping(int nodeId, XLogRecPtr endLsn);
269270
extern XLogRecPtr MtmGetFlushPosition(int nodeId);
270271
extern void MtmWatchdog(void);
272+
extern void MtmCheckHeartbeat(void);
273+
271274

272275

273276
#endif

contrib/mmts/raftable.c

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#include "postgres.h"
33
#include "raftable.h"
44
#include "raftable_wrapper.h"
5-
5+
#include "multimaster.h"
66

77
/*
88
* Raftable function proxies
@@ -18,8 +18,14 @@ void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, bool now
1818

1919
void RaftableSet(char const* key, void const* value, size_t size, bool nowait)
2020
{
21-
if (MtmUseRaftable) {
22-
raftable_set(key, value, size, nowait ? 0 : -1);
21+
if (MtmUseRaftable) {
22+
if (nowait) {
23+
raftable_set(key, value, size, 0);
24+
} else {
25+
while (!raftable_set(key, value, size, MtmHeartbeatSendTimeout)) {
26+
MtmCheckHeartbeat();
27+
}
28+
}
2329
}
2430
}
2531

0 commit comments

Comments
 (0)