Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b978ab1

Browse files
committed
Eliminate duplicated messages
1 parent 5655801 commit b978ab1

File tree

6 files changed

+34
-26
lines changed

6 files changed

+34
-26
lines changed

contrib/mmts/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ USER postgres
3131
ENV CFLAGS -O0
3232
WORKDIR /pg
3333

34-
ENV REBUILD 1
34+
ENV REBUILD 2
3535

3636
RUN cd /pg && \
3737
git clone https://github.com/postgrespro/postgres_cluster.git --depth 1 && \

contrib/mmts/arbiter.c

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,13 @@
8181
typedef struct
8282
{
8383
MtmMessageCode code; /* Message code: MSG_READY, MSG_PREPARE, MSG_COMMIT, MSG_ABORT */
84-
int node; /* Sender node ID */
84+
int node; /* Sender node ID */
8585
TransactionId dxid; /* Transaction ID at destination node */
8686
TransactionId sxid; /* Transaction ID at sender node */
8787
csn_t csn; /* Local CSN in case of sending data from replica to master, global CSN master->replica */
8888
nodemask_t disabledNodeMask; /* Bitmask of disabled nodes at the sender of message */
8989
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
90+
uint64 seqno;/* Message sequence number (used to eliminate duplicated messages) */
9091
} MtmArbiterMessage;
9192

9293
typedef struct
@@ -112,6 +113,7 @@ static int busy_socket;
112113
static void MtmTransSender(Datum arg);
113114
static void MtmTransReceiver(Datum arg);
114115
static void MtmSendHeartbeat(void);
116+
static bool MtmSendToNode(int node, void const* buf, int size);
115117

116118

117119
static char const* const messageText[] =
@@ -248,6 +250,7 @@ static bool MtmWriteSocket(int sd, void const* buf, int size)
248250
if (rc == 1) {
249251
int n = send(sd, src, size, 0);
250252
if (n < 0) {
253+
Assert(errno != EINTR); /* should not happen in non-blocking call */
251254
busy_socket = -1;
252255
return false;
253256
}
@@ -266,6 +269,7 @@ static int MtmReadSocket(int sd, void* buf, int buf_size)
266269
{
267270
int rc = recv(sd, buf, buf_size, 0);
268271
if (rc <= 0) {
272+
Assert(errno != EINTR); /* should not happen in non-blocking call */
269273
return -1;
270274
}
271275
return rc;
@@ -346,9 +350,8 @@ static void MtmSendHeartbeat()
346350
{
347351
if (sockets[i] >= 0 && sockets[i] != busy_socket && !BIT_CHECK(Mtm->disabledNodeMask|Mtm->reconnectMask, i))
348352
{
349-
size_t rc = send(sockets[i], &msg, sizeof(msg), 0);
350-
if ((size_t)rc != sizeof(msg)) {
351-
elog(LOG, "Failed to send heartbeat to node %d: %d", i+1, errno);
353+
if (!MtmSendToNode(i, &msg, sizeof(msg))) {
354+
elog(LOG, "Arbiter failed to send heartbeat to node %d", i+1);
352355
}
353356
}
354357
}
@@ -629,6 +632,7 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, TransactionId xid, int node, Mt
629632
MTM_LOG3("Send %s message CSN=%ld to node %d from node %d for global transaction %d/local transaction %d",
630633
messageText[ts->cmd], ts->csn, node+1, MtmNodeId, ts->gtid.xid, ts->xid);
631634
Assert(ts->cmd != MSG_INVALID);
635+
buf->data[buf->used].seqno = ++Mtm->nodes[node].sendSeqNo;
632636
buf->data[buf->used].code = ts->cmd;
633637
buf->data[buf->used].sxid = ts->xid;
634638
buf->data[buf->used].csn = ts->csn;
@@ -845,10 +849,17 @@ static void MtmTransReceiver(Datum arg)
845849
elog(WARNING, "Ignore message from dead node %d\n", msg->node);
846850
continue;
847851
}
852+
if (msg->seqno <= Mtm->nodes[msg->node-1].recvSeqNo) {
853+
elog(WARNING, "Ignore duplicated message %ld from node %d", msg->seqno, msg->node);
854+
continue;
855+
}
856+
Mtm->nodes[msg->node-1].recvSeqNo = msg->seqno;
848857

849858
ts = (MtmTransState*)hash_search(MtmXid2State, &msg->dxid, HASH_FIND, NULL);
850-
Assert(ts != NULL);
851-
859+
if (ts == NULL) {
860+
elog(WARNING, "Ignore response for unexisted transaction %d from node %d", msg->dxid, msg->node);
861+
continue;
862+
}
852863
if (BIT_CHECK(msg->disabledNodeMask, MtmNodeId-1) && Mtm->status != MTM_RECOVERY) {
853864
elog(PANIC, "Node %d thinks that I was dead: perform hara-kiri not to be a zombie", msg->node);
854865
}

contrib/mmts/multimaster.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,10 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
811811
{
812812
MtmTransState* ts;
813813

814+
if (!x->isDistributed) {
815+
return;
816+
}
817+
814818
if (Mtm->inject2PCError == 2) {
815819
Mtm->inject2PCError = 0;
816820
elog(ERROR, "ERROR INJECTION for transaction %d (%s)", x->xid, x->gid);
@@ -1654,6 +1658,8 @@ static void MtmInitialize()
16541658
Mtm->nodes[i].con = MtmConnections[i];
16551659
Mtm->nodes[i].flushPos = 0;
16561660
Mtm->nodes[i].lastHeartbeat = 0;
1661+
Mtm->nodes[i].sendSeqNo = 0;
1662+
Mtm->nodes[i].recvSeqNo = 0;
16571663
}
16581664
PGSemaphoreCreate(&Mtm->votingSemaphore);
16591665
PGSemaphoreReset(&Mtm->votingSemaphore);

contrib/mmts/multimaster.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,9 @@ typedef struct
133133
int senderPid;
134134
int receiverPid;
135135
XLogRecPtr flushPos;
136-
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
136+
csn_t oldestSnapshot; /* Oldest snapshot used by active transactions at this node */
137+
uint64 sendSeqNo;
138+
uint64 recvSeqNo;
137139
} MtmNodeInfo;
138140

139141
typedef struct MtmTransState

contrib/mmts/pglogical_receiver.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include "spill.h"
4141

4242
#define ERRCODE_DUPLICATE_OBJECT_STR "42710"
43+
#define RECEIVER_SUSPEND_TIMEOUT (10*USECS_PER_SEC)
4344

4445
/* Signal handling */
4546
static volatile sig_atomic_t got_sigterm = false;
@@ -355,8 +356,9 @@ pglogical_receiver_main(Datum main_arg)
355356
proc_exit(1);
356357

357358
if (Mtm->status == MTM_OFFLINE || (Mtm->status == MTM_RECOVERY && Mtm->recoverySlot != nodeId)) {
358-
ereport(LOG, (errmsg("%s: terminating WAL receiver because node was switched to %s mode", worker_proc, MtmNodeStatusMnem[Mtm->status])));
359-
proc_exit(0);
359+
ereport(LOG, (errmsg("%s: suspending WAL receiver because node was switched to %s mode", worker_proc, MtmNodeStatusMnem[Mtm->status])));
360+
MtmSleep(RECEIVER_SUSPEND_TIMEOUT);
361+
continue;
360362
}
361363

362364

contrib/raftable/worker.c

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -491,32 +491,19 @@ static void worker_main(Datum arg)
491491
CHECK_FOR_INTERRUPTS();
492492
}
493493
elog(LOG, "Raftable worker stopped");
494-
exit(1);
494+
exit(1); /* automatically restart raftable */
495495
}
496496

497-
static BackgroundWorker RaftableWorker = {
498-
"raftable-worker",
499-
BGWORKER_SHMEM_ACCESS, /* do not need connection to the database */
500-
BgWorkerStart_ConsistentState,
501-
1,
502-
worker_main
503-
};
504-
505497
void worker_register(WorkerConfig *cfg)
506498
{
507-
#if 0
508-
BackgroundWorker worker = {};
499+
BackgroundWorker worker;
509500
strcpy(worker.bgw_name, "raftable worker");
510501
worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
511-
worker.bgw_start_time = BgWorkerStart_ConsistentState;/*BgWorkerStart_PostmasterStart;*/
502+
worker.bgw_start_time = BgWorkerStart_ConsistentState;
512503
worker.bgw_restart_time = 1;
513504
worker.bgw_main = worker_main;
514505
worker.bgw_main_arg = PointerGetDatum(cfg);
515506
RegisterBackgroundWorker(&worker);
516-
#else
517-
RaftableWorker.bgw_main_arg = PointerGetDatum(cfg);
518-
RegisterBackgroundWorker(&RaftableWorker);
519-
#endif
520507
}
521508

522509

0 commit comments

Comments
 (0)