Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5282ad8

Browse files
knizhnikkelvich
authored andcommitted
Avoid redundand sending of relation to replicas
1 parent 38f35b9 commit 5282ad8

File tree

2 files changed

+19
-3
lines changed

2 files changed

+19
-3
lines changed

pglogical_apply.c

+12-3
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ typedef struct TupleData
6262
bool changed[MaxTupleAttributeNumber];
6363
} TupleData;
6464

65+
6566
static Relation read_rel(StringInfo s, LOCKMODE mode);
6667
static void read_tuple_parts(StringInfo s, Relation rel, TupleData *tup);
6768
static EState* create_rel_estate(Relation rel);
@@ -581,6 +582,15 @@ read_tuple_parts(StringInfo s, Relation rel, TupleData *tup)
581582
}
582583
}
583584

585+
static void
586+
close_rel(Relation rel)
587+
{
588+
if (rel != NULL)
589+
{
590+
heap_close(rel, NoLock);
591+
}
592+
}
593+
584594
static Relation
585595
read_rel(StringInfo s, LOCKMODE mode)
586596
{
@@ -823,7 +833,6 @@ process_remote_insert(StringInfo s, Relation rel)
823833
MtmMakeRelationLocal(RelationGetRelid(rel));
824834
}
825835

826-
heap_close(rel, NoLock);
827836
ExecResetTupleTable(estate->es_tupleTable, true);
828837
FreeExecutorState(estate);
829838

@@ -944,7 +953,6 @@ process_remote_update(StringInfo s, Relation rel)
944953

945954
/* release locks upon commit */
946955
index_close(idxrel, NoLock);
947-
heap_close(rel, NoLock);
948956

949957
ExecResetTupleTable(estate->es_tupleTable, true);
950958
FreeExecutorState(estate);
@@ -1019,7 +1027,6 @@ process_remote_delete(StringInfo s, Relation rel)
10191027
PopActiveSnapshot();
10201028

10211029
index_close(idxrel, NoLock);
1022-
heap_close(rel, NoLock);
10231030

10241031
ExecResetTupleTable(estate->es_tupleTable, true);
10251032
FreeExecutorState(estate);
@@ -1071,6 +1078,7 @@ void MtmExecutor(void* work, size_t size)
10711078
}
10721079
/* COMMIT */
10731080
case 'C':
1081+
close_rel(rel);
10741082
process_remote_commit(&s);
10751083
break;
10761084
/* INSERT */
@@ -1086,6 +1094,7 @@ void MtmExecutor(void* work, size_t size)
10861094
process_remote_delete(&s, rel);
10871095
continue;
10881096
case 'R':
1097+
close_rel(rel);
10891098
rel = read_rel(&s, RowExclusiveLock);
10901099
continue;
10911100
case 'F':

pglogical_proto.c

+7
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ static bool MtmIsFilteredTxn;
4343
static TransactionId MtmCurrentXid;
4444
static bool DDLInProgress = false;
4545
static Oid MtmSenderTID; /* transaction identifier for WAL sender */
46+
static Oid MtmLastRelId; /* last relation ID sent to the receiver in this transaction */
4647

4748
static void pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel);
4849

@@ -95,6 +96,11 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
9596

9697
relid = RelationGetRelid(rel);
9798

99+
if (relid == MtmLastRelId) {
100+
return;
101+
}
102+
MtmLastRelId = relid;
103+
98104
pq_sendbyte(out, 'R'); /* sending RELATION */
99105
pq_sendint(out, relid, sizeof relid); /* use Oid as relation identifier */
100106

@@ -143,6 +149,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
143149
pglogical_relid_map_reset();
144150
MtmSenderTID += 1; /* skip InvalidOid */
145151
}
152+
MtmLastRelId = InvalidOid;
146153
MtmCurrentXid = txn->xid;
147154
MtmIsFilteredTxn = false;
148155
MTM_LOG3("%d: pglogical_write_begin XID=%d node=%d CSN=%lld recovery=%d restart_decoding_lsn=%llx first_lsn=%llx end_lsn=%llx confirmed_flush=%llx",

0 commit comments

Comments
 (0)