Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 79cc60b

Browse files
knizhnikkelvich
authored andcommitted
Fix timer initialziation
1 parent 7f5318f commit 79cc60b

File tree

2 files changed

+21
-12
lines changed

2 files changed

+21
-12
lines changed

arbiter.c

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ typedef struct
103103
static int* sockets;
104104
static int gateway;
105105
static bool send_heartbeat;
106+
static TimeoutId heartbeat_timer;
106107

107108
static void MtmTransSender(Datum arg);
108109
static void MtmTransReceiver(Datum arg);
@@ -526,7 +527,7 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
526527
buf->data[buf->used].csn = ts->csn;
527528
} else {
528529
buf->data[buf->used].code = MSG_HEARTBEAT;
529-
MTM_LOG3("Send HEARTBEAT message to node %d from node %d\n", node+1, MtmNodeId);
530+
MTM_LOG3("Send HEARTBEAT to node %d from node %d at %ld\n", node+1, MtmNodeId, USEC_TO_MSEC(MtmGetSystemTime()));
530531
}
531532
buf->data[buf->used].node = MtmNodeId;
532533
buf->data[buf->used].disabledNodeMask = Mtm->disabledNodeMask;
@@ -540,8 +541,9 @@ static void MtmBroadcastMessage(MtmBuffer* txBuffer, MtmTransState* ts)
540541
int n = 1;
541542
for (i = 0; i < Mtm->nAllNodes; i++)
542543
{
543-
if (!BIT_CHECK(Mtm->disabledNodeMask, i) && (ts == NULL || TransactionIdIsValid(ts->xids[i]))) {
544-
Assert(i+1 != MtmNodeId);
544+
if (i+1 != MtmNodeId && !BIT_CHECK(Mtm->disabledNodeMask, i)
545+
&& (ts == NULL || TransactionIdIsValid(ts->xids[i])))
546+
{
545547
MtmAppendBuffer(txBuffer, ts ? ts->xids[i] : InvalidTransactionId, i, ts);
546548
n += 1;
547549
}
@@ -553,6 +555,7 @@ static void MtmSendHeartbeat()
553555
{
554556
send_heartbeat = true;
555557
PGSemaphoreUnlock(&Mtm->votingSemaphore);
558+
//enable_timeout_after(heartbeat_timer, MtmHeartbeatSendTimeout);
556559
}
557560

558561

@@ -561,15 +564,19 @@ static void MtmTransSender(Datum arg)
561564
sigset_t sset;
562565
int nNodes = MtmMaxNodes;
563566
int i;
567+
564568
MtmBuffer* txBuffer = (MtmBuffer*)palloc(sizeof(MtmBuffer)*nNodes);
565569

570+
InitializeTimeouts();
571+
566572
signal(SIGINT, SetStop);
567573
signal(SIGQUIT, SetStop);
568574
signal(SIGTERM, SetStop);
569575
sigfillset(&sset);
570576
sigprocmask(SIG_UNBLOCK, &sset, NULL);
571577

572-
RegisterTimeout(USER_TIMEOUT, MtmSendHeartbeat);
578+
heartbeat_timer = RegisterTimeout(USER_TIMEOUT, MtmSendHeartbeat);
579+
enable_timeout_after(heartbeat_timer, MtmHeartbeatSendTimeout);
573580

574581
MtmOpenConnections();
575582

@@ -584,6 +591,7 @@ static void MtmTransSender(Datum arg)
584591

585592
if (send_heartbeat) {
586593
send_heartbeat = false;
594+
enable_timeout_after(heartbeat_timer, MtmHeartbeatSendTimeout);
587595
MtmBroadcastMessage(txBuffer, NULL);
588596
}
589597
/*
@@ -725,7 +733,8 @@ static void MtmTransReceiver(Datum arg)
725733
Mtm->nodes[msg->node-1].oldestSnapshot = msg->oldestSnapshot;
726734
Mtm->nodes[msg->node-1].lastHeartbeat = MtmGetSystemTime();
727735

728-
if (msg->code == MSG_HEARTBEAT) {
736+
if (msg->code == MSG_HEARTBEAT) {
737+
MTM_LOG3("Receive HEARTBEAT from node %d at %ld", msg->node, USEC_TO_MSEC(MtmGetSystemTime()));
729738
continue;
730739
}
731740
ts = (MtmTransState*)hash_search(MtmXid2State, &msg->dxid, HASH_FIND, NULL);

multimaster.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -755,8 +755,8 @@ static void MtmWatchdog()
755755
if (Mtm->nodes[i].lastHeartbeat != 0
756756
&& now > Mtm->nodes[i].lastHeartbeat + MSEC_TO_USEC(MtmHeartbeatRecvTimeout))
757757
{
758-
elog(WARNING, "Disable node %d because last heartbeat was received %d msec ago",
759-
i+1, (int)USEC_TO_MSEC(now - Mtm->nodes[i].lastHeartbeat));
758+
elog(WARNING, "Disable node %d because last heartbeat was received %d msec ago (%ld)",
759+
i+1, (int)USEC_TO_MSEC(now - Mtm->nodes[i].lastHeartbeat), USEC_TO_MSEC(now));
760760
MtmOnNodeDisconnect(i+1);
761761
}
762762
}
@@ -801,7 +801,7 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
801801
/* wait votes from all nodes */
802802
while (!ts->votingCompleted) {
803803
MtmUnlock();
804-
MtmWatchdog();
804+
//MtmWatchdog();
805805
if (ts->status == TRANSACTION_STATUS_ABORTED) {
806806
elog(WARNING, "Transaction %d(%s) is aborted by watchdog", x->xid, x->gid);
807807
x->status = TRANSACTION_STATUS_ABORTED;
@@ -1693,7 +1693,7 @@ _PG_init(void)
16931693
"Timeout in milliseconds of receiving heartbeat messages",
16941694
"If no heartbeat message is received from node within this period, it assumed to be dead",
16951695
&MtmHeartbeatRecvTimeout,
1696-
2000,
1696+
100000,
16971697
1,
16981698
INT_MAX,
16991699
PGC_BACKEND,
@@ -1752,7 +1752,7 @@ _PG_init(void)
17521752
"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes",
17531753
"Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)",
17541754
&Mtm2PCMinTimeout,
1755-
100000, /* 100 seconds */
1755+
10000, /* 100 seconds */
17561756
0,
17571757
INT_MAX,
17581758
PGC_BACKEND,
@@ -1813,8 +1813,8 @@ _PG_init(void)
18131813
DefineCustomIntVariable(
18141814
"multimaster.max_recovery_lag",
18151815
"Maximal lag of replication slot of failed node after which this slot is dropped to avoid transaction log overflow",
1816-
"Dropping slog makes it not possible to recover node using logical replication mechanism, it will be ncessary to completely copy content of some other nodes "
1817-
"usimg basebackup or similar tool. Zero value of parameter disable droipping slot.",
1816+
"Dropping slot makes it not possible to recover node using logical replication mechanism, it will be ncessary to completely copy content of some other nodes "
1817+
"using basebackup or similar tool. Zero value of parameter disable dropping slot.",
18181818
&MtmMaxRecoveryLag,
18191819
100000000,
18201820
0,

0 commit comments

Comments
 (0)