Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 77a38b2

Browse files
author
Mikhail Rutman
committed
wip: porting to pgproee 14
1 parent 7ca0008 commit 77a38b2

File tree

12 files changed

+156
-74
lines changed

12 files changed

+156
-74
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ all: multimaster.so
6969

7070
submake-regress:
7171
$(MAKE) -C $(top_builddir)/src/test/regress all
72-
$(MAKE) -C $(top_builddir)/src/test/regress tablespace-setup
72+
# $(MAKE) -C $(top_builddir)/src/test/regress tablespace-setup
7373

7474
# all .pl tests should pass now, but let's see what the buildfarm says
7575
# ifndef MTM_ALL

src/commit.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -482,9 +482,9 @@ MtmTwoPhaseCommit(void)
482482
* XXX: check the same xact block stuff in case of cleanup
483483
*/
484484
#ifdef PGPRO_EE
485-
if (IsTransactionState())
486-
SuspendTransaction();
487-
else
485+
// if (IsTransactionState())
486+
// SuspendTransaction();
487+
// else
488488
#endif
489489
StartTransactionCommand();
490490

src/ddl.c

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ static void MtmSeqNextvalHook(Oid seqid, int64 next);
119119
static void MtmExecutorStart(QueryDesc *queryDesc, int eflags);
120120
static void MtmExecutorFinish(QueryDesc *queryDesc);
121121

122-
static void MtmProcessUtility(PlannedStmt *pstmt, const char *queryString,
122+
static void MtmProcessUtility(PlannedStmt *pstmt, const char *queryString, bool readOnlyTree,
123123
ProcessUtilityContext context, ParamListInfo params,
124124
QueryEnvironment *queryEnv, DestReceiver *dest,
125125
QueryCompletion *qc);
@@ -359,7 +359,7 @@ MtmGucInit(void)
359359
MtmGucHash = hash_create("MtmGucHash",
360360
MTM_GUC_HASHSIZE,
361361
&hash_ctl,
362-
HASH_ELEM | HASH_CONTEXT);
362+
HASH_ELEM | HASH_CONTEXT | HASH_STRINGS);
363363

364364
/*
365365
* If current role is not equal to MtmDatabaseUser, than set it before any
@@ -661,7 +661,7 @@ MtmFinishDDLCommand()
661661

662662

663663
static void
664-
MtmProcessUtility(PlannedStmt *pstmt, const char *queryString,
664+
MtmProcessUtility(PlannedStmt *pstmt, const char *queryString, bool readOnlyTree,
665665
ProcessUtilityContext context, ParamListInfo params,
666666
QueryEnvironment *queryEnv, DestReceiver *dest,
667667
QueryCompletion *qc)
@@ -677,13 +677,13 @@ MtmProcessUtility(PlannedStmt *pstmt, const char *queryString,
677677
{
678678
if (PreviousProcessUtilityHook != NULL)
679679
{
680-
PreviousProcessUtilityHook(pstmt, queryString,
680+
PreviousProcessUtilityHook(pstmt, queryString, readOnlyTree,
681681
context, params, queryEnv,
682682
dest, qc);
683683
}
684684
else
685685
{
686-
standard_ProcessUtility(pstmt, queryString,
686+
standard_ProcessUtility(pstmt, queryString, readOnlyTree,
687687
context, params, queryEnv,
688688
dest, qc);
689689
}
@@ -841,13 +841,17 @@ MtmProcessUtilityReceiver(PlannedStmt *pstmt, const char *queryString,
841841

842842
if (PreviousProcessUtilityHook != NULL)
843843
{
844-
PreviousProcessUtilityHook(pstmt, queryString,
844+
bool readOnlyTree = false;
845+
846+
PreviousProcessUtilityHook(pstmt, queryString, readOnlyTree,
845847
context, params, queryEnv,
846848
dest, qc);
847849
}
848850
else
849851
{
850-
standard_ProcessUtility(pstmt, queryString,
852+
bool readOnlyTree = false;
853+
854+
standard_ProcessUtility(pstmt, queryString, readOnlyTree,
851855
context, params, queryEnv,
852856
dest, qc);
853857
}
@@ -1188,13 +1192,17 @@ MtmProcessUtilitySender(PlannedStmt *pstmt, const char *queryString,
11881192

11891193
if (PreviousProcessUtilityHook != NULL)
11901194
{
1191-
PreviousProcessUtilityHook(pstmt, queryString,
1195+
bool readOnlyTree = false;
1196+
1197+
PreviousProcessUtilityHook(pstmt, queryString, readOnlyTree,
11921198
context, params, queryEnv,
11931199
dest, qc);
11941200
}
11951201
else
11961202
{
1197-
standard_ProcessUtility(pstmt, queryString,
1203+
bool readOnlyTree = false;
1204+
1205+
standard_ProcessUtility(pstmt, queryString, readOnlyTree,
11981206
context, params, queryEnv,
11991207
dest, qc);
12001208
}
@@ -1312,11 +1320,17 @@ MtmExecutorFinish(QueryDesc *queryDesc)
13121320
if (operation == CMD_INSERT || operation == CMD_UPDATE ||
13131321
operation == CMD_DELETE || pstmt->hasModifyingCTE)
13141322
{
1315-
int i;
1323+
// int i;
13161324

1317-
for (i = 0; i < estate->es_num_result_relations; i++)
1318-
{
1319-
Relation rel = estate->es_result_relations[i].ri_RelationDesc;
1325+
// for (i = 0; i < estate->es_num_result_relations; i++)
1326+
// {
1327+
//
1328+
ListCell *l;
1329+
1330+
foreach(l, estate->es_opened_result_relations)
1331+
{
1332+
ResultRelInfo *resultRelInfo = lfirst(l);
1333+
Relation rel = resultRelInfo->ri_RelationDesc;
13201334

13211335
/*
13221336
* Don't run 3pc unless we modified at least one non-local table.
@@ -1710,7 +1724,7 @@ MtmInitializeRemoteFunctionsMap()
17101724
if (q != NULL)
17111725
*q++ = '\0';
17121726

1713-
clist = FuncnameGetCandidates(stringToQualifiedNameList(p), -1, NIL, false, false, true);
1727+
clist = FuncnameGetCandidates(stringToQualifiedNameList(p), -1, NIL, false, false, true, true);
17141728
if (clist == NULL)
17151729
mtm_log(DEBUG1, "Can't resolve function '%s', postponing that", p);
17161730
else
@@ -1725,7 +1739,7 @@ MtmInitializeRemoteFunctionsMap()
17251739
p = q;
17261740
} while (p != NULL);
17271741

1728-
clist = FuncnameGetCandidates(stringToQualifiedNameList("mtm.alter_sequences"), -1, NIL, false, false, true);
1742+
clist = FuncnameGetCandidates(stringToQualifiedNameList("mtm.alter_sequences"), -1, NIL, false, false, true, true);
17291743
if (clist != NULL)
17301744
hash_search(MtmRemoteFunctions, &clist->oid, HASH_ENTER, NULL);
17311745

src/dmq.c

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ dmq_shmem_startup_hook(void)
367367
DMQ_MAX_SUBS_PER_BACKEND * MaxBackends,
368368
DMQ_MAX_SUBS_PER_BACKEND * MaxBackends,
369369
&hash_info,
370-
HASH_ELEM);
370+
HASH_ELEM | HASH_STRINGS);
371371

372372
LWLockRelease(AddinShmemInitLock);
373373
}
@@ -388,6 +388,7 @@ dmq_init(int send_timeout, int connect_timeout)
388388
{
389389
BackgroundWorker worker;
390390

391+
mtm_log(LOG, "------> dmq_init starting");
391392
if (!process_shared_preload_libraries_in_progress)
392393
return;
393394

@@ -414,6 +415,7 @@ dmq_init(int send_timeout, int connect_timeout)
414415
/* Register shmem hooks */
415416
PreviousShmemStartupHook = shmem_startup_hook;
416417
shmem_startup_hook = dmq_shmem_startup_hook;
418+
mtm_log(LOG, "------> dmq_init started");
417419
}
418420

419421
static Size
@@ -516,6 +518,8 @@ dmq_sender_main(Datum main_arg)
516518
*/
517519
int sconn_cnt[DMQ_MAX_DESTINATIONS];
518520
double prev_timer_at = dmq_now();
521+
522+
mtm_log(LOG, "------> dmq_sender starting");
519523

520524
MtmBackgroundWorker = true; /* includes bgw name in mtm_log */
521525

@@ -562,6 +566,7 @@ dmq_sender_main(Datum main_arg)
562566

563567
got_SIGHUP = true;
564568

569+
mtm_log(LOG, "------> dmq_sender 1");
565570
for (;;)
566571
{
567572
WaitEvent event;
@@ -570,6 +575,7 @@ dmq_sender_main(Datum main_arg)
570575
double now_millisec;
571576
bool timer_event = false;
572577

578+
mtm_log(LOG, "------> dmq_sender 2");
573579
if (ProcDiePending)
574580
break;
575581

@@ -1444,7 +1450,7 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
14441450
extra = dmq_receiver_start_hook(sender_name);
14451451

14461452
/* do not hold globalxmin. XXX: try to carefully release snaps */
1447-
MyPgXact->xmin = InvalidTransactionId;
1453+
// MyPgXact->xmin = InvalidTransactionId;
14481454

14491455
for (;;)
14501456
{

src/global_tx.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ MtmGlobalTxShmemStartup(void)
185185
gtx_shared->lock = &(GetNamedLWLockTranche("mtm-gtx-lock"))->lock;
186186

187187
gtx_shared->gid2gtx = ShmemInitHash("gid2gtx", 2*MaxConnections, 2*MaxConnections,
188-
&info, HASH_ELEM);
188+
&info, HASH_ELEM | HASH_STRINGS);
189189

190190
LWLockRelease(AddinShmemInitLock);
191191
}

src/include/compat.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,25 @@
22
#define MTMCOMPAT_H
33

44
/* EE pooler gets rid of static variable */
5-
#ifdef PGPRO_EE
5+
#ifndef PGPRO_EE
66
#define FeBeWaitSetCompat() (MyProcPort->pqcomm_waitset)
77
#else
88
#define FeBeWaitSetCompat() (FeBeWaitSet)
99
#endif
1010

1111
#ifdef PGPRO_EE /* atx */
12-
#define BeginTransactionBlockCompat() (BeginTransactionBlock(false, NIL))
13-
#define UserAbortTransactionBlockCompat(chain) (UserAbortTransactionBlock(false, (chain)))
12+
#define BeginTransactionBlockCompat() (BeginTransactionBlock())
13+
#define UserAbortTransactionBlockCompat(chain) (UserAbortTransactionBlock((chain)))
1414
#else
1515
#define BeginTransactionBlockCompat() (BeginTransactionBlock())
1616
#define UserAbortTransactionBlockCompat(chain) (UserAbortTransactionBlock(chain))
1717
#endif
1818

1919
/* atx renames this for some reason */
20-
#ifdef PGPRO_EE
21-
#define on_commits_compat() (pg_on_commit_actions)
22-
#else
20+
//#ifdef PGPRO_EE
21+
//#define on_commits_compat() (pg_on_commit_actions)
22+
//#else
2323
#define on_commits_compat() (on_commits)
24-
#endif
24+
//#endif
2525

2626
#endif /* MTMCOMPAT_H */

src/multimaster.c

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ static void MtmDeserializeTransactionState(void *ctx);
8888
#endif
8989

9090
#ifdef PGPRO_EE
91-
static void *MtmSuspendTransaction(void);
92-
static void MtmResumeTransaction(void *ctx);
91+
//static void *MtmSuspendTransaction(void);
92+
//static void MtmResumeTransaction(void *ctx);
9393
#endif
9494

9595
static void release_pb_holders_xact_cb(XactEvent event, void *arg);
@@ -363,21 +363,21 @@ MtmDeserializeTransactionState(void *ctx)
363363
* ATX compatibility support.
364364
*/
365365
#ifdef PGPRO_EE
366-
static void *
367-
MtmSuspendTransaction(void)
368-
{
369-
MtmCurrentTrans *ctx = MemoryContextAlloc(CurTransactionContext, sizeof(MtmCurrentTrans));
370-
371-
*ctx = MtmTx;
372-
return ctx;
373-
}
374-
375-
static void
376-
MtmResumeTransaction(void *ctx)
377-
{
378-
MtmTx = *(MtmCurrentTrans *) ctx;
379-
pfree(ctx);
380-
}
366+
//static void *
367+
//MtmSuspendTransaction(void)
368+
//{
369+
// MtmCurrentTrans *ctx = MemoryContextAlloc(CurTransactionContext, sizeof(MtmCurrentTrans));
370+
//
371+
// *ctx = MtmTx;
372+
// return ctx;
373+
//}
374+
375+
//static void
376+
//MtmResumeTransaction(void *ctx)
377+
//{
378+
// MtmTx = *(MtmCurrentTrans *) ctx;
379+
// pfree(ctx);
380+
//}
381381
#endif
382382

383383
/*
@@ -744,8 +744,8 @@ NULL);
744744
DetectGlobalDeadLock = MtmDetectGlobalDeadLock;
745745

746746
#ifdef PGPRO_EE
747-
SuspendTransactionHook = MtmSuspendTransaction;
748-
ResumeTransactionHook = MtmResumeTransaction;
747+
// SuspendTransactionHook = MtmSuspendTransaction;
748+
// ResumeTransactionHook = MtmResumeTransaction;
749749
#endif
750750
}
751751

@@ -1712,8 +1712,10 @@ MtmReloadConfig(MtmConfig *old_cfg, mtm_cfg_change_cb node_add_cb,
17121712
int i,
17131713
node_id;
17141714

1715+
mtm_log(LOG, "-------> ReloadConfig");
17151716
new_cfg = MtmLoadConfig(elevel_on_absent);
17161717

1718+
mtm_log(LOG, "-------> ReloadConfig. new config %p", new_cfg);
17171719
/*
17181720
* Construct bitmapsets from old and new mtm_config's and find out whether
17191721
* some nodes were added or deleted.
@@ -1758,8 +1760,9 @@ MtmReloadConfig(MtmConfig *old_cfg, mtm_cfg_change_cb node_add_cb,
17581760
}
17591761

17601762
node_id = -1;
1761-
while ((node_id = bms_next_member(created, node_id)) >= 0)
1763+
while ((node_id = bms_next_member(created, node_id)) >= 0) {
17621764
node_add_cb(node_id, new_cfg, arg);
1765+
}
17631766
}
17641767

17651768
if (node_drop_cb)

src/pglogical_apply.c

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@ create_rel_estate(Relation rel)
172172
resultRelInfo = makeNode(ResultRelInfo);
173173
InitResultRelInfo(resultRelInfo, rel, 1, NULL, 0);
174174

175-
estate->es_result_relations = resultRelInfo;
176-
estate->es_num_result_relations = 1;
175+
estate->es_result_relations = &resultRelInfo;
176+
// estate->es_num_result_relations = 1; // 1375422c7826a2bf387be29895e961614f69de4b
177177
estate->es_result_relation_info = resultRelInfo;
178178
estate->es_output_cid = GetCurrentCommandId(true);
179179

@@ -1238,9 +1238,10 @@ process_remote_insert(StringInfo s, Relation rel)
12381238
if (relinfo->ri_NumIndices > 0)
12391239
{
12401240
List *recheckIndexes;
1241+
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
12411242

1242-
recheckIndexes = ExecInsertIndexTuples(bufferedSlots[i],
1243-
estate, false, NULL, NIL);
1243+
recheckIndexes = ExecInsertIndexTuples(resultRelInfo, bufferedSlots[i],
1244+
estate, false, false, NULL, NIL);
12441245

12451246
/* AFTER ROW INSERT Triggers */
12461247
ExecARInsertTriggers(estate, relinfo, bufferedSlots[i],
@@ -1267,11 +1268,12 @@ process_remote_insert(StringInfo s, Relation rel)
12671268
else
12681269
{
12691270
TupleTableSlot *newslot;
1271+
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
12701272

12711273
newslot = ExecInitExtraTupleSlot(estate, tupDesc, &TTSOpsHeapTuple);
12721274
tuple_to_slot(estate, rel, &new_tuple, newslot);
12731275

1274-
ExecSimpleRelationInsert(estate, newslot);
1276+
ExecSimpleRelationInsert(resultRelInfo, estate, newslot);
12751277
}
12761278
ExecCloseIndices(estate->es_result_relation_info);
12771279
if (ActiveSnapshotSet())
@@ -1367,6 +1369,7 @@ process_remote_update(StringInfo s, Relation rel)
13671369
if (found)
13681370
{
13691371
HeapTuple remote_tuple = NULL;
1372+
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
13701373

13711374
remote_tuple = heap_modify_tuple(ExecFetchSlotHeapTuple(localslot, true, NULL),
13721375
tupDesc,
@@ -1376,7 +1379,7 @@ process_remote_update(StringInfo s, Relation rel)
13761379
ExecStoreHeapTuple(remote_tuple, remoteslot, false);
13771380

13781381
EvalPlanQualSetSlot(&epqstate, remoteslot);
1379-
ExecSimpleRelationUpdate(estate, &epqstate, localslot, remoteslot);
1382+
ExecSimpleRelationUpdate(resultRelInfo, estate, &epqstate, localslot, remoteslot);
13801383
}
13811384
else
13821385
{
@@ -1444,8 +1447,10 @@ process_remote_delete(StringInfo s, Relation rel)
14441447

14451448
if (found)
14461449
{
1450+
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
1451+
14471452
EvalPlanQualSetSlot(&epqstate, localslot);
1448-
ExecSimpleRelationDelete(estate, &epqstate, localslot);
1453+
ExecSimpleRelationDelete(resultRelInfo, estate, &epqstate, localslot);
14491454
}
14501455
else
14511456
{

0 commit comments

Comments
 (0)