38
38
#include "multimaster.h"
39
39
#include "pglogical_relid_map.h"
40
40
41
- static bool MtmIsFilteredTxn ;
42
- static int MtmTransactionRecords ;
41
+ static int MtmTransactionRecords ;
42
+ static TransactionId MtmCurrentXid ;
43
+ static bool DDLInProress = false;
43
44
44
45
static void pglogical_write_rel (StringInfo out , PGLogicalOutputData * data , Relation rel );
45
46
@@ -74,10 +75,17 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
74
75
const char * relname ;
75
76
uint8 relnamelen ;
76
77
Oid relid ;
77
- if (MtmIsFilteredTxn ) {
78
+
79
+ if (MtmTransactionSnapshot (MtmCurrentXid ) == INVALID_CSN ) {
80
+ MTM_LOG1 ("%d: pglogical_write_message filtered" , MyProcPid );
78
81
return ;
79
82
}
80
-
83
+
84
+ if (DDLInProress ) {
85
+ MTM_LOG1 ("%d: pglogical_write_message filtered DDLInProress" , MyProcPid );
86
+ return ;
87
+ }
88
+
81
89
relid = RelationGetRelid (rel );
82
90
pq_sendbyte (out , 'R' ); /* sending RELATION */
83
91
pq_sendint (out , relid , sizeof relid ); /* use Oid as relation identifier */
@@ -107,30 +115,42 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
107
115
{
108
116
bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
109
117
csn_t csn = MtmTransactionSnapshot (txn -> xid );
118
+
119
+ MtmCurrentXid = txn -> xid ;
120
+
110
121
MTM_LOG3 ("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx" ,
111
122
MyProcPid , txn -> xid , MtmReplicationNodeId , csn , isRecovery , txn -> restart_decoding_lsn , txn -> first_lsn , txn -> end_lsn , MyReplicationSlot -> data .confirmed_flush );
112
-
113
- if (!isRecovery && csn == INVALID_CSN ) {
114
- MtmIsFilteredTxn = true;
115
- MTM_LOG3 ("%d: pglogical_write_begin XID=%d filtered" , MyProcPid , txn -> xid );
116
- } else {
117
- MTM_LOG3 ("%d: pglogical_write_begin XID=%d sent" , MyProcPid , txn -> xid );
118
- MtmIsFilteredTxn = false;
119
- pq_sendbyte (out , 'B' ); /* BEGIN */
120
- pq_sendint (out , MtmNodeId , 4 );
121
- pq_sendint (out , isRecovery ? InvalidTransactionId : txn -> xid , 4 );
122
- pq_sendint64 (out , csn );
123
- MtmTransactionRecords = 0 ;
124
- }
123
+
124
+ MTM_LOG3 ("%d: pglogical_write_begin XID=%d sent" , MyProcPid , txn -> xid );
125
+ pq_sendbyte (out , 'B' ); /* BEGIN */
126
+ pq_sendint (out , MtmNodeId , 4 );
127
+ pq_sendint (out , isRecovery ? InvalidTransactionId : txn -> xid , 4 );
128
+ pq_sendint64 (out , csn );
129
+ MtmTransactionRecords = 0 ;
125
130
}
126
131
127
132
static void
128
133
pglogical_write_message (StringInfo out ,
129
134
const char * prefix , Size sz , const char * message )
130
135
{
131
- if (* prefix == 'L' ) {
136
+ if (* prefix == 'L' )
137
+ {
132
138
MTM_LOG1 ("Send deadlock message to node %d" , MtmReplicationNodeId );
133
139
}
140
+ else if (* prefix == 'G' )
141
+ {
142
+ if (MtmTransactionSnapshot (MtmCurrentXid ) == INVALID_CSN )
143
+ {
144
+ MTM_LOG1 ("%d: pglogical_write_message filtered" , MyProcPid );
145
+ return ;
146
+ }
147
+ DDLInProress = true;
148
+ }
149
+ else if (* prefix == 'E' )
150
+ {
151
+ DDLInProress = false;
152
+ }
153
+
134
154
pq_sendbyte (out , * prefix );
135
155
pq_sendint (out , sz , 4 );
136
156
pq_sendbytes (out , message , sz );
@@ -163,10 +183,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
163
183
Assert (flags != PGLOGICAL_COMMIT_PREPARED || txn -> xid < 1000 || MtmTransactionRecords != 1 );
164
184
165
185
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE ) {
166
- if (MtmIsFilteredTxn ) {
167
- Assert (MtmTransactionRecords == 0 );
168
- return ;
169
- }
186
+ // if (MtmIsFilteredTxn) {
187
+ // Assert(MtmTransactionRecords == 0);
188
+ // return;
189
+ // }
170
190
} else {
171
191
csn_t csn = MtmTransactionSnapshot (txn -> xid );
172
192
bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
@@ -236,11 +256,20 @@ static void
236
256
pglogical_write_insert (StringInfo out , PGLogicalOutputData * data ,
237
257
Relation rel , HeapTuple newtuple )
238
258
{
239
- if (!MtmIsFilteredTxn ) {
240
- MtmTransactionRecords += 1 ;
241
- pq_sendbyte (out , 'I' ); /* action INSERT */
242
- pglogical_write_tuple (out , data , rel , newtuple );
259
+ if (MtmTransactionSnapshot (MtmCurrentXid ) == INVALID_CSN ){
260
+ MTM_LOG1 ("%d: pglogical_write_insert filtered" , MyProcPid );
261
+ return ;
262
+ }
263
+
264
+ if (DDLInProress ) {
265
+ MTM_LOG1 ("%d: pglogical_write_insert filtered DDLInProress" , MyProcPid );
266
+ return ;
243
267
}
268
+
269
+ MtmTransactionRecords += 1 ;
270
+ pq_sendbyte (out , 'I' ); /* action INSERT */
271
+ pglogical_write_tuple (out , data , rel , newtuple );
272
+
244
273
}
245
274
246
275
/*
@@ -250,23 +279,30 @@ static void
250
279
pglogical_write_update (StringInfo out , PGLogicalOutputData * data ,
251
280
Relation rel , HeapTuple oldtuple , HeapTuple newtuple )
252
281
{
253
- if (!MtmIsFilteredTxn ) {
254
- MtmTransactionRecords += 1 ;
282
+ if (MtmTransactionSnapshot (MtmCurrentXid ) == INVALID_CSN ){
283
+ MTM_LOG1 ("%d: pglogical_write_update filtered" , MyProcPid );
284
+ return ;
285
+ }
255
286
256
- MTM_LOG3 ("%d: pglogical_write_update confirmed_flush=%lx" , MyProcPid , MyReplicationSlot -> data .confirmed_flush );
287
+ if (DDLInProress ) {
288
+ MTM_LOG1 ("%d: pglogical_write_update filtered DDLInProress" , MyProcPid );
289
+ return ;
290
+ }
257
291
292
+ MtmTransactionRecords += 1 ;
258
293
259
- pq_sendbyte (out , 'U' ); /* action UPDATE */
260
- /* FIXME support whole tuple (O tuple type) */
261
- if (oldtuple != NULL )
262
- {
263
- pq_sendbyte (out , 'K' ); /* old key follows */
264
- pglogical_write_tuple (out , data , rel , oldtuple );
265
- }
266
-
267
- pq_sendbyte (out , 'N' ); /* new tuple follows */
268
- pglogical_write_tuple (out , data , rel , newtuple );
294
+ MTM_LOG3 ("%d: pglogical_write_update confirmed_flush=%lx" , MyProcPid , MyReplicationSlot -> data .confirmed_flush );
295
+
296
+ pq_sendbyte (out , 'U' ); /* action UPDATE */
297
+ /* FIXME support whole tuple (O tuple type) */
298
+ if (oldtuple != NULL )
299
+ {
300
+ pq_sendbyte (out , 'K' ); /* old key follows */
301
+ pglogical_write_tuple (out , data , rel , oldtuple );
269
302
}
303
+
304
+ pq_sendbyte (out , 'N' ); /* new tuple follows */
305
+ pglogical_write_tuple (out , data , rel , newtuple );
270
306
}
271
307
272
308
/*
@@ -276,11 +312,19 @@ static void
276
312
pglogical_write_delete (StringInfo out , PGLogicalOutputData * data ,
277
313
Relation rel , HeapTuple oldtuple )
278
314
{
279
- if (!MtmIsFilteredTxn ) {
280
- MtmTransactionRecords += 1 ;
281
- pq_sendbyte (out , 'D' ); /* action DELETE */
282
- pglogical_write_tuple (out , data , rel , oldtuple );
315
+ if (MtmTransactionSnapshot (MtmCurrentXid ) == INVALID_CSN ){
316
+ MTM_LOG1 ("%d: pglogical_write_delete filtered" , MyProcPid );
317
+ return ;
283
318
}
319
+
320
+ if (DDLInProress ) {
321
+ MTM_LOG1 ("%d: pglogical_write_delete filtered DDLInProress" , MyProcPid );
322
+ return ;
323
+ }
324
+
325
+ MtmTransactionRecords += 1 ;
326
+ pq_sendbyte (out , 'D' ); /* action DELETE */
327
+ pglogical_write_tuple (out , data , rel , oldtuple );
284
328
}
285
329
286
330
/*
@@ -305,6 +349,16 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
305
349
int i ;
306
350
uint16 nliveatts = 0 ;
307
351
352
+ if (MtmTransactionSnapshot (MtmCurrentXid ) == INVALID_CSN ){
353
+ MTM_LOG1 ("%d: pglogical_write_tuple filtered" , MyProcPid );
354
+ return ;
355
+ }
356
+
357
+ if (DDLInProress ) {
358
+ MTM_LOG1 ("%d: pglogical_write_tuple filtered DDLInProress" , MyProcPid );
359
+ return ;
360
+ }
361
+
308
362
desc = RelationGetDescr (rel );
309
363
310
364
pq_sendbyte (out , 'T' ); /* sending TUPLE */
0 commit comments