Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 2cd76ba

Browse files
knizhnikkelvich
authored and committed
Add functions for getting transaction state
1 parent a7095a3 commit 2cd76ba

File tree

5 files changed

+138
-24
lines changed

5 files changed

+138
-24
lines changed

arbiter.c

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,7 @@ static void MtmSendHeartbeat()
382382
last_heartbeat_to_node[i] = now;
383383
/* Connectivity mask can be cleared by MtmWatchdog: in this case sockets[i] >= 0 */
384384
if (BIT_CHECK(Mtm->connectivityMask, i)) {
385+
MTM_LOG1("Force reconnect to node %d", i+1);
385386
close(sockets[i]);
386387
sockets[i] = -1;
387388
MtmReconnectNode(i+1); /* set reconnect mask to force node reconnect */
@@ -484,6 +485,7 @@ static int MtmConnectSocket(int node, int port, int timeout)
484485
elog(WARNING, "Arbiter waiting socket to %s:%d: rc=%d, error=%d", host, port, rc, errno);
485486
}
486487
close(sd);
488+
MtmCheckHeartbeat();
487489
MtmSleep(MSEC_TO_USEC(MtmHeartbeatSendTimeout));
488490
}
489491
}
@@ -827,6 +829,7 @@ static void MtmReceiver(Datum arg)
827829
MtmBuffer* rxBuffer = (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
828830
timestamp_t lastHeartbeatCheck = MtmGetSystemTime();
829831
timestamp_t now;
832+
timestamp_t selectTimeout = MtmHeartbeatRecvTimeout;
830833

831834
#if USE_EPOLL
832835
struct epoll_event* events = (struct epoll_event*)palloc(sizeof(struct epoll_event)*nNodes);
@@ -857,7 +860,7 @@ static void MtmReceiver(Datum arg)
857860

858861
while (!stop) {
859862
#if USE_EPOLL
860-
n = epoll_wait(epollfd, events, nNodes, MtmHeartbeatRecvTimeout);
863+
n = epoll_wait(epollfd, events, nNodes, selectTimeout);
861864
if (n < 0) {
862865
if (errno == EINTR) {
863866
continue;
@@ -871,16 +874,15 @@ static void MtmReceiver(Datum arg)
871874
MtmDisconnect(i);
872875
}
873876
}
874-
now = MtmGetSystemTime();
875877
for (j = 0; j < n; j++) {
876878
if (events[j].events & EPOLLIN)
877879
#else
878880
fd_set events;
879881
do {
880882
struct timeval tv;
881883
events = inset;
882-
tv.tv_sec = MtmHeartbeatRecvTimeout/1000;
883-
tv.tv_usec = MtmHeartbeatRecvTimeout%1000*1000;
884+
tv.tv_sec = selectTimeout/1000;
885+
tv.tv_usec = selectTimeout%1000*1000;
884886
do {
885887
n = select(max_fd+1, &events, NULL, NULL, &tv);
886888
} while (n < 0 && errno == EINTR);
@@ -889,7 +891,6 @@ static void MtmReceiver(Datum arg)
889891
if (n < 0) {
890892
elog(ERROR, "Arbiter failed to select sockets: %d", errno);
891893
}
892-
now = MtmGetSystemTime();
893894
for (i = 0; i < nNodes; i++) {
894895
if (sockets[i] >= 0 && FD_ISSET(sockets[i], &events))
895896
#endif
@@ -1070,8 +1071,8 @@ static void MtmReceiver(Datum arg)
10701071
break;
10711072
case MSG_ABORTED:
10721073
if (ts->status == TRANSACTION_STATUS_COMMITTED) {
1073-
elog(WARNING, "Receive ABORTED response for already committed transaction %d from node %d",
1074-
ts->xid, node);
1074+
elog(WARNING, "Receive ABORTED response for already committed transaction %d (%s) from node %d",
1075+
ts->xid, ts->gid, node);
10751076
continue;
10761077
}
10771078
if (ts->status != TRANSACTION_STATUS_ABORTED) {
@@ -1084,8 +1085,12 @@ static void MtmReceiver(Datum arg)
10841085
break;
10851086
case MSG_PRECOMMITTED:
10861087
MTM_TXTRACE(ts, "MtmTransReceiver got MSG_PRECOMMITTED");
1087-
if (ts->status != TRANSACTION_STATUS_ABORTED) {
1088-
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
1088+
if (ts->status == TRANSACTION_STATUS_COMMITTED) {
1089+
elog(WARNING, "Receive PRECOMMITTED response for already committed transaction %d (%s) from node %d",
1090+
ts->xid, ts->gid, node);
1091+
continue;
1092+
}
1093+
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
10891094
if (msg->csn > ts->csn) {
10901095
ts->csn = msg->csn;
10911096
MtmSyncClock(ts->csn);
@@ -1096,7 +1101,9 @@ static void MtmReceiver(Datum arg)
10961101
MtmWakeUpBackend(ts);
10971102
}
10981103
} else {
1099-
elog(WARNING, "Receive PRECOMMITTED response for aborted transaction"); // How it can happen? SHould we use assert here?
1104+
Assert(ts->status == TRANSACTION_STATUS_ABORTED);
1105+
elog(WARNING, "Receive PRECOMMITTED response for aborted transaction %d (%s) from node %d",
1106+
ts->xid, ts->gid, node); // How can this happen? Should we use an assert here?
11001107
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
11011108
MtmWakeUpBackend(ts);
11021109
}
@@ -1134,21 +1141,34 @@ static void MtmReceiver(Datum arg)
11341141
}
11351142
}
11361143
if (Mtm->status == MTM_ONLINE) {
1137-
/* "now" is time of performing select, so that delays in processing should not cause false detection */
1138-
if (now > lastHeartbeatCheck + MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
1139-
if (!MtmWatchdog(now)) {
1140-
for (i = 0; i < nNodes; i++) {
1141-
if (Mtm->nodes[i].lastHeartbeat != 0 && sockets[i] >= 0) {
1142-
MTM_LOG1("Last heartbeat from node %d received %ld microseconds ago", i+1, now - Mtm->nodes[i].lastHeartbeat);
1144+
/* Check for heartbeat only in case of timeout expiration: it means that we do not have unprocessed events.
1145+
* It helps to avoid false node failure detection because of blocking receiver.
1146+
*/
1147+
now = MtmGetSystemTime();
1148+
if (n == 0) {
1149+
selectTimeout = MtmHeartbeatRecvTimeout; /* restore select timeout */
1150+
if (now > lastHeartbeatCheck + MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
1151+
if (!MtmWatchdog(now)) {
1152+
for (i = 0; i < nNodes; i++) {
1153+
if (Mtm->nodes[i].lastHeartbeat != 0 && sockets[i] >= 0) {
1154+
MTM_LOG1("Last heartbeat from node %d received %ld microseconds ago", i+1, now - Mtm->nodes[i].lastHeartbeat);
1155+
}
11431156
}
11441157
}
1158+
lastHeartbeatCheck = now;
1159+
}
1160+
if (Mtm->disabledNodeMask != 0) {
1161+
/* If timeout is expired and there are disabled nodes, then recheck cluster's state */
1162+
MtmRefreshClusterStatus(false);
1163+
}
1164+
} else {
1165+
if (now > lastHeartbeatCheck + MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
1166+
/* Switch to non-blocking mode to process all pending requests before doing watchdog check */
1167+
selectTimeout = 0;
11451168
}
1146-
lastHeartbeatCheck = now;
1147-
}
1148-
if (n == 0 && Mtm->disabledNodeMask != 0) {
1149-
/* If timeout is expired and there are disabled nodes, then recheck cluster's state */
1150-
MtmRefreshClusterStatus(false);
11511169
}
1170+
} else if (n == 0) {
1171+
selectTimeout = MtmHeartbeatRecvTimeout; /* restore select timeout */
11521172
}
11531173
}
11541174
proc_exit(1); /* force restart of this bgworker */

multimaster--1.0.sql

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ LANGUAGE C;
4545
CREATE TYPE mtm.cluster_state AS ("status" text, "disabledNodeMask" bigint, "disconnectedNodeMask" bigint, "catchUpNodeMask" bigint, "liveNodes" integer, "allNodes" integer, "nActiveQueries" integer, "nPendingQueries" integer, "queueSize" bigint, "transCount" bigint, "timeShift" bigint, "recoverySlot" integer,
4646
"xidHashSize" bigint, "gidHashSize" bigint, "oldestXid" integer, "configChanges" integer);
4747

48+
CREATE TYPE mtm.trans_state AS ("status" text, "gid" text, "xid" integer, "coordinator" integer, "gxid" integer, "csn" timestamp, "snapshot" timestamp, "local" boolean, "prepared" boolean, "active" boolean, "twophase" boolean, "votingCompleted" boolean, "participants" bigint, "voted" bigint);
49+
50+
CREATE FUNCTION mtm.get_trans_by_gid(git text) RETURNS mtm.trans_state
51+
AS 'MODULE_PATHNAME','mtm_get_trans_by_gid'
52+
LANGUAGE C;
53+
54+
CREATE FUNCTION mtm.get_trans_by_xid(tid xid) RETURNS mtm.trans_state
55+
AS 'MODULE_PATHNAME','mtm_get_trans_by_xid'
56+
LANGUAGE C;
57+
4858
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
4959
AS 'MODULE_PATHNAME','mtm_get_cluster_state'
5060
LANGUAGE C;

multimaster.c

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ PG_FUNCTION_INFO_V1(mtm_poll_node);
115115
PG_FUNCTION_INFO_V1(mtm_recover_node);
116116
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
117117
PG_FUNCTION_INFO_V1(mtm_get_csn);
118+
PG_FUNCTION_INFO_V1(mtm_get_trans_by_gid);
119+
PG_FUNCTION_INFO_V1(mtm_get_trans_by_xid);
118120
PG_FUNCTION_INFO_V1(mtm_get_last_csn);
119121
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
120122
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
@@ -3649,7 +3651,89 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
36493651
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(heap_form_tuple(usrfctx->desc, usrfctx->values, usrfctx->nulls)));
36503652
}
36513653

3654+
Datum
3655+
mtm_get_trans_by_gid(PG_FUNCTION_ARGS)
3656+
{
3657+
TupleDesc desc;
3658+
Datum values[Natts_mtm_trans_state];
3659+
bool nulls[Natts_mtm_trans_state] = {false};
3660+
MtmTransState* ts;
3661+
MtmTransMap* tm;
3662+
char *gid = text_to_cstring(PG_GETARG_TEXT_PP(0));
3663+
int i;
3664+
3665+
MtmLock(LW_SHARED);
3666+
tm = (MtmTransMap*)hash_search(MtmGid2State, gid, HASH_FIND, NULL);
3667+
if (tm == NULL) {
3668+
MtmUnlock();
3669+
PG_RETURN_NULL();
3670+
}
3671+
3672+
values[1] = CStringGetTextDatum(gid);
3673+
3674+
ts = tm->state;
3675+
if (ts == NULL) {
3676+
values[0] = CStringGetTextDatum(MtmTxnStatusMnem[tm->status]);
3677+
for (i = 2; i < Natts_mtm_trans_state; i++) {
3678+
nulls[i] = true;
3679+
}
3680+
} else {
3681+
values[0] = CStringGetTextDatum(MtmTxnStatusMnem[ts->status]);
3682+
values[2] = Int32GetDatum(ts->xid);
3683+
values[3] = Int32GetDatum(ts->gtid.node);
3684+
values[4] = Int32GetDatum(ts->gtid.xid);
3685+
values[5] = TimestampTzGetDatum(time_t_to_timestamptz(ts->csn/USECS_PER_SEC));
3686+
values[6] = TimestampTzGetDatum(time_t_to_timestamptz(ts->snapshot/USECS_PER_SEC));
3687+
values[7] = BoolGetDatum(ts->isLocal);
3688+
values[8] = BoolGetDatum(ts->isPrepared);
3689+
values[9] = BoolGetDatum(ts->isActive);
3690+
values[10] = BoolGetDatum(ts->isTwoPhase);
3691+
values[11] = BoolGetDatum(ts->votingCompleted);
3692+
values[12] = Int64GetDatum(ts->participantsMask);
3693+
values[13] = Int64GetDatum(ts->votedMask);
3694+
}
3695+
MtmUnlock();
3696+
3697+
get_call_result_type(fcinfo, NULL, &desc);
3698+
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc, values, nulls)));
3699+
}
3700+
3701+
Datum
3702+
mtm_get_trans_by_xid(PG_FUNCTION_ARGS)
3703+
{
3704+
TupleDesc desc;
3705+
Datum values[Natts_mtm_trans_state];
3706+
bool nulls[Natts_mtm_trans_state] = {false};
3707+
TransactionId xid = PG_GETARG_INT32(0);
3708+
MtmTransState* ts;
36523709

3710+
MtmLock(LW_SHARED);
3711+
ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
3712+
if (ts == NULL) {
3713+
MtmUnlock();
3714+
PG_RETURN_NULL();
3715+
}
3716+
3717+
values[0] = CStringGetTextDatum(MtmTxnStatusMnem[ts->status]);
3718+
values[1] = CStringGetTextDatum(ts->gid);
3719+
values[2] = Int32GetDatum(ts->xid);
3720+
values[3] = Int32GetDatum(ts->gtid.node);
3721+
values[4] = Int32GetDatum(ts->gtid.xid);
3722+
values[5] = TimestampTzGetDatum(time_t_to_timestamptz(ts->csn/USECS_PER_SEC));
3723+
values[6] = TimestampTzGetDatum(time_t_to_timestamptz(ts->snapshot/USECS_PER_SEC));
3724+
values[7] = BoolGetDatum(ts->isLocal);
3725+
values[8] = BoolGetDatum(ts->isPrepared);
3726+
values[9] = BoolGetDatum(ts->isActive);
3727+
values[10] = BoolGetDatum(ts->isTwoPhase);
3728+
values[11] = BoolGetDatum(ts->votingCompleted);
3729+
values[12] = Int64GetDatum(ts->participantsMask);
3730+
values[13] = Int64GetDatum(ts->votedMask);
3731+
MtmUnlock();
3732+
3733+
get_call_result_type(fcinfo, NULL, &desc);
3734+
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc, values, nulls)));
3735+
}
3736+
36533737
Datum
36543738
mtm_get_cluster_state(PG_FUNCTION_ARGS)
36553739
{
@@ -4740,7 +4824,6 @@ MtmDetectGlobalDeadLockForXid(TransactionId xid)
47404824
Assert(replorigin_session_origin == InvalidRepOriginId);
47414825
XLogFlush(LogLogicalMessage("L", buf.data, buf.used, false));
47424826

4743-
MtmSleep(MSEC_TO_USEC(DeadlockTimeout));
47444827
MtmGraphInit(&graph);
47454828
MtmGraphAdd(&graph, (GlobalTransactionId*)buf.data, buf.used/sizeof(GlobalTransactionId));
47464829
ByteBufferFree(&buf);

multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282

8383
#define Natts_mtm_cluster_state 16
8484
#define Natts_mtm_nodes_state 13
85+
#define Natts_mtm_trans_state 14
8586

8687
typedef uint64 csn_t; /* commit serial number */
8788
#define INVALID_CSN ((csn_t)-1)

tests2/test_recovery_up.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class RecoveryTest(unittest.TestCase, TestHelper):
6262

6363
def setUp(self):
6464
time.sleep(20)
65-
print('Start new test')
65+
print('Start new test at ',datetime.datetime.now())
6666
warnings.simplefilter("ignore", ResourceWarning)
6767
self.client = MtmClient([
6868
"dbname=regression user=postgres host=127.0.0.1 port=15432",
@@ -72,7 +72,7 @@ def setUp(self):
7272
self.client.bgrun()
7373

7474
def tearDown(self):
75-
print('tearDown')
75+
print('Finish test at ',datetime.datetime.now())
7676
self.client.stop()
7777

7878

0 commit comments

Comments
 (0)