@@ -92,6 +92,10 @@ typedef struct SubOpts
92
92
} SubOpts ;
93
93
94
94
static List * fetch_table_list (WalReceiverConn * wrconn , List * publications );
95
+ static void check_publications_origin (WalReceiverConn * wrconn ,
96
+ List * publications , bool copydata ,
97
+ char * origin , Oid * subrel_local_oids ,
98
+ int subrel_count , char * subname );
95
99
static void check_duplicates_in_publist (List * publist , Datum * datums );
96
100
static List * merge_publications (List * oldpublist , List * newpublist , bool addpub , const char * subname );
97
101
static void ReportSlotConnectionError (List * rstates , Oid subid , char * slotname , char * err );
@@ -680,6 +684,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
680
684
PG_TRY ();
681
685
{
682
686
check_publications (wrconn , publications );
687
+ check_publications_origin (wrconn , publications , opts .copy_data ,
688
+ opts .origin , NULL , 0 , stmt -> subname );
683
689
684
690
/*
685
691
* Set sync state based on if we were asked to do data copy or
@@ -786,6 +792,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
786
792
ListCell * lc ;
787
793
int off ;
788
794
int remove_rel_len ;
795
+ int subrel_count ;
789
796
Relation rel = NULL ;
790
797
typedef struct SubRemoveRels
791
798
{
@@ -815,28 +822,33 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
815
822
816
823
/* Get local table list. */
817
824
subrel_states = GetSubscriptionRelations (sub -> oid , false);
825
+ subrel_count = list_length (subrel_states );
818
826
819
827
/*
820
828
* Build qsorted array of local table oids for faster lookup. This can
821
829
* potentially contain all tables in the database so speed of lookup
822
830
* is important.
823
831
*/
824
- subrel_local_oids = palloc (list_length ( subrel_states ) * sizeof (Oid ));
832
+ subrel_local_oids = palloc (subrel_count * sizeof (Oid ));
825
833
off = 0 ;
826
834
foreach (lc , subrel_states )
827
835
{
828
836
SubscriptionRelState * relstate = (SubscriptionRelState * ) lfirst (lc );
829
837
830
838
subrel_local_oids [off ++ ] = relstate -> relid ;
831
839
}
832
- qsort (subrel_local_oids , list_length ( subrel_states ) ,
840
+ qsort (subrel_local_oids , subrel_count ,
833
841
sizeof (Oid ), oid_cmp );
834
842
843
+ check_publications_origin (wrconn , sub -> publications , copy_data ,
844
+ sub -> origin , subrel_local_oids ,
845
+ subrel_count , sub -> name );
846
+
835
847
/*
836
848
* Rels that we want to remove from subscription and drop any slots
837
849
* and origins corresponding to them.
838
850
*/
839
- sub_remove_rels = palloc (list_length ( subrel_states ) * sizeof (SubRemoveRels ));
851
+ sub_remove_rels = palloc (subrel_count * sizeof (SubRemoveRels ));
840
852
841
853
/*
842
854
* Walk over the remote tables and try to match them to locally known
@@ -862,7 +874,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
862
874
pubrel_local_oids [off ++ ] = relid ;
863
875
864
876
if (!bsearch (& relid , subrel_local_oids ,
865
- list_length ( subrel_states ) , sizeof (Oid ), oid_cmp ))
877
+ subrel_count , sizeof (Oid ), oid_cmp ))
866
878
{
867
879
AddSubscriptionRelState (sub -> oid , relid ,
868
880
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY ,
@@ -881,7 +893,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
881
893
sizeof (Oid ), oid_cmp );
882
894
883
895
remove_rel_len = 0 ;
884
- for (off = 0 ; off < list_length ( subrel_states ) ; off ++ )
896
+ for (off = 0 ; off < subrel_count ; off ++ )
885
897
{
886
898
Oid relid = subrel_local_oids [off ];
887
899
@@ -1784,6 +1796,117 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
1784
1796
table_close (rel , RowExclusiveLock );
1785
1797
}
1786
1798
1799
+ /*
1800
+ * Check and log a warning if the publisher has subscribed to the same table
1801
+ * from some other publisher. This check is required only if "copy_data = true"
1802
+ * and "origin = none" for CREATE SUBSCRIPTION and
1803
+ * ALTER SUBSCRIPTION ... REFRESH statements to notify the user that data
1804
+ * having origin might have been copied.
1805
+ *
1806
+ * This check need not be performed on the tables that are already added
1807
+ * because incremental sync for those tables will happen through WAL and the
1808
+ * origin of the data can be identified from the WAL records.
1809
+ *
1810
+ * subrel_local_oids contains the list of relation oids that are already
1811
+ * present on the subscriber.
1812
+ */
1813
+ static void
1814
+ check_publications_origin (WalReceiverConn * wrconn , List * publications ,
1815
+ bool copydata , char * origin , Oid * subrel_local_oids ,
1816
+ int subrel_count , char * subname )
1817
+ {
1818
+ WalRcvExecResult * res ;
1819
+ StringInfoData cmd ;
1820
+ TupleTableSlot * slot ;
1821
+ Oid tableRow [1 ] = {TEXTOID };
1822
+ List * publist = NIL ;
1823
+ int i ;
1824
+
1825
+ if (!copydata || !origin ||
1826
+ (pg_strcasecmp (origin , LOGICALREP_ORIGIN_NONE ) != 0 ))
1827
+ return ;
1828
+
1829
+ initStringInfo (& cmd );
1830
+ appendStringInfoString (& cmd ,
1831
+ "SELECT DISTINCT P.pubname AS pubname\n"
1832
+ "FROM pg_publication P,\n"
1833
+ " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
1834
+ " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
1835
+ " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
1836
+ "WHERE C.oid = GPT.relid AND P.pubname IN (" );
1837
+ get_publications_str (publications , & cmd , true);
1838
+ appendStringInfoString (& cmd , ")\n" );
1839
+
1840
+ /*
1841
+ * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
1842
+ * the list of relation oids that are already present on the subscriber.
1843
+ * This check should be skipped for these tables.
1844
+ */
1845
+ for (i = 0 ; i < subrel_count ; i ++ )
1846
+ {
1847
+ Oid relid = subrel_local_oids [i ];
1848
+ char * schemaname = get_namespace_name (get_rel_namespace (relid ));
1849
+ char * tablename = get_rel_name (relid );
1850
+
1851
+ appendStringInfo (& cmd , "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n" ,
1852
+ schemaname , tablename );
1853
+ }
1854
+
1855
+ res = walrcv_exec (wrconn , cmd .data , 1 , tableRow );
1856
+ pfree (cmd .data );
1857
+
1858
+ if (res -> status != WALRCV_OK_TUPLES )
1859
+ ereport (ERROR ,
1860
+ (errcode (ERRCODE_CONNECTION_FAILURE ),
1861
+ errmsg ("could not receive list of replicated tables from the publisher: %s" ,
1862
+ res -> err )));
1863
+
1864
+ /* Process tables. */
1865
+ slot = MakeSingleTupleTableSlot (res -> tupledesc , & TTSOpsMinimalTuple );
1866
+ while (tuplestore_gettupleslot (res -> tuplestore , true, false, slot ))
1867
+ {
1868
+ char * pubname ;
1869
+ bool isnull ;
1870
+
1871
+ pubname = TextDatumGetCString (slot_getattr (slot , 1 , & isnull ));
1872
+ Assert (!isnull );
1873
+
1874
+ ExecClearTuple (slot );
1875
+ publist = list_append_unique (publist , makeString (pubname ));
1876
+ }
1877
+
1878
+ /*
1879
+ * Log a warning if the publisher has subscribed to the same table from
1880
+ * some other publisher. We cannot know the origin of data during the
1881
+ * initial sync. Data origins can be found only from the WAL by looking at
1882
+ * the origin id.
1883
+ *
1884
+ * XXX: For simplicity, we don't check whether the table has any data or
1885
+ * not. If the table doesn't have any data then we don't need to
1886
+ * distinguish between data having origin and data not having origin so we
1887
+ * can avoid logging a warning in that case.
1888
+ */
1889
+ if (publist )
1890
+ {
1891
+ StringInfo pubnames = makeStringInfo ();
1892
+
1893
+ /* Prepare the list of publication(s) for warning message. */
1894
+ get_publications_str (publist , pubnames , false);
1895
+ ereport (WARNING ,
1896
+ errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
1897
+ errmsg ("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin" ,
1898
+ subname ),
1899
+ errdetail_plural ("Subscribed publication %s is subscribing to other publications." ,
1900
+ "Subscribed publications %s are subscribing to other publications." ,
1901
+ list_length (publist ), pubnames -> data ),
1902
+ errhint ("Verify that initial data copied from the publisher tables did not come from other origins." ));
1903
+ }
1904
+
1905
+ ExecDropSingleTupleTableSlot (slot );
1906
+
1907
+ walrcv_clear_result (res );
1908
+ }
1909
+
1787
1910
/*
1788
1911
* Get the list of tables which belong to specified publications on the
1789
1912
* publisher connection.
0 commit comments