Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit e0539a9

Browse files
knizhnikkelvich
authored and committed
Explicitly set restart LSN
1 parent 7463957 commit e0539a9

6 files changed

+49
-46
lines changed

arbiter.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,14 @@ static void MtmTransReceiver(Datum arg)
912912
switch (msg->code) {
913913
case MSG_READY:
914914
MTM_TXTRACE(ts, "MtmTransReceiver got MSG_READY");
915+
if (ts->status == TRANSACTION_STATUS_COMMITTED) {
916+
elog(WARNING, "Receive READY response for already committed transaction %d from node %d",
917+
ts->xid, msg->node);
918+
continue;
919+
}
915920
if (ts->nVotes >= Mtm->nLiveNodes) {
921+
elog(WARNING, "Receive deteriorated READY response for transaction %d (%s) from node %d",
922+
ts->xid, ts->gid, msg->node);
916923
MtmAbortTransaction(ts);
917924
MtmWakeUpBackend(ts);
918925
} else {
@@ -956,6 +963,8 @@ static void MtmTransReceiver(Datum arg)
956963
case MSG_PREPARED:
957964
MTM_TXTRACE(ts, "MtmTransReceiver got MSG_PREPARED");
958965
if (ts->nVotes >= Mtm->nLiveNodes) {
966+
elog(WARNING, "Receive deteriorated PREPARED response for transaction %d (%s) from node %d",
967+
ts->xid, ts->gid, msg->node);
959968
MtmAbortTransaction(ts);
960969
MtmWakeUpBackend(ts);
961970
} else {

multimaster.c

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -390,16 +390,17 @@ MtmInitializeSequence(int64* start, int64* step)
390390

391391
csn_t MtmTransactionSnapshot(TransactionId xid)
392392
{
393-
MtmTransState* ts;
394393
csn_t snapshot = INVALID_CSN;
395-
394+
396395
MtmLock(LW_SHARED);
397-
ts = hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
398-
if (ts != NULL && !ts->isLocal) {
399-
snapshot = ts->snapshot;
396+
if (Mtm->status == MTM_ONLINE) {
397+
MtmTransState* ts = hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
398+
if (ts != NULL && !ts->isLocal) {
399+
snapshot = ts->snapshot;
400+
Assert(ts->gtid.node == MtmNodeId || MtmIsRecoverySession);
401+
}
400402
}
401403
MtmUnlock();
402-
403404
return snapshot;
404405
}
405406

@@ -1009,7 +1010,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
10091010
}
10101011
}
10111012
if (!commit && x->isReplicated && TransactionIdIsValid(x->gtid.xid)) {
1012-
Assert(Mtm->status != MTM_RECOVERY);
1013+
Assert(Mtm->status != MTM_RECOVERY || Mtm->recoverySlot != MtmNodeId);
10131014
/*
10141015
* Send notification only if ABORT happens during transaction processing at replicas,
10151016
* do not send notification if ABORT is received from master
@@ -2467,29 +2468,32 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
24672468
return REPLMODE_EXIT;
24682469
}
24692470
MTM_LOG2("%d: receiver slot mode %s", MyProcPid, MtmNodeStatusMnem[Mtm->status]);
2471+
MtmLock(LW_EXCLUSIVE);
24702472
if (Mtm->status == MTM_RECOVERY) {
24712473
recovery = true;
24722474
if (Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId) {
24732475
/* Choose for recovery first available slot */
2474-
MTM_LOG1("Start recovery from node %d", nodeId);
2476+
elog(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
24752477
Mtm->recoverySlot = nodeId;
24762478
Mtm->nReceivers = 0;
24772479
Mtm->recoveryCount += 1;
24782480
Mtm->pglogicalNodeMask = 0;
2479-
FinishAllPreparedTransactions(false);
24802481
for (i = 0; i < Mtm->nAllNodes; i++) {
2481-
Mtm->nodes[i].restartLsn = 0;
2482+
Mtm->nodes[i].restartLsn = InvalidXLogRecPtr;
24822483
}
2484+
MtmUnlock();
2485+
FinishAllPreparedTransactions(false);
24832486
return REPLMODE_RECOVERY;
24842487
}
24852488
}
2489+
MtmUnlock();
24862490
/* delay opening of other slots until recovery is completed */
24872491
MtmSleep(STATUS_POLL_DELAY);
24882492
}
24892493
if (recovery) {
2490-
MTM_LOG1("Recreate replication slot for node %d after end of recovery", nodeId);
2494+
MTM_LOG1("%d: Restart replication for node %d after end of recovery", MyProcPid, nodeId);
24912495
} else {
2492-
MTM_LOG2("%d: Reuse replication slot for node %d", MyProcPid, nodeId);
2496+
MTM_LOG1("%d: Continue replication slot for node %d", MyProcPid, nodeId);
24932497
}
24942498
/* After recovery completion we need to drop all other slots to avoid receive of redundant data */
24952499
return recovery ? REPLMODE_RECOVERED : REPLMODE_NORMAL;

pglogical_apply.c

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ typedef struct TupleData
5959
bool changed[MaxTupleAttributeNumber];
6060
} TupleData;
6161

62-
static int MtmTransactionRecords;
62+
static bool inside_tx = false;
6363

6464
static Relation read_rel(StringInfo s, LOCKMODE mode);
6565
static void read_tuple_parts(StringInfo s, Relation rel, TupleData *tup);
@@ -528,6 +528,8 @@ MtmEndSession(void)
528528
if (replorigin_session_origin != InvalidRepOriginId) {
529529
MTM_LOG2("%d: Begin reset replorigin session for node %d: %d, progress %lx", MyProcPid, MtmReplicationNodeId, replorigin_session_origin, replorigin_session_get_progress(false));
530530
replorigin_session_origin = InvalidRepOriginId;
531+
replorigin_session_origin_lsn = InvalidXLogRecPtr;
532+
replorigin_session_origin_timestamp = 0;
531533
replorigin_session_reset();
532534
if (unlock) {
533535
MtmUnlockNode(MtmReplicationNodeId);
@@ -539,42 +541,25 @@ MtmEndSession(void)
539541
static void
540542
process_remote_commit(StringInfo in)
541543
{
542-
int i;
543544
uint8 flags;
544545
csn_t csn;
545546
const char *gid = NULL;
546547
XLogRecPtr end_lsn;
547548
XLogRecPtr origin_lsn;
548-
RepOriginId originId;
549-
int n_records;
549+
int origin_node;
550550
/* read flags */
551551
flags = pq_getmsgbyte(in);
552552
MtmReplicationNodeId = pq_getmsgbyte(in);
553553

554-
n_records = pq_getmsgint(in, 4);
555-
if (MtmTransactionRecords != n_records) {
556-
elog(ERROR, "Transaction %d flags %d contains %d records instead of %d", MtmGetCurrentTransactionId(), flags, MtmTransactionRecords, n_records);
557-
}
558-
559554
/* read fields */
560555
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */
561556
end_lsn = pq_getmsgint64(in); /* end_lsn */
562557
replorigin_session_origin_timestamp = pq_getmsgint64(in); /* commit_time */
563558

564-
originId = (RepOriginId)pq_getmsgint(in, 2);
559+
origin_node = pq_getmsgbyte(in);
565560
origin_lsn = pq_getmsgint64(in);
561+
Mtm->nodes[origin_node-1].restartLsn = origin_lsn;
566562

567-
if (originId != InvalidRepOriginId) {
568-
for (i = 0; i < Mtm->nAllNodes; i++) {
569-
if (Mtm->nodes[i].originId == originId) {
570-
Mtm->nodes[i].restartLsn = origin_lsn;
571-
break;
572-
}
573-
}
574-
if (i == Mtm->nAllNodes) {
575-
elog(WARNING, "Failed to map origin %d", originId);
576-
}
577-
}
578563
Assert(replorigin_session_origin == InvalidRepOriginId);
579564

580565
switch(PGLOGICAL_XACT_EVENT(flags))
@@ -676,8 +661,6 @@ process_remote_insert(StringInfo s, Relation rel)
676661
ScanKey *index_keys;
677662
int i;
678663

679-
MtmTransactionRecords += 1;
680-
681664
estate = create_rel_estate(rel);
682665
newslot = ExecInitExtraTupleSlot(estate);
683666
oldslot = ExecInitExtraTupleSlot(estate);
@@ -776,8 +759,6 @@ process_remote_update(StringInfo s, Relation rel)
776759
ScanKeyData skey[INDEX_MAX_KEYS];
777760
HeapTuple remote_tuple = NULL;
778761

779-
MtmTransactionRecords += 1;
780-
781762
action = pq_getmsgbyte(s);
782763

783764
/* old key present, identifying key changed */
@@ -895,8 +876,6 @@ process_remote_delete(StringInfo s, Relation rel)
895876
ScanKeyData skey[INDEX_MAX_KEYS];
896877
bool found_old;
897878

898-
MtmTransactionRecords += 1;
899-
900879
estate = create_rel_estate(rel);
901880
oldslot = ExecInitExtraTupleSlot(estate);
902881
ExecSetSlotDescriptor(oldslot, RelationGetDescr(rel));
@@ -984,7 +963,6 @@ void MtmExecutor(int id, void* work, size_t size)
984963
}
985964
MemoryContextSwitchTo(ApplyContext);
986965
replorigin_session_origin = InvalidRepOriginId;
987-
MtmTransactionRecords = 0;
988966
PG_TRY();
989967
{
990968
while (true) {

pglogical_proto.c

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,14 +186,25 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
186186
pq_sendbyte(out, MtmNodeId);
187187

188188
Assert(txn->xact_action != XLOG_XACT_PREPARE || txn->xid < 1000 || MtmTransactionRecords >= 2);
189-
pq_sendint(out, MtmTransactionRecords, 4);
190-
189+
191190
/* send fixed fields */
192191
pq_sendint64(out, commit_lsn);
193192
pq_sendint64(out, txn->end_lsn);
194193
pq_sendint64(out, txn->commit_time);
195194

196-
pq_sendint(out, txn->origin_id, 2);
195+
if (txn->origin_id != InvalidRepOriginId) {
196+
int i;
197+
for (i = 0; i < Mtm->nAllNodes && Mtm->nodes[i].originId != txn->origin_id; i++);
198+
if (i == Mtm->nAllNodes) {
199+
elog(WARNING, "Failed to map origin %d", txn->origin_id);
200+
i = MtmNodeId-1;
201+
} else {
202+
//Assert(i == MtmNodeId-1 || txn->origin_lsn != InvalidXLogRecPtr);
203+
}
204+
pq_sendbyte(out, i+1);
205+
} else {
206+
pq_sendbyte(out, MtmNodeId);
207+
}
197208
pq_sendint64(out, txn->origin_lsn);
198209

199210
if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED) {

pglogical_receiver.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ pglogical_receiver_main(Datum main_arg)
247247
{
248248
int count;
249249
ConnStatusType status;
250-
XLogRecPtr originStartPos = 0;
250+
XLogRecPtr originStartPos = InvalidXLogRecPtr;
251251

252252
/*
253253
* Determine when and how we should open replication slot.
@@ -306,8 +306,9 @@ pglogical_receiver_main(Datum main_arg)
306306
/* Start logical replication at specified position */
307307
if (mode == REPLMODE_RECOVERED) {
308308
originStartPos = Mtm->nodes[nodeId-1].restartLsn;
309+
MTM_LOG1("Restart replication from node %d from position %lx", nodeId, originStartPos);
309310
}
310-
if (originStartPos == 0) {
311+
if (originStartPos == InvalidXLogRecPtr) {
311312
StartTransactionCommand();
312313
originName = psprintf(MULTIMASTER_SLOT_PATTERN, nodeId);
313314
originId = replorigin_by_name(originName, true);

tests2/docker-entrypoint.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ if [ "$1" = 'postgres' ]; then
6161

6262
############################################################################
6363

64-
CONNSTRS='dbname=postgres host=node1, dbname=postgres host=node2, dbname=postgres host=node3'
64+
CONNSTRS='dbname=postgres user=postgres host=node1, dbname=postgres user=postgres host=node2, dbname=postgres user=postgres host=node3'
6565
RAFT_PEERS='1:node1:6666, 2:node2:6666, 3:node3:6666'
6666

6767
cat <<-EOF >> $PGDATA/postgresql.conf

0 commit comments

Comments
 (0)