Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit abaab48

Browse files
knizhnikkelvich
authored andcommitted
Reset memory context after each iteration of pglogical apply
1 parent d3f6b2d commit abaab48

File tree

4 files changed

+52
-48
lines changed

4 files changed

+52
-48
lines changed

arbiter.c

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -724,9 +724,9 @@ static void MtmSender(Datum arg)
724724
MTM_ELOG(LOG, "Start arbiter sender %d", MyProcPid);
725725
InitializeTimeouts();
726726

727-
signal(SIGINT, SetStop);
728-
signal(SIGQUIT, SetStop);
729-
signal(SIGTERM, SetStop);
727+
pqsignal(SIGINT, SetStop);
728+
pqsignal(SIGQUIT, SetStop);
729+
pqsignal(SIGTERM, SetStop);
730730

731731
/* We're now ready to receive signals */
732732
BackgroundWorkerUnblockSignals();
@@ -803,9 +803,9 @@ static bool MtmRecovery()
803803

804804
static void MtmMonitor(Datum arg)
805805
{
806-
signal(SIGINT, SetStop);
807-
signal(SIGQUIT, SetStop);
808-
signal(SIGTERM, SetStop);
806+
pqsignal(SIGINT, SetStop);
807+
pqsignal(SIGQUIT, SetStop);
808+
pqsignal(SIGTERM, SetStop);
809809

810810
/* We're now ready to receive signals */
811811
BackgroundWorkerUnblockSignals();
@@ -840,9 +840,9 @@ static void MtmReceiver(Datum arg)
840840
max_fd = 0;
841841
#endif
842842

843-
signal(SIGINT, SetStop);
844-
signal(SIGQUIT, SetStop);
845-
signal(SIGTERM, SetStop);
843+
pqsignal(SIGINT, SetStop);
844+
pqsignal(SIGQUIT, SetStop);
845+
pqsignal(SIGTERM, SetStop);
846846

847847
/* We're now ready to receive signals */
848848
BackgroundWorkerUnblockSignals();

bgwpool.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@ static void BgwPoolMainLoop(BgwPool* pool)
3535
MtmIsLogicalReceiver = true;
3636
MtmPool = pool;
3737

38-
signal(SIGINT, BgwShutdownWorker);
39-
signal(SIGQUIT, BgwShutdownWorker);
40-
signal(SIGTERM, BgwShutdownWorker);
38+
pqsignal(SIGINT, BgwShutdownWorker);
39+
pqsignal(SIGQUIT, BgwShutdownWorker);
40+
pqsignal(SIGTERM, BgwShutdownWorker);
4141

4242
BackgroundWorkerUnblockSignals();
4343
BackgroundWorkerInitializeConnection(pool->dbname, pool->dbuser);

multimaster.c

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -528,8 +528,8 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
528528
if (ts != NULL /*&& ts->status != TRANSACTION_STATUS_IN_PROGRESS*/)
529529
{
530530
if (ts->csn > MtmTx.snapshot) {
531-
MTM_LOG4("%d: tuple with xid=%d(csn=%lld) is invisible in snapshot %lld",
532-
MyProcPid, xid, ts->csn, MtmTx.snapshot);
531+
MTM_LOG4("%d: tuple with xid=%lld(csn=%lld) is invisible in snapshot %lld",
532+
MyProcPid, (long64)xid, ts->csn, MtmTx.snapshot);
533533
if (MtmGetSystemTime() - start > USECS_PER_SEC) {
534534
MTM_ELOG(WARNING, "Backend %d waits for transaction %s (%llu) status %lld usecs", MyProcPid, ts->gid, (long64)xid, MtmGetSystemTime() - start);
535535
}
@@ -569,8 +569,8 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
569569
else
570570
{
571571
bool invisible = ts->status != TRANSACTION_STATUS_COMMITTED;
572-
MTM_LOG4("%d: tuple with xid=%d(csn= %lld) is %s in snapshot %lld",
573-
MyProcPid, xid, ts->csn, invisible ? "rollbacked" : "committed", MtmTx.snapshot);
572+
MTM_LOG4("%d: tuple with xid=%lld(csn= %lld) is %s in snapshot %lld",
573+
MyProcPid, (long64)xid, ts->csn, invisible ? "rollbacked" : "committed", MtmTx.snapshot);
574574
MtmUnlock();
575575
if (MtmGetSystemTime() - start > USECS_PER_SEC) {
576576
MTM_ELOG(WARNING, "Backend %d waits for %s transaction %s (%llu) %lld usecs", MyProcPid, invisible ? "rollbacked" : "committed",
@@ -581,7 +581,7 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
581581
}
582582
else
583583
{
584-
MTM_LOG4("%d: visibility check is skipped for transaction %u in snapshot %llu", MyProcPid, xid, MtmTx.snapshot);
584+
MTM_LOG4("%d: visibility check is skipped for transaction %llu in snapshot %llu", MyProcPid, (long64)xid, MtmTx.snapshot);
585585
MtmUnlock();
586586
return PgXidInMVCCSnapshot(xid, snapshot);
587587
}
@@ -4945,6 +4945,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
49454945
break;
49464946

49474947
case T_DropStmt:
4948+
case T_TruncateStmt:
49484949
{
49494950
DropStmt *stmt = (DropStmt *) parsetree;
49504951
if (stmt->removeType == OBJECT_INDEX && stmt->concurrent)

pglogical_apply.c

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -408,16 +408,13 @@ process_remote_message(StringInfo s)
408408
if (MtmVacuumStmt != NULL) {
409409
ExecVacuum(MtmVacuumStmt, 1);
410410
} else if (MtmIndexStmt != NULL) {
411-
MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
412411
Oid relid = RangeVarGetRelidExtended(MtmIndexStmt->relation, ShareUpdateExclusiveLock,
413412
false, false,
414413
NULL,
415414
NULL);
416415
/* Run parse analysis ... */
417416
MtmIndexStmt = transformIndexStmt(relid, MtmIndexStmt, messageBody);
418417

419-
MemoryContextSwitchTo(oldContext);
420-
421418
DefineIndex(relid, /* OID of heap relation */
422419
MtmIndexStmt,
423420
InvalidOid, /* no predefined OID */
@@ -618,6 +615,7 @@ read_rel(StringInfo s, LOCKMODE mode)
618615
RangeVar* rv;
619616
Oid remote_relid = pq_getmsgint(s, 4);
620617
Oid local_relid;
618+
MemoryContext old_context;
621619

622620
local_relid = pglogical_relid_map_get(remote_relid);
623621
if (local_relid == InvalidOid) {
@@ -630,7 +628,9 @@ read_rel(StringInfo s, LOCKMODE mode)
630628
rv->relname = (char *) pq_getmsgbytes(s, relnamelen);
631629

632630
local_relid = RangeVarGetRelidExtended(rv, mode, false, false, NULL, NULL);
631+
old_context = MemoryContextSwitchTo(TopMemoryContext);
633632
pglogical_relid_map_put(remote_relid, local_relid);
633+
MemoryContextSwitchTo(old_context);
634634
} else {
635635
nspnamelen = pq_getmsgbyte(s);
636636
s->cursor += nspnamelen;
@@ -1060,7 +1060,8 @@ void MtmExecutor(void* work, size_t size)
10601060
int spill_file = -1;
10611061
int save_cursor = 0;
10621062
int save_len = 0;
1063-
MemoryContext topContext;
1063+
MemoryContext old_context;
1064+
MemoryContext top_context;
10641065

10651066
s.data = work;
10661067
s.len = size;
@@ -1074,13 +1075,15 @@ void MtmExecutor(void* work, size_t size)
10741075
ALLOCSET_DEFAULT_INITSIZE,
10751076
ALLOCSET_DEFAULT_MAXSIZE);
10761077
}
1077-
topContext = MemoryContextSwitchTo(MtmApplyContext);
1078-
1078+
top_context = MemoryContextSwitchTo(MtmApplyContext);
10791079
replorigin_session_origin = InvalidRepOriginId;
10801080
PG_TRY();
10811081
{
1082-
while (true) {
1082+
bool inside_transaction = true;
1083+
do {
10831084
char action = pq_getmsgbyte(&s);
1085+
old_context = MemoryContextSwitchTo(MtmApplyContext);
1086+
10841087
MTM_LOG2("%d: REMOTE process action %c", MyProcPid, action);
10851088
#if 0
10861089
if (Mtm->status == MTM_RECOVERY) {
@@ -1091,84 +1094,81 @@ void MtmExecutor(void* work, size_t size)
10911094
switch (action) {
10921095
/* BEGIN */
10931096
case 'B':
1094-
if (process_remote_begin(&s)) {
1095-
continue;
1096-
} else {
1097-
break;
1098-
}
1097+
inside_transaction = process_remote_begin(&s);
1098+
break;
10991099
/* COMMIT */
11001100
case 'C':
11011101
close_rel(rel);
11021102
process_remote_commit(&s);
1103+
inside_transaction = false;
11031104
break;
11041105
/* INSERT */
11051106
case 'I':
1106-
process_remote_insert(&s, rel);
1107-
continue;
1107+
process_remote_insert(&s, rel);
1108+
break;
11081109
/* UPDATE */
11091110
case 'U':
11101111
process_remote_update(&s, rel);
1111-
continue;
1112+
break;
11121113
/* DELETE */
11131114
case 'D':
11141115
process_remote_delete(&s, rel);
1115-
continue;
1116+
break;
11161117
case 'R':
11171118
close_rel(rel);
11181119
rel = read_rel(&s, RowExclusiveLock);
1119-
continue;
1120+
break;
11201121
case 'F':
11211122
{
11221123
int node_id = pq_getmsgint(&s, 4);
11231124
int file_id = pq_getmsgint(&s, 4);
11241125
Assert(spill_file < 0);
11251126
spill_file = MtmOpenSpillFile(node_id, file_id);
1126-
continue;
1127+
break;
11271128
}
11281129
case '(':
11291130
{
11301131
size_t size = pq_getmsgint(&s, 4);
1131-
s.data = palloc(size);
1132+
s.data = MemoryContextAlloc(TopMemoryContext, size);
11321133
save_cursor = s.cursor;
11331134
save_len = s.len;
11341135
s.cursor = 0;
11351136
s.len = size;
11361137
MtmReadSpillFile(spill_file, s.data, size);
1137-
continue;
1138+
break;
11381139
}
11391140
case ')':
11401141
{
11411142
pfree(s.data);
11421143
s.data = work;
11431144
s.cursor = save_cursor;
11441145
s.len = save_len;
1145-
continue;
1146+
break;
11461147
}
11471148
case 'M':
11481149
{
1149-
if (process_remote_message(&s)) {
1150-
break;
1151-
}
1152-
continue;
1150+
inside_transaction = !process_remote_message(&s);
1151+
break;
11531152
}
11541153
case 'Z':
11551154
{
11561155
MtmRecoveryCompleted();
1156+
inside_transaction = false;
11571157
break;
11581158
}
11591159
default:
11601160
MTM_ELOG(ERROR, "unknown action of type %c", action);
11611161
}
1162-
break;
1163-
}
1162+
MemoryContextSwitchTo(old_context);
1163+
MemoryContextResetAndDeleteChildren(MtmApplyContext);
1164+
} while (inside_transaction);
11641165
}
11651166
PG_CATCH();
11661167
{
1167-
MemoryContext oldcontext;
11681168
MtmReleaseLock();
1169-
oldcontext = MemoryContextSwitchTo(MtmApplyContext);
1169+
old_context = MemoryContextSwitchTo(MtmApplyContext);
11701170
MtmHandleApplyError();
1171-
MemoryContextSwitchTo(oldcontext);
1171+
MemoryContextSwitchTo(old_context);
11721172
EmitErrorReport();
11731173
FlushErrorState();
11741174
MTM_LOG1("%d: REMOTE begin abort transaction %llu", MyProcPid, (long64)MtmGetCurrentTransactionId());
@@ -1178,12 +1178,15 @@ void MtmExecutor(void* work, size_t size)
11781178
MTM_LOG2("%d: REMOTE end abort transaction %llu", MyProcPid, (long64)MtmGetCurrentTransactionId());
11791179
}
11801180
PG_END_TRY();
1181+
if (s.data != work) {
1182+
pfree(s.data);
1183+
}
11811184
#if 0 /* spill file is expecrted to be closed by tranaction commit or rollback */
11821185
if (spill_file >= 0) {
11831186
MtmCloseSpillFile(spill_file);
11841187
}
11851188
#endif
1189+
MemoryContextSwitchTo(top_context);
11861190
MemoryContextResetAndDeleteChildren(MtmApplyContext);
1187-
MemoryContextSwitchTo(topContext);
11881191
}
11891192

0 commit comments

Comments
 (0)