Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b7658c2

Browse files
author
Amit Kapila
committed
Fix data inconsistency between publisher and subscriber.
We were not updating the partition map cache in the subscriber even when the corresponding remote rel is changed. Due to this data was getting incorrectly replicated for partition tables after the publisher has changed the table schema. Fix it by resetting the required entries in the partition map cache after receiving a new relation mapping from the publisher. Reported-by: Shi Yu Author: Shi Yu, Hou Zhijie Reviewed-by: Amit Langote, Amit Kapila Backpatch-through: 13, where it was introduced Discussion: https://postgr.es/m/OSZPR01MB6310F46CD425A967E4AEF736FDA49@OSZPR01MB6310.jpnprd01.prod.outlook.com
1 parent ffffeeb commit b7658c2

File tree

4 files changed

+53
-0
lines changed

4 files changed

+53
-0
lines changed

src/backend/replication/logical/relation.c

+34
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,40 @@ logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
486486
}
487487
}
488488

489+
/*
490+
* Reset the entries in the partition map that refer to remoterel.
491+
*
492+
* Called when new relation mapping is sent by the publisher to update our
493+
* expected view of incoming data from said publisher.
494+
*
495+
* Note that we don't update the remoterel information in the entry here,
496+
* we will update the information in logicalrep_partition_open to avoid
497+
* unnecessary work.
498+
*/
499+
void
500+
logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
501+
{
502+
HASH_SEQ_STATUS status;
503+
LogicalRepPartMapEntry *part_entry;
504+
LogicalRepRelMapEntry *entry;
505+
506+
if (LogicalRepPartMap == NULL)
507+
return;
508+
509+
hash_seq_init(&status, LogicalRepPartMap);
510+
while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
511+
{
512+
entry = &part_entry->relmapentry;
513+
514+
if (entry->remoterel.remoteid != remoterel->remoteid)
515+
continue;
516+
517+
logicalrep_relmap_free_entry(entry);
518+
519+
memset(entry, 0, sizeof(LogicalRepRelMapEntry));
520+
}
521+
}
522+
489523
/*
490524
* Initialize the partition map cache.
491525
*/

src/backend/replication/logical/worker.c

+3
Original file line numberDiff line numberDiff line change
@@ -1562,6 +1562,9 @@ apply_handle_relation(StringInfo s)
15621562

15631563
rel = logicalrep_read_rel(s);
15641564
logicalrep_relmap_update(rel);
1565+
1566+
/* Also reset all entries in the partition map that refer to remoterel. */
1567+
logicalrep_partmap_reset_relmap(rel);
15651568
}
15661569

15671570
/*

src/include/replication/logicalrelation.h

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ typedef struct LogicalRepRelMapEntry
3838
} LogicalRepRelMapEntry;
3939

4040
extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
41+
extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel);
4142

4243
extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
4344
LOCKMODE lockmode);

src/test/subscription/t/013_partition.pl

+15
Original file line numberDiff line numberDiff line change
@@ -853,4 +853,19 @@ BEGIN
853853
"SELECT a, b, c FROM tab5 ORDER BY 1");
854854
is($result, qq(3|1|), 'updates of tab5 replicated correctly after altering table on subscriber');
855855

856+
# Test that replication into the partitioned target table continues to
857+
# work correctly when the published table is altered.
858+
$node_publisher->safe_psql(
859+
'postgres', q{
860+
ALTER TABLE tab5 DROP COLUMN b, ADD COLUMN c INT;
861+
ALTER TABLE tab5 ADD COLUMN b INT;});
862+
863+
$node_publisher->safe_psql('postgres', "UPDATE tab5 SET c = 1 WHERE a = 3");
864+
865+
$node_publisher->wait_for_catchup('sub2');
866+
867+
$result = $node_subscriber2->safe_psql('postgres',
868+
"SELECT a, b, c FROM tab5 ORDER BY 1");
869+
is($result, qq(3||1), 'updates of tab5 replicated correctly after altering table on publisher');
870+
856871
done_testing();

0 commit comments

Comments
 (0)