Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit bacbfb0

Browse files
committed
fixes and crutches for scheduler
1 parent 8b99228 commit bacbfb0

File tree

5 files changed

+24
-7
lines changed

5 files changed

+24
-7
lines changed

contrib/mmts/arbiter.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,8 @@ static void MtmSender(Datum arg)
715715
int nNodes = MtmMaxNodes;
716716
int i;
717717

718+
MtmBackgroundWorker = true;
719+
718720
MtmBuffer* txBuffer = (MtmBuffer*)palloc0(sizeof(MtmBuffer)*nNodes);
719721
MTM_ELOG(LOG, "Start arbiter sender %d", MyProcPid);
720722
InitializeTimeouts();
@@ -802,6 +804,8 @@ static void MtmMonitor(Datum arg)
802804
pqsignal(SIGQUIT, SetStop);
803805
pqsignal(SIGTERM, SetStop);
804806

807+
MtmBackgroundWorker = true;
808+
805809
/* We're now ready to receive signals */
806810
BackgroundWorkerUnblockSignals();
807811

@@ -838,7 +842,9 @@ static void MtmReceiver(Datum arg)
838842
pqsignal(SIGINT, SetStop);
839843
pqsignal(SIGQUIT, SetStop);
840844
pqsignal(SIGTERM, SetStop);
841-
845+
846+
MtmBackgroundWorker = true;
847+
842848
/* We're now ready to receive signals */
843849
BackgroundWorkerUnblockSignals();
844850

contrib/mmts/bgwpool.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
3636

3737
MTM_ELOG(LOG, "Start background worker %d, shutdown=%d", MyProcPid, pool->shutdown);
3838

39+
MtmBackgroundWorker = true;
3940
MtmIsLogicalReceiver = true;
4041
MtmPool = pool;
4142

contrib/mmts/multimaster.c

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ bool MtmDoReplication;
235235
char* MtmDatabaseName;
236236
char* MtmDatabaseUser;
237237
Oid MtmDatabaseId;
238+
bool MtmBackgroundWorker;
238239

239240
int MtmNodes;
240241
int MtmNodeId;
@@ -898,7 +899,7 @@ MtmIsUserTransaction()
898899
IsNormalProcessingMode() &&
899900
MtmDoReplication &&
900901
!am_walsender &&
901-
!IsBackgroundWorker &&
902+
!MtmBackgroundWorker &&
902903
!IsAutoVacuumWorkerProcess();
903904
}
904905

@@ -4865,7 +4866,7 @@ static void MtmGucInit(void)
48654866
*/
48664867
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
48674868
current_role = GetConfigOptionByName("session_authorization", NULL, false);
4868-
if (strcmp(MtmDatabaseUser, current_role) != 0)
4869+
if (current_role && *current_role && strcmp(MtmDatabaseUser, current_role) != 0)
48694870
MtmGucUpdate("session_authorization", current_role);
48704871
MemoryContextSwitchTo(oldcontext);
48714872
}
@@ -4959,13 +4960,20 @@ char* MtmGucSerialize(void)
49594960
{
49604961
StringInfo serialized_gucs;
49614962
dlist_iter iter;
4962-
int nvars = 0;
4963+
const char *search_path;
49634964

49644965
if (!MtmGucHash)
49654966
MtmGucInit();
49664967

49674968
serialized_gucs = makeStringInfo();
49684969

4970+
/*
4971+
* Crutch for scheduler. It sets search_path through SetConfigOption()
4972+
* so our callback do not react on that.
4973+
*/
4974+
search_path = GetConfigOption("search_path", false, true);
4975+
appendStringInfo(serialized_gucs, "SET search_path TO %s; ", search_path);
4976+
49694977
dlist_foreach(iter, &MtmGucList)
49704978
{
49714979
MtmGucEntry *cur_entry = dlist_container(MtmGucEntry, list_node, iter.cur);
@@ -4986,7 +4994,6 @@ char* MtmGucSerialize(void)
49864994
appendStringInfoString(serialized_gucs, cur_entry->value);
49874995
}
49884996
appendStringInfoString(serialized_gucs, "; ");
4989-
nvars++;
49904997
}
49914998

49924999
return serialized_gucs->data;
@@ -5007,7 +5014,7 @@ static void MtmProcessDDLCommand(char const* queryString, bool transactional)
50075014
{
50085015
char *gucCtx = MtmGucSerialize();
50095016
if (*gucCtx)
5010-
queryString = psprintf("RESET SESSION AUTHORIZATION; reset all; %s; %s", gucCtx, queryString);
5017+
queryString = psprintf("RESET SESSION AUTHORIZATION; reset all; %s %s", gucCtx, queryString);
50115018
else
50125019
queryString = psprintf("RESET SESSION AUTHORIZATION; reset all; %s", queryString);
50135020

@@ -5318,7 +5325,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
53185325
if (!skipCommand && !MtmTx.isReplicated && (context == PROCESS_UTILITY_TOPLEVEL || MtmUtilityProcessedInXid != GetCurrentTransactionId()))
53195326
{
53205327
MtmUtilityProcessedInXid = GetCurrentTransactionId();
5321-
if (context == PROCESS_UTILITY_TOPLEVEL) {
5328+
if (context == PROCESS_UTILITY_TOPLEVEL || !ActivePortal) {
53225329
MtmProcessDDLCommand(queryString, true);
53235330
} else {
53245331
MtmProcessDDLCommand(ActivePortal->sourceText, true);

contrib/mmts/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,7 @@ extern MemoryContext MtmApplyContext;
378378
extern lsn_t MtmSenderWalEnd;
379379
extern timestamp_t MtmRefreshClusterStatusSchedule;
380380
extern MtmConnectionInfo* MtmConnections;
381+
extern bool MtmBackgroundWorker;
381382

382383

383384
extern void MtmArbiterInitialize(void);

contrib/mmts/pglogical_receiver.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ pglogical_receiver_main(Datum main_arg)
224224
char* connString = psprintf("replication=database %s", Mtm->nodes[nodeId-1].con.connStr);
225225
static PortalData fakePortal;
226226

227+
MtmBackgroundWorker = true;
228+
227229
ByteBufferAlloc(&buf);
228230

229231
slotName = psprintf(MULTIMASTER_SLOT_PATTERN, MtmNodeId);

0 commit comments

Comments
 (0)