45
45
#include "storage/pmsignal.h"
46
46
#include "storage/proc.h"
47
47
#include "utils/syscache.h"
48
+ #include "utils/lsyscache.h"
48
49
#include "replication/walsender.h"
49
50
#include "replication/walsender_private.h"
50
51
#include "replication/slot.h"
53
54
#include "nodes/makefuncs.h"
54
55
#include "access/htup_details.h"
55
56
#include "catalog/indexing.h"
57
+ #include "catalog/namespace.h"
56
58
#include "pglogical_output/hooks.h"
57
59
58
60
#include "multimaster.h"
@@ -104,6 +106,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
104
106
PG_FUNCTION_INFO_V1 (mtm_get_snapshot );
105
107
PG_FUNCTION_INFO_V1 (mtm_get_nodes_state );
106
108
PG_FUNCTION_INFO_V1 (mtm_get_cluster_state );
109
+ PG_FUNCTION_INFO_V1 (mtm_make_table_local );
107
110
108
111
static Snapshot MtmGetSnapshot (Snapshot snapshot );
109
112
static void MtmInitialize (void );
@@ -134,6 +137,7 @@ MtmState* Mtm;
134
137
135
138
HTAB * MtmXid2State ;
136
139
static HTAB * MtmGid2State ;
140
+ static HTAB * MtmLocalTables ;
137
141
138
142
static MtmCurrentTrans MtmTx ;
139
143
@@ -175,11 +179,12 @@ bool MtmUseRaftable;
175
179
MtmConnectionInfo * MtmConnections ;
176
180
177
181
static char * MtmConnStrs ;
178
- static int MtmQueueSize ;
179
- static int MtmWorkers ;
180
- static int MtmVacuumDelay ;
181
- static int MtmMinRecoveryLag ;
182
- static int MtmMaxRecoveryLag ;
182
+ static int MtmQueueSize ;
183
+ static int MtmWorkers ;
184
+ static int MtmVacuumDelay ;
185
+ static int MtmMinRecoveryLag ;
186
+ static int MtmMaxRecoveryLag ;
187
+ static bool MtmIgnoreTablesWithoutPk ;
183
188
184
189
static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
185
190
static ProcessUtility_hook_type PreviousProcessUtilityHook ;
@@ -1279,6 +1284,71 @@ MtmCreateGidMap(void)
1279
1284
return htab ;
1280
1285
}
1281
1286
1287
+ static HTAB *
1288
+ MtmCreateLocalTableMap (void )
1289
+ {
1290
+ HASHCTL info ;
1291
+ HTAB * htab ;
1292
+ memset (& info , 0 , sizeof (info ));
1293
+ info .keysize = sizeof (Oid );
1294
+ htab = ShmemInitHash (
1295
+ "MtmLocalTables" ,
1296
+ MULTIMASTER_MAX_LOCAL_TABLES , MULTIMASTER_MAX_LOCAL_TABLES ,
1297
+ & info ,
1298
+ 0
1299
+ );
1300
+ return htab ;
1301
+ }
1302
+
1303
+ static void MtmMakeRelationLocal (Oid relid )
1304
+ {
1305
+ if (OidIsValid (relid )) {
1306
+ MtmLock (LW_EXCLUSIVE );
1307
+ hash_search (MtmLocalTables , & relid , HASH_ENTER , NULL );
1308
+ MtmUnlock ();
1309
+ }
1310
+ }
1311
+
1312
+
1313
+ void MtmMakeTableLocal (char * schema , char * name )
1314
+ {
1315
+ RangeVar * rv = makeRangeVar (schema , name , -1 );
1316
+ Oid relid = RangeVarGetRelid (rv , NoLock , true);
1317
+ MtmMakeRelationLocal (relid );
1318
+ }
1319
+
1320
+
1321
+ typedef struct {
1322
+ NameData schema ;
1323
+ NameData name ;
1324
+ } MtmLocalTablesTuple ;
1325
+
1326
+ static void MtmLoadLocalTables (void )
1327
+ {
1328
+ RangeVar * rv ;
1329
+ Relation rel ;
1330
+ SysScanDesc scan ;
1331
+ HeapTuple tuple ;
1332
+
1333
+ Assert (IsTransactionState ());
1334
+
1335
+ rv = makeRangeVar (MULTIMASTER_SCHEMA_NAME , MULTIMASTER_LOCAL_TABLES_TABLE , -1 );
1336
+ rel = heap_openrv_extended (rv , RowExclusiveLock , true);
1337
+ if (rel != NULL ) {
1338
+ scan = systable_beginscan (rel , 0 , true, NULL , 0 , NULL );
1339
+
1340
+ while (HeapTupleIsValid (tuple = systable_getnext (scan )))
1341
+ {
1342
+ MtmLocalTablesTuple * t = (MtmLocalTablesTuple * ) GETSTRUCT (tuple );
1343
+ MtmMakeTableLocal (NameStr (t -> schema ), NameStr (t -> name ));
1344
+ }
1345
+
1346
+ systable_endscan (scan );
1347
+ heap_close (rel , RowExclusiveLock );
1348
+ }
1349
+ }
1350
+
1351
+
1282
1352
static void MtmInitialize ()
1283
1353
{
1284
1354
bool found ;
@@ -1308,6 +1378,7 @@ static void MtmInitialize()
1308
1378
Mtm -> nReceivers = 0 ;
1309
1379
Mtm -> timeShift = 0 ;
1310
1380
Mtm -> transCount = 0 ;
1381
+ Mtm -> localTablesHashLoaded = false;
1311
1382
for (i = 0 ; i < MtmNodes ; i ++ ) {
1312
1383
Mtm -> nodes [i ].oldestSnapshot = 0 ;
1313
1384
Mtm -> nodes [i ].transDelay = 0 ;
@@ -1323,6 +1394,7 @@ static void MtmInitialize()
1323
1394
}
1324
1395
MtmXid2State = MtmCreateXidMap ();
1325
1396
MtmGid2State = MtmCreateGidMap ();
1397
+ MtmLocalTables = MtmCreateLocalTableMap ();
1326
1398
MtmDoReplication = true;
1327
1399
TM = & MtmTM ;
1328
1400
LWLockRelease (AddinShmemInitLock );
@@ -1475,6 +1547,19 @@ _PG_init(void)
1475
1547
NULL
1476
1548
);
1477
1549
1550
+ DefineCustomBoolVariable (
1551
+ "multimaster.ignore_tables_without_pk" ,
1552
+ "Do not replicate tables withpout primary key" ,
1553
+ NULL ,
1554
+ & MtmIgnoreTablesWithoutPk ,
1555
+ false,
1556
+ PGC_BACKEND ,
1557
+ 0 ,
1558
+ NULL ,
1559
+ NULL ,
1560
+ NULL
1561
+ );
1562
+
1478
1563
DefineCustomIntVariable (
1479
1564
"multimaster.workers" ,
1480
1565
"Number of multimaster executor workers per node" ,
@@ -1804,11 +1889,30 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
1804
1889
return res ;
1805
1890
}
1806
1891
1892
+ static bool
1893
+ MtmReplicationRowFilterHook (struct PGLogicalRowFilterArgs * args )
1894
+ {
1895
+ bool isDistributed ;
1896
+ MtmLock (LW_SHARED );
1897
+ if (!Mtm -> localTablesHashLoaded ) {
1898
+ MtmUnlock ();
1899
+ MtmLock (LW_EXCLUSIVE );
1900
+ if (!Mtm -> localTablesHashLoaded ) {
1901
+ MtmLoadLocalTables ();
1902
+ Mtm -> localTablesHashLoaded = true;
1903
+ }
1904
+ }
1905
+ isDistributed = hash_search (MtmLocalTables , & RelationGetRelid (args -> changed_rel ), HASH_FIND , NULL ) == NULL ;
1906
+ MtmUnlock ();
1907
+ return isDistributed ;
1908
+ }
1909
+
1807
1910
void MtmSetupReplicationHooks (struct PGLogicalHooks * hooks )
1808
1911
{
1809
1912
hooks -> startup_hook = MtmReplicationStartupHook ;
1810
1913
hooks -> shutdown_hook = MtmReplicationShutdownHook ;
1811
1914
hooks -> txn_filter_hook = MtmReplicationTxnFilterHook ;
1915
+ hooks -> row_filter_hook = MtmReplicationRowFilterHook ;
1812
1916
}
1813
1917
1814
1918
@@ -1935,6 +2039,52 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
1935
2039
PG_RETURN_DATUM (HeapTupleGetDatum (heap_form_tuple (desc , values , nulls )));
1936
2040
}
1937
2041
2042
+
2043
+ Datum mtm_make_table_local (PG_FUNCTION_ARGS )
2044
+ {
2045
+ Oid reloid = PG_GETARG_OID (1 );
2046
+ RangeVar * rv ;
2047
+ Relation rel ;
2048
+ TupleDesc tupDesc ;
2049
+ HeapTuple tup ;
2050
+ Datum values [Natts_mtm_local_tables ];
2051
+ bool nulls [Natts_mtm_local_tables ];
2052
+
2053
+ MtmMakeRelationLocal (reloid );
2054
+
2055
+ rv = makeRangeVar (MULTIMASTER_SCHEMA_NAME , MULTIMASTER_LOCAL_TABLES_TABLE , -1 );
2056
+ rel = heap_openrv (rv , RowExclusiveLock );
2057
+ if (rel != NULL ) {
2058
+ char * tableName = get_rel_name (reloid );
2059
+ Oid schemaid = get_rel_namespace (reloid );
2060
+ char * schemaName = get_namespace_name (schemaid );
2061
+
2062
+ tupDesc = RelationGetDescr (rel );
2063
+
2064
+ /* Form a tuple. */
2065
+ memset (nulls , false, sizeof (nulls ));
2066
+
2067
+ values [Anum_mtm_local_tables_rel_schema - 1 ] = CStringGetTextDatum (schemaName );
2068
+ values [Anum_mtm_local_tables_rel_name - 1 ] = CStringGetTextDatum (tableName );
2069
+
2070
+ tup = heap_form_tuple (tupDesc , values , nulls );
2071
+
2072
+ /* Insert the tuple to the catalog. */
2073
+ simple_heap_insert (rel , tup );
2074
+
2075
+ /* Update the indexes. */
2076
+ CatalogUpdateIndexes (rel , tup );
2077
+
2078
+ /* Cleanup. */
2079
+ heap_freetuple (tup );
2080
+ heap_close (rel , RowExclusiveLock );
2081
+
2082
+ MtmTx .containsDML = true;
2083
+ }
2084
+ return false;
2085
+ }
2086
+
2087
+
1938
2088
/*
1939
2089
* -------------------------------------------
1940
2090
* Broadcast utulity statements
@@ -2243,10 +2393,17 @@ MtmExecutorFinish(QueryDesc *queryDesc)
2243
2393
if (estate -> es_processed != 0 && (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE )) {
2244
2394
int i ;
2245
2395
for (i = 0 ; i < estate -> es_num_result_relations ; i ++ ) {
2246
- if (RelationNeedsWAL (estate -> es_result_relations [i ].ri_RelationDesc )) {
2396
+ Relation rel = estate -> es_result_relations [i ].ri_RelationDesc ;
2397
+ if (RelationNeedsWAL (rel )) {
2247
2398
MtmTx .containsDML = true;
2248
2399
break ;
2249
2400
}
2401
+ if (MtmIgnoreTablesWithoutPk ) {
2402
+ if (!rel -> rd_indexvalid ) {
2403
+ RelationGetIndexList (rel );
2404
+ }
2405
+ MtmMakeRelationLocal (rel -> rd_replidindex );
2406
+ }
2250
2407
}
2251
2408
}
2252
2409
if (MtmTx .isDistributed && MtmTx .containsDML && !IsTransactionBlock ()) {
0 commit comments