Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d29666d

Browse files
knizhnik authored and kelvich committed
some fixes in recovery
1 parent aface56 commit d29666d

File tree

4 files changed

+49
-32
lines changed

4 files changed

+49
-32
lines changed

multimaster.c

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -635,9 +635,11 @@ MtmBeginTransaction(MtmCurrentTrans* x)
635635
x->isDistributed = MtmIsUserTransaction();
636636
x->isPrepared = false;
637637
x->isTransactionBlock = IsTransactionBlock();
638-
/* Application name can be cahnged usnig PGAPPNAME environment variable */
638+
/* Application name can be changed using PGAPPNAME environment variable */
639639
if (!IsBackgroundWorker && x->isDistributed && Mtm->status != MTM_ONLINE && strcmp(application_name, MULTIMASTER_ADMIN) != 0) {
640-
/* reject all user's transactions at offline cluster */
640+
/* Reject all user's transactions at offline cluster.
641+
* Allow execution of transaction by bg-workers to make it possible to perform recovery.
642+
*/
641643
MtmUnlock();
642644
elog(ERROR, "Multimaster node is not online: current status %s", MtmNodeStatusMnem[Mtm->status]);
643645
}
@@ -673,14 +675,17 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
673675
if (Mtm->disabledNodeMask != 0) {
674676
MtmRefreshClusterStatus(true);
675677
if (!IsBackgroundWorker && Mtm->status != MTM_ONLINE) {
676-
elog(ERROR, "Abort current transaction because this cluster node is not online");
678+
/* Do not take into account bg-workers which are performing recovery */
679+
elog(ERROR, "Abort current transaction because this cluster node is in %s status", MtmNodeStatusMnem[Mtm->status]);
677680
}
678681
}
679682

680683
MtmLock(LW_EXCLUSIVE);
681684

682685
/*
683-
* Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up
686+
* Check if there is a global multimaster lock preventing new transactions from committing, to give wal-senders a chance to catch up.
687+
* Only "own" transactions are blocked. Transactions replicated from other nodes (including recovered transactions) should be processed
688+
* and should not cause cluster status change.
684689
*/
685690
if (!x->isReplicated) {
686691
MtmCheckClusterLock();
@@ -716,7 +721,8 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
716721
}
717722
MtmTransactionListAppend(ts);
718723
MtmAddSubtransactions(ts, subxids, ts->nSubxids);
719-
MTM_TRACE("%d: MtmPrePrepareTransaction prepare commit of %d CSN=%ld\n", MyProcPid, x->xid, ts->csn);
724+
MTM_TRACE("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)\n",
725+
MyProcPid, x->xid, ts->gtid.xid, ts->gtid.node, ts->csn);
720726
MtmUnlock();
721727

722728
}
@@ -842,14 +848,6 @@ void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd)
842848
}
843849
}
844850

845-
void MtmRecoveryCompleted(void)
846-
{
847-
elog(WARNING, "Recovery of node %d is completed", MtmNodeId);
848-
Mtm->recoverySlot = 0;
849-
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
850-
MtmSwitchClusterMode(MTM_ONLINE);
851-
}
852-
853851
void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
854852
{
855853
MtmLock(LW_EXCLUSIVE);
@@ -933,6 +931,18 @@ csn_t MtmGetTransactionCSN(TransactionId xid)
933931
* -------------------------------------------
934932
*/
935933

934+
void MtmRecoveryCompleted(void)
935+
{
936+
elog(WARNING, "Recovery of node %d is completed", MtmNodeId);
937+
MtmLock(LW_EXCLUSIVE);
938+
Mtm->recoverySlot = 0;
939+
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
940+
/* Mode will be changed to online once all logical receivers are connected */
941+
MtmSwitchClusterMode(MTM_CONNECTED);
942+
MtmUnlock();
943+
}
944+
945+
936946

937947
/**
938948
* Check state of replication slots. If some of them lag too far behind WAL, then drop these slots to avoid
@@ -993,10 +1003,10 @@ bool MtmIsRecoveredNode(int nodeId)
9931003
bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
9941004
{
9951005
bool caughtUp = false;
1006+
MtmLock(LW_EXCLUSIVE);
9961007
if (MtmIsRecoveredNode(nodeId)) {
9971008
XLogRecPtr walLSN = GetXLogInsertRecPtr();
998-
MtmLock(LW_EXCLUSIVE);
999-
if (slotLSN == walLSN) {
1009+
if (slotLSN == walLSN && Mtm->nActiveTransactions == 0) {
10001010
if (BIT_CHECK(Mtm->nodeLockerMask, nodeId-1)) {
10011011
elog(WARNING,"Node %d is caught-up", nodeId);
10021012
BIT_CLEAR(Mtm->walSenderLockerMask, MyWalSnd - WalSndCtl->walsnds);
@@ -1018,18 +1028,17 @@ bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN)
10181028
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
10191029
* Is there some better way to establish mapping between nodes and WAL-senders?
10201030
*/
1021-
elog(WARNING,"Node %d is almost caught-up: lock cluster", nodeId);
1031+
elog(WARNING,"Node %d is almost caught-up: slot position %lx, WAL position %lx, active transactions %d",
1032+
nodeId, slotLSN, walLSN, Mtm->nActiveTransactions);
10221033
Assert(MyWalSnd != NULL); /* This function is called by WAL-sender, so it should not be NULL */
10231034
BIT_SET(Mtm->nodeLockerMask, nodeId-1);
10241035
BIT_SET(Mtm->walSenderLockerMask, MyWalSnd - WalSndCtl->walsnds);
10251036
Mtm->nLockers += 1;
10261037
} else {
10271038
MTM_INFO("Continue recovery of node %d, slot position %lx, WAL position %lx, WAL sender position %lx, lockers %d, active transactions %d\n", nodeId, slotLSN, walLSN, MyWalSnd->sentPtr, Mtm->nLockers, Mtm->nActiveTransactions);
10281039
}
1029-
MtmUnlock();
1030-
} else {
1031-
MTM_INFO("Node %d is not in recovery mode\n", nodeId);
10321040
}
1041+
MtmUnlock();
10331042
return caughtUp;
10341043
}
10351044

@@ -1044,7 +1053,7 @@ void MtmSwitchClusterMode(MtmNodeStatus mode)
10441053
/*
10451054
* If there are recovering nodes which are catching-up WAL, check the status and prevent new transaction from commit to give
10461055
* WAL-sender a chance to catch-up WAL, completely synchronize replica and switch it to normal mode.
1047-
* This function is called at transaction start with multimaster lock set
1056+
* This function is called before transaction prepare with multimaster lock set.
10481057
*/
10491058
static void
10501059
MtmCheckClusterLock()
@@ -1071,8 +1080,8 @@ MtmCheckClusterLock()
10711080
}
10721081
}
10731082
if (mask != 0) {
1074-
/* some "almost catch-up" wal-senders are still working */
1075-
/* Do not start new transactions until them complete */
1083+
/* some "almost catch-up" wal-senders are still working. */
1084+
/* Do not start new transactions until they are completed. */
10761085
MtmUnlock();
10771086
MtmSleep(delay);
10781087
if (delay*2 <= MAX_WAIT_TIMEOUT) {
@@ -1215,6 +1224,7 @@ void MtmOnNodeDisconnect(int nodeId)
12151224
void MtmOnNodeConnect(int nodeId)
12161225
{
12171226
BIT_CLEAR(Mtm->connectivityMask, nodeId-1);
1227+
elog(NOTICE, "Reconnect node %d", nodeId);
12181228
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
12191229
}
12201230

@@ -1645,19 +1655,23 @@ _PG_fini(void)
16451655
}
16461656

16471657

1648-
1658+
/*
1659+
* This function is called by pglogical receiver main function when receiver background worker is started.
1660+
* We switch to ONLINE mode when all receivers are connected.
1661+
* Since a background worker can be restarted multiple times, use a node bitmask.
1662+
*/
16491663
void MtmReceiverStarted(int nodeId)
16501664
{
1651-
SpinLockAcquire(&Mtm->spinlock);
1665+
MtmLock(LW_EXCLUSIVE);
16521666
if (!BIT_CHECK(Mtm->pglogicalNodeMask, nodeId-1)) {
16531667
BIT_SET(Mtm->pglogicalNodeMask, nodeId-1);
16541668
if (++Mtm->nReceivers == Mtm->nNodes-1) {
16551669
if (Mtm->status == MTM_CONNECTED) {
16561670
MtmSwitchClusterMode(MTM_ONLINE);
16571671
}
16581672
}
1659-
}
1660-
SpinLockRelease(&Mtm->spinlock);
1673+
}
1674+
MtmUnlock();
16611675
}
16621676

16631677
/*

multimaster.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ typedef uint64 csn_t; /* commit serial number */
4545

4646
#define PGLOGICAL_XACT_EVENT(flags) (flags & 0x03)
4747

48+
#define PGLOGICAL_CAUGHT_UP 0x04
49+
50+
4851
typedef uint64 timestamp_t;
4952

5053
/* Identifier of global transaction */

pglogical_apply.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -497,12 +497,10 @@ process_remote_commit(StringInfo in)
497497
uint8 flags;
498498
csn_t csn;
499499
const char *gid = NULL;
500-
bool caughtUp;
501500

502501
/* read flags */
503502
flags = pq_getmsgbyte(in);
504503
MtmReplicationNode = pq_getmsgbyte(in);
505-
caughtUp = pq_getmsgbyte(in) != 0;
506504

507505
/* read fields */
508506
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */
@@ -571,7 +569,7 @@ process_remote_commit(StringInfo in)
571569
Assert(false);
572570
}
573571
MtmEndSession(true);
574-
if (caughtUp) {
572+
if (flags & PGLOGICAL_CAUGHT_UP) {
575573
MtmRecoveryCompleted();
576574
}
577575
}

pglogical_proto.c

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
103103
{
104104
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
105105
csn_t csn = MtmTransactionSnapshot(txn->xid);
106-
MTM_INFO("%d: pglogical_write_begin %d CSN=%ld\n", MyProcPid, txn->xid, csn);
106+
MTM_INFO("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d\n", MyProcPid, txn->xid, MtmReplicationNodeId, csn, isRecovery);
107107

108108
if (csn == INVALID_CSN && !isRecovery) {
109109
MtmIsFilteredTxn = true;
@@ -124,7 +124,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
124124
ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
125125
{
126126
uint8 flags = 0;
127-
127+
128128
if (txn->xact_action == XLOG_XACT_COMMIT)
129129
flags = PGLOGICAL_COMMIT;
130130
else if (txn->xact_action == XLOG_XACT_PREPARE)
@@ -146,6 +146,9 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
146146
if (csn == INVALID_CSN && !isRecovery) {
147147
return;
148148
}
149+
if (MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn)) {
150+
flags |= PGLOGICAL_CAUGHT_UP;
151+
}
149152
}
150153
pq_sendbyte(out, 'C'); /* sending COMMIT */
151154

@@ -154,7 +157,6 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
154157
/* send the flags field */
155158
pq_sendbyte(out, flags);
156159
pq_sendbyte(out, MtmNodeId);
157-
pq_sendbyte(out, MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn));
158160

159161
/* send fixed fields */
160162
pq_sendint64(out, commit_lsn);

0 commit comments

Comments
 (0)