Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 1b59ad0

Browse files
author
Mikhail Rutman
committed
Porting multimaster to postgres 14
This commit ports multimaster to Postgres 14: - removed tablespace-setup from mmts/Makefile (according to commit 6c788d9f6aadb41d76a72d56149268371a7895ee); - add readOnlyTree flag to MtmProcessUtility() (according to commit 7c337b6b527b7052e6a751f966d5734c56f668b5); - explicitly specify the hash function HASH_STRING (according to commit b3817f5f774663d55931dd4fab9c5a94a15ae7ab); - refactor the use of EState (according to commit 1375422c7826a2bf387be29895e961614f69de4b); - add 'include_out_arguments' to the function FuncnameGetCandidates() (caccording to commit e56bce5d43789cce95d099554ae9593ada92b3b7); - use MyProc->xmin instead of MyPgXact->xmin in dmq.c (according to commit 73487a60fc1063ba4b5178b69aee4ee210c182c4); - remove EE pooler support; - use SearchNamedReplicationSlot() instead of ReplicationSlotAcquire()
1 parent b879d10 commit 1b59ad0

File tree

10 files changed

+1923
-137
lines changed

10 files changed

+1923
-137
lines changed

Cluster.pm

+1
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ sub init
141141
{
142142
$node->init(allows_streaming => 'logical');
143143
$node->append_conf('postgresql.conf', qq{
144+
enable_self_join_removal = off
144145
max_connections = 50
145146
log_line_prefix = '%m [%p] [xid%x] %i '
146147
log_statement = all

Makefile

-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ all: multimaster.so
6969

7070
submake-regress:
7171
$(MAKE) -C $(top_builddir)/src/test/regress all
72-
$(MAKE) -C $(top_builddir)/src/test/regress tablespace-setup
7372

7473
# all .pl tests should pass now, but let's see what the buildfarm says
7574
# ifndef MTM_ALL

expected/regression_ee.diff

+1,817-70
Large diffs are not rendered by default.

src/ddl.c

+21-26
Original file line numberDiff line numberDiff line change
@@ -119,19 +119,21 @@ static void MtmSeqNextvalHook(Oid seqid, int64 next);
119119
static void MtmExecutorStart(QueryDesc *queryDesc, int eflags);
120120
static void MtmExecutorFinish(QueryDesc *queryDesc);
121121

122-
static void MtmProcessUtility(PlannedStmt *pstmt, const char *queryString,
122+
static void MtmProcessUtility(PlannedStmt *pstmt, const char *queryString, bool readOnlyTree,
123123
ProcessUtilityContext context, ParamListInfo params,
124124
QueryEnvironment *queryEnv, DestReceiver *dest,
125125
QueryCompletion *qc);
126126

127127
static void MtmProcessUtilityReceiver(PlannedStmt *pstmt,
128128
const char *queryString,
129+
bool readOnlyTree,
129130
ProcessUtilityContext context, ParamListInfo params,
130131
QueryEnvironment *queryEnv, DestReceiver *dest,
131132
QueryCompletion *qc);
132133

133134
static void MtmProcessUtilitySender(PlannedStmt *pstmt,
134135
const char *queryString,
136+
bool readOnlyTree,
135137
ProcessUtilityContext context, ParamListInfo params,
136138
QueryEnvironment *queryEnv, DestReceiver *dest,
137139
QueryCompletion *qc);
@@ -359,7 +361,7 @@ MtmGucInit(void)
359361
MtmGucHash = hash_create("MtmGucHash",
360362
MTM_GUC_HASHSIZE,
361363
&hash_ctl,
362-
HASH_ELEM | HASH_CONTEXT);
364+
HASH_ELEM | HASH_CONTEXT | HASH_STRINGS);
363365

364366
/*
365367
* If current role is not equal to MtmDatabaseUser, than set it before any
@@ -661,7 +663,7 @@ MtmFinishDDLCommand()
661663

662664

663665
static void
664-
MtmProcessUtility(PlannedStmt *pstmt, const char *queryString,
666+
MtmProcessUtility(PlannedStmt *pstmt, const char *queryString, bool readOnlyTree,
665667
ProcessUtilityContext context, ParamListInfo params,
666668
QueryEnvironment *queryEnv, DestReceiver *dest,
667669
QueryCompletion *qc)
@@ -676,13 +678,13 @@ MtmProcessUtility(PlannedStmt *pstmt, const char *queryString,
676678
{
677679
if (PreviousProcessUtilityHook != NULL)
678680
{
679-
PreviousProcessUtilityHook(pstmt, queryString,
681+
PreviousProcessUtilityHook(pstmt, queryString, readOnlyTree,
680682
context, params, queryEnv,
681683
dest, qc);
682684
}
683685
else
684686
{
685-
standard_ProcessUtility(pstmt, queryString,
687+
standard_ProcessUtility(pstmt, queryString, readOnlyTree,
686688
context, params, queryEnv,
687689
dest, qc);
688690
}
@@ -691,12 +693,12 @@ MtmProcessUtility(PlannedStmt *pstmt, const char *queryString,
691693

692694
if (MtmIsLogicalReceiver)
693695
{
694-
MtmProcessUtilityReceiver(pstmt, queryString, context, params,
696+
MtmProcessUtilityReceiver(pstmt, queryString, context, readOnlyTree, params,
695697
queryEnv, dest, qc);
696698
}
697699
else
698700
{
699-
MtmProcessUtilitySender(pstmt, queryString, context, params,
701+
MtmProcessUtilitySender(pstmt, queryString, readOnlyTree, context, params,
700702
queryEnv, dest, qc);
701703
}
702704

@@ -718,7 +720,7 @@ MtmProcessUtility(PlannedStmt *pstmt, const char *queryString,
718720
* receiver (e.g calling DDL from trigger) this hook does nothing.
719721
*/
720722
static void
721-
MtmProcessUtilityReceiver(PlannedStmt *pstmt, const char *queryString,
723+
MtmProcessUtilityReceiver(PlannedStmt *pstmt, const char *queryString, bool readOnlyTree,
722724
ProcessUtilityContext context, ParamListInfo params,
723725
QueryEnvironment *queryEnv, DestReceiver *dest,
724726
QueryCompletion *qc)
@@ -839,22 +841,18 @@ MtmProcessUtilityReceiver(PlannedStmt *pstmt, const char *queryString,
839841
}
840842

841843
if (PreviousProcessUtilityHook != NULL)
842-
{
843-
PreviousProcessUtilityHook(pstmt, queryString,
844+
PreviousProcessUtilityHook(pstmt, queryString, readOnlyTree,
844845
context, params, queryEnv,
845846
dest, qc);
846-
}
847847
else
848-
{
849-
standard_ProcessUtility(pstmt, queryString,
848+
standard_ProcessUtility(pstmt, queryString, readOnlyTree,
850849
context, params, queryEnv,
851850
dest, qc);
852-
}
853851
}
854852

855853

856854
static void
857-
MtmProcessUtilitySender(PlannedStmt *pstmt, const char *queryString,
855+
MtmProcessUtilitySender(PlannedStmt *pstmt, const char *queryString, bool readOnlyTree,
858856
ProcessUtilityContext context, ParamListInfo params,
859857
QueryEnvironment *queryEnv, DestReceiver *dest,
860858
QueryCompletion *qc)
@@ -1186,17 +1184,13 @@ MtmProcessUtilitySender(PlannedStmt *pstmt, const char *queryString,
11861184
stmt_string, skipCommand, MtmDDLStatement != NULL);
11871185

11881186
if (PreviousProcessUtilityHook != NULL)
1189-
{
1190-
PreviousProcessUtilityHook(pstmt, queryString,
1187+
PreviousProcessUtilityHook(pstmt, queryString, readOnlyTree,
11911188
context, params, queryEnv,
11921189
dest, qc);
1193-
}
11941190
else
1195-
{
1196-
standard_ProcessUtility(pstmt, queryString,
1191+
standard_ProcessUtility(pstmt, queryString, readOnlyTree,
11971192
context, params, queryEnv,
11981193
dest, qc);
1199-
}
12001194

12011195
/* Catch GUC assignment */
12021196
if (nodeTag(parsetree) == T_VariableSetStmt)
@@ -1311,11 +1305,12 @@ MtmExecutorFinish(QueryDesc *queryDesc)
13111305
if (operation == CMD_INSERT || operation == CMD_UPDATE ||
13121306
operation == CMD_DELETE || pstmt->hasModifyingCTE)
13131307
{
1314-
int i;
1308+
ListCell *l;
13151309

1316-
for (i = 0; i < estate->es_num_result_relations; i++)
1310+
foreach(l, estate->es_opened_result_relations)
13171311
{
1318-
Relation rel = estate->es_result_relations[i].ri_RelationDesc;
1312+
ResultRelInfo *resultRelInfo = lfirst(l);
1313+
Relation rel = resultRelInfo->ri_RelationDesc;
13191314

13201315
/*
13211316
* Don't run 3pc unless we modified at least one non-local table.
@@ -1709,7 +1704,7 @@ MtmInitializeRemoteFunctionsMap()
17091704
if (q != NULL)
17101705
*q++ = '\0';
17111706

1712-
clist = FuncnameGetCandidates(stringToQualifiedNameList(p), -1, NIL, false, false, true);
1707+
clist = FuncnameGetCandidates(stringToQualifiedNameList(p), -1, NIL, false, false, true, true);
17131708
if (clist == NULL)
17141709
mtm_log(DEBUG1, "Can't resolve function '%s', postponing that", p);
17151710
else
@@ -1724,7 +1719,7 @@ MtmInitializeRemoteFunctionsMap()
17241719
p = q;
17251720
} while (p != NULL);
17261721

1727-
clist = FuncnameGetCandidates(stringToQualifiedNameList("mtm.alter_sequences"), -1, NIL, false, false, true);
1722+
clist = FuncnameGetCandidates(stringToQualifiedNameList("mtm.alter_sequences"), -1, NIL, false, false, true, true);
17281723
if (clist != NULL)
17291724
hash_search(MtmRemoteFunctions, &clist->oid, HASH_ENTER, NULL);
17301725

src/dmq.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ dmq_shmem_startup_hook(void)
353353
DMQ_MAX_SUBS_PER_BACKEND * MaxBackends,
354354
DMQ_MAX_SUBS_PER_BACKEND * MaxBackends,
355355
&hash_info,
356-
HASH_ELEM);
356+
HASH_ELEM | HASH_STRINGS);
357357

358358
LWLockRelease(AddinShmemInitLock);
359359
}
@@ -1443,7 +1443,7 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
14431443
extra = dmq_receiver_start_hook(sender_name);
14441444

14451445
/* do not hold globalxmin. XXX: try to carefully release snaps */
1446-
MyPgXact->xmin = InvalidTransactionId;
1446+
MyProc->xmin = InvalidTransactionId;
14471447

14481448
for (;;)
14491449
{

src/global_tx.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ MtmGlobalTxShmemStartup(void)
185185
gtx_shared->lock = &(GetNamedLWLockTranche("mtm-gtx-lock"))->lock;
186186

187187
gtx_shared->gid2gtx = ShmemInitHash("gid2gtx", 2*MaxConnections, 2*MaxConnections,
188-
&info, HASH_ELEM);
188+
&info, HASH_ELEM | HASH_STRINGS);
189189

190190
LWLockRelease(AddinShmemInitLock);
191191
}

src/include/compat.h

-10
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
11
#ifndef MTMCOMPAT_H
22
#define MTMCOMPAT_H
33

4-
/* EE pooler gets rid of static variable */
5-
#ifdef PGPRO_EE
6-
#define FeBeWaitSetCompat() (MyProcPort->pqcomm_waitset)
7-
#else
84
#define FeBeWaitSetCompat() (FeBeWaitSet)
9-
#endif
105

116
#ifdef PGPRO_EE /* atx */
127
#define BeginTransactionBlockCompat() (BeginTransactionBlock(false, NIL))
@@ -16,11 +11,6 @@
1611
#define UserAbortTransactionBlockCompat(chain) (UserAbortTransactionBlock(chain))
1712
#endif
1813

19-
/* atx renames this for some reason */
20-
#ifdef PGPRO_EE
21-
#define on_commits_compat() (pg_on_commit_actions)
22-
#else
2314
#define on_commits_compat() (on_commits)
24-
#endif
2515

2616
#endif /* MTMCOMPAT_H */

src/pglogical_apply.c

+18-10
Original file line numberDiff line numberDiff line change
@@ -166,24 +166,27 @@ create_rel_estate(Relation rel)
166166
EState *estate;
167167
ResultRelInfo *resultRelInfo;
168168
RangeTblEntry *rte;
169+
List *rangeTable;
169170

170171
estate = CreateExecutorState();
171172

172173
resultRelInfo = makeNode(ResultRelInfo);
173174
InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0);
174175

175-
estate->es_result_relations = resultRelInfo;
176-
estate->es_num_result_relations = 1;
177-
estate->es_result_relation_info = resultRelInfo;
178-
estate->es_output_cid = GetCurrentCommandId(true);
179-
180176
rte = makeNode(RangeTblEntry);
181177
rte->rtekind = RTE_RELATION;
182178
rte->relid = RelationGetRelid(rel);
183179
rte->relkind = rel->rd_rel->relkind;
184180
rte->rellockmode = AccessShareLock;
181+
rangeTable = list_make1(rte);
185182
ExecInitRangeTable(estate, list_make1(rte));
186183

184+
ExecInitRangeTable(estate, rangeTable);
185+
ExecInitResultRelation(estate, resultRelInfo, 1);
186+
187+
estate->es_result_relation_info = resultRelInfo;
188+
estate->es_output_cid = GetCurrentCommandId(true);
189+
187190
/* Prepare to catch AFTER triggers. */
188191
AfterTriggerBeginQuery();
189192

@@ -1238,9 +1241,10 @@ process_remote_insert(StringInfo s, Relation rel)
12381241
if (relinfo->ri_NumIndices > 0)
12391242
{
12401243
List *recheckIndexes;
1244+
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
12411245

1242-
recheckIndexes = ExecInsertIndexTuples(bufferedSlots[i],
1243-
estate, false, NULL, NIL);
1246+
recheckIndexes = ExecInsertIndexTuples(resultRelInfo, bufferedSlots[i],
1247+
estate, false, false, NULL, NIL);
12441248

12451249
/* AFTER ROW INSERT Triggers */
12461250
ExecARInsertTriggers(estate, relinfo, bufferedSlots[i],
@@ -1267,11 +1271,12 @@ process_remote_insert(StringInfo s, Relation rel)
12671271
else
12681272
{
12691273
TupleTableSlot *newslot;
1274+
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
12701275

12711276
newslot = ExecInitExtraTupleSlot(estate, tupDesc, &TTSOpsHeapTuple);
12721277
tuple_to_slot(estate, rel, &new_tuple, newslot);
12731278

1274-
ExecSimpleRelationInsert(estate, newslot);
1279+
ExecSimpleRelationInsert(resultRelInfo, estate, newslot);
12751280
}
12761281
ExecCloseIndices(estate->es_result_relation_info);
12771282
if (ActiveSnapshotSet())
@@ -1367,6 +1372,7 @@ process_remote_update(StringInfo s, Relation rel)
13671372
if (found)
13681373
{
13691374
HeapTuple remote_tuple = NULL;
1375+
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
13701376

13711377
remote_tuple = heap_modify_tuple(ExecFetchSlotHeapTuple(localslot, true, NULL),
13721378
tupDesc,
@@ -1376,7 +1382,7 @@ process_remote_update(StringInfo s, Relation rel)
13761382
ExecStoreHeapTuple(remote_tuple, remoteslot, false);
13771383

13781384
EvalPlanQualSetSlot(&epqstate, remoteslot);
1379-
ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
1385+
ExecSimpleRelationUpdate(resultRelInfo, estate, &epqstate, localslot, remoteslot);
13801386
}
13811387
else
13821388
{
@@ -1444,8 +1450,10 @@ process_remote_delete(StringInfo s, Relation rel)
14441450

14451451
if (found)
14461452
{
1453+
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
1454+
14471455
EvalPlanQualSetSlot(&epqstate, localslot);
1448-
ExecSimpleRelationDelete(estate, &epqstate, localslot);
1456+
ExecSimpleRelationDelete(resultRelInfo, estate, &epqstate, localslot);
14491457
}
14501458
else
14511459
{

0 commit comments

Comments
 (0)