Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b3f24e7

Browse files
knizhnikkelvich
authored andcommitted
Fix memory leak in MMTS
1 parent dc5ae4a commit b3f24e7

6 files changed

+92
-41
lines changed

decoder_raw.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,11 @@ decoder_raw_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
108108
Assert(lastXid != txn->xid);
109109
lastXid = txn->xid;
110110
if (MMIsLocalTransaction(txn->xid)) {
111-
MTM_INFO("Skip local transaction %u\n", txn->xid);
111+
MTM_TRACE("Skip local transaction %u\n", txn->xid);
112112
data->isLocal = true;
113113
} else {
114114
OutputPluginPrepareWrite(ctx, true);
115-
MTM_INFO("Send transaction %u to replica\n", txn->xid);
115+
MTM_TRACE("Send transaction %u to replica\n", txn->xid);
116116
appendStringInfo(ctx->out, "BEGIN %u;", txn->xid);
117117
OutputPluginWrite(ctx, true);
118118
data->isLocal = false;
@@ -126,12 +126,12 @@ decoder_raw_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
126126
{
127127
DecoderRawData *data = ctx->output_plugin_private;
128128
if (!data->isLocal) {
129-
MTM_INFO("Send commit of transaction %u to replica\n", txn->xid);
129+
MTM_TRACE("Send commit of transaction %u to replica\n", txn->xid);
130130
OutputPluginPrepareWrite(ctx, true);
131131
appendStringInfoString(ctx->out, "COMMIT;");
132132
OutputPluginWrite(ctx, true);
133133
} else {
134-
MTM_INFO("Skip commit of transaction %u\n", txn->xid);
134+
MTM_TRACE("Skip commit of transaction %u\n", txn->xid);
135135
}
136136
}
137137

@@ -483,10 +483,10 @@ decoder_raw_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
483483

484484
data = ctx->output_plugin_private;
485485
if (data->isLocal) {
486-
MTM_INFO("Skip action %d in transaction %u\n", change->action, txn->xid);
486+
MTM_TRACE("Skip action %d in transaction %u\n", change->action, txn->xid);
487487
return;
488488
}
489-
MTM_INFO("Send action %d in transaction %u to replica\n", change->action, txn->xid);
489+
MTM_TRACE("Send action %d in transaction %u to replica\n", change->action, txn->xid);
490490

491491
/* Avoid leaking memory by using and resetting our own context */
492492
old = MemoryContextSwitchTo(data->context);

multimaster.c

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,9 @@ MtmResetTransaction(MtmCurrentTrans* x)
527527
x->xid = InvalidTransactionId;
528528
x->gtid.xid = InvalidTransactionId;
529529
x->isDistributed = false;
530+
x->isPrepared = false;
531+
x->isPrepared = false;
532+
x->status = TRANSACTION_STATUS_UNKNOWN;
530533
}
531534

532535
static void
@@ -625,16 +628,15 @@ static void
625628
MtmPostPrepareTransaction(MtmCurrentTrans* x)
626629
{
627630
MtmTransState* ts;
628-
MtmTransMap* tm;
629631

630632
MtmLock(LW_EXCLUSIVE);
631633
ts = hash_search(MtmXid2State, &x->xid, HASH_FIND, NULL);
632634
Assert(ts != NULL);
633-
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, NULL);
634-
Assert(x->gid[0]);
635-
tm->state = ts;
636635

637636
if (!MtmIsCoordinator(ts)) {
637+
MtmTransMap* tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_ENTER, NULL);
638+
Assert(x->gid[0]);
639+
tm->state = ts;
638640
MtmSendNotificationMessage(ts, MSG_READY); /* send notification to coordinator */
639641
MtmUnlock();
640642
MtmResetTransaction(x);
@@ -658,21 +660,23 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
658660
{
659661
MtmTransMap* tm;
660662

661-
MtmLock(LW_EXCLUSIVE);
662-
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_REMOVE, NULL);
663-
Assert(tm != NULL);
664-
tm->state->status = TRANSACTION_STATUS_ABORTED;
665-
MtmAdjustSubtransactions(tm->state);
666-
MtmUnlock();
667-
MtmResetTransaction(x);
663+
if (x->status != TRANSACTION_STATUS_ABORTED) {
664+
MtmLock(LW_EXCLUSIVE);
665+
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_REMOVE, NULL);
666+
Assert(tm != NULL);
667+
tm->state->status = TRANSACTION_STATUS_ABORTED;
668+
MtmAdjustSubtransactions(tm->state);
669+
MtmUnlock();
670+
x->status = TRANSACTION_STATUS_ABORTED;
671+
}
668672
}
669673

670674
static void
671675
MtmEndTransaction(MtmCurrentTrans* x, bool commit)
672676
{
673677
MTM_TRACE("%d: End transaction %d, prepared=%d, replicated=%d, distributed=%d, gid=%s -> %s\n",
674678
MyProcPid, x->xid, x->isPrepared, x->isReplicated, x->isDistributed, x->gid, commit ? "commit" : "abort");
675-
if (x->isDistributed && (x->isPrepared || x->isReplicated)) {
679+
if (x->status != TRANSACTION_STATUS_ABORTED && x->isDistributed && (x->isPrepared || x->isReplicated)) {
676680
MtmTransState* ts = NULL;
677681
MtmLock(LW_EXCLUSIVE);
678682
if (x->isPrepared) {
@@ -690,6 +694,10 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
690694
if (commit) {
691695
Assert(ts->status == TRANSACTION_STATUS_UNKNOWN);
692696
ts->status = TRANSACTION_STATUS_COMMITTED;
697+
if (x->csn > ts->csn) {
698+
ts->csn = x->csn;
699+
MtmSyncClock(ts->csn);
700+
}
693701
} else {
694702
ts->status = TRANSACTION_STATUS_ABORTED;
695703
}
@@ -799,6 +807,26 @@ XidStatus MtmGetGlobalTransactionStatus(char const* gid)
799807
return status;
800808
}
801809

810+
void MtmSetCurrentTransactionCSN(csn_t csn)
811+
{
812+
MTM_TRACE("Set current transaction CSN %ld\n", csn);
813+
dtmTx.csn = csn;
814+
dtmTx.isDistributed = true;
815+
dtmTx.isReplicated = true;
816+
}
817+
818+
819+
csn_t MtmGetTransactionCSN(TransactionId xid)
820+
{
821+
MtmTransState* ts;
822+
csn_t csn;
823+
MtmLock(LW_SHARED);
824+
ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
825+
Assert(ts != NULL);
826+
csn = ts->csn;
827+
MtmUnlock();
828+
return csn;
829+
}
802830

803831
/*
804832
* -------------------------------------------

multimaster.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1111
#define MTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1212
*/
13+
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1314
#define MTM_TRACE(fmt, ...)
14-
#define MTM_TUPLE_TRACE(fmt, ...)
1515
/* */
1616

1717
#define MULTIMASTER_NAME "multimaster"
@@ -162,6 +162,8 @@ extern MtmState* MtmGetState(void);
162162
extern timestamp_t MtmGetCurrentTime(void);
163163
extern void MtmSleep(timestamp_t interval);
164164
extern void MtmSetCurrentTransactionGID(char const* gid);
165+
extern csn_t MtmGetTransactionCSN(TransactionId xid);
166+
extern void MtmSetCurrentTransactionCSN(csn_t csn);
165167
extern TransactionId MtmGetCurrentTransactionId(void);
166168
extern XidStatus MtmGetCurrentTransactionStatus(void);
167169
extern XidStatus MtmGetGlobalTransactionStatus(char const* gid);

pglogical_apply.c

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -465,20 +465,37 @@ read_rel(StringInfo s, LOCKMODE mode)
465465
}
466466

467467
static void
468-
MtmSetCurrentSession(int nodeId)
468+
MtmBeginSession(int nodeId)
469469
{
470+
#if 0
470471
char slot_name[MULTIMASTER_MAX_SLOT_NAME_SIZE];
471472
sprintf(slot_name, MULTIMASTER_SLOT_PATTERN, nodeId);
473+
Assert(replorigin_session_origin == InvalidRepOriginId);
472474
replorigin_session_origin = replorigin_by_name(slot_name, false);
475+
MTM_INFO("%d: Begin setup replorigin session: %d\n", MyProcPid, replorigin_session_origin);
473476
replorigin_session_setup(replorigin_session_origin);
477+
MTM_INFO("%d: End setup replorigin session: %d\n", MyProcPid, replorigin_session_origin);
478+
#endif
479+
}
480+
481+
static void
482+
MtmEndSession(void)
483+
{
484+
if (replorigin_session_origin != InvalidRepOriginId) {
485+
MTM_INFO("%d: Begin reset replorigin session: %d\n", MyProcPid, replorigin_session_origin);
486+
replorigin_session_origin = InvalidRepOriginId;
487+
replorigin_session_reset();
488+
MTM_INFO("%d: End reset replorigin session: %d\n", MyProcPid, replorigin_session_origin);
489+
}
474490
}
475491

476492
static void
477493
process_remote_commit(StringInfo in)
478494
{
479-
uint8 flags;
480-
uint8 nodeId;
481-
const char *gid = NULL;
495+
uint8 flags;
496+
uint8 nodeId;
497+
csn_t csn;
498+
const char *gid = NULL;
482499

483500
/* read flags */
484501
flags = pq_getmsgbyte(in);
@@ -489,14 +506,16 @@ process_remote_commit(StringInfo in)
489506
pq_getmsgint64(in); /* end_lsn */
490507
replorigin_session_origin_timestamp = pq_getmsgint64(in); /* commit_time */
491508

509+
Assert(replorigin_session_origin == InvalidRepOriginId);
510+
492511
switch(PGLOGICAL_XACT_EVENT(flags))
493512
{
494513
case PGLOGICAL_COMMIT:
495514
{
496515
MTM_TRACE("%d: PGLOGICAL_COMMIT commit\n", MyProcPid);
497516
if (IsTransactionState()) {
498517
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
499-
MtmSetCurrentSession(nodeId);
518+
MtmBeginSession(nodeId);
500519
CommitTransactionCommand();
501520
}
502521
break;
@@ -510,7 +529,7 @@ process_remote_commit(StringInfo in)
510529
BeginTransactionBlock();
511530
CommitTransactionCommand();
512531
StartTransactionCommand();
513-
MtmSetCurrentSession(nodeId);
532+
MtmBeginSession(nodeId);
514533
/* PREPARE itself */
515534
MtmSetCurrentTransactionGID(gid);
516535
PrepareTransactionBlock(gid);
@@ -520,10 +539,12 @@ process_remote_commit(StringInfo in)
520539
case PGLOGICAL_COMMIT_PREPARED:
521540
{
522541
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
542+
csn = pq_getmsgint64(in);
523543
gid = pq_getmsgstring(in);
524544
MTM_TRACE("%d: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s\n", MyProcPid, csn, gid);
525545
StartTransactionCommand();
526-
MtmSetCurrentSession(nodeId);
546+
MtmBeginSession(nodeId);
547+
MtmSetCurrentTransactionCSN(csn);
527548
MtmSetCurrentTransactionGID(gid);
528549
FinishPreparedTransaction(gid, true);
529550
CommitTransactionCommand();
@@ -545,8 +566,7 @@ process_remote_commit(StringInfo in)
545566
default:
546567
Assert(false);
547568
}
548-
replorigin_session_reset();
549-
replorigin_session_origin = InvalidRepOriginId;
569+
MtmEndSession();
550570
}
551571

552572
static void
@@ -859,10 +879,10 @@ void MtmExecutor(int id, void* work, size_t size)
859879
{
860880
StringInfoData s;
861881
Relation rel = NULL;
862-
initStringInfo(&s);
863882
s.data = work;
864883
s.len = size;
865884
s.maxlen = -1;
885+
s.cursor = 0;
866886

867887
if (ApplyContext == NULL) {
868888
ApplyContext = AllocSetContextCreate(TopMemoryContext,
@@ -910,12 +930,10 @@ void MtmExecutor(int id, void* work, size_t size)
910930
}
911931
PG_CATCH();
912932
{
913-
if (replorigin_session_origin != InvalidRepOriginId) {
914-
replorigin_session_reset();
915-
}
916933
EmitErrorReport();
917934
FlushErrorState();
918935
MTM_TRACE("%d: REMOTE begin abort transaction %d\n", MyProcPid, MtmGetCurrentTransactionId());
936+
MtmEndSession();
919937
AbortCurrentTransaction();
920938
MTM_TRACE("%d: REMOTE end abort transaction %d\n", MyProcPid, MtmGetCurrentTransactionId());
921939
}

pglogical_proto.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
169169
pq_sendint64(out, txn->end_lsn);
170170
pq_sendint64(out, txn->commit_time);
171171

172+
if (flags == PGLOGICAL_COMMIT_PREPARED) {
173+
pq_sendint64(out, MtmGetTransactionCSN(txn->xid));
174+
}
172175
if (flags != PGLOGICAL_COMMIT) {
173176
pq_sendstring(out, txn->gid);
174177
}

pglogical_receiver.c

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ pglogical_receiver_main(Datum main_arg)
214214
XLogRecPtr originStartPos = 0;
215215
RepOriginId originId;
216216
char* originName;
217+
/* Buffer for COPY data */
218+
char *copybuf = NULL;
217219

218220
/* Register functions for SIGTERM/SIGHUP management */
219221
pqsignal(SIGHUP, receiver_raw_sighup);
@@ -314,8 +316,6 @@ pglogical_receiver_main(Datum main_arg)
314316
while (!got_sigterm)
315317
{
316318
int rc, hdr_len;
317-
/* Buffer for COPY data */
318-
char *copybuf = NULL;
319319
/* Wait necessary amount of time */
320320
rc = WaitLatch(&MyProc->procLatch,
321321
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
@@ -347,13 +347,6 @@ pglogical_receiver_main(Datum main_arg)
347347
}
348348

349349

350-
/* Some cleanup */
351-
if (copybuf != NULL)
352-
{
353-
PQfreemem(copybuf);
354-
copybuf = NULL;
355-
}
356-
357350
/*
358351
* Receive data.
359352
*/
@@ -362,6 +355,13 @@ pglogical_receiver_main(Datum main_arg)
362355
XLogRecPtr walEnd;
363356
char* stmt;
364357

358+
/* Some cleanup */
359+
if (copybuf != NULL)
360+
{
361+
PQfreemem(copybuf);
362+
copybuf = NULL;
363+
}
364+
365365
rc = PQgetCopyData(conn, &copybuf, 1);
366366
if (rc <= 0) {
367367
break;

0 commit comments

Comments
 (0)