39
39
#include "pglogical_relid_map.h"
40
40
41
41
static int MtmTransactionRecords ;
42
+ static bool MtmIsFilteredTxn ;
42
43
static TransactionId MtmCurrentXid ;
43
- static bool DDLInProress = false;
44
+ static bool DDLInProgress = false;
44
45
45
46
static void pglogical_write_rel (StringInfo out , PGLogicalOutputData * data , Relation rel );
46
47
@@ -76,13 +77,13 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
76
77
uint8 relnamelen ;
77
78
Oid relid ;
78
79
79
- if (MtmTransactionSnapshot ( MtmCurrentXid ) == INVALID_CSN ) {
80
+ if (MtmIsFilteredTxn ) {
80
81
MTM_LOG2 ("%d: pglogical_write_message filtered" , MyProcPid );
81
82
return ;
82
83
}
83
84
84
- if (DDLInProress ) {
85
- MTM_LOG2 ("%d: pglogical_write_message filtered DDLInProress " , MyProcPid );
85
+ if (DDLInProgress ) {
86
+ MTM_LOG2 ("%d: pglogical_write_message filtered DDLInProgress " , MyProcPid );
86
87
return ;
87
88
}
88
89
@@ -116,17 +117,22 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
116
117
bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
117
118
csn_t csn = MtmTransactionSnapshot (txn -> xid );
118
119
119
- MtmCurrentXid = txn -> xid ;
120
-
121
- MTM_LOG3 ("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx" ,
122
- MyProcPid , txn -> xid , MtmReplicationNodeId , csn , isRecovery , txn -> restart_decoding_lsn , txn -> first_lsn , txn -> end_lsn , MyReplicationSlot -> data .confirmed_flush );
123
-
124
- MTM_LOG3 ("%d: pglogical_write_begin XID=%d sent" , MyProcPid , txn -> xid );
125
- pq_sendbyte (out , 'B' ); /* BEGIN */
126
- pq_sendint (out , MtmNodeId , 4 );
127
- pq_sendint (out , isRecovery ? InvalidTransactionId : txn -> xid , 4 );
128
- pq_sendint64 (out , csn );
129
- MtmTransactionRecords = 0 ;
120
+ if (!isRecovery && csn == INVALID_CSN ) {
121
+ MtmIsFilteredTxn = true;
122
+ MTM_LOG3 ("%d: pglogical_write_begin XID=%d filtered" , MyProcPid , txn -> xid );
123
+ } else {
124
+ MtmCurrentXid = txn -> xid ;
125
+ MtmIsFilteredTxn = false;
126
+ MTM_LOG3 ("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx" ,
127
+ MyProcPid , txn -> xid , MtmReplicationNodeId , csn , isRecovery , txn -> restart_decoding_lsn , txn -> first_lsn , txn -> end_lsn , MyReplicationSlot -> data .confirmed_flush );
128
+
129
+ MTM_LOG3 ("%d: pglogical_write_begin XID=%d sent" , MyProcPid , txn -> xid );
130
+ pq_sendbyte (out , 'B' ); /* BEGIN */
131
+ pq_sendint (out , MtmNodeId , 4 );
132
+ pq_sendint (out , isRecovery ? InvalidTransactionId : txn -> xid , 4 );
133
+ pq_sendint64 (out , csn );
134
+ MtmTransactionRecords = 0 ;
135
+ }
130
136
}
131
137
132
138
static void
@@ -142,14 +148,14 @@ pglogical_write_message(StringInfo out,
142
148
}
143
149
break ;
144
150
case 'D' :
145
- if (MtmTransactionSnapshot ( MtmCurrentXid ) == INVALID_CSN ) {
151
+ if (MtmIsFilteredTxn ) {
146
152
MTM_LOG2 ("%d: pglogical_write_message filtered" , MyProcPid );
147
153
return ;
148
154
}
149
- DDLInProress = true;
155
+ DDLInProgress = true;
150
156
break ;
151
157
case 'E' :
152
- DDLInProress = false;
158
+ DDLInProgress = false;
153
159
/*
154
160
* we use End message only as indicator of DDL transaction finish,
155
161
* so no need to send that to replicas.
@@ -187,11 +193,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
187
193
Assert (false);
188
194
189
195
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE ) {
190
- //Assert(txn->xid < 1000 || MtmTransactionRecords != 1);
191
- // if (MtmIsFilteredTxn) {
192
- // Assert(MtmTransactionRecords == 0);
193
- // return;
194
- // }
196
+ if (MtmIsFilteredTxn ) {
197
+ Assert (MtmTransactionRecords == 0 );
198
+ return ;
199
+ }
195
200
} else {
196
201
csn_t csn = MtmTransactionSnapshot (txn -> xid );
197
202
bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
@@ -261,13 +266,13 @@ static void
261
266
pglogical_write_insert (StringInfo out , PGLogicalOutputData * data ,
262
267
Relation rel , HeapTuple newtuple )
263
268
{
264
- if (MtmTransactionSnapshot ( MtmCurrentXid ) == INVALID_CSN ) {
269
+ if (MtmIsFilteredTxn ) {
265
270
MTM_LOG2 ("%d: pglogical_write_insert filtered" , MyProcPid );
266
271
return ;
267
272
}
268
273
269
- if (DDLInProress ) {
270
- MTM_LOG2 ("%d: pglogical_write_insert filtered DDLInProress " , MyProcPid );
274
+ if (DDLInProgress ) {
275
+ MTM_LOG2 ("%d: pglogical_write_insert filtered DDLInProgress " , MyProcPid );
271
276
return ;
272
277
}
273
278
@@ -284,13 +289,13 @@ static void
284
289
pglogical_write_update (StringInfo out , PGLogicalOutputData * data ,
285
290
Relation rel , HeapTuple oldtuple , HeapTuple newtuple )
286
291
{
287
- if (MtmTransactionSnapshot ( MtmCurrentXid ) == INVALID_CSN ) {
292
+ if (MtmIsFilteredTxn ) {
288
293
MTM_LOG2 ("%d: pglogical_write_update filtered" , MyProcPid );
289
294
return ;
290
295
}
291
296
292
- if (DDLInProress ) {
293
- MTM_LOG2 ("%d: pglogical_write_update filtered DDLInProress " , MyProcPid );
297
+ if (DDLInProgress ) {
298
+ MTM_LOG2 ("%d: pglogical_write_update filtered DDLInProgress " , MyProcPid );
294
299
return ;
295
300
}
296
301
@@ -317,13 +322,13 @@ static void
317
322
pglogical_write_delete (StringInfo out , PGLogicalOutputData * data ,
318
323
Relation rel , HeapTuple oldtuple )
319
324
{
320
- if (MtmTransactionSnapshot ( MtmCurrentXid ) == INVALID_CSN ) {
325
+ if (MtmIsFilteredTxn ) {
321
326
MTM_LOG2 ("%d: pglogical_write_delete filtered" , MyProcPid );
322
327
return ;
323
328
}
324
329
325
- if (DDLInProress ) {
326
- MTM_LOG2 ("%d: pglogical_write_delete filtered DDLInProress " , MyProcPid );
330
+ if (DDLInProgress ) {
331
+ MTM_LOG2 ("%d: pglogical_write_delete filtered DDLInProgress " , MyProcPid );
327
332
return ;
328
333
}
329
334
@@ -354,13 +359,13 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
354
359
int i ;
355
360
uint16 nliveatts = 0 ;
356
361
357
- if (MtmTransactionSnapshot ( MtmCurrentXid ) == INVALID_CSN ) {
362
+ if (MtmIsFilteredTxn ) {
358
363
MTM_LOG2 ("%d: pglogical_write_tuple filtered" , MyProcPid );
359
364
return ;
360
365
}
361
366
362
- if (DDLInProress ) {
363
- MTM_LOG2 ("%d: pglogical_write_tuple filtered DDLInProress " , MyProcPid );
367
+ if (DDLInProgress ) {
368
+ MTM_LOG2 ("%d: pglogical_write_tuple filtered DDLInProgress " , MyProcPid );
364
369
return ;
365
370
}
366
371
0 commit comments