Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 9dae3b8

Browse files
committed
Merge branch 'PGPROEE9_6_MULTIMASTER' into PGPROEE9_6
2 parents 7e32d6f + 484e3d8 commit 9dae3b8

File tree

6 files changed

+192
-72
lines changed

6 files changed

+192
-72
lines changed

contrib/mmts/multimaster.c

Lines changed: 168 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
#include "catalog/indexing.h"
6565
#include "catalog/namespace.h"
6666
#include "catalog/pg_constraint_fn.h"
67+
#include "catalog/pg_proc.h"
6768
#include "pglogical_output/hooks.h"
6869
#include "parser/analyze.h"
6970
#include "parser/parse_relation.h"
@@ -256,8 +257,6 @@ bool MtmUseRDMA;
256257
bool MtmPreserveCommitOrder;
257258
bool MtmVolksWagenMode; /* Pretend to be normal postgres. This means skip some NOTICE's and use local sequences */
258259

259-
TransactionId MtmUtilityProcessedInXid;
260-
261260
static char* MtmConnStrs;
262261
static char* MtmRemoteFunctionsList;
263262
static char* MtmClusterName;
@@ -276,6 +275,7 @@ static bool MtmClusterLocked;
276275
static bool MtmInsideTransaction;
277276
static bool MtmReferee;
278277
static bool MtmMonotonicSequences;
278+
static void const* MtmDDLStatement;
279279

280280
static ExecutorStart_hook_type PreviousExecutorStartHook;
281281
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -924,6 +924,7 @@ MtmResetTransaction()
924924
x->csn = INVALID_CSN;
925925
x->status = TRANSACTION_STATUS_UNKNOWN;
926926
x->gid[0] = '\0';
927+
MtmDDLStatement = NULL;
927928
}
928929

929930
#if 0
@@ -987,6 +988,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
987988
MtmCheckClusterLock();
988989
}
989990
MtmInsideTransaction = true;
991+
MtmDDLStatement = NULL;
990992
Mtm->nRunningTransactions += 1;
991993

992994
x->snapshot = MtmAssignCSN();
@@ -3461,7 +3463,7 @@ _PG_init(void)
34613463
&MtmRemoteFunctionsList,
34623464
"lo_create,lo_unlink",
34633465
PGC_USERSET, /* context */
3464-
0, /* flags */
3466+
GUC_LIST_INPUT | GUC_LIST_QUOTE, /* flags */
34653467
NULL, /* GucStringCheckHook check_hook */
34663468
MtmSetRemoteFunction, /* GucStringAssignHook assign_hook */
34673469
NULL /* GucShowHook show_hook */
@@ -4975,14 +4977,17 @@ static void MtmGucDiscard()
49754977
dlist_init(&MtmGucList);
49764978

49774979
hash_destroy(MtmGucHash);
4978-
MtmGucInit();
4980+
MtmGucHash = NULL;
49794981
}
49804982

49814983
static inline void MtmGucUpdate(const char *key, char *value)
49824984
{
49834985
MtmGucEntry *hentry;
49844986
bool found;
49854987

4988+
if (!MtmGucHash)
4989+
MtmGucInit();
4990+
49864991
hentry = (MtmGucEntry*)hash_search(MtmGucHash, key, HASH_ENTER, &found);
49874992
if (found)
49884993
{
@@ -4998,6 +5003,9 @@ static inline void MtmGucRemove(const char *key)
49985003
MtmGucEntry *hentry;
49995004
bool found;
50005005

5006+
if (!MtmGucHash)
5007+
MtmGucInit();
5008+
50015009
hentry = (MtmGucEntry*)hash_search(MtmGucHash, key, HASH_FIND, &found);
50025010
if (found)
50035011
{
@@ -5056,23 +5064,19 @@ char* MtmGucSerialize(void)
50565064

50575065
serialized_gucs = makeStringInfo();
50585066

5059-
/*
5060-
* Crutch for scheduler. It sets search_path through SetConfigOption()
5061-
* so our callback do not react on that.
5062-
*/
5063-
search_path = GetConfigOption("search_path", false, true);
5064-
appendStringInfo(serialized_gucs, "SET search_path TO %s; ", search_path);
5065-
50665067
dlist_foreach(iter, &MtmGucList)
50675068
{
50685069
MtmGucEntry *cur_entry = dlist_container(MtmGucEntry, list_node, iter.cur);
50695070

5071+
if (strcmp(cur_entry->key, "search_path") == 0)
5072+
continue;
5073+
50705074
appendStringInfoString(serialized_gucs, "SET ");
50715075
appendStringInfoString(serialized_gucs, cur_entry->key);
50725076
appendStringInfoString(serialized_gucs, " TO ");
50735077

50745078
/* quite a crutch */
5075-
if (strstr(cur_entry->key, "_mem") != NULL || *(cur_entry->value) == '\0' || strchr(cur_entry->value, ',') != NULL)
5079+
if (strstr(cur_entry->key, "_mem") != NULL || *(cur_entry->value) == '\0')
50765080
{
50775081
appendStringInfoString(serialized_gucs, "'");
50785082
appendStringInfoString(serialized_gucs, cur_entry->value);
@@ -5085,6 +5089,13 @@ char* MtmGucSerialize(void)
50855089
appendStringInfoString(serialized_gucs, "; ");
50865090
}
50875091

5092+
/*
5093+
* Crutch for scheduler. It sets search_path through SetConfigOption()
5094+
* so our callback do not react on that.
5095+
*/
5096+
search_path = GetConfigOption("search_path", false, true);
5097+
appendStringInfo(serialized_gucs, "SET search_path TO %s; ", search_path);
5098+
50885099
return serialized_gucs->data;
50895100
}
50905101

@@ -5142,12 +5153,60 @@ void MtmUpdateLockGraph(int nodeId, void const* messageBody, int messageSize)
51425153
MTM_LOG1("Update deadlock graph for node %d size %d", nodeId, messageSize);
51435154
}
51445155

5156+
static bool MtmIsTempType(TypeName* typeName)
5157+
{
5158+
bool isTemp = false;
5159+
5160+
if (typeName != NULL)
5161+
{
5162+
Type typeTuple = LookupTypeName(NULL, typeName, NULL, false);
5163+
if (typeTuple != NULL)
5164+
{
5165+
Form_pg_type typeStruct = (Form_pg_type) GETSTRUCT(typeTuple);
5166+
Oid relid = typeStruct->typrelid;
5167+
ReleaseSysCache(typeTuple);
5168+
5169+
if (relid != InvalidOid)
5170+
{
5171+
HeapTuple classTuple = SearchSysCache1(RELOID, relid);
5172+
Form_pg_class classStruct = (Form_pg_class) GETSTRUCT(classTuple);
5173+
if (classStruct->relpersistence == 't')
5174+
isTemp = true;
5175+
ReleaseSysCache(classTuple);
5176+
}
5177+
}
5178+
}
5179+
return isTemp;
5180+
}
5181+
5182+
static bool MtmFunctionProfileDependsOnTempTable(CreateFunctionStmt* func)
5183+
{
5184+
ListCell* elem;
5185+
5186+
if (MtmIsTempType(func->returnType))
5187+
{
5188+
return true;
5189+
}
5190+
foreach (elem, func->parameters)
5191+
{
5192+
FunctionParameter* param = (FunctionParameter*) lfirst(elem);
5193+
if (MtmIsTempType(param->argType))
5194+
{
5195+
return true;
5196+
}
5197+
}
5198+
return false;
5199+
}
5200+
5201+
5202+
51455203
static void MtmProcessUtility(Node *parsetree, const char *queryString,
51465204
ProcessUtilityContext context, ParamListInfo params,
51475205
DestReceiver *dest, char *completionTag)
51485206
{
51495207
bool skipCommand = false;
51505208
bool executed = false;
5209+
bool prevMyXactAccessedTempRel;
51515210

51525211
MTM_LOG2("%d: Process utility statement tag=%d, context=%d, issubtrans=%d, creating_extension=%d, query=%s",
51535212
MyProcPid, nodeTag(parsetree), context, IsSubTransaction(), creating_extension, queryString);
@@ -5229,19 +5288,24 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
52295288
break;
52305289

52315290
case T_VacuumStmt:
5232-
skipCommand = true;
5233-
if (context == PROCESS_UTILITY_TOPLEVEL) {
5234-
MtmProcessDDLCommand(queryString, false);
5235-
MtmTx.isDistributed = false;
5236-
} else if (MtmApplyContext != NULL) {
5237-
MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
5238-
Assert(oldContext != MtmApplyContext);
5239-
MtmVacuumStmt = (VacuumStmt*)copyObject(parsetree);
5240-
MemoryContextSwitchTo(oldContext);
5241-
return;
5242-
}
5243-
break;
5244-
5291+
{
5292+
VacuumStmt* vacuum = (VacuumStmt*)parsetree;
5293+
skipCommand = true;
5294+
if ((vacuum->options & VACOPT_LOCAL) == 0 && !MtmVolksWagenMode)
5295+
{
5296+
if (context == PROCESS_UTILITY_TOPLEVEL) {
5297+
MtmProcessDDLCommand(queryString, false);
5298+
MtmTx.isDistributed = false;
5299+
} else if (MtmApplyContext != NULL) {
5300+
MemoryContext oldContext = MemoryContextSwitchTo(MtmApplyContext);
5301+
Assert(oldContext != MtmApplyContext);
5302+
MtmVacuumStmt = (VacuumStmt*)copyObject(parsetree);
5303+
MemoryContextSwitchTo(oldContext);
5304+
return;
5305+
}
5306+
}
5307+
break;
5308+
}
52455309
case T_CreateDomainStmt:
52465310
/* Detect temp tables access */
52475311
{
@@ -5377,6 +5441,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
53775441
return;
53785442
}
53795443
}
5444+
else if (stmt->removeType == OBJECT_FUNCTION && MtmTx.isReplicated)
5445+
{
5446+
/* Make it possible to drop functions which were not replicated */
5447+
stmt->missing_ok = true;
5448+
}
53805449
}
53815450
break;
53825451

@@ -5386,6 +5455,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
53865455
CopyStmt *copyStatement = (CopyStmt *) parsetree;
53875456
skipCommand = true;
53885457
if (copyStatement->is_from) {
5458+
ListCell *opt;
53895459
RangeVar *relation = copyStatement->relation;
53905460

53915461
if (relation != NULL)
@@ -5400,6 +5470,25 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
54005470
heap_close(rel, ShareLock);
54015471
}
54025472
}
5473+
5474+
foreach(opt, copyStatement->options)
5475+
{
5476+
DefElem *elem = lfirst(opt);
5477+
if (strcmp("local", elem->defname) == 0) {
5478+
MtmTx.isDistributed = false; /* Skip */
5479+
MtmTx.snapshot = INVALID_CSN;
5480+
MtmTx.containsDML = false;
5481+
break;
5482+
}
5483+
}
5484+
}
5485+
case T_CreateFunctionStmt:
5486+
{
5487+
if (MtmTx.isReplicated)
5488+
{
5489+
// disable functiob body cehck at replica
5490+
check_function_bodies = false;
5491+
}
54035492
}
54045493
break;
54055494
}
@@ -5409,16 +5498,16 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
54095498
break;
54105499
}
54115500

5412-
if (!skipCommand && !MtmTx.isReplicated && (context == PROCESS_UTILITY_TOPLEVEL || MtmUtilityProcessedInXid != GetCurrentTransactionId()))
5501+
if (!skipCommand && !MtmTx.isReplicated && !MtmDDLStatement)
54135502
{
5414-
MtmUtilityProcessedInXid = GetCurrentTransactionId();
5415-
if (context == PROCESS_UTILITY_TOPLEVEL || !ActivePortal) {
5416-
MtmProcessDDLCommand(queryString, true);
5417-
} else {
5418-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5419-
}
5503+
MTM_LOG3("Process DDL statement '%s', MtmTx.isReplicated=%d, MtmIsLogicalReceiver=%d", queryString, MtmTx.isReplicated, MtmIsLogicalReceiver);
5504+
MtmProcessDDLCommand(queryString, true);
54205505
executed = true;
5506+
MtmDDLStatement = queryString;
54215507
}
5508+
else MTM_LOG3("Skip utility statement '%s': skip=%d, insideDDL=%d", queryString, skipCommand, MtmDDLStatement != NULL);
5509+
5510+
prevMyXactAccessedTempRel = MyXactAccessedTempRel;
54225511

54235512
if (PreviousProcessUtilityHook != NULL)
54245513
{
@@ -5435,18 +5524,32 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
54355524
MTM_ELOG(ERROR, "Isolation level %s is not supported by multimaster", isoLevelStr[XactIsoLevel]);
54365525
}
54375526
#endif
5527+
/* Allow replication of functions operating on temporary tables.
5528+
* Even through temporary table doesn't exist at replica, diasabling functoin body check makes it possible to create such function at replica.
5529+
* And it can be accessed later at replica if correspondent temporary table will be created.
5530+
* But disable replication of functions returning temporary tables: such functions can not be created at replica in any case.
5531+
*/
5532+
if (IsA(parsetree, CreateFunctionStmt))
5533+
{
5534+
if (MtmFunctionProfileDependsOnTempTable((CreateFunctionStmt*)parsetree))
5535+
{
5536+
prevMyXactAccessedTempRel = true;
5537+
}
5538+
MyXactAccessedTempRel = prevMyXactAccessedTempRel;
5539+
}
54385540
if (MyXactAccessedTempRel)
54395541
{
5440-
MTM_LOG1("Xact accessed temp table, stopping replication");
5542+
MTM_LOG1("Xact accessed temp table, stopping replication of statement '%s'", queryString);
54415543
MtmTx.isDistributed = false; /* Skip */
54425544
MtmTx.snapshot = INVALID_CSN;
54435545
}
54445546

54455547
if (executed)
54465548
{
54475549
MtmFinishDDLCommand();
5550+
MtmDDLStatement = NULL;
54485551
}
5449-
if (nodeTag(parsetree) == T_CreateStmt)
5552+
if (IsA(parsetree, CreateStmt))
54505553
{
54515554
CreateStmt* create = (CreateStmt*)parsetree;
54525555
Oid relid = RangeVarGetRelid(create->relation, NoLock, true);
@@ -5463,15 +5566,12 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
54635566
}
54645567
}
54655568
}
5466-
if (context == PROCESS_UTILITY_TOPLEVEL) {
5467-
MtmUtilityProcessedInXid = InvalidTransactionId;
5468-
}
54695569
}
54705570

54715571
static void
54725572
MtmExecutorStart(QueryDesc *queryDesc, int eflags)
54735573
{
5474-
if (!MtmTx.isReplicated && ActivePortal)
5574+
if (!MtmTx.isReplicated && !MtmDDLStatement)
54755575
{
54765576
ListCell *tlist;
54775577

@@ -5485,11 +5585,32 @@ MtmExecutorStart(QueryDesc *queryDesc, int eflags)
54855585
TargetEntry *tle = (TargetEntry *) lfirst(tlist);
54865586
if (tle->expr && IsA(tle->expr, FuncExpr))
54875587
{
5488-
if (hash_search(MtmRemoteFunctions, &((FuncExpr*)tle->expr)->funcid, HASH_FIND, NULL))
5588+
Oid func_oid = ((FuncExpr*)tle->expr)->funcid;
5589+
if (!hash_search(MtmRemoteFunctions, &func_oid, HASH_FIND, NULL))
54895590
{
5490-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5491-
break;
5591+
Form_pg_proc funcform;
5592+
bool is_sec_def;
5593+
HeapTuple func_tuple = SearchSysCache1(PROCOID, ObjectIdGetDatum(func_oid));
5594+
if (!HeapTupleIsValid(func_tuple))
5595+
elog(ERROR, "cache lookup failed for function %u", func_oid);
5596+
funcform = (Form_pg_proc) GETSTRUCT(func_tuple);
5597+
is_sec_def = funcform->prosecdef;
5598+
ReleaseSysCache(func_tuple);
5599+
elog(LOG, "Function %s security defined=%d", tle->resname, is_sec_def);
5600+
if (!is_sec_def)
5601+
{
5602+
continue;
5603+
}
54925604
}
5605+
/*
5606+
* Execute security defined functions or functions marked as remote at replicated nodes.
5607+
* Them are executed as DDL statements.
5608+
* All data modifications done inside this function are not replicated.
5609+
* As a result generated content can vary at different nodes.
5610+
*/
5611+
MtmProcessDDLCommand(queryDesc->sourceText, true);
5612+
MtmDDLStatement = queryDesc;
5613+
break;
54935614
}
54945615
}
54955616
}
@@ -5538,6 +5659,12 @@ MtmExecutorFinish(QueryDesc *queryDesc)
55385659
{
55395660
standard_ExecutorFinish(queryDesc);
55405661
}
5662+
5663+
if (MtmDDLStatement == queryDesc)
5664+
{
5665+
MtmFinishDDLCommand();
5666+
MtmDDLStatement = NULL;
5667+
}
55415668
}
55425669

55435670
static void MtmSeqNextvalHook(Oid seqid, int64 next)
File renamed without changes.

src/backend/commands/copy.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1191,7 +1191,7 @@ ProcessCopyOptions(CopyState cstate,
11911191
errmsg("argument to option \"%s\" must be a valid encoding name",
11921192
defel->defname)));
11931193
}
1194-
else
1194+
else if (strcmp(defel->defname, "local") != 0)
11951195
ereport(ERROR,
11961196
(errcode(ERRCODE_SYNTAX_ERROR),
11971197
errmsg("option \"%s\" not recognized",

0 commit comments

Comments
 (0)