64
64
#include "catalog/indexing.h"
65
65
#include "catalog/namespace.h"
66
66
#include "catalog/pg_constraint_fn.h"
67
+ #include "catalog/pg_proc.h"
67
68
#include "pglogical_output/hooks.h"
68
69
#include "parser/analyze.h"
69
70
#include "parser/parse_relation.h"
@@ -255,8 +256,6 @@ bool MtmUseDtm;
255
256
bool MtmPreserveCommitOrder ;
256
257
bool MtmVolksWagenMode ; /* Pretend to be normal postgres. This means skip some NOTICE's and use local sequences */
257
258
258
- TransactionId MtmUtilityProcessedInXid ;
259
-
260
259
static char * MtmConnStrs ;
261
260
static char * MtmRemoteFunctionsList ;
262
261
static char * MtmClusterName ;
@@ -275,6 +274,7 @@ static bool MtmClusterLocked;
275
274
static bool MtmInsideTransaction ;
276
275
static bool MtmReferee ;
277
276
static bool MtmMonotonicSequences ;
277
+ static void const * MtmDDLStatement ;
278
278
279
279
static ExecutorStart_hook_type PreviousExecutorStartHook ;
280
280
static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
@@ -923,6 +923,7 @@ MtmResetTransaction()
923
923
x -> csn = INVALID_CSN ;
924
924
x -> status = TRANSACTION_STATUS_UNKNOWN ;
925
925
x -> gid [0 ] = '\0' ;
926
+ MtmDDLStatement = NULL ;
926
927
}
927
928
928
929
#if 0
@@ -986,6 +987,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
986
987
MtmCheckClusterLock ();
987
988
}
988
989
MtmInsideTransaction = true;
990
+ MtmDDLStatement = NULL ;
989
991
Mtm -> nRunningTransactions += 1 ;
990
992
991
993
x -> snapshot = MtmAssignCSN ();
@@ -3447,7 +3449,7 @@ _PG_init(void)
3447
3449
& MtmRemoteFunctionsList ,
3448
3450
"lo_create,lo_unlink" ,
3449
3451
PGC_USERSET , /* context */
3450
- 0 , /* flags */
3452
+ GUC_LIST_INPUT | GUC_LIST_QUOTE , /* flags */
3451
3453
NULL , /* GucStringCheckHook check_hook */
3452
3454
MtmSetRemoteFunction , /* GucStringAssignHook assign_hook */
3453
3455
NULL /* GucShowHook show_hook */
@@ -4961,14 +4963,17 @@ static void MtmGucDiscard()
4961
4963
dlist_init (& MtmGucList );
4962
4964
4963
4965
hash_destroy (MtmGucHash );
4964
- MtmGucInit () ;
4966
+ MtmGucHash = NULL ;
4965
4967
}
4966
4968
4967
4969
static inline void MtmGucUpdate (const char * key , char * value )
4968
4970
{
4969
4971
MtmGucEntry * hentry ;
4970
4972
bool found ;
4971
4973
4974
+ if (!MtmGucHash )
4975
+ MtmGucInit ();
4976
+
4972
4977
hentry = (MtmGucEntry * )hash_search (MtmGucHash , key , HASH_ENTER , & found );
4973
4978
if (found )
4974
4979
{
@@ -4984,6 +4989,9 @@ static inline void MtmGucRemove(const char *key)
4984
4989
MtmGucEntry * hentry ;
4985
4990
bool found ;
4986
4991
4992
+ if (!MtmGucHash )
4993
+ MtmGucInit ();
4994
+
4987
4995
hentry = (MtmGucEntry * )hash_search (MtmGucHash , key , HASH_FIND , & found );
4988
4996
if (found )
4989
4997
{
@@ -5042,23 +5050,19 @@ char* MtmGucSerialize(void)
5042
5050
5043
5051
serialized_gucs = makeStringInfo ();
5044
5052
5045
- /*
5046
- * Crutch for scheduler. It sets search_path through SetConfigOption()
5047
- * so our callback do not react on that.
5048
- */
5049
- search_path = GetConfigOption ("search_path" , false, true);
5050
- appendStringInfo (serialized_gucs , "SET search_path TO %s; " , search_path );
5051
-
5052
5053
dlist_foreach (iter , & MtmGucList )
5053
5054
{
5054
5055
MtmGucEntry * cur_entry = dlist_container (MtmGucEntry , list_node , iter .cur );
5055
5056
5057
+ if (strcmp (cur_entry -> key , "search_path" ) == 0 )
5058
+ continue ;
5059
+
5056
5060
appendStringInfoString (serialized_gucs , "SET " );
5057
5061
appendStringInfoString (serialized_gucs , cur_entry -> key );
5058
5062
appendStringInfoString (serialized_gucs , " TO " );
5059
5063
5060
5064
/* quite a crutch */
5061
- if (strstr (cur_entry -> key , "_mem" ) != NULL || * (cur_entry -> value ) == '\0' || strchr ( cur_entry -> value , ',' ) != NULL )
5065
+ if (strstr (cur_entry -> key , "_mem" ) != NULL || * (cur_entry -> value ) == '\0' )
5062
5066
{
5063
5067
appendStringInfoString (serialized_gucs , "'" );
5064
5068
appendStringInfoString (serialized_gucs , cur_entry -> value );
@@ -5071,6 +5075,13 @@ char* MtmGucSerialize(void)
5071
5075
appendStringInfoString (serialized_gucs , "; " );
5072
5076
}
5073
5077
5078
+ /*
5079
+ * Crutch for scheduler. It sets search_path through SetConfigOption()
5080
+ * so our callback do not react on that.
5081
+ */
5082
+ search_path = GetConfigOption ("search_path" , false, true);
5083
+ appendStringInfo (serialized_gucs , "SET search_path TO %s; " , search_path );
5084
+
5074
5085
return serialized_gucs -> data ;
5075
5086
}
5076
5087
@@ -5363,6 +5374,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
5363
5374
return ;
5364
5375
}
5365
5376
}
5377
+ else if (stmt -> removeType == OBJECT_FUNCTION && MtmTx .isReplicated )
5378
+ {
5379
+ /* Make it possible to drop functions which were not replicated */
5380
+ stmt -> missing_ok = true;
5381
+ }
5366
5382
}
5367
5383
break ;
5368
5384
@@ -5395,16 +5411,14 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
5395
5411
break ;
5396
5412
}
5397
5413
5398
- if (!skipCommand && !MtmTx .isReplicated && ( context == PROCESS_UTILITY_TOPLEVEL || MtmUtilityProcessedInXid != GetCurrentTransactionId ()) )
5414
+ if (!skipCommand && !MtmTx .isReplicated && ! MtmDDLStatement )
5399
5415
{
5400
- MtmUtilityProcessedInXid = GetCurrentTransactionId ();
5401
- if (context == PROCESS_UTILITY_TOPLEVEL || !ActivePortal ) {
5402
- MtmProcessDDLCommand (queryString , true);
5403
- } else {
5404
- MtmProcessDDLCommand (ActivePortal -> sourceText , true);
5405
- }
5416
+ MTM_LOG3 ("Process DDL statement '%s', MtmTx.isReplicated=%d, MtmIsLogicalReceiver=%d" , queryString , MtmTx .isReplicated , MtmIsLogicalReceiver );
5417
+ MtmProcessDDLCommand (queryString , true);
5406
5418
executed = true;
5419
+ MtmDDLStatement = queryString ;
5407
5420
}
5421
+ else MTM_LOG3 ("Skip utility statement '%s': skip=%d, insideDDL=%d" , queryString , skipCommand , MtmDDLStatement != NULL );
5408
5422
5409
5423
if (PreviousProcessUtilityHook != NULL )
5410
5424
{
@@ -5423,16 +5437,17 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
5423
5437
#endif
5424
5438
if (MyXactAccessedTempRel )
5425
5439
{
5426
- MTM_LOG1 ("Xact accessed temp table, stopping replication" );
5440
+ MTM_LOG1 ("Xact accessed temp table, stopping replication of statement '%s'" , queryString );
5427
5441
MtmTx .isDistributed = false; /* Skip */
5428
5442
MtmTx .snapshot = INVALID_CSN ;
5429
5443
}
5430
5444
5431
5445
if (executed )
5432
5446
{
5433
5447
MtmFinishDDLCommand ();
5448
+ MtmDDLStatement = NULL ;
5434
5449
}
5435
- if (nodeTag (parsetree ) == T_CreateStmt )
5450
+ if (IsA (parsetree , CreateStmt ) )
5436
5451
{
5437
5452
CreateStmt * create = (CreateStmt * )parsetree ;
5438
5453
Oid relid = RangeVarGetRelid (create -> relation , NoLock , true);
@@ -5449,15 +5464,12 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
5449
5464
}
5450
5465
}
5451
5466
}
5452
- if (context == PROCESS_UTILITY_TOPLEVEL ) {
5453
- MtmUtilityProcessedInXid = InvalidTransactionId ;
5454
- }
5455
5467
}
5456
5468
5457
5469
static void
5458
5470
MtmExecutorStart (QueryDesc * queryDesc , int eflags )
5459
5471
{
5460
- if (!MtmTx .isReplicated && ActivePortal )
5472
+ if (!MtmTx .isReplicated && ! MtmDDLStatement )
5461
5473
{
5462
5474
ListCell * tlist ;
5463
5475
@@ -5471,11 +5483,32 @@ MtmExecutorStart(QueryDesc *queryDesc, int eflags)
5471
5483
TargetEntry * tle = (TargetEntry * ) lfirst (tlist );
5472
5484
if (tle -> expr && IsA (tle -> expr , FuncExpr ))
5473
5485
{
5474
- if (hash_search (MtmRemoteFunctions , & ((FuncExpr * )tle -> expr )-> funcid , HASH_FIND , NULL ))
5486
+ Oid func_oid = ((FuncExpr * )tle -> expr )-> funcid ;
5487
+ if (!hash_search (MtmRemoteFunctions , & func_oid , HASH_FIND , NULL ))
5475
5488
{
5476
- MtmProcessDDLCommand (ActivePortal -> sourceText , true);
5477
- break ;
5489
+ Form_pg_proc funcform ;
5490
+ bool is_sec_def ;
5491
+ HeapTuple func_tuple = SearchSysCache1 (PROCOID , ObjectIdGetDatum (func_oid ));
5492
+ if (!HeapTupleIsValid (func_tuple ))
5493
+ elog (ERROR , "cache lookup failed for function %u" , func_oid );
5494
+ funcform = (Form_pg_proc ) GETSTRUCT (func_tuple );
5495
+ is_sec_def = funcform -> prosecdef ;
5496
+ ReleaseSysCache (func_tuple );
5497
+ elog (LOG , "Function %s security defined=%d" , tle -> resname , is_sec_def );
5498
+ if (!is_sec_def )
5499
+ {
5500
+ continue ;
5501
+ }
5478
5502
}
5503
+ /*
5504
+ * Execute security defined functions or functions marked as remote at replicated nodes.
5505
+ * Them are executed as DDL statements.
5506
+ * All data modifications done inside this function are not replicated.
5507
+ * As a result generated content can vary at different nodes.
5508
+ */
5509
+ MtmProcessDDLCommand (queryDesc -> sourceText , true);
5510
+ MtmDDLStatement = queryDesc ;
5511
+ break ;
5479
5512
}
5480
5513
}
5481
5514
}
@@ -5524,6 +5557,12 @@ MtmExecutorFinish(QueryDesc *queryDesc)
5524
5557
{
5525
5558
standard_ExecutorFinish (queryDesc );
5526
5559
}
5560
+
5561
+ if (MtmDDLStatement == queryDesc )
5562
+ {
5563
+ MtmFinishDDLCommand ();
5564
+ MtmDDLStatement = NULL ;
5565
+ }
5527
5566
}
5528
5567
5529
5568
static void MtmSeqNextvalHook (Oid seqid , int64 next )
0 commit comments