Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f5abea5

Browse files
knizhnikkelvich
authored andcommitted
Add MtmMaxRecoveryLag
1 parent 37df65e commit f5abea5

File tree

1 file changed

+45
-1
lines changed

1 file changed

+45
-1
lines changed

multimaster.c

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ static int MtmQueueSize;
142142
static int MtmWorkers;
143143
static int MtmVacuumDelay;
144144
static int MtmMinRecoveryLag;
145+
static int MtmMaxRecoveryLag;
145146

146147
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
147148
static ProcessUtility_hook_type PreviousProcessUtilityHook;
@@ -579,6 +580,29 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
579580
MTM_TRACE("%d: MtmPrepareTransaction prepare commit of %d CSN=%ld\n", getpid(), x->xid, ts->csn);
580581
}
581582

583+
static void MtmCheckSlots()
584+
{
585+
if (MtmMaxRecoveryLag != 0 && dtm->disabledNodeMask != 0)
586+
{
587+
int i;
588+
for (i = 0; i < max_replication_slots; i++) {
589+
ReplicationSlot* slot = &ReplicationSlotCtl->replication_slots[i];
590+
int nodeId;
591+
if (slot->in_use
592+
&& sscanf(slot->data.name.data, MULTIMASTER_SLOT_PATTERN, &nodeId) == 1
593+
&& BIT_CHECK(dtm->disabledNodeMask, nodeId-1)
594+
&& slot->data.restart_lsn + MtmMaxRecoveryLag < GetXLogInsertRecPtr())
595+
{
596+
elog(WARNING, "Drop slot for node %d which lag %ld is larger than threshold %d",
597+
nodeId,
598+
GetXLogInsertRecPtr() - slot->data.restart_lsn,
599+
MtmMaxRecoveryLag);
600+
ReplicationSlotDrop(slot->data.name.data);
601+
}
602+
}
603+
}
604+
}
605+
582606
static void
583607
MtmEndTransaction(MtmCurrentTrans* x, bool commit)
584608
{
@@ -594,6 +618,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
594618
x->snapshot = INVALID_CSN;
595619
x->xid = InvalidTransactionId;
596620
x->gtid.xid = InvalidTransactionId;
621+
MtmCheckSlots();
597622
}
598623

599624
void MtmSendNotificationMessage(MtmTransState* ts)
@@ -752,7 +777,8 @@ _PG_init(void)
752777
DefineCustomIntVariable(
753778
"multimaster.min_recovery_lag",
754779
"Minamal lag of WAL-sender performing recovery after which cluster is locked until recovery is completed",
755-
NULL,
780+
"When wal-sender almost catch-up WAL current position we need to stop 'Achilles tortile compeition' and "
781+
"temporary stop commit of new transactions until node will be completely repared",
756782
&MtmMinRecoveryLag,
757783
100000,
758784
1,
@@ -764,6 +790,22 @@ _PG_init(void)
764790
NULL
765791
);
766792

793+
DefineCustomIntVariable(
794+
"multimaster.max_recovery_lag",
795+
"Maximal lag of replication slot of failed node after which this slot is dropped to avoid transaction log overflow",
796+
"Dropping slog makes it not possible to recover node using logical replication mechanism, it will eb ncessary to completely copy content of some other nodes "
797+
"usimg basebackup or similar tool",
798+
&MtmMaxRecoveryLag,
799+
100000000,
800+
0,
801+
INT_MAX,
802+
PGC_BACKEND,
803+
0,
804+
NULL,
805+
NULL,
806+
NULL
807+
);
808+
767809
DefineCustomIntVariable(
768810
"multimaster.vacuum_delay",
769811
"Minimal age of records which can be vacuumed (seconds)",
@@ -944,12 +986,14 @@ _PG_fini(void)
944986
* ***************************************************************************
945987
*/
946988

989+
947990
static void MtmSwitchFromRecoveryToNormalMode()
948991
{
949992
dtm->status = MTM_ONLINE;
950993
/* ??? Something else to do here? */
951994
}
952995

996+
953997
void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t globalSnapshot)
954998
{
955999
csn_t localSnapshot;

0 commit comments

Comments
 (0)