Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 7f4beb1

Browse files
committed
fixes and crutches for scheduler
1 parent ab08a7a commit 7f4beb1

File tree

5 files changed

+25
-8
lines changed

5 files changed

+25
-8
lines changed

arbiter.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,8 @@ static void MtmSender(Datum arg)
710710
int nNodes = MtmMaxNodes;
711711
int i;
712712

713+
MtmBackgroundWorker = true;
714+
713715
MtmBuffer* txBuffer = (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
714716
MTM_ELOG(LOG, "Start arbiter sender %d", MyProcPid);
715717
InitializeTimeouts();
@@ -797,6 +799,8 @@ static void MtmMonitor(Datum arg)
797799
pqsignal(SIGQUIT, SetStop);
798800
pqsignal(SIGTERM, SetStop);
799801

802+
MtmBackgroundWorker = true;
803+
800804
/* We're now ready to receive signals */
801805
BackgroundWorkerUnblockSignals();
802806

@@ -833,7 +837,9 @@ static void MtmReceiver(Datum arg)
833837
pqsignal(SIGINT, SetStop);
834838
pqsignal(SIGQUIT, SetStop);
835839
pqsignal(SIGTERM, SetStop);
836-
840+
841+
MtmBackgroundWorker = true;
842+
837843
/* We're now ready to receive signals */
838844
BackgroundWorkerUnblockSignals();
839845

bgwpool.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
3636

3737
MTM_ELOG(LOG, "Start background worker %d, shutdown=%d", MyProcPid, pool->shutdown);
3838

39+
MtmBackgroundWorker = true;
3940
MtmIsLogicalReceiver = true;
4041
MtmPool = pool;
4142

multimaster.c

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ bool MtmDoReplication;
236236
char* MtmDatabaseName;
237237
char* MtmDatabaseUser;
238238
Oid MtmDatabaseId;
239+
bool MtmBackgroundWorker;
239240

240241
int MtmNodes;
241242
int MtmNodeId;
@@ -898,7 +899,7 @@ MtmIsUserTransaction()
898899
IsNormalProcessingMode() &&
899900
MtmDoReplication &&
900901
!am_walsender &&
901-
!IsBackgroundWorker &&
902+
!MtmBackgroundWorker &&
902903
!IsAutoVacuumWorkerProcess();
903904
}
904905

@@ -4538,7 +4539,7 @@ static void MtmGucInit(void)
45384539
*/
45394540
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
45404541
current_role = GetConfigOptionByName("session_authorization", NULL, false);
4541-
if (strcmp(MtmDatabaseUser, current_role) != 0)
4542+
if (current_role && *current_role && strcmp(MtmDatabaseUser, current_role) != 0)
45424543
MtmGucUpdate("session_authorization", current_role);
45434544
MemoryContextSwitchTo(oldcontext);
45444545
}
@@ -4632,13 +4633,20 @@ char* MtmGucSerialize(void)
46324633
{
46334634
StringInfo serialized_gucs;
46344635
dlist_iter iter;
4635-
int nvars = 0;
4636+
const char *search_path;
46364637

46374638
if (!MtmGucHash)
46384639
MtmGucInit();
46394640

46404641
serialized_gucs = makeStringInfo();
46414642

4643+
/*
4644+
* Crutch for scheduler. It sets search_path through SetConfigOption()
4645+
* so our callback do not react on that.
4646+
*/
4647+
search_path = GetConfigOption("search_path", false, true);
4648+
appendStringInfo(serialized_gucs, "SET search_path TO %s; ", search_path);
4649+
46424650
dlist_foreach(iter, &MtmGucList)
46434651
{
46444652
MtmGucEntry *cur_entry = dlist_container(MtmGucEntry, list_node, iter.cur);
@@ -4659,7 +4667,6 @@ char* MtmGucSerialize(void)
46594667
appendStringInfoString(serialized_gucs, cur_entry->value);
46604668
}
46614669
appendStringInfoString(serialized_gucs, "; ");
4662-
nvars++;
46634670
}
46644671

46654672
return serialized_gucs->data;
@@ -4680,7 +4687,7 @@ static void MtmProcessDDLCommand(char const* queryString, bool transactional)
46804687
{
46814688
char *gucCtx = MtmGucSerialize();
46824689
if (*gucCtx)
4683-
queryString = psprintf("RESET SESSION AUTHORIZATION; reset all; %s; %s", gucCtx, queryString);
4690+
queryString = psprintf("RESET SESSION AUTHORIZATION; reset all; %s %s", gucCtx, queryString);
46844691
else
46854692
queryString = psprintf("RESET SESSION AUTHORIZATION; reset all; %s", queryString);
46864693

@@ -4991,7 +4998,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
49914998
if (!skipCommand && !MtmTx.isReplicated && (context == PROCESS_UTILITY_TOPLEVEL || MtmUtilityProcessedInXid != GetCurrentTransactionId()))
49924999
{
49935000
MtmUtilityProcessedInXid = GetCurrentTransactionId();
4994-
if (context == PROCESS_UTILITY_TOPLEVEL) {
5001+
if (context == PROCESS_UTILITY_TOPLEVEL || !ActivePortal) {
49955002
MtmProcessDDLCommand(queryString, true);
49965003
} else {
49975004
MtmProcessDDLCommand(ActivePortal->sourceText, true);

multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,8 @@ extern MemoryContext MtmApplyContext;
379379
extern lsn_t MtmSenderWalEnd;
380380
extern timestamp_t MtmRefreshClusterStatusSchedule;
381381
extern MtmConnectionInfo* MtmConnections;
382-
extern bool MtmMajorNode;
382+
extern bool MtmMajorNode;
383+
extern bool MtmBackgroundWorker;
383384

384385

385386
extern void MtmArbiterInitialize(void);

pglogical_receiver.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,8 @@ pglogical_receiver_main(Datum main_arg)
227227
char* connString = psprintf("replication=database %s", Mtm->nodes[nodeId-1].con.connStr);
228228
static PortalData fakePortal;
229229

230+
MtmBackgroundWorker = true;
231+
230232
ByteBufferAlloc(&buf);
231233

232234
slotName = psprintf(MULTIMASTER_SLOT_PATTERN, MtmNodeId);

0 commit comments

Comments
 (0)