@@ -42,6 +42,7 @@ static int MtmTransactionRecords;
42
42
static bool MtmIsFilteredTxn ;
43
43
static TransactionId MtmCurrentXid ;
44
44
static bool DDLInProgress = false;
45
+ static Oid MtmSenderTID ; /* transaction identifier for WAL sender */
45
46
46
47
static void pglogical_write_rel (StringInfo out , PGLogicalOutputData * data , Relation rel );
47
48
@@ -80,6 +81,7 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
80
81
const char * relname ;
81
82
uint8 relnamelen ;
82
83
Oid relid ;
84
+ Oid tid ;
83
85
84
86
if (MtmIsFilteredTxn ) {
85
87
MTM_LOG2 ("%d: pglogical_write_message filtered" , MyProcPid );
@@ -92,23 +94,32 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
92
94
}
93
95
94
96
relid = RelationGetRelid (rel );
97
+
95
98
pq_sendbyte (out , 'R' ); /* sending RELATION */
96
99
pq_sendint (out , relid , sizeof relid ); /* use Oid as relation identifier */
97
100
98
- nspname = get_namespace_name (rel -> rd_rel -> relnamespace );
99
- if (nspname == NULL )
100
- elog (ERROR , "cache lookup failed for namespace %u" ,
101
+ Assert (MtmSenderTID != InvalidOid );
102
+ tid = pglogical_relid_map_get (relid );
103
+ if (tid == MtmSenderTID ) { /* this relation was already sent in this transaction */
104
+ pq_sendbyte (out , 0 ); /* do not need to send relation namespace and name in this case */
105
+ pq_sendbyte (out , 0 );
106
+ } else {
107
+ pglogical_relid_map_put (relid , MtmSenderTID );
108
+ nspname = get_namespace_name (rel -> rd_rel -> relnamespace );
109
+ if (nspname == NULL )
110
+ elog (ERROR , "cache lookup failed for namespace %u" ,
101
111
rel -> rd_rel -> relnamespace );
102
- nspnamelen = strlen (nspname ) + 1 ;
103
-
104
- relname = NameStr (rel -> rd_rel -> relname );
105
- relnamelen = strlen (relname ) + 1 ;
106
-
107
- pq_sendbyte (out , nspnamelen ); /* schema name length */
108
- pq_sendbytes (out , nspname , nspnamelen );
109
-
110
- pq_sendbyte (out , relnamelen ); /* table name length */
111
- pq_sendbytes (out , relname , relnamelen );
112
+ nspnamelen = strlen (nspname ) + 1 ;
113
+
114
+ relname = NameStr (rel -> rd_rel -> relname );
115
+ relnamelen = strlen (relname ) + 1 ;
116
+
117
+ pq_sendbyte (out , nspnamelen ); /* schema name length */
118
+ pq_sendbytes (out , nspname , nspnamelen );
119
+
120
+ pq_sendbyte (out , relnamelen ); /* table name length */
121
+ pq_sendbytes (out , relname , relnamelen );
122
+ }
112
123
}
113
124
114
125
/*
@@ -128,6 +139,10 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
128
139
MtmIsFilteredTxn = true;
129
140
MTM_LOG2 ("%d: pglogical_write_begin XID=%lld filtered" , MyProcPid , (long64 )txn -> xid );
130
141
} else {
142
+ if (++ MtmSenderTID == InvalidOid ) {
143
+ pglogical_relid_map_reset ();
144
+ MtmSenderTID += 1 ; /* skip InvalidOid */
145
+ }
131
146
MtmCurrentXid = txn -> xid ;
132
147
MtmIsFilteredTxn = false;
133
148
MTM_LOG3 ("%d: pglogical_write_begin XID=%d node=%d CSN=%lld recovery=%d restart_decoding_lsn=%llx first_lsn=%llx end_lsn=%llx confirmed_flush=%llx" ,
0 commit comments