38
38
#include "multimaster.h"
39
39
#include "pglogical_relid_map.h"
40
40
41
- static bool MtmIsFilteredTxn ;
42
- static int MtmTransactionRecords ;
41
+ static int MtmTransactionRecords ;
42
+ static TransactionId MtmCurrentXid ;
43
+ static bool DDLInProress = false;
43
44
44
45
static void pglogical_write_rel (StringInfo out , PGLogicalOutputData * data , Relation rel );
45
46
@@ -74,10 +75,17 @@ pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
74
75
const char * relname ;
75
76
uint8 relnamelen ;
76
77
Oid relid ;
77
- if (MtmIsFilteredTxn ) {
78
+
79
+ if (MtmTransactionSnapshot (MtmCurrentXid ) == INVALID_CSN ) {
80
+ MTM_LOG1 ("%d: pglogical_write_message filtered" , MyProcPid );
78
81
return ;
79
82
}
80
-
83
+
84
+ if (DDLInProress ) {
85
+ MTM_LOG1 ("%d: pglogical_write_message filtered DDLInProress" , MyProcPid );
86
+ return ;
87
+ }
88
+
81
89
relid = RelationGetRelid (rel );
82
90
pq_sendbyte (out , 'R' ); /* sending RELATION */
83
91
pq_sendint (out , relid , sizeof relid ); /* use Oid as relation identifier */
@@ -107,36 +115,42 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
107
115
{
108
116
bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
109
117
csn_t csn = MtmTransactionSnapshot (txn -> xid );
118
+
119
+ MtmCurrentXid = txn -> xid ;
120
+
110
121
MTM_LOG3 ("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx" ,
111
122
MyProcPid , txn -> xid , MtmReplicationNodeId , csn , isRecovery , txn -> restart_decoding_lsn , txn -> first_lsn , txn -> end_lsn , MyReplicationSlot -> data .confirmed_flush );
112
-
113
- if (!isRecovery && csn == INVALID_CSN ) {
114
- MtmIsFilteredTxn = true;
115
- MTM_LOG3 ("%d: pglogical_write_begin XID=%d filtered" , MyProcPid , txn -> xid );
116
- } else {
117
- MTM_LOG3 ("%d: pglogical_write_begin XID=%d sent" , MyProcPid , txn -> xid );
118
- MtmIsFilteredTxn = false;
119
- pq_sendbyte (out , 'B' ); /* BEGIN */
120
- pq_sendint (out , MtmNodeId , 4 );
121
- pq_sendint (out , isRecovery ? InvalidTransactionId : txn -> xid , 4 );
122
- pq_sendint64 (out , csn );
123
- MtmTransactionRecords = 0 ;
124
- }
123
+
124
+ MTM_LOG3 ("%d: pglogical_write_begin XID=%d sent" , MyProcPid , txn -> xid );
125
+ pq_sendbyte (out , 'B' ); /* BEGIN */
126
+ pq_sendint (out , MtmNodeId , 4 );
127
+ pq_sendint (out , isRecovery ? InvalidTransactionId : txn -> xid , 4 );
128
+ pq_sendint64 (out , csn );
129
+ MtmTransactionRecords = 0 ;
125
130
}
126
131
127
132
static void
128
133
pglogical_write_message (StringInfo out ,
129
134
const char * prefix , Size sz , const char * message )
130
135
{
131
- if (* prefix == 'L' ) {
136
+ if (* prefix == 'L' )
137
+ {
132
138
MTM_LOG1 ("Send deadlock message to node %d" , MtmReplicationNodeId );
133
- } else {
134
- if (MtmIsFilteredTxn )
139
+ }
140
+ else if (* prefix == 'G' )
141
+ {
142
+ if (MtmTransactionSnapshot (MtmCurrentXid ) == INVALID_CSN )
135
143
{
136
- MTM_LOG3 ("%d: pglogical_write_message filtered" , MyProcPid );
144
+ MTM_LOG1 ("%d: pglogical_write_message filtered" , MyProcPid );
137
145
return ;
138
146
}
147
+ DDLInProress = true;
139
148
}
149
+ else if (* prefix == 'E' )
150
+ {
151
+ DDLInProress = false;
152
+ }
153
+
140
154
pq_sendbyte (out , * prefix );
141
155
pq_sendint (out , sz , 4 );
142
156
pq_sendbytes (out , message , sz );
@@ -169,10 +183,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
169
183
Assert (flags != PGLOGICAL_COMMIT_PREPARED || txn -> xid < 1000 || MtmTransactionRecords != 1 );
170
184
171
185
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE ) {
172
- if (MtmIsFilteredTxn ) {
173
- Assert (MtmTransactionRecords == 0 );
174
- return ;
175
- }
186
+ // if (MtmIsFilteredTxn) {
187
+ // Assert(MtmTransactionRecords == 0);
188
+ // return;
189
+ // }
176
190
} else {
177
191
csn_t csn = MtmTransactionSnapshot (txn -> xid );
178
192
bool isRecovery = MtmIsRecoveredNode (MtmReplicationNodeId );
@@ -242,11 +256,20 @@ static void
242
256
pglogical_write_insert (StringInfo out , PGLogicalOutputData * data ,
243
257
Relation rel , HeapTuple newtuple )
244
258
{
245
- if (!MtmIsFilteredTxn ) {
246
- MtmTransactionRecords += 1 ;
247
- pq_sendbyte (out , 'I' ); /* action INSERT */
248
- pglogical_write_tuple (out , data , rel , newtuple );
259
+ if (MtmTransactionSnapshot (MtmCurrentXid ) == INVALID_CSN ){
260
+ MTM_LOG1 ("%d: pglogical_write_insert filtered" , MyProcPid );
261
+ return ;
262
+ }
263
+
264
+ if (DDLInProress ) {
265
+ MTM_LOG1 ("%d: pglogical_write_insert filtered DDLInProress" , MyProcPid );
266
+ return ;
249
267
}
268
+
269
+ MtmTransactionRecords += 1 ;
270
+ pq_sendbyte (out , 'I' ); /* action INSERT */
271
+ pglogical_write_tuple (out , data , rel , newtuple );
272
+
250
273
}
251
274
252
275
/*
@@ -256,23 +279,30 @@ static void
256
279
pglogical_write_update (StringInfo out , PGLogicalOutputData * data ,
257
280
Relation rel , HeapTuple oldtuple , HeapTuple newtuple )
258
281
{
259
- if (!MtmIsFilteredTxn ) {
260
- MtmTransactionRecords += 1 ;
282
+ if (MtmTransactionSnapshot (MtmCurrentXid ) == INVALID_CSN ){
283
+ MTM_LOG1 ("%d: pglogical_write_update filtered" , MyProcPid );
284
+ return ;
285
+ }
261
286
262
- MTM_LOG3 ("%d: pglogical_write_update confirmed_flush=%lx" , MyProcPid , MyReplicationSlot -> data .confirmed_flush );
287
+ if (DDLInProress ) {
288
+ MTM_LOG1 ("%d: pglogical_write_update filtered DDLInProress" , MyProcPid );
289
+ return ;
290
+ }
263
291
292
+ MtmTransactionRecords += 1 ;
264
293
265
- pq_sendbyte (out , 'U' ); /* action UPDATE */
266
- /* FIXME support whole tuple (O tuple type) */
267
- if (oldtuple != NULL )
268
- {
269
- pq_sendbyte (out , 'K' ); /* old key follows */
270
- pglogical_write_tuple (out , data , rel , oldtuple );
271
- }
272
-
273
- pq_sendbyte (out , 'N' ); /* new tuple follows */
274
- pglogical_write_tuple (out , data , rel , newtuple );
294
+ MTM_LOG3 ("%d: pglogical_write_update confirmed_flush=%lx" , MyProcPid , MyReplicationSlot -> data .confirmed_flush );
295
+
296
+ pq_sendbyte (out , 'U' ); /* action UPDATE */
297
+ /* FIXME support whole tuple (O tuple type) */
298
+ if (oldtuple != NULL )
299
+ {
300
+ pq_sendbyte (out , 'K' ); /* old key follows */
301
+ pglogical_write_tuple (out , data , rel , oldtuple );
275
302
}
303
+
304
+ pq_sendbyte (out , 'N' ); /* new tuple follows */
305
+ pglogical_write_tuple (out , data , rel , newtuple );
276
306
}
277
307
278
308
/*
@@ -282,11 +312,19 @@ static void
282
312
pglogical_write_delete (StringInfo out , PGLogicalOutputData * data ,
283
313
Relation rel , HeapTuple oldtuple )
284
314
{
285
- if (!MtmIsFilteredTxn ) {
286
- MtmTransactionRecords += 1 ;
287
- pq_sendbyte (out , 'D' ); /* action DELETE */
288
- pglogical_write_tuple (out , data , rel , oldtuple );
315
+ if (MtmTransactionSnapshot (MtmCurrentXid ) == INVALID_CSN ){
316
+ MTM_LOG1 ("%d: pglogical_write_delete filtered" , MyProcPid );
317
+ return ;
318
+ }
319
+
320
+ if (DDLInProress ) {
321
+ MTM_LOG1 ("%d: pglogical_write_delete filtered DDLInProress" , MyProcPid );
322
+ return ;
289
323
}
324
+
325
+ MtmTransactionRecords += 1 ;
326
+ pq_sendbyte (out , 'D' ); /* action DELETE */
327
+ pglogical_write_tuple (out , data , rel , oldtuple );
290
328
}
291
329
292
330
/*
@@ -311,6 +349,16 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
311
349
int i ;
312
350
uint16 nliveatts = 0 ;
313
351
352
+ if (MtmTransactionSnapshot (MtmCurrentXid ) == INVALID_CSN ){
353
+ MTM_LOG1 ("%d: pglogical_write_tuple filtered" , MyProcPid );
354
+ return ;
355
+ }
356
+
357
+ if (DDLInProress ) {
358
+ MTM_LOG1 ("%d: pglogical_write_tuple filtered DDLInProress" , MyProcPid );
359
+ return ;
360
+ }
361
+
314
362
desc = RelationGetDescr (rel );
315
363
316
364
pq_sendbyte (out , 'T' ); /* sending TUPLE */
0 commit comments