Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit e62a796

Browse files
knizhnikkelvich
authored and committed
Fix recovery
1 parent d29666d commit e62a796

9 files changed

+42
-43
lines changed

arbiter.c

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
289289
elog(ERROR, "Arbiter failed to resolve host '%s' by name", host);
290290
}
291291

292-
Retry:
293-
292+
Retry:
294293
while (1) {
295294
int rc = -1;
296295

@@ -384,20 +383,29 @@ static void MtmOpenConnections()
384383

385384

386385
static bool MtmSendToNode(int node, void const* buf, int size)
387-
{
388-
while (sockets[node] < 0 || !MtmWriteSocket(sockets[node], buf, size)) {
389-
elog(WARNING, "Arbiter failed to write to node %d: %d", node+1, errno);
390-
if (sockets[node] >= 0) {
386+
{
387+
while (true) {
388+
if (sockets[node] >= 0 && BIT_CHECK(Mtm->reconnectMask, node)) {
389+
elog(WARNING, "Arbiter is forced to reconnect to node %d", node+1);
390+
BIT_CLEAR(Mtm->reconnectMask, node);
391391
close(sockets[node]);
392+
sockets[node] = -1;
392393
}
393-
sockets[node] = MtmConnectSocket(Mtm->nodes[node].con.hostName, MtmArbiterPort + node + 1, MtmReconnectAttempts);
394-
if (sockets[node] < 0) {
395-
MtmOnNodeDisconnect(node+1);
396-
return false;
394+
if (sockets[node] < 0 || !MtmWriteSocket(sockets[node], buf, size)) {
395+
if (sockets[node] >= 0) {
396+
elog(WARNING, "Arbiter failed to write to node %d: %d", node+1, errno);
397+
close(sockets[node]);
398+
}
399+
sockets[node] = MtmConnectSocket(Mtm->nodes[node].con.hostName, MtmArbiterPort + node + 1, MtmReconnectAttempts);
400+
if (sockets[node] < 0) {
401+
MtmOnNodeDisconnect(node+1);
402+
return false;
403+
}
404+
MTM_TRACE("Arbiter restablished connection with node %d\n", node+1);
405+
} else {
406+
return true;
397407
}
398-
elog(NOTICE, "Arbiter restablish connection with node %d", node+1);
399408
}
400-
return true;
401409
}
402410

403411
static int MtmReadFromNode(int node, void* buf, int buf_size)
@@ -477,10 +485,6 @@ static void MtmAcceptIncomingConnections()
477485

478486
sockets[MtmNodeId-1] = gateway;
479487
MtmRegisterSocket(gateway, MtmNodeId-1);
480-
481-
for (i = 0; i < MtmNodes-1; i++) {
482-
MtmAcceptOneConnection();
483-
}
484488
}
485489

486490

@@ -693,6 +697,7 @@ static void MtmTransReceiver(Datum arg)
693697
msg->node, Mtm->disabledNodeMask, msg->disabledNodeMask);
694698
ts->status = TRANSACTION_STATUS_ABORTED;
695699
MtmAdjustSubtransactions(ts);
700+
Mtm->nActiveTransactions -= 1;
696701
}
697702

698703
if (++ts->nVotes == Mtm->nNodes) {
@@ -712,6 +717,7 @@ static void MtmTransReceiver(Datum arg)
712717
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
713718
ts->status = TRANSACTION_STATUS_ABORTED;
714719
MtmAdjustSubtransactions(ts);
720+
Mtm->nActiveTransactions -= 1;
715721
}
716722
if (++ts->nVotes == Mtm->nNodes) {
717723
MtmWakeUpBackend(ts);

bgwpool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
typedef void(*BgwPoolExecutor)(int id, void* work, size_t size);
99

1010
#define MAX_DBNAME_LEN 30
11-
#define MULTIMASTER_BGW_RESTART_TIMEOUT 10 /* seconds */
11+
#define MULTIMASTER_BGW_RESTART_TIMEOUT 1 /* seconds */
1212

1313
typedef struct
1414
{

multimaster.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -736,11 +736,15 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
736736
ts = hash_search(MtmXid2State, &x->xid, HASH_FIND, NULL);
737737
Assert(ts != NULL);
738738

739-
if (!MtmIsCoordinator(ts)) {
739+
if (!MtmIsCoordinator(ts) || Mtm->status == MTM_RECOVERY) {
740740
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, NULL);
741741
Assert(x->gid[0]);
742742
tm->state = ts;
743-
MtmSendNotificationMessage(ts, MSG_READY); /* send notification to coordinator */
743+
if (Mtm->status != MTM_RECOVERY) {
744+
MtmSendNotificationMessage(ts, MSG_READY); /* send notification to coordinator */
745+
} else {
746+
ts->status = TRANSACTION_STATUS_UNKNOWN;
747+
}
744748
MtmUnlock();
745749
MtmResetTransaction(x);
746750
} else {
@@ -769,6 +773,7 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
769773
Assert(tm != NULL);
770774
tm->state->status = TRANSACTION_STATUS_ABORTED;
771775
MtmAdjustSubtransactions(tm->state);
776+
Mtm->nActiveTransactions -= 1;
772777
MtmUnlock();
773778
x->status = TRANSACTION_STATUS_ABORTED;
774779
}
@@ -809,6 +814,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
809814
Mtm->nActiveTransactions -= 1;
810815
}
811816
if (!commit && x->isReplicated && TransactionIdIsValid(x->gtid.xid)) {
817+
Assert(Mtm->status != MTM_RECOVERY);
812818
/*
813819
* Send notification only if ABORT happens during transaction processing at replicas,
814820
* do not send notification if ABORT is receiver from master
@@ -1205,6 +1211,7 @@ void MtmCheckQuorum(void)
12051211
void MtmOnNodeDisconnect(int nodeId)
12061212
{
12071213
BIT_SET(Mtm->connectivityMask, nodeId-1);
1214+
BIT_SET(Mtm->reconnectMask, nodeId-1);
12081215
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
12091216

12101217
/* Wait more than socket KEEPALIVE timeout to let other nodes update their statuses */
@@ -1292,6 +1299,7 @@ static void MtmInitialize()
12921299
Mtm->pglogicalNodeMask = 0;
12931300
Mtm->walSenderLockerMask = 0;
12941301
Mtm->nodeLockerMask = 0;
1302+
Mtm->reconnectMask = 0;
12951303
Mtm->nLockers = 0;
12961304
Mtm->nActiveTransactions = 0;
12971305
Mtm->votingTransactions = NULL;

multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@ typedef struct
133133
nodemask_t pglogicalNodeMask; /* bitmask of started pglogic receivers */
134134
nodemask_t walSenderLockerMask; /* Mask of WAL-senders IDs locking the cluster */
135135
nodemask_t nodeLockerMask; /* Mask of node IDs which WAL-senders are locking the cluster */
136+
nodemask_t reconnectMask; /* Mask of nodes connection to which has to be reestablished by sender */
137+
136138
int nNodes; /* Number of active nodes */
137139
int nReceivers; /* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
138140
int nLockers; /* Number of lockers */

pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -902,7 +902,7 @@ void MtmExecutor(int id, void* work, size_t size)
902902
{
903903
while (true) {
904904
char action = pq_getmsgbyte(&s);
905-
MTM_TRACE("%d: REMOTE process actiob %c\n", MyProcPid, action);
905+
MTM_TRACE("%d: REMOTE process action %c\n", MyProcPid, action);
906906
switch (action) {
907907
/* BEGIN */
908908
case 'B':

pglogical_proto.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
163163
pq_sendint64(out, txn->end_lsn);
164164
pq_sendint64(out, txn->commit_time);
165165

166-
if (flags == PGLOGICAL_COMMIT_PREPARED) {
166+
if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED) {
167167
pq_sendint64(out, MtmGetTransactionCSN(txn->xid));
168168
}
169-
if (flags != PGLOGICAL_COMMIT) {
169+
if (txn->xact_action != XLOG_XACT_COMMIT) {
170170
pq_sendstring(out, txn->gid);
171171
}
172172
}

raftable.c

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,9 @@
11
#include <dlfcn.h>
22
#include "postgres.h"
3+
#include "raftable.h"
34
#include "raftable_wrapper.h"
45

56

6-
static raftable_get_t raftable_get_impl;
7-
static raftable_set_t raftable_set_impl;
8-
9-
static void RaftableResolve()
10-
{
11-
if (raftable_get_impl == NULL) {
12-
void* dll = dlopen(NULL, RTLD_NOW);
13-
raftable_get_impl = dlsym(dll, "raftable_get");
14-
raftable_set_impl = dlsym(dll, "raftable_set");
15-
Assert(raftable_get_impl != NULL && raftable_set_impl != NULL);
16-
}
17-
}
18-
197
/*
208
* Raftable function proxies
219
*/
@@ -24,16 +12,14 @@ void* RaftableGet(char const* key, size_t* size, RaftableTimestamp* ts, bool now
2412
if (!MtmUseRaftable) {
2513
return NULL;
2614
}
27-
RaftableResolve();
28-
return (*raftable_get_impl)(key, size);
15+
return raftable_get(key, size);
2916
}
3017

3118

3219
void RaftableSet(char const* key, void const* value, size_t size, bool nowait)
3320
{
3421
if (MtmUseRaftable) {
35-
RaftableResolve();
36-
(*raftable_set_impl)(key, value, size, nowait ? 0 : -1);
22+
raftable_set(key, value, size, nowait ? 0 : -1);
3723
}
3824
}
3925

raftable_wrapper.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@ extern void RaftableSet(char const* key, void const* value, size_t size, bool n
3030
*/
3131
extern bool RaftableCAS(char const* key, char const* value, bool nowait);
3232

33-
typedef void* (*raftable_get_t)(char const* key, size_t* size);
34-
typedef void (*raftable_set_t)(char const* key, void const* value, size_t size, int timeout_ms);
35-
3633
extern bool MtmUseRaftable;
3734

3835
#endif

tests/postgresql.conf.mm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@
144144

145145
#max_files_per_process = 1000 # min 25
146146
# (change requires restart)
147-
shared_preload_libraries = 'multimaster' # (change requires restart)
147+
shared_preload_libraries = 'raftable,multimaster' # (change requires restart)
148148

149149
# - Cost-Based Vacuum Delay -
150150

0 commit comments

Comments
 (0)