Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 8e9e4fd

Browse files
knizhnikkelvich
authored andcommitted
Fix bug in flushed_lsn reporting
1 parent 906fcbb commit 8e9e4fd

6 files changed

+80
-58
lines changed

multimaster--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ AS 'MODULE_PATHNAME','mtm_get_nodes_state'
3535
LANGUAGE C;
3636

3737
CREATE TYPE mtm.cluster_state AS ("status" text, "disabledNodeMask" bigint, "disconnectedNodeMask" bigint, "catchUpNodeMask" bigint, "liveNodes" integer, "allNodes" integer, "nActiveQueries" integer, "nPendingQueries" integer, "queueSize" bigint, "transCount" bigint, "timeShift" bigint, "recoverySlot" integer,
38-
"xidHashSize" bigint, "gidHashSize" bigint, "oldestSnapshot" bigint, "configChanges" integer);
38+
"xidHashSize" bigint, "gidHashSize" bigint, "oldestXid" integer, "configChanges" integer);
3939

4040
CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
4141
AS 'MODULE_PATHNAME','mtm_get_cluster_state'

multimaster.c

Lines changed: 73 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ static int MtmMinRecoveryLag;
199199
static int MtmMaxRecoveryLag;
200200
static int Mtm2PCPrepareRatio;
201201
static int Mtm2PCMinTimeout;
202+
static int MtmGcPeriod;
202203
static bool MtmIgnoreTablesWithoutPk;
203204

204205
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -341,16 +342,20 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
341342
Snapshot MtmGetSnapshot(Snapshot snapshot)
342343
{
343344
snapshot = PgGetSnapshotData(snapshot);
344-
RecentGlobalDataXmin = RecentGlobalXmin = Mtm->oldestXid;//MtmAdjustOldestXid(RecentGlobalDataXmin);
345+
RecentGlobalDataXmin = RecentGlobalXmin = Mtm->oldestXid;
345346
return snapshot;
346347
}
347348

348349

349350
TransactionId MtmGetOldestXmin(Relation rel, bool ignoreVacuum)
350351
{
351352
TransactionId xmin = PgGetOldestXmin(NULL, false); /* consider all backends */
352-
xmin = MtmAdjustOldestXid(xmin);
353-
return xmin;
353+
if (TransactionIdIsValid(xmin)) {
354+
MtmLock(LW_EXCLUSIVE);
355+
xmin = MtmAdjustOldestXid(xmin);
356+
MtmUnlock();
357+
}
358+
return xmin;
354359
}
355360

356361
bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
@@ -445,53 +450,50 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
445450
static TransactionId
446451
MtmAdjustOldestXid(TransactionId xid)
447452
{
448-
if (TransactionIdIsValid(xid)) {
449-
MtmTransState *ts, *prev = NULL;
450-
int i;
451-
452-
MtmLock(LW_EXCLUSIVE);
453-
ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
454-
if (ts != NULL) {
455-
csn_t oldestSnapshot = ts->snapshot;
456-
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot;
457-
for (i = 0; i < Mtm->nAllNodes; i++) {
458-
if (!BIT_CHECK(Mtm->disabledNodeMask, i)
459-
&& Mtm->nodes[i].oldestSnapshot < oldestSnapshot)
460-
{
461-
oldestSnapshot = Mtm->nodes[i].oldestSnapshot;
462-
}
463-
}
464-
oldestSnapshot -= MtmVacuumDelay*USECS_PER_SEC;
465-
466-
for (ts = Mtm->transListHead;
467-
ts != NULL
468-
&& ts->csn < oldestSnapshot
469-
&& TransactionIdPrecedes(ts->xid, xid)
470-
&& (ts->status == TRANSACTION_STATUS_COMMITTED ||
471-
ts->status == TRANSACTION_STATUS_ABORTED);
472-
prev = ts, ts = ts->next)
453+
int i;
454+
MtmTransState *prev = NULL;
455+
MtmTransState *ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
456+
MTM_LOG1("%d: MtmAdjustOldestXid(%d): snapshot=%ld, csn=%ld, status=%d", MyProcPid, xid, ts != NULL ? ts->snapshot : 0, ts != NULL ? ts->csn : 0, ts != NULL ? ts->status : -1);
457+
Mtm->gcCount = 0;
458+
if (ts != NULL) {
459+
csn_t oldestSnapshot = ts->snapshot;
460+
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot;
461+
for (i = 0; i < Mtm->nAllNodes; i++) {
462+
if (!BIT_CHECK(Mtm->disabledNodeMask, i)
463+
&& Mtm->nodes[i].oldestSnapshot < oldestSnapshot)
473464
{
474-
if (prev != NULL) {
475-
/* Remove information about too old transactions */
476-
hash_search(MtmXid2State, &prev->xid, HASH_REMOVE, NULL);
477-
}
465+
oldestSnapshot = Mtm->nodes[i].oldestSnapshot;
478466
}
479-
}
480-
if (MtmUseDtm)
467+
}
468+
oldestSnapshot -= MtmVacuumDelay*USECS_PER_SEC;
469+
470+
for (ts = Mtm->transListHead;
471+
ts != NULL
472+
&& ts->csn < oldestSnapshot
473+
&& TransactionIdPrecedes(ts->xid, xid)
474+
&& (ts->status == TRANSACTION_STATUS_COMMITTED ||
475+
ts->status == TRANSACTION_STATUS_ABORTED);
476+
prev = ts, ts = ts->next)
481477
{
482478
if (prev != NULL) {
483-
Mtm->transListHead = prev;
484-
Mtm->oldestXid = xid = prev->xid;
485-
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
486-
xid = Mtm->oldestXid;
487-
}
488-
} else {
489-
if (prev != NULL) {
490-
Mtm->transListHead = prev;
479+
/* Remove information about too old transactions */
480+
hash_search(MtmXid2State, &prev->xid, HASH_REMOVE, NULL);
491481
}
492482
}
493-
MtmUnlock();
494-
}
483+
}
484+
if (MtmUseDtm)
485+
{
486+
if (prev != NULL) {
487+
Mtm->transListHead = prev;
488+
Mtm->oldestXid = xid = prev->xid;
489+
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
490+
xid = Mtm->oldestXid;
491+
}
492+
} else {
493+
if (prev != NULL) {
494+
Mtm->transListHead = prev;
495+
}
496+
}
495497
return xid;
496498
}
497499
/*
@@ -613,7 +615,12 @@ static void
613615
MtmBeginTransaction(MtmCurrentTrans* x)
614616
{
615617
if (x->snapshot == INVALID_CSN) {
616-
MtmLock(LW_EXCLUSIVE);
618+
TransactionId xmin = (Mtm->gcCount >= MtmGcPeriod) ? PgGetOldestXmin(NULL, false) : InvalidTransactionId; /* Get oldest xmin outside critical section */
619+
620+
MtmLock(LW_EXCLUSIVE);
621+
if (TransactionIdIsValid(xmin) && Mtm->gcCount >= MtmGcPeriod) {
622+
MtmAdjustOldestXid(xmin);
623+
}
617624
x->xid = GetCurrentTransactionIdIfAny();
618625
x->isReplicated = false;
619626
x->isDistributed = MtmIsUserTransaction();
@@ -689,7 +696,6 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
689696
}
690697

691698
MtmLock(LW_EXCLUSIVE);
692-
693699
/*
694700
* Check if there is global multimaster lock preventing new transaction from commit to make a chance to wal-senders to catch-up.
695701
* Only "own" transactions are blacked. Transactions replicated from other nodes (including recovered transaction) should be proceeded
@@ -715,8 +721,10 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
715721

716722
x->isPrepared = true;
717723
x->csn = ts->csn;
718-
724+
719725
Mtm->transCount += 1;
726+
Mtm->gcCount += 1;
727+
720728
MtmTransactionListAppend(ts);
721729
MtmAddSubtransactions(ts, subxids, ts->nSubxids);
722730
MTM_LOG3("%d: MtmPrePrepareTransaction prepare commit of %d (gtid.xid=%d, gtid.node=%d, CSN=%ld)",
@@ -1465,8 +1473,9 @@ static void MtmInitialize()
14651473
Mtm->transListHead = NULL;
14661474
Mtm->transListTail = &Mtm->transListHead;
14671475
Mtm->nReceivers = 0;
1468-
Mtm->timeShift = 0;
1476+
Mtm->timeShift = 0;
14691477
Mtm->transCount = 0;
1478+
Mtm->gcCount = 0;
14701479
Mtm->nConfigChanges = 0;
14711480
Mtm->localTablesHashLoaded = false;
14721481
for (i = 0; i < MtmNodes; i++) {
@@ -1599,6 +1608,21 @@ _PG_init(void)
15991608
if (!process_shared_preload_libraries_in_progress)
16001609
return;
16011610

1611+
DefineCustomIntVariable(
1612+
"multimaster.gc_period",
1613+
"Number of distributed transactions after which garbage collection is started",
1614+
"Multimaster is building xid->csn hash map which has to be cleaned to avoid hash overflow. This parameter specifies interval of invoking garbage collector for this map",
1615+
&MtmGcPeriod,
1616+
MTM_HASH_SIZE/10,
1617+
1,
1618+
INT_MAX,
1619+
PGC_BACKEND,
1620+
0,
1621+
NULL,
1622+
NULL,
1623+
NULL
1624+
);
1625+
16021626
DefineCustomIntVariable(
16031627
"multimaster.max_nodes",
16041628
"Maximal number of cluster nodes",
@@ -2338,7 +2362,7 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
23382362
values[11] = Int32GetDatum(Mtm->recoverySlot);
23392363
values[12] = Int64GetDatum(hash_get_num_entries(MtmXid2State));
23402364
values[13] = Int64GetDatum(hash_get_num_entries(MtmGid2State));
2341-
values[14] = Int64GetDatum(Mtm->oldestSnapshot);
2365+
values[14] = Int32GetDatum(Mtm->oldestXid);
23422366
values[15] = Int32GetDatum(Mtm->nConfigChanges);
23432367

23442368
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc, values, nulls)));

multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ typedef struct
180180
MtmTransState** transListTail; /* Tail of L1 list of all finished transactionds, used to append new elements.
181181
This list is expected to be in CSN ascending order, by strict order may be violated */
182182
uint64 transCount; /* Counter of transactions perfromed by this node */
183+
uint64 gcCount; /* Number of global transactions performed since last GC */
183184
BgwPool pool; /* Pool of background workers for applying logical replication patches */
184185
MtmNodeInfo nodes[1]; /* [Mtm->nAllNodes]: per-node data */
185186
} MtmState;

pglogical_apply.c

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,6 @@ static void process_remote_insert(StringInfo s, Relation rel);
7575
static void process_remote_update(StringInfo s, Relation rel);
7676
static void process_remote_delete(StringInfo s, Relation rel);
7777

78-
static int MtmReplicationNode;
79-
8078
/*
8179
* Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'.
8280
*
@@ -481,8 +479,8 @@ static void
481479
MtmBeginSession(void)
482480
{
483481
char slot_name[MULTIMASTER_MAX_SLOT_NAME_SIZE];
484-
MtmLockNode(MtmReplicationNode);
485-
sprintf(slot_name, MULTIMASTER_SLOT_PATTERN, MtmReplicationNode);
482+
MtmLockNode(MtmReplicationNodeId);
483+
sprintf(slot_name, MULTIMASTER_SLOT_PATTERN, MtmReplicationNodeId);
486484
Assert(replorigin_session_origin == InvalidRepOriginId);
487485
replorigin_session_origin = replorigin_by_name(slot_name, false);
488486
MTM_LOG3("%d: Begin setup replorigin session: %d", MyProcPid, replorigin_session_origin);
@@ -498,7 +496,7 @@ MtmEndSession(void)
498496
replorigin_session_origin = InvalidRepOriginId;
499497
replorigin_session_reset();
500498
if (unlock) {
501-
MtmUnlockNode(MtmReplicationNode);
499+
MtmUnlockNode(MtmReplicationNodeId);
502500
}
503501
MTM_LOG3("%d: End reset replorigin session: %d", MyProcPid, replorigin_session_origin);
504502
}
@@ -513,7 +511,7 @@ process_remote_commit(StringInfo in)
513511
XLogRecPtr end_lsn;
514512
/* read flags */
515513
flags = pq_getmsgbyte(in);
516-
MtmReplicationNode = pq_getmsgbyte(in);
514+
MtmReplicationNodeId = pq_getmsgbyte(in);
517515

518516
/* read fields */
519517
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */

pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ static volatile sig_atomic_t got_sighup = false;
4747

4848
/* GUC variables */
4949
static int receiver_idle_time = 0;
50-
static bool receiver_sync_mode = false;
50+
static bool receiver_sync_mode = true; /* We need sync mode to have up-to-date values of catalog_xmin in replication slots */
5151

5252
/* Worker name */
5353
static char worker_proc[BGW_MAXLEN];

tests/dtmacid.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,6 @@ int main (int argc, char* argv[])
285285
for (int i = 0; i < cfg.nReaders; i++) {
286286
readers[i].wait();
287287
nSelects += readers[i].selects;
288-
nTransactions += writers[i].transactions;
289288
}
290289

291290
time_t elapsed = getCurrentTime() - start;

0 commit comments

Comments
 (0)